flink run jar throw java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

2019-10-29 Post by Alex Wang
Hello everyone, I am a newbie.
I am learning the flink-sql-submit project from @Jark Wu:
https://github.com/wuchong/flink-sql-submit

My local environment is:
1. flink1.9.0 standalone
2. kafka_2.11-2.2.0, single node

I copied the Flink connector and format jars into $FLINK_HOME/lib.
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors

Then I ran flink-sql-submit (sh run.sh q1), and it threw
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver.

My question is:
I declared mysql-connector-java in the pom.xml, and the jar built by mvn
includes com.mysql.jdbc.Driver.
Why is this error still reported? If I put the mysql jar into $FLINK_HOME/lib,
the problem goes away.
When a project depends on many jars, do they all have to be copied into
$FLINK_HOME/lib?
If I don't put mysql-connector-java.jar in $FLINK_HOME/lib, how else can I solve
this problem?

Can @Jark Wu give me some advice? Or can someone else?
Thank you.

1. pom.xml


> <dependency>
>     <groupId>mysql</groupId>
>     <artifactId>mysql-connector-java</artifactId>
>     <version>5.1.38</version>
> </dependency>

2. mvn clean; mvn package

$ ll -rth target
>
> total 32312
> drwxr-xr-x  3 alex  staff    96B Oct 30 11:32 generated-sources
> drwxr-xr-x  5 alex  staff   160B Oct 30 11:32 classes
> drwxr-xr-x  3 alex  staff    96B Oct 30 11:32 maven-archiver
> -rw-r--r--  1 alex  staff   7.2M Oct 30 11:32 flink-sql-submit-1.0-SNAPSHOT.jar
> -rw-r--r--  1 alex  staff   8.2M Oct 30 11:32 flink-sql-submit.jar
>

3. flink-sql-submit.jar includes META-INF/services/java.sql.Driver and the com.mysql driver classes

" zip.vim version v28
> " Browsing zipfile
> /Users/alex/IdeaProjects/alex/flink_learn/flink-sql-submit/target/flink-sql-submit.jar
> " Select a file with cursor and press ENTER
>
> META-INF/MANIFEST.MF
> META-INF/
> q1.sql
> user_behavior.log
> com/
> com/github/
> com/github/wuchong/
> com/github/wuchong/sqlsubmit/
> com/github/wuchong/sqlsubmit/SqlSubmit$1.class
> com/github/wuchong/sqlsubmit/SqlSubmit.class
> com/github/wuchong/sqlsubmit/SourceGenerator.class
> com/github/wuchong/sqlsubmit/cli/
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommandCall.class
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser.class
> com/github/wuchong/sqlsubmit/cli/SqlCommandParser$SqlCommand.class
> com/github/wuchong/sqlsubmit/cli/CliOptions.class
> com/github/wuchong/sqlsubmit/cli/CliOptionsParser.class
> META-INF/maven/
> META-INF/maven/com.github.wuchong/
> META-INF/maven/com.github.wuchong/flink-sql-submit/
> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.xml
> META-INF/maven/com.github.wuchong/flink-sql-submit/pom.properties
> META-INF/services/
> META-INF/services/java.sql.Driver
> com/mysql/
> com/mysql/fabric/
> com/mysql/fabric/FabricCommunicationException.class
> com/mysql/fabric/FabricConnection.class
> com/mysql/fabric/FabricStateResponse.class
> com/mysql/fabric/HashShardMapping$ReverseShardIndexSorter.class
> com/mysql/fabric/HashShardMapping.class
> com/mysql/fabric/RangeShardMapping$RangeShardIndexSorter.class
> com/mysql/fabric/RangeShardMapping.class
> com/mysql/fabric/Response.class
> com/mysql/fabric/Server.class
> com/mysql/fabric/ServerGroup.class
> com/mysql/fabric/ServerMode.class
> com/mysql/fabric/ServerRole.class
> etc ...
>



$FLINK_DIR/bin/flink run -d -p 3 target/flink-sql-submit.jar -w "${PROJECT_DIR}"/src/main/resources/ -f "$1".sql

Error:
2019-10-30 10:27:35
java.lang.IllegalArgumentException: JDBC driver class not found.
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:112)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:66)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:99)
    ... 9 more


-- 
Best


flink1.9.1 on yarn sql deployment questions

2019-10-29 Post by hb
hello:

Environment: flink1.9.1, on yarn, hadoop2.6.
Flink is installed only on the single machine used for submitting jobs.

The lib directory contains these files:
 flink-dist_2.11-1.9.1.jar
 flink-json-1.9.0-sql-jar.jar
 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
 flink-sql-connector-kafka_2.11-1.9.0.jar
 flink-table_2.11-1.9.1.jar
 flink-table-blink_2.11-1.9.1.jar
 log4j-1.2.17.jar
 slf4j-log4j12-1.7.15.jar

Of these, the following 3 jars were copied in after installation:
 flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
 flink-sql-connector-kafka_2.11-1.9.0.jar
 flink-json-1.9.0-sql-jar.jar

Question 1: In on-yarn mode, do I need to install the Flink distribution on every machine, or is it enough to have it only on the submitting machine?

Question 2: I need to use the blink planner to read an external Kafka (version 1.1, JSON format) and create SQL tables. Do I need to add
 flink-sql-connector-kafka_2.11-1.9.0.jar
 flink-json-1.9.0-sql-jar.jar
