代码,编译没问题,但运行的时候,RichFlatMapFunction在open的时候,JdbcRowDataOutputFormat.open会core,说RuntimeContext为空,如果去掉outputFormatStatus.setRuntimeContext(this.getRuntimeContext()),又会提示没有初始化?
麻烦大佬帮看看,什么问题啊,是我哪里用的不对吗?
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.createSimpleRowDataExecutor(JdbcRowDataOutputFormat.java:198)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$new$2d156164$1(JdbcRowDataOutputFormat.java:94)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.open(JdbcRowDataOutputFormat.java:103)
at
com.qqmusic.quku.cdcSync.PostgresSinkMapFunction.open(PostgresSinkMapFunction.java:132)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
代码=====>
public class PostgresSinkMapFunction extends RichFlatMapFunction<String,
String> {
private static String driverClass = "org.postgresql.Driver";
private static String dbUrl =
"jdbc:postgresql://localhost:5432/ai_audio_lyric_task";
private static String userNmae = "postgres";
private static String passWord = "1";
// 表status
private static JdbcRowDataOutputFormat outputFormatStatus;
private static String[] fieldNames = new String[] {"id", "name"};
private static DataType[] fieldDataTypes = new DataType[]{
DataTypes.INT(),
DataTypes.STRING()};
private static RowType rowType = RowType.of(
Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new),
fieldNames);
private static RowDataTypeInfo rowDataTypeInfo =
RowDataTypeInfo.of(rowType);
@Override
public void flatMap(String s, Collector<String> collector) throws
Exception {
GenericRowData row = new GenericRowData(2);
row.setRowKind(INSERT);
row.setField(0, count);
row.setField(1, "jindy" + Integer.toString(count));
outputFormatStatus.writeRecord(row);
}
public void open(Configuration parameters) throws Exception {
super.open(parameters);
JdbcOptions jdbcOptions = JdbcOptions.builder()
.setDriverName(driverClass)
.setDBUrl(dbUrl)
.setTableName("status_mirror")
.setUsername(userNmae)
.setPassword(passWord)
.build();
JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
.withTableName(jdbcOptions.getTableName())
.withDialect(jdbcOptions.getDialect())
.withFieldNames(fieldNames)
.build();
outputFormatStatus =
JdbcRowDataOutputFormat.dynamicOutputFormatBuilder()
.setJdbcOptions(jdbcOptions)
.setFieldDataTypes(fieldDataTypes)
.setJdbcDmlOptions(dmlOptions)
.setJdbcExecutionOptions(JdbcExecutionOptions.builder().build())
.build();
// set context,这里有问题!!!!!!!!!!!!!!!!!!
outputFormatStatus.setRuntimeContext(this.getRuntimeContext());
outputFormatStatus.open(0, 1);
}
public void close() throws Exception {
super.close();
outputFormatStatus.close();
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/