I've used generics together with PAssert testing before. I have a class that
looks like this:
public class TagByConsumptionType<Record extends CoreEntity> extends
DoFn<Record, Record> {
@ProcessElement
public void process(ProcessContext c) { …
Here Record is the generic type parameter.
The CoreEntity class looks like this:
@DefaultCoder(AvroCoder.class)
public class CoreEntity implements Serializable {
Testing it just involved:
List<CoreEntity> records = … // create test records
PCollection<CoreEntity> inputRecords = pipeline.apply(Create.of(records));
PCollection<CoreEntity> output =
inputRecords.apply(ParDo.of(myTagByConsumptionTypeFunc));
PAssert.that(output).satisfies(new TestSatisfies<>(results -> {
assertThat(results, hasSize(3));
// other asserts
}));
pipeline.run();
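If the size check is not enough, the function handed to satisfies can also inspect each element's runtime class. Below is a minimal, Beam-free sketch of such a check, assuming plain Java only. CoreEntity here is a simplified stand-in for the real class, and MeterReading, ManualChecks and assertAllOfType are hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: the kind of check one might call from inside satisfies(...).
class ManualChecks {
    // Throws unless there are exactly expectedSize elements and each one is an
    // instance of expectedType (subclasses of expectedType also pass).
    static void assertAllOfType(Iterable<?> results, Class<?> expectedType, int expectedSize) {
        int count = 0;
        for (Object r : results) {
            if (!expectedType.isInstance(r)) {
                String actual = (r == null) ? "null" : r.getClass().getName();
                throw new AssertionError(
                    "Expected " + expectedType.getName() + " but got " + actual);
            }
            count++;
        }
        if (count != expectedSize) {
            throw new AssertionError("Expected " + expectedSize + " elements but got " + count);
        }
    }

    public static void main(String[] args) {
        List<CoreEntity> results = Arrays.asList(new MeterReading("a"), new MeterReading("b"));
        // Passes silently: both elements are MeterReading instances.
        assertAllOfType(results, MeterReading.class, 2);
    }
}

// Simplified stand-in for the CoreEntity class in this thread (no coder, not Serializable).
class CoreEntity {
    final String id;
    CoreEntity(String id) { this.id = id; }
}

// Hypothetical subclass, used only to give the check something to distinguish.
class MeterReading extends CoreEntity {
    MeterReading(String id) { super(id); }
}
```

In a real test the call would sit inside the function passed to PAssert.that(output).satisfies(...), so the check runs against the elements of the PCollection.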
Hope that helps.
From: Vilhelm von Ehrenheim
Date: Wednesday, 20 September 2017 at 15:29
Subject: Testing transforms with generics using PAssert.containsAnyOrder
Hi!
I have parameterized a DoFn using generics so that it returns different classes
depending on where I use it. The generic type is required to extend a
superclass I have specified myself.
The ParDo seems to function as expected and I get the right output from it, but
when I try to test it, PAssert containsAnyOrder only compares the
superclass of the objects and not the parameterized type.
I might be missing something Java-specific here, but it is really hard to tell
what PAssert is actually doing from looking at the code.
So is there any way to force it to compare the correct class or can I in some
way extract the objects of the pcollection in the test and do the checks
manually?
The method signature looks like this:
static class MaterializeFn<T extends Entity> extends DoFn<KV<String, Log>,
KV<String, T>>
And if the superclass Entity does not have a Default coder I get the following
error:
java.lang.IllegalStateException: Unable to return a default Coder for
Materialize/ParMultiDo(Materialize).out0 [PCollection]. Correct one of the
following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Cannot provide coder for
parameterized type org.apache.beam.sdk.values.KV<java.lang.String, T>: Unable
to provide a Coder for T.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed:
PTransform.getOutputCoder called.
This kind of makes sense, I guess, as it needs to make sure there is a coder for
all possible classes at runtime. The subclasses also have a coder specified
(all use AvroCoder) and seem to work fine when running the pipeline, as I get
the data I am expecting as output (in Kafka).
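The "Unable to provide a Coder for T" part of the error is consistent with Java type erasure: a generic class on its own does not record what T was bound to, so anything inspecting it through reflection sees only a type variable. The sketch below shows that effect with plain reflection; it is an illustration under that assumption, not Beam's actual inference code. Fn, Entity, Company and MaterializeFn are simplified stand-ins (Fn plays the role of DoFn), and ErasureDemo and firstTypeArgument are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class ErasureDemo {
    // Returns the first type argument recorded on the object's generic superclass.
    static Type firstTypeArgument(Object o) {
        Type superType = o.getClass().getGenericSuperclass();
        return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Plain generic instance: the superclass is Fn<T>, so T is still a type variable.
        Type unresolved = firstTypeArgument(new MaterializeFn<Company>());
        // Anonymous subclass: the superclass is MaterializeFn<Company>, so Company is recorded.
        Type resolved = firstTypeArgument(new MaterializeFn<Company>() {});

        System.out.println(unresolved.getTypeName() + " is a concrete class: "
            + (unresolved instanceof Class));
        System.out.println(resolved.getTypeName() + " is a concrete class: "
            + (resolved instanceof Class));
    }
}

// Simplified stand-ins for the classes in this thread; none of these are Beam types.
class Entity {}
class Company extends Entity {}

// Plays the role of a DoFn-like base class with an output type parameter.
abstract class Fn<OutT> {}

// Mirrors the shape of MaterializeFn<T extends Entity>: T is not bound here.
class MaterializeFn<T extends Entity> extends Fn<T> {}
```

If that is what is happening here, setting the coder explicitly on the output with .setCoder(), as the error message itself suggests, may be the most direct way around it.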
Any ideas on what to do? I tried to look at the PAssert code to figure out why
this could be, but it was a bit over my head.
Thanks,
Vilhelm von Ehrenheim