Re: Flink 1.11 FlinkKafkaConsumer not propagating watermarks
For reference: I self-answered on Stack Overflow [1]. It turns out that Flink 1.12 defaults the TimeCharacteristic to EventTime and deprecates the whole TimeCharacteristic flow, whereas Flink 1.11 still defaults to ProcessingTime. So, when downgrading to Flink 1.11, you must add the following statement when configuring the StreamExecutionEnvironment:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

[1] https://stackoverflow.com/a/67111541/10299342

On Thu, Apr 15, 2021 at 12:08 AM Edward Bingham wrote:
> Hi everyone,
>
> I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
> Flink processors using Flink 1.12, and tried to get them working on Amazon
> EMR. However, Amazon EMR only supports Flink 1.11.2 at the moment. When I
> went to downgrade, I found, inexplicably, that watermarks were no longer
> propagating.
>
> There is only one partition on the topic, and parallelism is set to 1. Is
> there something I'm missing here? I feel like I'm going a bit crazy.
>
> I've cross-posted this on Stack Overflow, but I figure the mailing list is
> probably a better avenue for this question.
>
> Thanks,
> Ned
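To illustrate why that one line matters, here is a toy model in plain Java. It is not Flink code: WatermarkDemo, BoundedOutOfOrderness, Downstream and run() are all hypothetical names. It assumes, based on my reading of Flink 1.11's legacy SourceFunction path, that under the default ProcessingTime characteristic the source context drops record timestamps and ignores emitted watermarks, while under EventTime it forwards both. The watermark generator still runs in either mode, which would be consistent with the 1.11 log in the original post: "Assigning timestamp" and "Emitting watermark" are still printed, yet the downstream operator only ever sees watermark=-9223372036854775808 (Long.MIN_VALUE). In the model, null stands for "no timestamp attached".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the 1.11 symptom. This is NOT Flink code: WatermarkDemo,
// BoundedOutOfOrderness, Downstream and run() are hypothetical names.
public class WatermarkDemo {

    // Stand-in for Flink's TimeCharacteristic (only the two modes that matter here).
    enum TimeCharacteristic { PROCESSING_TIME, EVENT_TIME }

    // Same arithmetic as a bounded-out-of-orderness generator:
    // watermark = highest timestamp seen - bound - 1.
    // Simplification: it emits on every event instead of periodically.
    static final class BoundedOutOfOrderness {
        private final long bound;
        private long maxTs;

        BoundedOutOfOrderness(long bound) {
            this.bound = bound;
            // Chosen so that, before any event, the watermark would be Long.MIN_VALUE.
            this.maxTs = Long.MIN_VALUE + bound + 1;
        }

        long onEvent(long ts) {
            maxTs = Math.max(maxTs, ts);
            return maxTs - bound - 1;
        }
    }

    // Downstream operator: logs the timestamp attached to each element
    // (null = none) and the watermark it has seen so far.
    static final class Downstream {
        long currentWatermark = Long.MIN_VALUE;
        final List<Long> seenTimestamps = new ArrayList<>();
        final List<Long> seenWatermarks = new ArrayList<>();

        void element(Long ts) {
            seenTimestamps.add(ts);
            seenWatermarks.add(currentWatermark);
        }

        void watermark(long wm) {
            currentWatermark = Math.max(currentWatermark, wm);
        }
    }

    // Source loop. Assumption: under PROCESSING_TIME the source context
    // drops timestamps and ignores watermarks; under EVENT_TIME it forwards both.
    static Downstream run(TimeCharacteristic tc, long[] timestamps, long bound) {
        boolean eventTime = tc == TimeCharacteristic.EVENT_TIME;
        BoundedOutOfOrderness generator = new BoundedOutOfOrderness(bound);
        Downstream out = new Downstream();
        for (long ts : timestamps) {
            out.element(eventTime ? ts : null);
            long wm = generator.onEvent(ts); // the generator runs in both modes...
            if (eventTime) {
                out.watermark(wm);           // ...but only EVENT_TIME forwards its output
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {8640L, 86400L, 864000L};
        for (TimeCharacteristic tc : TimeCharacteristic.values()) {
            Downstream d = run(tc, ts, 100L);
            System.out.println(tc + " timestamps=" + d.seenTimestamps
                    + " watermarks=" + d.seenWatermarks);
        }
    }
}
```

Running main prints:

    PROCESSING_TIME timestamps=[null, null, null] watermarks=[-9223372036854775808, -9223372036854775808, -9223372036854775808]
    EVENT_TIME timestamps=[8640, 86400, 864000] watermarks=[-9223372036854775808, 8539, 86299]

The first line mirrors the 1.11 symptom; the second is what setting EventTime restores.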
Flink 1.11 FlinkKafkaConsumer not propagating watermarks
Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some Flink processors using Flink 1.12, and tried to get them working on Amazon EMR. However, Amazon EMR only supports Flink 1.11.2 at the moment. When I went to downgrade, I found, inexplicably, that watermarks were no longer propagating.

There is only one partition on the topic, and parallelism is set to 1. Is there something I'm missing here? I feel like I'm going a bit crazy.

I've cross-posted this on Stack Overflow, but I figure the mailing list is probably a better avenue for this question.

Thanks,
Ned

Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=8640 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=86400 watermark=0] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=864000 watermark=77760] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=864 watermark=855360] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=8631360] "test message"
Emitting watermark 9223372036768375807

And here is the output for Flink 1.11 (not propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Here's the integration test that exposes it:

package mytest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;

import java.nio.file.Files;
import java.nio.file.Paths;

import java.text.SimpleDateFormat;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;

import kafka.utils.MockTime;
import kafka.utils.TestUtils;

import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;
import