Hi Maximilian,

Sorry, there was a mistake in my sketch implementation. The wrapped DoFn was
supposed to transform only the value in each KV, not the whole KV!

This means the wrapper DoFn needs to capture the output of the inner DoFn
so that it can build a new KV instance from the original key and that result.

The code should have looked like this:

public class ApplyToValues<K, InputT, OutputT>
        extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {

    private DoFn<InputT, OutputT> fun;

    private class WrapperDoFn extends DoFn<KV<K,InputT>, KV<K,OutputT>> {

        class WrapperContext extends DoFn<InputT, OutputT>.ProcessContext {

            private OutputT output;

            public WrapperContext(
                    DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext inner) {

            }

            public OutputT getOutput() {
                return output;
            }

            // methods to implement
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            WrapperContext wrapperContext = new WrapperContext(c);
            fun.processElement(wrapperContext);

            K key = c.element().getKey();

            c.output(KV.of(key, wrapperContext.getOutput()));
        }
    }

    public ApplyToValues(DoFn<InputT, OutputT> fun) {
        this.fun = fun;
    }

    @Override
    public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
        return input.apply(ParDo.of(new WrapperDoFn()));
    }
}

But you are absolutely right: if I didn't need the output, I could have just
called processElement on the wrapped DoFn with the given context.
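
One caveat with capturing the result in a single field: it keeps only the last
value if the inner function emits several, and yields null if it emits none.
A possible alternative is to re-key each value at the moment the inner
function outputs it. Below is a minimal sketch of that idea which does not
depend on the SDK. The `Emitter` and `ValueFn` interfaces and the
`ApplyToValuesSketch` class are hypothetical stand-ins for the context's
`output(..)` and for the DoFn; they are not Beam types.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the "output(..)" part of a process context.
interface Emitter<T> {
    void output(T value);
}

// Hypothetical stand-in for a DoFn that only sees values, never keys.
interface ValueFn<InputT, OutputT> {
    void process(InputT element, Emitter<OutputT> out);
}

final class ApplyToValuesSketch {

    // Runs the inner function on the value of one pair. Every value the inner
    // function emits is wrapped with the original key and forwarded to the
    // outer emitter immediately, so zero, one, or many outputs all work.
    static <K, InputT, OutputT> void processElement(
            Map.Entry<K, InputT> element,
            ValueFn<InputT, OutputT> fun,
            Emitter<Map.Entry<K, OutputT>> outer) {
        K key = element.getKey();
        fun.process(element.getValue(),
                value -> outer.output(new SimpleImmutableEntry<>(key, value)));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> results = new ArrayList<>();

        // Inner function that emits two values per input element.
        ValueFn<String, Integer> lengths = (s, out) -> {
            out.output(s.length());
            out.output(2 * s.length());
        };

        Map.Entry<String, String> element = new SimpleImmutableEntry<>("a", "beam");
        Emitter<Map.Entry<String, Integer>> collect = results::add;
        processElement(element, lengths, collect);

        System.out.println(results); // prints [a=4, a=8]
    }
}
```

In the real transform the same forwarding would presumably live in the
wrapper context's `output(..)` override, which would call `output(..)` on the
outer context with the re-keyed value instead of storing it.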

Thanks,


Frank

On 14 June 2016 at 10:09, Maximilian Michels <[email protected]> wrote:

> Hi Frank,
>
> Why don't you simply pass on the context to your value-transforming
> wrapped DoFn? Your wrapped function will call `output(..)` on the passed
> context:
>
>
>
