[jira] [Comment Edited] (FLINK-10389) TaskManagerServicesConfiguration ctor contains self assignment

2018-09-21 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16624449#comment-16624449
 ] 

vinoyang edited comment on FLINK-10389 at 9/22/18 3:06 AM:
---

Good catch. Does it make sense to send a PR or a "hotfix" to fix this? 
[~till.rohrmann] and [~Zentol]


was (Author: yanghua):
Good catach, Does it make sense to send a PR or "hotfix" to fix this? 
[~till.rohrmann] and [~Zentol]

> TaskManagerServicesConfiguration ctor contains self assignment
> --
>
> Key: FLINK-10389
> URL: https://issues.apache.org/jira/browse/FLINK-10389
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> TaskManagerServicesConfiguration has:
> {code}
> this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
> {code}
> There is no systemResourceMetricsEnabled parameter to the ctor.
> This was reported by findbugs.
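
For illustration, here is a minimal self-contained sketch (simplified class, not the actual Flink constructor) of the self-assignment pattern findbugs flags, together with the obvious fix:

{code:java}
public class ServicesConfig {

    private boolean systemResourceMetricsEnabled; // silently keeps its default (false)

    // BUG: the boolean parameter was never added, so the right-hand side below
    // resolves to the field itself and the assignment is a no-op.
    public ServicesConfig() {
        this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
    }

    // FIX: accept the value as a constructor parameter and assign it to the field.
    public ServicesConfig(boolean systemResourceMetricsEnabled) {
        this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
    }
}
{code}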



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10389) TaskManagerServicesConfiguration ctor contains self assignment

2018-09-21 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16624449#comment-16624449
 ] 

vinoyang commented on FLINK-10389:
--

Good catch. Does it make sense to send a PR or a "hotfix" to fix this? 
[~till.rohrmann] and [~Zentol]

> TaskManagerServicesConfiguration ctor contains self assignment
> --
>
> Key: FLINK-10389
> URL: https://issues.apache.org/jira/browse/FLINK-10389
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> TaskManagerServicesConfiguration has:
> {code}
> this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
> {code}
> There is no systemResourceMetricsEnabled parameter to the ctor.
> This was reported by findbugs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10389) TaskManagerServicesConfiguration ctor contains self assignment

2018-09-21 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10389:


Assignee: vinoyang

> TaskManagerServicesConfiguration ctor contains self assignment
> --
>
> Key: FLINK-10389
> URL: https://issues.apache.org/jira/browse/FLINK-10389
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> TaskManagerServicesConfiguration has:
> {code}
> this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
> {code}
> There is no systemResourceMetricsEnabled parameter to the ctor.
> This was reported by findbugs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10391) MillisOfDay is used in place of instant for LocalTime ctor in AvroKryoSerializerUtils

2018-09-21 Thread Ted Yu (JIRA)
Ted Yu created FLINK-10391:
--

 Summary: MillisOfDay is used in place of instant for LocalTime 
ctor in AvroKryoSerializerUtils
 Key: FLINK-10391
 URL: https://issues.apache.org/jira/browse/FLINK-10391
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


From JodaLocalTimeSerializer#write, we serialize the getMillisOfDay() value 
from LocalTime.
For the read method:
{code}
  final int time = input.readInt(true);
  return new LocalTime(time, 
ISOChronology.getInstanceUTC().withZone(DateTimeZone.UTC));
{code}
It seems 
http://joda-time.sourceforge.net/apidocs/org/joda/time/LocalTime.html#fromMillisOfDay(long,%20org.joda.time.Chronology)
 should be used instead.
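
For illustration, a hedged sketch of the suggested fix (simplified, not the actual AvroKryoSerializerUtils code); it assumes the serialized int is the millis-of-day value written by JodaLocalTimeSerializer#write:

{code:java}
import org.joda.time.LocalTime;
import org.joda.time.chrono.ISOChronology;

public class JodaLocalTimeRead {

    public static LocalTime read(int time) {
        // BUG: LocalTime(long instant, Chronology) interprets 'time' as an epoch instant:
        // return new LocalTime(time, ISOChronology.getInstanceUTC());

        // FIX: interpret 'time' as the millis-of-day that was actually serialized.
        return LocalTime.fromMillisOfDay(time, ISOChronology.getInstanceUTC());
    }
}
{code}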



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10390) DataDog metric reporter leak warning

2018-09-21 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10390:
--

 Summary: DataDog metric reporter leak warning
 Key: FLINK-10390
 URL: https://issues.apache.org/jira/browse/FLINK-10390
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.6.1
Reporter: Elias Levy


After upgrading to 1.6.1 from 1.4.2, we started observing warnings in the log 
associated with the DataDog metrics reporter:
{quote}Sep 21, 2018 9:43:20 PM 
org.apache.flink.shaded.okhttp3.internal.platform.Platform log WARNING: A 
connection to https://app.datadoghq.com/ was leaked. Did you forget to close a 
response body? To see where this was allocated, set the OkHttpClient logger 
level to FINE: 
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);
{quote}
The metric reporter's okhttp dependency version (3.7.0) has not changed, so 
that does not appear to be the source of the warning.

I believe the issue is the change made in 
[FLINK-8553|https://github.com/apache/flink/commit/ae3d547afe7ec44d37b38222a3ea40d9181e#diff-fc396ba6772815fc05efc1310760cd4b].
  The HTTP calls were made async.  The previous code called 
{{client.newCall(r).execute().close()}}.  The new code does nothing in the 
callback, even though the [Callback.onResponse 
documentation|https://square.github.io/okhttp/3.x/okhttp/okhttp3/Callback.html#onResponse-okhttp3.Call-okhttp3.Response-]
 states:

bq. Called when the HTTP response was successfully returned by the remote 
server. The callback may proceed to read the response body with Response.body. 
The response is still live until its response body is closed. 
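
For illustration, a hedged sketch of the kind of fix the quoted documentation suggests (illustrative names, plain okhttp3 packages rather than Flink's shaded ones): keep the call asynchronous, but close the Response in the callback so the connection is returned to the pool instead of being leaked:

{code:java}
import java.io.IOException;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class FireAndForget {

    public static void send(OkHttpClient client, Request request) {
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                // nothing to close; the call failed before a response body existed
            }

            @Override
            public void onResponse(Call call, Response response) {
                // the response stays "live" until its body is closed
                response.close();
            }
        });
    }
}
{code}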



 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10298) Batch Job Failover Strategy

2018-09-21 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16624217#comment-16624217
 ] 

JIN SUN commented on FLINK-10298:
-

Hi Tison,

Thanks for pointing this out. Yes, I know there is a DataConsumptionException, and 
we can treat DataConsumptionException as the PartitionDataMissingError case; 
DataConsumptionException is one of the issues we need to handle. This JIRA 
targets a framework to improve failover, especially in the batch job scenario.

Jin

> Batch Job Failover Strategy
> ---
>
> Key: FLINK-10298
> URL: https://issues.apache.org/jira/browse/FLINK-10298
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>
> The new failover strategy needs to consider handling failures according to 
> different failure types. It orchestrates all the logic we mentioned in this 
> [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit].
>  We can put the logic in the onTaskFailure method of the FailoverStrategy 
> interface, with the logic inline:
> {code:java}
> public void onTaskFailure(Execution taskExecution, Throwable cause) {
>     //1. Get the throwable type
>     //2. If the type is NonrecoverableType, fail the job
>     //3. If the type is PartitionDataMissingError, do revocation
>     //4. If the type is EnvironmentError, check the blacklist
>     //5. Other failure types are recoverable, but we need to remember the count of the failure;
>     //6. if it exceeds the threshold, fail the job
> }{code}
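
To make the numbered steps above concrete, here is a hedged sketch of the classification flow; the enum, the threshold, and all helper methods are hypothetical placeholders, not existing Flink APIs:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class BatchFailoverStrategy {

    enum ThrowableType { NONRECOVERABLE, PARTITION_DATA_MISSING, ENVIRONMENT_ERROR, RECOVERABLE }

    private final AtomicInteger failureCount = new AtomicInteger();
    private final int maxFailures = 10; // assumed threshold

    public void onTaskFailure(Object taskExecution, Throwable cause) {
        switch (classify(cause)) {                  // 1. get the throwable type
            case NONRECOVERABLE:
                failJob(cause);                     // 2. fail the whole job
                break;
            case PARTITION_DATA_MISSING:
                revokeProducer(taskExecution);      // 3. revoke and re-run the producer
                break;
            case ENVIRONMENT_ERROR:
                checkBlacklist(taskExecution);      // 4. possibly blacklist the bad host
                restartTask(taskExecution);
                break;
            default:                                // 5./6. recoverable: count, fail past threshold
                if (failureCount.incrementAndGet() > maxFailures) {
                    failJob(cause);
                } else {
                    restartTask(taskExecution);
                }
        }
    }

    private ThrowableType classify(Throwable cause) { return ThrowableType.RECOVERABLE; } // stub
    private void failJob(Throwable cause) { /* cancel all tasks, mark the job failed */ }
    private void revokeProducer(Object exec) { /* revoke and re-run the upstream producer */ }
    private void checkBlacklist(Object exec) { /* track/blacklist the environment */ }
    private void restartTask(Object exec) { /* restart only the failed task */ }
}
{code}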



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10389) TaskManagerServicesConfiguration ctor contains self assignment

2018-09-21 Thread Ted Yu (JIRA)
Ted Yu created FLINK-10389:
--

 Summary: TaskManagerServicesConfiguration ctor contains self 
assignment
 Key: FLINK-10389
 URL: https://issues.apache.org/jira/browse/FLINK-10389
 Project: Flink
  Issue Type: Task
Reporter: Ted Yu


TaskManagerServicesConfiguration has:
{code}
this.systemResourceMetricsEnabled = systemResourceMetricsEnabled;
{code}
There is no systemResourceMetricsEnabled parameter to the ctor.

This was reported by findbugs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10388) RestClientTest sometimes fails with AssertionError

2018-09-21 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623945#comment-16623945
 ] 

tison edited comment on FLINK-10388 at 9/21/18 5:49 PM:


Maybe relevant to FLINK-4052

cc [~StephanEwen]


was (Author: tison):
Maybe relevant to FLINK-4052

> RestClientTest sometimes fails with AssertionError
> --
>
> Key: FLINK-10388
> URL: https://issues.apache.org/jira/browse/FLINK-10388
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> Running the test on Linux I got:
> {code}
> testConnectionTimeout(org.apache.flink.runtime.rest.RestClientTest)  Time 
> elapsed: 1.918 sec  <<< FAILURE!
> java.lang.AssertionError:
> Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
>  but: 
>   Network is unreachable: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedSocketException
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.junit.Assert.assertThat(Assert.java:923)
>   at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:69)
> {code}
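
One possible way to make the flaky assertion tolerant of both exception types (an illustrative sketch only, not a fix the project has settled on):

{code:java}
import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;

import java.net.SocketException;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;

public class ConnectionTimeoutAssertion {

    static void check(Throwable t) {
        // accept the expected timeout as well as the "Network is unreachable"
        // SocketException observed on some Linux machines
        assertThat(t, either(instanceOf(ConnectTimeoutException.class))
            .or(instanceOf(SocketException.class)));
    }
}
{code}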



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10388) RestClientTest sometimes fails with AssertionError

2018-09-21 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623945#comment-16623945
 ] 

tison commented on FLINK-10388:
---

Maybe relevant to FLINK-4052

> RestClientTest sometimes fails with AssertionError
> --
>
> Key: FLINK-10388
> URL: https://issues.apache.org/jira/browse/FLINK-10388
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> Running the test on Linux I got:
> {code}
> testConnectionTimeout(org.apache.flink.runtime.rest.RestClientTest)  Time 
> elapsed: 1.918 sec  <<< FAILURE!
> java.lang.AssertionError:
> Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
>  but: 
>   Network is unreachable: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedSocketException
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.junit.Assert.assertThat(Assert.java:923)
>   at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:69)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10388) RestClientTest sometimes fails with AssertionError

2018-09-21 Thread Ted Yu (JIRA)
Ted Yu created FLINK-10388:
--

 Summary: RestClientTest sometimes fails with AssertionError
 Key: FLINK-10388
 URL: https://issues.apache.org/jira/browse/FLINK-10388
 Project: Flink
  Issue Type: Test
Reporter: Ted Yu


Running the test on Linux I got:
{code}
testConnectionTimeout(org.apache.flink.runtime.rest.RestClientTest)  Time 
elapsed: 1.918 sec  <<< FAILURE!
java.lang.AssertionError:
Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
 but: 
  Network is unreachable: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedSocketException
  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
  at org.junit.Assert.assertThat(Assert.java:956)
  at org.junit.Assert.assertThat(Assert.java:923)
  at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:69)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10378) Hide/Comment out contribute guide from PULL_REQUEST_TEMPLATE

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623911#comment-16623911
 ] 

ASF GitHub Bot commented on FLINK-10378:


TisonKun commented on issue #6722: [FLINK-10378] [github] Comment out 
contribute guide from PULL_REQUEST_TEMPLATE
URL: https://github.com/apache/flink/pull/6722#issuecomment-423611886
 
 
   @StephanEwen Travis CI must have failed due to an unrelated issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Hide/Comment out contribute guide from PULL_REQUEST_TEMPLATE
> 
>
> Key: FLINK-10378
> URL: https://issues.apache.org/jira/browse/FLINK-10378
> Project: Flink
>  Issue Type: Improvement
>  Components: GitHub
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
>
> Explicitly comment out the contribution guide in PULL_REQUEST_TEMPLATE by 
> wrapping it in an HTML comment ({{<!-- ... -->}}).
> This hints to contributors that the text is informational and will not appear 
> in the final PR description; as a side effect, it also saves contributors the 
> work of deleting the text every time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623910#comment-16623910
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on issue #6729: [FLINK-10386] [taskmanager] Remove legacy 
class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#issuecomment-423611672
 
 
   cc @StefanRRichter @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, the {{TaskExecutionStateListener}} strategy has been abandoned and 
> no component relies on it. Instead, {{TaskManagerActions}} was introduced to take 
> over the communication between {{Task}} and {{TaskManager}}. Nothing except the 
> {{TaskManager}} should communicate directly with a {{Task}}, so the legacy class 
> {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623777#comment-16623777
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin edited a comment on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423573937
 
 
   Thanks for the update, @yanghua. Looking at the checkpoint coordinator more
deeply, I think we first have to work a bit more on the design for this kind of
change. We have to take into account at least the following points (roughly):
   - Introduce a separate component/class responsible for failure management and
     counting, e.g. `CheckpointFailureManager` or something similar
   - The job manager should
     - construct the `CheckpointFailureManager`
     - configure it with the max failure count and a proper action for how to fail
     - pass it to the `CheckpointCoordinator`
   - The `CheckpointCoordinator`
     - should give callbacks to the `CheckpointFailureManager` about failures and
       successes of checkpoints
     - needs some refactoring to better distinguish failures of `PendingCheckpoint`.
       Right now the `abortXXX()` methods do not provide enough information to decide
       whether to count a failure for the `CheckpointFailureManager` or not (like we
       have for `triggerSavepoint` in `CheckpointDeclineReason`). In the end there
       should be a clear place in the `CheckpointCoordinator` where the callbacks to
       the `CheckpointFailureManager` are issued, e.g.:
       - `CheckpointDeclineReason.EXCEPTION` as a result of `triggerSavepoint`
       - some cases of `PendingCheckpoint.abortDeclined()`, `abortError()`,
         maybe `abortExpired()`
   - Consider having only the `DecliningCheckpointExceptionHandler` on the
     `TaskExecutor` side and handling all failure cases only in the
     `CheckpointCoordinator`

   There might be more points. I suggest we step back and continue the discussion
in the JIRA issue. Once we have a clear design, the PR can be opened again.
   
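To make the proposal above concrete, here is a hedged sketch of what such a `CheckpointFailureManager` could look like; the class name comes from the comment, but the method names, the wiring, and the reset-on-success behavior are assumptions, not Flink's eventual implementation:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class CheckpointFailureManager {

    private final int maxTolerableFailures;  // configured by the job manager
    private final Runnable failJobAction;    // the "proper action how to fail"
    private final AtomicInteger continuousFailures = new AtomicInteger();

    public CheckpointFailureManager(int maxTolerableFailures, Runnable failJobAction) {
        this.maxTolerableFailures = maxTolerableFailures;
        this.failJobAction = failJobAction;
    }

    // Callback from the CheckpointCoordinator for failures that should count,
    // e.g. CheckpointDeclineReason.EXCEPTION or some abortDeclined()/abortError() cases.
    public void handleCheckpointFailure(Throwable cause) {
        if (continuousFailures.incrementAndGet() > maxTolerableFailures) {
            failJobAction.run();
        }
    }

    // Callback from the CheckpointCoordinator on a completed checkpoint:
    // a success resets the continuous-failure count.
    public void handleCheckpointSuccess() {
        continuousFailures.set(0);
    }
}
{code}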


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623775#comment-16623775
 ] 

ASF GitHub Bot commented on FLINK-10074:


azagrebin commented on issue #6567: [FLINK-10074] Allowable number of 
checkpoint failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423573937
 
 
   Thanks for the update, @yanghua. Looking at the checkpoint coordinator more
deeply, I think we first have to work a bit more on the design for this kind of
change. We have to take into account at least the following points (roughly):
   - Introduce a separate component/class responsible for failure management and
     counting, e.g. `CheckpointFailureManager` or something similar
   - The job manager should
     - construct the `CheckpointFailureManager`
     - configure it with the max failure count and a proper action for how to fail
     - pass it to the `CheckpointCoordinator`
   - The `CheckpointCoordinator`
     - should give callbacks to the `CheckpointFailureManager` about failures and
       successes of checkpoints
     - needs some refactoring to better distinguish failures of `PendingCheckpoint`.
       Right now the `abortXXX()` methods do not provide enough information to decide
       whether to count a failure for the `CheckpointFailureManager` or not (like we
       have for `triggerSavepoint` in `CheckpointDeclineReason`). In the end there
       should be a clear place in the `CheckpointCoordinator` where the callbacks to
       the `CheckpointFailureManager` are issued, e.g.:
       - `CheckpointDeclineReason.EXCEPTION` as a result of `triggerSavepoint`
       - some cases of `PendingCheckpoint.abortDeclined()`, `abortError()`,
         maybe `abortExpired()`
   - Consider having only the `DecliningCheckpointExceptionHandler` on the
     `TaskExecutor` side and handling all failure cases only in the
     `CheckpointCoordinator`

   There might be more points. I suggest we step back and continue the discussion
in the JIRA issue. Once we have a clear design, the PR can be opened again.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-09-21 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623770#comment-16623770
 ] 

Dawid Wysakowicz commented on FLINK-9999:
-

I would be against adding this function. It is not part of the SQL:2016 standard, 
and even in the only RDBMS that provides it, its use is discouraged. Therefore I 
would vote for closing this issue and the PR.

> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify that an expression is of a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623752#comment-16623752
 ] 

ASF GitHub Bot commented on FLINK-10255:


Clarkkkkk commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph 
signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423567013
 
 
   Thanks for the reply, that makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Standby Dispatcher locks submitted JobGraphs
> 
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are 
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
> {{JobGraphs}} can prevent their cleanup leaving the system in an inconsistent 
> state.
> The problem is that we recover the newly added {{JobGraph}} in the 
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called 
> when we don't have the leadership. Recovering the {{JobGraph}} currently locks 
> it. In case the {{Dispatcher}} is not the leader, we won't start that job after 
> its recovery. However, we also don't release the {{JobGraph}}, leaving it locked.
> There are two possible solutions to the problem. Either we check whether we 
> are the leader before recovering jobs, or we say that recovering jobs does not 
> lock them and we only lock a recovered job once we can actually submit it. The 
> latter approach has the advantage that it follows a code path quite similar to 
> an initial job submission. Moreover, jobs are currently also recovered at other 
> places. In all these places we would currently need to release the 
> {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. 
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the 
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then 
> we would have to make sure that no concurrent callback from the 
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
> revoking leadership from the {{Dispatcher}}.
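
For illustration, a hedged sketch of the first proposed solution (the types and wiring are simplified stand-ins, not the actual Dispatcher code): ignore the store callback unless we hold the leadership, so a standby dispatcher never recovers, and therefore never locks, the newly added graph:

{code:java}
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StandbyAwareDispatcher {

    private final Supplier<Boolean> hasLeadership;      // leadership probe (assumed)
    private final Consumer<String> recoverAndSubmitJob; // leader-only recovery path (assumed)

    public StandbyAwareDispatcher(
            Supplier<Boolean> hasLeadership,
            Consumer<String> recoverAndSubmitJob) {
        this.hasLeadership = hasLeadership;
        this.recoverAndSubmitJob = recoverAndSubmitJob;
    }

    // Callback from the SubmittedJobGraphStore (the job id is simplified to a String).
    public void onAddedJobGraph(String jobId) {
        if (!hasLeadership.get()) {
            return;                        // standby: never recover, hence never lock
        }
        recoverAndSubmitJob.accept(jobId); // leader: recovery may lock the JobGraph
    }
}
{code}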



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623731#comment-16623731
 ] 

ASF GitHub Bot commented on FLINK-6847:
---

twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r219524046
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ##
 @@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+    timeIntervalUnit: Expression,
+    timestamp1: Expression,
+    timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+      return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+        s"but timestamp1 is of type ${timestamp1.resultType}")
+    }
+
+    if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+      return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+        s"but timestamp2 is of type ${timestamp2.resultType}")
+    }
+
+    timeIntervalUnit match {
+      case SymbolExpression(TimeIntervalUnit.YEAR)
+           | SymbolExpression(TimeIntervalUnit.MONTH)
+           | SymbolExpression(TimeIntervalUnit.DAY)
+           | SymbolExpression(TimeIntervalUnit.HOUR)
+           | SymbolExpression(TimeIntervalUnit.MINUTE)
+           | SymbolExpression(TimeIntervalUnit.SECOND)
+        if timestamp1.resultType == SqlTimeTypeInfo.DATE
+          || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || timestamp2.resultType == SqlTimeTypeInfo.DATE
+          || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+        ValidationSuccess
+
+      case _ =>
+        ValidationFailure(s"TimestampDiff operator does not support unit '$timeIntervalUnit'" +
+          s" for input of type ('${timestamp1.resultType}', '${timestamp2.resultType}').")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+
+    val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+      .enum.asInstanceOf[TimeUnitRange]
+    val intervalType = typeFactory.createSqlIntervalType(
+      new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, SqlParserPos.ZERO))
+
+    val rexCall = relBuilder
+      .getRexBuilder
+      .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
+        List(timestamp2.toRexNode, timestamp1.toRexNode))
+
+    val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
+
+    relBuilder.getRexBuilder.makeCast(intType, rexCall)
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO

 Review comment:
   Calcite returns integer so far. See 
`org.apache.calcite.sql.fun.SqlTimestampDiffFunction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623730#comment-16623730
 ] 

ASF GitHub Bot commented on FLINK-6847:
---

twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r219524046
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ##
 @@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+    timeIntervalUnit: Expression,
+    timestamp1: Expression,
+    timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+      return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+        s"but timestamp1 is of type ${timestamp1.resultType}")
+    }
+
+    if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+      return ValidationFailure(s"TimestampDiff operator requires Temporal input, " +
+        s"but timestamp2 is of type ${timestamp2.resultType}")
+    }
+
+    timeIntervalUnit match {
+      case SymbolExpression(TimeIntervalUnit.YEAR)
+           | SymbolExpression(TimeIntervalUnit.MONTH)
+           | SymbolExpression(TimeIntervalUnit.DAY)
+           | SymbolExpression(TimeIntervalUnit.HOUR)
+           | SymbolExpression(TimeIntervalUnit.MINUTE)
+           | SymbolExpression(TimeIntervalUnit.SECOND)
+        if timestamp1.resultType == SqlTimeTypeInfo.DATE
+          || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || timestamp2.resultType == SqlTimeTypeInfo.DATE
+          || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+        ValidationSuccess
+
+      case _ =>
+        ValidationFailure(s"TimestampDiff operator does not support unit '$timeIntervalUnit'" +
+          s" for input of type ('${timestamp1.resultType}', '${timestamp2.resultType}').")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+
+    val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+      .enum.asInstanceOf[TimeUnitRange]
+    val intervalType = typeFactory.createSqlIntervalType(
+      new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, SqlParserPos.ZERO))
+
+    val rexCall = relBuilder
+      .getRexBuilder
+      .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
+        List(timestamp2.toRexNode, timestamp1.toRexNode))
+
+    val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
+
+    relBuilder.getRexBuilder.makeCast(intType, rexCall)
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO

 Review comment:
   Calcite returns integer so far. See 
`org.apache.calcite.sql.fun.SqlTimestampDiffFunction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Closed] (FLINK-8914) CEP's greedy() modifier doesn't work

2018-09-21 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-8914.
---
Resolution: Not A Problem

Using a proper AfterMatchSkip strategy, the expected results can be achieved.
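
For illustration, a hedged sketch of that resolution (it assumes Flink 1.6, where an AfterMatchSkipStrategy can be passed to Pattern.begin); with SKIP_PAST_LAST_EVENT the shorter partial matches are discarded, so the RunLength example quoted below should emit 5 and 3 as expected:

{code:java}
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class GreedyWorkaround {

    public static Pattern<Integer, ?> onesThenZero() {
        // skip past the last event of a match, so the shorter runs of ones
        // contained in an already-matched run are never reported
        return Pattern.<Integer>begin("ones", AfterMatchSkipStrategy.skipPastLastEvent())
            .where(new SimpleCondition<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value == 1;
                }
            })
            .oneOrMore()
            .consecutive()
            .next("zero")
            .where(new SimpleCondition<Integer>() {
                @Override
                public boolean filter(Integer value) throws Exception {
                    return value == 0;
                }
            });
    }
}
{code}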

> CEP's greedy() modifier doesn't work
> 
>
> Key: FLINK-8914
> URL: https://issues.apache.org/jira/browse/FLINK-8914
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.4.1
>Reporter: David Anderson
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> When applied to the first or last component of a CEP Pattern, greedy() 
> doesn't work correctly. Here's an example:
> {code:java}
> package com.dataartisans.flinktraining.exercises.datastream_java.cep;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.List;
> import java.util.Map;
> public class RunLength {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     DataStream<Integer> input = env.fromElements(1, 1, 1, 1, 1, 0, 1, 1, 1, 0);
>     Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 1;
>         }
>       })
>       .oneOrMore()
>       .greedy()
>       .consecutive()
>       .next("zero")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 0;
>         }
>       });
>     PatternStream<Integer> patternStream = CEP.pattern(input, onesThenZero);
>     // Expected: 5 3
>     // Actual: 5 4 3 2 1 3 2 1
>     patternStream.select(new LengthOfRun()).print();
>     env.execute();
>   }
>   public static class LengthOfRun implements PatternSelectFunction<Integer, Integer> {
>     public Integer select(Map<String, List<Integer>> pattern) {
>       return pattern.get("ones").size();
>     }
>   }
> }
> {code}
> The only workaround for now seems to be to rewrite the pattern so that 
> greedy() isn't needed – i.e. by bracketing the greedy section with a prefix 
> and suffix that both have to be matched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8914) CEP's greedy() modifier doesn't work

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623721#comment-16623721
 ] 

ASF GitHub Bot commented on FLINK-8914:
---

Aitozi closed pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when 
greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 88ef3d3288a..1685ffacc7b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -91,6 +91,10 @@ public boolean isStartState() {
         return state.isStart() && event == null;
     }
 
+    public boolean isGreedyState() {
+        return state.isGreedy();
+    }
+
     public long getTimestamp() {
         return timestamp;
     }
@@ -137,6 +141,19 @@ public int hashCode() {
         return Objects.hash(state, event, counter, timestamp, version, startTimestamp, previousState);
     }
 
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("current state: ").append(state).append("\n")
+            .append("previous state: ").append(previousState).append("\n")
+            .append("start timestamp: ").append(startTimestamp).append("\n")
+            .append("counter: ").append(counter).append("\n")
+            .append("version: ").append(version);
+
+        return builder.toString();
+    }
+
     public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state) {
         Preconditions.checkArgument(state.isStart());
         return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L);
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5624db9de43..16ea163aadf 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -220,7 +220,10 @@ public void resetNFAChanged() {
      */
     public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(final T event,
             final long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+        List<ComputationState<T>> redundantStart = pickRedundantGreedyStartState(new ArrayList<>(computationStates));
+        computationStates.removeAll(redundantStart);
         final int numberComputationStates = computationStates.size();
+
         final Collection<Map<String, List<T>>> result = new ArrayList<>();
         final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
 
@@ -309,7 +312,7 @@ public void resetNFAChanged() {
         }
 
     }
-
+        computationStates.addAll(redundantStart);
         discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy);
 
         // prune shared buffer based on window length
@@ -626,6 +629,29 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
         return resultingComputationStates;
     }
 
+    private List<ComputationState<T>> pickRedundantGreedyStartState(List<ComputationState<T>> computationStates) {
+        List<ComputationState<T>> greedyState = new ArrayList<>();
+        List<ComputationState<T>> startState = new ArrayList<>();
+        List<ComputationState<T>> redundantStart = new ArrayList<>();
+        for (ComputationState<T> computationState : computationStates) {
+            if (computationState.isGreedyState()) {
+                greedyState.add(computationState);
+            } else if (computationState.isStartState()) {
+                startState.add(computationState);
+            }
+        }
+
+        for (ComputationState<T> start : startState) {
+            for (ComputationState<T> greedy : greedyState) {
+                if (NFAStateNameHandler.getOriginalNameFromInternal(start.getState().getName()).equals(
+                        NFAStateNameHandler.getOriginalNameFromInternal(greedy.getState().getName()))) {
+                    redundantStart.add(start);
+                }
+            }
+        }

[jira] [Commented] (FLINK-8914) CEP's greedy() modifier doesn't work

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623720#comment-16623720
 ] 

ASF GitHub Bot commented on FLINK-8914:
---

Aitozi commented on issue #6124: [FLINK-8914][CEP]Fix wrong semantic when 
greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124#issuecomment-423555799
 
 
   Sure, closing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CEP's greedy() modifier doesn't work
> 
>
> Key: FLINK-8914
> URL: https://issues.apache.org/jira/browse/FLINK-8914
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.4.1
>Reporter: David Anderson
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> When applied to the first or last component of a CEP Pattern, greedy() 
> doesn't work correctly. Here's an example:
> {code:java}
> package com.dataartisans.flinktraining.exercises.datastream_java.cep;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.List;
> import java.util.Map;
> public class RunLength {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     DataStream<Integer> input = env.fromElements(1, 1, 1, 1, 1, 0, 1, 1, 1, 0);
>     Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 1;
>         }
>       })
>       .oneOrMore()
>       .greedy()
>       .consecutive()
>       .next("zero")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 0;
>         }
>       });
>     PatternStream<Integer> patternStream = CEP.pattern(input, onesThenZero);
>     // Expected: 5 3
>     // Actual: 5 4 3 2 1 3 2 1
>     patternStream.select(new LengthOfRun()).print();
>     env.execute();
>   }
>   public static class LengthOfRun implements PatternSelectFunction<Integer, Integer> {
>     public Integer select(Map<String, List<Integer>> pattern) {
>       return pattern.get("ones").size();
>     }
>   }
> }
> {code}
> The only workaround for now seems to be to rewrite the pattern so that 
> greedy() isn't needed – i.e. by bracketing the greedy section with a prefix 
> and suffix that both have to be matched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Aitozi closed pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern

2018-09-21 Thread GitBox
Aitozi closed pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when 
greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 88ef3d3288a..1685ffacc7b 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -91,6 +91,10 @@ public boolean isStartState() {
return state.isStart() && event == null;
}
 
+   public boolean isGreedyState() {
+   return state.isGreedy();
+   }
+
public long getTimestamp() {
return timestamp;
}
@@ -137,6 +141,19 @@ public int hashCode() {
return Objects.hash(state, event, counter, timestamp, version, 
startTimestamp, previousState);
}
 
+   @Override
+   public String toString() {
+   StringBuilder builder = new StringBuilder();
+
+   builder.append("current state: ").append(state).append("\n")
+   .append("previous state: 
").append(previousState).append("\n")
+   .append("start timestamp: 
").append(startTimestamp).append("\n")
+   .append("counter: ").append(counter).append("\n")
+   .append("version: ").append(version);
+
+   return builder.toString();
+   }
+
public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state) {
Preconditions.checkArgument(state.isStart());
return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5624db9de43..16ea163aadf 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -220,7 +220,10 @@ public void resetNFAChanged() {
 */
public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(final T event,
final long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+   List<ComputationState<T>> redundantStart = pickRedundantGreedyStartState(new ArrayList<>(computationStates));
+   computationStates.removeAll(redundantStart);
final int numberComputationStates = computationStates.size();
+
final Collection<Map<String, List<T>>> result = new ArrayList<>();
final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
 
@@ -309,7 +312,7 @@ public void resetNFAChanged() {
}
 
}
-
+   computationStates.addAll(redundantStart);
discardComputationStatesAccordingToStrategy(computationStates, 
result, afterMatchSkipStrategy);
 
// prune shared buffer based on window length
@@ -626,6 +629,29 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
return resultingComputationStates;
}
 
+   private List<ComputationState<T>> pickRedundantGreedyStartState(List<ComputationState<T>> computationStates) {
+       List<ComputationState<T>> greedyState = new ArrayList<>();
+       List<ComputationState<T>> startState = new ArrayList<>();
+       List<ComputationState<T>> redundantStart = new ArrayList<>();
+       for (ComputationState<T> computationState : computationStates) {
+           if (computationState.isGreedyState()) {
+               greedyState.add(computationState);
+           } else if (computationState.isStartState()) {
+               startState.add(computationState);
+           }
+       }
+
+       for (ComputationState<T> start : startState) {
+           for (ComputationState<T> greedy : greedyState) {
+               if (NFAStateNameHandler.getOriginalNameFromInternal(start.getState().getName()).equals(
+                       NFAStateNameHandler.getOriginalNameFromInternal(greedy.getState().getName()))) {
+                   redundantStart.add(start);
+               }
+           }
+       }
+       return redundantStart;
+   }
+
private void addComputationState(
List<ComputationState<T>> computationStates,
State<T> currentState,
diff --git 

[GitHub] Aitozi commented on issue #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern

2018-09-21 Thread GitBox
Aitozi commented on issue #6124: [FLINK-8914][CEP]Fix wrong semantic when 
greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124#issuecomment-423555799
 
 
   Sure, closing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219449333
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
 ##
 @@ -127,6 +127,8 @@
/** Flag to indicate whether sinks have been cleared in previous 
executions. */
private boolean wasExecuted = false;
 
+   private String description;
 
 Review comment:
   I think `ExecutionEnvironment` should not have the description. I would 
really prefer to follow exactly the same scenario as with `jobName`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219486186
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
 ##
 @@ -97,6 +101,7 @@ public JobDetails(
"tasksPerState argument must be of size {}.", 
ExecutionState.values().length);
this.tasksPerState = checkNotNull(tasksPerState);
this.numTasks = numTasks;
+   this.jobDescription = jobDescription;
 
 Review comment:
   Please check with `checkNotNull`
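
A sketch of the requested change (assuming the static import of `checkNotNull` already used elsewhere in this constructor):

{code:java}
this.jobDescription = checkNotNull(jobDescription);
{code}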


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219476983
 
 

 ##
 File path: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
 ##
 @@ -49,23 +49,28 @@
/** Name of the job */
private final String jobName;
 
+   /** Description of the job */
+   private final String jobDescription;
+
/**
 * Creates a new instance of this optimizer plan container. The plan is 
given and fully
 * described by the data sources, sinks and the collection of all nodes.
-* 
+*
 * @param sources The data sources.
 * @param sinks The data sinks.
 * @param allNodes A collection containing all nodes in the plan.
 * @param jobName The name of the program
+* @param jobDescription The description of the program
 */
public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
-   Collection<PlanNode> allNodes, String jobName, Plan programPlan)
+   Collection<PlanNode> allNodes, String jobName, Plan programPlan, String jobDescription)
 
 Review comment:
   Actually I would remove the `jobName` and `jobDescription` parameters, and 
use those values from the programPlan, which we always do anyways.
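
A sketch of the suggested refactoring (`Plan#getJobName()` exists today; the description accessor and the exact field names are assumptions, with the accessor to be introduced by this PR):

{code:java}
public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
        Collection<PlanNode> allNodes, Plan programPlan) {
    this.dataSources = sources;
    this.dataSinks = sinks;
    this.allNodes = allNodes;
    this.originalProgram = programPlan;
    // Derive both values from the program plan instead of passing them separately.
    this.jobName = programPlan.getJobName();
    this.jobDescription = programPlan.getJobDescription(); // hypothetical accessor added by this PR
}
{code}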


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219485200
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
 ##
 @@ -147,7 +147,7 @@ public void testMultipleJobDetails() {
JobID jid = new JobID();
JobStatus status = 
JobStatus.values()[rnd.nextInt(JobStatus.values().length)];
 
-   details[k] = new JobDetails(jid, name, time, endTime, 
endTime - time, status, lastModified, numVerticesPerState, numTotal);
+   details[k] = new JobDetails(jid, name, time, endTime, 
endTime - time, status, lastModified, numVerticesPerState, numTotal, "");
 
 Review comment:
   Please use the `GenericMessageTester` for the description the same way it is 
used for `jobName`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219477941
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
 ##
 @@ -198,6 +201,24 @@ public String getName() {
return this.jobName;
}
 
+   /**
+* Returns the description assigned to the job graph.
+*
+* @return the description assigned to the job graph
+*/
+   public String getDescription() {
+   return jobDescription;
+   }
+
+   /**
+* Set the description for this job graph.
+*
+* @param jobDescription the description of the job graph
+*/
+   public void setDescription(String jobDescription) {
 
 Review comment:
   Please set the `jobDescription` only in the ctor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8914) CEP's greedy() modifier doesn't work

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623711#comment-16623711
 ] 

ASF GitHub Bot commented on FLINK-8914:
---

dawidwys commented on issue #6124: [FLINK-8914][CEP]Fix wrong semantic when 
greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124#issuecomment-423553722
 
 
   Hi @Aitozi. As I described previously, this case should be handled with 
greedy and proper `AfterMatchSkip` strategy. Would you be ok with closing this 
PR?
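
A sketch of that combination, reusing the RunLength example from the issue (the concrete strategy and the import path, as of Flink 1.6, are assumptions, not necessarily what was meant here):

{code:java}
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;

// Start the pattern with SKIP_PAST_LAST_EVENT so that, once the maximal
// greedy run "1 1 1 1 1 0" is matched, matching resumes past it instead of
// also emitting the shorter sub-runs.
Pattern<Integer, ?> onesThenZero =
    Pattern.<Integer>begin("ones", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new SimpleCondition<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value == 1;
            }
        })
        .oneOrMore()
        .greedy()
        .consecutive()
        .next("zero")
        .where(new SimpleCondition<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value == 0;
            }
        });
{code}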


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CEP's greedy() modifier doesn't work
> 
>
> Key: FLINK-8914
> URL: https://issues.apache.org/jira/browse/FLINK-8914
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.4.1
>Reporter: David Anderson
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> When applied to the first or last component of a CEP Pattern, greedy() 
> doesn't work correctly. Here's an example:
> {code:java}
> package com.dataartisans.flinktraining.exercises.datastream_java.cep;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.List;
> import java.util.Map;
> public class RunLength {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     DataStream<Integer> input = env.fromElements(1, 1, 1, 1, 1, 0, 1, 1, 1, 0);
>     Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 1;
>         }
>       })
>       .oneOrMore()
>       .greedy()
>       .consecutive()
>       .next("zero")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 0;
>         }
>       });
>     PatternStream<Integer> patternStream = CEP.pattern(input, onesThenZero);
>     // Expected: 5 3
>     // Actual: 5 4 3 2 1 3 2 1
>     patternStream.select(new LengthOfRun()).print();
>     env.execute();
>   }
>   public static class LengthOfRun implements PatternSelectFunction<Integer, Integer> {
>     public Integer select(Map<String, List<Integer>> pattern) {
>       return pattern.get("ones").size();
>     }
>   }
> }
> {code}
> The only workaround for now seems to be to rewrite the pattern so that 
> greedy() isn't needed – i.e. by bracketing the greedy section with a prefix 
> and suffix that both have to be matched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8914) CEP's greedy() modifier doesn't work

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8914:
--
Labels: pull-request-available  (was: )

> CEP's greedy() modifier doesn't work
> 
>
> Key: FLINK-8914
> URL: https://issues.apache.org/jira/browse/FLINK-8914
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.4.1
>Reporter: David Anderson
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> When applied to the first or last component of a CEP Pattern, greedy() 
> doesn't work correctly. Here's an example:
> {code:java}
> package com.dataartisans.flinktraining.exercises.datastream_java.cep;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.List;
> import java.util.Map;
> public class RunLength {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     DataStream<Integer> input = env.fromElements(1, 1, 1, 1, 1, 0, 1, 1, 1, 0);
>     Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 1;
>         }
>       })
>       .oneOrMore()
>       .greedy()
>       .consecutive()
>       .next("zero")
>       .where(new SimpleCondition<Integer>() {
>         @Override
>         public boolean filter(Integer value) throws Exception {
>           return value == 0;
>         }
>       });
>     PatternStream<Integer> patternStream = CEP.pattern(input, onesThenZero);
>     // Expected: 5 3
>     // Actual: 5 4 3 2 1 3 2 1
>     patternStream.select(new LengthOfRun()).print();
>     env.execute();
>   }
>   public static class LengthOfRun implements PatternSelectFunction<Integer, Integer> {
>     public Integer select(Map<String, List<Integer>> pattern) {
>       return pattern.get("ones").size();
>     }
>   }
> }
> {code}
> The only workaround for now seems to be to rewrite the pattern so that 
> greedy() isn't needed – i.e. by bracketing the greedy section with a prefix 
> and suffix that both have to be matched.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern

2018-09-21 Thread GitBox
dawidwys commented on issue #6124: [FLINK-8914][CEP]Fix wrong semantic when 
greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124#issuecomment-423553722
 
 
   Hi @Aitozi. As I described previously, this case should be handled with 
greedy and proper `AfterMatchSkip` strategy. Would you be ok with closing this 
PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623703#comment-16623703
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219485200
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
 ##
 @@ -147,7 +147,7 @@ public void testMultipleJobDetails() {
JobID jid = new JobID();
JobStatus status = 
JobStatus.values()[rnd.nextInt(JobStatus.values().length)];
 
-   details[k] = new JobDetails(jid, name, time, endTime, 
endTime - time, status, lastModified, numVerticesPerState, numTotal);
+   details[k] = new JobDetails(jid, name, time, endTime, 
endTime - time, status, lastModified, numVerticesPerState, numTotal, "");
 
 Review comment:
   Please use the `GenericMessageTester` for the description the same way it is 
used for `jobName`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.
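
A sketch of the proposed usage (the {{setDescription}} method is the hypothetical API being proposed here; it is not part of Flink at the time of writing):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Free-form description shown in the dashboard; the job name (and the metric
// namespaces derived from it) stays stable across versions.
env.setDescription("orders-enrichment, build 2018-09-21, branch feature/xyz"); // proposed method

env.execute("orders-enrichment");
{code}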



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623700#comment-16623700
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219486325
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
 ##
 @@ -61,13 +64,15 @@ public JobInformation(
SerializedValue<ExecutionConfig> serializedExecutionConfig,
Configuration jobConfiguration,
Collection<PermanentBlobKey> requiredJarFileBlobKeys,
-   Collection<URL> requiredClasspathURLs) {
+   Collection<URL> requiredClasspathURLs,
+   String jobDescription) {
this.jobId = Preconditions.checkNotNull(jobId);
this.jobName = Preconditions.checkNotNull(jobName);
this.serializedExecutionConfig = 
Preconditions.checkNotNull(serializedExecutionConfig);
this.jobConfiguration = 
Preconditions.checkNotNull(jobConfiguration);
this.requiredJarFileBlobKeys = 
Preconditions.checkNotNull(requiredJarFileBlobKeys);
this.requiredClasspathURLs = 
Preconditions.checkNotNull(requiredClasspathURLs);
+   this.jobDescription = jobDescription;
 
 Review comment:
   Please check with `checkNotNull`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623705#comment-16623705
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219485512
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
 ##
 @@ -55,6 +55,7 @@
private ArchivedExecutionConfig archivedExecutionConfig;
private boolean isStoppable;
private Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators;
+   private String jobDescription;
 
 Review comment:
   Please move it next to `jobName`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623707#comment-16623707
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219476983
 
 

 ##
 File path: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
 ##
 @@ -49,23 +49,28 @@
/** Name of the job */
private final String jobName;
 
+   /** Description of the job */
+   private final String jobDescription;
+
/**
 * Creates a new instance of this optimizer plan container. The plan is 
given and fully
 * described by the data sources, sinks and the collection of all nodes.
-* 
+*
 * @param sources The data sources.
 * @param sinks The data sinks.
 * @param allNodes A collection containing all nodes in the plan.
 * @param jobName The name of the program
+* @param jobDescription The description of the program
 */
public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
-   Collection<PlanNode> allNodes, String jobName, Plan programPlan)
+   Collection<PlanNode> allNodes, String jobName, Plan programPlan, String jobDescription)
 
 Review comment:
   Actually I would remove the `jobName` and `jobDescription` parameters, and 
use those values from the programPlan, which we always do anyways.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623708#comment-16623708
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219487366
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ##
 @@ -156,6 +158,25 @@ public ExecutionConfig getConfig() {
return config;
}
 
+   /**
+* Gets the description of the job.
+*
+* @return the job's description
+*/
+   public String getDescription() {
 
 Review comment:
   I would prefer not to add those methods. I would handle the description the 
same way the `jobName` is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623702#comment-16623702
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219477941
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
 ##
 @@ -198,6 +201,24 @@ public String getName() {
return this.jobName;
}
 
+   /**
+* Returns the description assigned to the job graph.
+*
+* @return the description assigned to the job graph
+*/
+   public String getDescription() {
+   return jobDescription;
+   }
+
+   /**
+* Set the description for this job graph.
+*
+* @param jobDescription the description of the job graph
+*/
+   public void setDescription(String jobDescription) {
 
 Review comment:
   Please set the `jobDescription` only in the ctor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623706#comment-16623706
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219449333
 
 

 ##
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
 ##
 @@ -127,6 +127,8 @@
/** Flag to indicate whether sinks have been cleared in previous 
executions. */
private boolean wasExecuted = false;
 
+   private String description;
 
 Review comment:
   I think `ExecutionEnvironment` should not have the description. I would 
really prefer to follow exactly the same scenario as with `jobName`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623699#comment-16623699
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219485130
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
 ##
 @@ -92,8 +92,8 @@ public void testJobDetailsMessage() {
JobID jid = GenericMessageTester.randomJobId(rnd);
JobStatus status = 
GenericMessageTester.randomJobStatus(rnd);

-   JobDetails msg1 = new JobDetails(jid, name, time, 
endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
-   JobDetails msg2 = new JobDetails(jid, name, time, 
endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
+   JobDetails msg1 = new JobDetails(jid, name, time, 
endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal, 
"");
 
 Review comment:
   Please use the `GenericMessageTester` for the description the same way it is 
used for `jobName`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623701#comment-16623701
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219517956
 
 

 ##
 File path: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ##
 @@ -56,6 +56,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 */
   def getCachedFiles = javaEnv.getCachedFiles
 
+  /**
+* Gets the description of the job.
+*
+* @return the job's description
+*/
+  def getDescription: String = javaEnv.getDescription
 
 Review comment:
   I would remove those methods, as described above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623704#comment-16623704
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219486186
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
 ##
 @@ -97,6 +101,7 @@ public JobDetails(
"tasksPerState argument must be of size {}.", 
ExecutionState.values().length);
this.tasksPerState = checkNotNull(tasksPerState);
this.numTasks = numTasks;
+   this.jobDescription = jobDescription;
 
 Review comment:
   Please check with `checkNotNull`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219486325
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
 ##
 @@ -61,13 +64,15 @@ public JobInformation(
SerializedValue<ExecutionConfig> serializedExecutionConfig,
Configuration jobConfiguration,
Collection<PermanentBlobKey> requiredJarFileBlobKeys,
-   Collection<URL> requiredClasspathURLs) {
+   Collection<URL> requiredClasspathURLs,
+   String jobDescription) {
this.jobId = Preconditions.checkNotNull(jobId);
this.jobName = Preconditions.checkNotNull(jobName);
this.serializedExecutionConfig = 
Preconditions.checkNotNull(serializedExecutionConfig);
this.jobConfiguration = 
Preconditions.checkNotNull(jobConfiguration);
this.requiredJarFileBlobKeys = 
Preconditions.checkNotNull(requiredJarFileBlobKeys);
this.requiredClasspathURLs = 
Preconditions.checkNotNull(requiredClasspathURLs);
+   this.jobDescription = jobDescription;
 
 Review comment:
   Please check with `checkNotNull`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219517956
 
 

 ##
 File path: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ##
 @@ -56,6 +56,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 */
   def getCachedFiles = javaEnv.getCachedFiles
 
+  /**
+* Gets the description of the job.
+*
+* @return the job's description
+*/
+  def getDescription: String = javaEnv.getDescription
 
 Review comment:
   I would remove those methods, as described above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219485512
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
 ##
 @@ -55,6 +55,7 @@
private ArchivedExecutionConfig archivedExecutionConfig;
private boolean isStoppable;
private Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators;
+   private String jobDescription;
 
 Review comment:
   Please move it next to `jobName`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219485130
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
 ##
 @@ -92,8 +92,8 @@ public void testJobDetailsMessage() {
JobID jid = GenericMessageTester.randomJobId(rnd);
JobStatus status = 
GenericMessageTester.randomJobStatus(rnd);

-   JobDetails msg1 = new JobDetails(jid, name, time, 
endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
-   JobDetails msg2 = new JobDetails(jid, name, time, 
endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
+   JobDetails msg1 = new JobDetails(jid, name, time, 
endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal, 
"");
 
 Review comment:
   Please use the `GenericMessageTester` for the description the same way it is 
used for `jobName`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6266: [FLINK-9682] Add 
setDescription to execution environment and provide description field for the 
rest api
URL: https://github.com/apache/flink/pull/6266#discussion_r219487366
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ##
 @@ -156,6 +158,25 @@ public ExecutionConfig getConfig() {
return config;
}
 
+   /**
+* Gets the description of the job.
+*
+* @return the job's description
+*/
+   public String getDescription() {
 
 Review comment:
   I would prefer not to add those methods. I would handle the description the 
same way the `jobName` is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API

2018-09-21 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623627#comment-16623627
 ] 

Hequn Cheng commented on FLINK-10379:
-

Hi [~pnowojski], thanks for sharing the information and suggestions. I haven't 
taken a deep look at the problem yet. I will update the issue once I figure it out :-)

> Can not use Table Functions in Java Table API
> -
>
> Key: FLINK-10379
> URL: https://issues.apache.org/jira/browse/FLINK-10379
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Piotr Nowojski
>Assignee: Hequn Cheng
>Priority: Critical
>
> As stated in the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions],
> this is how table functions should be used in the Java Table API:
> {code:java}
> // Register the function.
> tableEnv.registerFunction("split", new Split("#"));
> myTable.join("split(a) as (word, length)");
> {code}
> However, {{Table.join(String)}} was removed some time ago, and now it is 
> impossible to use Table Functions in the Java Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10157) Allow `null` user values in map state with TTL

2018-09-21 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-10157.
--
   Resolution: Fixed
Fix Version/s: 1.6.2
   1.7.0

Merged in:
master: f3432042fe
release-1.6: 3558d1586c

> Allow `null` user values in map state with TTL
> --
>
> Key: FLINK-10157
> URL: https://issues.apache.org/jira/browse/FLINK-10157
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
> Environment: Flink:1.6.0
> Scala:2.11
> JDK:1.8
>Reporter: chengjie.wu
>Assignee: Andrey Zagrebin
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
> Attachments: StateWithOutTtlTest.scala, StateWithTtlTest.scala
>
>
> Thanks for the StateTtl feature, this is exactly what I need now! But I found 
> an issue.
> In the previous version, or when StateTtl is not enabled, MapState allows 
> `null` values, which means that after
> {code:java}
> mapState.put("key", null){code}
> , then
> {code:java}
> mapState.contains("key"){code}
> will return {color:#ff}*true*{color}, but when StateTtl is enabled,
> {code:java}
> mapState.contains("key"){code}
> will return {color:#ff}*false*{color}(*the key has not expired*).
>  So I think the field `userValue` in 
> `org.apache.flink.runtime.state.ttl.TtlValue` should allow a `null` value. A 
> `null` user value does not necessarily mean that the TtlValue should be null.
>  
> {code:java}
> /**
>  * This class wraps user value of state with TTL.
>  *
>  * @param <T> Type of the user value of state with TTL
>  */
> class TtlValue<T> implements Serializable {
>   private final T userValue;
>   private final long lastAccessTimestamp;
>
>   TtlValue(T userValue, long lastAccessTimestamp) {
>     Preconditions.checkNotNull(userValue);
>     this.userValue = userValue;
>     this.lastAccessTimestamp = lastAccessTimestamp;
>   }
>
>   T getUserValue() {
>     return userValue;
>   }
>
>   long getLastAccessTimestamp() {
>     return lastAccessTimestamp;
>   }
> }
> {code}
> Am I understanding right?
> This is my test class.
> [^StateWithTtlTest.scala] [^StateWithOutTtlTest.scala]
> ^Thanks!:)^
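
A minimal sketch of the reported behaviour (a condensed fragment for a rich function on a keyed stream; the state name and TTL duration are illustrative):

{code:java}
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

MapStateDescriptor<String, String> desc =
    new MapStateDescriptor<>("map", String.class, String.class);
desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(10)).build());

MapState<String, String> mapState = getRuntimeContext().getMapState(desc);

mapState.put("key", null);
// Without TTL: contains("key") returns true.
// With TTL, before this fix: the null user value could not be wrapped into a
// TtlValue (its ctor required a non-null value), so the entry behaved as if
// absent and contains("key") returned false although nothing had expired.
boolean present = mapState.contains("key");
{code}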



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10157) Allow `null` user values in map state with TTL

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623621#comment-16623621
 ] 

ASF GitHub Bot commented on FLINK-10157:


asfgit closed pull request #6707: [FLINK-10157] [State TTL] Allow `null` user 
values in map state with TTL
URL: https://github.com/apache/flink/pull/6707
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index fb787763536..decf1dbe9de 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -350,6 +350,9 @@ will lead to compatibility failure and 
`StateMigrationException`.
 
 - The TTL configuration is not part of check- or savepoints but rather a way 
of how Flink treats it in the currently running job.
 
+- The map state with TTL currently supports null user values only if the user 
value serializer can handle null values. 
+If the serializer does not support null values, it can be wrapped with 
`NullableSerializer` at the cost of an extra byte in the serialized form.
+
  Cleanup of Expired State
 
 Currently, expired values are only removed when they are read out explicitly, 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 6eb8ddcd649..9b0094d50cd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -37,6 +37,12 @@
  * To create keyed map state (on a KeyedStream), use
  * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
  *
+ * Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link 
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
+ *
  * @param  The type of the keys that can be added to the map state.
  */
 @PublicEvolving
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index f4ed929bca9..42eaea4c482 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -32,6 +32,12 @@
 
 /**
  * Configuration of state TTL logic.
+ *
+ * Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link 
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
  */
 public class StateTtlConfig implements Serializable {
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
new file mode 100644
index 000..fe392e4b1cb
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import 

[GitHub] asfgit closed pull request #6707: [FLINK-10157] [State TTL] Allow `null` user values in map state with TTL

2018-09-21 Thread GitBox
asfgit closed pull request #6707: [FLINK-10157] [State TTL] Allow `null` user 
values in map state with TTL
URL: https://github.com/apache/flink/pull/6707
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index fb787763536..decf1dbe9de 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -350,6 +350,9 @@ will lead to compatibility failure and 
`StateMigrationException`.
 
 - The TTL configuration is not part of check- or savepoints but rather a way 
of how Flink treats it in the currently running job.
 
+- The map state with TTL currently supports null user values only if the user 
value serializer can handle null values. 
+If the serializer does not support null values, it can be wrapped with 
`NullableSerializer` at the cost of an extra byte in the serialized form.
+
  Cleanup of Expired State
 
 Currently, expired values are only removed when they are read out explicitly, 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 6eb8ddcd649..9b0094d50cd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -37,6 +37,12 @@
  * To create keyed map state (on a KeyedStream), use
  * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
  *
+ * Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link 
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
+ *
  * @param  The type of the keys that can be added to the map state.
  */
 @PublicEvolving
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index f4ed929bca9..42eaea4c482 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -32,6 +32,12 @@
 
 /**
  * Configuration of state TTL logic.
+ *
+ * Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link 
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
  */
 public class StateTtlConfig implements Serializable {
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
new file mode 100644
index 000..fe392e4b1cb
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import 

[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623521#comment-16623521
 ] 

ASF GitHub Bot commented on FLINK-10384:


yanghua commented on issue #6730: [FLINK-10384] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-423516113
 
 
   cc @xccui 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> like FLINK-10340 for adding Cosh math function



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6730: [FLINK-10384] Add Sinh math function supported in Table API and SQL

2018-09-21 Thread GitBox
yanghua commented on issue #6730: [FLINK-10384] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-423516113
 
 
   cc @xccui 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10320) Introduce JobMaster schedule micro-benchmark

2018-09-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623481#comment-16623481
 ] 

Piotr Nowojski commented on FLINK-10320:


[~till.rohrmann] might have a good point. [~Tison] could you provide profiler 
logs for both the JobManager and the TaskManager during this 10,000-parallelism 
scheduling issue? Maybe we could even narrow down the problematic component and 
write benchmarks targeted specifically at it instead of at the whole JobManager?

> Introduce JobMaster schedule micro-benchmark
> 
>
> Key: FLINK-10320
> URL: https://issues.apache.org/jira/browse/FLINK-10320
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: tison
>Assignee: tison
>Priority: Major
>
> Based on the {{org.apache.flink.streaming.runtime.io.benchmark}} stuff and the 
> repo [flink-benchmark|https://github.com/dataArtisans/flink-benchmarks], I 
> propose to introduce another micro-benchmark which focuses on {{JobMaster}} 
> scheduling performance.
> h3. Target
> Benchmark how long it takes from {{JobMaster}} startup (receiving the 
> {{JobGraph}} and initializing) until all tasks are RUNNING. Technically we use 
> a bounded stream and the TM finishes tasks as soon as they arrive, so the real 
> interval we measure ends when all tasks are FINISHED.
> h3. Case
> 1. JobGraph that cover EAGER + PIPELINED edges
> 2. JobGraph that cover LAZY_FROM_SOURCES + PIPELINED edges
> 3. JobGraph that cover LAZY_FROM_SOURCES + BLOCKING edges
> ps: maybe also benchmark the case where the source reads from {{InputSplit}}?
> h3. Implementation
> Based on the flink-benchmark repo, we ultimately run the benchmark using JMH, 
> so the whole test suite is split across two repos. The testing environment 
> could live in the main repo, maybe under 
> flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/benchmark.
> To measure the performance of {{JobMaster}} scheduling, we need to simulate 
> an environment that:
> 1. has a real {{JobMaster}}
> 2. has a mock/testing {{ResourceManager}} that has infinite resources and 
> reacts immediately.
> 3. has one (or many?) mock/testing {{TaskExecutor}}s that deploy and finish 
> tasks immediately.
> [~trohrm...@apache.org] [~GJL] [~pnowojski] could you please review this 
> proposal to help clarify the goal and concrete details? Thanks in advance.
> Any suggestions are welcome.
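
For illustration, a minimal JMH skeleton along the lines proposed above. The 
harness types ({{SchedulingTestEnvironment}}, {{TestJobGraphs}}) are 
hypothetical placeholders for the mock ResourceManager/TaskExecutor wiring, 
which is assumed rather than shown:

{code:java}
package org.apache.flink.runtime.jobmaster.benchmark;

import org.openjdk.jmh.annotations.*;

import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class JobMasterSchedulingBenchmark {

	// Hypothetical harness: a real JobMaster plus a mock ResourceManager
	// and mock TaskExecutors that deploy and finish tasks immediately.
	private SchedulingTestEnvironment env;

	@Setup(Level.Invocation)
	public void setUp() throws Exception {
		env = SchedulingTestEnvironment.create();
	}

	@Benchmark
	public void scheduleEagerPipelined() throws Exception {
		// Measured interval: JobGraph submission until all tasks are FINISHED.
		env.submitAndWaitUntilAllTasksFinished(
			TestJobGraphs.eagerPipelined(/* parallelism */ 10_000));
	}

	@TearDown(Level.Invocation)
	public void tearDown() throws Exception {
		env.close();
	}
}
{code}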



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API

2018-09-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623471#comment-16623471
 ] 

Piotr Nowojski commented on FLINK-10379:


Thanks [~hequn8128] :)

We should probably also first think about how to solve this. Do you have a 
specific solution in mind?

 I wasn't taking part in the discussion/effort that resulted in removing the 
{{Table#join(String)}} method, but there were probably some reasons behind 
that. I heard that part of it was switching to implicit conversions of table 
functions to {{Table}} instances in Scala. Obviously that doesn't work in Java, 
but maybe we could provide something to explicitly apply arguments to a 
{{TableFunction}} instance and convert it to a {{Table}}? Something along the 
lines of:

 
{code:java}
// Register the function.
TableFunction split = tableEnv.registerFunction("split", new Split("#"));

myTable.join(split.apply("a").as("word, length"));
{code}
(But I'm pretty unfamiliar with the Table API, so treat this with a grain of salt.)

What do you think?

 

> Can not use Table Functions in Java Table API
> -
>
> Key: FLINK-10379
> URL: https://issues.apache.org/jira/browse/FLINK-10379
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Piotr Nowojski
>Assignee: Hequn Cheng
>Priority: Critical
>
> As stated in the 
> [documentation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions],]
>  this is how table functions should be used in Java Table API:
> {code:java}
> // Register the function.
> tableEnv.registerFunction("split", new Split("#"));
> myTable.join("split(a) as (word, length)");
> {code}
> However, {{Table.join(String)}} was removed some time ago, and now it is 
> impossible to use Table Functions in the Java Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10157) Allow `null` user values in map state with TTL

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623467#comment-16623467
 ] 

ASF GitHub Bot commented on FLINK-10157:


StefanRRichter commented on issue #6707: [FLINK-10157] [State TTL] Allow `null` 
user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#issuecomment-423507977
 
 
   Thanks @azagrebin ! Changes look good to me, merging.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow `null` user values in map state with TTL
> --
>
> Key: FLINK-10157
> URL: https://issues.apache.org/jira/browse/FLINK-10157
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
> Environment: Flink:1.6.0
> Scala:2.11
> JDK:1.8
>Reporter: chengjie.wu
>Assignee: Andrey Zagrebin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: StateWithOutTtlTest.scala, StateWithTtlTest.scala
>
>
> Thanks for the StateTtl feature, this is exactly what I need now! But I found 
> an issue.
> In the previous version, or when StateTtl is not enabled, MapState allows a 
> `null` value, which means that after
> {code:java}
> mapState.put("key", null){code}
> , then
> {code:java}
> mapState.contains("key"){code}
> will return *true*, but when StateTtl is enabled,
> {code:java}
> mapState.contains("key"){code}
> will return *false* (*the key has not expired*).
>  So I think the field `userValue` in 
> `org.apache.flink.runtime.state.ttl.TtlValue` should allow a `null` value. The 
> user state being null may not mean the TtlValue itself should be null.
>  
> {code:java}
> /**
>  * This class wraps user value of state with TTL.
>  *
>  * @param  Type of the user value of state with TTL
>  */
> class TtlValue implements Serializable {
>  private final T userValue;
>  private final long lastAccessTimestamp;
> TtlValue(T userValue, long lastAccessTimestamp) {
>  Preconditions.checkNotNull(userValue);
>  this.userValue = userValue;
>  this.lastAccessTimestamp = lastAccessTimestamp;
>  }
> T getUserValue() {
>  return userValue;
>  }
> long getLastAccessTimestamp() {
>  return lastAccessTimestamp;
>  }
> }
> {code}
> Am I understanding right?
> This is my test class.
> [^StateWithTtlTest.scala] [^StateWithOutTtlTest.scala]
> ^Thanks!:)^



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on issue #6707: [FLINK-10157] [State TTL] Allow `null` user values in map state with TTL

2018-09-21 Thread GitBox
StefanRRichter commented on issue #6707: [FLINK-10157] [State TTL] Allow `null` 
user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#issuecomment-423507977
 
 
   Thanks @azagrebin ! Changes look good to me, merging.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-9983) Savepoints should count as checkpoints when recovering

2018-09-21 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-9983.
---
Resolution: Duplicate

Will be fixed in [FLINK-10354]

> Savepoints should count as checkpoints when recovering
> --
>
> Key: FLINK-9983
> URL: https://issues.apache.org/jira/browse/FLINK-9983
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Priority: Critical
> Fix For: 1.7.0
>
>
> If they are not used when recovering, you can get into problems with 
> duplicate output data when a failure occurs after a savepoint was taken but 
> before the next checkpoint occurs.
> The fix, in the long run, is to differentiate between savepoints that have 
> side effects and those that don't. The former would be used for a 
> "stop-with-savepoint" scenario while the latter is for in-between savepoints. 
> This is harder to implement, so for now I vote for the easy fix described 
> above to address the duplication problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10354) Savepoints should be counted as retained checkpoints

2018-09-21 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623465#comment-16623465
 ] 

Dawid Wysakowicz commented on FLINK-10354:
--

You're right, I hadn't seen the other one. I will close the other one, since 
this one already has a PR opened.

> Savepoints should be counted as retained checkpoints
> 
>
> Key: FLINK-10354
> URL: https://issues.apache.org/jira/browse/FLINK-10354
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This task is about reverting [FLINK-6328].
> The problem is that you can get incorrect results with exactly-once sinks if 
> there is a failure after taking a savepoint but before taking the next 
> checkpoint because the savepoint will also have manifested side effects to 
> the sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623464#comment-16623464
 ] 

Piotr Nowojski commented on FLINK-10339:


I'm not aware of any concerns, but I'm also not very familiar with this 
aspect of the network stack. [~NicoK] should be more familiar with it.

> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce 
> memory copies from the Flink {{Buffer}} to Netty's internal {{ByteBuf}} during 
> transport on the sender side.
>  
> But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses 
> heap memory for caching. We can make it off-heap by default, similar to the 
> {{NetworkBufferPool}}, or decide the type via the existing parameter 
> {{taskmanager.memory.off-heap}}.
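
For illustration, a minimal sketch of the proposed switch (the method and its 
parameters are assumptions for this example, not the actual pool internals):

{code:java}
import java.nio.ByteBuffer;

// Sketch: choose the memory type for a spill-read buffer.
static ByteBuffer allocateReadBuffer(int segmentSize, boolean offHeap) {
	// Off-heap (direct) memory avoids an extra copy into Netty's
	// ByteBuf when the spilled data is sent over the network.
	return offHeap
		? ByteBuffer.allocateDirect(segmentSize)
		: ByteBuffer.allocate(segmentSize);
}
{code}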



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623461#comment-16623461
 ] 

ASF GitHub Bot commented on FLINK-10377:


pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove 
precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219471764
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ##
 @@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
//
 
 Review comment:
   Maybe you can ask the reporter of the bug to provide debug or at least info 
logs?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
> --
>
> Key: FLINK-10377
> URL: https://issues.apache.org/jira/browse/FLINK-10377
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The precondition {{checkState(pendingTransactionIterator.hasNext(), 
> "checkpoint completed, but no transaction pending");}} in 
> {{TwoPhaseCommitSinkFunction.notifyCheckpointComplete()}} seems too strict, 
> because checkpoint notifications can overtake each other and then fail the 
> precondition. In this case the commit was already performed by the first 
> notification, which subsumes the late checkpoint. I think the check can be 
> removed.
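
For illustration, a minimal sketch of the relaxed handling (the warn-and-return 
behavior is an assumption for this example, not necessarily what the actual 
change does):

{code:java}
// Inside notifyCheckpointComplete(long checkpointId), instead of failing via
//   checkState(pendingTransactionIterator.hasNext(),
//       "checkpoint completed, but no transaction pending");
// tolerate a late notification whose transactions were already committed by
// an earlier, overtaking notification:
if (!pendingTransactionIterator.hasNext()) {
	LOG.warn("Checkpoint {} completed, but no transaction is pending; " +
		"it was probably already subsumed by a newer checkpoint.", checkpointId);
	return;
}
{code}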



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove 
precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219471764
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ##
 @@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
//
 
 Review comment:
   Maybe you can ask the reporter of the bug to provide debug or at least info 
logs?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-8803) Mini Cluster Shutdown with HA unstable, causing test failures

2018-09-21 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623459#comment-16623459
 ] 

tison edited comment on FLINK-8803 at 9/21/18 11:52 AM:


is it "won't fix" since it's all about {{FlinkMiniCluster}} which based on 
legacy mode? It is said the removal of legacy mode is part of 1.7.0. Maybe we 
would fix this for 1.5.x and 1.6.x but not 1.7.0?


was (Author: tison):
is it "won't fix" since it's all about {{FlinkMiniCluster}} which based on 
legacy mode?

> Mini Cluster Shutdown with HA unstable, causing test failures
> -
>
> Key: FLINK-8803
> URL: https://issues.apache.org/jira/browse/FLINK-8803
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> When the {{FlinkMiniCluster}} is created for HA tests with ZooKeeper, the 
> shutdown is unstable.
> It looks like ZooKeeper may be shut down before the JobManager is shut down, 
> causing the shutdown procedure of the JobManager (specifically 
> {{ZooKeeperSubmittedJobGraphStore.removeJobGraph}}) to block until tests time 
> out.
> Full log: https://api.travis-ci.org/v3/job/346853707/log.txt
> Note that no ZK threads are alive any more, seems ZK is shut down already.
> Relevant Stack Traces:
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f973800a800 nid=0x43b4 waiting on 
> condition [0x7f973eb0b000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x8966cf18> (a 
> scala.concurrent.impl.Promise$CompletionLatch)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.ready(package.scala:169)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:469)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:435)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.closeAsync(FlinkMiniCluster.scala:719)
>   at 
> org.apache.flink.test.util.MiniClusterResource.after(MiniClusterResource.java:104)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
> ...
> {code}
> {code}
> "flink-akka.actor.default-dispatcher-2" #1012 prio=5 os_prio=0 
> tid=0x7f97394fa800 nid=0x3328 waiting on condition [0x7f971db29000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x87f82a70> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.internalBlockUntilConnectedOrTimedOut(CuratorZookeeperClient.java:336)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:241)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:225)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:35)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.release(ZooKeeperStateHandleStore.java:478)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:435)
>   at 
> 

[jira] [Commented] (FLINK-8803) Mini Cluster Shutdown with HA unstable, causing test failures

2018-09-21 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623459#comment-16623459
 ] 

tison commented on FLINK-8803:
--

is it "won't fix" since it's all about {{FlinkMiniCluster}} which based on 
legacy mode?

> Mini Cluster Shutdown with HA unstable, causing test failures
> -
>
> Key: FLINK-8803
> URL: https://issues.apache.org/jira/browse/FLINK-8803
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> When the {{FlinkMiniCluster}} is created for HA tests with ZooKeeper, the 
> shutdown is unstable.
> It looks like ZooKeeper may be shut down before the JobManager is shut down, 
> causing the shutdown procedure of the JobManager (specifically 
> {{ZooKeeperSubmittedJobGraphStore.removeJobGraph}}) to block until tests time 
> out.
> Full log: https://api.travis-ci.org/v3/job/346853707/log.txt
> Note that no ZK threads are alive any more; it seems ZK is already shut down.
> Relevant Stack Traces:
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f973800a800 nid=0x43b4 waiting on 
> condition [0x7f973eb0b000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x8966cf18> (a 
> scala.concurrent.impl.Promise$CompletionLatch)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.ready(package.scala:169)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:469)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:435)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.closeAsync(FlinkMiniCluster.scala:719)
>   at 
> org.apache.flink.test.util.MiniClusterResource.after(MiniClusterResource.java:104)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
> ...
> {code}
> {code}
> "flink-akka.actor.default-dispatcher-2" #1012 prio=5 os_prio=0 
> tid=0x7f97394fa800 nid=0x3328 waiting on condition [0x7f971db29000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x87f82a70> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.internalBlockUntilConnectedOrTimedOut(CuratorZookeeperClient.java:336)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:241)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:225)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:35)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.release(ZooKeeperStateHandleStore.java:478)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:435)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:405)
>   at 
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.removeJobGraph(ZooKeeperSubmittedJobGraphStore.java:266)
>   - locked <0x807f4258> (a 

[jira] [Closed] (FLINK-9738) Provide a way to define Table Version Functions in Table API

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9738.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Merged as f2a67a1682249d83711030f4e55b824cb18336d7

> Provide a way to define Table Version Functions in Table API
> 
>
> Key: FLINK-9738
> URL: https://issues.apache.org/jira/browse/FLINK-9738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9715) Support versioned joins with event time

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-9715:
-

Assignee: Piotr Nowojski

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9714) Support versioned joins with processing time

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9714.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Merged as 00add9cdc02ccfacb38a566099d877e4669c6f65

> Support versioned joins with processing time
> 
>
> Key: FLINK-9714
> URL: https://issues.apache.org/jira/browse/FLINK-9714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.proctime)) AS r 
> WHERE o.currency = r.currency{code}
> should work for processing time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9713.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Merged as 77c38346cb06b6e6c1bb672695c54f4ba253bd3f

> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned-join plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9913.
-

> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by performing the serialization only once.
> I would suggest the following changes to realize it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (once these buffers grew over 5MiB, they 
> were pruned back to 128B!).
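
For illustration, a sketch of the serialize-once emit path built from the 
{{RecordSerializer}} methods in the merged interface ({{serializeRecord}}, 
{{reset}}, {{copyToBufferBuilder}}, {{prune}}); the surrounding 
{{getBufferBuilder}}/{{requestNewBufferBuilder}} helpers are assumptions for 
this example:

{code:java}
// Sketch: serialize the record once, then copy the bytes to each channel.
private void emit(T record, int[] targetChannels) throws IOException {
	serializer.serializeRecord(record);      // serialize only once

	for (int channel : targetChannels) {
		serializer.reset();                  // rewind the read position per channel
		BufferBuilder bufferBuilder = getBufferBuilder(channel);      // assumed helper
		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
		while (result.isFullBuffer()) {
			// the target buffer is full: request a new one and keep copying
			bufferBuilder = requestNewBufferBuilder(channel);         // assumed helper
			result = serializer.copyToBufferBuilder(bufferBuilder);
		}
	}
	serializer.prune();                      // shrink an oversized intermediate buffer
}
{code}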



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski resolved FLINK-9913.
---
Resolution: Fixed

merged commit 914dffb into apache:master

> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by performing the serialization only once.
> I would suggest the following changes to realize it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (once these buffers grew over 5MiB, they 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623446#comment-16623446
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on issue #6299: [FLINK-9713][table][sql] Support processing 
time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423503610
 
 
   Travis is green - merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned-join plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on issue #6299: [FLINK-9713][table][sql] Support processing 
time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423503610
 
 
   Travis is green - merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623432#comment-16623432
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d292771d0..6eebbbe88eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
/**
-* Sets a (next) target buffer to use and continues writing remaining 
data
-* to it until it is full.
+* Copies the intermediate data serialization buffer to the given 
target buffer.
 *
 * @param bufferBuilder the new target buffer to use
 * @return how much information was written to the target buffer and
 * whether this buffer is full
 */
-   SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder 
bufferBuilder) throws IOException;
+   SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+   /**
+* Checks to decrease the size of intermediate data serialization 
buffer after finishing the
+* whole serialization process including {@link 
#serializeRecord(IOReadableWritable)} and
+* {@link #copyToBufferBuilder(BufferBuilder)}.
+*/
+   void prune();
 
/**
-* Clear and release internal state.
+* Supports copying an intermediate data serialization buffer to 
multiple target buffers
+* by resetting its initial position before each copying.
 */
-   void clear();
+   void reset();
 
/**
 * @return true if has some serialized data pending copying to 
the result {@link BufferBuilder}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f4b3a..ba2ed0133fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link 
#continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param  The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@
/** Intermediate buffer for length serialization. */
private final ByteBuffer lengthBuffer;
 
-   /** Current target {@link Buffer} of the serializer. */
-   @Nullable
-   private BufferBuilder targetBuffer;
-
public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128);
 
@@ -66,15 +59,12 @@ public SpanningRecordSerializer() {
}
 
/**
-* Serializes the complete record to an intermediate data serialization
-* buffer and starts copying it to the target buffer (if available).
+* Serializes 

[GitHub] pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-21 Thread GitBox
pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d292771d0..6eebbbe88eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
/**
-* Sets a (next) target buffer to use and continues writing remaining 
data
-* to it until it is full.
+* Copies the intermediate data serialization buffer to the given 
target buffer.
 *
 * @param bufferBuilder the new target buffer to use
 * @return how much information was written to the target buffer and
 * whether this buffer is full
 */
-   SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder 
bufferBuilder) throws IOException;
+   SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+   /**
+* Checks to decrease the size of intermediate data serialization 
buffer after finishing the
+* whole serialization process including {@link 
#serializeRecord(IOReadableWritable)} and
+* {@link #copyToBufferBuilder(BufferBuilder)}.
+*/
+   void prune();
 
/**
-* Clear and release internal state.
+* Supports copying an intermediate data serialization buffer to 
multiple target buffers
+* by resetting its initial position before each copying.
 */
-   void clear();
+   void reset();
 
/**
 * @return true if has some serialized data pending copying to 
the result {@link BufferBuilder}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f4b3a..ba2ed0133fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link 
#continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param  The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@
/** Intermediate buffer for length serialization. */
private final ByteBuffer lengthBuffer;
 
-   /** Current target {@link Buffer} of the serializer. */
-   @Nullable
-   private BufferBuilder targetBuffer;
-
public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128);
 
@@ -66,15 +59,12 @@ public SpanningRecordSerializer() {
}
 
/**
-* Serializes the complete record to an intermediate data serialization
-* buffer and starts copying it to the target buffer (if available).
+* Serializes the complete record to an intermediate data serialization 
buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623429#comment-16623429
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-423499421
 
 
   @NicoK gave me his LGTM, so I'm merging this :) Thank you @zhijiangW for 
the contribution and the time spent on the feature/PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by performing the serialization only once.
> I would suggest the following changes to realize it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (once these buffers grew over 5MiB, they 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-21 Thread GitBox
pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-423499421
 
 
   @NicoK gave me his LGTM, so I'm merging this :) Thank you @zhijiangW for 
the contribution and the time spent on the feature/PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623422#comment-16623422
 ] 

ASF GitHub Bot commented on FLINK-10310:


wittyameta opened a new pull request #6732: [FLINK-10310] Cassandra Sink - 
Handling failing requests.
URL: https://github.com/apache/flink/pull/6732
 
 
   ## What is the purpose of the change
   
   This pull request provides support for optionally handling Cassandra sink 
errors. A user may choose to ignore an exception instead of allowing the sink 
to fail. The handler is similar to `ActionRequestFailureHandler` in the 
Elasticsearch sink.
   
   
   ## Brief change log
   
 - *Added interface `CassandraFailureHandler` and a no-op implementation 
`NoOpCassandraFailureHandler`*
 - *`CassandraSinkBase` has a new field `failureHandler` and updated 
constructor*
 - *`checkAsyncErrors` method in `CassandraSinkBase` calls the 
`failureHandler` instead of throwing an `IOException`*
 - *`CassandraSinkBuilder` has a new optional field `failureHandler` and 
updated setter. Uses no-op implementation as default*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added test in class `CassandraSinkBaseTest` that validates that 
failureHandler is called and the error is ignored*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **yes**
 - If yes, how is the feature documented? **docs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>  Labels: pull-request-available
>
> The Cassandra sink fails for any kind of error. For some exceptions (e.g. 
> WriteTimeoutException), ignoring the exception may be acceptable as well.
> Can we discuss having a FailureHandler along the lines of 
> ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html
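
For illustration, a minimal sketch of what such a failure handler could look 
like. The interface name follows the change log of the PR above, but the 
exact signature is an assumption modeled on the Elasticsearch handler:

{code:java}
import java.io.IOException;
import java.io.Serializable;

import com.datastax.driver.core.exceptions.WriteTimeoutException;

// Callback the sink invokes for every asynchronous write failure.
public interface CassandraFailureHandler extends Serializable {
    // Rethrow to fail the sink; return normally to ignore the failure.
    void onFailure(Throwable failure) throws IOException;
}

// A user-defined handler that tolerates write timeouts but fails otherwise.
class IgnoreWriteTimeoutHandler implements CassandraFailureHandler {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        if (failure instanceof WriteTimeoutException) {
            return; // deliberately swallow acceptable timeouts
        }
        throw new IOException("Cassandra sink failure", failure);
    }
}
{code}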



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10310:
---
Labels: pull-request-available  (was: )

> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>  Labels: pull-request-available
>
> The Cassandra sink fails for any kind of error. For some exceptions (e.g. 
> WriteTimeoutException), ignoring the exception may be acceptable as well.
> Can we discuss having a FailureHandler along the lines of 
> ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wittyameta opened a new pull request #6732: [FLINK-10310] Cassandra Sink - Handling failing requests.

2018-09-21 Thread GitBox
wittyameta opened a new pull request #6732: [FLINK-10310] Cassandra Sink - 
Handling failing requests.
URL: https://github.com/apache/flink/pull/6732
 
 
   ## What is the purpose of the change
   
   This pull request adds support for optionally handling Cassandra sink 
errors. A user may choose to ignore an exception instead of allowing the sink 
to fail. The handler is similar to `ActionRequestFailureHandler` in the 
Elasticsearch sink.
   
   
   ## Brief change log
   
 - *Added interface `CassandraFailureHandler` and a no-op implementation 
`NoOpCassandraFailureHandler`*
 - *`CassandraSinkBase` has a new field `failureHandler` and updated 
constructor*
 - *`checkAsyncErrors` method in `CassandraSinkBase` calls the 
`failureHandler` instead of throwing an `IOException`*
 - *`CassandraSinkBuilder` has a new optional field `failureHandler` and 
updated setter. Uses no-op implementation as default*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added test in class `CassandraSinkBaseTest` that validates that 
failureHandler is called and the error is ignored*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **yes**
 - If yes, how is the feature documented? **docs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10354) Savepoints should be counted as retained checkpoints

2018-09-21 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623403#comment-16623403
 ] 

Till Rohrmann commented on FLINK-10354:
---

This looks like a duplicate of FLINK-9983?

> Savepoints should be counted as retained checkpoints
> 
>
> Key: FLINK-10354
> URL: https://issues.apache.org/jira/browse/FLINK-10354
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This task is about reverting [FLINK-6328].
> The problem is that you can get incorrect results with exactly-once sinks: if 
> there is a failure after taking a savepoint but before taking the next 
> checkpoint, the savepoint will already have manifested side effects in 
> the sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10263) User-defined function with LITERAL parameters yields CompileException

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623381#comment-16623381
 ] 

ASF GitHub Bot commented on FLINK-10263:


twalthr commented on issue #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725#issuecomment-423486904
 
 
   Thank you @dawidwys. I simplified the code there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined function with LITERAL parameters yields CompileException
> 
>
> Key: FLINK-10263
> URL: https://issues.apache.org/jira/browse/FLINK-10263
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.7.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> When using a user-defined scalar function only with literal parameters, a 
> {{CompileException}} is thrown. For example
> {code}
> SELECT myFunc(CAST(40.750444 AS FLOAT), CAST(-73.993475 AS FLOAT))
> public class MyFunc extends ScalarFunction {
>   public int eval(float lon, float lat) {
>   // do something
>   }
> }
> {code}
> results in 
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot 
> determine simple type name "com"
> {code}
> The problem is probably caused by the expression reducer, because the error 
> disappears if a regular attribute is added to a parameter expression.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

2018-09-21 Thread GitBox
twalthr commented on issue #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725#issuecomment-423486904
 
 
   Thank you @dawidwys. I simplified the code there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10387) StateBackend create methods should return interface not abstract KeyedStateBackend classes

2018-09-21 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-10387:
--

 Summary: StateBackend create methods should return interface not 
abstract KeyedStateBackend classes
 Key: FLINK-10387
 URL: https://issues.apache.org/jira/browse/FLINK-10387
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Gyula Fora


Currently the createKeyedStateBackend(...) methods return 
AbstractKeyedStateBackend instead of an interface.

This makes it virtually impossible to write nice extensions to StateBackends 
that add additional functionality to existing backends while delegating other 
method calls.

It should be easy enough to add a new interface that exposes everything the 
AbstractKeyedStateBackend does, and the method should return that interface.
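
For illustration, a minimal sketch of the kind of delegating extension an 
interface return type would enable. The interface and its two methods are 
made up for the example, not the real AbstractKeyedStateBackend surface:

{code:java}
// Illustrative only: a made-up minimal interface and a delegating decorator.
// The point is that such a decorator can only be written against an
// interface, not against the AbstractKeyedStateBackend class itself.
interface KeyedBackend<K> {
    void setCurrentKey(K key);
    void dispose();
}

// Adds logging to one method while delegating everything else unchanged.
class LoggingKeyedBackend<K> implements KeyedBackend<K> {
    private final KeyedBackend<K> delegate;

    LoggingKeyedBackend(KeyedBackend<K> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void setCurrentKey(K key) {
        System.out.println("switching to key " + key);
        delegate.setCurrentKey(key);
    }

    @Override
    public void dispose() {
        delegate.dispose();
    }
}
{code}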



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623331#comment-16623331
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution 
environment and provide description field for the rest api
URL: https://github.com/apache/flink/pull/6266#issuecomment-423475109
 
 
   @dawidwys any review suggestions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be 
> executing, such as a development or test environment, identifying which 
> running job is of a specific version via the UI can be difficult unless the 
> version is embedded into the job name given to {{execute}}.  But the job 
> name is used for other purposes, such as for namespacing metrics.  Thus, it 
> is not ideal to modify the job name, as that could require modifying metric 
> dashboards and monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.
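
A hedged sketch of how the proposed API might be used; {{setDescription}} is 
the method this ticket proposes and does not exist in Flink yet:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DescriptionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Version info goes into the free-form description shown in the UI...
        env.setDescription("build 42, rev ff472b4, testing new session windows"); // proposed method

        env.fromElements(1, 2, 3).print();

        // ...while the stable job name keeps metric namespaces unchanged.
        env.execute("fraud-detection");
    }
}
{code}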



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread GitBox
yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution 
environment and provide description field for the rest api
URL: https://github.com/apache/flink/pull/6266#issuecomment-423475109
 
 
   @dawidwys any review suggestions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10363) S3 FileSystem factory prints secrets into logs

2018-09-21 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623330#comment-16623330
 ] 

Steve Loughran commented on FLINK-10363:


see WHIRR-642 for this same issue; it's easy to do. For that one I had to 
google for every Whirr log entry & notify at least two people they'd 
accidentally shared their secrets. Luckily that was the era before bitcoin 
miners scanned the internet for AWS keys.

> S3 FileSystem factory prints secrets into logs
> --
>
> Key: FLINK-10363
> URL: https://issues.apache.org/jira/browse/FLINK-10363
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2
>
>
> The file system factory logs all values it applies from the Flink 
> configuration.
> That frequently includes access keys, which should not leak into logs.
> The loader should only log the keys, not the values.
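
A minimal sketch of the intended fix, assuming a simple key/value view of the 
configuration (names below are illustrative, not the actual factory code):

{code:java}
import java.util.Map;

// Sketch of the fix: log which keys were applied, never their values.
class ConfigLoggingSketch {
    static void logAppliedKeys(Map<String, String> flinkConfig) {
        for (String key : flinkConfig.keySet()) {
            // The value is deliberately omitted so secrets cannot leak.
            System.out.println("Applying configuration entry: " + key);
        }
    }
}
{code}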



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10260) Confusing log during TaskManager registration

2018-09-21 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10260.
---
Resolution: Fixed

Fixed via
1.7.0: 542e8cc22290984b4b2e32577430ceb82dd82fde
1.6.2: 23221f97f3462031fa5374803df1a1d57634ff8d
1.5.5: 8dee8625502a828a303b8ec4180ab00b7e794334

> Confusing log during TaskManager registration
> -
>
> Key: FLINK-10260
> URL: https://issues.apache.org/jira/browse/FLINK-10260
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Stephan Ewen
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> During startup, when TaskManagers register, I see a lot of confusing log 
> lines.
> The case below happened during startup of a cloud setup where TaskManagers 
> took a varying amount of time to start and might have started before the 
> JobManager:
> {code}
> -- Logs begin at Thu 2018-08-30 14:51:58 UTC, end at Thu 2018-08-30 14:55:39 
> UTC. --
> Aug 30 14:52:52 flink-belgium-1 systemd[1]: Started flink-jobmanager.service.
> -- Subject: Unit flink-jobmanager.service has finished start-up
> -- Defined-By: systemd
> -- Support: http://www.ubuntu.com/support
> -- 
> -- Unit flink-jobmanager.service has finished starting up.
> -- 
> -- The start-up result is RESULT.
> Aug 30 14:52:52 flink-belgium-1 jobmanager.sh[5416]: used deprecated key 
> `jobmanager.heap.mb`, please replace with key `jobmanager.heap.size`
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: Starting 
> standalonesession as a console application on host flink-belgium-1.
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,221 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,222 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Starting StandaloneSessionClusterEntrypoint (Version: 1.6.0, Rev:ff472b4, 
> Date:07.08.2018 @ 13:31:13 UTC)
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,222 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  OS 
> current user: flink
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,718 
> WARN  org.apache.hadoop.util.NativeCodeLoader   - Unable 
> to load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,847 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Current Hadoop/Kerberos user: flink
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,848 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM: 
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,848 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Maximum heap size: 1963 MiBytes
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,848 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> JAVA_HOME: (not set)
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop 
> version: 2.8.3
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM 
> Options:
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Xms2048m
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Xmx2048m
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,854 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Program Arguments:
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,854 
> 

[jira] [Resolved] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-09-21 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9567.
--
   Resolution: Fixed
Fix Version/s: (was: 1.5.1)
   (was: 1.6.0)
   1.5.5
   1.6.2
   1.7.0

Fixed via
1.7.0: 0cf776be4483e7f939de0a7f2b1fe3263a14c6fb
1.6.2: 806d3a424c0ad66f49282d0496599597d0f9f0c0
1.5.5: 69cae4f6daf6580ca83c452c7c3ad33489551a36

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in YARN cluster mode, Flink sometimes does 
> not release TaskManager containers in certain specific cases. In the worst 
> case, I had a job configured for 5 task managers that ended up possessing 
> more than 100 containers. Although the job didn't fail, it affected other 
> jobs in the YARN cluster.
> In the first log I posted, the container with id 24 is the reason why YARN 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should be invoked by YARN's *AMRMAsyncClient*. After the restart, as 
> we can see in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
>  However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink 
> cannot return this container even though it is no longer required. 
> Meanwhile, the restart logic has already allocated enough containers for 
> the TaskManagers, so Flink holds the extra container for a long time for 
> nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> PS: Another strange thing I found is that a request for a YARN container 
> sometimes returns many more containers than requested. Is that a normal 
> scenario for AMRMAsyncClient?
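
A simplified sketch of the excess-container accounting described above, 
showing why a stale increment of numPendingContainerRequests leaks a 
container; the types and method names are stand-ins, not the real 
YarnResourceManager code:

{code:java}
import java.util.List;

// Stand-in types; not the real Hadoop/Flink classes.
class YarnAccountingSketch {
    interface Container {}

    private int numPendingContainerRequests;

    void onContainersAllocated(List<Container> containers) {
        for (Container container : containers) {
            if (numPendingContainerRequests <= 0) {
                // No outstanding request: hand the container straight back to YARN.
                release(container);
            } else {
                // A stale increment (e.g. requestYarnContainer called while the
                // TaskManager connection was already gone) makes this branch
                // keep a container that nobody actually needs.
                numPendingContainerRequests--;
                startTaskExecutor(container);
            }
        }
    }

    private void release(Container c) { /* e.g. amRmClient.releaseAssignedContainer(...) */ }

    private void startTaskExecutor(Container c) { /* launch a TaskManager in the container */ }
}
{code}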



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10375) ExceptionInChainedStubException hides wrapped exception in cause

2018-09-21 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10375.
---
   Resolution: Fixed
Fix Version/s: 1.5.5
   1.6.2
   1.7.0

Fixed via
1.7.0: f1511a3602690f2c44d4feb78b8961e31d345326
1.6.2: 4529e377eb23155f53fe1c1dd86830098313bc4f
1.5.5: c94b125d1ae143d92cadc93d388703f58e723bae

> ExceptionInChainedStubException hides wrapped exception in cause
> 
>
> Key: FLINK-10375
> URL: https://issues.apache.org/jira/browse/FLINK-10375
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Mike Pedersen
>Assignee: Mike Pedersen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> ExceptionInChainedStubException does not have the wrapped exception as the 
> cause. This creates generally unhelpful exception traces like this:
> {code:java}
> org.apache.beam.sdk.util.UserCodeException: 
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>   at 
> org.apache.beam.sdk.io.WriteFiles$ApplyShardingKeyFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
>   at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:66)
>   at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:149)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
>   at 
> org.apache.beam.sdk.io.WriteFiles$ApplyShardingKeyFn.processElement(WriteFiles.java:686)
> {code}
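
The natural fix is to pass the wrapped exception to the superclass as the 
cause. A simplified sketch of that idea (not the exact Flink class):

{code:java}
// Simplified sketch, not the exact Flink class: passing the wrapped
// exception to super(...) makes it appear as "Caused by:" in stack traces.
public class ExceptionInChainedStubSketch extends RuntimeException {
    private final Exception wrapped;

    public ExceptionInChainedStubSketch(String taskName, Exception wrapped) {
        super("Exception in chained task '" + taskName + "'", wrapped); // cause set here
        this.wrapped = wrapped;
    }

    public Exception getWrappedException() {
        return wrapped;
    }
}
{code}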



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10282) Provide separate thread-pool for REST endpoint

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623298#comment-16623298
 ] 

ASF GitHub Bot commented on FLINK-10282:


TisonKun commented on issue #6661: [FLINK-10282][runtime] Separate RPC and REST 
thread-pools
URL: https://github.com/apache/flink/pull/6661#issuecomment-423466269
 
 
   Once we introduce a new thread-pool to deal with REST tasks, we need to 
manage its lifecycle inside `WebMonitorEndpoint` instead of initializing it in 
`ClusterEntrypoint`.
   To make this clearer, we should pass an argument saying how many threads the 
thread-pool should contain instead of an `Executor`, and create the 
`ExecutorService` in `WebMonitorEndpoint`. Also, when the endpoint is shut 
down, shut down the service as well.
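
A minimal sketch of that suggestion, with illustrative names (not Flink's 
actual API): the endpoint receives a thread count, creates its own 
ExecutorService, and shuts it down when it closes:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative names only, not Flink's actual API.
class RestEndpointSketch implements AutoCloseable {
    private final ExecutorService restExecutor;

    RestEndpointSketch(int numRestThreads) {
        // Created here rather than handed in from ClusterEntrypoint.
        this.restExecutor = Executors.newFixedThreadPool(numRestThreads);
    }

    void handleRequest(Runnable task) {
        restExecutor.execute(task);
    }

    @Override
    public void close() {
        // Shutting down the endpoint also shuts down its own thread-pool.
        restExecutor.shutdown();
    }
}
{code}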


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide separate thread-pool for REST endpoint
> --
>
> Key: FLINK-10282
> URL: https://issues.apache.org/jira/browse/FLINK-10282
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, REST
>Affects Versions: 1.5.1, 1.6.0, 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The REST endpoints currently share their thread-pools with the RPC system, 
> which can cause the Dispatcher to become unresponsive if the REST parts are 
> overloaded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6661: [FLINK-10282][runtime] Separate RPC and REST thread-pools

2018-09-21 Thread GitBox
TisonKun commented on issue #6661: [FLINK-10282][runtime] Separate RPC and REST 
thread-pools
URL: https://github.com/apache/flink/pull/6661#issuecomment-423466269
 
 
   Once we introduce a new thread-pool to deal with REST tasks, we need to 
manage its lifecycle inside `WebMonitorEndpoint` instead of initializing it in 
`ClusterEntrypoint`.
   To make this clearer, we should pass an argument saying how many threads the 
thread-pool should contain instead of an `Executor`, and create the 
`ExecutorService` in `WebMonitorEndpoint`. Also, when the endpoint is shut 
down, shut down the service as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10312) Wrong / missing exception when submitting job

2018-09-21 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623266#comment-16623266
 ] 

Andrey Zagrebin edited comment on FLINK-10312 at 9/21/18 8:42 AM:
--

The PR suggests the following embedding of the server-side failure:
{code:java}

{code}
Example:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: ed85deec64effb201fa00401e2ead30b)
 at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
 at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
 at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:805)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:281)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1045)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1121)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1121)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379)
 at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Exception is not retryable.
 at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
 at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
 at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
 at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
 ... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Exception is not retryable.
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:215)
 ... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job 
submission failed.,
(JobManagerRunner.java:176)
 at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1049)
 at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:304)
 at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
 ... 7 more
Caused by: java.lang.RuntimeException: Failed to start 

[jira] [Commented] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-21 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623273#comment-16623273
 ] 

Biao Liu commented on FLINK-10344:
--

[~dangdangdang] 
Oh, that's my fault. I was reading a paper and dragged the doc into Chrome; I 
think that caused the attachment problem.
I have deleted the attachment. Sorry for that.

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(
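
A rough sketch of the polling alternative, assuming a store that can list job 
IDs (the JobGraphStore interface below is made up for the example): 
periodically diff the stored job IDs against the locally known set instead of 
relying on listener callbacks:

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; JobGraphStore here is a made-up minimal interface.
class JobGraphPollerSketch {
    interface JobGraphStore {
        Set<String> getJobIds();
    }

    private final Set<String> knownJobIds = new HashSet<>();
    private final ScheduledExecutorService poller =
            Executors.newSingleThreadScheduledExecutor();

    void start(JobGraphStore store) {
        poller.scheduleWithFixedDelay(() -> {
            Set<String> current = store.getJobIds();
            for (String jobId : current) {
                if (knownJobIds.add(jobId)) {
                    recover(jobId); // a job we have not seen before
                }
            }
            // Forget jobs that were removed (i.e. cleaned up) in the meantime.
            knownJobIds.retainAll(current);
        }, 0, 10, TimeUnit.SECONDS);
    }

    private void recover(String jobId) { /* hand over to the recovery logic */ }
}
{code}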



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-21 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-10344:
-
Attachment: (was: Tornado.pdf)

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

