Hi Sundar,
1. Could you check the GC status of the process? Alternatively, you could
increase the memory size of your TM. (I see that you use ValueState, and I
assume that you use the MemoryStateBackend.)
2. AFAIK there is no performance limitation in using the `connect` operator
to combine a bounded stream with an unbounded one.
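
For point 1, here is a minimal sketch of what "GC status" means at the JVM level. It uses only the standard java.lang.management API and reports, per collector, how many collections ran and how much time they took. As far as I know, Flink's JVM garbage collector metrics are read from the same MXBeans. The names GcStatus, totalGcCount, totalGcTimeMillis and report are made up for this illustration; on a running TM you would normally look at the web UI metrics or the GC logs rather than embed code like this.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcStatus {

    /** Total collections across all collectors; -1 ("undefined") counts as 0. */
    public static long totalGcCount() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            total += Math.max(0, gc.getCollectionCount());
        }
        return total;
    }

    /** Approximate accumulated GC time in milliseconds across all collectors. */
    public static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            total += Math.max(0, gc.getCollectionTime());
        }
        return total;
    }

    /** One line per collector, preceded by a header line. */
    public static String report() {
        StringBuilder sb = new StringBuilder("GC status:\n");
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append(String.format("  %s: count=%d, timeMs=%d%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(report());
    }
}
```

If the collection count and time keep climbing quickly while throughput drops, the heap is probably too small for the state being held, which is why increasing the TM memory is the first thing to try.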

Best,
Guowei


sundar <[email protected]> wrote on Wed, May 20, 2020 at 9:54 AM:

> Hi Guowei,
> Here is what the code for my pipeline looks like.
>
> class CoFlatMapFunc extends
> RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput>
> {
>        // Keyed state: holds the latest FileInput for the current key.
>        private transient ValueState<FileInput> cache;
>
>        @Override
>        public void open(Configuration parameters){
>              //initialize cache
>        }
>
>        //read element from file and update cache.
>        @Override
>        public void flatMap1(FileInput fileInput, Collector<KafkaOutput>
> collector) throws Exception {
>                 cache.update(fileInput);
>        }
>        //read element from kafka, look up cache and output tuple.
>        @Override
>        public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput>
> collector) throws Exception {
>               // flatMap2 returns void, so emit through the collector.
>               collector.collect(new KafkaOutput(kafkaInput, cache.value()));
>        }
> }
>
>
> // Old pipeline that works fine.
> class OldFlinkPipeline {
>
>        public SingleOutputStreamOperator<KafkaOutput>
> generateOutput(StreamExecutionEnvironment env){
>
>              DataStream<KafkaInput> kafkaStream = env
>                                .addSource(new KafkaSourceFunction());
>
>             return kafkaStream
>             .map(kafkaInput ->
>                   new KafkaOutput(kafkaInput, null /*fileInput*/));
>
>        }
>
> }
>
> //New pipeline that is consuming more than 4X the resources.
> class NewFlinkPipeline {
>
>        public SingleOutputStreamOperator<KafkaOutput>
> generateOutput(StreamExecutionEnvironment env){
>
>              KeyedStream<KafkaInput,ID> kafkaStream = env
>                                .addSource(new KafkaSourceFunction())
>                                .keyBy(kafkaInput -> kafkaInput.getId());
>
>              // Parsing each text line into a FileInput is omitted here.
>              KeyedStream<FileInput,ID> fileStream = env
>                                 .readTextFile("file.txt")
>                                 .keyBy(fileInput -> fileInput.getId());
>
>             return fileStream
>            .connect(kafkaStream)
>            // ConnectedStreams exposes flatMap(CoFlatMapFunction), not coFlatMap.
>            .flatMap(new CoFlatMapFunc());
>
>        }
>
> }
>
> Please do let me know if this is the recommended way to connect a bounded
> stream with an unbounded stream, or if I am doing something obviously
> expensive here.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
