I tried 2.0.0rc2 and started hitting a weird serialization issue that did not 
happen on 0.6.0. I am able to reproduce it with the attached unit test: the 
first serialization attempt (testNonGeneric) passes, while the second, slightly 
different one (testGeneric, where the transform's output type is generic) fails, 
and only on 2.0.0.

Can anyone find a way around it? In my real use case I depend on generic output 
types.

thanks,
antony.
package sertest;

import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

import java.io.Serializable;

/**
 * Reproduction case for a serialization problem reported against Beam 2.0.0
 * that was not seen on 0.6.0.
 *
 * <p>Both tests build the same chain — {@code Create.of(1)} followed by a
 * {@code ParDo} and {@code View.asSingleton()} — and hand the result to
 * {@link SerializableUtils#serializeToByteArray}. The only difference between
 * them is the declared output type of the {@code DoFn}: raw {@code Output} in
 * the passing case, parameterized {@code Output<T>} in the failing case.
 *
 * <p>NOTE(review): the exact generic declarations and the diamond
 * ({@code <>}) instantiations are what is under test here; do not "tidy" them.
 */
public class SerTest {
    // JUnit rule supplying the pipeline shared by the tests below.
    // NOTE(review): presumably `transient` so the rule is skipped if the test
    // instance itself gets Java-serialized — confirm.
    @Rule public final transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Element type produced by both DoFns. Serializable, with
     * {@link SerializableCoder} declared as its default coder. The type
     * parameter {@code T} is not used by anything visible in this class body.
     */
    @DefaultCoder(SerializableCoder.class) private static class Output<T extends Serializable> implements Serializable {
        // body irrelevant
    }

    /**
     * DoFn whose declared output type is the parameterized {@code Output<T>}.
     * This is the variant used by the failing test.
     */
    private static class GenericOutput<T extends Serializable> extends DoFn<Integer, Output<T>> {
        @ProcessElement public void processElement(ProcessContext c) {
            // body irrelevant
        }
    }

    /**
     * DoFn whose declared output type is the raw {@code Output}. The class
     * still declares a type parameter {@code T}, but it does not appear in the
     * {@code DoFn} supertype. This is the variant used by the passing test.
     */
    private static class NonGenericOutput<T extends Serializable> extends DoFn<Integer, Output> {
        @ProcessElement public void processElement(ProcessContext c) {
            // body irrelevant
        }
    }

    /**
     * Serializes the result of applying {@code NonGenericOutput} followed by
     * {@code View.asSingleton()}, then runs the pipeline. The serialized bytes
     * are discarded; the test only checks that nothing throws.
     */
    // this passes OK
    @Test public void testNonGeneric() {
        PCollection<Integer> input = this.pipeline.apply(Create.of(1));
        SerializableUtils.serializeToByteArray(input.apply(ParDo.of(new NonGenericOutput<>())).apply("passing", View.asSingleton()));
        this.pipeline.run();
    }

    /**
     * Same as {@link #testNonGeneric()} except that the DoFn is
     * {@code GenericOutput}, i.e. its output type is {@code Output<T>} rather
     * than raw {@code Output}. The serialized bytes are likewise discarded.
     */
    // this fails on beam 2.0 while it works OK on 0.6.0
    @Test public void testGeneric() {
        PCollection<Integer> input = this.pipeline.apply(Create.of(1));
        SerializableUtils.serializeToByteArray(input.apply(ParDo.of(new GenericOutput<>())).apply("failing", View.asSingleton()));
        this.pipeline.run();
    }
}

Reply via email to