It looks like you're trying to encode a field with a null value, and Avro
does not consider that field to be nullable. I believe you can use
"ReflectData.AllowNull" to construct a schema that can encode/decode these
records, but that is not the default behavior. Additionally, if there are
multiple types which require different encodings or processing, it gets
harder to reason about the coder decoding the original inputs with full
fidelity - splitting these into multiple PCollections is preferable if they
need to be processed as their original types.
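
A minimal sketch of that first suggestion (the output PCollection variable
and the Person element type here are assumptions, not taken from your
pipeline):

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.AvroCoder;

// Derive a schema in which every field is a union with null,
// then build an AvroCoder from that schema.
Schema nullableSchema = ReflectData.AllowNull.get().getSchema(Person.class);
AvroCoder<Person> coder = AvroCoder.of(Person.class, nullableSchema);
output.setCoder(coder);

Avro's @org.apache.avro.reflect.Nullable annotation on the individual field
should have the same effect for just that field.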

On Wed, Sep 20, 2017 at 2:24 PM, Vilhelm von Ehrenheim <
[email protected]> wrote:

> @Daniel: What is the TestSatisfies in your PAssert.satisfies? I tried
> with my own SerializableFunction but cannot get it to work.
>
> @Eugene: I can get rid of the error by assigning a default coder to the
> superclass. However, the test problem persists. I tried to do a .setCoder()
> on the resulting collection, but that failed with a NullPointerException. I
> really don’t get why, since the stacktrace is quite extensive but not very
> helpful:
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in co.motherbrain.materializer.Person null of co.motherbrain.materializer.Person
>
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:356)
>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:338)
>     at co.motherbrain.materializer.MaterializerTest.testMaterializePersonFn(MaterializerTest.java:109)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:327)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.NullPointerException: in co.motherbrain.materializer.Person null of co.motherbrain.materializer.Person
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
> Caused by: java.lang.NullPointerException
>     at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:158)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
>     at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>     at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>     at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>     at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
>     at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
>     at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
>     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Is that to be expected, or am I missing something? I thought that was the
> coder it was already using. If it uses the superclass coder I would expect
> it to lose data, but it seems to give me meaningful output.
>
> Really appreciate the help guys!
>
> // Vilhelm
>
> On Wed, Sep 20, 2017 at 8:33 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
>> The error suggests that you should call .setCoder() on your resulting
>> collection; have you tried that? The error seems unrelated to PAssert.
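
As a sketch of that suggestion, using the MaterializeFn and Person types from
the question further down (the no-argument constructor and the name of the
input collection are assumptions):

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Set a coder on the transform's output explicitly, so Beam does not have
// to infer one for the type variable T.
PCollection<KV<String, Person>> materialized =
    input
        .apply(ParDo.of(new MaterializeFn<Person>()))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Person.class)));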
>>
>> On Wed, Sep 20, 2017, 7:46 AM Daniel Harper <[email protected]>
>> wrote:
>>
>>> I’ve been able to use generics + PAssert testing before, I have a class
>>> that looks like this
>>>
>>> public class TagByConsumptionType<Record extends CoreEntity> extends 
>>> DoFn<Record, Record> {
>>>
>>>     @ProcessElement
>>>     public void process(ProcessContext c) { …
>>>
>>>
>>> Where Record is the generic type.
>>>
>>> The CoreEntity class looks like this
>>>
>>> @DefaultCoder(AvroCoder.class)
>>> public class CoreEntity implements Serializable {
>>>
>>>
>>> Testing it just involved
>>>
>>> List<CoreEntity> records = … // create test records
>>>
>>> PCollection<CoreEntity> inputRecords = pipeline.apply(Create.of(records));
>>>
>>> PCollection<CoreEntity> output = 
>>> inputRecords.apply(ParDo.of(myTagByConsumptionTypeFunc));
>>>
>>> PAssert.that(output).satisfies(new TestSatisfies<>(results -> {
>>>     assertThat(results, hasSize(3));
>>>     // other asserts
>>> }));
>>>
>>> pipeline.run();
>>>
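
TestSatisfies above appears to be a custom helper rather than a Beam class; a
roughly equivalent sketch using PAssert.satisfies with a plain
SerializableFunction (the output variable and expected size are assumptions)
looks like:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;

PAssert.that(output).satisfies(contents -> {
    // contents is an Iterable over the whole PCollection.
    List<CoreEntity> results = new ArrayList<>();
    contents.forEach(results::add);
    assertThat(results, hasSize(3));
    // other asserts
    return null;  // the SerializableFunction must return Void
});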
>>>
>>> Hope that helps.
>>>
>>> From: Vilhelm von Ehrenheim <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Wednesday, 20 September 2017 at 15:29
>>> To: "[email protected]" <[email protected]>
>>> Subject: Testing transforms with generics using PAssert.containsAnyOrder
>>>
>>> Hi!
>>> I have parameterized a DoFn using generics to return different classes
>>> depending on where I use it. The generic type is required to extend a
>>> superclass I have specified myself.
>>>
>>> The ParDo seems to function as expected and I get the right output using
>>> it, but when I try to test this, PAssert containsAnyOrder only compares
>>> the superclass of the objects and not the parameterized type.
>>>
>>> I might be missing something Java-specific here, but it is really hard to
>>> tell what PAssert is actually doing by looking at the code.
>>>
>>> So is there any way to force it to compare the correct class, or can I in
>>> some way extract the objects of the PCollection in the test and do the
>>> checks manually?
>>>
>>> The method signature looks like this:
>>>
>>> static class MaterializeFn<T extends Entity> extends DoFn<KV<String, Log>, 
>>> KV<String, T>>
>>>
>>> And if the superclass Entity does not have a default coder, I get the
>>> following error:
>>>
>>> java.lang.IllegalStateException: Unable to return a default Coder for Materialize/ParMultiDo(Materialize).out0 [PCollection]. Correct one of the following root causes:
>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, T>: Unable to provide a Coder for T.
>>>   Building a Coder using a registered CoderProvider failed.
>>>   See suppressed exceptions for detailed failures.
>>>   Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
>>>
>>> Which kind of makes sense, I guess, as it needs to make sure there is a
>>> coder for all possible classes at runtime. The subclasses also have a coder
>>> specified (all use AvroCoder) and seem to work fine when running the
>>> pipeline, as I get the data I am expecting as output (in Kafka).
>>>
>>> Any ideas on what to do? I tried to look at PAssert to figure out why this
>>> could be, but it was a bit over my head.
>>>
>>> Thanks,
>>> Vilhelm von Ehrenheim
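
One possible way to address the "Unable to provide a Coder for T" part of
that error (a sketch only: the wrapper class is hypothetical, and
MaterializeFn is assumed to have a no-argument constructor) is to wrap the
DoFn in a PTransform that carries the concrete class and sets the output
coder itself:

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

static class MaterializeWithCoder<T extends Entity>
        extends PTransform<PCollection<KV<String, Log>>, PCollection<KV<String, T>>> {
    private final Class<T> clazz;

    MaterializeWithCoder(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public PCollection<KV<String, T>> expand(PCollection<KV<String, Log>> input) {
        // The concrete class is known here, so the output coder can be set
        // without the caller having to remember .setCoder().
        return input
            .apply(ParDo.of(new MaterializeFn<T>()))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(clazz)));
    }
}

// Hypothetical call site: logs.apply(new MaterializeWithCoder<>(Person.class))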
>>>
>>>
>>>
>>>
>>
>
