Hi Sundar,

In general, you wouldn't load the static data in the driver but when the
map function is opened on the processing nodes. If your processing nodes can
hold the data, switching to this pattern is probably easiest. You usually
load it only once per node, shared across all subtasks, by using some kind of
static map. I can go into more detail if you are interested.
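A minimal sketch of that pattern in plain Java. It assumes the static data fits in memory and that all subtasks of the job on one TaskManager share the same user-code classloader, so they see the same static field. The class name SharedLookup and the body of load() are hypothetical placeholders; in a Flink job, get() would be called from RichMapFunction#open() rather than from the plain threads used here to simulate subtasks.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "load once per JVM, share across subtasks" (hypothetical names).
final class SharedLookup {
    // Counts how often the (expensive) load actually ran; for demonstration only.
    static final AtomicInteger LOADS = new AtomicInteger();

    // Initialization-on-demand holder: the JVM builds TABLE exactly once,
    // lazily on first access, and safely publishes it to all threads.
    private static final class Holder {
        static final Map<String, String> TABLE = load();
    }

    private SharedLookup() {}

    // What each subtask's open() would call.
    static Map<String, String> get() {
        return Holder.TABLE;
    }

    // Hypothetical loader: a real job would read the static file here.
    private static Map<String, String> load() {
        LOADS.incrementAndGet();
        Map<String, String> m = new HashMap<>();
        m.put("id-1", "file-record-1");
        m.put("id-2", "file-record-2");
        return Collections.unmodifiableMap(m);
    }

    // Simulates n subtasks calling get() concurrently; returns the load count.
    static int simulateSubtasks(int n) {
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(SharedLookup::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return LOADS.get();
    }

    public static void main(String[] args) {
        System.out.println("loads=" + simulateSubtasks(4) + ", size=" + get().size());
    }
}
```

The holder idiom leaves the "run once" guarantee to JVM class initialization, so no explicit locking is needed even when several subtasks open at the same time.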

Now for the actual question.

1. As soon as you use Co* functions, your pipeline can no longer be chained
and records need to go over the network. I suspect that this is where the
lag comes from. If you use a high degree of parallelism, you can try to
tweak the network configuration [1].
2. Not that I know of.
3. See above.
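To illustrate why the new pipeline needs the network: both inputs also go through keyBy, so every record has to be sent to the subtask that owns its key, which is usually not the subtask that read it. The sketch below uses plain hashCode() modulo parallelism as a simplified stand-in; Flink's real assignment first maps keys to key groups using a murmur hash, so the concrete subtask indices differ. The class KeyPartitioning is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy routing. NOT Flink's exact algorithm
// (Flink uses murmur-hashed key groups); hashCode modulo is used for clarity.
final class KeyPartitioning {

    // Downstream subtask index for a key; floorMod keeps it in [0, parallelism).
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups the keys that one upstream subtask reads by the downstream
    // subtask they must be sent to.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> byTarget = new TreeMap<>();
        for (String key : keys) {
            byTarget.computeIfAbsent(targetSubtask(key, parallelism), t -> new ArrayList<>())
                    .add(key);
        }
        return byTarget;
    }

    public static void main(String[] args) {
        List<String> keysReadBySubtask0 = Arrays.asList("id-1", "id-2", "id-3", "id-4", "id-5");
        // Several targets appear, so records leave the reading subtask and,
        // when subtasks run in different TaskManagers, cross the network.
        System.out.println(route(keysReadBySubtask0, 4));
    }
}
```

With these five keys and a parallelism of 4, route(...) yields four non-empty buckets, i.e. the single upstream subtask must send records to every downstream subtask.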

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-the-network-buffers

On Wed, May 20, 2020 at 4:55 AM Guowei Ma <[email protected]> wrote:

> Hi Sundar,
> 1. Could you check the GC status of the process? Alternatively, you could
> increase the memory size of your TM. (I see that you use value state, and I
> assume that you use the MemoryStateBackend.)
> 2. AFAIK there is no performance limitation in using the `connect`
> operator for mixing bounded and unbounded streams.
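For the first point, one way to look at GC activity is the standard java.lang.management API. This sketch reports the counters of the JVM it runs in, so it would have to run inside the TaskManager process (for example, by calling its logic from a function) to say anything about the job; the class name GcStatus is hypothetical. A large share of GC time relative to uptime would point towards the memory explanation.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Reports cumulative GC counts and times for the current JVM (hypothetical helper).
final class GcStatus {

    // Total collection time in ms over all collectors; collectors that
    // report -1 (undefined) are skipped.
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: count=%d, time=%d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        System.out.printf("GC time %d ms of %d ms uptime%n", totalGcTimeMillis(), uptime);
    }
}
```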
>
> Best,
> Guowei
>
>
> On Wed, May 20, 2020 at 9:54 AM sundar <[email protected]> wrote:
>
>> Hi Guowei,
>> Here is what the code for my pipeline looks like.
>>
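>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import org.apache.flink.util.Collector;
>>
>> class CoFlatMapFunc
>>         extends RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput> {
>>
>>     // Keyed state: one cached FileInput per key (ID).
>>     private transient ValueState<FileInput> cache;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         // Initialize the cache.
>>         cache = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("cache", FileInput.class));
>>     }
>>
>>     // Read element from file and update cache.
>>     @Override
>>     public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector)
>>             throws Exception {
>>         cache.update(fileInput);
>>     }
>>
>>     // Read element from Kafka, look up cache and emit the joined record.
>>     @Override
>>     public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector)
>>             throws Exception {
>>         collector.collect(new KafkaOutput(kafkaInput, cache.value()));
>>     }
>> }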
>>
>>
>> // Old pipeline that works fine.
>> class OldFlinkPipeline {
>>
>>     public SingleOutputStreamOperator<KafkaOutput> generateOutput(
>>             StreamExecutionEnvironment env) {
>>
>>         DataStream<KafkaInput> kafkaStream = env
>>                 .addSource(new KafkaSourceFunction());
>>
>>         return kafkaStream
>>                 .map(kafkaInput -> new KafkaOutput(kafkaInput, null /* fileInput */));
>>     }
>> }
>>
>> // New pipeline that is consuming more than 4x the resources.
>> class NewFlinkPipeline {
>>
>>     public SingleOutputStreamOperator<KafkaOutput> generateOutput(
>>             StreamExecutionEnvironment env) {
>>
>>         KeyedStream<KafkaInput, ID> kafkaStream = env
>>                 .addSource(new KafkaSourceFunction())
>>                 .keyBy(kafkaInput -> kafkaInput.getId());
>>
>>         KeyedStream<FileInput, ID> fileStream = env
>>                 .readTextFile("file.txt")
>>                 // readTextFile yields one String per line; FileInput.fromLine is a
>>                 // hypothetical parser standing in for the omitted conversion.
>>                 .map(FileInput::fromLine)
>>                 .keyBy(fileInput -> fileInput.getId());
>>
>>         return fileStream
>>                 .connect(kafkaStream)
>>                 .flatMap(new CoFlatMapFunc());
>>     }
>> }
>>
>> Please do let me know if this is the recommended way to connect a bounded
>> stream with an unbounded stream, or if I am doing something obviously
>> expensive here.
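A plain-Java model of what the keyed CoFlatMapFunc computes, with a HashMap standing in for Flink's per-key ValueState (an assumption of this sketch; records are plain strings and the class name KeyedEnrichmentModel is hypothetical). It shows one consequence of the design: Flink does not wait for the bounded file input to finish before processing Kafka records, so a Kafka record whose key has not been read from the file yet is emitted with a null file part, because ValueState#value() returns null when nothing has been stored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Model of the keyed enrichment: cacheByKey plays the role of ValueState<FileInput>.
final class KeyedEnrichmentModel {
    private final Map<String, String> cacheByKey = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Corresponds to flatMap1: a file record updates the cache for its key.
    void onFileRecord(String id, String fileRecord) {
        cacheByKey.put(id, fileRecord);
    }

    // Corresponds to flatMap2: a Kafka record is joined with the cached value,
    // which is null if no file record for this key has arrived yet.
    void onKafkaRecord(String id, String kafkaRecord) {
        output.add(kafkaRecord + "+" + cacheByKey.get(id));
    }

    public static void main(String[] args) {
        KeyedEnrichmentModel m = new KeyedEnrichmentModel();
        m.onKafkaRecord("a", "k1"); // file record for "a" not read yet
        m.onFileRecord("a", "f1");
        m.onKafkaRecord("a", "k2");
        System.out.println(m.output); // prints [k1+null, k2+f1]
    }
}
```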
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany