Hi,

I am trying to run a pipeline on Flink 1.8.1 and am getting the following
exception:

java.lang.StackOverflowError
    at java.lang.Exception.<init>(Exception.java:66)
    at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
    at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)

I have even tried running in legacy mode. The pipeline code is:

private void execute(String[] args) {
    ParameterTool pt = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //env.setMaxParallelism(30);
    env.setParallelism(20);

    env.enableCheckpointing(5000);
    StateBackend backend = new FsStateBackend(pt.getRequired("checkpoint_path"), true);
    env.setStateBackend(backend);

    // DynamoDB Streams source, deserialized into Jackson ObjectNode records
    FlinkDynamoDBStreamsConsumer<ObjectNode> flinkDynamoDBStreamsConsumer =
            new FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME,
                    new JsonNodeDeserializationSchema(),
                    dynamodbStreamsConsumerConfig);

    SingleOutputStreamOperator<ObjectNode> sourceStream = env
            .addSource(flinkDynamoDBStreamsConsumer)
            .name("Dynamo DB Streams");

    // Key by the CDC key and write the records to Kafka
    sourceStream
            .keyBy(new CdcKeySelector())
            .addSink(new FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams",
                    new JsonSerializationSchema()))
            .name("Kafka Sink");

    try {
        env.execute();
    } catch (Exception e) {
        System.out.println("Caught exception for pipeline: " + e.getMessage());
        e.printStackTrace();
    }
}
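
From the stack trace it looks like the recursive closure cleaning keeps descending into one of the function or config objects. Would limiting the cleaner to top-level cleaning be a reasonable workaround, assuming I am reading the ExecutionConfig API correctly? A minimal sketch of what I mean:

    // Assumption (not verified): restricting the closure cleaner to the
    // top-level object should avoid the recursive clean() calls in the trace.
    env.getConfig().setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);

Or does that only hide the underlying problem?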

Regards,
Vinay Patil
