[jira] [Updated] (FLINK-5077) testStreamTableSink fails unstable

2016-11-15 Thread Boris Osipov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boris Osipov updated FLINK-5077:

Summary: testStreamTableSink fails unstable  (was: testStreamTableSink 
falls unstable)

> testStreamTableSink fails unstable
> --
>
> Key: FLINK-5077
> URL: https://issues.apache.org/jira/browse/FLINK-5077
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.4
>Reporter: Boris Osipov
>
> I've faced with several fails TableSinkITCase.testStreamTableSink test.
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.938 sec 
> <<< FAILURE! - in org.apache.flink.api.scala.stream.TableSinkITCase
> testStreamTableSink(org.apache.flink.api.scala.stream.TableSinkITCase)  Time 
> elapsed: 10.534 sec  <<< FAILURE!
> java.lang.AssertionError: Different number of lines in expected and obtained 
> result. expected:<8> but was:<4>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:316)
> at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:302)
> at 
> org.apache.flink.api.scala.stream.TableSinkITCase.testStreamTableSink(TableSinkITCase.scala:61)
> {code}
> I made small research. I added additional StreamITCase.StringSink
> {code}
> val results = input.toTable(tEnv, 'a, 'b, 'c)
>   .where('a < 5 || 'a > 17)
>   .select('c, 'b)
> results.writeToSink(new CsvTableSink(path))
> results.toDataStream[Row]
>   .addSink(new StreamITCase.StringSink)
> {code}
> and logging. I've ran test several times and I got following resuts in log on 
> fail:
> {noformat}
> -- Actual CsvTableSink:
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hi,1
> -- Stream sink:
> Comment#12,6
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hello,2
> Hi,1
> -- Expected result:
> Comment#12,6
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hello,2
> Hi,1
> {noformat}
> Looks like writing to cvs works wrong.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5077) testStreamTableSink falls unstable

2016-11-15 Thread Boris Osipov (JIRA)
Boris Osipov created FLINK-5077:
---

 Summary: testStreamTableSink falls unstable
 Key: FLINK-5077
 URL: https://issues.apache.org/jira/browse/FLINK-5077
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.4
Reporter: Boris Osipov


I've faced with several fails TableSinkITCase.testStreamTableSink test.

{code}
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.938 sec <<< 
FAILURE! - in org.apache.flink.api.scala.stream.TableSinkITCase
testStreamTableSink(org.apache.flink.api.scala.stream.TableSinkITCase)  Time 
elapsed: 10.534 sec  <<< FAILURE!
java.lang.AssertionError: Different number of lines in expected and obtained 
result. expected:<8> but was:<4>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:316)
at 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:302)
at 
org.apache.flink.api.scala.stream.TableSinkITCase.testStreamTableSink(TableSinkITCase.scala:61)
{code}

I made small research. I added additional StreamITCase.StringSink
{code}
val results = input.toTable(tEnv, 'a, 'b, 'c)
  .where('a < 5 || 'a > 17)
  .select('c, 'b)
results.writeToSink(new CsvTableSink(path))
results.toDataStream[Row]
  .addSink(new StreamITCase.StringSink)
{code}
and logging. I've ran test several times and I got following resuts in log on 
fail:
{noformat}
-- Actual CsvTableSink:
Comment#13,6
Comment#14,6
Comment#15,6
Hello world, how are you?,3
Hello world,2
Hi,1
-- Stream sink:
Comment#12,6
Comment#13,6
Comment#14,6
Comment#15,6
Hello world, how are you?,3
Hello world,2
Hello,2
Hi,1
-- Expected result:
Comment#12,6
Comment#13,6
Comment#14,6
Comment#15,6
Hello world, how are you?,3
Hello world,2
Hello,2
Hi,1
{noformat}

Looks like writing to cvs works wrong.








--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5076) Shutting down TM when shutting down new mini cluster

2016-11-15 Thread Biao Liu (JIRA)
Biao Liu created FLINK-5076:
---

 Summary: Shutting down TM when shutting down new mini cluster
 Key: FLINK-5076
 URL: https://issues.apache.org/jira/browse/FLINK-5076
 Project: Flink
  Issue Type: Improvement
  Components: Cluster Management
 Environment: FLIP-6 feature branch
Reporter: Biao Liu
Priority: Minor


Currently we don't shut down task manager when shutting down mini cluster. It 
will cause mini cluster can not exit normally.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669628#comment-15669628
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5013 at 11/16/16 6:55 AM:
--

Resolved for master with 
http://git-wip-us.apache.org/repos/asf/flink/commit/53aae50
Resolved for release-1.1 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/723ce7

Hi [~foscraig], thanks a lot for looking at this on the EMR side! I'm marking 
this ticket as resolved now, as the "working with old EMR version" part of the 
problem is solved. If you can keep us updated on the flink-dist shading issue, 
that would be great.


was (Author: tzulitai):
Resolved for master with 
http://git-wip-us.apache.org/repos/asf/flink/commit/53aae50
Resolved for release-1.1 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/723ce7

Hi [~foscraig], thanks a lot for looking at this on the EMR side! I'm marking 
this ticket as resolved now, as the "working with old EMR version" part of the 
problem is solved.

> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669628#comment-15669628
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5013 at 11/16/16 6:53 AM:
--

Resolved for master with 
http://git-wip-us.apache.org/repos/asf/flink/commit/53aae50
Resolved for release-1.1 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/723ce7

Hi [~foscraig], thanks a lot for looking at this on the EMR side! I'm marking 
this ticket as resolved now, as the "working with old EMR version" part of the 
problem is solved.


was (Author: tzulitai):
Resolved for master with 
http://git-wip-us.apache.org/repos/asf/flink/commit/53aae50
Resolved for release-1.1 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/723ce7

Hi [~foscraig], thanks a lot for looking at this on the EMR side!

> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669628#comment-15669628
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5013:


Resolved for master with 
http://git-wip-us.apache.org/repos/asf/flink/commit/53aae50
Resolved for release-1.1 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/723ce7

