Table API questions

2019-12-13  Posted by chendan
After going through the Table API section of the documentation carefully, I have the following conceptual and practical programming questions. I would appreciate an answer to each:

1. Compared with BatchTableEnvironment and StreamTableEnvironment, in what
scenarios should TableEnvironment be used?

2. The documentation mentions "Register an External Catalog". When would an
external catalog be used? In the API docs, however, registerExternalCatalog is
already marked as Deprecated, so only registerCatalog can be used. What is the
difference between an internal catalog and an external catalog? Why are
different kinds of catalogs needed? In what situations would one need to
register multiple catalogs?
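For concreteness, this is the pattern I understand the 1.9 API intends for multiple catalogs. This is only a sketch, not verified against a running cluster; the names "my_catalog" and "my_db" are placeholders I made up:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register an additional (in-memory) catalog next to the default one.
        // "my_catalog" / "my_db" are placeholder names.
        tableEnv.registerCatalog("my_catalog",
                new GenericInMemoryCatalog("my_catalog", "my_db"));

        // Make it the current catalog, so unqualified table names resolve in it.
        tableEnv.useCatalog("my_catalog");
        tableEnv.useDatabase("my_db");
    }
}
```

This compiles only with the flink-table dependencies on the classpath, so treat it as an illustration of the question rather than working code.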

3. After registering a Table or a TableSource, how is it actually used? How
should the code be written? The documentation is vague on this. What is the
purpose of registration?

4. In the API docs, the scan method of TableEnvironment takes a parameter
named tablePath. What exactly is a tablePath?

In the documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/time_attributes.html
I saw the following example code:

From that example, UserActions appears to be the name of the TableSource, and
scan takes the TableSource's name as its argument. But when I wrote my own
code following the same approach, I got an error.
Here is my code:

Here is the result of running it:

I did register user_moid as a TableSource and passed the TableSource's name to
scan.
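For reference, this is the register-then-scan pattern as I understand it from the docs. This is a sketch only, not verified against 1.9; MyTableSource stands in for any StreamTableSource implementation:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Registration stores the source in the current catalog/database under this name...
// (MyTableSource is a placeholder, not a real class.)
tableEnv.registerTableSource("UserActions", new MyTableSource());

// ...and scan(...) looks that name up again. The tablePath is the name path
// within the current catalog; for the default catalog it is just the table name.
Table userActions = tableEnv.scan("UserActions");
```

As with the previous snippet, this needs the flink-table dependencies on the classpath and is meant to illustrate the question, not to be run as-is.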



Re: Forward-incompatible SQL behavior in Flink 1.9.1

2019-12-13  Posted by Kurt Young
Hi,

I'd suggest translating this into English and opening an issue for it in JIRA.

Best,
Kurt


On Thu, Dec 12, 2019 at 11:39 PM 李佟  wrote:

> We recently upgraded Flink, migrating our programs from the old cluster
> (1.8.0, where everything ran fine) to a new cluster (1.9.1). When deploying,
> we found that a Flink SQL program that previously ran correctly fails on the
> 1.9.1 cluster with the following exception:
>
>
> org.apache.flink.table.api.ValidationException: *Window can only be
> defined over a time attribute column.*
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
>
>
>
> Tracing into the decompiled Scala file and setting a breakpoint, I found
> that the code in the red box in the figure below is never executed; it is
> simply skipped.
>
>
> The functionality is very simple: sum a value over a time window. The test
> case is as follows:
>
>
> package org.flowmatrix.isp.traffic.accounting.test;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.scala.typeutils.Types;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedRowtimeAttributes;
> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.table.sources.tsextractors.ExistingField;
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> import javax.annotation.Nullable;
> import java.sql.Timestamp;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class TestSql {
> @Test
> public void testAccountingSql() {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> try {
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
> SimpleTableSource source = new SimpleTableSource();
> Table t = tableEnv.fromTableSource(source);
>
> String interval = "5"; // 5 seconds
> System.out.println("source schema is " +
> source.getTableSchema());
>
> Table sqlResult = tableEnv.sqlQuery("SELECT " +
> " TUMBLE_START(UserActionTime, INTERVAL '" + interval
> + "' SECOND) as rowTime, " +
> " Username," +
> " SUM(Data) as Data " +
> " FROM  " + t +
> " GROUP BY TUMBLE(UserActionTime, INTERVAL '" +
> interval + "' SECOND),Username");
>
>
> String[] fieldNames = {
> "rowTime",
> "Username", "Data"};
> TypeInformation[] fieldTypes = {
> TypeInformation.of(Timestamp.class),
> TypeInformation.of(String.class),
> TypeInformation.of(Long.class)};
>
> TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
> sink1 = sink1.configure(fieldNames, fieldTypes);
> tableEnv.registerTableSink("EsSinkTable", sink1);
> System.out.println("sql result schema is " +
> sqlResult.getSchema());
>
> tableEnv.sqlUpdate("insert into EsSinkTable select  " +
> "rowTime,Username,Data from " + sqlResult + "");
>
> env.execute("test");
> } catch (Exception e) {
> e.printStackTrace();
> System.err.println("start program error. FlowMatrix
> --zookeeper  --config " +
> " --name  --interval 
> --indexName ");
> System.err.println(e.toString());
> return;
> }
> }
>
> public static class SimpleTableSource implements
> StreamTableSource<Row>, DefinedRowtimeAttributes {
> @Override
> public DataStream<Row> getDataStream(StreamExecutionEnvironment
> env) {
> return
> env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new
> AssignerWithPunctuatedWatermarks<Row>() {
> private long lastWaterMarkMillSecond = -1;
> private long waterMarkPeriodMillSecond = 1000;
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(Row lastElement,
> long extractedTimestamp) {
> if(extractedTimestamp - lastWaterMarkMillSecond >=
> waterMarkPeriodMillSecond){
> lastWaterMarkMillSecond =