Re: How to contribute to the community

2020-09-27 Post by jinhai wang
@Kyle Zhang

You need to state your view or proposed fix on the JIRA issue, and then a committer will assign it to you.



Best Regards

jinhai...@gmail.com

> On September 27, 2020 at 5:43 PM, Kyle Zhang wrote:
> 
> Hi,
>  I created an issue on JIRA (FLINK-19433). How should I follow up?
> Does a committer need to assign the task to me?
> 
> Best regards



Re: Custom sink with Exactly-Once semantics

2020-04-14 Post by jinhai wang
The implementations of FlinkKafkaProducer and StreamingFileSink both support Exactly-Once; they are worth studying.
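
For a sense of what extending TwoPhaseCommitSinkFunction involves, below is a minimal sketch, not a production demo: a made-up file sink that stages records per checkpoint and publishes them atomically on commit. The class name and /tmp paths are illustrative only; FlinkKafkaProducer follows the same protocol with Kafka transactions instead of files.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class TwoPhaseFileSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseFileSink.FileTransaction, Void> {

    public TwoPhaseFileSink() {
        super(new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected FileTransaction beginTransaction() {
        // One staging file per transaction, i.e. per checkpoint interval.
        return new FileTransaction("/tmp/2pc-sink-" + System.nanoTime() + ".staging");
    }

    @Override
    protected void invoke(FileTransaction txn, String value, Context context) throws IOException {
        // Records only ever land in the staging file, never in the final location.
        Files.write(Paths.get(txn.stagingPath), (value + "\n").getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(FileTransaction txn) {
        // Called at checkpoint time. Data is already durable in the staging file;
        // a buffering implementation would flush here so commit() cannot lose data.
    }

    @Override
    protected void commit(FileTransaction txn) {
        // Called after the checkpoint completes. Must be idempotent, because it
        // can be replayed during recovery.
        try {
            Files.move(Paths.get(txn.stagingPath),
                    Paths.get(txn.stagingPath.replace(".staging", ".committed")));
        } catch (NoSuchFileException e) {
            // Already published by an earlier attempt: nothing to do.
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(FileTransaction txn) {
        // The checkpoint failed: discard the staged data.
        try {
            Files.deleteIfExists(Paths.get(txn.stagingPath));
        } catch (IOException ignored) {
        }
    }

    public static class FileTransaction implements Serializable {
        public String stagingPath;
        public FileTransaction() {}  // for serialization
        public FileTransaction(String stagingPath) { this.stagingPath = stagingPath; }
    }
}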


Best Regards

jinhai...@gmail.com

> On April 15, 2020 at 11:00 AM, 阿华田 wrote:
> 
> For a custom sink with Exactly-Once semantics that extends TwoPhaseCommitSinkFunction, how do I ensure truly Exactly-Once behavior? Does anyone have a demo of a case that is already running in production?
> 王志华
> a15733178...@163.com
> 



Re: ddl es error

2020-03-24 Post by jinhai wang
Excellent! You could open an improvement issue for this.
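
For readers following the thread, the extension zhisheng describes might surface in a job roughly like this, in the style of the ESTest example further down. The 'connector.username' and 'connector.password' keys are hypothetical: they are exactly what the official ES DDL lacks at this point, so they will not work without such a patch.

// Sketch only, assuming a StreamTableEnvironment named tableEnv as in the
// ESTest example below. The two credential keys are hypothetical.
String sinkDDL = "CREATE TABLE es_sink ( user_id BIGINT, cnt BIGINT ) WITH ("
        + " 'connector.type' = 'elasticsearch',"
        + " 'connector.version' = '6',"
        + " 'connector.hosts' = 'http://localhost:9200',"
        + " 'connector.index' = 'es_sink',"
        + " 'connector.document-type' = 'doc',"
        + " 'connector.username' = 'elastic',"  // hypothetical key
        + " 'connector.password' = '******',"   // hypothetical key
        + " 'format.type' = 'json',"
        + " 'update-mode' = 'append' )";
tableEnv.sqlUpdate(sinkDDL);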


Best Regards

jinhai...@gmail.com

> On March 25, 2020 at 1:40 PM, zhisheng wrote:
> 
> Hi, Leonard Xu
> 
> The official ES DDL currently does not support specifying the username and password of the ES cluster. I have already extended it at my company and added this feature. Does the community want this feature? How should I contribute it?
> 
> The effect is shown here: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png
> 
> Best Wishes!
> 
> zhisheng
> 
> Leonard Xu wrote on Tuesday, March 24, 2020 at 5:53 PM:
> 
>> Hi, 出发
>> It looks like you are missing the dependency on the ES connector [1], so only the built-in filesystem connector can be found. The built-in filesystem
>> connector currently only supports the csv format, hence this error.
>> Add the missing dependency to your project; if you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.
>> 
>> <dependency>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
>>    <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>    <groupId>org.apache.flink</groupId>
>>    <artifactId>flink-json</artifactId>
>>    <version>${flink.version}</version>
>> </dependency>
>> 
>> Best,
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>> 
>> 
>>> On March 23, 2020 at 23:30, 出发 <573693...@qq.com> wrote:
>>> 
>>> 
>>> The source code is as follows:
>>> CREATE TABLE buy_cnt_per_hour (
>>>hour_of_day BIGINT,
>>>buy_cnt BIGINT
>>> ) WITH (
>>>'connector.type' = 'elasticsearch',
>>>'connector.version' = '6',
>>>'connector.hosts' = 'http://localhost:9200',
>>>'connector.index' = 'buy_cnt_per_hour',
>>>'connector.document-type' = 'user_behavior',
>>>'connector.bulk-flush.max-actions' = '1',
>>>'format.type' = 'json',
>>>'update-mode' = 'append'
>>> )
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>> 
>>> public class ESTest {
>>> 
>>>     public static void main(String[] args) throws Exception {
>>> 
>>>         // 2. Set up the execution environment
>>>         StreamExecutionEnvironment streamEnv =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         EnvironmentSettings settings =
>>>                 EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>>>         StreamTableEnvironment tableEnv =
>>>                 StreamTableEnvironment.create(streamEnv, settings);
>>>         streamEnv.setParallelism(1);
>>>         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
>>>                 + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
>>>                 + " 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
>>>                 + " 'connector.document-type' = 'user_behavior',"
>>>                 + " 'connector.bulk-flush.max-actions' = '1', 'format.type' = 'json',"
>>>                 + " 'update-mode' = 'append' )";
>>>         tableEnv.sqlUpdate(sinkDDL);
>>>         Table table = tableEnv.sqlQuery("select * from test_es ");
>>>         tableEnv.toRetractStream(table, Row.class).print();
>>>         streamEnv.execute("");
>>>     }
>>> 
>>> }
>>> The exact error:
>>> The matching candidates:
>>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>>> Mismatched properties:
>>> 'connector.type' expects 'filesystem', but is 'elasticsearch'
>>> 'format.type' expects 'csv', but is 'json'
>>> 
>>> The following properties are requested:
>>> connector.bulk-flush.max-actions=1
>>> connector.document-type=user_behavior
>>> connector.hosts=http://localhost:9200
>>> connector.index=buy_cnt_per_hour
>>> connector.type=elasticsearch
>>> connector.version=6
>>> format.type=json
>>> schema.0.data-type=BIGINT
>>> schema.0.name=hour_of_day
>>> schema.1.data-type=BIGINT
>>> schema.1.name=buy_cnt
>>> update-mode=append
>> 
>> 



Re: ddl

2020-03-13 Post by jinhai wang
Page on “User-defined Sources & Sinks” for Flink 1.10:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
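
As a concrete starting point, here is a sketch of the SPI pieces that page describes, shaped as the "ddl kudu sink" from the question. All Kudu-specific names and property keys are hypothetical; only the factory interfaces and the service-file registration are real Flink 1.10 API.

import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Registered via a file named
// META-INF/services/org.apache.flink.table.factories.TableFactory
// containing this class's fully qualified name, so that
// 'connector.type' = 'kudu' becomes resolvable from DDL.
public class KuduTableSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "kudu");  // matched against the WITH clause
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.masters");  // hypothetical Kudu option
        properties.add("connector.table");    // hypothetical Kudu option
        properties.add("schema.#.name");
        properties.add("schema.#.data-type");
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        // TODO: parse the properties and build a StreamTableSink<Row> that
        // writes to Kudu; this sketch stops at the factory boundary.
        throw new UnsupportedOperationException("sketch only");
    }
}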
 


Best Regards

jinhai...@gmail.com

> On March 13, 2020 at 7:17 PM, 王志华 wrote:
> 
> Regarding DDL in Flink right now, what kinds of systems can act as the source table or the sink table? Online I have only seen a ddl mysql sink, a ddl hbase
> sink, and so on. Are other types supported? If not, does Flink expose interfaces that would let me add DDL syntax support for other systems, for example building a ddl kudu
> sink?
> 
> 
> 王志华
> a15733178...@163.com
> 



Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-13 Post by jinhai wang
Thanks for FLIP-115. It is a really useful feature for platform developers who 
manage hundreds of Flink-to-Hive jobs in production.
I think we need to add 'connector.sink.username' for UserGroupInformation when 
data is written to HDFS.
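
To make the motivation concrete, below is a sketch of the Hadoop-side mechanism such an option would feed. UserGroupInformation is Hadoop's real API; the "etl_user" value stands in for whatever 'connector.sink.username' would carry, and secured clusters would additionally need Kerberos or proxy-user configuration.

import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

public class SinkUserSketch {
    public static void main(String[] args) throws Exception {
        // Impersonate the configured sink user instead of the process user.
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("etl_user");
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            // Open the HDFS FileSystem and write here: the files are then
            // owned by "etl_user" rather than the TaskManager's OS user.
            return null;
        });
    }
}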


On 2020/3/13 at 3:33 PM, “Jingsong Li” wrote:

Hi everyone,

I'd like to start a discussion about FLIP-115 Filesystem connector in Table
[1].
This FLIP will bring:
- Introduce Filesystem table factory in table, support
csv/parquet/orc/json/avro formats.
- Introduce streaming filesystem/hive sink in table

CC to user mail list, if you have any unmet needs, please feel free to
reply~

Look forward to hearing from you.

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee






Re: flinkSQL join: where is a table's history kept, and for how long

2020-03-11 Post by jinhai wang
It should be governed by the withIdleStateRetentionTime setting. Docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
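
A minimal sketch of where that knob lives in code (Flink 1.10; the 12/24 hour values are illustrative). withIdleStateRetentionTime is the older StreamQueryConfig spelling; on TableConfig the same setting is setIdleStateRetentionTime:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Keep state for an idle join key at least 12 hours and at most 24 hours;
        // once it is cleared, a row for that key arriving a month later (as in
        // the question below) can no longer find its match.
        tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}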


On 2020/3/12 at 12:37 PM, “wangl...@geekplus.com.cn” wrote:


Two tables created from Kafka:

tableA: key  valueA
tableB: key  valueB

Submit a job with Flink SQL and run: select tableA.key, tableA.valueA, tableB.valueB from
tableA join tableB on tableA.key = tableB.key;
Where is the historical data of these two tables kept in Flink, and for how long?

For example, if key1 appears in tableA first and key1 only appears in tableB a long time later (a month), can it still be joined at that point?

Thanks,
王磊



wangl...@geekplus.com.cn