Hi [~foscraig], thanks a lot for looking at this on the EMR side!

> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread Craig Foster (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669623#comment-15669623
 ] 

Craig Foster commented on FLINK-5013:
-

Is there a fix/issue pending for correctly shading in flink-dist? What work is 
involved in there? The issue potentially affects downstream users such as 
people using BigTop and not just EMR. 

> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read

2016-11-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-4727.
--
Resolution: Resolved

> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no 
> data is read
> --
>
> Key: FLINK-4727
> URL: https://issues.apache.org/jira/browse/FLINK-4727
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0, 1.1.4
>
>
> This is basically the 0.9 version counterpart for FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup, but does 
> not have any data to read, it should also checkpoint & commit these initial 
> offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669591#comment-15669591
 ] 

ASF GitHub Bot commented on FLINK-5013:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2787


> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2787: [FLINK-5013] [kinesis] Shade AWS dependencies to w...

2016-11-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2787


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2787: [FLINK-5013] [kinesis] Shade AWS dependencies to work wit...

2016-11-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2787
  
Merging ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669582#comment-15669582
 ] 

ASF GitHub Bot commented on FLINK-5013:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2787
  
Merging ...


> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-15 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-5075:
--

 Summary: Kinesis consumer incorrectly determines shards as newly 
discovered when tested against Kinesalite
 Key: FLINK-5075
 URL: https://issues.apache.org/jira/browse/FLINK-5075
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


A user reported that when our Kinesis connector is used against Kinesalite 
(https://github.com/mhart/kinesalite), we're incorrectly determining already 
found shards as newly discovered:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html

I suspect the problem to be the mock Kinesis API implementations of Kinesalite 
doesn't completely match with the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5074) Implement a RunningJobRegistry based on Zookeeper

2016-11-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-5074:
---

Assignee: shuai.xu

> Implement a RunningJobRegistry based on Zookeeper 
> --
>
> Key: FLINK-5074
> URL: https://issues.apache.org/jira/browse/FLINK-5074
> Project: Flink
>  Issue Type: Task
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> For flip-6, it has implemented the ZookeeperHaServices, but 
> ZookeeperHaServices does not support getRunningJobsRegistry. So need to 
> implement a ZK based running job registry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5074) Implement a RunningJobRegistry based on Zookeeper

2016-11-15 Thread shuai.xu (JIRA)
shuai.xu created FLINK-5074:
---

 Summary: Implement a RunningJobRegistry based on Zookeeper 
 Key: FLINK-5074
 URL: https://issues.apache.org/jira/browse/FLINK-5074
 Project: Flink
  Issue Type: Task
  Components: Cluster Management
Reporter: shuai.xu


For flip-6, it has implemented the ZookeeperHaServices, but ZookeeperHaServices 
does not support getRunningJobsRegistry. So need to implement a ZK based 
running job registry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4164) Use only one rocksdb column family to store all states in an operator

2016-11-15 Thread LiuBiao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LiuBiao closed FLINK-4164.
--
Resolution: Won't Fix

As the state backend has been reconstructed, this issue should be closed. 

> Use only one rocksdb column family to store all states in an operator
> -
>
> Key: FLINK-4164
> URL: https://issues.apache.org/jira/browse/FLINK-4164
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: LiuBiao
>Priority: Minor
>
> I see now in master branch, there is only one rocksdb instance in an 
> operator, each state will be assigned into separated column families. It's a 
> good improvement, but I think there are still some problems. Column families 
> in rocksdb do not share memtable and sst files(correct me if I am wrong, 
> there is not much docs about column families in rocksdb). If user have 
> thousands of  states, the overhead will be costly. Memory may will be 
> exhausted, also there will be too much small sst files.
> If we use prefix in key instead of column family, the memory that rocksdb 
> costed will be under control. The shortcoming is that we can not optimize 
> states separately, and performance may be a little worse than before. But I 
> think it is worth to do, it is meaning for some cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3869) WindowedStream.apply with FoldFunction is too restrictive

2016-11-15 Thread Yassine Marzougui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15669039#comment-15669039
 ] 

Yassine Marzougui commented on FLINK-3869:
--

[~aljoscha] would it be OK if I incorporate the changes you proposed only to 
the Java API? The Scala compiler is complaining about too many arguments for 
method reduce and overloaded method value fold with alternatives..


> WindowedStream.apply with FoldFunction is too restrictive
> -
>
> Key: FLINK-3869
> URL: https://issues.apache.org/jira/browse/FLINK-3869
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Yassine Marzougui
>
> Right now we have this signature:
> {code}
> public  SingleOutputStreamOperator apply(R initialValue, 
> FoldFunction foldFunction, WindowFunction function) {
> {code}
> but we should have this signature to allow users to return a type other than 
> the fold accumulator type from their window function:
> {code}
> public  SingleOutputStreamOperator apply(ACC initialValue, 
> FoldFunction foldFunction, WindowFunction function) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5013) Flink Kinesis connector doesn't work on old EMR versions

2016-11-15 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668845#comment-15668845
 ] 

Robert Metzger commented on FLINK-5013:
---

We only strictly enforce the maven version when building a release.
We don't want to force regular users to use an outdated Maven version for 
building Flink.

> Flink Kinesis connector doesn't work on old EMR versions
> 
>
> Key: FLINK-5013
> URL: https://issues.apache.org/jira/browse/FLINK-5013
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> A user reported on the mailing list that our Kinesis connector doesn't work 
> with EMR 4.4.0: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-Dependency-Problems-td9790.html
> The problem seems to be that Flink is loading older libraries from the "YARN 
> container classpath", which on EMR contains the default Amazon libraries.
> We should try to shade kinesis and its amazon dependencies into a different 
> namespace.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668827#comment-15668827
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
Do we provide anything that allows the user to identify files that weren't 
cleaned up but can be removed? Something like a list of "active" pending files?


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
Do we provide anything that allows the user to identify files that weren't 
cleaned up but can be removed? Something like a list of "active" pending files?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4499) Introduce findbugs maven plugin

