Hi!
I am investigating the use of Flink for a new project and started some simple
demos.
Currently I am stuck at the point where I need to deal with events arriving out
of order based on their event time. I’ve spent quite some time researching on
SO, in the docs, and in the Ververica training (excellent resource btw), but I
assume I still have some conceptual misunderstandings :-)
I put together the following demo code, and I would expect that the console
output would list the events chronologically based on their embedded event
time. However, events are always printed in the same order as they are pushed
into the data stream by the OutOfOrderEventSource.
Sample console output:
—
3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
7> EventRecord{id=1, counter=5, eventId=1, timestamp=2021-11-02T20:10:05.819}
8> EventRecord{id=4, counter=6, eventId=0, timestamp=2021-11-02T20:10:04.819}
9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
10> EventRecord{id=0, counter=8, eventId=1, timestamp=2021-11-02T20:10:05.828}
11> EventRecord{id=3, counter=9, eventId=1, timestamp=2021-11-02T20:10:09.829}
—
My expectation would be to receive the events ordered:
—
5> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
6> EventRecord{id=4, counter=4, eventId=1, timestamp=2021-11-02T20:10:00.815}
3> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
4> EventRecord{id=2, counter=2, eventId=1, timestamp=2021-11-02T20:10:02.810}
9> EventRecord{id=0, counter=7, eventId=1, timestamp=2021-11-02T20:10:03.824}
…
—
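To make that expectation concrete, this is roughly what I imagined happening
somewhere between the watermark assignment and print(). It is a plain-Java
sketch with no Flink API involved; ReorderBufferSketch, Event and releaseUpTo
are names I made up for illustration, and I don't know whether Flink does
anything like this internally.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class ReorderBufferSketch {

    // Hypothetical minimal event: only the fields needed for ordering.
    public static final class Event {
        public final long counter;
        public final long timestamp;

        public Event(long counter, long timestamp) {
            this.counter = counter;
            this.timestamp = timestamp;
        }
    }

    // Buffered events, smallest event timestamp first.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    public void add(Event e) {
        buffer.add(e);
    }

    // Returns, in timestamp order, every buffered event at or below the watermark.
    public List<Event> releaseUpTo(long watermark) {
        List<Event> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        var sketch = new ReorderBufferSketch();
        // First four records from the sample output above, in arrival order.
        sketch.add(new Event(1, Instant.parse("2021-11-02T20:10:01.554Z").toEpochMilli()));
        sketch.add(new Event(2, Instant.parse("2021-11-02T20:10:02.810Z").toEpochMilli()));
        sketch.add(new Event(3, Instant.parse("2021-11-02T20:09:59.815Z").toEpochMilli()));
        sketch.add(new Event(4, Instant.parse("2021-11-02T20:10:00.815Z").toEpochMilli()));

        // Pretend the stream has ended, so everything may be released.
        for (Event e : sketch.releaseUpTo(Long.MAX_VALUE)) {
            System.out.println(e.counter); // prints 3, 4, 1, 2 (one per line)
        }
    }
}
```

In other words, I assumed records would be held back until the watermark
passes them and then handed downstream sorted by event time.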
Given a BoundedOutOfOrderness watermarking strategy with a duration of 20
seconds, my expectation would have been that the first event pushed to the
demo source,
EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
sets the initial watermark to "2021-11-02T20:09:41.554". Events older than
this timestamp would then not be considered, while events younger than this
timestamp would be considered and ordered accordingly. That would mean that
EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
would still be considered on time.
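Spelled out as arithmetic, my reasoning looks like the sketch below. It is
plain Java without Flink; watermarkFor and isLate are helper names I invented.
The extra "- 1" is an assumption based on my reading of Flink's built-in
bounded-out-of-orderness generator, which as far as I can tell emits
max timestamp - bound - 1 ms, so strictly speaking the watermark would be
20:09:41.553 rather than 20:09:41.554.

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkArithmetic {

    // Assumed formula: watermark = max timestamp seen so far - allowed out-of-orderness - 1 ms.
    public static long watermarkFor(long maxTimestampMillis, Duration outOfOrderness) {
        return maxTimestampMillis - outOfOrderness.toMillis() - 1;
    }

    // An event counts as late only if its timestamp is at or below the current watermark.
    public static boolean isLate(long eventTimestampMillis, long watermarkMillis) {
        return eventTimestampMillis <= watermarkMillis;
    }

    public static void main(String[] args) {
        long first = Instant.parse("2021-11-02T20:10:01.554Z").toEpochMilli(); // counter=1
        long third = Instant.parse("2021-11-02T20:09:59.815Z").toEpochMilli(); // counter=3

        long watermark = watermarkFor(first, Duration.ofSeconds(20));
        System.out.println(Instant.ofEpochMilli(watermark)); // 2021-11-02T20:09:41.553Z
        System.out.println(isLate(third, watermark));        // false
    }
}
```

So by this calculation the counter=3 record is well inside the 20 second
bound and should not count as late.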
I’m sure I am missing something conceptually.
Here is the code that I’m using:
---
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static java.time.Instant.ofEpochMilli;
import static java.time.LocalDateTime.ofInstant;
public class SimpleOutOfOrderDemo {

    public static void main(String... args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tolerate events that arrive up to 20 seconds out of order.
        var watermarkStrategy =
                WatermarkStrategy.<EventRecord>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((e, rt) -> e.timestamp);

        var source = new OutOfOrderEventSource();
        env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
        env.execute("Simple Out of Order Demo");
    }

    /** Emits one record per second whose event time lies 0 to 4 seconds in the past. */
    public static class OutOfOrderEventSource implements SourceFunction<EventRecord> {
        static final int MAX_ELEMENTS = 10;
        static final long INTERVAL = 1000;

        AtomicInteger counter = new AtomicInteger();

        @Override
        public void run(SourceFunction.SourceContext<EventRecord> ctx) throws Exception {
            var random = new Random();
            var c = counter.incrementAndGet();
            // Counter starts at 1, so this emits MAX_ELEMENTS - 1 records.
            while (c < MAX_ELEMENTS) {
                var id = random.nextInt(5);
                var eventId = random.nextInt(2);
                var delay = random.nextInt(5);
                // Local wall-clock time is interpreted as UTC; toString() converts back the same way.
                var dateTime = LocalDateTime.now().minus(Duration.of(delay, ChronoUnit.SECONDS));
                var record = new EventRecord(id, eventId,
                        dateTime.toInstant(ZoneOffset.UTC).toEpochMilli(), c);
                ctx.collect(record);
                c = counter.incrementAndGet();
                Thread.sleep(INTERVAL);
            }
        }

        @Override
        public void cancel() {
            // Makes the loop condition in run() fail on its next check.
            counter.set(MAX_ELEMENTS);
        }
    }

    public static class EventRecord {
        public int id;
        public int eventId;
        public long timestamp; // event time in epoch milliseconds
        public long counter;   // emission order at the source

        public EventRecord(int id, int eventId, long timestamp, long counter) {
            this.id = id;
            this.eventId = eventId;
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("EventRecord{");
            sb.append("id=").append(id);
            sb.append(", counter=").append(counter);
            sb.append(", eventId=").append(eventId);
            sb.append(", timestamp=").append(ofInstant(ofEpochMilli(timestamp), ZoneOffset.UTC));
            sb.append('}');
            return sb.toString();
        }
    }
}
—
POM:
—
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.app</groupId>
  <artifactId>event-hubs-kafka-flink-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <flink.version>1.14.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <defaultGoal>install</defaultGoal>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>11</source>
          <target>11</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.0.2</version>
        <configuration>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
—
Would be cool if someone could point me in the right direction. Really great
project!
Thanks
Oliver