Hi,

我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
        .process(new ProcessFunction<RatioValue, RatioValue>() {
            @Override
            public void processElement(RatioValuevalue, Context ctx, 
Collector<RatioValue> out) throws Exception {
                out.collect(value);
                ctx.output(ratioOutputTag, value);
            }
        });
sideStream.addSink(new FlinkKafkaProducer<>(
        "ratio_value",
        new RatioValueSerializationSchema(suffix),
        PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), 
tool.get(SCHEMA_REGISTRY_URL)),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
DataStream<RatioValue> ratioSideStream = 
sideStream.getSideOutput(ratioOutputTag);
ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
想问下这种情况是否有什么排查手段?


[email protected]

回复