devinbost commented on issue #4721: Flink pulsar sink NotSerializableException URL: https://github.com/apache/pulsar/issues/4721#issuecomment-561965357 Does anyone know of a workaround? I'm stuck on this exact issue. You should be able to reproduce it with this example: ``` import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer; import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import static java.nio.charset.StandardCharsets.UTF_8; public class StreamingJob { public static Tuple2<String,String> mapToTuple(String incomingMessage) throws ParseException { JSONObject incomingObj = (JSONObject) new JSONParser().parse(incomingMessage); JSONObject correlationIdJson = (JSONObject) incomingObj.get("correlationId"); String correlationId = ""; if(correlationIdJson != null){ correlationId = correlationIdJson.toString(); } // Put in try/catch to throw exception if correlationIdJson == null Tuple2 msgEnvelope = new Tuple2(correlationId, incomingObj.toString()); return msgEnvelope; } private static class 
JsonConcatenator implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> { @Override public Tuple2<String, String> createAccumulator() { return new Tuple2<String, String>("",""); } @Override public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) { return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1); } @Override public String getResult(Tuple2<String, String> accumulator) { return "[" + accumulator.f1 + "]"; } @Override public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) { return new Tuple2<>(a.f0, a.f1 + ", " + b.f1); } } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String SERVICE_URL = "pulsar://localhost:6650"; String INPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-input"; String SUBSCRIPTION_NAME = "test-jaeger-spanner"; String OUTPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-output"; PulsarSourceBuilder<String> builder = PulsarSourceBuilder .builder(new SimpleStringSchema()) .serviceUrl(SERVICE_URL) .topic(INPUT_TOPIC) .subscriptionName(SUBSCRIPTION_NAME); SourceFunction<String> src = builder.build(); DataStream<String> dataStream = env.addSource(src); DataStream<String> combinedEnvelopes = dataStream .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2 map(String incomingMessage) throws Exception { return mapToTuple(incomingMessage); } }) .keyBy(0) .window(EventTimeSessionWindows.withGap(Time.seconds(20))) .aggregate(new JsonConcatenator()) .returns(String.class); combinedEnvelopes.addSink(new FlinkPulsarProducer<>( SERVICE_URL, OUTPUT_TOPIC, new AuthenticationDisabled(), // probably need to fix // AuthenticationTls() combinedData -> combinedData.toString().getBytes(UTF_8), combinedData -> null) ); // execute program 
env.execute("Flink Streaming Java API Skeleton"); } } ``` with this POM file: ``` <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>flink-poc</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>Flink Quickstart Job</name> <url>http://www.myorganization.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.9.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.11</scala.binary.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <pulsar.version>2.4.0</pulsar.version> </properties> <repositories> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <dependencies> <!-- https://mvnrepository.com/artifact/io.jaegertracing/jaeger-client --> <dependency> <groupId>io.jaegertracing</groupId> <artifactId>jaeger-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-functions-api</artifactId> <version>${pulsar.version}</version> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-io-core</artifactId> <version>${pulsar.version}</version> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.4.0</version> <!-- What's the latest stable version???--> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-admin</artifactId> 
<version>2.4.0</version> </dependency> <!-- Apache Flink dependencies --> <!-- These dependencies are provided, because they should not be packaged into the JAR file. --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> <scope>provided</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-flink --> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-flink</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>runtime</scope> </dependency> </dependencies> <build> <plugins> <!-- Java Compiler --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. 
--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> <!-- Run shade goal on package phase --> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>org.apache.flink:force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.overstock.dataeng.jaeger.spanner.StreamingJob</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> <pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.1</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>assemble-all</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. 
--> <plugin> <groupId>org.eclipse.m2e</groupId> <artifactId>lifecycle-mapping</artifactId> <version>1.0.0</version> <configuration> <lifecycleMappingMetadata> <pluginExecutions> <pluginExecution> <pluginExecutionFilter> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <versionRange>[3.0.0,)</versionRange> <goals> <goal>shade</goal> </goals> </pluginExecutionFilter> <action> <ignore/> </action> </pluginExecution> <pluginExecution> <pluginExecutionFilter> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <versionRange>[3.1,)</versionRange> <goals> <goal>testCompile</goal> <goal>compile</goal> </goals> </pluginExecutionFilter> <action> <ignore/> </action> </pluginExecution> </pluginExecutions> </lifecycleMappingMetadata> </configuration> </plugin> </plugins> </pluginManagement> </build> <!-- This profile helps to make things run out of the box in IntelliJ --> <!-- It adds Flink's core classes to the runtime class path. --> <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' --> <profiles> <profile> <id>add-dependencies-for-IDEA</id> <activation> <property> <name>idea.version</name> </property> </activation> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> </dependencies> </profile> </profiles> </project> ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services