Please find attached another unit test with a more realistic use case. I am
compiling it with Java 8, and the stack trace I get from testGeneric is attached.
thx,
a.
On Friday, 12 May 2017, 1:52, Thomas Groh <[email protected]> wrote:
Hey Antony;
I've tried to update your code to compile in Java 7 and removed the
SerializableUtils Dan mentioned, and I can't seem to reproduce the issue. Can
you share more about the specific issue you're having? A stack trace would be
really helpful.
Thanks,
Thomas
On Thu, May 11, 2017 at 4:42 PM, Dan Halperin <[email protected]> wrote:
Hi Anthony,
I'm a little confused by your code snippet:
SerializableUtils.serializeToByteArray(input.apply(ParDo.of(new
NonGenericOutput<>())).apply("passing", View.asSingleton()));
is serializing a PCollectionView object. Those are not necessarily serializable
and it's not clear that it makes sense to do this at all.
Can you say more about what you're trying to do?
Thanks,
Dan
On Thu, May 11, 2017 at 4:21 PM, Antony Mayi <[email protected]> wrote:
I tried 2.0.0rc2 and started hitting a weird serialization issue that did not
happen on 0.6.0. I can reproduce it with the attached unit test: the first
serialization attempt passes, while the second one, which differs only slightly
(the transform output type is generic), fails, and only on 2.0.0.
Can anyone find a way around it? In my real case I depend on generic output
types.
thanks,
antony.
package sertest;

import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.Rule;
import org.junit.Test;

import java.io.Serializable;

public class SerTest {

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @DefaultCoder(SerializableCoder.class)
  private static class Output<T extends Serializable> implements Serializable {
    // body irrelevant
  }

  private static class GenericOutput<T extends Serializable> extends DoFn<Integer, Output<T>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // body irrelevant
    }
  }

  private static class NonGenericOutput<T extends Serializable> extends DoFn<Integer, Output> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // body irrelevant
    }
  }

  private static class Consumer<T extends Serializable> extends DoFn<Integer, Void> {
    private final PCollectionView<T> view;

    public Consumer(PCollectionView<T> view) {
      this.view = view;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      // body irrelevant
    }
  }

  // this passes OK
  @Test
  public void testNonGeneric() {
    PCollection<Integer> input = this.pipeline.apply(Create.of(1));
    PCollectionView<Output> view =
        input.apply(ParDo.of(new NonGenericOutput<>())).apply("passing", View.asSingleton());
    input.apply(ParDo.of(new Consumer<>(view)).withSideInputs(view));
    this.pipeline.run();
  }

  // this fails on beam 2.0 while it works OK on 0.6.0
  @Test
  public void testGeneric() {
    PCollection<Integer> input = this.pipeline.apply(Create.of(1));
    PCollectionView<Output<Integer>> view =
        input.apply(ParDo.of(new GenericOutput<Integer>())).apply("failing", View.asSingleton());
    input.apply(ParDo.of(new Consumer<>(view)).withSideInputs(view));
    this.pipeline.run();
  }
}
java.lang.IllegalArgumentException: unable to serialize sertest.SerTest$Consumer@2371aaca
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
at sertest.SerTest.testGeneric(SerTest.java:60)
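
The trace above names only the outer Consumer object, not the nested class that Java serialization actually rejected. A minimal sketch of how that class could be pinpointed outside of Beam, using only the JDK, is below. SerializationProbe, Holder and findNonSerializable are hypothetical names made up for this illustration and are not part of Beam; Holder merely stands in for a DoFn that keeps a reference in a field. It relies on java.io.NotSerializableException carrying the offending class name as its message, which is the JDK's default behaviour.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper for illustration only; not part of Beam.
final class SerializationProbe {

    /** Hypothetical stand-in for a DoFn that keeps a reference in a field. */
    public static class Holder implements Serializable {
        private final Object payload;

        public Holder(Object payload) {
            this.payload = payload;
        }
    }

    /**
     * Attempts plain Java serialization of the candidate and returns the name
     * of the first class that is rejected, or null if the whole object graph
     * serializes.
     */
    public static String findNonSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return null;
        } catch (NotSerializableException e) {
            // By default the message is the fully qualified name of the rejected class.
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O failure while probing", e);
        }
    }

    public static void main(String[] args) {
        // A String payload is Serializable, so nothing is rejected.
        System.out.println(findNonSerializable(new Holder("text")));
        // A bare Object is not Serializable, so its class name is reported.
        System.out.println(findNonSerializable(new Holder(new Object())));
    }
}
```

Calling findNonSerializable on the Consumer instance from testGeneric (before handing it to ParDo.of) would presumably report the class that breaks serialization. Running the test JVM with -Dsun.io.serialization.extendedDebugInfo=true should also make the JDK append the field path to the exception message, in which case the returned string contains more than just the class name.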