Re: Flink 1.11 FlinkKafkaConsumer not propagating watermarks

2021-04-21 Thread Arvid Heise
For reference: this was self-answered on [1].

> Turns out that Flink 1.12 defaults the TimeCharacteristic to EventTime and
> deprecates the whole TimeCharacteristic flow. So when downgrading to Flink
> 1.11, where the default is still ProcessingTime, you must add the following
> statement to configure the StreamExecutionEnvironment:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
[1] https://stackoverflow.com/a/67111541/10299342
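
For anyone finding this in the archives, here is a minimal sketch of where that statement might sit in a Flink 1.11 job. It is not Ned's test code. The topic name "input" is taken from the logs in the original post; the class name, broker address, group id and the 5-second out-of-orderness bound are assumptions made up for illustration.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical job class, for illustration only.
public class EventTimeOnFlink111 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Needed on Flink 1.11, where the default is ProcessingTime.
        // Deprecated from Flink 1.12 on, where EventTime is the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Placeholder connection settings (assumptions, adjust to your setup).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "watermark-demo");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("input", new SimpleStringSchema(), props);

        // Built-in bounded-out-of-orderness strategy; with no explicit
        // timestamp assigner, the Kafka record timestamp is used.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

        env.addSource(consumer).print();
        env.execute("event-time-on-flink-1.11");
    }
}
```

The only line that matters for the downgrade is the setStreamTimeCharacteristic call; the rest is scaffolding so the sketch compiles against the Flink 1.11 streaming and Kafka connector dependencies.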


Flink 1.11 FlinkKafkaConsumer not propagating watermarks

2021-04-14 Thread Edward Bingham
Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote some
Flink processors using Flink 1.12 and tried to get them working on Amazon
EMR. However, Amazon EMR only supports Flink 1.11.2 at the moment. When I
downgraded, I found, inexplicably, that watermarks were no longer
propagating.

There is only one partition on the topic, and parallelism is set to 1. Is
there something I'm missing here? I feel like I'm going a bit crazy.

I've cross-posted this on Stack Overflow, but I figure the mailing list is
probably a better avenue for this question.

Thanks,
Ned


Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=8640 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=86400 watermark=0] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=864000 watermark=77760] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=864 watermark=855360] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=8631360] "test message"
Emitting watermark 9223372036768375807

And here is the output for Flink 1.11 (not propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Here's the integration test that exposes it:

package mytest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;
import