Digging a bit more into this. I might have set the Coder in the wrong place before.

Using setCoder directly after the parameterized transform, explicitly specifying the correct subclass (`AvroCoder.of(Person.class)` in this case), does give me a Person object in the next step as suggested. Thank you so much, and sorry for my confusion! Still a bit interesting that I didn't need to do this explicitly when running in Dataflow, but it was needed in the testing case.

Br,
Vilhelm

On Wed, Sep 20, 2017 at 11:50 PM, Vilhelm von Ehrenheim <[email protected]> wrote:

> I don't have any mix of types in the actual PCollections. I just wanted to
> parameterize a transform that does essentially the same kind of aggregation
> regardless of the types used for the aggregation. They are later used in
> different places in my pipelines.
>
> I'm not even sure I have a problem with the coder, to be honest. It works
> really well in Dataflow but gives me strange results when running it in
> the tests, even though I'm doing the same transforms. That's why I thought
> there might be a difference in how PAssert does it, but I realize that
> transforming it into a string before the PAssert also gives me the
> superclass representation. I really don't see what could be the cause of
> the difference, as this is not the case in Dataflow.
>
> On Wed, Sep 20, 2017 at 11:39 PM, Thomas Groh <[email protected]> wrote:
>
>> It looks like you're trying to encode a field with a null value, and Avro
>> does not consider that field to be nullable. I believe you can use
>> "ReflectData.AllowNull" to construct a schema which can encode/decode
>> these records, but by default such a thing is not done. Additionally, if
>> there are multiple types which require different encodings or processing,
>> it gets harder to reason about the coder decoding the original inputs
>> with full fidelity - splitting these into multiple PCollections is
>> preferable if they need to be processed as their original type.
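[Editor's note: Thomas's ReflectData.AllowNull suggestion can be illustrated with a small standalone Avro sketch. The `Person` class below is a hypothetical stand-in for the one in the thread, not the actual class.]

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class AllowNullDemo {

  // Hypothetical POJO standing in for the thread's Person class.
  static class Person {
    String name;  // may legitimately be null at runtime
    int age;
  }

  public static void main(String[] args) {
    // The default reflected schema makes every field non-nullable, so
    // encoding a Person whose name is null fails with a
    // NullPointerException (the "in ...Person null of ...Person" error
    // seen later in the thread's stack trace).
    Schema strict = ReflectData.get().getSchema(Person.class);
    System.out.println(strict.getField("name").schema().getType());   // STRING

    // ReflectData.AllowNull wraps each reference-typed field in a union
    // with "null", so null field values can be encoded and decoded.
    Schema lenient = ReflectData.AllowNull.get().getSchema(Person.class);
    System.out.println(lenient.getField("name").schema().getType());  // UNION
  }
}
```

With Beam's AvroCoder, I believe the per-field equivalent is annotating nullable fields with `org.apache.avro.reflect.Nullable`, which likewise turns the reflected field schema into a union with null.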
>>
>> On Wed, Sep 20, 2017 at 2:24 PM, Vilhelm von Ehrenheim <[email protected]> wrote:
>>
>>> @Daniel: What is the TestSatisfies in your PAssert.satisfies? I tried
>>> with my own SerializableFunction but cannot get it to work.
>>>
>>> @Eugene: I can get rid of the error by assigning a default coder to the
>>> superclass. However, the test problem persists. I tried to do a
>>> .setCoder() on the resulting collection, but that failed with a
>>> NullPointerException. I really don't get why, since the stack trace is
>>> quite extensive but not very helpful:
>>>
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in co.motherbrain.materializer.Person null of co.motherbrain.materializer.Person
>>>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
>>>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
>>>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
>>>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:356)
>>>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:338)
>>>   at co.motherbrain.materializer.MaterializerTest.testMaterializePersonFn(MaterializerTest.java:109)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>   at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:327)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>>   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>>>   at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>>>   at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>>>   at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
>>> Caused by: java.lang.NullPointerException: in co.motherbrain.materializer.Person null of co.motherbrain.materializer.Person
>>>   at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>>   at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
>>>   at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>>>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>>>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>>>   at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>>> Caused by: java.lang.NullPointerException
>>>   at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:158)
>>>   at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
>>>   at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>>   at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
>>>   at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>>   at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>>   at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>>   at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
>>>   at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>>>   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>>>   at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>>>   at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>>>   at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>>>   at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>>>   at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>>>   at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>>>   at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
>>>   at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
>>>   at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>>>   at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
>>>   at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> Is that to be expected, or am I missing something? I thought that was
>>> the coder it was using already. If it used the superclass coder I would
>>> expect it to lose data, but it seems to give me meaningful output.
>>>
>>> Really appreciate the help, guys!
>>>
>>> // Vilhelm
>>>
>>> On Wed, Sep 20, 2017 at 8:33 PM, Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> The error suggests that you .setCoder() on your resulting collection;
>>>> have you tried that? The error seems unrelated to PAssert.
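[Editor's note: Eugene's suggestion is what ultimately resolves the thread (see the top message): call .setCoder() on the transform's output, naming the concrete subclass's coder. A minimal sketch, assuming the MaterializeFn<T extends Entity> signature quoted later in the thread; the variable names `logs` and `people` and the upstream input are placeholders, not code from the thread.]

```java
// Sketch: MaterializeFn, Entity, Person, and Log are the thread's classes;
// "logs" and "people" are hypothetical names.
PCollection<KV<String, Log>> logs = ...;  // upstream input

PCollection<KV<String, Person>> people =
    logs
        .apply("MaterializePerson", ParDo.of(new MaterializeFn<Person>()))
        // Coder inference cannot resolve the generic T, so specify the
        // concrete subclass's coder explicitly on the output collection:
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Person.class)));
```

This also explains the superclass representation Vilhelm saw in tests: without the explicit coder, the pipeline can fall back to a coder registered for the Entity superclass, which round-trips elements as Entity rather than Person.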
>>>>
>>>> On Wed, Sep 20, 2017, 7:46 AM Daniel Harper <[email protected]> wrote:
>>>>
>>>>> I've been able to use generics + PAssert testing before. I have a
>>>>> class that looks like this:
>>>>>
>>>>> public class TagByConsumptionType<Record extends CoreEntity> extends DoFn<Record, Record> {
>>>>>
>>>>>     @ProcessElement
>>>>>     public void process(ProcessContext c) { ...
>>>>>
>>>>> where Record is the generic type.
>>>>>
>>>>> The CoreEntity class looks like this:
>>>>>
>>>>> @DefaultCoder(AvroCoder.class)
>>>>> public class CoreEntity implements Serializable {
>>>>>
>>>>> Testing it just involved:
>>>>>
>>>>> List<CoreEntity> records = ... // create test records
>>>>>
>>>>> PCollection<CoreEntity> inputRecords = pipeline.apply(Create.of(records));
>>>>>
>>>>> PCollection<CoreEntity> output =
>>>>>     inputRecords.apply(ParDo.of(myTagByConsumptionTypeFunc));
>>>>>
>>>>> PAssert.that(output).satisfies(new TestSatisfies<>(results -> {
>>>>>     assertThat(results, hasSize(3));
>>>>>     // other asserts
>>>>> }));
>>>>>
>>>>> pipeline.run();
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> From: Vilhelm von Ehrenheim <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Wednesday, 20 September 2017 at 15:29
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Testing transforms with generics using PAssert.containsInAnyOrder
>>>>>
>>>>> Hi!
>>>>> I have parameterized a DoFn using generics to return different classes
>>>>> depending on where I use it. The generic type is required to extend a
>>>>> superclass I have specified myself.
>>>>>
>>>>> The ParDo seems to function as expected and I get the right output
>>>>> using it, but when I try to test this, PAssert.containsInAnyOrder only
>>>>> compares the superclass of the objects and not the parameterized type.
>>>>>
>>>>> I might be missing something Java-specific here, but it is really hard
>>>>> to tell what PAssert is actually doing by looking at the code.
>>>>>
>>>>> So is there any way to force it to compare the correct class, or can I
>>>>> in some way extract the objects of the PCollection in the test and do
>>>>> the checks manually?
>>>>>
>>>>> The method signature looks like this:
>>>>>
>>>>> static class MaterializeFn<T extends Entity> extends DoFn<KV<String, Log>, KV<String, T>>
>>>>>
>>>>> And if the superclass Entity does not have a default coder, I get the
>>>>> following error:
>>>>>
>>>>> java.lang.IllegalStateException: Unable to return a default Coder for
>>>>> Materialize/ParMultiDo(Materialize).out0 [PCollection]. Correct one of
>>>>> the following root causes:
>>>>>   No Coder has been manually specified; you may do so using .setCoder().
>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder
>>>>>   for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, T>:
>>>>>   Unable to provide a Coder for T.
>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>   See suppressed exceptions for detailed failures.
>>>>>   Using the default output Coder from the producing PTransform failed:
>>>>>   PTransform.getOutputCoder called.
>>>>>
>>>>> Which kind of makes sense, I guess, as it needs to make sure there is
>>>>> a coder for all possible classes at runtime. The subclasses do also
>>>>> have a coder specified (all use AvroCoder) and seem to work fine when
>>>>> running the pipeline, as I get the data I am expecting as output (in
>>>>> Kafka).
>>>>>
>>>>> Any ideas on what to do? I tried to look at PAssert to figure out why
>>>>> this could be, but it was a bit over my head.
>>>>>
>>>>> Thanks,
>>>>> Vilhelm von Ehrenheim
>>>>>
>>>>> ----------------------------
>>>>> http://www.bbc.co.uk
>>>>> This e-mail (and any attachments) is confidential and may contain
>>>>> personal views which are not the views of the BBC unless specifically
>>>>> stated.
>>>>> If you have received it in error, please delete it from your system.
>>>>> Do not use, copy or disclose the information in any way, nor act in
>>>>> reliance on it, and notify the sender immediately.
>>>>> Please note that the BBC monitors e-mails sent or received.
>>>>> Further communication will signify your consent to this.
>>>>> ---------------------
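[Editor's note: Vilhelm asked what TestSatisfies is; it is Daniel's own helper and is never shown in the thread. The class below is a hypothetical reconstruction based only on how it is used in his snippet: it adapts a serializable block of assertions over the collected elements into the SerializableFunction<Iterable<T>, Void> that PAssert.satisfies expects. The class name, the Checks interface, and all details are guesses.]

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
 * Hypothetical reconstruction of Daniel's TestSatisfies helper: collects
 * the PCollection's elements into a List and hands them to a serializable
 * block of assertions.
 */
public class TestSatisfies<T> implements SerializableFunction<Iterable<T>, Void> {

  /** Serializable stand-in for Consumer<List<T>> so the lambda can be shipped. */
  public interface Checks<T> extends Serializable {
    void check(List<T> elements);
  }

  private final Checks<T> checks;

  public TestSatisfies(Checks<T> checks) {
    this.checks = checks;
  }

  @Override
  public Void apply(Iterable<T> input) {
    List<T> elements = new ArrayList<>();
    for (T element : input) {
      elements.add(element);
    }
    checks.check(elements);
    return null; // PAssert.satisfies requires a Void return
  }
}
```

Used as in Daniel's snippet: `PAssert.that(output).satisfies(new TestSatisfies<>(results -> assertThat(results, hasSize(3))));`.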
