SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's 
identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ 
and 5.7.6+ requirements SSL connection must be established by default if 
explicit option isn't set. For compliance with existing applications not using 
SSL the verifyServerCertificate property is set to 'false'. You need either to 
explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide 
truststore for server certificate verification.

Exception in thread "main" java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
c53425963e2c88e84a2e699732565e1a)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)

at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)

at 
org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink$.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala:74)

at 
org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala)

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: c53425963e2c88e84a2e699732565e1a)

at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)

at akka.dispatch.OnComplete.internal(Future.scala:264)

at akka.dispatch.OnComplete.internal(Future.scala:261)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)

at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)

at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)

at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)

at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)

at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)

at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)

at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)

at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)

at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)

at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.

at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 31 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

... 4 more

Caused by: java.lang.IllegalArgumentException: open() failed.

at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:74)

at 
org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.open(JDBCSinkFunction.java:55)

at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using 
password: NO)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)

at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)

at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)

at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:871)

at 
com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1714)

at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)

at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)

at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)

at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)

at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)

at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)

at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)

at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)

at java.sql.DriverManager.getConnection(DriverManager.java:664)

at java.sql.DriverManager.getConnection(DriverManager.java:270)

at 
org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)

at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:71)

... 12 more




query:
    val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

        tableEnv.sqlUpdate(
"""
            |
            |CREATE TABLE user_log (
            |    id INT
            |) WITH (
            |    'connector.type' = 'kafka',
            |    'connector.version' = 'universal',
            |    'connector.topic' = 'num',
            |    'connector.startup-mode' = 'latest-offset',
            |    'connector.properties.0.key' = 'zookeeper.connect',
            |    'connector.properties.0.value' = 
'cdh1:2181,cdh2:2181,cdh3:2181',
            |    'connector.properties.1.key' = 'bootstrap.servers',
            |    'connector.properties.1.value' = 
'cdh1:9092,cdh2:9092,cdh3:9092',
            |    'update-mode' = 'append',
            |    'format.type' = 'json',
            |    'format.derive-schema' = 'true'
            |)
            |""".stripMargin)

val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost:3306/dashboard")
       .setQuery("INSERT INTO t2(id) VALUES (?)")
.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
      .build()

    tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array[String]("id"),
Array[TypeInformation[_]](Types.INT),
sink)

    tableEnv.sqlQuery("select id from user_log")
      .insertInto("jdbcOutputTable")

    tableEnv.execute("from kafka sink mysql")

回复