@Daniel: What is the TestSatisfies in your PAssert.satisfies? I tried with
my own SerializableFunction but cannot get it to work.

@Eugene: I can get rid of the error by assigning a default coder to the
superclass. However, the test problem persists. I tried calling .setCoder()
on the resulting collection, but that failed with a NullPointerException. I
really don’t get why; the stack trace is quite extensive but not very
helpful:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in co.motherbrain.materializer.Person null of co.motherbrain.materializer.Person
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:356)
    at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:338)
    at co.motherbrain.materializer.MaterializerTest.testMaterializePersonFn(MaterializerTest.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:327)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException: in co.motherbrain.materializer.Person null of co.motherbrain.materializer.Person
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:158)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
    at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Is that to be expected or am I missing something? I thought that was the
coder it was using already. If it used the superclass coder I would expect
it to lose data, but it seems to give me meaningful output.
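
For anyone reading this thread later: the coder-inference error quoted further down ("Unable to provide a Coder for T") is consistent with ordinary Java type erasure. The sketch below uses no Beam classes at all; Entity, Person, Fn and MaterializeLike are hypothetical stand-ins for the classes discussed here, and it only shows what plain Java reflection can and cannot see. It is an illustration under those assumptions, not a description of how Beam resolves coders internally.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {
    // Hypothetical stand-ins for the Entity superclass and one of its subclasses.
    static class Entity {}
    static class Person extends Entity {}

    // Hypothetical stand-in for a generic function base class (not Beam's DoFn).
    static class Fn<OutputT> {}

    // Same shape as MaterializeFn<T extends Entity>: the output type is a type variable.
    static class MaterializeLike<T extends Entity> extends Fn<T> {}

    // Returns the first type argument recorded in the generic superclass of fn's class.
    static Type firstTypeArgument(Object fn) {
        ParameterizedType parent = (ParameterizedType) fn.getClass().getGenericSuperclass();
        return parent.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Plain instantiation: <Person> exists only at compile time,
        // so reflection on the instance finds the type variable T.
        Type erased = firstTypeArgument(new MaterializeLike<Person>());

        // Anonymous subclass: its class file records "extends MaterializeLike<Person>",
        // so the concrete type argument can be recovered.
        Type captured = firstTypeArgument(new MaterializeLike<Person>() {});

        System.out.println("plain instance:     " + erased.getTypeName());
        System.out.println("anonymous subclass: " + captured.getTypeName());
    }
}
```

If the concrete type cannot be recovered this way, the remaining options are the ones the error message itself lists, such as specifying the coder manually with .setCoder().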

Really appreciate the help guys!

// Vilhelm

On Wed, Sep 20, 2017 at 8:33 PM, Eugene Kirpichov <[email protected]>
wrote:

> The error suggests that you .setCoder() on your resulting collection, have
> you tried that? The error seems unrelated to PAssert.
>
> On Wed, Sep 20, 2017, 7:46 AM Daniel Harper <[email protected]>
> wrote:
>
>> I’ve been able to use generics + PAssert testing before. I have a class
>> that looks like this:
>>
>> public class TagByConsumptionType<Record extends CoreEntity> extends DoFn<Record, Record> {
>>
>>     @ProcessElement
>>     public void process(ProcessContext c) { …
>>
>>
>> Where Record is the generic type.
>>
>> The CoreEntity class looks like this
>>
>> @DefaultCoder(AvroCoder.class)
>> public class CoreEntity implements Serializable {
>>
>>
>> Testing it just involved
>>
>> List<CoreEntity> records = … // create test records
>>
>> PCollection<CoreEntity> inputRecords = pipeline.apply(Create.of(records));
>>
>> PCollection<CoreEntity> output = inputRecords.apply(ParDo.of(myTagByConsumptionTypeFunc));
>>
>> PAssert.that(output).satisfies(new TestSatisfies<>(results -> {
>> assertThat(results, hasSize(3));
>> // other asserts
>> }));
>>
>> pipeline.run();
>>
>>
>> Hope that helps.
>>
>> From: Vilhelm von Ehrenheim <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Wednesday, 20 September 2017 at 15:29
>> To: "[email protected]" <[email protected]>
>> Subject: Testing transforms with generics using PAssert.containsAnyOrder
>>
>> Hi!
>> I have parameterized a DoFn using generics so that it returns different
>> classes depending on where I use it. The generic type is required to extend
>> a superclass I have defined myself.
>>
>> The ParDo seems to function as expected and I get the right output from
>> it, but when I try to test it, PAssert's containsInAnyOrder only compares
>> the superclass of the objects and not the parameterized type.
>>
>> I might be missing something Java-specific here, but it is really hard to
>> tell what PAssert is actually doing by looking at its code.
>>
>> So is there any way to force it to compare the correct class, or can I
>> somehow extract the objects from the PCollection in the test and do the
>> checks manually?
>>
>> The class declaration looks like this:
>>
>> static class MaterializeFn<T extends Entity> extends DoFn<KV<String, Log>, KV<String, T>>
>>
>> And if the superclass Entity does not have a Default coder I get the
>> following error:
>>
>> java.lang.IllegalStateException: Unable to return a default Coder for Materialize/ParMultiDo(Materialize).out0 [PCollection]. Correct one of the following root causes:
>>   No Coder has been manually specified;  you may do so using .setCoder().
>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, T>: Unable to provide a Coder for T.
>>   Building a Coder using a registered CoderProvider failed.
>>   See suppressed exceptions for detailed failures.
>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>
>> Which kind of makes sense, I guess, as it needs to make sure there is a
>> coder for all possible classes at runtime. The subclasses also have a
>> coder specified (all use AvroCoder) and seem to work fine when running the
>> pipeline, as I get the data I am expecting as output (in Kafka).
>>
>> Any ideas on what to do? I tried to look at PAssert to figure out why
>> this could be, but it was a bit over my head.
>>
>> Thanks,
>> Vilhelm von Ehrenheim
>
