Hi: we are using PyFlink 1.15, but we find it has a large delay — averaging about 500 ms — while the equivalent Java code has a delay in the range of 1–6 ms. Do you have any idea how to fix this?
Thanks pyflink demo code: from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode from pyflink.datastream.connectors import FlinkKafkaConsumer,FlinkKafkaProducer from pyflink.datastream.functions import RuntimeContext, MapFunction import time,json def mymap(value): now = time.time() sv = json.loads(value) num = float(sv) print(now,"recv:",value,"span:",now - num) return sv +"_"+str(now) def demo1(): env = StreamExecutionEnvironment.get_execution_environment(); env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC); env.set_parallelism(1); # 启动消费者 deserialization_schema = SimpleStringSchema() kafka_props = { 'bootstrap.servers': "127.0.0.1:9092", 'group.id': "test_group_1", }; kafka_source = FlinkKafkaConsumer( topics = "kafka_demo", deserialization_schema = deserialization_schema, properties = kafka_props, ); ds = env.add_source(kafka_source).set_parallelism(1) serialization_schema = SimpleStringSchema() kafka_producer = FlinkKafkaProducer( topic = "test_producer_topic", serialization_schema = serialization_schema, producer_config = kafka_props); ds = ds.map(mymap, Types.STRING()).add_sink(kafka_producer); env.execute("Test"); if __name__ == '__main__': print("start flink_demo1") demo1() java code: package com.lhhj; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import 
org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import java.util.Properties; import org.apache.flink.api.common.eventtime.WatermarkStrategy; public class Test { public static byte[] ProcessMsg(byte[] value) { try { long now = System.currentTimeMillis(); String sb = new String(value, "UTF-8"); double recvf = Double.parseDouble(sb)*1000; long recv = (long)recvf; System.out.println("recv msg " + recv + " now:" + now + " diff:" + (now - recv)); String ret = Long.toString(recv) + "_" + Long.toString(now); return ret.getBytes(); } catch (Exception e) { System.out.println("err msg:"+e.getMessage()); return value; } } public static void main(String[] args) { System.out.println("Hello World! FlinkDelayTest"); String broker = "127.0.0.1:9092"; KafkaSource<byte[]> source = KafkaSource.<byte[]>builder() .setBootstrapServers(broker) .setStartingOffsets(OffsetsInitializer.latest()) .setTopics("kafka_demo") .setValueOnlyDeserializer(new CharSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<byte[]> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "cal_req"); KafkaSink<byte[]> sink = KafkaSink.<byte[]>builder() .setBootstrapServers(broker) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setValueSerializationSchema(new CharSchema()) .setTopicSelector((record) -> { return "test_producer_topic"; }) .build()) .build(); stream.map(new MapFunction<byte[],byte[]>() { @Override public byte[] map(byte[] value){ return ProcessMsg(value); } }).filter(new FilterFunction<byte[]>() { @Override public boolean filter(byte[] value) throws Exception { if (value.length > 0) { return true; } return false; } }).sinkTo(sink); try { env.execute("FlinkDelayTest"); } catch (Exception e) { e.printStackTrace(); } } } max...@foxmail.com