代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化?

麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗?


        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
        at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
        at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
        at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
        at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
        at
com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)



代码=====>

public class PostgresSinkMapFunction extends RichFlatMapFunction<String,
String> {
    private static String driverClass = "org.postgresql.Driver";
    private static String dbUrl =
"jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
    private static String userNmae = "postgres";
    private static String passWord = "1";

    // 表status
    private static JdbcRowDataOutputFormat outputFormatStatus;
    private static String[] fieldNames = new String[] {"id", "name"};
    private static DataType[] fieldDataTypes = new DataType[]{
            DataTypes.INT(),
            DataTypes.STRING()};

    private static RowType rowType = RowType.of(
            Arrays.stream(fieldDataTypes)
                    .map(DataType::getLogicalType)
                    .toArray(LogicalType[]::new),
            fieldNames);
    private static RowDataTypeInfo rowDataTypeInfo =
RowDataTypeInfo.of(rowType);

    @Override
    public void flatMap(String s, Collector<String> collector) throws
Exception {
            GenericRowData row = new GenericRowData(2);

             row.setRowKind(INSERT);
             row.setField(0, count);
             row.setField(1, "jindy" + Integer.toString(count));

            outputFormatStatus.writeRecord(row);

    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        JdbcOptions jdbcOptions = JdbcOptions.builder()
                .setDriverName(driverClass)
                .setDBUrl(dbUrl)
                .setTableName("status_mirror")
                .setUsername(userNmae)
                .setPassword(passWord)
                .build();

        JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
                .withTableName(jdbcOptions.getTableName())
                .withDialect(jdbcOptions.getDialect())
                .withFieldNames(fieldNames)
                .build();

        outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
                .setJdbcOptions(jdbcOptions)
                .setFieldDataTypes(fieldDataTypes)
                .setJdbcDmlOptions(dmlOptions)
               
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
                .build();

        // set context,这里有问题!!!!!!!!!!!!!!!!!!
        outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
        outputFormatStatus.open(0, 1);
    }

    public void close() throws Exception {
        super.close();
        outputFormatStatus.close();
    }
}





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复