Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-06 Thread Shengkai Fang
The checkpoint is a snapshot for the job and we can resume the job if the job is killed unexpectedly. The state is another thing to memorize the intermediate result of calculation. I don't think the checkpoint can replace state. 大森林 于2020年10月7日周三 下午12:26写道: > Could you tell me: > > why we need

Re: what's the example for datastream data generator?

2020-10-06 Thread Shengkai Fang
Hi, I think you can take a look at *org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan#createSourceTransformation*, which will tell you how to get transformations by source function and stream execution environment. In datastream api, we also have a DataGen that is

Re: Pushing Down Filters

2021-01-15 Thread Shengkai Fang
Hi Satyam, Currently, the community is using the new table source/sink API and the `FilterableTableSource`, `ProjectableTableSource` have been deprecated. The interface `SupportsProjectionPushDown` and `SupportsFilterPushDown` are the new interfaces to push down the `projection` and `filter`.

Re: Read Hive table in Stream Mode use distinct cause heap OOM

2021-04-26 Thread Shengkai Fang
Hi, could you tell me which version do you use? I just want to check whether there are any problems. Best, Shengkai 张颖 于2021年4月25日周日 下午5:23写道: > hi,I met an appearance like this: > > this is my sql: > SELECT distinct header,label,reqsig,dense_feat,item_id_feat,user_id_feat > FROM

Re: Watermarks in Event Time Temporal Join

2021-04-25 Thread Shengkai Fang
Hi, maverick. The watermark is used to determine the message is late or early. If we only use the watermark on versioned table side, we have no means to determine whether the event in the main stream is ready to emit. Best, Shengkai maverick 于2021年4月26日周一 上午2:31写道: > Hi, > I'm curious why

Re: Using Hive UDFs

2021-04-27 Thread Shengkai Fang
Hi. The order of the module may influence the load of the function. [1] https://issues.apache.org/jira/browse/FLINK-22383 Youngwoo Kim (김영우) 于2021年4月28日周三 上午10:50写道: > Hi, > > I've configured Hive metastore to use HiveCatalog in streaming > application. So far, most of the features are

Re: Kafka SQL Connector: dropping events if more partitions then source tasks

2021-02-26 Thread Shengkai Fang
as idle and thus the > watermark is not increased. In any case I have observed how with a larger > number of source tasks no results are produced. > > > > Best, > > Jan > > *Von:* Shengkai Fang > *Gesendet:* Freitag, 26. Februar 2021 15:32 > *An:* Jan Oelschlegel >

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-07 Thread Shengkai Fang
Hi, Yuval, Jark, Timo. Currently the watermark push down happens in the logical rewrite phase but the filter push down happens in the local phase, which means the planner will first check the Filter push down and then check the watermark push down. I think we need a rule to transpose between the

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Shengkai Fang
kai, > That does explain what I'm seeing. > > Jark / Shenkai - Is there any workaround to get Flink to work with push > watermarks and predicate pushdown until this is resolved? > > On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang wrote: > >> Hi, Yuval, Jark, Timo. >>

Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-08 Thread Shengkai Fang
Sorry for the typo... I mean it will not take too much time. Best, Shengkai Shengkai Fang 于2021年3月9日周二 上午10:25写道: > Hi, Yuval. > > I have opened a ticket about this[1]. But I don't think we have any > solution to solve. > > Do you have time to help us to solve this? I thin

Re: Kafka SQL Connector: dropping events if more partitions then source tasks

2021-02-26 Thread Shengkai Fang
Hi, Jan. Could you tell us which Flink version you use? As far as I know, the kafka sql connector has implemented `SupportWatermarkPushDown` in Flink-1.12. The `SupportWatermarkPushDown` pushes the watermark generator into the source and emits the minimum watermark among all the partitions. For

Re: Flink SQL support array transform function

2021-08-03 Thread Shengkai Fang
Hi, Caizhi. Do you think we should support this? Maybe we can open a jira for this or to align with the spark to support more useful built-in functions. Caizhi Weng 于2021年8月3日周二 下午3:42写道: > Hi! > > Currently there is no such built-in function in Flink SQL. You can try to > write your own

Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

2022-04-21 Thread Shengkai Fang
Hi, The watermark of the join operator is the minimum of the watermark of the input streams. ``` JoinOperator.watermark = min(left.watermark, right.watermark); ``` I think it's enough for most cases. Could you share more details about the logic in the UDF getEventTimeInNS? I think the better

Re: XXX doesn't exist in the parameters of the SQL statement

2022-04-18 Thread Shengkai Fang
Hi, John. Could you share the exception stack to us and the schema of the `dummy` table in your database? Best, Shengkai John Tipper 于2022年4月17日周日 21:15写道: > Hi all, > > I'm having some issues with getting a Flink SQL application to work, where > I get an exception and I'm not sure why it's

Re: Custom restart strategy

2022-05-29 Thread Shengkai Fang
Given very few pipelines experience failures and > they are far in-between, I am looking for a push based model vs polling. > > Thanks > AK > > On Thu, May 26, 2022 at 7:21 PM Shengkai Fang wrote: > >> Hi. >> >> I think you can use REST OPEN API to fet

Re: length value for some classes extending LogicalType.

