Hi all,
I'm currently trying to write a test with TestStream to validate the windowing logic
in a Beam pipeline.
I'm creating a TestStream of Strings, applying the various PTransforms to the stream,
and ending with a PAssert on some of the events I created:
TestStream<String> events = TestStream.create(AvroCoder.of(String.class))
    .addElements("", "")
    .advanceWatermarkToInfinity();

PCollection<KV<String, ArrayList<String>>> eventsSessionised = p.apply(events)
    .apply(new Processing(
        new TupleTag<invalidJSON>() {},
        new TupleTag<Event>() {},
        new TupleTag<Event>() {},
        eventsEnrichedKeyedTag, "", "", ""))
    .get(eventsEnrichedKeyedTag)
    .apply(new Sessionisation(SESSION_GAP_SIZE_HOURS, SESSION_CUT_OFF,
        ALLOWED_LATENESS_MINUTES))
    .apply(new Aggregation(uniqueEventsTag, new TupleTag<EventEnriched>() {}))
    .get(uniqueEventsTag)
    .apply(ParDo.of(new EventToKV()));

PAssert.that(eventsSessionised)
    .inOnTimePane(new IntervalWindow(baseTime, endWindow1))
    .containsInAnyOrder(e1, e2);
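Each new TupleTag<...>() {} in the snippet above creates an anonymous subclass. The usual reason for this idiom in Beam is to let the element type survive erasure so it can be used for coder inference. The self-contained sketch below shows the underlying Java mechanism using a hypothetical TypeCapture class (not part of Beam). Note that an anonymous class created inside an instance method such as testOnTimeEvents can also hold a hidden reference to the enclosing test object.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for the idea behind `new TupleTag<Event>() {}`:
// the trailing {} creates an anonymous subclass, and a subclass records its
// generic supertype in the class file, so the type argument survives erasure.
abstract class TypeCapture<T> {
    private final Type type;

    protected TypeCapture() {
        // For `new TypeCapture<String>() {}` this returns TypeCapture<java.lang.String>.
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type type() {
        return type;
    }
}

class TypeCaptureDemo {
    public static void main(String[] args) {
        // Created in a static context, so no enclosing instance is captured here.
        Type captured = new TypeCapture<String>() {}.type();
        System.out.println(captured); // prints "class java.lang.String"
    }
}
```

A plain new TupleTag<Event>() without the braces also compiles, but its type argument is then erased at run time.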
Running the test method from a main function (new
IngestionPipeLineTest().testOnTimeEvents();) causes the following error:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize
The error points at a custom DoFn that runs fine when I run the main pipeline.
I'm not sure why this error is suddenly being thrown; any pointers or
help would be greatly appreciated.
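The full stack trace below shows the failure happening while the pipeline is being constructed: ParDo.of builds a ParDo$SingleOutput, which calls SerializableUtils.clone, which serializes the DoFn. So the DoFn's processing logic can be correct and the transform still fails if anything reachable from its fields is not Serializable. The sketch below is a hypothetical, stdlib-only approximation of such a serialize-then-deserialize clone; CloneUtil, GoodFn and BadFn are invented names, this is not Beam's actual source, and BadFn's Object field merely stands in for a non-serializable reference such as TestPipeline.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical approximation of a clone-by-serialization round trip.
final class CloneUtil {
    private CloneUtil() {}

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // Walks every non-transient field of `value`, recursively.
            out.writeObject(value);
        } catch (IOException e) {
            // Same shape as the reported error: IllegalArgumentException wrapping the cause.
            throw new IllegalArgumentException("unable to serialize " + value, e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        byte[] data = serialize(value);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("unable to deserialize " + value, e);
        }
    }
}

// A DoFn-like object whose state is all serializable clones fine...
class GoodFn implements Serializable {
    final String prefix = "event-";
}

// ...but a single field pointing at a non-serializable object breaks the clone.
class BadFn implements Serializable {
    final Object pipeline = new Object(); // stands in for TestPipeline
}
```

Because this check runs at construction time, the same DoFn class can pass in the main pipeline and fail in a test, depending only on what its instance happens to reference.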
Full stack trace:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize xxx
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at xxx.transforms.Processing.expand(Processing.java:52)
    at xxx.transforms.Processing.expand(Processing.java:1)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)
    at xxx.IngestionPipeLineTest.testOnTimeEvents(IngestionPipeLineTest.java:96)
    at xxx.IngestionPipeLineTest.main(IngestionPipeLineTest.java:155)
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.testing.TestPipeline
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 10 more
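The Caused by line names org.apache.beam.sdk.testing.TestPipeline, reached through several nested writeOrdinaryObject frames. That suggests the DoFn's object graph contains a reference to the test object (which would itself have to be Serializable for the walk to get that far), and the test object holds the TestPipeline. One plausible path is the anonymous TupleTag subclasses created inside testOnTimeEvents and passed into Processing. The sketch below reproduces that shape with hypothetical classes (FakePipeline stands in for TestPipeline; CapturingTest and TransientTest stand in for the test class) and shows two common ways out: marking the pipeline field transient, or creating the anonymous objects in a static context so there is no enclosing instance to capture.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stands in for TestPipeline: deliberately NOT Serializable.
class FakePipeline {}

// Hypothetical test class that was made Serializable.
class CapturingTest implements Serializable {
    final FakePipeline p = new FakePipeline(); // serialized along with the test object
    private final String tagId;

    CapturingTest(String tagId) {
        this.tagId = tagId;
    }

    // Anonymous class in an instance method: it reads tagId, so it keeps a
    // hidden reference to the enclosing CapturingTest, and through it to p.
    Serializable makeTag() {
        return new Serializable() {
            @Override
            public String toString() {
                return "tag:" + tagId;
            }
        };
    }

    // Created in a static context: there is no enclosing instance to capture.
    static Serializable makeStaticTag(String id) {
        return new Serializable() {
            @Override
            public String toString() {
                return "tag:" + id;
            }
        };
    }
}

// Same shape, but the pipeline field is transient, so serialization skips it.
class TransientTest implements Serializable {
    final transient FakePipeline p = new FakePipeline();
    private final String tagId;

    TransientTest(String tagId) {
        this.tagId = tagId;
    }

    Serializable makeTag() {
        return new Serializable() {
            @Override
            public String toString() {
                return "tag:" + tagId;
            }
        };
    }
}

final class SerializationCheck {
    private SerializationCheck() {}

    // Returns null on success, or the name of the class that could not be serialized.
    static String firstFailure(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A transient field is simply skipped during serialization (it is null in any deserialized copy), which is fine for a pipeline object that is only used on the test side; Beam's TestPipeline Javadoc shows the rule declared as @Rule public final transient TestPipeline p = TestPipeline.create();. Declaring the TupleTags as static final fields of the test class avoids the capture in the same way as makeStaticTag.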
Best,
Matthias