[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-26 Thread Brice Bingman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063245#comment-16063245
 ] 

Brice Bingman commented on FLINK-6990:
--

[~jark] Good to hear.  In the meantime I will look into using a ProcessFunction.

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //Streaming 10,000 events per second
> see.addSource(new SourceFunction() {
> transient ScheduledExecutorService executor;
> @Override
> public synchronized void run(final SourceContext ctx) 
> throws Exception {
> executor = Executors.newSingleThreadScheduledExecutor();
> executor.scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
> for (int k = 0; k < 10; k++) {
> for (int i = 0; i < 1000; i++) {
> TestObject obj = new TestObject();
> obj.setKey(k);
> ctx.collect(obj);
> }
> }
> }
> }, 0, 1, TimeUnit.SECONDS);
> this.wait();
> }
> @Override
> public synchronized void cancel() {
> executor.shutdown();
> this.notify();
> }
> }).keyBy("key")
> .window(SlidingProcessingTimeWindows.of(Time.minutes(10), 
> Time.seconds(1))).apply(new WindowFunction TimeWindow>() {
> @Override
> public void apply(Tuple key, TimeWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> .print();
> see.execute();
> }
> public static class TestObject {
> private Integer key;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> }
> }
> {code}
> When running this, flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1 second intervals.  For 
> comparison, you can switch to a count window of similar size which peforms 
> just fine:
> {code:java}
>.countWindow(60, 1000).apply(new 
> WindowFunction() {
> @Override
> public void apply(Tuple key, GlobalWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window.  The sliding time window also uses significantly more cpu and memory 
> than the count window.  I would also expect resource consumption to be 
> similar.
> A possible cause could be that the SystemProcessingTimeService.TriggerTask is 
> locking with the checkpointLock which acts like a global lock.  There should 
> be a lock per key or preferably a lock-less solution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-25 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062338#comment-16062338
 ] 

Jark Wu commented on FLINK-6990:


[~BriceBingman] [~fhueske] the poor performance of sliding time window is 
obvious in our production. We have planed to optimize it with window pane 
optimization. I have created a issue FLINK-7001 about it and will attach a more 
detail design doc on it. 

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //Streaming 10,000 events per second
> see.addSource(new SourceFunction() {
> transient ScheduledExecutorService executor;
> @Override
> public synchronized void run(final SourceContext ctx) 
> throws Exception {
> executor = Executors.newSingleThreadScheduledExecutor();
> executor.scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
> for (int k = 0; k < 10; k++) {
> for (int i = 0; i < 1000; i++) {
> TestObject obj = new TestObject();
> obj.setKey(k);
> ctx.collect(obj);
> }
> }
> }
> }, 0, 1, TimeUnit.SECONDS);
> this.wait();
> }
> @Override
> public synchronized void cancel() {
> executor.shutdown();
> this.notify();
> }
> }).keyBy("key")
> .window(SlidingProcessingTimeWindows.of(Time.minutes(10), 
> Time.seconds(1))).apply(new WindowFunction TimeWindow>() {
> @Override
> public void apply(Tuple key, TimeWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> .print();
> see.execute();
> }
> public static class TestObject {
> private Integer key;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> }
> }
> {code}
> When running this, flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1 second intervals.  For 
> comparison, you can switch to a count window of similar size which peforms 
> just fine:
> {code:java}
>.countWindow(60, 1000).apply(new 
> WindowFunction() {
> @Override
> public void apply(Tuple key, GlobalWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window.  The sliding time window also uses significantly more cpu and memory 
> than the count window.  I would also expect resource consumption to be 
> similar.
> A possible cause could be that the SystemProcessingTimeService.TriggerTask is 
> locking with the checkpointLock which acts like a global lock.  There should 
> be a lock per key or preferably a lock-less solution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060903#comment-16060903
 ] 

Fabian Hueske commented on FLINK-6990:
--

No, there is no option to disable the replication. 

You could implement the operator yourself based on a ProcessFunction. 
ProcessFunction gives you access to state to manage the buffered records 
yourself and time (record timestamps and timers).

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //Streaming 10,000 events per second
> see.addSource(new SourceFunction() {
> transient ScheduledExecutorService executor;
> @Override
> public synchronized void run(final SourceContext ctx) 
> throws Exception {
> executor = Executors.newSingleThreadScheduledExecutor();
> executor.scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
> for (int k = 0; k < 10; k++) {
> for (int i = 0; i < 1000; i++) {
> TestObject obj = new TestObject();
> obj.setKey(k);
> ctx.collect(obj);
> }
> }
> }
> }, 0, 1, TimeUnit.SECONDS);
> this.wait();
> }
> @Override
> public synchronized void cancel() {
> executor.shutdown();
> this.notify();
> }
> }).keyBy("key")
> .window(SlidingProcessingTimeWindows.of(Time.minutes(10), 
> Time.seconds(1))).apply(new WindowFunction TimeWindow>() {
> @Override
> public void apply(Tuple key, TimeWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> .print();
> see.execute();
> }
> public static class TestObject {
> private Integer key;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> }
> }
> {code}
> When running this, flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1 second intervals.  For 
> comparison, you can switch to a count window of similar size which peforms 
> just fine:
> {code:java}
>.countWindow(60, 1000).apply(new 
> WindowFunction() {
> @Override
> public void apply(Tuple key, GlobalWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window.  The sliding time window also uses significantly more cpu and memory 
> than the count window.  I would also expect resource consumption to be 
> similar.
> A possible cause could be that the SystemProcessingTimeService.TriggerTask is 
> locking with the checkpointLock which acts like a global lock.  There should 
> be a lock per key or preferably a lock-less solution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-23 Thread Brice Bingman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060881#comment-16060881
 ] 

Brice Bingman commented on FLINK-6990:
--

[~Cody] I chose the smallest scenario that shows the slowdown on my machine.  
If you are on a machine with better resources, you may need to increase the 
event input rate or increase the window size to see the slowdown.  I noticed if 
I reduced the window size to 1 minute, the problem goes away.

[~fhueske] 
1) So on each slide of a time window, the data in that window is replicated and 
put into the new window?  That does seem like a lot of overhead.  Why is 
everything replicated?  Is there any way to disable that?
2) While this example could be implemented in a ReduceFunction, I also need to 
support more complicated calculations such as a linear regression or an 
exponential moving average where you would need access to all the data in the 
window.  Which is why I've been stress testing with the WindowFunction.

Perhaps there should be a more performant time window that doesn't replicate 
data on each slide.  Similar to how the count window is implemented.

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //Streaming 10,000 events per second
> see.addSource(new SourceFunction() {
> transient ScheduledExecutorService executor;
> @Override
> public synchronized void run(final SourceContext ctx) 
> throws Exception {
> executor = Executors.newSingleThreadScheduledExecutor();
> executor.scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
> for (int k = 0; k < 10; k++) {
> for (int i = 0; i < 1000; i++) {
> TestObject obj = new TestObject();
> obj.setKey(k);
> ctx.collect(obj);
> }
> }
> }
> }, 0, 1, TimeUnit.SECONDS);
> this.wait();
> }
> @Override
> public synchronized void cancel() {
> executor.shutdown();
> this.notify();
> }
> }).keyBy("key")
> .window(SlidingProcessingTimeWindows.of(Time.minutes(10), 
> Time.seconds(1))).apply(new WindowFunction TimeWindow>() {
> @Override
> public void apply(Tuple key, TimeWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> .print();
> see.execute();
> }
> public static class TestObject {
> private Integer key;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> }
> }
> {code}
> When running this, flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1 second intervals.  For 
> comparison, you can switch to a count window of similar size which peforms 
> just fine:
> {code:java}
>.countWindow(60, 1000).apply(new 
> WindowFunction() {
> @Override
> public void apply(Tuple key, GlobalWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window.  The sliding time window also uses significantly more cpu and memory 
> than the count window.  I would also expect resource 

[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060530#comment-16060530
 ] 

Fabian Hueske commented on FLINK-6990:
--

The poor performance has two reasons:

1) The implementation of time-based sliding windows in Flink. Flink treats each 
window individually and replicates records to each window. For a window of 10 
minute size that slides by 1 second the data is replicated 600 fold (10 minutes 
/ 1 second).
2) Your choice of using a WindowFunction instead of a ReduceFunction or 
AggregateFunction. A WindowFundtion requires to collect all elements and 
applies the function at the end of the window. If you implement this with a 
ReduceFunction (or AggregateFunction) the aggregation can be incrementally 
applied whenever a new record is assigned to a window. Consequently, the window 
only holds a single aggregated record instead of a list of all records.

Count-based sliding windows are differently implemented and avoid the 
replication of records. However, they cannot leverage the eager aggregation of 
a ReduceFunction and apply the function at the end of a window similar to a 
WindowFunction.

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //Streaming 10,000 events per second
> see.addSource(new SourceFunction() {
> transient ScheduledExecutorService executor;
> @Override
> public synchronized void run(final SourceContext ctx) 
> throws Exception {
> executor = Executors.newSingleThreadScheduledExecutor();
> executor.scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
> for (int k = 0; k < 10; k++) {
> for (int i = 0; i < 1000; i++) {
> TestObject obj = new TestObject();
> obj.setKey(k);
> ctx.collect(obj);
> }
> }
> }
> }, 0, 1, TimeUnit.SECONDS);
> this.wait();
> }
> @Override
> public synchronized void cancel() {
> executor.shutdown();
> this.notify();
> }
> }).keyBy("key")
> .window(SlidingProcessingTimeWindows.of(Time.minutes(10), 
> Time.seconds(1))).apply(new WindowFunction TimeWindow>() {
> @Override
> public void apply(Tuple key, TimeWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> .print();
> see.execute();
> }
> public static class TestObject {
> private Integer key;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> }
> }
> {code}
> When running this, flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1 second intervals.  For 
> comparison, you can switch to a count window of similar size which peforms 
> just fine:
> {code:java}
>.countWindow(60, 1000).apply(new 
> WindowFunction() {
> @Override
> public void apply(Tuple key, GlobalWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window.  The sliding time window also uses significantly more cpu and memory 
> than the count window.  I would also expect resource consumption to be 
> similar.
> A 

[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-23 Thread Cody (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060477#comment-16060477
 ] 

Cody commented on FLINK-6990:
-

I ran the code on my Mac and didn't see the pause.

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment see = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> //Streaming 10,000 events per second
> see.addSource(new SourceFunction() {
> transient ScheduledExecutorService executor;
> @Override
> public synchronized void run(final SourceContext ctx) 
> throws Exception {
> executor = Executors.newSingleThreadScheduledExecutor();
> executor.scheduleAtFixedRate(new Runnable() {
> @Override
> public void run() {
> for (int k = 0; k < 10; k++) {
> for (int i = 0; i < 1000; i++) {
> TestObject obj = new TestObject();
> obj.setKey(k);
> ctx.collect(obj);
> }
> }
> }
> }, 0, 1, TimeUnit.SECONDS);
> this.wait();
> }
> @Override
> public synchronized void cancel() {
> executor.shutdown();
> this.notify();
> }
> }).keyBy("key")
> .window(SlidingProcessingTimeWindows.of(Time.minutes(10), 
> Time.seconds(1))).apply(new WindowFunction TimeWindow>() {
> @Override
> public void apply(Tuple key, TimeWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> .print();
> see.execute();
> }
> public static class TestObject {
> private Integer key;
> public Integer getKey() {
> return key;
> }
> public void setKey(Integer key) {
> this.key = key;
> }
> }
> }
> {code}
> When running this, flink periodically pauses for long periods of time.  I 
> would expect a steady stream of output at 1 second intervals.  For 
> comparison, you can switch to a count window of similar size which peforms 
> just fine:
> {code:java}
>.countWindow(60, 1000).apply(new 
> WindowFunction() {
> @Override
> public void apply(Tuple key, GlobalWindow window, 
> Iterable input, Collector out) throws Exception {
> int count = 0;
> for (Object obj : input) {
> count++;
> }
> out.collect(key.getField(0) + ": " + count);
> }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count 
> window.  The sliding time window also uses significantly more cpu and memory 
> than the count window.  I would also expect resource consumption to be 
> similar.
> A possible cause could be that the SystemProcessingTimeService.TriggerTask is 
> locking with the checkpointLock which acts like a global lock.  There should 
> be a lock per key or preferably a lock-less solution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)