2022-05-25 Thread Shengkai Fang
Hi. It will also influence how Flink serialize/deserialize the RowData. For example, Flink will build the TimestampDataSerializer with specified precision in the type. You can see it only extract the expected part to serialize[1]. But for char/varchar type, the serializer will not truncate the

Re: LinkedMap ClassCastException issue

2022-05-25 Thread Shengkai Fang
Hi. Could you tell us the version of the Flink you are using? What's the version of commons-collections:commons-collections:jar when you compile the sql and the version in the cluster? It's possible you compile the sql and submit with the different version. I am not sure how you submit your

Re: Custom restart strategy

2022-05-26 Thread Shengkai Fang
Hi. I think you can use REST OPEN API to fetch the job status from the JM periodically to detect whether something happens. Currently REST OPEN API also supports to fetch the exception list for the specified job[2]. Best, Shengkai [1]

Re: Exception when running Java UDF with Blink table planner

2022-05-26 Thread Shengkai Fang
Hi. Could you also tell us which Flink version you are using, the schema of the source table and some test data? With these info, we can debug in our local environment. Best, Shengkai Tom Thornton 于2022年5月27日周五 06:47写道: > We are migrating from the legacy table planner to the Blink table

Re: [External] Re: Exception when running Java UDF with Blink table planner

2022-05-31 Thread Shengkai Fang
Hi, Tom. I don't reproduce the exception in the master. I am not sure whether the problem is fixed or I missing something. The only difference is my test udf extends ScalarFunction rather than DPScalarFunction and I use String[] as the input type. ``` public static class ListToString extends

Re: Application mode deployment through API call

2022-05-24 Thread Shengkai Fang
the CLIFrontend is not as robust as the REST API, or you >> will end up having to rebuild a very similar Rest API. For the meta space >> issue, have you tried adding shared libraries to the flink lib folder? >> >> On Mon, May 23, 2022 at 23:31 Shengkai Fang wrote: >>

Re: accuracy validation of streaming pipeline

2022-05-24 Thread Shengkai Fang
Hi, all. >From my understanding, the accuracy for the sync pipeline requires to snapshot the source and sink at some points. It is just like we have a checkpoint that contains all the data at some time for both sink and source. Then we can compare the content in the checkpoint and find the

Re: accuracy validation of streaming pipeline

2022-05-23 Thread Shengkai Fang
It's a good question. Let me ping @Leonard to share more thoughts. Best, Shengkai vtygoss 于2022年5月20日周五 16:04写道: > Hi community! > > > I'm working on migrating from full-data-pipeline(with spark) to > incremental-data-pipeline(with flink cdc), and i met a problem about > accuracy validation

Re: Job Logs - Yarn Application Mode

2022-05-23 Thread Shengkai Fang
If you find the JM in the yarn web ui, I think you can also find the webui to access the Flink web ui with the JM. Best, Shengkai

Re: Application mode deployment through API call

2022-05-23 Thread Shengkai Fang
Hi, all. > is there any plan in the Flink community to provide an easier way of deploying Flink with application mode on YARN Yes. Jark has already opened a ticket about how to use the sql client to submit the SQL in application mode[1]. What's more, in FLIP-222 we are able to manage the jobs in

Re: Window aggregation fails after upgrading to Flink 1.15

2022-05-23 Thread Shengkai Fang
Glad to see you find the root cause. I think we can shade the janino dependency if it influences the usage. WDYT, godfrey? Best, Shengkai Pouria Pirzadeh 于2022年5月21日周六 00:59写道: > Thanks for help; I digged into it and the issue turned out to be the > version of Janino: > flink-table has pinned

Re: Json Deserialize in DataStream API with array length not fixed

