我给你一些数据和代码吧!和我真实场景错误一样
订单主表:orders
13点两条记录;order_state是状态 0取消 1待支付
{"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:00:00"}
13:15
来了一条新的记录 取消订单
{"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:15:00"}
订单明细表:order_detail
4条记录
{"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:00:00"}
{"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01
13:00:00","update_time":"2020-04-01 13:00:00"}
需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。
代码
package Learn.kafkasql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class SqlCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env
=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env,settings);
tenv.sqlUpdate("CREATE TABLE orders " +
" (" +
" order_no string," +
" order_state int," +
" pay_time string," +
" create_time string," +
" update_time string" +
" ) " +
" WITH (" +
" 'connector.type' = 'kafka', " +
" 'connector.version' = 'universal', " +//--kafka版本
" 'connector.topic' = 'tp_orders'," +//--kafkatopic
" 'connector.properties.zookeeper.connect' =
'192.168.179.120:2181', " +
" 'connector.properties.bootstrap.servers' =
'192.168.179.120:9092'," +
" 'connector.properties.group.id' = 'testGroup'," +
" 'connector.startup-mode' = 'latest-offset'," +
" 'format.type' = 'json' " +//--数据为json格式
" )");
tenv.sqlUpdate("CREATE TABLE order_detail " +
" (" +
" order_no string," +
" product_code string," +
" quantity int," +
" create_time string," +
" update_time string" +
" ) " +
" WITH (" +
" 'connector.type' = 'kafka', " +
" 'connector.version' = 'universal', " +//--kafka版本
" 'connector.topic' = 'tp_order_detail'," +//--kafkatopic
" 'connector.properties.zookeeper.connect' =
'192.168.179.120:2181', " +
" 'connector.properties.bootstrap.servers' =
'192.168.179.120:9092'," +
" 'connector.properties.group.id' = 'testGroup'," +
" 'connector.startup-mode' = 'latest-offset'," +
" 'format.type' = 'json' " +//--数据为json格式
" )");
tenv.sqlUpdate("CREATE TABLE product_sale" +
" (" +
" order_date string," +
" product_code string," +
" cnt int" +
" ) " +
" WITH (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' =
'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " +
" 'connector.table' = 'order_state_cnt', " +
" 'connector.driver' = 'com.mysql.jdbc.Driver', " +
" 'connector.username' = 'root'," +
" 'connector.password' = '123456'," +
" 'connector.write.flush.max-rows' = '1'," +//--默认每5000条数据写入一次,测试调小一点
" 'connector.write.flush.interval' = '2s'," +//--写入时间间隔
" 'connector.write.max-retries' = '3'" +
" )");
tenv.sqlUpdate("insert into product_sale " +
"select create_date,product_code,sum(quantity)" +
"from (select t1.order_no," +
" t1.create_date," +
" t2.product_code," +
" t2.quantity" +
" from (select order_no," +
" order_state," +
" substring(create_time,1,10) create_date," +
" update_time ," +
" row_number() over(partition by order_no order by
update_time desc) as rn" +
" from orders" +
" )t1" +
" left join order_detail t2" +
" on t1.order_no=t2.order_no" +
" where t1.rn=1" +//--取最新的订单状态数据
" and t1.order_state<>0" +//--不包含取消订单
" )t3" +
" group by create_date,product_code");
Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" +
"from (select t1.order_no," +
" t1.create_date," +
" t2.product_code," +
" t2.quantity" +
" from (select order_no," +
" order_state," +
" substring(create_time,1,10) create_date," +
" update_time ," +
" row_number() over(partition by order_no order by
update_time desc) as rn" +
" from orders" +
" )t1" +
" left join order_detail t2" +
" on t1.order_no=t2.order_no" +
" where t1.rn=1" +
" and t1.order_state<>0" +
" )t3" +
" group by create_date,product_code");
tenv.toRetractStream(table, Row.class).print();
tenv.execute("count");
}
}
mysql 建表语句
CREATE TABLE `order_state_cnt` (
`order_date` varchar(12) ,
`product_code` varchar(12) ,
`cnt` int
) ENGINE=InnoDB DEFAULT CHARSET=utf8
使用的是kafka命令行一条条发送数据的方式
主要是deleteStatement.executeBatch();这个方法报错
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}
在2020年4月23日 00:21,Leonard Xu<[email protected]> 写道:
赞详细的分析!
没能复现你说的问题,最后一步的分析应该有点小问题,我看下了jdbc mysql的实现
com/mysql/jdbc/PreparedStatement.java#executeBatchInternal() 1289行
是会判断batchedArgs数组的大小后会直接返回的,应该不会执行,你可以进一步调试确认下
```
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
return new long[0];
}
```
祝好,
Leonard Xu
在 2020年4月22日,21:58,1101300123 <[email protected]> 写道:
我在SQL关联后把结果写入mysql出现 No value specified for parameter 1错误?
我的版本是1.10.0,代码如下
JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()
.setTableSchema(results.getSchema())
.setOptions(JDBCOptions.builder()
.setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
.setDriverName("com.mysql.jdbc.Driver")
.setUsername("jczx_cjch")
.setPassword("jczx_cjch2")
.setTableName("xkf_join_result")
.build())
.setFlushIntervalMills(1000)
.setFlushMaxSize(100)
.setMaxRetryTimes(3)
.build();
DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results,
Row.class);
retract.print();
build.emitDataStream(retract);
就会出现如下错误
java.sql.SQLException: No value specified for parameter 1
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at
com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
at
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
at
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
at
org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
我的输出数据是(true,2020-04-22 21:34:00,2020-04-22
21:34:15,20200422213541465568468)是这样的
我查看源码发现
先调用JDBCUpsertOutputFormat类的writeRecord方法给UpsertWriter类的成员变量map中添加元素
@Override
public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws
IOException {
checkFlushException();
try {
jdbcWriter.addRecord(tuple2);
batchCount++;
if (batchCount >= flushMaxSize) {
flush();
}
} catch (Exception e) {
throw new RuntimeException("Writing records to JDBC failed.", e);
}
}
之后调用flush()方法,调用UpsertWriter类执行executeBatch方法
public synchronized void flush() throws Exception {
checkFlushException();
for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
if (i >= maxRetryTimes) {
throw e;
}
Thread.sleep(1000 * i);
}
}
}
然后会调用UpsertWriter类
实现JDBCWriter类在executeBatch方法中先判断map是否为空,然后循环map;之后判断2元组第一个元素的true调用内部类处理元素,否则删除数据;但是我每次的数据只有一条就是map的大小是1,且2元组的第一个元素值是true,循环结束执行
deleteStatement.executeBatch();方法就会出错,因为删除的语句站位符还没有填充;
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}