Ori Popowski created FLINK-25007:
------------------------------------

             Summary: Session window with dynamic gap doesn't work
                 Key: FLINK-25007
                 URL: https://issues.apache.org/jira/browse/FLINK-25007
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.12.0
         Environment: Local environment
            Reporter: Ori Popowski


I am creating a simple application with events firing every 15 seconds. I 
created a {{

SessionWindowTimeGapExtractor}} which returns 90 minutes, but after the 4th 
event, it should return 1 millisecond. I expected that after the 4th event, a 
session window will trigger, but it's not what happens. In reality the session 
window never triggers, even though after the 4th event, the session gap is 
effectively 1 millisecond and the interval between events is 15 seconds.

 
{code:java}
object Main {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val now = Instant.now()

    senv
      .addSource(new Source(now))
      .assignAscendingTimestamps(_.time.toEpochMilli)
      .keyBy(_ => 1)
      .window(DynamicEventTimeSessionWindows.withDynamicGap(new 
SessionWindowTimeGapExtractor[Element] {
        override def extract(element: Element): Long = {
          if (element.sessionEnd) 1
          else 90.minutes.toMillis
        }
      }))
      .process(new ProcessWindowFunction[Element, Vector[Element], Int, 
TimeWindow] {
        override def process(k: Int, context: Context, elements: 
Iterable[Element], out: Collector[Vector[Element]]): Unit = {
          out.collect(elements.toVector)
        }
      })
      .print()

    senv.execute()
  }
}

case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)

class Source(now: Instant) extends RichSourceFunction[Element] {
  @volatile private var isRunning = true
  private var totalInterval = 0L
  private var i = 0

  override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
    while (isRunning) {
      val element = Element(i, now.plusMillis(totalInterval))

      if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
      else ctx.collect(element)

      i += 1
      totalInterval += 15.seconds.toMillis
      Thread.sleep(15.seconds.toMillis)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to