OK, after reading the docs: https://docs.google.com/document/d/1vyel3XRfdeGyqLvXiy1C3mrw9QBbveoKjjVLuRxMw4k/edit#, I know the answer. Just use a stateful DoFn.
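
A rough, untested sketch of the same pattern with Beam's state and timer API is below. The class name MergeWithTimeoutFn, the String element type, the "|" merge logic, and the 30-second timeout are placeholders for illustration only; the real merge function, coders, and timeout would depend on your data.

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Hypothetical example: buffers the first element seen for a key, emits the
// merged pair as soon as the second element arrives, and falls back to a
// processing-time timeout if the other side never shows up.
class MergeWithTimeoutFn extends DoFn<KV<String, String>, String> {

  private static final Duration MERGE_TIMEOUT = Duration.standardSeconds(30); // assumed timeout

  @StateId("buffer")
  private final StateSpec<ValueState<String>> bufferSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @TimerId("timeout")
  private final TimerSpec timeoutSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext ctx,
      @StateId("buffer") ValueState<String> buffer,
      @TimerId("timeout") Timer timeout) {
    String other = buffer.read();
    if (other != null) {
      // Both sides have arrived: merge and emit immediately, then clear state.
      ctx.output(other + "|" + ctx.element().getValue());
      buffer.clear();
    } else {
      // First side for this key: buffer it and start the timeout clock.
      buffer.write(ctx.element().getValue());
      timeout.offset(MERGE_TIMEOUT).setRelative();
    }
  }

  @OnTimer("timeout")
  public void onTimeout(
      OnTimerContext ctx,
      @StateId("buffer") ValueState<String> buffer) {
    String pending = buffer.read();
    if (pending != null) {
      // The other side never arrived in time: handle the timed-out element here.
      ctx.output(pending);
      buffer.clear();
    }
  }
}

You would map both Kafka topics to KV pairs keyed by the shared field, Flatten them into a single PCollection<KV<String, String>>, and then apply ParDo.of(new MergeWithTimeoutFn()). State and timers are partitioned per key and window, which plays the role of Flink's keyBy plus keyed state in your snippet.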
On 2017-05-21 15:49 (+0800), yunfan zhang <[email protected]> wrote:
> For example, two Kafka topics have a field named key, and I want to merge the
> two topics in Beam streams within a maximum timeout.
> For now, I implement it in Flink by using Flink state storage.
> The code looks like this:
>
> KeyedStream<T> keyedStream = topicA.connect(topicB).map(XXX).keyBy(XXX);
> keyedStream.process(new RichProcessFunction<Object, Object>() {
>     private ValueState<Object> state;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         state = getRuntimeContext().getState(
>             new ValueStateDescriptor<Object>("name", XXXXXX));
>     }
>
>     @Override
>     public void processElement(Object o, Context context, Collector<Object> collector) throws Exception {
>         if (state.value() != null) {
>             // merge the two values and emit here
>             state.update(null); // clear after merge
>         } else {
>             state.update(o);
>             context.timerService().registerProcessingTimeTimer(
>                 System.currentTimeMillis() + mergeTimeoutMs); // set the timeout
>         }
>     }
>
>     @Override
>     public void onTimer(long l, OnTimerContext onTimerContext, Collector<Object> collector) throws Exception {
>         if (state.value() != null) {
>             // process the timed-out element
>         }
>     }
> });
>
> With this code, most data is emitted immediately after it is merged, some data
> is merged later with some latency, and data that cannot be merged within the
> specified time is handled by the timeout function.
> How can I implement this in Beam?
