import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import java.util.ArrayList;

/**
 * WordCount implemented with Flink SQL.
 */
public class BatchWordCount_tablesql {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create a BatchTableEnvironment on top of it.
        BatchTableEnvironment btEnv = BatchTableEnvironment.create(env);

        // Prepare the input data.
        String words = "hello flink hello spark hello hbase";
        String[] split = words.split(" ");
        ArrayList<WordCount> list = new ArrayList<>();
        for (String word : split) {
            list.add(new WordCount(word, 1));
        }
        DataSet<WordCount> input = env.fromCollection(list);

        // Convert the DataSet into a Table, naming the fields.
        Table table = btEnv.fromDataSet(input, "word,frequency");
        table.printSchema();
        // Register the Table under a name usable from SQL.
        btEnv.createTemporaryView("WordCountTable", table);

        // Run the SQL query.
        Table table1 = btEnv.sqlQuery(
                "select word, sum(frequency) as frequency from WordCountTable group by word");

        // Convert the query result back into a DataSet and print it.
        DataSet<WordCount> wordCountDataSet = btEnv.toDataSet(table1, WordCount.class);
        wordCountDataSet.printToErr();
    }

    /** POJO holding one word and its count. */
    public static class WordCount {
        public String word;
        public int frequency;

        public WordCount() {
        }

        public WordCount(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getFrequency() {
            return frequency;
        }

        public void setFrequency(int frequency) {
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", frequency=" + frequency +
                    '}';
        }
    }
}
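For comparison, here is a minimal sketch of the same word count on the Blink planner's batch mode, which avoids BatchTableEnvironment and the DataSet bridge entirely. The class name BlinkBatchWordCount is illustrative; it assumes only the flink-table-api-java-bridge and flink-table-planner-blink dependencies from the pom below.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.row;

public class BlinkBatchWordCount {

    public static void main(String[] args) {
        // Batch mode on the Blink planner: no DataSet bridge, no BatchTableEnvironment.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Build the input table directly from literal (word, frequency) rows.
        Table input = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("word", DataTypes.STRING()),
                        DataTypes.FIELD("frequency", DataTypes.INT())),
                row("hello", 1), row("flink", 1), row("hello", 1),
                row("spark", 1), row("hello", 1), row("hbase", 1));
        tEnv.createTemporaryView("WordCountTable", input);

        // Run the same aggregation and print the result.
        tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM WordCountTable GROUP BY word")
                .execute()
                .print();
    }
}

Here Table.execute().print() collects and prints the result directly, so no toDataSet conversion is needed.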
On 2020-08-10 10:48:27, "Shengkai Fang" <[email protected]> wrote:
>hi.
>Could you share the concrete code?
>
>
>
>郭华威 <[email protected]> wrote on Mon, Aug 10, 2020 at 10:21:
>
>> flink 1.11.1: using the Table API fails with the error below:
>> Exception in thread "main" org.apache.flink.table.api.TableException: Create BatchTableEnvironment failed.
>> at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
>> at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
>> at yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:264)
>> at org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
>> ... 2 more
>> All the relevant dependencies are in place, though. Here is the pom file:
>> <properties>
>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>> <flink.version>1.11.1</flink.version>
>> <mysql.version>5.1.40</mysql.version>
>> <scala.binary.version>2.11</scala.binary.version>
>> <scala.version>2.11.12</scala.version>
>> <java.version>1.8</java.version>
>> <maven.compiler.source>${java.version}</maven.compiler.source>
>> <maven.compiler.target>${java.version}</maven.compiler.target>
>> </properties>
>>
>> <dependencies>
>>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-java</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- For developing with Java -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- Use the Blink planner -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- Support for custom message formats, e.g. JSON-encoded Kafka messages, or for user-defined functions -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-common</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- JDBC connector support; this example uses MySQL -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- Kafka connector support -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- Kafka messages are in JSON format -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-json</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>>
>> <!-- MySQL driver -->
>> <dependency>
>> <groupId>mysql</groupId>
>> <artifactId>mysql-connector-java</artifactId>
>> <version>${mysql.version}</version>
>> </dependency>
>>
>> <!-- Dependencies required to submit jobs, e.g. LocalExecutorFactory -->
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-clients_2.11</artifactId>
>> <version>1.11.1</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-connector-jdbc_2.11</artifactId>
>> <version>1.11.1</version>
>> </dependency>
>>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-scala_2.11</artifactId>
>> <version>1.11.0</version>
>> </dependency>
>>
>> <!-- Logging, to make debugging easier -->
>> <dependency>
>> <groupId>org.slf4j</groupId>
>> <artifactId>slf4j-log4j12</artifactId>
>> <version>1.7.7</version>
>> <scope>runtime</scope>
>> </dependency>
>> <dependency>
>> <groupId>log4j</groupId>
>> <artifactId>log4j</artifactId>
>> <version>1.2.17</version>
>> <scope>runtime</scope>
>> </dependency>
>> </dependencies>
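
If I am reading the stack trace right, BatchTableEnvironment.create looks up org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl by reflection, and that class ships with the legacy planner artifact (flink-table-planner_${scala.binary.version}), not with flink-table-planner-blink, so this pom cannot satisfy it. Adding the legacy planner dependency, or switching to the Blink batch-mode TableEnvironment sketched above, should get past the ClassNotFoundException.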