flink run jar throws java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
Hello everyone, I am a newbie learning the flink-sql-submit project from @Jark Wu: https://github.com/wuchong/flink-sql-submit

My local environment is:
1. flink1.9.0 standalone
2. kafka_2.11-2.2.0 single node

I copied the Flink Connectors and Formats jars into $FLINK_HOME/lib, following https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors

Then I ran flink-sql-submit with `sh run.sh q1`, and it throws java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.

My question is: I configured mysql-connector-java in the pom.xml file, and the jar built by mvn includes com.mysql.jdbc.Driver. Why is this error still reported? Putting the jar into $FLINK_HOME/lib solves the problem, but when a project depends on many jars, do they all have to go into $FLINK_HOME/lib? If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how can I solve this? Can @Jark Wu, or anyone else, give me some advice? Thank you.

1. pom.xml

> mysql
> mysql-connector-java
> 5.1.38

2. mvn clean; mvn package

$ ll -rth target
> total 32312
> drwxr-xr-x 3 alex staff 96B Oct 30 11:32 generated-sources
> drwxr-xr-x 5 alex staff 160B Oct 30 11:32 classes
> drwxr-xr-x 3 alex staff 96B Oct 30 11:32 maven-archiver
> -rw-r--r-- 1 alex staff 7.2M Oct 30 11:32 flink-sql-submit-1.0-SNAPSHOT.jar
> -rw-r--r-- 1 alex staff 8.2M Oct 30 11:32 flink-sql-submit.jar

3. flink-sql-submit.jar includes java.sql.Driver

> " zip.vim version v28
> " Browsing zipfile /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
> " Select a file with cursor and press ENTER
>
> META-INF/MANIFEST.MF
> META-INF/
> q1.sql
> user_behavior.log
> com/
> com/github/
> com/github/wuchong/
> com/github/wuchong/sqlsubmit/
> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
> com/github/wuchong/sqlsubmit/SqlSubmit.class
> com/github/wuchong/sqlsubmit/SourceGenerator.class
> com/github/wuchong/sqlsubmit/cli/
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
> com/github/wuchong/sqlsubmit/cli/CliOptions.class
> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
> META-INF/maven/
> META-INF/maven/com.github.wuchong/
> META-INF/maven/com.github.wuchong/flink-sql-submit/
> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
> META-INF/services/
> META-INF/services/java.sql.Driver
> com/mysql/
> com/mysql/fabric/
> com/mysql/fabric/FabricCommunicationException.class
> com/mysql/fabric/FabricConnection.class
> com/mysql/fabric/FabricStateResponse.class
> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
> com/mysql/fabric/HashShardMapping.class
> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
> com/mysql/fabric/RangeShardMapping.class
> com/mysql/fabric/Response.class
> com/mysql/fabric/Server.class
> com/mysql/fabric/ServerGroup.class
> com/mysql/fabric/ServerMode.class
> com/mysql/fabric/ServerRole.class
> etc ...

Submit command:

$FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w "${PROJECT_DIR}"/src/main/resources/ -f "$1".sql

Error:

2019-10-30 10:27:35
java.lang.IllegalArgumentException: JDBC driver class not found.
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:112)
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:66)
	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:99)
	... 9 more

--
Best
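If copying the driver into $FLINK_HOME/lib is undesirable, one option worth trying is the Flink client's -C/--classpath flag, which adds a URL to every user-code classloader in the cluster. This is only a sketch, not the thread's confirmed fix; the driver path below is a placeholder and must be readable at the same location on all nodes:

```
# Sketch: ship the JDBC driver via the user classpath instead of $FLINK_HOME/lib.
# /opt/jars/... is a hypothetical path; the file must exist on every node.
$FLINK_DIR/bin/flink run -d -p 3 \
  -C file:///opt/jars/mysql-connector-java-5.1.38.jar \
  target/flink-sql-submit.jar \
  -w "${PROJECT_DIR}"/src/main/resources/ -f q1.sql
```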
flink1.9.1 on yarn SQL deployment question
Hello,

Environment: flink1.9.1, on yarn, hadoop2.6. Flink is installed only on the one machine used for submitting jobs. The lib directory contains:

flink-dist_2.11-1.9.1.jar
flink-json-1.9.0-sql-jar.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
flink-sql-connector-kafka_2.11-1.9.0.jar
flink-table_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar

(flink-shaded-hadoop-2-uber-2.6.5-7.0.jar, flink-sql-connector-kafka_2.11-1.9.0.jar and flink-json-1.9.0-sql-jar.jar were copied in after installation.)

Question 1: In on-yarn mode, do I need to install the Flink directory on every machine, or is it enough to have it on the submitting machine?

Question 2: I need the blink planner to create SQL tables from an external Kafka (version 1.1, JSON format). Should I add flink-sql-connector-kafka_2.11-1.9.0.jar and flink-json-1.9.0-sql-jar.jar to the lib directory, or declare org.apache.flink:flink-connector-kafka_2.11:${flink.version} and org.apache.flink:flink-json:${flink.version} in the pom file and build a fat jar (see the sketch after this message)?

Question 3: When running flink run on yarn, are the jars under the lib directory additionally attached alongside the user jar and submitted to yarn for execution?
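For reference, the two pom declarations mentioned in question 2 would look roughly like this (a sketch; it assumes ${flink.version} resolves to 1.9.1 and a standard <dependencies> section):

```
<!-- Kafka connector and JSON format built into the fat jar
     instead of being dropped into $FLINK_HOME/lib -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```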
Re: How to use two consecutive windows with EventTime in SQL
Hi,

You can use TUMBLE_ROWTIME(...) to get the rowtime attribute of the first window's result, and use this field to apply a following window aggregate. See https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows for more.

Best,
Jark

On Tue, 29 Oct 2019 at 15:39, 刘建刚 wrote:
> For one SQL window, I can register a table with event time and use the
> time field in the tumble window. But if I want to take the result of the
> first window and process it with another window, how can I do it? Thank
> you.
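A minimal sketch of the cascaded-window pattern Jark describes (the table user_actions and the rowtime attribute ts are assumed names, not from this thread):

```
SELECT
  TUMBLE_START(r, INTERVAL '1' HOUR) AS hour_start,
  SUM(cnt) AS total
FROM (
  -- the inner window exposes its rowtime via TUMBLE_ROWTIME, so the
  -- outer query can still do event-time windowing on column r
  SELECT
    TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) AS r,
    COUNT(*) AS cnt
  FROM user_actions
  GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)
) AS t
GROUP BY TUMBLE(r, INTERVAL '1' HOUR);
```

Unlike TUMBLE_END, TUMBLE_ROWTIME returns a rowtime attribute, which is what allows the second window to keep using event time.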
Re: Reply: flink1.9.1 Kafka table reading issue
pom file:
```
project: com.hb:flink:1.9.1-SNAPSHOT (packaging: pom)

properties:
  java.version                  = 1.8
  project.build.sourceEncoding  = UTF-8
  flink.version                 = 1.9.1
  scala.binary.version          = 2.11
  scala.version                 = 2.11.12
  elasticsearch.hadoop          = 6.2.3
  jcommander.version            = 1.72
  gson.version                  = 2.6.2
  kafka.version                 = 0.11.0.2
  fastjson.version              = 1.2.46
  flink-connector-kafka         = 1.9.1
  log4j.version                 = 1.2.17
  mysql-connector-java.version  = 5.1.42
  net.dongliu.requests.version  = 4.18.1
  maven-compiler-plugin.version = 3.6.0
  scope.type                    = compile
  flink.scope.type              = compile

dependencies:
  org.apache.flink:flink-core:${flink.version} (scope ${flink.scope.type})
  org.apache.flink:flink-clients_2.11:${flink.version} (scope ${flink.scope.type})
  org.apache.flink:flink-scala_2.11:${flink.version} (scope ${flink.scope.type})
  org.apache.flink:flink-streaming-scala_2.11:${flink.version} (scope ${flink.scope.type})
  org.apache.flink:flink-table-common:${flink.version}
  org.apache.flink:flink-table-api-scala-bridge_2.11:${flink.version}
  org.apache.flink:flink-table-api-java-bridge_2.11:${flink.version}
  org.apache.flink:flink-table-planner_2.11:${flink.version}
  org.apache.flink:flink-table-runtime-blink_2.11:${flink.version}
  org.apache.flink:flink-table-planner-blink_2.11:${flink.version}
  org.apache.flink:flink-connector-kafka-0.11_2.11:${flink-connector-kafka}
  org.apache.flink:flink-connector-kafka_2.11:${flink.version}
  org.apache.flink:flink-json:${flink.version}
  org.apache.flink:flink-connector-elasticsearch6_2.11:${flink.version}
  org.apache.flink:flink-runtime-web_2.11:${flink.version}
  org.elasticsearch:elasticsearch-hadoop:${elasticsearch.hadoop} (scope ${scope.type})
  org.scala-lang:scala-library:${scala.version} (scope ${scope.type})
  org.scala-lang:scala-reflect:${scala.version} (scope ${scope.type})
  org.apache.kafka:kafka-clients:${kafka.version}
  com.beust:jcommander:${jcommander.version}
  com.google.code.gson:gson:${gson.version}
  com.alibaba:fastjson:${fastjson.version}
  log4j:log4j:${log4j.version}
  mysql:mysql-connector-java:${mysql-connector-java.version}
  net.dongliu:requests:${net.dongliu.requests.version}

build plugins:
  org.apache.maven.plugins:maven-compiler-plugin:${maven-compiler-plugin.version}
    (source/target ${java.version}, encoding ${project.build.sourceEncoding})
  net.alchim31.maven:scala-maven-plugin:3.3.2
    (executions: scala-compile-first @ process-resources -> compile,
     scala-test-compile-first @ process-test-resources -> testCompile,
     attach-scaladocs @ verify -> doc-jar)
  org.apache.maven.plugins:maven-shade-plugin:3.1.1
```
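The maven-shade-plugin configuration is cut off at the end of the message above. As a hypothetical sketch (not recovered from the message), a shade setup for Flink fat jars typically merges META-INF/services files so that service-loaded classes such as table factories and JDBC drivers survive shading:

```
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merge META-INF/services entries from all dependencies -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```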
How to use two consecutive windows with EventTime in SQL
For one SQL window, I can register a table with event time and use the time field in the tumble window. But if I want to take the result of the first window and process it with another window, how can I do it? Thank you.
Reply: flink1.9.1 Kafka table reading issue
Could you post your maven pom?

-- Original message --
From: "hb" <343122...@163.com>
Date: 2019-10-29 (Tuesday) 2:53 PM
To: "user-zh"
Re: Reply: flink1.9.1 Kafka table reading issue
I specified Kafka version 0.11, and earlier the lib directory contained only the 0.11 jar, yet the same error was still reported.

On 2019-10-29 13:47:34, "如影随形" <1246407...@qq.com> wrote:
>Hi:
>
>This looks like a jar conflict. There are four flink_kafka jars under your lib directory; try removing the ones you don't use.
>
>陈浩
>
>-- Original message --
>From: "hb" <343122...@163.com>
>Date: 2019-10-29 (Tuesday) 2:41 PM
>To: "user-zh"
>Subject: flink1.9.1 Kafka table reading issue
>
>The code runs fine in the local IDE and produces normal output.
>
>After packaging it as a fat jar and submitting it to the yarn-session, it reports:
>
>Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>for configuration key.deserializer: Class
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>could not be found.
>
>Could someone tell me what the cause is?
>
>Files under the lib directory:
>flink-dist_2.11-1.9.1.jar
>flink-sql-connector-kafka-0.10_2.11-1.9.0.jar
>flink-sql-connector-kafka_2.11-1.9.0.jar
>log4j-1.2.17.jar
>flink-json-1.9.0-sql-jar.jar
>flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
>flink-table_2.11-1.9.1.jar
>slf4j-log4j12-1.7.15.jar
>flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
>flink-sql-connector-kafka-0.9_2.11-1.9.0.jar
>flink-table-blink_2.11-1.9.1.jar
>
>Code:
>```
>import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>import org.apache.flink.table.api.EnvironmentSettings
>import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>import org.apache.flink.types.Row
>
>object StreamingTable2 extends App {
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings =
>    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>  env.setParallelism(2)
>
>  val sourceDDL1 =
>    """create table kafka_json_source(
>        `timestamp` BIGINT,
>        id int,
>        name varchar
>      ) with (
>        'connector.type' = 'kafka',
>        'connector.version' = '0.11',
>        'connector.topic' = 'hbtest2',
>        'connector.startup-mode' = 'earliest-offset',
>        'connector.properties.0.key' = 'bootstrap.servers',
>        'connector.properties.0.value' = '192.168.1.160:19092',
>        'connector.properties.1.key' = 'group.id',
>        'connector.properties.1.value' = 'groupId1',
>        'connector.properties.2.key' = 'zookeeper.connect',
>        'connector.properties.2.value' = '192.168.1.160:2181',
>        'update-mode' = 'append',
>        'format.type' = 'json',
>        'format.derive-schema' = 'true'
>      )
>    """
>
>  tEnv.sqlUpdate(sourceDDL1)
>  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
>  env.execute("table-example2")
>}
>```
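To narrow down the jar conflict suspected above, one way to check which jars bundle the shaded deserializer class named in the error is a quick scan (a sketch using standard tools; the paths are assumptions):

```
# List every jar that contains the shaded Kafka 0.10 ByteArrayDeserializer
# mentioned in the ConfigException.
for j in $FLINK_HOME/lib/*.jar target/*.jar; do
  if unzip -l "$j" 2>/dev/null | grep -q 'kafka010/shaded/.*ByteArrayDeserializer.class'; then
    echo "$j"
  fi
done
```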