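Hi,

- JDBC is an upsert sink, so you need toUpsertStream rather than toRetractStream. I suggest using a complete DDL to insert into the MySQL table.
- This exception looks like a bug in the JDBC connector. Could you open a JIRA to track it?
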
Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 9:58 PM 1101300123 <[email protected]> wrote:

> After a SQL join, I write the result to MySQL and get the error
> "No value specified for parameter 1".
> My version is 1.10.0, and the code is as follows:
>
> JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()
>     .setTableSchema(results.getSchema())
>     .setOptions(JDBCOptions.builder()
>         .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
>         .setDriverName("com.mysql.jdbc.Driver")
>         .setUsername("jczx_cjch")
>         .setPassword("jczx_cjch2")
>         .setTableName("xkf_join_result")
>         .build())
>     .setFlushIntervalMills(1000)
>     .setFlushMaxSize(100)
>     .setMaxRetryTimes(3)
>     .build();
>
> DataStream<Tuple2<Boolean, Row>> retract =
>     bsTableEnv.toRetractStream(results, Row.class);
> retract.print();
> build.emitDataStream(retract);
>
> This produces the following error:
>
> java.sql.SQLException: No value specified for parameter 1
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
>     at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
>     at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
>     at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
>     at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
>     at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>
> My output data looks like this:
> (true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)
>
> Reading the source, I found that writeRecord in JDBCUpsertOutputFormat
> is called first, which adds the element to the map member of
> UpsertWriter:
>
> @Override
> public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
>     checkFlushException();
>     try {
>         jdbcWriter.addRecord(tuple2);
>         batchCount++;
>         if (batchCount >= flushMaxSize) {
>             flush();
>         }
>     } catch (Exception e) {
>         throw new RuntimeException("Writing records to JDBC failed.", e);
>     }
> }
>
> After that, flush() is called, which invokes executeBatch on
> UpsertWriter:
>
> public synchronized void flush() throws Exception {
>     checkFlushException();
>     for (int i = 1; i <= maxRetryTimes; i++) {
>         try {
>             jdbcWriter.executeBatch();
>             batchCount = 0;
>             break;
>         } catch (SQLException e) {
>             LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>             if (i >= maxRetryTimes) {
>                 throw e;
>             }
>             Thread.sleep(1000 * i);
>         }
>     }
> }
>
> UpsertWriter implements JDBCWriter. Its executeBatch method first
> checks whether the map is empty and then iterates over it. If the
> first element of the tuple is true, it calls the inner class to
> process the row; otherwise it deletes the row. In my case there is
> only one record each time, so the map has size 1 and the first element
> of the tuple is true. When the loop ends, deleteStatement.executeBatch()
> is executed and fails, because the placeholder in the delete statement
> has not been filled in yet.
>
> @Override
> public void executeBatch() throws SQLException {
>     if (keyToRows.size() > 0) {
>         for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
>             Row pk = entry.getKey();
>             Tuple2<Boolean, Row> tuple = entry.getValue();
>             if (tuple.f0) {
>                 processOneRowInBatch(pk, tuple.f1);
>             } else {
>                 setRecordToStatement(deleteStatement, pkTypes, pk);
>                 deleteStatement.addBatch();
>             }
>         }
>         internalExecuteBatch();
>         deleteStatement.executeBatch();
>         keyToRows.clear();
>     }
> }
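
For reference, the failure mode described in the quoted analysis can be modelled in isolation. The sketch below is only an illustration and is not Flink or MySQL code: UpsertBatchSketch, FakeStatement, and the skipEmptyBatches flag are hypothetical names. It assumes, as the original poster reports, that executing a statement with nothing batched raises the "No value specified for parameter 1" error. Whether a real driver behaves this way may depend on the driver version and URL options. Under that assumption, skipping the delete batch when no delete row was queued avoids the error for a batch containing only upserts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: none of these types are Flink or MySQL classes.
final class UpsertBatchSketch {

    /** Hypothetical stand-in for a prepared statement with one placeholder. */
    static final class FakeStatement {
        private final List<String> batch = new ArrayList<>();

        /** Binds the key as parameter 1 and queues it (setObject + addBatch). */
        void addBatch(String key) {
            batch.add(key);
        }

        /**
         * Assumption: executing with nothing queued fails, mimicking the
         * error reported in the thread. A real driver may behave differently.
         */
        int executeBatch() {
            if (batch.isEmpty()) {
                throw new IllegalStateException("No value specified for parameter 1");
            }
            int executed = batch.size();
            batch.clear();
            return executed;
        }
    }

    /**
     * Flushes buffered rows (key -> true for upsert, false for delete).
     * Returns {executed upserts, executed deletes}.
     */
    static int[] flush(Map<String, Boolean> keyToRows, boolean skipEmptyBatches) {
        FakeStatement upsert = new FakeStatement();
        FakeStatement delete = new FakeStatement();
        int upserts = 0;
        int deletes = 0;
        for (Map.Entry<String, Boolean> entry : keyToRows.entrySet()) {
            if (entry.getValue()) {
                upsert.addBatch(entry.getKey());
                upserts++;
            } else {
                delete.addBatch(entry.getKey());
                deletes++;
            }
        }
        int executedUpserts = 0;
        int executedDeletes = 0;
        // The guard: only execute a statement that actually has queued rows.
        if (!skipEmptyBatches || upserts > 0) {
            executedUpserts = upsert.executeBatch();
        }
        if (!skipEmptyBatches || deletes > 0) {
            executedDeletes = delete.executeBatch();
        }
        return new int[] {executedUpserts, executedDeletes};
    }

    public static void main(String[] args) {
        // One buffered row flagged true, as in the reported case.
        Map<String, Boolean> rows = new LinkedHashMap<>();
        rows.put("20200422213541465568468", true);

        int[] counts = flush(rows, true);
        System.out.println("guarded: upserts=" + counts[0] + ", deletes=" + counts[1]);

        try {
            flush(rows, false);
        } catch (IllegalStateException e) {
            System.out.println("unguarded: " + e.getMessage());
        }
    }
}
```

With the single upsert row above, the guarded call executes one upsert and no deletes, while the unguarded call fails on the empty delete batch. This only mirrors the reasoning in the thread; the actual fix, if any, would be decided in the JIRA discussion.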