2016-11-15 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585619#comment-15585619
 ] 

Ted Yu edited comment on FLINK-4499 at 11/15/16 10:55 PM:
--

+1 on starting with a very small set of findbugs rules


was (Author: yuzhih...@gmail.com):
+1 on starting with a very small set of rules

> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with small set of rules.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668574#comment-15668574
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88124075
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -118,48 +150,290 @@ public static void destroyHDFS() {
}
 
@Test
-   public void testCheckpointWithoutNotify() throws Exception {
-   File dataDir = tempFolder.newFolder();
+   public void testInactivityPeriodWithLateNotify() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 2, 0 ,0, 0);
+
+   testHarness.setProcessingTime(101L);// put some in pending
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.snapshot(0, 0); // put 
them in pending for 0
+   checkFs(outDir, 0, 2, 0, 0);
 
-   OneInputStreamOperatorTestHarness testHarness = 
createTestSink(dataDir);
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   testHarness.processElement(new StreamRecord<>("test4", 1L));
 
+   testHarness.setProcessingTime(202L);// put some in pending
+
+   testHarness.snapshot(1, 0); // put 
them in pending for 1
+   checkFs(outDir, 0, 4, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0); // put the 
pending for 0 to the "committed" state
+   checkFs(outDir, 0, 2, 2, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(1); // put the pending 
for 1 to the "committed" state
+   checkFs(outDir, 0, 0, 4, 0);
+   }
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
testHarness.open();
 
-   testHarness.processElement(new StreamRecord<>("Hello"));
-   testHarness.processElement(new StreamRecord<>("Hello"));
-   testHarness.processElement(new StreamRecord<>("Hello"));
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 2, 0 ,0, 0);
+
+   // this is to check the inactivity threshold
+   testHarness.setProcessingTime(101L);
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
 
-   testHarness.setProcessingTime(1L);
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
 
-   // snapshot but don't call notify to simulate a notify that 
never
-   // arrives, the sink should move pending files in restore() in 
that case
-   StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
 
-   testHarness = createTestSink(dataDir);
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
-   testHarness.restore(snapshot1);
+   

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668573#comment-15668573
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88127261
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultTolerance2ITCase.java
 ---
@@ -61,6 +62,10 @@
 * high. This provokes the case that the sink restarts without any 
checkpoint having been performed.
 * This tests the initial cleanup of pending/in-progress files.
 */
+
+// I suggest to remove this as we no longer clean up lingering files.
+
+@Ignore
 public class BucketingSinkFaultTolerance2ITCase extends 