2022-05-23 Thread Shengkai Fang
Hi. In the SQL, you can just specify the `array_coordinates` type ARRAY[1]. For example, ``` CREATE TABLE source( `array_coordinates` ARRAY> ) WITH ( 'format' = 'json' ) ``` [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/ Zain Haider Nemati

Re: Application mode -yarn dependancy error

2022-05-23 Thread Shengkai Fang
Hi. I think you should send the mail to the user mail list or stack overflow, which is about the usage and help. The dev mail list focus on the design of the Flink itself. Could you share more details for your problems, including - which version you use. - how you use the Flink, including you

Re: Job Logs - Yarn Application Mode

2022-05-20 Thread Shengkai Fang
y YARN, users need to access YARN web ui to find > the YARN application by applicationId and then click 'application master > url' of that application to be redirected to Flink web ui. > > Best, > Biao Geng > > Shengkai Fang 于2022年5月20日周五 10:59写道: > >> Hi. &

Re: How to KafkaConsume from Particular Partition in Flink(version 1.14.4)

2022-05-19 Thread Shengkai Fang
Hi, If you use SQL API, you can specify the partition in the DDL[1] and filter out the record that you don't need. ``` CREATE TABLE KafkaSource ( ... `partition` METADATA ) WITH ( ... ); SELECT * FROM KafkaSource WHERE partition = 1; ``` Best, Shengkai [1]

Re: Does kafka key is supported in kafka sink table

2022-05-19 Thread Shengkai Fang
Hi. Yes. Flink supports to write the value to the Kafka record key parts. You just need to specify which column belongs to the key in the WITH blocks, e.g. ``` CREATE TABLE kafka_sink ( ... ) WITH ( `key.fields` = 'id' ); ``` [1]

Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Shengkai Fang
Hi. I am not familiar with the YARN application mode. Because the job manager is started when submit the jobs. So how can users know the address of the JM? Do we need to look up the Yarn UI to search the submitted job with the JobID? Best, Shengkai Weihua Hu 于2022年5月20日周五 10:23写道: > Hi, > You

Re: Does Table API connector, csv, has some option to ignore some columns

2022-07-10 Thread Shengkai Fang
Hi. In Flink SQL, you can select the column that you wants in the query. For example, you can use ``` SELECT col_a, col_b FROM some_table; ``` Best, Shengkai 于2022年7月9日周六 01:48写道: > Does Table API connector, CSV, has some option to ignore some columns in > source file? > For instance read

Re: Re:How can I convert a SQL String to a ResolvedExpression?

2022-06-22 Thread Shengkai Fang
Hi. I think you can use Expressions#callSql to convert the String to Expression. Then you can use ExpressionResolver to resolve the converted Expression. Best, Shengkai Qing Lim 于2022年6月22日周三 23:58写道: > Hi Xuyang, > > > > Thanks for the pointer, however it does not seems to achieve what I

Re: Fink 15: InvalidProgramException: Table program cannot be compiled. This is a bug

2022-06-10 Thread Shengkai Fang
Hi. I open a ticket about upgrading the version[1]. Maybe it is worth a try. Best, Shengkai [1] https://issues.apache.org/jira/browse/FLINK-27995 Benenson, Michael 于2022年6月10日周五 04:51写道: > Hi, David > > > > Hard to tell for sure, but yes, [1] could also indicate some problems with > Janio. >

Re: Not able to see std output in console/.out files with table API

2022-06-05 Thread Shengkai Fang
Hi. The TableResult.print() only prints the result to the client console. How do you redirect the output to the .out file? Can you get the output without redirection? Best, Shengkai Xuyang 于2022年6月2日周四 21:17写道: > Could you find that the input amount of the node `sink` is > being accumulated

Re: context.timestamp null in keyedprocess function

2022-06-15 Thread Shengkai Fang
hi. Could you share more info for us, e.g. exception stack? Do you set the assigner for all the source? I think you can modify the KeyedProcessFuncition to print the message whose timestamp is null. Best, Shengkai bat man 于2022年6月15日周三 14:57写道: > Has anyone experienced this or has any clue? >

Re: Flink, JSON, and JSONSchemas

2022-06-16 Thread Shengkai Fang
Hi. > *1. Is there some easy way to use deserialized JSON in DataStream without case classes or POJOs?* Could you explain what you expected? Do you mean you want to just register a DataType that is able to bridge the received bytes to the POJO objects. I am not sure wether the current RAW

Re: New KafkaSource API: Change in default behavior regarding starting offset

2022-06-20 Thread Shengkai Fang
hi. Please use English in the user mail list. If you want to unsubscribe the mail list, you can send mail to user-unsubscr...@flink.apache.org . Best, Shengkai liangzai 于2022年6月19日周日 10:36写道: > 请问这个邮件咋退订? > > > Replied Message > From bastien dine > Date 06/15/2022 17:50 > To

Re: Flink config driven tool ?

2022-06-07 Thread Shengkai Fang
Hi. I am not sure whether Flink SQL satisfies your requirement or not. You can just write the SQL in the file and use the SQL Client to submit it to your cluster. We have a quick start in the Flink CDC and you can make a try[1]. Best, Shengkai [1]

Re: Flink SQL on Docker

2022-10-24 Thread Shengkai Fang
Hi. Could you share the query you use in the tests and let us reproduce this problem offline? It's better you can provide us with more infos: - the Flink version you use - the logs in the sql-client and jm - It's better you can dump the memory to detect which uses the memory Best, Shengkai

Re: INSERT INTO will work faster in Flink than in regular database?

2022-09-18 Thread Shengkai Fang
Hi. I think you can write a udf[1] to process some fields and then insert into the sink. Best. Shengkai [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ 于2022年9月15日周四 22:10写道: > What's the most effective way (performance) to update big no of rows? > Sure

Re: Build failing when Flink version upgrade from 1.11.6 to 1.15.0

2022-10-13 Thread Shengkai Fang
Hi. I read the trace and I find nothing is related about the flink... could you also give us some code snippets about the blocking test. Best, Shengkai Pappula, Prasanna via user 于2022年10月14日周五 00:06写道: > > > I have upgraded the flink version from 1.11.6 to 1.15.0. Build is failing. > It

Re: Utilizing Kafka headers in Flink Kafka connector

2022-10-13 Thread Shengkai Fang
hi. You can use SQL API to parse or write the header in the Kafka record[1] if you are using Flink SQL. Best, Shengkai [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata Yaroslav Tkachenko 于2022年10月13日周四 02:21写道: > Hi, > > You can

Re: Can I dedup over an upsert topic?

2023-03-05 Thread Shengkai Fang
Hello. Thanks for sharing this with us. I think it's not easy work to support Deduplicate in streaming mode. For example, in the keep first-row case, we need to memorize all records during the running. Because the first row may be deleted at some point. One idea to work around is to use window

Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-03 Thread Shengkai Fang
+1 a FLIP to clarify the idea. Please be careful to choose which type of state you use here. The doc[1] says the broadcast state doesn't support RocksDB backend here. Best, Shengkai [1]

Re: flink 1.9 关于回撤流的问题

2020-09-15 Thread Shengkai Fang
hi, 我对于使用upsert kafka能够省state感到疑惑。金竹老师提供的实现只是丢掉了delete消息,你的下游表依旧需要手动去重才可以得到准确的结果才对啊。如果每个下游表都手动去重这样子还能省state吗? star <3149768...@qq.com> 于2020年6月8日周一 上午9:38写道: > 非常感谢,正是我想要的。也谢谢金竹老师的分享! > > > > > --原始邮件-- > 发件人:"Sun.Zhu"<17626017...@163.com; > 发送时间:2020年6月7日(星期天)

Re: 【Flink的shuffle Mode】

2020-10-06 Thread Shengkai Fang
Hi, 在Datastream api之中我们可以控制shuffle的模式[1],但是在Table api 似乎暂时还不提供控制shuffle mode的接口。 我对join算子的实现不太熟悉,这个需要shuffle吗?对于agg操作,它的partition模式是由group by的key控制的。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/#physical-partitioning 忝忝向仧 <153488...@qq.com> 于2020年10月6日周二

Re: TableColumn为啥不包含comment

2020-08-17 Thread Shengkai Fang
hi, 那请你在那个jira留一下言,我会把这个分配给你。 Harold.Miao 于2020年8月17日周一 上午11:26写道: > 谢谢 我想提交这个patch > > Shengkai Fang 于2020年8月14日周五 下午4:33写道: > > > hi, 我已经建了一个issue[1]跟踪这个情况,有兴趣的话可以帮忙修复下这个bug。 > > > > [1] https://issues.apache.org/jira/browse/FLINK-18958 > > >

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-28 Thread Shengkai Fang
hi, 你说的是为每个partition生成一个watermark吗? 这样子快流和慢流都会有独立的watermark gererator。 datastream已经支持了该特性, table层正在支持该特性,你可以看看flink-19282的jira。 赵一旦 于2020年9月28日 周一上午11:39写道: > 我这边负责的作业,一个作业上有2-3个kafka数据源,还包括多个mysql配置流数据源。也是各种join,但是没有union的case。 > > 没有任何watermark的问题,flink现有机制都是可以完美解决的。 > > > > 赵一旦 于2020年9月28日周一

Re: 请问如何模拟一个back pressure的场景,应该如何实现?谢谢

2020-10-03 Thread Shengkai Fang
hi, Jark 老师有个博文提到过这些,可以参考一下并模拟下[1][2]。 [1] http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/ [2] https://www.ververica.com/blog/how-flink-handles-backpressure 大森林 于2020年10月3日周六 下午8:03写道: > 您好! > 请问处理的速度 > 和消费的速度 > 在代码上应该怎么体现呢? > 谢谢您 > > >

Re: Table Api执行sql如何设置sink并行度

2020-08-07 Thread Shengkai Fang
hi 不知道 这个能不能满足你的要求 tEnv.getConfig().addConfiguration( new Configuration() .set(CoreOptions.DEFAULT_PARALLELISM, 128) ); 参见文档:https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html wldd 于2020年8月7日周五 下午3:16写道: > hi,all: > 请教一下,TableEnviroment在执行sql的时候如何设置sink的并行度 > > > > > >

Re: 使用flink-sql-connector-elasticsearch6_2.11_1.10.0.jar执行任务失败

2020-08-07 Thread Shengkai Fang
你的意思是不是用1.10的es包没问题,但是用1.11的有问题? 似乎是一个bug,在1.11 es7上已经修复了这个bug,但是没有对于es6进行修复。 参见[1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2 费文杰 于2020年8月7日周五 下午3:56写道: > > 以下是我的代码: > import com.alibaba.fastjson.JSONObject; > import lombok.extern.slf4j.Slf4j; > import

Re: Re: Table Api执行sql如何设置sink并行度

2020-08-07 Thread Shengkai Fang
t; > Best, > wldd > > > > > > 在 2020-08-07 15:26:34,"Shengkai Fang" 写道: > >hi > >不知道 这个能不能满足你的要求 > > > >tEnv.getConfig().addConfiguration( > >new Configuration() > >.set(CoreOptions.DEFAULT_PARALLELISM, 128) > >); > > >

Re: 使用flink-sql-connector-elasticsearch6_2.11_1.10.0.jar执行任务失败

2020-08-07 Thread Shengkai Fang
不好意思,在es6上也进行了相应的修复。 但似乎是一个相同的问题。 Shengkai Fang 于2020年8月7日周五 下午7:52写道: > 你的意思是不是用1.10的es包没问题,但是用1.11的有问题? > 似乎是一个bug,在1.11 es7上已经修复了这个bug,但是没有对于es6进行修复。 > 参见[1] https://issues.apache.org/jira/browse/FLINK-18006?filter=-2 > > 费文杰 于2020年8月7日周五 下午3:56写道: > >&g

Re: FlinkSQL even time 定义问题

2020-08-12 Thread Shengkai Fang
对于第一个问题- 在查询语句之中定义watermark: 现在并不支持。这主要是由于在同一个作业之中,如果select的数据源是同一个表,那么在实际的优化过程之中,会将source进行复用,而现在同一个source并不支持多个watermark assigner。如果在不同的作业之中,那么显然只要修改watermark的定义语句即可。 对于第二个问题:rowtime的定义是必须建立在创建表的过程之中的。 对于第三个问题:社区正在讨论这个问题。现在仅支持多个insert的sql在同一个job之中。 Zhao,Yi(SEC) 于2020年8月12日周三 下午5:36写道: >

Re: 来自郭华威的邮件

2020-08-09 Thread Shengkai Fang
hi. 能提供具体的代码? 郭华威 于2020年8月10日周一 上午10:21写道: > flink1.11.1 使用tableApi 报错: > Exception in thread "main" org.apache.flink.table.api.TableException: > Create BatchTableEnvironment failed. > at > org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517) >

Re: 关于Flink1.11 CSV Format的一些疑问

2020-08-07 Thread Shengkai Fang
hi, 对于第一个问题,文档[1]中已经有较为详细的解释,你可以仔细阅读下文档关于partition files的解释。 对于第二个问题,现在的csv格式的确不支持这个选项,可以考虑见个jira作为improvment. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html WeiXubin <18925434...@163.com> 于2020年8月8日周六 上午11:40写道: > Hi,我在Flink1.11版本,使用filesystem

Re: Re:Re:Flink SQL No Watermark

2020-08-13 Thread Shengkai Fang
hi 那你有没有试过将并行度设置为partition的数量 Zhou Zach 于2020年8月13日 周四下午3:21写道: > > > > Hi forideal, > 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下: > > > val streamExecutionEnv = > StreamExecutionEnvironment.getExecutionEnvironment > >

Re: Flink SQL No Watermark

2020-08-12 Thread Shengkai Fang
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗? 这里倒是有一个比较hack的方法: 将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname == “WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。 我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。 forideal 于2020年8月13日周四

Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread Shengkai Fang
针对(3)社区建议使用sql api, table api现在正准备重构。 靳亚洽 于2020年8月13日周四 上午11:00写道: > 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf > jar包和平台提供的解析flink sql的jar包提交到集群。 > 针对3, 既然使用了flink sql,当然期望sql搞定一切了 > 针对4, > 我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。

Re: Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 Thread Shengkai Fang
hi, watermark本来就是通过watermark assigner生成的。这是正常现象。 我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。 Zhou Zach 于2020年8月13日周四 下午4:33写道: > > > > Hi forideal, Shengkai Fang, > > 加上env.disableOperatorChaining()之后,发现5个算子, > > > > > Source: TableSourceScan(table=[[

Re: flink1.11 sql使用问题

2020-08-11 Thread Shengkai Fang
能展示下完整的例子吗? 我用的时候没有这种情况。 魏烽 于2020年8月11日 周二下午10:27写道: > 各位大佬好: > > 在使用flink1.11 sql客户端的时候,只能只用最基本的count,group by、order > by、join等都无法实现,请问这个是什么原因呢,感谢! > > > Flink SQL> select count(t2.superid) from cdp_profile_union t1 inner join > cdp_crowd_10002 t2 on t1.superid=t2.superid; > > [ERROR] Could not

Re: TableColumn为啥不包含comment

2020-08-14 Thread Shengkai Fang
hi, 我已经建了一个issue[1]跟踪这个情况,有兴趣的话可以帮忙修复下这个bug。 [1] https://issues.apache.org/jira/browse/FLINK-18958 Harold.Miao 于2020年8月13日周四 上午11:08写道: > hi all > 我发现TableColumn class不包含column comment , 给开发带来了一点麻烦,请教大家一下,谢谢 > > > -- > > Best Regards, > Harold Miao >

Re: Flink SQL并发度设置问题

2020-12-27 Thread Shengkai Fang
30,那么默认情况window的最大并行度是128。我在想,如果按照平均考虑,这种情况是不是从机制上就已经有大概率会导致数据倾斜了呢?设置成32对于128才可以均衡不是吗。 > > Shengkai Fang 于2020年12月27日周日 下午3:46写道: > > > 可以通过该配置[1]来设置 > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec

Re: Flink SQL并发度设置问题

2020-12-26 Thread Shengkai Fang
可以通过该配置[1]来设置 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-resource-default-parallelism 赵一旦 于2020年12月27日周日 下午12:44写道: > 了解下多少数据量呀,128的并发其实很高了感觉。 > > guaishushu1...@163.com 于2020年12月26日周六 下午5:39写道: > > > Flink > > >

Re: rowtime的时区问题

2020-12-26 Thread Shengkai Fang
社区正在解决这个问题,1.13应该会有一个系统性地修复。 CC Leonard 作为work around,可以参考下这个博客[1] [1] https://blog.csdn.net/tzs_1041218129/article/details/109064015?utm_medium=distribute.pc_relevant.none-task-blog-title-3=1001.2101.3001.4242 ゞ野蠻遊戲χ 于2020年12月26日周六 下午11:16写道: > Hi 大家好 > >

Re: 求问为什么KafkaDynamicSource 在批模式下不能运行

2020-12-26 Thread Shengkai Fang
现在Flink 走的是流批一体, 为什么说 Kafka 不支持批模式呢? wxpcc 于2020年12月25日周五 下午7:10写道: > kafka在我们这边场景上除了用来存放实时流式数据,还会用作临时大数据量的存储,主要用于: > > 1. > > 数据同步时,将全量数据同步到一个临时的kafka中,增量数据持续性同步到kafka中,目前我们都使用流模式消费其中的数据,就会有手动停止,或者借助指标等自动停止流式任务 > 2. 数据恢复时 > 3. 临时查看某个时间区间的数据 > > 如果批模式 sql能够完成这些事情的话那该多好 > > > > -- > Sent from:

Re: Flink 1.11.2客户端 select * from table

2020-12-28 Thread Shengkai Fang
要看一下日志里面是否有报错信息,才能进一步分析。 Jacob <17691150...@163.com> 于2020年12月29日周二 上午9:11写道: > Dear All, > > > > > > 在Flink SQL客户端中使用select * from table 语句查询表数据,结果只显示表头,没有数据,是什么原因。 > >

Re: Some questions about limit push down

2020-12-28 Thread Shengkai Fang
hi, Jun Zhang. Currently, the rule `PushLimitIntoTableSourceScanRule` captures the structure that the `FlinkLogicalSort` node is the parent of the `FlinkLogicalTableSourceScan`. In your case, we have a Calc node between the Sort node and the Scan node, which makes the rule fails to apply. Maybe

Re: 回复: flink sql 消费kafka 消息 写入Hive不提交分区

2021-01-17 Thread Shengkai Fang
这种情况一般是kafka的某个分区,不存在数据,导致总体的watermark不前进。遇到这种情况一般是需要手动设置idle source[1]。但是社区的watemark push down存在一些问题[2],已经在修复了。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout [2]

Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-25 Thread Shengkai Fang
orExecution(PackagedProgram.java:198) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:9

Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-21 Thread Shengkai Fang
hi, LIMIT PUSH DOWN 近期已经merge进了master分支了。 [1] https://github.com/apache/flink/pull/13800 Land 于2021年1月22日周五 上午11:28写道: > 可能是没有下推到MySQL执行。 > 问题和我遇到的类似: > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: 咨询求助

2021-01-30 Thread Shengkai Fang
hi, 根据文档[1][2], 你可以通过在with参数内填相应的内容来通过认证, e.g 'properties.sasl.kerberos.service.name' = 'xyz' [1] https://kafka.apache.org/documentation/#brokerconfigs_sasl.kerberos.service.name [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#properties 瞿叶奇

Re: sql client提交 flink任务失败

2021-06-08 Thread Shengkai Fang
可以看看之前的问题,看看能否解决。 Best, Shengkai [1] http://apache-flink.147419.n8.nabble.com/Flink-td7866.html [2] https://issues.apache.org/jira/browse/FLINK-20780 Fei Han 于2021年6月8日周二 下午8:03写道: > > @all: > Flink环境:Flink1.13.1 > HADOOP环境:CDH5.15.2 > 测试命令如下:./bin/sql-client.sh embedded -i

Re: Upsert kafka 作为 source 的几个问题

2021-05-06 Thread Shengkai Fang
Hi. 1. 当初的设计是一个较为保守的设计,其主要目的就是为了能够补全delete消息; 2. 核心类是 StreamExecChangelogNormalize[1] 3. 是的。目前 Upsert-kafka 要求具有相同key的数据在相同 partition 的。因为 kafka 仅保证 partiiton 内按 offset 读取,如果相同 key 的数据分布在不同 partition 的话,那么读取会乱序。 4. 当数据进入到具体的算子的时候并不会区别数据是来自什么connector的。如果 left, right 的 paritition 策略不一致,会shuffle的。

Re: flink 1.13.0 ,使用flink sql 链接数据库是否支持多模式,即表名为schema.name

2021-05-19 Thread Shengkai Fang
请问是要用正则表达式匹配数据库中的table吗?‘org.users’ 是一个正则表达式吗? Best, Shengkai Asahi Lee <978466...@qq.com> 于2021年5月19日周三 下午2:01写道: > hi! >flink jdbc 是否有考虑支持表基于模式查询?如下 table-name写法: > CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status > BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH

Re: Dynamic Table Options 被优化器去掉了

2021-04-26 Thread Shengkai Fang
hi, macial kk. 看样子是个bug,能提供以下你的ddl以及相关的环境吗?方便我们复现下问题。 Best, Shengkai plan的digest是不会打印connector的option的值的,因此你是没有办法通过plan来判断是否生效了。 macia kk 于2021年4月26日周一 上午12:31写道: > Hi > > 我有在使用 temporal Joini 的时候有设置 如果读取分区的相关的 dynamic > option,但是最后是没有生效的,我看全部使用的默认参数,打印出来了执行计划,逻辑执行计划是有的,优化之后没有了 >

Re: DataStreamAPI 与flink sql疑问

2021-04-26 Thread Shengkai Fang
Flink支持将DataStream 转换成一个 Table,然后通过API进行操作。如果想跟SQL相结合,可以将Table注册成一个 temporary view。 Best, Shengkai HunterXHunter <1356469...@qq.com> 于2021年4月27日周二 上午9:46写道: > 你试过吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: 问题:flink 1.13编译 flink-parquet报错 -类重复:org.apache.flink.formats.parquet.generated.SimpleRecord

2021-04-26 Thread Shengkai Fang
是不是没有删除之前生成的类,手动删除冲突的类试试。 Best, Shengkai HunterXHunter <1356469...@qq.com> 于2021年4月27日周二 上午10:58写道: > 查看发现 > > org.apache.avro > avro-maven-plugin > ${avro.version} >

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-26 Thread Shengkai Fang
请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1] [1] https://github.com/apache/flink/pull/15548 gen 于2021年4月27日周二 上午9:40写道: > Hi, all > > 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。 > > tEnv.executeSql( > """ > | SELECT t.* FROM ( > | SELECT EvtParser(request) as t FROM parsed_nginx_log >

Re: Flink消费Kafka报错:ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-04-26 Thread Shengkai Fang
hi, Colar. Flink 使用的 Kafka 的版本是2.4.1,但是你的集群版本是1.1.1。看样子 作业运行时加载的是 集群上的 ByteArraySerializer,而不是 Flink 的 `flink-connector-kafka`中的。不太确定打成一个shade包能不能行。 Best, Shengkai Colar <523774...@qq.com> 于2021年4月26日周一 下午6:05写道: > 使用Flink 1.12.2 消费Kafka报错: > > 2021-04-26 17:39:39,802 WARN

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-26 Thread Shengkai Fang
Hi gen 我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。 Best, Shengkai Shengkai Fang 于2021年4月27日周二 上午11:46写道: > 请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1] > > [1] https://github.com/apache/flink/pull/15548 > > gen 于2021年4月27日周二 上午9:40写道: > >> Hi, all >> >&

Re: 关于upsert-kafka connector的问题

2021-04-22 Thread Shengkai Fang
Hi, 请问是有什么具体的问题吗? Best, Shengkai op <520075...@qq.com> 于2021年4月22日周四 下午6:05写道: > 用 upsert-kafka connector 作为source,会有key的插入和更新出现乱序导致结果不准的问题吗? > 谢谢

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-27 Thread Shengkai Fang
hi, gen. 近期内应该就会发布,应该是五一左右就会发布1.13的版本。 Best, Shengkai gen 于2021年4月27日周二 下午8:57写道: > hi, Shengkai > 非常感谢你的解答, 解决了困扰我几天的问题。 > 按照你的建议 ,我使用 今天(2021-4-27) 主干版本,运行正常,发现确实是已经修复的。 > 我之前使用的版本是 1.12.2。 > > > 目前最新的release版本是1.12.2 ,应该还没有包含这个修复。不知道你是否了解 1.13的发布计划。 > > > > -- > Sent from:

Re: 使用Table API怎么构造多个sink

2021-04-28 Thread Shengkai Fang
Hi. 可以通过`StatementSet` 指定多个insert,这样子就可以构造出多个sink了。 Best, Shengkai Han Han1 Yue 于2021年4月28日周三 下午2:30写道: > Hi, > 个人在分析RelNodeBlock逻辑,多个SINK才会拆分并重用公共子树,怎么构造多个sink呢, > 文件RelNodeBlock.scala源码里的writeToSink()已经找不到了 > > // 源码里的多sink例子 > val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c) >

Re: Table-api sql 预检查

2021-04-28 Thread Shengkai Fang
Hi. 可以看看 TableEnvironment#execute逻辑,如果能够将sql 转化成 `Transformation`,那么语法应该没有问题。 Best, Shengkai Michael Ran 于2021年4月29日周四 上午11:57写道: > dear all : > 用table api 提交多个SQL 的时候,有什么API 能提前检查SQL 的错误吗? 每次都是提交执行的时候才报错。 > 理论上里面在SQL解析部分就能发现,有现成的API吗?还是说需要自己去抓那部分解析代码,然后封装出API。 >

Re: 官网文档和样例的不完整性和不严谨性的问题

2021-04-24 Thread Shengkai Fang
Hi, xuefli. 非常感谢你指出文档的问题! 由于邮件中看代码比较吃力(没有语法高亮以及排版的问题),我只是粗略地看了下代码。 当输入源 为 `一次性从内存中的List读取数据`,无法触发onTimer。 实际的例子中,我看到看到采用的是process time,且延时 3s 触发 。我怀疑是不是,数据量太少,所以程序很快就结束了导致没来得及触发timer,建议改成event time试试这种情况。 Best, Shengkai xue...@outlook.com 于2021年4月25日周日 上午9:42写道: >

Re: 关于upsert-kafka connector的问题

2021-04-24 Thread Shengkai Fang
本质上,upsert-kafka是对kafka的封装,其内部仍然是一个消息队列,只是在消费的时候,我们形成一个视图。 消息从flink进入到kafka之中,根据kafka的协议保证了at-least-once。 Best, Shengkai op <520075...@qq.com> 于2021年4月23日周五 下午2:18写道: > > 谢谢,upsert-kafka作为sink可以保证相同key的数据放在同一个partition内,假如对相同key的更新数据,由于网络等原因后更新的值A的比先更新的值B提前发送到kafka, >

Re: 关于upsert-kafka connector的问题

2021-04-24 Thread Shengkai Fang
这里有对upsert-kafka完整的一个分析的讲解:深度解析 Flink upsert-kafka[1]。如果还有问题,可以继续咨询。 [1]https://flink-learning.org.cn/developers/flink-training-course3/ Shengkai Fang 于2021年4月25日周日 上午10:16写道: > 本质上,upsert-kafka是对kafka的封装,其内部仍然是一个消息队列,只是在消费的时候,我们形成一个视图。 > > 消息从flink进入到kafka之中,根据kafka的协议保证了at-l

Re: 关于Flink水位线与时间戳分配的疑问

2021-04-01 Thread Shengkai Fang
hi, 图挂了。 1. 可以这么使用这个方法: ··· input.assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)) .withTimestampAssigner((event, timestamp) -> 42L)); ··· TimestampAssigner 会从输入的event上读取数据 并由watermark generator 决定输出对应的watermark. 3.

Re: [讨论] Flink Connector 并行写入数据方案

2021-03-31 Thread Shengkai Fang
Hi jie. User mail list 更多是用来讨论使用中的问题,请将关于dev相关的问题转发到d...@flink.apache.org 详情可以参考[1] [1] https://flink.apache.org/community.html jie mei 于2021年3月31日周三 下午3:03写道: > Hi, Community > > 我想发起一个初步的讨论,关于如何扩大 Flink 写入数据的并行度,并且能够分表,可选事务支持的方案。 > > 该方案应该支持三种场景: > > 1) 至少一次: 不支持或者有限支持 update 的存储,通常通过查询去重。

Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

2021-03-15 Thread Shengkai Fang
为是一次insert,从而促使下游emit record? > > 我们现在有若干自定义的Format,如果为了适配Upsert Kafka,format需要对d,(-u) 事件发射value == > null的Record吗 > > > > > > 发件人: Shengkai Fang > 发送时间: 2021年3月15日 14:21:31 > 收件人: user-zh@flink.apache.org > 主题: Re: Upsert Kafka 的

Re: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

2021-03-15 Thread Shengkai Fang
Hi. 当初的设计是基于kafka的compacted topic设计的,而compacted topic有自身的表达changelog的语法,例如:使用value 为 null 表示tombstone message。从这个角度出发的话,我们仅从kafka的角度去理解数据,而非从format的角度去解析数据。 这当然引入了一些问题,例如当利用upsert-kafka读取数据的时候需要维护一个state以记住读取的所有的key。 Best, Shengkai 刘首维 于2021年3月15日周一 上午11:48写道: > Hi all, > > > >

Re: 关于upsert-kafka connector的问题

2021-04-22 Thread Shengkai Fang
如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。 Best, Shengkai

Re: Re: Flink任务假死;无限100%反压;但下游节点无压力。

2021-08-25 Thread Shengkai Fang
- 得看一下具体的卡死的节点的栈,分析下具体的工作任务才知道。 - 日志中有包含错误的信息吗? Best, Shengkai yidan zhao 于2021年8月26日周四 下午12:03写道: > 可能存在机器压力倾斜,但是我是不太清楚这种现象的原因,直接停滞了任务? > > 东东 于2021年8月26日周四 上午11:06写道: > > > 建议检查一下是否有数据倾斜 > > > > > > 在 2021-08-26 10:22:54,"yidan zhao" 写道: > > >问题期间的确ckpt时间较长。 > >

Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

2021-08-25 Thread Shengkai Fang
说的是 statement set [1] 吗 ? [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements 悟空 于2021年8月26日周四 上午11:33写道: > hi all: > 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现 在一个事务里 先将kafka > 源的数据写入到一张msyql 表中,

Re: flink1.12 sql-client 临时文件

2021-08-02 Thread Shengkai Fang
hi,想问一下是在什么情况下产生这些文件的?相关的 sql 是什么? 如果在 tmp 下 似乎不是 sql client 产生的。 吴旭 于2021年7月30日周五 下午2:45写道: > > > > 大佬们,请问下 在使用 sql-client 查询的过程中, /tmp 目录下面生成很多如下这样的临时文件,请问有地方可以配置吗 > > > ```txt > 00615a2c-c0f6-4ca9-b5c4-ee8d69ca2513 > 1098b539-31f2-4fbb-9e7b-46d490ff25d6 > 13b0dcbb-2e2c-4b85-9969-f90915e2a9ca

Re: 回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 Thread Shengkai Fang
Flink 暂时不支持这个功能,可能需要自己改一下 jdbc connector 相关的代码. 但是这个报错很奇怪..你 sql 咋写的 Ye Chen 于2021年8月2日周一 上午11:37写道: > 你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错: > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Column types of query > result and sink for

Re: flink table over 窗口报错

2021-08-04 Thread Shengkai Fang
能发一下具体的异常栈吗?是哪个版本? yanyunpeng 于2021年8月4日周三 下午2:47写道: > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_value"), >

  1   2   >