[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows
[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060477#comment-16060477 ]

Cody commented on FLINK-6990:
---

I ran the code on my Mac and didn't see the pause.

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
> Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows. Here is a
> simple example that performs poorly for me:
> {code:java}
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Streaming 10,000 events per second
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 this.wait();
>             }
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>         .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>         .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>             @Override
>             public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
>         .print();
>         see.execute();
>     }
>     public static class TestObject {
>         private Integer key;
>         public Integer getKey() {
>             return key;
>         }
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}
> When running this, Flink periodically pauses for long periods of time. I
> would expect a steady stream of output at 1-second intervals. For
> comparison, you can switch to a count window of similar size, which performs
> just fine:
> {code:java}
> .countWindow(60, 1000).apply(new WindowFunction<TestObject, String, Tuple, GlobalWindow>() {
>     @Override
>     public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
>         int count = 0;
>         for (Object obj : input) {
>             count++;
>         }
>         out.collect(key.getField(0) + ": " + count);
>     }
> })
> {code}
> I would expect the sliding time window to perform similarly to a count
> window. The sliding time window also uses significantly more CPU and memory
> than the count window. I would also expect resource consumption to be
> similar.
> A possible cause could be that SystemProcessingTimeService.TriggerTask
> locks on the checkpointLock, which acts as a global lock. There should
> be a lock per key or, preferably, a lock-less solution.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376597#comment-15376597 ]

Cody commented on FLINK-4215:
---

Sorry, this is the expected behavior in the processing-time case. Closing the issue.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody closed FLINK-4215.
---
Resolution: Invalid
[jira] [Commented] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376579#comment-15376579 ]

Cody commented on FLINK-4215:
---

After further testing, I found that the timestamp is lost in all window operators. Am I setting something up wrong?
[jira] [Created] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator
Cody created FLINK-4215:
---

Summary: timestamp of StreamRecord is lost in WindowOperator
Key: FLINK-4215
URL: https://issues.apache.org/jira/browse/FLINK-4215
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.0.3
Reporter: Cody

In a WindowedStream, if the subsequent operator is a WindowOperator (by applying a fold function), the timestamp of the StreamRecord is lost. Here's my test code:

def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment): DataStream[(Int, Long, String, String)] = {
  val data = new mutable.MutableList[(Int, Long, String, String)]
  data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
  data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
  data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
  env.fromCollection(data)
}

@Test
def testTimestampInWindowOperator(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env).assignTimestampsAndWatermarks(
    new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
      override def getCurrentWatermark: Watermark = null
      override def extractTimestamp(element: (Int, Long, String, String), previousElementTimestamp: Long): Long = {
        DateFormat.getDateTimeInstance.parse(element._4).getTime
      }
    }).keyBy(3).timeWindow(Time.milliseconds(1000))
    .fold((0, 0L, "", ""),
      new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
        override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String)): (Int, Long, String, String) = {
          (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
        }
      }).addSink(new PrintSinkFunction[(Int, Long, String, String)]())
  env.execute()
}
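The FLINK-4215 test derives event timestamps from the fourth tuple field with `DateFormat.getDateTimeInstance`, whose pattern depends on the default locale and may not accept strings such as `2016-07-06 14:00:00`. Note also that, as far as we understand Flink releases before 1.12, the default time characteristic is processing time, and `timeWindow()` then assigns processing-time windows that ignore extracted timestamps unless `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` is set, which fits the resolution recorded on this issue. The sketch below is a standalone illustration, not part of the report: it assumes the strings are UTC, parses them with an explicit `java.time` pattern, and computes which 1000 ms tumbling event-time window (offset 0, same start arithmetic Flink uses) each record would fall into. The class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: parses the test's date strings and maps them to tumbling windows. */
public class TumblingWindowCheck {

    // Explicit pattern for the test data; treating it as UTC is an assumption (the report names no zone).
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses a "yyyy-MM-dd HH:mm:ss" string as a UTC timestamp in epoch milliseconds. */
    static long toEpochMillis(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Start of the tumbling window of length `size` (offset 0) that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    public static void main(String[] args) {
        String[] times = {"2016-07-06 14:00:00", "2016-07-06 14:01:00", "2016-07-06 14:02:00"};
        long size = 1000L; // Time.milliseconds(1000) in the test
        for (String t : times) {
            long ts = toEpochMillis(t);
            long start = windowStart(ts, size);
            System.out.println(t + " -> ts=" + ts + ", window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Because the three records are a minute apart, each lands in its own one-second window; the test also keys by field 3 (the date string), so each record would form a separate key in any case.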
[jira] [Created] (FLINK-4130) CallGenerator could generate illegal code when taking no operands
Cody created FLINK-4130:
---

Summary: CallGenerator could generate illegal code when taking no operands
Key: FLINK-4130
URL: https://issues.apache.org/jira/browse/FLINK-4130
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Cody
Priority: Minor

In CallGenerator, when a call takes no operands and null checking is enabled, it generates code like:

boolean isNull$17 = ;

which fails to compile at runtime.
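The FLINK-4130 failure comes from building the result's null-check expression by joining the operands' null terms: with zero operands the join is empty, leaving `boolean isNull$17 = ;`. A minimal sketch of the guard, assuming that a call without operands cannot produce a null result and so can fall back to the literal `false`. `NullCheckCodegen` and `nullCheckLine` are hypothetical names written in Java for illustration; this is not Flink's (Scala) CallGenerator.

```java
import java.util.List;

/** Hypothetical helper: emits the null-check line for a generated call result. */
public class NullCheckCodegen {

    static String nullCheckLine(String resultNullTerm, List<String> operandNullTerms) {
        // Result is null if any operand is null; with no operands, nothing can be null,
        // so use "false" instead of emitting an empty right-hand side.
        String expr = operandNullTerms.isEmpty() ? "false" : String.join(" || ", operandNullTerms);
        return "boolean " + resultNullTerm + " = " + expr + ";";
    }

    public static void main(String[] args) {
        System.out.println(nullCheckLine("isNull$17", List.of()));
        System.out.println(nullCheckLine("isNull$17", List.of("isNull$3", "isNull$5")));
    }
}
```

Running `main` prints `boolean isNull$17 = false;` for the no-operand case and `boolean isNull$17 = isNull$3 || isNull$5;` when two operands are present.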