Hi Sundar, in general, you wouldn't load the static data in the driver, but upon opening the map on the processing nodes. If your processing nodes could hold the data, it might be the easiest to switch to this pattern. You usually load it once per node across all subtasks by using some kind of static map. I can go into details if you are interested.
Now for the actual question. 1. As soon as you use Co* functions, your pipeline cannot be chained anymore and needs to go over the network. I suspect that this is where the lag comes from. You can try to tweak the network configurations if you use a high degree of parallelism [1]. 2. Not that I know of. 3. See above. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-the-network-buffers On Wed, May 20, 2020 at 4:55 AM Guowei Ma <[email protected]> wrote: > Hi Sundar, > 1. Could you check the GC status of the process? or you could increase the > memory size of your TM. (I find that you use the value state and I assume > that you use the MemoryStatebackend) > 2. AFAIK there is no performance limitation in using the `connect` > operator for mixing the bounded/unbounded stream. > > Best, > Guowei > > > sundar <[email protected]> 于2020年5月20日周三 上午9:54写道: > >> Hi Guaowei, >> Here is what the code for my pipeline looks like. >> >> Class CoFlatMapFunc extends >> CoFlatMapFunction<FileInput,KafkaInput,KafkaOutput> >> { >> ValueState<FileInput> cache; >> >> public void open(Configuration parameters){ >> //initialize cache >> } >> >> //read element from file and update cache. >> public void flatMap1(FileInput fileInput, Collector<KafkaOutput> >> collector){ >> cache.update(fileInput); >> } >> //read element from kafka, look up cache and output tuple. >> public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> >> collector){ >> return new KafkaOutput(kafkaInput,cache.value()); >> } >> } >> >> >> // Old pipeline that works fine. >> Class OldFlinkPipeline { >> >> public SingleOutputStreamOperator<KafkaOutput> >> >> generateOutput(StreamExecutionEnvironment env){ >> >> DataStream<KafkaInput> kafkaStream = env >> .addSource(new KafkaSourceFunction()); >> >> return kafkaStream >> .map(kafkaInput -> >> new KafkaOutput(kafkaInput, null /*fileInput*/ ); >> >> } >> >> >> } >> >> //New pipeline that is consuming more than 4X the resources. >> Class NewFlinkPipeline { >> >> public SingleOutputStreamOperator<KafkaOutput> >> >> generateOutput(StreamExecutionEnvironment env){ >> >> KeyedStream<KafkaInput,ID> kafkaStream = env >> .addSource(new KafkaSourceFunction()) >> .keyBy(kafkaInput -> >> kafkaInput.getId()); >> >> KeyedStream<FileInput,ID> fileStream = env >> .readTextFile("file.txt") >> .keyBy(fileInput -> fileInput.getId()); >> >> return fileStream >> .connect(kafkaStream) >> .coFlatMap(new CoFlatMapFunc()) >> >> } >> >> } >> >> Please do let me know if this is the recommended way to connect a bounded >> stream with an unbounded stream, or if I am doing something obviously >> expensive here. >> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> > -- Arvid Heise | Senior Java Developer <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
