测试时启动了10个线程,每个线程写入1000次,一共1万条记录。
作业会在写入几千条的时候挂掉,异常日志如下:
2020-10-29 12:04:55,573 WARN org.apache.flink.runtime.taskmanager.Task
[] - Join(joinType=[LeftOuterJoin], where=[(ID = ID1)], select=[ID,
PRODUCT_SERVICE, CUSTOMER_NO, CUSTOMER_NAME, CUSTOMER_REQUEST_NO,
EXTERNAL_NO, STATUS, ORDER_DATE, CREATE_TIME, COUPON_AMOUNT, ID0,
CHANNEL_RET_CODE, CHANNEL_RET_MSG, STATUS0, CARD_NO, BANK_PAY_WAY,
CREATE_TIME0, UPDATE_TIME0, PAY_AMOUNT, PAYER_FEE, CNET_BIND_CARD_ID,
PAYER_CUSTOMER_REQUEST_NO, OPERATOR_NAME, CARD_HOLDER_NAME, ID1,
CUSTOMER_BIZ_REQUEST_NO, GOODS_NAME, GOODS_CAT, GOODS_DESC, GOODS_EXT_INFO,
MEMO, EXTEND_INFO], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[ID AS id,
ID0 AS op_id, ORDER_DATE AS order_date, UPDATE_TIME0 AS complete_date,
PAYER_CUSTOMER_REQUEST_NO AS payer_customer_request_no, CREATE_TIME0 AS
pay_time, CUSTOMER_REQUEST_NO AS customer_request_no, EXTERNAL_NO AS
external_no, STATUS0 AS pay_status, STATUS AS order_status, PAY_AMOUNT
AS pay_amount, ABS(PAYER_FEE) AS payer_fee, BANK_PAY_WAY AS bank_pay_way,
GOODS_CAT AS goods_cat, GOODS_NAME AS goods_name, GOODS_DESC AS productdesc,
GOODS_DESC AS goods_desc, CUSTOMER_BIZ_REQUEST_NO AS
customer_biz_request_no, GOODS_EXT_INFO AS goods_ext_info, MEMO AS memo,
EXTEND_INFO AS extend_info, CHANNEL_RET_CODE AS channel_ret_code,
CHANNEL_RET_MSG AS channel_ret_msg, OPERATOR_NAME AS operator,
CUSTOMER_NO AS customer_no, CUSTOMER_NAME AS customer_name, PRODUCT_SERVICE
AS extend, CREATE_TIME0 AS payercreatetime, UPDATE_TIME0 AS payerupdatetime,
CARD_NO AS card_no, CARD_HOLDER_NAME AS card_holder_name, CREATE_TIME AS
create_time, CNET_BIND_CARD_ID AS cnetbindcarid, COUPON_AMOUNT AS
coupon_amount]) -> Sink:
Sink(table=[default_catalog.default_database.wide_table_1], fields=[id,
op_id, order_date, complete_date, payer_customer_request_no, pay_time,
customer_request_no, external_no, pay_status, order_status, pay_amount,
payer_fee, bank_pay_way, goods_cat, goods_name, productdesc, goods_desc,
customer_biz_request_no, goods_ext_info, memo, extend_info,
channel_ret_code, channel_ret_msg, operator, customer_no,
customer_name, extend, payercreatetime, payerupdatetime, card_no,
card_holder_name, create_time, cnetbindcarid, coupon_amount]) (1/1)
(14a0d11067e4779e13ad3e500f2ab29d) switched from RUNNING to FAILED.
java.io.IOException: Writing records to JDBC failed.
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:157)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at StreamExecCalc$147.processElement(Unknown Source) ~[?:?]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.output(StreamingJoinOperator.java:305)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:278)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
[flink-dist_2.12-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
[flink-dist_2.12-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
~[flink-table_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:310)
~[flink-table_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.getPrimaryKey(JdbcDynamicOutputFormatBuilder.java:216)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createRowKeyExtractor$7(JdbcDynamicOutputFormatBuilder.java:193)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcDynamicOutputFormatBuilder.java:128)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.internal.executor.BufferReduceStatementExecutor.executeBatch(BufferReduceStatementExecutor.java:99)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:154)
~[flink-connector-jdbc_2.11-1.11.2.jar:1.11.2]
... 32 more
--
Sent from: http://apache-flink.147419.n8.nabble.com/