Hi Matthias,
It seems there is a bit of an API inconsistency here. ParDo has the
.withSideInputs(Iterable<PCollectionView<...>>) version but also a varargs
version withSideInputs(PCollectionView<?>...) but Combine only has the
Iterable version.
So you have the right idea, and just need to put your PCollectionView into
a trivial iterable to pass it to Combine's withSideInputs. Leaving out type
annotations for readability:
logs_keyed.apply(
Combine.perKey(new CombineLogs(sideInput))
.withSideInputs(Collections.singletonList(sideInput)));
Just replying without checking with a compiler, so I hope this helps but
let me know if you hit another snag.
We'll get these APIs consistent.
Kenn
On Tue, Jan 3, 2017 at 11:29 AM, Matthias Baetens <
[email protected]> wrote:
> Hi everyone,
>
> I have been trying to write a CombineFnWithContext; in the Context I want
> to have a map containing values to filter the input values.
>
> If I'm correct, I can access the sideinput in the addInput(Accum accum,
> Log input, Context c) operation using c.sideInput(). When constructing the
> main flow of the pipeline though, I tried to do things similarly to what is
> outlined here
> <https://beam.apache.org/documentation/programming-guide/#transforms-sideio>
> for
> ParDo's (creating a PCollectionView and specifying the side input using
> .withSideInputs()) - but then an Iterable<? extends PCollectionView<?>> is
> required rather than a PCollectionView<>):
>
> PCollection<KV<String, String>> logs_aggregated =
> logs_keyed.apply(Combine.<String, Log, String>perKey(new
> CombineLogs(sideInput)).withSideInputs(sideInput));
>
> How should I go about this, because the syntax is confusing me and I was
> not able to find a clear example floating around the internet.
>
> Thanks!
>
> Best regards,
>
> Matthias
>