Hi Guaowei,
Here is what the code for my pipeline looks like. 

class CoFlatMapFunc extends
        RichCoFlatMapFunction<FileInput, KafkaInput, KafkaOutput> {

    // Keyed state holding the latest file record for the current key.
    private transient ValueState<FileInput> cache;

    @Override
    public void open(Configuration parameters) {
        // initialize cache
    }

    // Read an element from the file and update the cache.
    @Override
    public void flatMap1(FileInput fileInput, Collector<KafkaOutput> collector)
            throws Exception {
        cache.update(fileInput);
    }

    // Read an element from Kafka, look up the cache, and emit the output tuple.
    @Override
    public void flatMap2(KafkaInput kafkaInput, Collector<KafkaOutput> collector)
            throws Exception {
        collector.collect(new KafkaOutput(kafkaInput, cache.value()));
    }
}
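
To make the intended lookup behaviour concrete, here is a minimal Flink-free sketch of what the function above is meant to do per key. It is only an illustration under stated assumptions: the class name KeyedCacheSketch, the String payloads, and the HashMap standing in for keyed ValueState are all hypothetical and are not part of the real pipeline.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CoFlatMapFunc: a HashMap plays the role of
// Flink's keyed ValueState (one cached file record per key).
class KeyedCacheSketch {
    private final Map<String, String> cache = new HashMap<>();

    // File side: remember the latest record for this key; emit nothing.
    void flatMap1(String id, String fileInput) {
        cache.put(id, fileInput);
    }

    // Kafka side: pair the event with the cached record for the same key
    // (null if no file record for that key has arrived yet).
    void flatMap2(String id, String kafkaInput, List<String> out) {
        out.add(kafkaInput + "|" + cache.get(id));
    }

    public static void main(String[] args) {
        KeyedCacheSketch sketch = new KeyedCacheSketch();
        List<String> out = new ArrayList<>();
        sketch.flatMap2("id1", "k1", out); // Kafka event before the file record
        sketch.flatMap1("id1", "f1");      // file record arrives
        sketch.flatMap2("id1", "k2", out); // Kafka event after the file record
        System.out.println(out);           // prints [k1|null, k2|f1]
    }
}
```

Note that a Kafka event arriving before the file record for its key is paired with null, since nothing orders the two inputs relative to each other.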


// Old pipeline that works fine.
class OldFlinkPipeline {

    public SingleOutputStreamOperator<KafkaOutput> generateOutput(
            StreamExecutionEnvironment env) {

        DataStream<KafkaInput> kafkaStream = env
                .addSource(new KafkaSourceFunction());

        return kafkaStream
                .map(kafkaInput ->
                        new KafkaOutput(kafkaInput, null /* fileInput */));
    }
}

// New pipeline that consumes more than 4x the resources.
class NewFlinkPipeline {

    public SingleOutputStreamOperator<KafkaOutput> generateOutput(
            StreamExecutionEnvironment env) {

        KeyedStream<KafkaInput, ID> kafkaStream = env
                .addSource(new KafkaSourceFunction())
                .keyBy(kafkaInput -> kafkaInput.getId());

        // readTextFile yields Strings; the String -> FileInput parsing
        // step is omitted here.
        KeyedStream<FileInput, ID> fileStream = env
                .readTextFile("file.txt")
                .keyBy(fileInput -> fileInput.getId());

        // File stream is input 1 (flatMap1), Kafka stream is input 2 (flatMap2).
        return fileStream
                .connect(kafkaStream)
                .flatMap(new CoFlatMapFunc());
    }
}

Please do let me know if this is the recommended way to connect a bounded
stream with an unbounded stream, or if I am doing something obviously
expensive here.  



