Nice, detailed analysis!
I couldn't reproduce the problem you described, and the last step of your analysis seems slightly off. I checked the MySQL JDBC driver implementation: in com/mysql/jdbc/PreparedStatement.java#executeBatchInternal(), at line 1289, it checks the size of the batchedArgs list and returns immediately when it is empty, so an empty delete batch should never actually be executed. Could you debug further to confirm?
```
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
return new long[0];
}
```
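For a quick check, here is a minimal standalone sketch; the connection URL, credentials, and table name are placeholders for your environment, not values from this thread. Calling executeBatch() on a PreparedStatement that never received an addBatch() should return an empty array rather than throw:
```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;

public class EmptyBatchCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; point these at your own MySQL instance.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password");
             PreparedStatement ps = conn.prepareStatement(
                "DELETE FROM xkf_join_result WHERE id = ?")) {
            // No setObject()/addBatch() calls here, so batchedArgs stays empty
            // and executeBatchInternal() should take the early-return branch
            // instead of flushing a statement with unset placeholders.
            int[] result = ps.executeBatch();
            System.out.println(Arrays.toString(result)); // expected: []
        }
    }
}
```
If that prints an empty array with your driver version, then the empty delete batch cannot be what raises "No value specified for parameter 1".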
Best,
Leonard Xu
> On Apr 22, 2020, at 21:58, 1101300123 <[email protected]> wrote:
>
> After a SQL join, when I write the result to MySQL I get a "No value specified for parameter 1" error. My version is 1.10.0, and the code is as follows:
> JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()
>     .setTableSchema(results.getSchema())
>     .setOptions(JDBCOptions.builder()
>         .setDBUrl("....MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
>         .setDriverName("com.mysql.jdbc.Driver")
>         .setUsername("jczx_cjch")
>         .setPassword("jczx_cjch2")
>         .setTableName("xkf_join_result")
>         .build())
>     .setFlushIntervalMills(1000)
>     .setFlushMaxSize(100)
>     .setMaxRetryTimes(3)
>     .build();
>
> DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class);
> retract.print();
> build.emitDataStream(retract);
>
> It then fails with the following error:
> java.sql.SQLException: No value specified for parameter 1
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
>     at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
>     at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
>     at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
>     at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
>     at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>
> My output data looks like this: (true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)
> Reading the source code, I found that JDBCUpsertOutputFormat.writeRecord() is called first, which adds the element to the UpsertWriter's member map:
> @Override
> public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
>     checkFlushException();
>     try {
>         jdbcWriter.addRecord(tuple2);
>         batchCount++;
>         if (batchCount >= flushMaxSize) {
>             flush();
>         }
>     } catch (Exception e) {
>         throw new RuntimeException("Writing records to JDBC failed.", e);
>     }
> }
> Then flush() is called, which invokes the UpsertWriter's executeBatch() method:
> public synchronized void flush() throws Exception {
>     checkFlushException();
>     for (int i = 1; i <= maxRetryTimes; i++) {
>         try {
>             jdbcWriter.executeBatch();
>             batchCount = 0;
>             break;
>         } catch (SQLException e) {
>             LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>             if (i >= maxRetryTimes) {
>                 throw e;
>             }
>             Thread.sleep(1000 * i);
>         }
>     }
> }
>
> Then the UpsertWriter (the JDBCWriter implementation) runs executeBatch(): it first checks whether the map is empty, then iterates over it; when the first element of the Tuple2 is true it has the inner class process the row, otherwise it batches a delete. But each time my data is only a single record, so the map size is 1 and the tuple's first element is true; once the loop finishes, deleteStatement.executeBatch() runs and fails, because the delete statement's placeholders were never filled:
>
> @Override
> public void executeBatch() throws SQLException {
>     if (keyToRows.size() > 0) {
>         for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
>             Row pk = entry.getKey();
>             Tuple2<Boolean, Row> tuple = entry.getValue();
>             if (tuple.f0) {
>                 processOneRowInBatch(pk, tuple.f1);
>             } else {
>                 setRecordToStatement(deleteStatement, pkTypes, pk);
>                 deleteStatement.addBatch();
>             }
>         }
>         internalExecuteBatch();
>         deleteStatement.executeBatch();
>         keyToRows.clear();
>     }
> }