[jira] [Commented] (FLINK-6990) Poor performance with Sliding Time Windows

2017-06-23 Cody (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060477#comment-16060477 ]

Cody commented on FLINK-6990:
-

I ran the code on my Mac and didn't see the pause.

> Poor performance with Sliding Time Windows
> --
>
> Key: FLINK-6990
> URL: https://issues.apache.org/jira/browse/FLINK-6990
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.0
> Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.  Here is a 
> simple example that performs poorly for me:
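> {code:java}
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class FlinkPerfTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Streaming 10,000 events per second: 10 keys x 1,000 events, emitted once per second
>         see.addSource(new SourceFunction<TestObject>() {
>             transient ScheduledExecutorService executor;
>
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 // Block the source thread until cancel() calls notify()
>                 this.wait();
>             }
>
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>                 .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>                 .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>                     @Override
>                     public void apply(Tuple key, TimeWindow window,
>                             Iterable<TestObject> input, Collector<String> out) throws Exception {
>                         // Count the elements currently in this window
>                         int count = 0;
>                         for (TestObject obj : input) {
>                             count++;
>                         }
>                         out.collect(key.getField(0) + ": " + count);
>                     }
>                 })
>                 .print();
>         see.execute();
>     }
>
>     // POJO keyed by its "key" field
>     public static class TestObject {
>         private Integer key;
>
>         public Integer getKey() {
>             return key;
>         }
>
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}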
> When running this, Flink periodically pauses for long periods of time. I 
> would expect a steady stream of output at 1-second intervals. For 
> comparison, you can switch to a count window of similar size, which performs 
> just fine:
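> {code:java}
> // Replaces the .window(...).apply(...) calls; GlobalWindow is
> // org.apache.flink.streaming.api.windowing.windows.GlobalWindow
> .countWindow(60, 1000)
>         .apply(new WindowFunction<TestObject, String, Tuple, GlobalWindow>() {
>             @Override
>             public void apply(Tuple key, GlobalWindow window,
>                     Iterable<TestObject> input, Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (TestObject obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
> {code}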
> I would expect the sliding time window to perform similarly to a count 
> window. The sliding time window also uses significantly more CPU and memory 
> than the count window; I would also expect resource consumption to be 
> similar.
> A possible cause could be that SystemProcessingTimeService.TriggerTask 
> locks on the checkpointLock, which acts like a global lock. There should 
> be a lock per key or, preferably, a lock-free solution.
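A rough way to see why the sliding time window costs so much more: a window of size 10 minutes sliding by 1 second assigns every element to size / slide = 600 overlapping windows, so each element is added to the state of 600 windows. The sketch below counts those windows with a loop that, as far as we know, follows the same idea as Flink's sliding assigners (a window offset of zero is assumed); `SlidingWindowCost`, `lastWindowStart` and `assignWindows` are hypothetical names, and this is not Flink code. By contrast, `countWindow(size, slide)` is built on a single `GlobalWindow` per key with a count trigger and a count evictor, so each element is stored once. For reference, at 1,000 events per key per second a 10-minute window covers about 600,000 elements per key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: counts the sliding time windows that one
// element is assigned to, assuming a window offset of zero.
public class SlidingWindowCost {

    // Start of the most recent slide-aligned window that contains `timestamp`.
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    // Returns the [start, end) bounds of every window of length `size`,
    // advancing by `slide`, that contains `timestamp`.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L;    // Time.minutes(10) in milliseconds
        long slide = 1000L;             // Time.seconds(1) in milliseconds
        long eventsPerSecond = 10_000L; // 10 keys x 1,000 events, as in the report
        int perElement = assignWindows(1_498_200_000_123L, size, slide).size();
        System.out.println("windows per element: " + perElement);                         // 600
        System.out.println("window-state adds per second: " + perElement * eventsPerSecond); // 6000000
    }
}
```

Under these assumptions the job performs roughly six million window-state additions per second, against ten thousand for a count window over one `GlobalWindow` per key, which is consistent with the CPU and memory difference described in the report.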



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator

2016-07-14 Cody (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376597#comment-15376597 ]

Cody commented on FLINK-4215:
-

Sorry, this is the expected behavior in the processing-time case. Closing the issue.
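The resolution hinges on event time versus processing time. The test attached to this issue never calls `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`, and Flink versions of that era (including 1.0.x) default to processing time, so `timeWindow(...)` builds processing-time windows that are assigned from the wall clock rather than from the timestamps extracted by the assigner. The model below is plain Java, not Flink's API: `WindowTimeSketch`, `assignmentTime` and the fixed clock value are illustrative assumptions. It shows the effect on the three test elements, which are one minute apart: with event time each lands in its own 1000 ms tumbling window, while with processing time the element timestamps play no part in assignment.

```java
import java.util.function.LongSupplier;

// Hypothetical model, not Flink's API: which timestamp drives tumbling-window assignment.
public class WindowTimeSketch {

    // Start of the tumbling window of length `size` that contains `ts`.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Event time uses the element's own timestamp; processing time reads the wall clock.
    static long assignmentTime(long elementTimestamp, boolean eventTime, LongSupplier wallClock) {
        return eventTime ? elementTimestamp : wallClock.getAsLong();
    }

    public static void main(String[] args) {
        long size = 1000L; // Time.milliseconds(1000)
        // Offsets in ms from 14:00:00 for the three test elements (one minute apart).
        long[] elementTimestamps = {0L, 60_000L, 120_000L};
        // Fixed stand-in for System.currentTimeMillis() so the output is reproducible.
        LongSupplier clock = () -> 5_000_500L;
        for (long ts : elementTimestamps) {
            long eventWindow = tumblingStart(assignmentTime(ts, true, clock), size);
            long procWindow = tumblingStart(assignmentTime(ts, false, clock), size);
            System.out.println(ts + ": event-time window " + eventWindow
                    + ", processing-time window " + procWindow);
        }
        // Event time: windows 0, 60000, 120000. Processing time: 5000000 for all three.
    }
}
```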

> timestamp of StreamRecord is lost in WindowOperator
> ---
>
> Key: FLINK-4215
> URL: https://issues.apache.org/jira/browse/FLINK-4215
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: Cody
>
> In a WindowedStream, if the subsequent operator is a WindowOperator (by 
> applying a fold function), the timestamp of the StreamRecord is lost. Here's 
> my test code:
> -
>   def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment):
>       DataStream[(Int, Long, String, String)] = {
>     val data = new mutable.MutableList[(Int, Long, String, String)]
>     data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
>     data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
>     data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
>     env.fromCollection(data)
>   }
>   @Test
>   def testTimestampInWindowOperator(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env)
>       .assignTimestampsAndWatermarks(
>         new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
>           override def getCurrentWatermark: Watermark = null
>           override def extractTimestamp(element: (Int, Long, String, String),
>               previousElementTimestamp: Long): Long = {
>             // Fixed pattern: DateFormat.getDateTimeInstance depends on the default locale
>             new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element._4).getTime
>           }
>         })
>       .keyBy(3)
>       .timeWindow(Time.milliseconds(1000))
>       .fold((0, 0L, "", ""),
>         new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
>           override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String))
>               : (Int, Long, String, String) = {
>             (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
>           }
>         })
>       .addSink(new PrintSinkFunction[(Int, Long, String, String)]())
>     env.execute()
>   }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator

2016-07-14 Cody (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody closed FLINK-4215.
---
Resolution: Invalid






[jira] [Commented] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator

2016-07-14 Cody (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376579#comment-15376579 ]

Cody commented on FLINK-4215:
-

After further testing, I found that the timestamp is lost in all window 
operators. Am I setting something up wrong?






[jira] [Created] (FLINK-4215) timestamp of StreamRecord is lost in WindowOperator

2016-07-14 Cody (JIRA)
Cody created FLINK-4215:
---

 Summary: timestamp of StreamRecord is lost in WindowOperator
 Key: FLINK-4215
 URL: https://issues.apache.org/jira/browse/FLINK-4215
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.0.3
Reporter: Cody


In a WindowedStream, if the subsequent operator is a WindowOperator (by applying 
a fold function), the timestamp of the StreamRecord is lost. Here's my test 
code:
-
  def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment):
      DataStream[(Int, Long, String, String)] = {
    val data = new mutable.MutableList[(Int, Long, String, String)]
    data.+=((1, 1L, "Hi", "2016-07-06 14:00:00"))
    data.+=((2, 2L, "Hello", "2016-07-06 14:01:00"))
    data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00"))
    env.fromCollection(data)
  }

  @Test
  def testTimestampInWindowOperator(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env)
      .assignTimestampsAndWatermarks(
        new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] {
          // No watermarks are emitted
          override def getCurrentWatermark: Watermark = null

          override def extractTimestamp(element: (Int, Long, String, String),
              previousElementTimestamp: Long): Long = {
            // Fixed pattern: DateFormat.getDateTimeInstance depends on the default locale
            new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element._4).getTime
          }
        })
      .keyBy(3)
      .timeWindow(Time.milliseconds(1000))
      .fold((0, 0L, "", ""),
        new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] {
          override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String))
              : (Int, Long, String, String) = {
            (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4)
          }
        })
      .addSink(new PrintSinkFunction[(Int, Long, String, String)]())

    env.execute()
  }






[jira] [Created] (FLINK-4130) CallGenerator could generate illegal code when taking no operands

2016-06-29 Cody (JIRA)
Cody created FLINK-4130:
---

 Summary: CallGenerator could generate illegal code when taking no operands
 Key: FLINK-4130
 URL: https://issues.apache.org/jira/browse/FLINK-4130
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Cody
Priority: Minor


In CallGenerator, when a call takes no operands and null checking is enabled, it 
generates code like:

boolean isNull$17 = ;

which will fail to compile at runtime.
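The empty right-hand side suggests that the null flag is built by joining the operands' null terms with `||`, which yields an empty string when there are no operands; that is an assumption, since the generator source is not shown here. A minimal sketch of a guard, in Java rather than the Scala that Flink's Table API code generation is written in: when the operand list is empty, "some operand is null" is vacuously false, so emit `false`. `NullCheckCodegenSketch` and `nullCheckDeclaration` are hypothetical names, not Flink's API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Flink's CallGenerator: builds the generated-code line
// that declares a call result's null flag from its operands' null terms.
public class NullCheckCodegenSketch {

    static String nullCheckDeclaration(String nullTerm, List<String> operandNullTerms) {
        // With no operands, joining would produce "" and thus "boolean x = ;".
        String expr = operandNullTerms.isEmpty()
                ? "false"
                : String.join(" || ", operandNullTerms);
        return "boolean " + nullTerm + " = " + expr + ";";
    }

    public static void main(String[] args) {
        // prints: boolean isNull$17 = false;
        System.out.println(nullCheckDeclaration("isNull$17", Collections.<String>emptyList()));
        // prints: boolean isNull$18 = isNull$3 || isNull$4;
        System.out.println(nullCheckDeclaration("isNull$18", Arrays.asList("isNull$3", "isNull$4")));
    }
}
```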


