Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote some
Flink processors using Flink 1.12 and tried to get them working on Amazon
EMR. However, Amazon EMR only supports Flink 1.11.2 at the moment. When I
downgraded, I found, inexplicably, that watermarks were no longer
propagating.

There is only one partition on the topic, and parallelism is set to 1. Is
there something I'm missing here? I feel like I'm going a bit crazy.

I've cross-posted this on Stack Overflow, but I figure the mailing list is
probably a better avenue for this question.

Thanks,
Ned


Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] "test message"
Emitting watermark 9223372036768375807

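And here is the output for Flink 1.11 (not propagating the watermark). Note that the
timestamp assigner and watermark generator still run for every record, but the process
function sees timestamp=0 (i.e. ctx.timestamp() returned null) and the watermark never
leaves -9223372036854775808 (Long.MIN_VALUE):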

input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Here's the integration test that exposes it:

package mytest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.junit.*;
public class FailTest {
    private static EmbeddedZookeeper zooKeeper = null;
    private static KafkaServer server = null;
    public static AdminClient admin = null;
    private static int connected = 0;

    private static StringSerializer stringSerializer = new StringSerializer();
    private static StringDeserializer stringDeserializer = new
StringDeserializer();

    private static final Properties ZooKeeperProperties =
getZooKeeperProperties();
    private static final Properties ServerProperties        =
getServerProperties();
    private static final Properties ProducerProperties  =
getProducerProperties();

    public static StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

    public static Properties getProducerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("bootstrap.servers", "localhost:9092");
        result.put("compression.type", "none");
        return result;
    }

    public static Properties getServerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("broker.id", "0");
        result.put("num.network.threads", "3");
        result.put("num.io.threads", "8");
        result.put("socket.send.buffer.bytes", "102400");
        result.put("socket.recv.buffer.bytes", "102400");
        result.put("log.dirs", "target/kafka-logs");
        result.put("num.partitions", "1");
        result.put("offset.topic.replication.factor", "1");
        result.put("transaction.state.log.replication.factor", "1");
        result.put("transaction.state.log.min.isr", "1");
        result.put("auto.create.topics.enable", "true");
        result.put("log.retention.hours", "168");
        result.put("log.segment.bytes", "1073741824");
        result.put("log.retention.check.interval.ms", "300000");
        result.put("zookeeper.connect", "localhost:2181");
        result.put("zookeeper.connection.timeout.ms", "18000");
        result.put("group.initial.rebalance.delay.ms", "0");
        return result;
    }

    public static Properties getZooKeeperProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("dataDir", "/tmp/zookeeper");
        result.put("clientPort", "2181");
        result.put("maxClientCnxns", "0");
        result.put("admin.enableServer", "false");
        return result;
    }

    private static Properties getNewLogDir(Properties props) {
        String path = props.getProperty("log.dirs");
        path = path + "/run.";
        int index = 0;
        boolean done = false;
        while (!Files.notExists(Paths.get(path + String.valueOf(index)))) {
            index += 1;
        }
        props.setProperty("log.dirs", path + String.valueOf(index));
        return props;
    }

    public static class Print<V> extends ProcessFunction<V, V> {
        private static final ObjectMapper mapper = new ObjectMapper();
        public String prefix;

        public Print() {
            this.prefix = "";
        }

        public Print(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void processElement(V value, Context ctx, Collector<V> out) {
            System.out.printf("%s ", prefix);
            if (ctx != null) {
                TimerService srv = ctx.timerService();
                Long timestampLong = ctx.timestamp();
                long timestamp = 0;
                if (timestampLong != null) {
                    timestamp = timestampLong;
                }
                long watermark = 0;
                if (srv != null) {
                    watermark = srv.currentWatermark();
                }
                System.out.printf("[timestamp=%d watermark=%d] ",
timestamp, watermark);
            }

            if (value == null) {
                System.out.println("null");
            } else {
                try {
                    System.out.println(new
String(mapper.writeValueAsBytes(value)));
                } catch (Exception e) {
                    System.out.println("exception");
                    e.printStackTrace();
                }
            }
            out.collect(value);
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(2)
                .setNumberTaskManagers(1)
                .build());

    @BeforeClass
    public static void setup() {
        env.setParallelism(1);
        if (connected == 0) {
            zooKeeper = new EmbeddedZookeeper();
            ServerProperties.setProperty("zookeeper.connect",
"localhost:" + zooKeeper.port());

            server = TestUtils.createServer(new
KafkaConfig(getNewLogDir(ServerProperties)), new MockTime());
            admin = AdminClient.create(ProducerProperties);
        }
        connected += 1;
    }

    @AfterClass
    public static void tearDown() {
        if (connected == 1) {
            try {
                server.shutdown();
                zooKeeper.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }

            zooKeeper = null;
            server = null;
            admin = null;
        }
        connected -= 1;
    }

    @Test
    public void testFail() throws Exception {
        String inputTopic = "input";

        Map<String, String> configs = new HashMap<>();
        int partitions = 1;
        short replication = 1;

        CreateTopicsResult result = admin.createTopics(Arrays.asList(
            new NewTopic(inputTopic, partitions, replication).configs(configs)
        ));
        result.all().get();

        KafkaProducer<String, String> producer = new
KafkaProducer<String, String>(ProducerProperties, stringSerializer,
stringSerializer);
;

        DescribeTopicsResult topics =
admin.describeTopics(Arrays.asList(inputTopic));
        for (Map.Entry<String, TopicDescription> topic :
topics.all().get().entrySet()) {
            System.out.printf("%s %d\n", topic.getValue().name(),
topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }

        // Some subscription events
        producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(1).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(10).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(100).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic,
0, Time.days(1000).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic,
0, Long.MAX_VALUE, "0", "test message"));
        producer.flush();
        producer.close();

        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put("group.id", "0");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("session.timeout.ms", "30000");
        FlinkKafkaConsumer<String> source = new
FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),
prop);
        source.assignTimestampsAndWatermarks(
            new WatermarkStrategy<String>() {
                @Override
                public TimestampAssigner<String>
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return new TimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String event,
long recordTimestamp) {
                            System.out.printf("Assigning timestamp
%d\n", recordTimestamp);
                            return recordTimestamp;
                        }
                    };
                }

                @Override
                public WatermarkGenerator<String>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<String>() {
                        public long latestWatermark = Long.MIN_VALUE;

                        @Override
                        public void onEvent(String event, long
timestamp, WatermarkOutput output) {
                            long eventWatermark = timestamp -
Time.days(1).toMilliseconds();
                            if (eventWatermark > latestWatermark) {
                                System.out.printf("Emitting watermark
%d\n", eventWatermark);
                                output.emitWatermark(new
Watermark(eventWatermark));
                                latestWatermark = eventWatermark;
                            }
                        }

                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
                        }
                    };
                }
            });
        source.setStartFromEarliest();

        env.addSource(source)
            .process(new Print<String>("Source"));

        System.out.println(env.getExecutionPlan());
        JobClient client = null;
        try {
            client = env.executeAsync("Fail Test");
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }

        topics = admin.describeTopics(Arrays.asList(inputTopic));
        for (Map.Entry<String, TopicDescription> topic :
topics.all().get().entrySet()) {
            System.out.printf("%s %d\n", topic.getValue().name(),
topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }

        TimeUnit.SECONDS.sleep(5);
        client.cancel().get(5, TimeUnit.SECONDS);
    }
}

Reply via email to