Hi Sundar,

1. Could you check the GC status of the process? Alternatively, you could increase the memory size of your TaskManager (TM). I see that you use value state, and I assume you are using the MemoryStateBackend, which keeps state on the JVM heap.
2. AFAIK there is no performance limitation in using the `connect` operator to mix a bounded and an unbounded stream.
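As a rough illustration of point 1, here is a plain-Java model of what keyed `ValueState` does inside a connected co-flatmap. It is not the Flink API. `KeyedCacheModel` and its nested stand-in types are hypothetical and exist only for this sketch.

Because keyed state is scoped to the current key, a single `ValueState<FileInput>` effectively holds one `FileInput` per distinct ID. On a heap-based backend, all of those entries are ordinary objects on the TaskManager heap.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed ValueState inside a connected CoFlatMapFunction.
// This is NOT the Flink API; it only mimics the per-key state semantics.
public class KeyedCacheModel {

    // Hypothetical stand-ins for the FileInput / KafkaInput / KafkaOutput types in the thread.
    public static final class FileInput {
        public final String id;
        public final String payload;
        public FileInput(String id, String payload) { this.id = id; this.payload = payload; }
    }

    public static final class KafkaInput {
        public final String id;
        public final String payload;
        public KafkaInput(String id, String payload) { this.id = id; this.payload = payload; }
    }

    public static final class KafkaOutput {
        public final KafkaInput kafkaInput;
        public final FileInput fileInput; // null if no file record has arrived for this key
        public KafkaOutput(KafkaInput kafkaInput, FileInput fileInput) {
            this.kafkaInput = kafkaInput;
            this.fileInput = fileInput;
        }
    }

    // Keyed state is scoped to the current key, so one "ValueState<FileInput>"
    // effectively holds one FileInput per distinct key (modeled here as a map).
    private final Map<String, FileInput> stateByKey = new HashMap<>();

    // Mirrors flatMap1: overwrite the cached value for this record's key.
    public void flatMap1(FileInput in) {
        stateByKey.put(in.id, in);
    }

    // Mirrors flatMap2: emit the Kafka record joined with the cached value for the same key.
    public KafkaOutput flatMap2(KafkaInput in) {
        return new KafkaOutput(in, stateByKey.get(in.id));
    }

    // Number of keys that currently hold state.
    public int keysInState() {
        return stateByKey.size();
    }

    public static void main(String[] args) {
        KeyedCacheModel model = new KeyedCacheModel();
        // Every distinct ID in the file adds one entry that stays in state.
        for (int i = 0; i < 100_000; i++) {
            model.flatMap1(new FileInput("id-" + i, "row " + i));
        }
        KafkaOutput out = model.flatMap2(new KafkaInput("id-42", "event"));
        System.out.println(out.fileInput.payload); // row 42
        System.out.println(model.keysInState());   // 100000
    }
}
```

Nothing is ever removed in this model. The same is true of the `CoFlatMapFunc` in the thread, which never clears `cache`. As a result, the state footprint grows with the number of distinct IDs in `file.txt`, and with a heap backend that growth may show up as GC pressure.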
Best,
Guowei

On Wed, May 20, 2020 at 9:54 AM, sundar wrote:

> Hi Guowei,
> Here is what the code for my pipeline looks like.
>
> class CoFlatMapFunc extends
>     RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput> {
>   ValueState<FileInput> cache;
>
>   @Override
>   public void open(Configuration parameters) throws Exception {
>     // initialize cache
>     cache = getRuntimeContext().getState(
>         new ValueStateDescriptor<>("cache", FileInput.class));
>   }
>
>   // read element from file and update cache.
>   @Override
>   public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector)
>       throws Exception {
>     cache.update(fileInput);
>   }
>
>   // read element from kafka, look up cache and output tuple.
>   @Override
>   public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector)
>       throws Exception {
>     collector.collect(new KafkaOutput(kafkaInput, cache.value()));
>   }
> }
>
> // Old pipeline that works fine.
> class OldFlinkPipeline {
>
>   public SingleOutputStreamOperator<KafkaOutput> generateOutput(
>       StreamExecutionEnvironment env) {
>     DataStream<KafkaInput> kafkaStream = env
>         .addSource(new KafkaSourceFunction());
>
>     return kafkaStream
>         .map(kafkaInput -> new KafkaOutput(kafkaInput, null /* fileInput */));
>   }
> }
>
> // New pipeline that is consuming more than 4X the resources.
> class NewFlinkPipeline {
>
>   public SingleOutputStreamOperator<KafkaOutput> generateOutput(
>       StreamExecutionEnvironment env) {
>     KeyedStream<KafkaInput, ID> kafkaStream = env
>         .addSource(new KafkaSourceFunction())
>         .keyBy(kafkaInput -> kafkaInput.getId());
>
>     KeyedStream<FileInput, ID> fileStream = env
>         .readTextFile("file.txt")
>         .map(line -> FileInput.parse(line)) // assumed parser: each line -> FileInput
>         .keyBy(fileInput -> fileInput.getId());
>
>     return fileStream
>         .connect(kafkaStream)
>         .flatMap(new CoFlatMapFunc());
>   }
> }
>
> Please do let me know if this is the recommended way to connect a bounded
> stream with an unbounded stream, or if I am doing something obviously
> expensive here.
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
