import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import java.util.ArrayList;

/**
 * Flink SQL batch word count example.
 */
public class BatchWordCount_tablesql {

    public static void main(String[] args) throws Exception {

        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create a BatchTableEnvironment on top of it
        BatchTableEnvironment btEnv = BatchTableEnvironment.create(env);

        // Build the input data
        String words = "hello flink hello spark hello hbase";
        String[] split = words.split(" ");
        ArrayList<WordCount> list = new ArrayList<>();
        for (String word : split) {
            list.add(new WordCount(word, 1));
        }
        DataSet<WordCount> input = env.fromCollection(list);

        // Convert the DataSet to a Table, specifying the field names
        Table table = btEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // Register the Table as a temporary view
        btEnv.createTemporaryView("WordCountTable", table);

        // Run the SQL query
        Table table1 = btEnv.sqlQuery(
                "select word, sum(frequency) as frequency from WordCountTable group by word");

        // Convert the query result back to a DataSet
        DataSet<WordCount> wordCountDataSet = btEnv.toDataSet(table1, WordCount.class);

        wordCountDataSet.printToErr();
    }

    public static class WordCount {
        public String word;
        public int frequency;

        public WordCount() {
        }

        public WordCount(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getFrequency() {
            return frequency;
        }

        public void setFrequency(int frequency) {
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", frequency=" + frequency +
                    '}';
        }
    }
}
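
For what it's worth, regarding the ClassNotFoundException quoted below: org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl seems to be provided by the legacy planner module, which this pom does not include (only the Blink planner is there). A sketch of the dependency that would supply that class, reusing the pom's existing properties:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>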

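Alternatively, staying on the Blink planner only, the unified TableEnvironment in batch mode would avoid BatchTableEnvironment altogether (note it has no toDataSet bridge, so results would be consumed via executeSql/collect instead). A minimal sketch, assuming Flink 1.11:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Batch execution on the Blink planner; no DataSet bridge is involved,
// so BatchTableEnvironmentImpl is never loaded.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
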
On 2020-08-10 10:48:27, "Shengkai Fang" <[email protected]> wrote:
>hi.
>Can you provide the specific code?
>
>
>
>郭华威 <[email protected]> wrote on Mon, Aug 10, 2020 at 10:21 AM:
>
>> Flink 1.11.1 reports an error when using the Table API:
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Create BatchTableEnvironment failed.
>> at
>> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
>> at
>> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
>> at
>> yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:264)
>> at
>> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
>> ... 2 more
>> But all the related dependencies are present; below is the pom file:
>> <properties>
>>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>    <flink.version>1.11.1</flink.version>
>>    <mysql.version>5.1.40</mysql.version>
>>    <scala.binary.version>2.11</scala.binary.version>
>>    <scala.version>2.11.12</scala.version>
>>    <java.version>1.8</java.version>
>>    <maven.compiler.source>${java.version}</maven.compiler.source>
>>    <maven.compiler.target>${java.version}</maven.compiler.target>
>> </properties>
>>
>> <dependencies>
>>
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-java</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Develop with the Java API -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Use the Blink planner -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Support for custom message formats (e.g. JSON messages in Kafka) and user-defined functions -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-table-common</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- JDBC connector support; this example uses MySQL -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Kafka connector support -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-sql-connector-kafka-0.11_${scala.binary.version}</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- Messages in Kafka are in JSON format -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-json</artifactId>
>>       <version>${flink.version}</version>
>>    </dependency>
>>
>> <!-- MySQL driver -->
>>    <dependency>
>>       <groupId>mysql</groupId>
>>       <artifactId>mysql-connector-java</artifactId>
>>       <version>${mysql.version}</version>
>>    </dependency>
>>
>> <!-- Dependencies required to submit jobs, e.g. LocalExecutorFactory -->
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients_2.11</artifactId>
>>       <version>1.11.1</version>
>>    </dependency>
>>
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-jdbc_2.11</artifactId>
>>       <version>1.11.1</version>
>>    </dependency>
>>
>>    <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-scala_2.11</artifactId>
>>       <version>1.11.0</version>
>>    </dependency>
>>
>> <!-- Logging, for easier debugging -->
>>    <dependency>
>>       <groupId>org.slf4j</groupId>
>>       <artifactId>slf4j-log4j12</artifactId>
>>       <version>1.7.7</version>
>>       <scope>runtime</scope>
>>    </dependency>
>>    <dependency>
>>       <groupId>log4j</groupId>
>>       <artifactId>log4j</artifactId>
>>       <version>1.2.17</version>
>>       <scope>runtime</scope>
>>    </dependency>
>> </dependencies>
>>
>>
>>
>>
>>
>>