Digging a bit more into this. I might have set the coder in the wrong place
before.

Using setCoder directly after the parameterized transform, explicitly
specifying the correct subclass (`AvroCoder.of(Person.class)` in this case)
does give me a Person object in the next step as suggested.
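
For reference, the fix looks roughly like this (a minimal sketch, not my
exact code: `input` and the surrounding pipeline setup are omitted, and
MaterializeFn/Person are my own classes):

```java
// Set the concrete coder immediately after applying the parameterized
// transform. The registry cannot infer a coder for the erased type
// parameter T, so we spell out the KV coder with the subclass explicitly.
PCollection<KV<String, Person>> people =
    input
        .apply("Materialize", ParDo.of(new MaterializeFn<Person>()))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Person.class)));
```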

Thank you so much and sorry for my confusion!

It is still a bit interesting that I didn't need to do this explicitly when
running on Dataflow, but it was needed in the testing case.
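
(For anyone who hits this later: the reason the coder for T cannot be
inferred automatically is Java type erasure; the type parameter is gone at
runtime. A minimal JDK-only illustration, with hypothetical names:)

```java
import java.util.ArrayList;
import java.util.List;

// Demo of type erasure: at runtime a List<String> and a List<Integer>
// have the same class, so a framework inspecting the value alone cannot
// recover the element type -- hence Beam needs setCoder() to be told.
public class ErasureDemo {
    static String runtimeTypeOf(List<?> list) {
        return list.getClass().getName();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        // Both report java.util.ArrayList; the element type is erased.
        System.out.println(runtimeTypeOf(strings));
        System.out.println(runtimeTypeOf(ints).equals(runtimeTypeOf(strings)));
    }
}
```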

Br,
Vilhelm

On Wed, Sep 20, 2017 at 11:50 PM, Vilhelm von Ehrenheim <
[email protected]> wrote:

> I don't have any mix of types in the actual PCollections. I just wanted to
> parameterize a transform that does essentially the same kind of aggregation
> regardless of the types used for the aggregation. They are later used in
> different places in my pipelines.
>
> I'm not even sure I have a problem with the coder, to be honest. It works
> really well in Dataflow but gives me strange results when running it in the
> tests, even though I'm doing the same transforms. That's why I thought there
> might be a difference in how PAssert does it, but I realize that transforming
> it into a string before the PAssert also gives me the superclass
> representation. I really don't see what could be the cause of the
> difference, as this is not the case in Dataflow.
>
>
> On Wed, Sep 20, 2017 at 11:39 PM, Thomas Groh <[email protected]> wrote:
>
>> It looks like you're trying to encode a field with a null value, and Avro
>> does not consider that field to be nullable. I believe you can use
>> "ReflectData.AllowNull" to construct a schema which can encode/decode these
>> records, but by default such a thing is not done. Additionally, if there
>> are multiple types which require different encodings or processing, it gets
>> harder to reason about the coder decoding the original inputs with full
>> fidelity - splitting these into multiple PCollections is preferable if they
>> need to be processed as their original type.
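>>
>> As a sketch of that suggestion (hedged: ReflectData.AllowNull is Avro's
>> helper for building all-nullable reflect schemas, and the
>> AvroCoder.of(Class, Schema) overload plus the names here are assumed,
>> not taken verbatim from this pipeline):
>>
>> Schema nullableSchema = ReflectData.AllowNull.get().getSchema(Person.class);
>> collection.setCoder(AvroCoder.of(Person.class, nullableSchema));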
>>
>> On Wed, Sep 20, 2017 at 2:24 PM, Vilhelm von Ehrenheim <
>> [email protected]> wrote:
>>
>>> @Daniel: What is the TestSatisfies in your PAssert.satisfies? I tried
>>> with my own SerializableFunction but cannot get it to work.
>>>
>>> @Eugene: I can get rid of the error by assigning a default coder to the
>>> superclass. However, the test problem persists. I tried to do a
>>> .setCoder() on the resulting collection but that failed with a
>>> NullPointerException. I really don't understand why, since the stack
>>> trace is quite extensive but not very helpful:
>>>
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
>>> java.lang.NullPointerException: in co.motherbrain.materializer.Person null 
>>> of co.motherbrain.materializer.Person
>>>
>>>     at 
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
>>>     at 
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
>>>     at 
>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:356)
>>>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:338)
>>>     at 
>>> co.motherbrain.materializer.MaterializerTest.testMaterializePersonFn(MaterializerTest.java:109)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at 
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>     at 
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>     at 
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>     at 
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>     at 
>>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>     at 
>>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:327)
>>>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>     at 
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>     at 
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>>     at 
>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>>>     at 
>>> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>>>     at 
>>> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>>>     at 
>>> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
>>> Caused by: java.lang.NullPointerException: in 
>>> co.motherbrain.materializer.Person null of 
>>> co.motherbrain.materializer.Person
>>>     at 
>>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
>>>     at 
>>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
>>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>>     at 
>>> org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>>>     at 
>>> org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>>>     at 
>>> org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>>>     at 
>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>>>     at 
>>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>>>     at 
>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>>>     at 
>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>>>     at 
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:158)
>>>     at 
>>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:164)
>>>     at 
>>> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>>     at 
>>> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
>>>     at 
>>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>>     at 
>>> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>>     at 
>>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>>     at 
>>> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
>>>     at 
>>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
>>>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>>     at 
>>> org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
>>>     at 
>>> org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
>>>     at 
>>> org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
>>>     at 
>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
>>>     at 
>>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
>>>     at 
>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
>>>     at 
>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
>>>     at 
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
>>>     at 
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>  Source)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
>>>     at 
>>> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
>>>     at 
>>> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
>>>     at 
>>> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
>>>     at 
>>> org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
>>>     at 
>>> org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Is that to be expected or am I missing something? I thought that was the
>>> coder it was using already. If it uses the superclass coder I would expect
>>> it to lose data, but it seems to give me meaningful output.
>>>
>>> Really appreciate the help guys!
>>>
>>> // Vilhelm
>>>
>>> On Wed, Sep 20, 2017 at 8:33 PM, Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> The error suggests that you call .setCoder() on your resulting
>>>> collection; have you tried that? The error seems unrelated to PAssert.
>>>>
>>>> On Wed, Sep 20, 2017, 7:46 AM Daniel Harper <[email protected]>
>>>> wrote:
>>>>
>>>>> I've been able to use generics + PAssert testing before; I have a
>>>>> class that looks like this:
>>>>>
>>>>> public class TagByConsumptionType<Record extends CoreEntity> extends 
>>>>> DoFn<Record, Record> {
>>>>>
>>>>>     @ProcessElement
>>>>>     public void process(ProcessContext c) { …
>>>>>
>>>>>
>>>>> Where Record is the generic type.
>>>>>
>>>>> The CoreEntity class looks like this
>>>>>
>>>>> @DefaultCoder(AvroCoder.class)
>>>>> public class CoreEntity implements Serializable {
>>>>>
>>>>>
>>>>> Testing it just involved
>>>>>
>>>>> List<CoreEntity> records = … // create test records
>>>>>
>>>>> PCollection<CoreEntity> inputRecords = pipeline.apply(Create.of(records));
>>>>>
>>>>> PCollection<CoreEntity> output =
>>>>> inputRecords.apply(ParDo.of(myTagByConsumptionTypeFunc));
>>>>>
>>>>> PAssert.that(output).satisfies(new TestSatisfies<>(results -> {
>>>>>     assertThat(results, hasSize(3));
>>>>>     // other asserts
>>>>> }));
>>>>>
>>>>> pipeline.run();
>>>>>
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> From: Vilhelm von Ehrenheim <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Wednesday, 20 September 2017 at 15:29
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Testing transforms with generics using
>>>>> PAssert.containsAnyOrder
>>>>>
>>>>> Hi!
>>>>> I have parameterized a DoFn using generics to return different
>>>>> classes depending on where I use it. The generic type is required to
>>>>> extend a superclass I have specified myself.
>>>>>
>>>>> The ParDo seems to function as expected and I get the right output
>>>>> using it, but when I try to test this, PAssert's containsInAnyOrder
>>>>> only compares the superclass of the objects, not the parameterized type.
>>>>>
>>>>> I might be missing something Java-specific here, but it is really hard
>>>>> to tell what PAssert is actually doing by looking at the code.
>>>>>
>>>>> So is there any way to force it to compare the correct class, or can I
>>>>> somehow extract the objects of the PCollection in the test and do the
>>>>> checks manually?
>>>>>
>>>>> The method signature looks like this:
>>>>>
>>>>> static class MaterializeFn<T extends Entity> extends DoFn<KV<String, 
>>>>> Log>, KV<String, T>>
>>>>>
>>>>> And if the superclass Entity does not have a Default coder I get the
>>>>> following error:
>>>>>
>>>>> java.lang.IllegalStateException: Unable to return a default Coder for 
>>>>> Materialize/ParMultiDo(Materialize).out0 [PCollection]. Correct one of 
>>>>> the following root causes:
>>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide coder 
>>>>> for parameterized type org.apache.beam.sdk.values.KV<java.lang.String, 
>>>>> T>: Unable to provide a Coder for T.
>>>>>   Building a Coder using a registered CoderProvider failed.
>>>>>   See suppressed exceptions for detailed failures.
>>>>>   Using the default output Coder from the producing PTransform failed: 
>>>>> PTransform.getOutputCoder called.
>>>>>
>>>>> Which kind of makes sense, I guess, as it needs to make sure there is a
>>>>> coder for all possible classes at runtime. The subclasses do also have a
>>>>> coder specified (all use AvroCoder) and seem to work fine when running
>>>>> the pipeline, as I get the data I am expecting as output (in Kafka).
>>>>>
>>>>> Any ideas on what to do? I tried to look at the PAssert code to figure
>>>>> out why this could be, but it was a bit over my head.
>>>>>
>>>>> Thanks,
>>>>> Vilhelm von Ehrenheim
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>