to the lib directory, or should I declare the dependencies in the pom file and build a fat jar?

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

Question 3: When running flink run on YARN, are the jars under the lib directory additionally attached to the user jar and submitted to YARN as well?







Re: How to use two continuously window with EventTime in sql

2019-10-29 Post by Jark Wu
Hi,

You can use TUMBLE_ROWTIME(...) to get the rowtime attribute of the first
window result, and use this field to apply a following window aggregate.
See more
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#group-windows

Best,
Jark

On Tue, 29 Oct 2019 at 15:39, 刘建刚  wrote:

>   For one SQL window, I can register a table with event time and use the
> time field in the tumble window. But if I want to take the result of the
> first window and process it with another window, how can I do that? Thank
> you.
>


Re: Reply: flink1.9.1 kafka table reading issue

2019-10-29 Post by hb
pom file:

```

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hb</groupId>
    <artifactId>flink</artifactId>
    <packaging>pom</packaging>
    <version>1.9.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <elasticsearch.hadoop>6.2.3</elasticsearch.hadoop>
        <jcommander.version>1.72</jcommander.version>
        <gson.version>2.6.2</gson.version>
        <kafka.version>0.11.0.2</kafka.version>
        <fastjson.version>1.2.46</fastjson.version>
        <flink-connector-kafka>1.9.1</flink-connector-kafka>
        <log4j.version>1.2.17</log4j.version>
        <mysql-connector-java.version>5.1.42</mysql-connector-java.version>
        <net.dongliu.requests.version>4.18.1</net.dongliu.requests.version>
        <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
        <scope.type>compile</scope.type>
        <flink.scope.type>compile</flink.scope.type>
    </properties>




    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${flink.scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink-connector-kafka}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>${elasticsearch.hadoop}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>com.beust</groupId>
            <artifactId>jcommander</artifactId>
            <version>${jcommander.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connector-java.version}</version>
        </dependency>

        <dependency>
            <groupId>net.dongliu</groupId>
            <artifactId>requests</artifactId>
            <version>${net.dongliu.requests.version}</version>
        </dependency>
    </dependencies>













    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>attach-scaladocs</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>doc-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>



How to use two continuously window with EventTime in sql

2019-10-29 Post by 刘建刚
  For one SQL window, I can register a table with event time and use the time
field in the tumble window. But if I want to take the result of the first
window and process it with another window, how can I do that? Thank you.


Reply: flink1.9.1 kafka table reading issue

2019-10-29 Post by ????????
Hello,
     Could you post the maven pom file?





------ Original message ------
From: "hb"<343122...@163.com>;
Sent: October 29, 2019 (Tue) 2:53
To: "user-zh"

Re: Reply: flink1.9.1 kafka table reading issue

2019-10-29 Post by hb



I specified Kafka version 0.11, and earlier the lib directory contained only the 0.11 jar, yet I still got this error.





On 2019-10-29 13:47:34, "如影随形" <1246407...@qq.com> wrote:
>Hello:
>
>
>      This looks like a jar conflict. There are 4 flink_kafka jars under your lib directory; try removing the ones you don't need and see.
>
>
>
>陈浩
>
>
> 
>
>
>
>
>
>------ Original message ------
>From: "hb"<343122...@163.com>;
>Sent: Tuesday, October 29, 2019, 2:41 PM
>To: "user-zh"
>Subject: flink1.9.1 kafka table reading issue
>
>
>
>The code runs fine in the local IDE and produces normal output.
>
>
>After packaging it into a fat jar and submitting it to the yarn-session, it reports:
>Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
>
>
>What could be causing this?
>
>
>The files in the lib directory are:
>flink-dist_2.11-1.9.1.jar
>flink-sql-connector-kafka-0.10_2.11-1.9.0.jar
>flink-sql-connector-kafka_2.11-1.9.0.jar
>log4j-1.2.17.jar
>flink-json-1.9.0-sql-jar.jar
>flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
>flink-table_2.11-1.9.1.jar
>slf4j-log4j12-1.7.15.jar
>flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
>flink-sql-connector-kafka-0.9_2.11-1.9.0.jar
>flink-table-blink_2.11-1.9.1.jar
>
>
>
>
>
>
>Code:
>```
>import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>import org.apache.flink.table.api.EnvironmentSettings
>import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>import org.apache.flink.types.Row
>
>object StreamingTable2 extends App{
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings =
>    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>  env.setParallelism(2)
>
>  val sourceDDL1 =
>    """create table kafka_json_source(
>        `timestamp` BIGINT,
>        id int,
>        name varchar
>      ) with (
>        'connector.type' = 'kafka',
>        'connector.version' = '0.11',
>        'connector.topic' = 'hbtest2',
>        'connector.startup-mode' = 'earliest-offset',
>        'connector.properties.0.key' = 'bootstrap.servers',
>        'connector.properties.0.value' = '192.168.1.160:19092',
>        'connector.properties.1.key' = 'group.id',
>        'connector.properties.1.value' = 'groupId1',
>        'connector.properties.2.key' = 'zookeeper.connect',
>        'connector.properties.2.value' = '192.168.1.160:2181',
>        'update-mode' = 'append',
>        'format.type' = 'json',
>        'format.derive-schema' = 'true'
>      )
>    """
>
>  tEnv.sqlUpdate(sourceDDL1)
>  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
>  env.execute("table-example2")
>}
>```