[jira] [Created] (FLINK-7786) JarActionHandler.tokenizeArguments removes all quotes from program arguments
Brice Bingman created FLINK-7786:

             Summary: JarActionHandler.tokenizeArguments removes all quotes from program arguments
                 Key: FLINK-7786
                 URL: https://issues.apache.org/jira/browse/FLINK-7786
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.3.0
            Reporter: Brice Bingman
            Priority: Minor

I would like to send JSON as an argument to my program, but when I submit it via the REST API, all the quotes in the JSON are removed, resulting in invalid JSON. JarActionHandler.tokenizeArguments should remove only the leading and trailing quotes of an argument, not all of the quotes.

Current workaround: replace the quotes in the JSON with an escape character and restore them in the run method.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
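To make the requested behavior concrete, here is a minimal sketch of a tokenizer that strips only the pair of quotes enclosing a whole argument and leaves any quotes inside it untouched. The class name `ArgTokenizer` and the method `tokenize` are hypothetical names chosen for illustration; this is not the code in JarActionHandler and it makes no claim about how Flink parses arguments. It assumes an argument is quoted only when its first character is a quote, and it does not handle backslash escapes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not Flink's implementation.
public class ArgTokenizer {

    /**
     * Splits the input on whitespace. If an argument starts with ' or ",
     * it runs to the next occurrence of the same quote character and only
     * that enclosing pair is removed. Quotes inside the argument are kept.
     */
    public static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        int i = 0;
        int n = args.length();
        while (i < n) {
            char c = args.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;
                continue;
            }
            if (c == '"' || c == '\'') {
                int close = args.indexOf(c, i + 1);
                if (close < 0) {
                    // Unterminated quote: keep the rest literally, quote included.
                    tokens.add(args.substring(i));
                    break;
                }
                tokens.add(args.substring(i + 1, close));
                i = close + 1;
            } else {
                int start = i;
                while (i < n && !Character.isWhitespace(args.charAt(i))) {
                    i++;
                }
                tokens.add(args.substring(start, i));
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        // The JSON is wrapped in single quotes; its inner double quotes survive.
        System.out.println(tokenize("--input /tmp/in --config '{\"name\": \"a b\"}'"));
    }
}
```

With this approach a JSON argument has to be wrapped in the other kind of quote (single quotes around JSON that uses double quotes), since an unquoted argument is still split at whitespace.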
[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows
[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063245#comment-16063245 ]

Brice Bingman commented on FLINK-6990:

[~jark] Good to hear. In the meantime I will look into using a ProcessFunction.

> Poor performance with Sliding Time Windows
>
>                 Key: FLINK-6990
>                 URL: https://issues.apache.org/jira/browse/FLINK-6990
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Streaming
>    Affects Versions: 1.3.0
>         Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>            Reporter: Brice Bingman
[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows
[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060881#comment-16060881 ]

Brice Bingman commented on FLINK-6990:

[~Cody] I chose the smallest scenario that shows the slowdown on my machine. If you are on a machine with better resources, you may need to increase the event input rate or the window size to see the slowdown. I noticed that if I reduce the window size to 1 minute, the problem goes away.

[~fhueske]
1) So on each slide of a time window, the data in that window is replicated and put into the new window? That does seem like a lot of overhead. Why is everything replicated? Is there any way to disable that?
2) While this example could be implemented with a ReduceFunction, I also need to support more complicated calculations, such as a linear regression or an exponential moving average, where you need access to all the data in the window. That is why I've been stress testing with the WindowFunction.

Perhaps there should be a more performant time window that doesn't replicate data on each slide, similar to how the count window is implemented.
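The idea raised in the comment above, a time window that keeps each element once instead of adding it to every overlapping window, can be sketched in plain Java. This is only an illustration of the data structure under stated assumptions: `SlidingBuffer` and its methods are hypothetical names, nothing here is a Flink API, timestamps are assumed to arrive in non-decreasing order, and the buffer is meant for a single key and a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-key buffer for illustration only; not part of Flink.
public class SlidingBuffer<T> {

    private static final class Entry<T> {
        final long timestampMillis;
        final T value;

        Entry(long timestampMillis, T value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Entry<T>> entries = new ArrayDeque<>();

    public SlidingBuffer(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Stores the element exactly once, regardless of how often the window slides. */
    public void add(long timestampMillis, T value) {
        entries.addLast(new Entry<>(timestampMillis, value));
    }

    /**
     * Called once per slide. Drops elements with timestamp <= now - window
     * and returns the values that remain, oldest first.
     */
    public List<T> fire(long nowMillis) {
        while (!entries.isEmpty()
                && entries.peekFirst().timestampMillis <= nowMillis - windowMillis) {
            entries.removeFirst();
        }
        List<T> contents = new ArrayList<>(entries.size());
        for (Entry<T> e : entries) {
            contents.add(e.value);
        }
        return contents;
    }

    public static void main(String[] args) {
        SlidingBuffer<String> buffer = new SlidingBuffer<>(10_000L);
        buffer.add(0L, "a");
        buffer.add(5_000L, "b");
        buffer.add(9_000L, "c");
        System.out.println(buffer.fire(10_000L));
        System.out.println(buffer.fire(15_000L));
    }
}
```

Each firing still walks the full window contents, so a calculation such as a linear regression keeps access to all the data, but an element is written once on arrival and removed once on expiry.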
[jira] [Created] (FLINK-6990) Poor performance with Sliding Time Windows
Brice Bingman created FLINK-6990:

             Summary: Poor performance with Sliding Time Windows
                 Key: FLINK-6990
                 URL: https://issues.apache.org/jira/browse/FLINK-6990
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.3.0
         Environment: OSX 10.11.4
2.8 GHz Intel Core i7
16 GB 1600 MHz DDR3
            Reporter: Brice Bingman

I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkPerfTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // Streaming 10,000 events per second: 10 keys x 1,000 events, once per second
        see.addSource(new SourceFunction<TestObject>() {
            transient ScheduledExecutorService executor;

            @Override
            public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
                executor = Executors.newSingleThreadScheduledExecutor();
                executor.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        for (int k = 0; k < 10; k++) {
                            for (int i = 0; i < 1000; i++) {
                                TestObject obj = new TestObject();
                                obj.setKey(k);
                                ctx.collect(obj);
                            }
                        }
                    }
                }, 0, 1, TimeUnit.SECONDS);
                // Block until cancel() is called
                this.wait();
            }

            @Override
            public synchronized void cancel() {
                executor.shutdown();
                this.notify();
            }
        }).keyBy("key")
        // 10-minute window, sliding every second
        .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
        .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                // Count the elements in the window
                int count = 0;
                for (TestObject obj : input) {
                    count++;
                }
                out.collect(key.getField(0) + ": " + count);
            }
        })
        .print();
        see.execute();
    }

    public static class TestObject {
        private Integer key;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }
    }
}
{code}

When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1 second intervals. For comparison, you can switch to a count window of similar size, which performs just fine:

{code:java}
.countWindow(60, 1000).apply(new WindowFunction<TestObject, String, Tuple, GlobalWindow>() {
    @Override
    public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
        int count = 0;
        for (TestObject obj : input) {
            count++;
        }
        out.collect(key.getField(0) + ": " + count);
    }
})
{code}

I would expect the sliding time window to perform similarly to a count window. The sliding time window also uses significantly more CPU and memory than the count window; I would expect resource consumption to be similar as well.

A possible cause is that SystemProcessingTimeService.TriggerTask synchronizes on the checkpointLock, which acts like a global lock. There should be a lock per key or, preferably, a lock-free solution.
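As a rough back-of-the-envelope check on the numbers in this report, the sketch below works out how many overlapping windows a single element falls into for a 10-minute window sliding every second, and how many window insertions per second that implies at 10,000 events per second. It assumes each element is added to every sliding window that covers it, and that the window size is an exact multiple of the slide. The class and method names are hypothetical and exist only for this calculation.

```java
// Hypothetical helper for illustration only; plain arithmetic, no Flink dependency.
public class SlidingWindowLoad {

    /** Number of overlapping windows that contain any single element. */
    public static long windowsPerElement(long sizeMillis, long slideMillis) {
        return sizeMillis / slideMillis;
    }

    /** Window insertions per second if every element is added to each window covering it. */
    public static long windowInsertsPerSecond(long eventsPerSecond, long sizeMillis, long slideMillis) {
        return eventsPerSecond * windowsPerElement(sizeMillis, slideMillis);
    }

    public static void main(String[] args) {
        long sizeMillis = 10L * 60L * 1000L; // Time.minutes(10)
        long slideMillis = 1000L;            // Time.seconds(1)
        long eventsPerSecond = 10L * 1000L;  // 10 keys x 1,000 events per second

        System.out.println("windows per element: " + windowsPerElement(sizeMillis, slideMillis));
        System.out.println("window inserts per second: "
                + windowInsertsPerSecond(eventsPerSecond, sizeMillis, slideMillis));
    }
}
```

Under these assumptions each element belongs to 600 windows, so 10,000 events per second turn into 6,000,000 window insertions per second. Shrinking the window to 1 minute cuts the factor to 60.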