StreamFaultToleranceTestBase {
--- End diff --

according to the javadocs we can indeed remove this test. Even without 
that, i can't quite figure out where the test actually verified that the files 
were deleted :/


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88124075
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -118,48 +150,290 @@ public static void destroyHDFS() {
}
 
@Test
-   public void testCheckpointWithoutNotify() throws Exception {
-   File dataDir = tempFolder.newFolder();
+   public void testInactivityPeriodWithLateNotify() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 2, 0 ,0, 0);
+
+   testHarness.setProcessingTime(101L);// put some in pending
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.snapshot(0, 0); // put 
them in pending for 0
+   checkFs(outDir, 0, 2, 0, 0);
 
-   OneInputStreamOperatorTestHarness testHarness = 
createTestSink(dataDir);
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   testHarness.processElement(new StreamRecord<>("test4", 1L));
 
+   testHarness.setProcessingTime(202L);// put some in pending
+
+   testHarness.snapshot(1, 0); // put 
them in pending for 1
+   checkFs(outDir, 0, 4, 0, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(0); // put the 
pending for 0 to the "committed" state
+   checkFs(outDir, 0, 2, 2, 0);
+
+   testHarness.notifyOfCompletedCheckpoint(1); // put the pending 
for 1 to the "committed" state
+   checkFs(outDir, 0, 0, 4, 0);
+   }
+
+   @Test
+   public void testBucketStateTransitions() throws Exception {
+   final File outDir = tempFolder.newFolder();
+
+   OneInputStreamOperatorTestHarness testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
testHarness.open();
 
-   testHarness.processElement(new StreamRecord<>("Hello"));
-   testHarness.processElement(new StreamRecord<>("Hello"));
-   testHarness.processElement(new StreamRecord<>("Hello"));
+   testHarness.setProcessingTime(0L);
+
+   testHarness.processElement(new StreamRecord<>("test1", 1L));
+   testHarness.processElement(new StreamRecord<>("test2", 1L));
+   checkFs(outDir, 2, 0 ,0, 0);
+
+   // this is to check the inactivity threshold
+   testHarness.setProcessingTime(101L);
+   checkFs(outDir, 0, 2, 0, 0);
+
+   testHarness.processElement(new StreamRecord<>("test3", 1L));
+   checkFs(outDir, 1, 2, 0, 0);
+
+   testHarness.snapshot(0, 0);
+   checkFs(outDir, 1, 2, 0, 0);
 
-   testHarness.setProcessingTime(1L);
+   testHarness.notifyOfCompletedCheckpoint(0);
+   checkFs(outDir, 1, 0, 2, 0);
 
-   // snapshot but don't call notify to simulate a notify that 
never
-   // arrives, the sink should move pending files in restore() in 
that case
-   StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
+   OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
 
-   testHarness = createTestSink(dataDir);
+   testHarness.close();
+   checkFs(outDir, 0, 1, 2, 0);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+   testHarness.setup();
+   testHarness.initializeState(snapshot);
+   testHarness.open();
+   checkFs(outDir, 0, 0, 3, 1);
+
+   snapshot = testHarness.snapshot(2, 0);
+
+   testHarness.processElement(new StreamRecord<>("test4", 10));
+   checkFs(outDir, 1, 0, 3, 1);
+
+   testHarness = createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
-   testHarness.restore(snapshot1);
+   testHarness.initializeState(snapshot);
testHarness.open();
 
-   testHarness.processElement(new StreamRecord<>("Hello"));
+   // the in-progress file remains as we do not clean up now
+   checkFs(outDir, 1, 0, 3, 1);
  

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88125887
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -70,7 +71,37 @@
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;
 
-   private OneInputStreamOperatorTestHarness 
createTestSink(File dataDir) throws Exception {
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   private OneInputStreamOperatorTestHarness 
createRescalingTestSink(
+   File outDir, int totalParallelism, int taskIdx, long 
inactivityInterval) throws Exception {
+
+   BucketingSink sink = new 
BucketingSink(outDir.getAbsolutePath())
+   .setBucketer(new Bucketer() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Path getBucketPath(Clock clock, Path 
basePath, String element) {
+   return new Path(basePath, element);
+   }
+   })
+   .setWriter(new StringWriter())
+   .setInactiveBucketCheckInterval(inactivityInterval)
+   .setInactiveBucketThreshold(inactivityInterval)
+   .setPartPrefix("part")
--- End diff --

this should also be moved into a field since we use it in checkFS


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88127261
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultTolerance2ITCase.java
 ---
@@ -61,6 +62,10 @@
 * high. This provokes the case that the sink restarts without any 
checkpoint having been performed.
 * This tests the initial cleanup of pending/in-progress files.
 */
+
+// I suggest to remove this as we no longer clean up lingering files.
+
+@Ignore
 public class BucketingSinkFaultTolerance2ITCase extends 
StreamFaultToleranceTestBase {
--- End diff --

according to the javadocs we can indeed remove this test. Even without 
that, i can't quite figure out where the test actually verified that the files 
were deleted :/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668572#comment-15668572
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88125887
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -70,7 +71,37 @@
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;
 
-   private OneInputStreamOperatorTestHarness 
createTestSink(File dataDir) throws Exception {
+   private static final String PENDING_SUFFIX = ".pending";
+   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+   private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+   private OneInputStreamOperatorTestHarness 
createRescalingTestSink(
+   File outDir, int totalParallelism, int taskIdx, long 
inactivityInterval) throws Exception {
+
+   BucketingSink sink = new 
BucketingSink(outDir.getAbsolutePath())
+   .setBucketer(new Bucketer() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Path getBucketPath(Clock clock, Path 
basePath, String element) {
+   return new Path(basePath, element);
+   }
+   })
+   .setWriter(new StringWriter())
+   .setInactiveBucketCheckInterval(inactivityInterval)
+   .setInactiveBucketThreshold(inactivityInterval)
+   .setPartPrefix("part")
--- End diff --

this should also be moved into a field since we use it in checkFS


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668498#comment-15668498
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88122097
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -71,6 +71,10 @@
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;
 
+   private static final String pendingSuffix = ".pending";
--- End diff --

by convention, the variable name should be upper-case, like 
`PENDING_SUFFIX`.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88122097
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 ---
@@ -71,6 +71,10 @@
private static org.apache.hadoop.fs.FileSystem dfs;
private static String hdfsURI;
 
+   private static final String pendingSuffix = ".pending";
--- End diff --

by convention, the variable name should be upper-case, like 
`PENDING_SUFFIX`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668495#comment-15668495
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2797
  
Sorry I just amended my previous commit with the comment integration. :S


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2797
  
Sorry I just amended my previous commit with the comment integration. :S


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668477#comment-15668477
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88120352
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -161,9 +161,7 @@
// These are initialized with some defaults but are meant to be 
changeable by the user
 
/**
-* The default maximum size of part files.
-* 
-* By default, {@code 6 X} the default block size.
+* The default maximum size of part files (currently {@code 384 MB}).
--- End diff --

do we actually know what this value is based on ? :D


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88120352
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -161,9 +161,7 @@
// These are initialized with some defaults but are meant to be 
changeable by the user
 
/**
-* The default maximum size of part files.
-* 
-* By default, {@code 6 X} the default block size.
+* The default maximum size of part files (currently {@code 384 MB}).
--- End diff --

do we actually know what this value is based on ? :D


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668461#comment-15668461
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
That makes sense, thanks for the explanation.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
That makes sense, thanks for the explanation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668371#comment-15668371
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2797
  
The subtask matters for files that were opened but they did not make it 
into a checkpoint because 
a failure occurred after they were opened but before a checkpoint barrier 
arrived. These are the ones
that are not cleaned up anymore. The rest, that are part of the checkpoint, 
they are "cleaned up” or 
rolled back if needed.
 
Probably you may understand more how the cleaning up was happening if you 
checkout the code 
in the master for the cleanup. 

> On Nov 15, 2016, at 10:12 PM, zentol  wrote:
> 
> @zentol commented on this pull request.
> 
> In 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 :
> 
> > }
>  
> - @Override
> - public void restoreState(State state) {
> - this.state = state;
> -
> - try {
> - initFileSystem();
> - } catch (IOException e) {
> - LOG.error("Error while creating FileSystem in 
checkpoint restore.", e);
> - throw new RuntimeException("Error while creating 
FileSystem in checkpoint restore.", e);
> - }
> + private void handleRestoredBucketState(BucketState bucketState) {
> + // we can clean all the pending files since they were renamed to
> I don't understand why the subtask index even matters. In a simplified 
view, every subtask maintains a set of Files. When rescaling/restoring state, 
this set of files is given to another subtask. If this set contains a file 
marked as pending, it should be allowed to delete it, since no other task 
should have the same file since the state containing the information about the 
file (aka, the file path) is not given to multiple subtasks.
> 
> —
> You are receiving this because you authored the thread.
> Reply to this email directly, view it on GitHub 
, or mute the thread 
.
> 




> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2797
  
The subtask matters for files that were opened but they did not make it 
into a checkpoint because 
a failure occurred after they were opened but before a checkpoint barrier 
arrived. These are the ones
that are not cleaned up anymore. The rest, that are part of the checkpoint, 
they are "cleaned up” or 
rolled back if needed.
 
Probably you may understand more how the cleaning up was happening if you 
checkout the code 
in the master for the cleanup. 

> On Nov 15, 2016, at 10:12 PM, zentol  wrote:
> 
> @zentol commented on this pull request.
> 
> In 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 :
> 
> > }
>  
> - @Override
> - public void restoreState(State state) {
> - this.state = state;
> -
> - try {
> - initFileSystem();
> - } catch (IOException e) {
> - LOG.error("Error while creating FileSystem in 
checkpoint restore.", e);
> - throw new RuntimeException("Error while creating 
FileSystem in checkpoint restore.", e);
> - }
> + private void handleRestoredBucketState(BucketState bucketState) {
> + // we can clean all the pending files since they were renamed to
> I don't understand why the subtask index even matters. In a simplified 
view, every subtask maintains a set of Files. When rescaling/restoring state, 
this set of files is given to another subtask. If this set contains a file 
marked as pending, it should be allowed to delete it, since no other task 
should have the same file since the state containing the information about the 
file (aka, the file path) is not given to multiple subtasks.
> 
> —
> You are receiving this because you authored the thread.
> Reply to this email directly, view it on GitHub 
, or mute the thread 
.
> 




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-15 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668362#comment-15668362
 ] 

sunjincheng commented on FLINK-4937:


I really want to separate incremental and non-incremental aggregates into 
separate private methods, but when we want to keep the input parameters 
concise, both methods will have more repetitive code, and when we eliminate the 
repetition of the code, the input Parameters more, although the current code a 
little more than a little, but the logic is clear, so I prefer to maintain the 
status quo..What do you think [~twalthr] [~fhueske]]

> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-15 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668332#comment-15668332
 ] 

sunjincheng commented on FLINK-4937:


done

> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88113221
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -341,23 +355,49 @@ public void setInputType(TypeInformation type, 
ExecutionConfig executionConfi
}
 
@Override
-   public void open(Configuration parameters) throws Exception {
-   super.open(parameters);
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-   subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+   initFileSystem();
 
-   state = new State();
--- End diff --

undoing it was a bit of an overstatement; moving it into a separate commit 
with all the other formatting changes would do the trick as well. If you try 
that out you will see that the changes to open and initializeState no longer 
intersect so heavily.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-15 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668331#comment-15668331
 ] 

sunjincheng commented on FLINK-4937:


done

> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-15 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668327#comment-15668327
 ] 

sunjincheng commented on FLINK-4937:


done

> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668324#comment-15668324
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88113221
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -341,23 +355,49 @@ public void setInputType(TypeInformation type, 
ExecutionConfig executionConfi
}
 
@Override
-   public void open(Configuration parameters) throws Exception {
-   super.open(parameters);
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-   subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+   initFileSystem();
 
-   state = new State();
--- End diff --

undoing it was a bit of an overstatement; moving it into a separate commit 
with all the other formatting changes would do the trick as well. If you try 
that out you will see that the changes to open and initializeState no longer 
intersect so heavily.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668314#comment-15668314
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
No, it's not just that one method. The method is on a whole other level; it 
is so easy to edit a commit there simply is no reason for such a thing to exist 
in the *initial PR*.

So, let's go through a few examples of purely cosmetic changes that could 
easily have been moved into separate commit. These changes have absolutely no 
bearing on the core of the PR, which is the port to the new state interface / 
rescaling.
* Removing `State#hasBucketState`
* Addition, or rather full propagation of getPendingPathFor/etc. in the 
entire class
* all the random small formatting changes; removing iniitializations to 
null, generic parameters stuff, removing commas, adding missing spaces
* plenty small comment fixes

Note that these changes are all *good*, it was just be nice to separate 
these *cosmetic* changes from the *functional* ones.

Now, let's talk about changes that are plain unnecessary and just noise:
* the modifications to reflectTruncate
* the loop change in snapshotState
* no longer storing the subtaskIndex in a field
* no longer looping over all buckets in restoreState (now 
handleRestoredState)

With a properly cleaned diff i was able to determine very quickly what has 
actually changed; and this was simply not possible in the current state of the 
PR.

That is was I'm referring to.



> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
No, it's not just that one method. The method is on a whole other level; it 
is so easy to edit a commit there simply is no reason for such a thing to exist 
in the *initial PR*.

So, let's go through a few examples of purely cosmetic changes that could 
easily have been moved into separate commit. These changes have absolutely no 
bearing on the core of the PR, which is the port to the new state interface / 
rescaling.
* Removing `State#hasBucketState`
* Addition, or rather full propagation of getPendingPathFor/etc. in the 
entire class
* all the random small formatting changes; removing iniitializations to 
null, generic parameters stuff, removing commas, adding missing spaces
* plenty small comment fixes

Note that these changes are all *good*, it was just be nice to separate 
these *cosmetic* changes from the *functional* ones.

Now, let's talk about changes that are plain unnecessary and just noise:
* the modifications to reflectTruncate
* the loop change in snapshotState
* no longer storing the subtaskIndex in a field
* no longer looping over all buckets in restoreState (now 
handleRestoredState)

With a properly cleaned diff i was able to determine very quickly what has 
actually changed; and this was simply not possible in the current state of the 
PR.

That is was I'm referring to.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668281#comment-15668281
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88110293
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
 
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
+
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88110293
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
 
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
+
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   m.invoke(fs, testPath, 2);
-   } catch (IllegalAccessException | 
InvocationTargetException e) {
-   LOG.debug("Truncate is not supported.", e);
-   m = null;
-  

[jira] [Commented] (FLINK-4913) Per-job Yarn clusters: include user jar in system class loader

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668275#comment-15668275
 ] 

ASF GitHub Bot commented on FLINK-4913:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2795
  
Thanks. +1 for reverting then!


> Per-job Yarn clusters: include user jar in system class loader 
> ---
>
> Key: FLINK-4913
> URL: https://issues.apache.org/jira/browse/FLINK-4913
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> Including the jar directly in the system classloader avoids loading it for 
> every instantiation of the ExecutionGraph and every Task execution. Note, 
> this is only possible for per-job clusters (i.e. Yarn/Mesos).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2795: Revert "[FLINK-4913][yarn] include user jars in system cl...

2016-11-15 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2795
  
Thanks. +1 for reverting then!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668195#comment-15668195
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88105517
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -341,23 +355,49 @@ public void setInputType(TypeInformation type, 
ExecutionConfig executionConfi
}
 
@Override
-   public void open(Configuration parameters) throws Exception {
-   super.open(parameters);
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-   subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+   initFileSystem();
 
-   state = new State();
--- End diff --

I can undo the L355 although I personally think it is not nice to split 
assignments without a reason. Now for this one, I will leave as the type is 
inferred and also like this it gives warnings. 


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88105517
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -341,23 +355,49 @@ public void setInputType(TypeInformation type, 
ExecutionConfig executionConfi
}
 
@Override
-   public void open(Configuration parameters) throws Exception {
-   super.open(parameters);
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
+   Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-   subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+   initFileSystem();
 
-   state = new State();
--- End diff --

I can undo the L355 although I personally think it is not nice to split 
assignments without a reason. Now for this one, I will leave as the type is 
inferred and also like this it gives warnings. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4900) Implement Docker image support

2016-11-15 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mischa Krüger updated FLINK-4900:
-
Labels: release  (was: release review)

> Implement Docker image support
> --
>
> Key: FLINK-4900
> URL: https://issues.apache.org/jira/browse/FLINK-4900
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Mischa Krüger
>  Labels: release
>
> Support the use of a docker image, with both the unified containerizer and 
> the Docker containerizer.
> Use a configuration setting to explicitly configure which image and 
> containerizer to use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15668128#comment-15668128
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88101910
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -150,53 +162,59 @@
 
/**
 * The default maximum size of part files.
-*
-* 6 times the default block size
+* 
+* By default, {@code 6 X} the default block size.
--- End diff --

i think we should either not mention "block size" or use a more common term 
for whatever it stands for.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4900) Implement Docker image support

2016-11-15 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mischa Krüger updated FLINK-4900:
-
Labels: release review  (was: release)

> Implement Docker image support
> --
>
> Key: FLINK-4900
> URL: https://issues.apache.org/jira/browse/FLINK-4900
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Mischa Krüger
>  Labels: release, review
>
> Support the use of a docker image, with both the unified containerizer and 
> the Docker containerizer.
> Use a configuration setting to explicitly configure which image and 
> containerizer to use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88101910
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -150,53 +162,59 @@
 
/**
 * The default maximum size of part files.
-*
-* 6 times the default block size
+* 
+* By default, {@code 6 X} the default block size.
--- End diff --

i think we should either not mention "block size" or use a more common term 
for whatever it stands for.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5073) ZooKeeperCompleteCheckpointStore executes blocking delete operation in ZooKeeper client thread

2016-11-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5073:


 Summary: ZooKeeperCompleteCheckpointStore executes blocking delete 
operation in ZooKeeper client thread
 Key: FLINK-5073
 URL: https://issues.apache.org/jira/browse/FLINK-5073
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.1.3, 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.2.0, 1.1.4


When deleting completed checkpoints from the 
{{ZooKeeperCompletedCheckpointStore}}, one first tries to delete the meta state 
handle from ZooKeeper and then deletes the actual checkpoint in a callback from 
the delete operation. This callback is executed by the ZooKeeper client's main 
thread which is problematic, because it blocks the ZooKeeper client. If a 
delete operation takes longer than it takes to complete a checkpoint, then it 
might even happen that delete operations of outdated checkpoints are piling up 
because they are effectively executed sequentially.

I propose to execute the delete operations by a dedicated {{Executor}} so that 
we keep the client's main thread free to do ZooKeeper related work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4900) Implement Docker image support

2016-11-15 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mischa Krüger updated FLINK-4900:
-
Labels: release  (was: )

> Implement Docker image support
> --
>
> Key: FLINK-4900
> URL: https://issues.apache.org/jira/browse/FLINK-4900
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Mischa Krüger
>  Labels: release
>
> Support the use of a docker image, with both the unified containerizer and 
> the Docker containerizer.
> Use a configuration setting to explicitly configure which image and 
> containerizer to use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4900) Implement Docker image support

2016-11-15 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mischa Krüger reassigned FLINK-4900:


Assignee: Mischa Krüger  (was: Eron Wright )

> Implement Docker image support
> --
>
> Key: FLINK-4900
> URL: https://issues.apache.org/jira/browse/FLINK-4900
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Mischa Krüger
>
> Support the use of a docker image, with both the unified containerizer and 
> the Docker containerizer.
> Use a configuration setting to explicitly configure which image and 
> containerizer to use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5008) Update quickstart documentation

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667701#comment-15667701
 ] 

ASF GitHub Bot commented on FLINK-5008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/2764
  
I tried with several versions of Eclipse and Scala IDE, even with the one 
claimed to work. Unfortunately, I got none of them to work.


> Update quickstart documentation
> ---
>
> Key: FLINK-5008
> URL: https://issues.apache.org/jira/browse/FLINK-5008
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> * The IDE setup documentation of Flink is outdated in both parts: IntelliJ 
> IDEA was based on an old version and Eclipse/Scala IDE does not work at all 
> anymore.
> * The example in the "Quickstart: Setup" is outdated and requires "." to be 
> in the path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2764: [FLINK-5008] Update quickstart documentation

2016-11-15 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/2764
  
I tried with several versions of Eclipse and Scala IDE, even with the one 
claimed to work. Unfortunately, I got none of them to work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667697#comment-15667697
 ] 

Timo Walther commented on FLINK-4604:
-

I looked into the problem. If you take a look at 
{{AggregateReduceFunctionsRule}} (e.g. line 354), you see that the null literal 
is created with no type/the {{NULL}} type. We currently do not support this 
type. Either we replace this and similar lines with 
{{rexBuilder.makeNullLiteral(sumZeroRef.getType().getSqlTypeName())}} to give 
it a type or we create a new type but I don't know how we want to represent it 
so far.

> Add support for standard deviation/variance
> ---
>
> Key: FLINK-4604
> URL: https://issues.apache.org/jira/browse/FLINK-4604
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
> Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, 
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test 
> and document this rule. 
> If we also want to add this aggregates to Table API is up for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2797
  
Thanks for the first review @zentol . 

On the comment about the diff reduction, you are referring to the removal 
of the method in the intermediate commits that was not used? If yes, then this 
is the reason it was removed in the next commit. Either way, the commit about 
the documentation should be squashed before merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2814: Corrected log4j files.

2016-11-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2814
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667681#comment-15667681
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2797
  
Thanks for the first review @zentol . 

On the comment about the diff reduction, you are referring to the removal 
of the method in the intermediate commits that was not used? If yes, then this 
is the reason it was removed in the next commit. Either way, the commit about 
the documentation should be squashed before merging.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667672#comment-15667672
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88063735
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
--- End diff --

what will this save?


> BucketingSink deletes valid data when checkpoint notification is slow.
> 

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88063735
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
--- End diff --

what will this save?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan Excuse my ignorance, I'm only now learning about Flink internals 
:)
It seems like the issue here was that `partitionByRange` partitions keys in 
ascending order but we want the end result in descending order.

@tfournier314 I think the following should work, here I use a key extractor 
to negate the value of the key to achieve the desired effect:

```Scala
itData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => -x._2) // Take the negative count as the key
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667671#comment-15667671
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan Excuse my ignorance, I'm only now learning about Flink internals 
:)
It seems like the issue here was that `partitionByRange` partitions keys in 
ascending order but we want the end result in descending order.

@tfournier314 I think the following should work, here I use a key extractor 
to negate the value of the key to achieve the desired effect:

```Scala
itData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => -x._2) // Take the negative count as the key
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
```


> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667664#comment-15667664
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88063330
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -150,53 +162,59 @@
 
/**
 * The default maximum size of part files.
-*
-* 6 times the default block size
+* 
+* By default, {@code 6 X} the default block size.
--- End diff --

This existed from before. Just reformatted it. I can remove it if it is 
clearer.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88063330
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -150,53 +162,59 @@
 
/**
 * The default maximum size of part files.
-*
-* 6 times the default block size
+* 
+* By default, {@code 6 X} the default block size.
--- End diff --

This existed from before. Just reformatted it. I can remove it if it is 
clearer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2814: Corrected log4j files.

2016-11-15 Thread bitchelov
GitHub user bitchelov opened a pull request:

https://github.com/apache/flink/pull/2814

Corrected log4j files.

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bitchelov/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2814.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2814


commit 11763155b8ca8b64e1c6db342c887876f68d373e
Author: sergey_sokur 
Date:   2016-11-15T16:59:21Z

Corrected log4j files.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2802: Minor fixes

2016-11-15 Thread bitchelov
Github user bitchelov closed the pull request at:

https://github.com/apache/flink/pull/2802


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2800: Update README.md

2016-11-15 Thread bitchelov
Github user bitchelov closed the pull request at:

https://github.com/apache/flink/pull/2800


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-11-15 Thread Nikolay Vasilishin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667527#comment-15667527
 ] 

Nikolay Vasilishin commented on FLINK-4565:
---

Thanks, guys.
I've implemented first two cases: disjunctive equality predicates for less then 
20 entries and reusable hashset for >= 20 predicates.

Can you give any advice, where I can find anything in code related to 3 
subissue? Where is the point in which I can access the subquery results and use 
them for IN operator?

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Nikolay Vasilishin
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667497#comment-15667497
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
I also haven't checked the tests yet.


> BucketingSink deletes valid data when checkpoint notification is slow.
> --
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.1.3
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2797
  
I also haven't checked the tests yet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88044100
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   m.invoke(fs, testPath, 2);
-   } catch (IllegalAccessException | 
InvocationTargetException e) {
-   LOG.debug("Truncate is not supported.", e);
-   m = null;
-

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667493#comment-15667493
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88044100
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667489#comment-15667489
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88044327
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
--- End diff --

these helper methods can be static.


> BucketingSink deletes valid data when checkpoint notification is slow.
> 

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667490#comment-15667490
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88044225
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88044225
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   m.invoke(fs, testPath, 2);
-   } catch (IllegalAccessException | 
InvocationTargetException e) {
-   LOG.debug("Truncate is not supported.", e);
-   m = null;
-

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667492#comment-15667492
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88043878
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   

[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667491#comment-15667491
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88032585
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88043878
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   m.invoke(fs, testPath, 2);
-   } catch (IllegalAccessException | 
InvocationTargetException e) {
-   LOG.debug("Truncate is not supported.", e);
-   m = null;
-

[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88044327
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
--- End diff --

these helper methods can be static.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2797: [FLINK-5056] Makes the BucketingSink rescalable.

2016-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88032585
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
+
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   m.invoke(fs, testPath, 2);
-   } catch (IllegalAccessException | 
InvocationTargetException e) {
-   LOG.debug("Truncate is not supported.", e);
-   m = null;
-

[jira] [Closed] (FLINK-4369) EvictingWindowOperator Must Actually Evict Elements

2016-11-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-4369.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

> EvictingWindowOperator Must Actually Evict Elements
> ---
>
> Key: FLINK-4369
> URL: https://issues.apache.org/jira/browse/FLINK-4369
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: vishnu viswanath
>Priority: Blocker
> Fix For: 1.2.0
>
>
> {{EvictingWindowOperator}} does not actually remove evicted elements from the 
> state. They are only filtered from the Iterable that is given to the 
> WindowFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-4369) EvictingWindowOperator Must Actually Evict Elements

2016-11-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-4369:
-

Reopen to set fix version.

> EvictingWindowOperator Must Actually Evict Elements
> ---
>
> Key: FLINK-4369
> URL: https://issues.apache.org/jira/browse/FLINK-4369
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: vishnu viswanath
>Priority: Blocker
> Fix For: 1.2.0
>
>
> {{EvictingWindowOperator}} does not actually remove evicted elements from the 
> state. They are only filtered from the Iterable that is given to the 
> WindowFunction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667457#comment-15667457
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2736
  
I reopened and then closed with the correct fix version. I think no-one can 
change closed issues.


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
> Fix For: 1.2.0
>
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2736
  
I reopened and then closed with the correct fix version. I think no-one can 
change closed issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-4174.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
> Fix For: 1.2.0
>
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-4174:
-

Reopen to set fix version.


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
> Fix For: 1.2.0
>
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5072) MetricFetcher Ask Timeout

2016-11-15 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667421#comment-15667421
 ] 

Chesnay Schepler commented on FLINK-5072:
-

hmm. Well it isn't something critical if the MetricFetcher request times out; 
it will simply not update the metrics in the web interface and will try again 
10 seconds later if required.

However, the MetricQueryService is a separate actor that, if a job is fully 
running, only receives a request from the fetcher. I would think that it should 
be able to serve that request within the 10 second timeout. But frankly, i 
don't know a lot about the network conditions under heavy load.

> MetricFetcher Ask Timeout
> -
>
> Key: FLINK-5072
> URL: https://issues.apache.org/jira/browse/FLINK-5072
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>
> Running a large scale test with 1.2-SNAPSHOT and heavy load on the TMs, I 
> encountered a lot of ask timeouts for the metric fetcher:
> {code}
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka.tcp://flink@10.240.0.52:34471/user/MetricQueryService_container_1479207428252_0014_01_26]]
>  after [1 ms]
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> [~zentol] Does it make sense to investigate this further?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2740
  
`zipWithIndex` preserves the order between partitions 
(DataSetUtils.java:121). @tfournier314, I don't think it's a problem pushing 
your current code since we're still discussing the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667423#comment-15667423
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2740
  
`zipWithIndex` preserves the order between partitions 
(DataSetUtils.java:121). @tfournier314, I don't think it's a problem pushing 
your current code since we're still discussing the PR.


> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality

2016-11-15 Thread soniclavier
Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
I don't think I can edit a closed issue, could you please make the edit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667391#comment-15667391
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user soniclavier commented on the issue:

https://github.com/apache/flink/pull/2736
  
I don't think I can edit a closed issue, could you please make the edit.


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5072) MetricFetcher Ask Timeout

2016-11-15 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5072:
--

 Summary: MetricFetcher Ask Timeout
 Key: FLINK-5072
 URL: https://issues.apache.org/jira/browse/FLINK-5072
 Project: Flink
  Issue Type: Improvement
Reporter: Ufuk Celebi


Running a large scale test with 1.2-SNAPSHOT and heavy load on the TMs, I 
encountered a lot of ask timeouts for the metric fetcher:

{code}
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@10.240.0.52:34471/user/MetricQueryService_container_1479207428252_0014_01_26]]
 after [1 ms]
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at 
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
{code}

[~zentol] Does it make sense to investigate this further?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5056) BucketingSink deletes valid data when checkpoint notification is slow.

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667325#comment-15667325
 ] 

ASF GitHub Bot commented on FLINK-5056:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2797#discussion_r88028071
  
--- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState 
bucketState) throws Exception {
 
/**
 * Gets the truncate() call using reflection.
-*
 * 
-* Note: This code comes from Flume
+* NOTE: This code comes from Flume.
 */
private Method reflectTruncate(FileSystem fs) {
-   Method m = null;
-   if(fs != null) {
-   Class fsClass = fs.getClass();
-   try {
-   m = fsClass.getMethod("truncate", Path.class, 
long.class);
-   } catch (NoSuchMethodException ex) {
-   LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
-   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-   return null;
-   }
+   if (this.refTruncate == null) {
+   Method m = null;
+   if (fs != null) {
+   Class fsClass = fs.getClass();
+   try {
+   m = fsClass.getMethod("truncate", 
Path.class, long.class);
+   } catch (NoSuchMethodException ex) {
+   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
+   " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
+   validLengthSuffix, 
validLengthPrefix);
+   return null;
+   }
 
+   // verify that truncate actually works
+   FSDataOutputStream outputStream;
+   Path testPath = new 
Path(UUID.randomUUID().toString());
+   try {
+   outputStream = fs.create(testPath);
+   outputStream.writeUTF("hello");
+   outputStream.close();
+   } catch (IOException e) {
+   LOG.error("Could not create file for 
checking if truncate works.", e);
+   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
+   }
 
-   // verify that truncate actually works
-   FSDataOutputStream outputStream;
-   Path testPath = new Path(UUID.randomUUID().toString());
-   try {
-   outputStream = fs.create(testPath);
-   outputStream.writeUTF("hello");
-   outputStream.close();
-   } catch (IOException e) {
-   LOG.error("Could not create file for checking 
if truncate works.", e);
-   throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+   try {
+   m.invoke(fs, testPath, 2);
+   } catch (IllegalAccessException | 
InvocationTargetException e) {
+   LOG.debug("Truncate is not supported.", 
e);
+   m = null;
+   }
+
+   try {
+   fs.delete(testPath, false);
+   } catch (IOException e) {
+   LOG.error("Could not delete truncate 
test file.", e);
+   throw new RuntimeException("Could not 
delete truncate test file.", e);
+   }
}
+   this.refTruncate = m;
+   }
+   return this.refTruncate;
+   }
 
+   private Path getPendingPathFor(Path path) {
+   return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+   }
 
-   try {
-   

[jira] [Commented] (FLINK-4174) Enhance Window Evictor

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667353#comment-15667353
 ] 

ASF GitHub Bot commented on FLINK-4174:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2736
  
Sure, you're very welcome! `1.2` should be the fix version since that's 
going to be the first version that has this new code.


> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the 
> beginning). To do this Evictor must go through the list of elements and 
> remove the elements that have to be evicted instead of the current approach 
> of : returning the count of elements to be removed from beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement : 
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   3   >