@Override
public void processElement(Integer value, Context ctx, Collector<Integer>
out) throws Exception {
   ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<
Integer> out) throws Exception {
   // …
}

correcting myself regarding the above timer proposal. it still requires a
message/record come in. I am trying to guard against when there is a long
gap of idle. then I won't be able to register a timer.


On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu <[email protected]> wrote:

> Pirotr,
>
> > However you could do it via a custom Operator (there you have a
> constant access to output collector).
>
> Can you elaborate that a little bit? are you referring to
> "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?
>
> > register processing time service in your ProcessFunction.
>
> I think your timer proposal can work.
>
> I was originally register timer like this. ProcessingTimeCallback
> interface doesn't supply the Collector parameter
>
> ((StreamingRuntimeContext) getRuntimeContext())
>     .getProcessingTimeService()
>     .registerTimer(..., this);
>
> Thanks,
> Steven
>
>
>
> On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski <[email protected]>
> wrote:
>
>> Hi,
>>
>> Indeed it seems like this is not possible to emit records on
>> checkpoint/snapshot through ProcessFunction. However you could do it via a
>> custom Operator (there you have a constant access to output collector).
>> Another workaround might be to register processing time service in your
>> ProcessFunction.
>>
>> @Override
>> public void processElement(Integer value, Context ctx, Collector<Integer>
>> out) throws Exception {
>>    ctx.timerService().registerProcessingTimeTimer(...);
>> }
>>
>> @Override
>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<I
>> nteger> out) throws Exception {
>>    // …
>> }
>>
>> Piotrek
>>
>> On 11 Jun 2018, at 01:07, Steven Wu <[email protected]> wrote:
>>
>> I have a process function defined with these interfaces
>>
>> public class MyProcessFunction<IN> extends ProcessFunction<IN, OUT>
>>     implements CheckpointedFunction, ProcessingTimeCallback {...}
>>
>> In snapshotState() method, I want to close files and emit the metadata
>> about the closed files to downstream operator. it doesn't seem possible
>> with *snapshotState(FunctionSnapshotContext context*) interface.
>>
>> I can keep metadata in snapshot and restore them during recovery. but if
>> there is no input record coming for a long time, * processElement(T
>> value, Context ctx, Collector<DataFile> out)* won't be called. Then I
>> can't forward the restored data to downstream operator with guaranteed
>> latency.
>>
>> I can add a timer. but it doesn't seem that *onProcessingTime(long
>> timestamp)* allows me to forward output to downstream operator either.
>>
>> Thanks,
>> Steven
>>
>>
>>
>

Reply via email to