+1 a FLIP to clarify the idea.
Please be careful to choose which type of state you use here. The doc[1]
says the broadcast state doesn't support RocksDB backend here.
Best,
Shengkai
[1]
方勇老师说的没错。我们在文档里面也加了如何配置 hiveserver2 endpoint 的文档[1]
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hiveserver2/#setting-up
Shammon FY 于2023年3月27日周一 08:41写道:
> Hi
>
>
>
Hello. Thanks for sharing this with us.
I think it's not easy work to support Deduplicate in streaming mode. For
example, in the keep first-row case, we need to memorize all records during
the running. Because the first row may be deleted at some point. One idea
to work around is to use window
听上去像是数据乱序了。可以看看这个文档对应的解决下[1]
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/
Best,
Shengkai
casel.chen 于2023年3月1日周三 16:18写道:
> flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。
>
>
hi. 手动使用 join 将多个流拼接起来?
Best,
Shengkai
casel.chen 于2023年3月2日周四 21:01写道:
> flink sql jdbc connector是否支持多流拼接?
> 业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。
> 每条流更新大宽表的一部分字段。
Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?
Best,
Shengkai
casel.chen 于2023年2月9日周四 12:03写道:
> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
> json格式解析时直接忽略不识别的type,例如
> 例1:
>
我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
Best,
Shengkai
[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
Shammon FY 于2023年2月20日周一 08:41写道:
> Hi
>
>
能 jstack 看看卡在哪里吗?并且提供下步骤该怎么复现这个。
Best,
Shengkai
<704669...@qq.com.invalid> 于2023年1月12日周四 22:03写道:
> Hi 各位,
>
>
>
> 碰到sql-client进程不会被kill, 如下所示
>
> ---
>
> Flink SQL>
>
> [4]+ Stopped ./bin/sql-client.sh
>
> [l3@node1 flink-1.16.0]$ jps
>
> 35040
想问一下你想实现的功能是咋样的呢?是阿里云上的那种吗?
Best,
Shengkai
casel.chen 于2022年11月23日周三 08:29写道:
> flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink
> sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!
你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。
Best,
Shengkai
左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道:
>
>
>
>
>
>
>
>
>
> .print(); 去掉也不行,
>
> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢
>
>
>
>
>
>
>
>
> 在 2022-11-04 16:52:08,"yinghua...@163.com" 写道:
>
>
hi,
看不到的图片。能不能直接展示文字或者用图床工具?
Best,
Shengkai
左岩 <13520871...@163.com> 于2022年10月28日周五 18:34写道:
> upsert kafka作为source时,消费不到kafka中的数据
> 通过flink sql 写了一个upsert kafka connector 的连接器,去读一个topic,读不到数据,但是普通kafka
> 连接器消费这个topic 就能读到,都是读的同一个topic,代码如下
>
Hi. Could you share the query you use in the tests and let us reproduce
this problem offline? It's better you can provide us with more infos:
- the Flink version you use
- the logs in the sql-client and jm
- It's better you can dump the memory to detect which uses the memory
Best,
Shengkai
我看到这个 sql 是对维表的join on字段进行 cast,想问一下能否对主表(X_NEWS_TCRNW0003_1_ALL_CDC)的字段进行
cast 试试呢?
Shengkai
Alibaba
yinghua...@163.com 于2022年10月24日周一 09:27写道:
> 出错时SQL如下:
> create table X_NEWS_TCRNW0003_1_ALL_CDC
> (
> ID bigint,
> NewsCode bigint,
> NewsDate timestamp,
> ITCode2 string,
> CompanyCode
没事。欢迎试用,如果在使用过程之中遇到了问题,可以联系我。
Best,
Shengkai
TonyChen 于2022年10月20日周四 09:45写道:
> 找到了,打扰了列位
> public static final ConfigOption
> SQL_GATEWAY_SESSION_IDLE_TIMEOUT =
> key("sql-gateway.session.idle-timeout")
> .durationType()
>
hi.
You can use SQL API to parse or write the header in the Kafka record[1] if
you are using Flink SQL.
Best,
Shengkai
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata
Yaroslav Tkachenko 于2022年10月13日周四 02:21写道:
> Hi,
>
> You can
Hi.
可以从这个地方入手看看
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java#L95
Best,
Shengkai
casel.chen 于2022年10月11日周二 10:58写道:
> 可以给一些hints吗?看哪些类?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
你好,可以发送邮件到 user-zh-unsubscr...@flink.apache.org 来退订。
Best,
Shengkai
13341000780 <13341000...@163.com> 于2022年10月10日周一 18:21写道:
>
> 退订
>
>
>
>
> --
> 发自我的网易邮箱手机智能版
Hi.
I read the trace and I find nothing is related about the flink... could you
also give us some code snippets about the blocking test.
Best,
Shengkai
Pappula, Prasanna via user 于2022年10月14日周五 00:06写道:
>
>
> I have upgraded the flink version from 1.11.6 to 1.15.0. Build is failing.
> It
Hi. I think you can write a udf[1] to process some fields and then insert
into the sink.
Best.
Shengkai
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
于2022年9月15日周四 22:10写道:
> What's the most effective way (performance) to update big no of rows?
> Sure
Hi.
In Flink SQL, you can select the column that you wants in the query. For
example, you can use
```
SELECT col_a, col_b FROM some_table;
```
Best,
Shengkai
于2022年7月9日周六 01:48写道:
> Does Table API connector, CSV, has some option to ignore some columns in
> source file?
> For instance read
hi.
能展示下具体想要的plan 和实际的 plan 吗?
Best,
Shengkai
明寒 于2022年7月1日周五 09:50写道:
>
> HI:在flink1.12中,对于如下的Sql,生成的执行图中有两个GroupWindowAggregate算子,该如何调整Sql或者配置保证只生成一个GroupWindowAggregate算子
> CREATE TEMPORARY TABLE RawSource (
> `key` STRING,
> `accessNum` INT,
> `status` STRING,
> rowTime
Hi.
这种情况下可以用 jprofile 看看到底 cpu 花在哪里。你可以使用火焰图或者 jstack 看看具体的栈和使用。
Best,
Shengkai
yidan zhao 于2022年6月28日周二 10:44写道:
> 目前现象如题。任务就是kafkaSource读取数据,简单过滤,然后window,然后输出到mysql。
>
> 目前来看运行后1-2min后cpu开始异常,不是马上异常。 异常时候window算子busy为100%。
> window是event time window,配合自定义的
>
Hi.
I think you can use Expressions#callSql to convert the String to
Expression. Then you can use ExpressionResolver to resolve the converted
Expression.
Best,
Shengkai
Qing Lim 于2022年6月22日周三 23:58写道:
> Hi Xuyang,
>
>
>
> Thanks for the pointer, however it does not seems to achieve what I
hi.
Please use English in the user mail list. If you want to unsubscribe the
mail list, you can send mail to user-unsubscr...@flink.apache.org
.
Best,
Shengkai
liangzai 于2022年6月19日周日 10:36写道:
> 请问这个邮件咋退订?
>
>
> Replied Message
> From bastien dine
> Date 06/15/2022 17:50
> To
hi.
这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。
Best,
Shengkai
amber_...@qq.com.INVALID 于2022年6月21日周二 09:43写道:
> 您好!
> 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> 当我提交普通数据同步任务时,一切正常;
> 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
> Memory使用率始终是100%;
Hi.
> *1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*
Could you explain what you expected? Do you mean you want to just register
a DataType that is able to bridge the received bytes to the POJO objects. I
am not sure wether the current RAW
hi.
Could you share more info for us, e.g. exception stack? Do you set the
assigner for all the source? I think you can modify the
KeyedProcessFuncition to print the message whose timestamp is null.
Best,
Shengkai
bat man 于2022年6月15日周三 14:57写道:
> Has anyone experienced this or has any clue?
>
表的ttl相关日志。
>
>
>
>
>
>
>
> 在 2022-06-15 11:42:19,"Shengkai Fang" 写道:
> >> 我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确
> >
> >不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key
> >的数据频繁访问情况下,那么这个数据就不会过期。
>
的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。
> >
> >
> >如果我有不对的地方,请指正我哈。
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >
> >在 2022-06-12 14:39
>>>>== Optimized Physical Plan ==
> >>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id,
> parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num,
> p_sp_sub_amt, display_qty, qty, bom_type])
> >>>>>+- Join(joinType=[InnerJoin], where=[=(id
Hi.
I open a ticket about upgrading the version[1]. Maybe it is worth a try.
Best,
Shengkai
[1] https://issues.apache.org/jira/browse/FLINK-27995
Benenson, Michael 于2022年6月10日周五 04:51写道:
> Hi, David
>
>
>
> Hard to tell for sure, but yes, [1] could also indicate some problems with
> Janio.
>
你好,能提供下具体的 plan 供大家查看下吗?
你可以直接 使用 tEnv.executeSql("Explain JSON_EXECUTION_PLAN
").print() 打印下相关的信息。
Best,
Shengkai
lxk 于2022年6月10日周五 10:29写道:
> flink 版本:1.14.4
> 目前在使用flink interval join进行数据关联,在测试的时候发现一个问题,就是使用interval
> join完之后数据会丢失,但是使用sql api,直接进行join,数据是正常的,没有丢失。
> 水印是直接使用kafka
Hi.
I am not sure whether Flink SQL satisfies your requirement or not. You can
just write the SQL in the file and use the SQL Client to submit it to your
cluster. We have a quick start in the Flink CDC and you can make a try[1].
Best,
Shengkai
[1]
RL was
> not found on this
> server",发现是md文档中链接的URL拼写错误造成的。我提了一个hotfix[3],正好借助这个PR熟悉下贡献流程,辛苦老师帮忙Review下
> >
> >[1]
> https://flink.apache.org/contributing/code-style-and-quality-common.html#nullability-of-the-mutable-parts
> >[2] "usage of Java Optional"
Hi.
The TableResult.print() only prints the result to the client console. How
do you redirect the output to the .out file? Can you get the output without
redirection?
Best,
Shengkai
Xuyang 于2022年6月2日周四 21:17写道:
> Could you find that the input amount of the node `sink` is
> being accumulated
Hi.
我记得 Jdbc Connector 实现了 ProjectionPushDown。你可以参考着实现。
xuyang 老师说的对,getScanRuntimeProvider 发生在 push down
之后。应该不会有你说的问题。另外,可以考虑贡献到社区[1],我们也可以帮忙一起 review 下,帮忙解决你的问题?
Best,
Shengkai
[1] https://issues.apache.org/jira/browse/FLINK-19651
Xuyang 于2022年6月1日周三 23:47写道:
>
>
Hi, Tom.
I don't reproduce the exception in the master. I am not sure whether the
problem is fixed or I missing something.
The only difference is my test udf extends ScalarFunction rather than
DPScalarFunction and I use String[] as the input type.
```
public static class ListToString extends
Hi.
会根据 key 的 hash 值分配到固定个数的 keygroup 之中的。简单来说,跟HashMap>
有点相似。金竹老师有一篇文章详细解释了[1]。
如果想看实现的话,可以从 KeyGroupStreamPartitioner 入手来看看 Table 层是怎么做的。
Best,
Shengkai
[1] https://developer.aliyun.com/article/667562
Peihui He 于2022年5月29日周日 11:55写道:
> Hi, all
>
> 请教下大家,flink key by 后 使用process
Given very few pipelines experience failures and
> they are far in-between, I am looking for a push based model vs polling.
>
> Thanks
> AK
>
> On Thu, May 26, 2022 at 7:21 PM Shengkai Fang wrote:
>
>> Hi.
>>
>> I think you can use REST OPEN API to fet
Hi.
Could you also tell us which Flink version you are using, the schema of the
source table and some test data? With these info, we can debug in our local
environment.
Best,
Shengkai
Tom Thornton 于2022年5月27日周五 06:47写道:
> We are migrating from the legacy table planner to the Blink table
Hi.
I think you can use REST OPEN API to fetch the job status from the
JM periodically to detect whether something happens. Currently REST OPEN
API also supports to fetch the exception list for the specified job[2].
Best,
Shengkai
[1]
Hi.
Could you tell us the version of the Flink you are using? What's the
version of commons-collections:commons-collections:jar when you compile the
sql and the version in the cluster? It's possible you compile the sql and
submit with the different version.
I am not sure how you submit your
Hi.
Could you tell us the version of the Flink you are using? What's the
version of commons-collections:commons-collections:jar when you compile the
sql and the version in the cluster? It's possible you compile the sql and
submit with the different version.
I am not sure how you submit your
Hi.
It will also influence how Flink serialize/deserialize the RowData. For
example, Flink will build the TimestampDataSerializer with specified
precision in the type. You can see it only extract the expected part to
serialize[1]. But for char/varchar type, the serializer will not truncate
the
Hi, all.
>From my understanding, the accuracy for the sync pipeline requires to
snapshot the source and sink at some points. It is just like we have a
checkpoint that contains all the data at some time for both sink and
source. Then we can compare the content in the checkpoint and find the
the CLIFrontend is not as robust as the REST API, or you
>> will end up having to rebuild a very similar Rest API. For the meta space
>> issue, have you tried adding shared libraries to the flink lib folder?
>>
>> On Mon, May 23, 2022 at 23:31 Shengkai Fang wrote:
>>
It's a good question. Let me ping @Leonard to share more thoughts.
Best,
Shengkai
vtygoss 于2022年5月20日周五 16:04写道:
> Hi community!
>
>
> I'm working on migrating from full-data-pipeline(with spark) to
> incremental-data-pipeline(with flink cdc), and i met a problem about
> accuracy validation
If you find the JM in the yarn web ui, I think you can also find the webui
to access the Flink web ui with the JM.
Best,
Shengkai
Glad to see you find the root cause.
I think we can shade the janino dependency if it influences the usage.
WDYT, godfrey?
Best,
Shengkai
Pouria Pirzadeh 于2022年5月21日周六 00:59写道:
> Thanks for help; I digged into it and the issue turned out to be the
> version of Janino:
> flink-table has pinned
Hi.
I think you should send the mail to the user mail list or stack overflow,
which is about the usage and help. The dev mail list focus on the design of
the Flink itself.
Could you share more details for your problems, including
- which version you use.
- how you use the Flink, including you
Hi.
In the SQL, you can just specify the `array_coordinates` type ARRAY[1]. For
example,
```
CREATE TABLE source(
`array_coordinates` ARRAY>
) WITH (
'format' = 'json'
)
```
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
Zain Haider Nemati
Hi, all.
> is there any plan in the Flink community to provide an easier way of
deploying Flink with application mode on YARN
Yes. Jark has already opened a ticket about how to use the sql client to
submit the SQL in application mode[1]. What's more, in FLIP-222 we are able
to manage the jobs in
y YARN, users need to access YARN web ui to find
> the YARN application by applicationId and then click 'application master
> url' of that application to be redirected to Flink web ui.
>
> Best,
> Biao Geng
>
> Shengkai Fang 于2022年5月20日周五 10:59写道:
>
>> Hi.
&
Hi.
Yes. Flink supports to write the value to the Kafka record key parts. You
just need to specify which column belongs to the key in the WITH blocks,
e.g.
```
CREATE TABLE kafka_sink (
...
) WITH (
`key.fields` = 'id'
);
```
[1]
Hi.
Yes. Flink supports to write the value to the Kafka record key parts. You
just need to specify which column belongs to the key in the WITH blocks,
e.g.
```
CREATE TABLE kafka_sink (
...
) WITH (
`key.fields` = 'id'
);
```
[1]
Hi.
I am not familiar with the YARN application mode. Because the job manager
is started when submit the jobs. So how can users know the address of the
JM? Do we need to look up the Yarn UI to search the submitted job with the
JobID?
Best,
Shengkai
Weihua Hu 于2022年5月20日周五 10:23写道:
> Hi,
> You
你好,图挂了,应该是需要图床工具。
另外,能否贴一下相关的异常日志呢?
Best,
Shengkai
yidan zhao 于2022年5月20日周五 10:28写道:
> UI视图:[image: 1.png].
>
> 网络视图:
> [image: image.png]
>
>
> 补充部分集群部署信息:
> (1)flink1.13,standalone集群,基于zk做的HA。3 jm,若干tm。
> (2)jm的rest api开启了ssl,基于 nginx
>
Hi,
If you use SQL API, you can specify the partition in the DDL[1] and filter
out the record that you don't need.
```
CREATE TABLE KafkaSource (
...
`partition` METADATA
) WITH (
...
);
SELECT * FROM KafkaSource
WHERE partition = 1;
```
Best,
Shengkai
[1]
hi, 能从日志中拿到更多详细的日志吗?请同时分享下你的执行步骤?
Best,
Shengkai
18579099...@163.com <18579099...@163.com> 于2022年5月9日周一 17:46写道:
> 我有一部分表的数据是存在hbase上的,平时通过hive加载外部表的方式读取hbase的数据,我想通过flink sql读取hive表的方式
> 读取数据(不直接使用flink
> 读取hbase是我使用的catalog是hive,不用再写建表语句然后再查),当我用sql-client尝试的时候报错。
> 读取正常的hive是可以正常读取的,但是读取hive
Hi,
看不到你的图,是说这个吗[1]?
Best,
Shengkai
[1]
https://github.com/apache/flink/blob/ca58a700bbc0522f3c62e9db720f9f89c8bd8313/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
于2022年5月7日周六 10:07写道:
> 您好
>
>
Hi,
The watermark of the join operator is the minimum of the watermark of the
input streams.
```
JoinOperator.watermark = min(left.watermark, right.watermark);
```
I think it's enough for most cases. Could you share more details about the
logic in the UDF getEventTimeInNS?
I think the better
hi
sql jar 往往是 shade 了相关的依赖,而 普通的 jar 则不带有相关的依赖。正如名字所说,在 table api/sql
的情况下建议使用 sql jar,datastream 建议使用 普通的jar。
Best,
Shengkai
weishishuo...@163.com 于2022年4月21日周四 16:52写道:
>
>
Hi, John.
Could you share the exception stack to us and the schema of the `dummy`
table in your database?
Best,
Shengkai
John Tipper 于2022年4月17日周日 21:15写道:
> Hi all,
>
> I'm having some issues with getting a Flink SQL application to work, where
> I get an exception and I'm not sure why it's
退订的同学,请发送任意邮件到 user-zh-unsubscr...@flink.apache.org。
BEST,
Shengkai
王健 <13166339...@163.com> 于2022年4月15日周五 14:40写道:
> 退订
退订请发送邮件到 user-zh-unsubscr...@flink.apache.org。
Best,
Shengkai
剑来 <1370505...@qq.com.invalid> 于2022年4月15日周五 16:28写道:
> 退订
Hi, 这个就是 Flink UI 监听的端口号。
Best,
Shengkai
陈卓宇 <2572805...@qq.com.invalid> 于2022年4月15日周五 17:47写道:
> 今天看到这两个配置项,有些遗憾:
> # Port range for the REST and web server to bind to.
> #
> rest.bind-port: 8080-8090
> 这个绑定端口是起到什么作用?
> # The address that the REST web server binds to
> #
> #rest.bind-address:
退订的同学,请发送任意邮件到 user-zh-unsubscr...@flink.apache.org。
Best,
Shengkai
yefan 于2022年4月15日周五 16:33写道:
> 退订
退订的同学,请发送任意邮件到 user-zh-unsubscr...@flink.apache.org。
Best,
Shengkai
Shengkai Fang 于2022年4月15日周五 17:34写道:
> 我记得这个问题常常是因为 lambda 函数的入参/出参被 shade 了,导致找不到对应的实现类了。不太理解你的意思,但是用 sql-client
> 使用 sql-jar 应该是没有问题的。建议你代码里面也换成 sql jar 先试试看。
>
> Best,
> Shengkai
>
> wangzhen 于2022年4月1
我记得这个问题常常是因为 lambda 函数的入参/出参被 shade 了,导致找不到对应的实现类了。不太理解你的意思,但是用 sql-client
使用 sql-jar 应该是没有问题的。建议你代码里面也换成 sql jar 先试试看。
Best,
Shengkai
wangzhen 于2022年4月15日周五 11:57写道:
> 退订
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-04-14 22:44:48,"顺其自然" <712677...@qq.com.INVALID> 写道:
> >我的flink 1.12.1 sql
Hi,
可以为 HBaseDynamicTableSource 实现 FilterPushDown[1] 接口。这样子 你直接添加的 filter 会被下推到
Source,从而在读取的时候过滤掉不相关的数据。
[1]
HGT+HCbsAz9Hv5RU
> ruC/3H5UUhkL29DNCLPXsQx3U4qAbHNFIQtA1CvI99j55Pg=
> X-Google-Smtp-Source:
> ABdhPJx7/O7Ag3uEhwBNFyIWBHDPTxLpItw1QDymjd+Joj8OW3RpFtKmnbnsXaaxXsxC33L2O+Ki+LWm4/vME6cnu70=
> X-Received: by 2002:a05:6602:13c9:: with SMTP id
> o9mr19655702iov.40.1637547288489;
&
你好,想问一下你的 hive 包确定是1.13对应的版本吗?
Best,
Shengkai
drewfranklin 于2021年11月22日周一 上午9:45写道:
> Hello
>我按照官方文档使用 sql client 去连接hive catalog 时出错。
> 我的hive version 2.3.6
> Flink version 1.13.1
>
> 感觉官方介绍的bundled 方式添加jar 包,在flink/lib 下添加如下截图的包。然后重启集群,启动了sql-client
>
hi, 看不见图,建议用图床或者填一下代码。
我看到代码中有 yaml 文件,事实上 更建议使用 ddl 来创建相应的 catalog。
best,
Shengkai
drewfranklin 于2021年11月18日周四 下午6:01写道:
> Hello, friends !
>我按照官方文档使用 sql client 去连接hive catalog 时出错。
> 我的hive version 2.3.6
> Flink version 1.13.1
>
> 感觉官方介绍的bundled 方式添加jar 包,在flink/lib
不太理解,直接删掉老source 有什么影响吗?
Best,
Shengkai
杨浩 于2021年11月17日周三 下午6:00写道:
> 试了下,可以直接加source,有办法加source后,删掉老的source么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-11-17 14:56:37,"杨浩" 写道:
>
> 请问下,我们程序在运行后需要添加一个kafka
> source(和老的协议一致,后面加工逻辑一样),程序开启了checkpoint,如何保障之前程序的状态不丢啊
>
>
>
>
>
>
打出来的jar中是否包含了 相应的依赖呢?
aiden <18765295...@163.com> 于2021年11月17日周三 下午2:34写道:
> 补充下抛错异常
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.delegation.ExecutorFactory' in
> the classpath.
>
> Reason: No
同一个 task 里面的 op 往往是 chain 起来的,不经过网络传输很难评估其传输的数据量的...如果是 debug 的话,应该把所有的
chaining 断开,这样子每个 op 就是一个 task。可以通过设置 pipeline.operator-chaining false
来关闭chaining。
Best,
Shengkai
Ada Luna 于2021年11月17日周三 上午10:16写道:
> 看不到Task里Operator之间传输的数据量
>
> zhisheng 于2021年11月4日周四 下午4:56写道:
> >
> > webui 有
如果是个 bug,建议在社区开个 issue 跟踪下这个问题。
Shengkai Fang 于2021年11月16日周二 下午12:37写道:
> 能分享下具体是什么错误类型吗?
>
> 我看了下代码,感觉不太好支持。具体的序列化器是由
> `AbstractJdbcRowConverter`#createExternalConverter 决定的。
> 根据你的描述,当前的序列化器不够通用导致了这个问题,可能需要改动下源码才能支持。
>
> Best,
> Shengkai
>
> Ada Luna 于2021年11月
能分享下具体是什么错误类型吗?
我看了下代码,感觉不太好支持。具体的序列化器是由 `AbstractJdbcRowConverter`#createExternalConverter
决定的。 根据你的描述,当前的序列化器不够通用导致了这个问题,可能需要改动下源码才能支持。
Best,
Shengkai
Ada Luna 于2021年11月12日周五 上午11:25写道:
> Oracle中有VARCHAR 和 CLOB
> 如果我在Flink SQL JDBC Sink中配置STRING那么只能写VARCHAR 写CLOB会报错。
> 我想扩展FlinkSQL
1. 不知道使用 udf 能不能实现,自己实现一个 udf,然后在实现里面手动查询外表;
2. 如果自己实现的话,那么也应该能控制攒 batch 的实现;
悟空 于2021年11月12日周五 上午11:53写道:
> Hi :
> 第一个 我了解了Cache 不太适合我的场景,因为我的表都是几十亿量级,同时 我要根据一些关键键 去数据库里查询,所以
> 我先在Job 中 聚合一些主键,通过In 条件 去查询。
> 第二个 好像是我理解的问题,最初想通过Flink Sql 把整体逻辑 下发到数据库去查询,因为有些OLAP
> 引擎 查询性能是可以接受的
>
>
>
> ---
>
我看pom 之中之前依赖的kafka的版本是 2.4.1,当前依赖的 kafka 版本是 2.8.1。应该是不支持低版本的 kafka。
Best,
Shengkai
yidan zhao 于2021年11月11日周四 下午3:22写道:
>
> 如题,当前新的kafaksouce貌似对kafka-client版本做了依赖,比如代码KafkaPartitionSplitReader.acquireAndSetStoppingOffsets方法中用到的
>
> consumer.committed(partitionsStoppingAtCommitted)
>
说的是 statement set [1] 吗 ?
[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
悟空 于2021年8月26日周四 上午11:33写道:
> hi all:
> 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现 在一个事务里 先将kafka
> 源的数据写入到一张msyql 表中,
- 得看一下具体的卡死的节点的栈,分析下具体的工作任务才知道。
- 日志中有包含错误的信息吗?
Best,
Shengkai
yidan zhao 于2021年8月26日周四 下午12:03写道:
> 可能存在机器压力倾斜,但是我是不太清楚这种现象的原因,直接停滞了任务?
>
> 东东 于2021年8月26日周四 上午11:06写道:
>
> > 建议检查一下是否有数据倾斜
> >
> >
> > 在 2021-08-26 10:22:54,"yidan zhao" 写道:
> > >问题期间的确ckpt时间较长。
> >
able.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
> at
> org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
> at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
>
>
>
>
> Flink 版
能发一下具体的异常栈吗?是哪个版本?
yanyunpeng 于2021年8月4日周三 下午2:47写道:
> Table table = tableEnv
> .from("t_yyp_test")
> .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
> .orderBy($("f_time"))
> .preceding("unbounded_range")
> .following(CURRENT_RANGE)
> .as("w"))
> .select($("f_value"),
>
Hi, Caizhi. Do you think we should support this? Maybe we can open a jira
for this or to align with the spark to support more useful built-in
functions.
Caizhi Weng 于2021年8月3日周二 下午3:42写道:
> Hi!
>
> Currently there is no such built-in function in Flink SQL. You can try to
> write your own
Flink 暂时不支持这个功能,可能需要自己改一下 jdbc connector 相关的代码.
但是这个报错很奇怪..你 sql 咋写的
Ye Chen 于2021年8月2日周一 上午11:37写道:
> 你好,我试了一下,如果表的ddl是三个字段,但是insert只指定两个字段的话,会报错:
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for
hi,想问一下是在什么情况下产生这些文件的?相关的 sql 是什么? 如果在 tmp 下 似乎不是 sql client 产生的。
吴旭 于2021年7月30日周五 下午2:45写道:
>
>
>
> 大佬们,请问下 在使用 sql-client 查询的过程中, /tmp 目录下面生成很多如下这样的临时文件,请问有地方可以配置吗
>
>
> ```txt
> 00615a2c-c0f6-4ca9-b5c4-ee8d69ca2513
> 1098b539-31f2-4fbb-9e7b-46d490ff25d6
> 13b0dcbb-2e2c-4b85-9969-f90915e2a9ca
Hi.
新版的 CatalogFactory 实现了 Factory,这意味着当前的所有的 connector、format 以及 Catalog
都实现了相同的接口,保持了统一性。而保持原来的方法,更多是为了暂时的兼容性( 我的理解 ): 如果 某个Catalog
从低版本迁移到高版本只需要添加一些新的接口方法,而不需要删除之前的逻辑。之后的版本可能会删除这些已经被deprecated 方法。
Best,
Shengkai.
hoose 于2021年7月15日周四 下午6:13写道:
> Flink1.13.1在Catalog方面修改比较大,特别是一些方法的实现上
> 如
>
可以看看之前的问题,看看能否解决。
Best,
Shengkai
[1] http://apache-flink.147419.n8.nabble.com/Flink-td7866.html
[2] https://issues.apache.org/jira/browse/FLINK-20780
Fei Han 于2021年6月8日周二 下午8:03写道:
>
> @all:
> Flink环境:Flink1.13.1
> HADOOP环境:CDH5.15.2
> 测试命令如下:./bin/sql-client.sh embedded -i
请问是要用正则表达式匹配数据库中的table吗?‘org.users’ 是一个正则表达式吗?
Best,
Shengkai
Asahi Lee <978466...@qq.com> 于2021年5月19日周三 下午2:01写道:
> hi!
>flink jdbc 是否有考虑支持表基于模式查询?如下 table-name写法:
> CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status
> BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH
Hi.
1. 当初的设计是一个较为保守的设计,其主要目的就是为了能够补全delete消息;
2. 核心类是 StreamExecChangelogNormalize[1]
3. 是的。目前 Upsert-kafka 要求具有相同key的数据在相同 partition 的。因为 kafka 仅保证 partiiton 内按
offset 读取,如果相同 key 的数据分布在不同 partition 的话,那么读取会乱序。
4. 当数据进入到具体的算子的时候并不会区别数据是来自什么connector的。如果 left, right 的 paritition
策略不一致,会shuffle的。
Hi.
可以看看 TableEnvironment#execute逻辑,如果能够将sql 转化成 `Transformation`,那么语法应该没有问题。
Best,
Shengkai
Michael Ran 于2021年4月29日周四 上午11:57写道:
> dear all :
> 用table api 提交多个SQL 的时候,有什么API 能提前检查SQL 的错误吗? 每次都是提交执行的时候才报错。
> 理论上里面在SQL解析部分就能发现,有现成的API吗?还是说需要自己去抓那部分解析代码,然后封装出API。
>
Hi.
可以通过`StatementSet` 指定多个insert,这样子就可以构造出多个sink了。
Best,
Shengkai
Han Han1 Yue 于2021年4月28日周三 下午2:30写道:
> Hi,
> 个人在分析RelNodeBlock逻辑,多个SINK才会拆分并重用公共子树,怎么构造多个sink呢,
> 文件RelNodeBlock.scala源码里的writeToSink()已经找不到了
>
> // 源码里的多sink例子
> val sourceTable = tEnv.scan("test_table").select('a, 'b, 'c)
>
Hi.
The order of the module may influence the load of the function.
[1] https://issues.apache.org/jira/browse/FLINK-22383
Youngwoo Kim (김영우) 于2021年4月28日周三 上午10:50写道:
> Hi,
>
> I've configured Hive metastore to use HiveCatalog in streaming
> application. So far, most of the features are
hi, gen.
近期内应该就会发布,应该是五一左右就会发布1.13的版本。
Best,
Shengkai
gen 于2021年4月27日周二 下午8:57写道:
> hi, Shengkai
> 非常感谢你的解答, 解决了困扰我几天的问题。
> 按照你的建议 ,我使用 今天(2021-4-27) 主干版本,运行正常,发现确实是已经修复的。
> 我之前使用的版本是 1.12.2。
>
>
> 目前最新的release版本是1.12.2 ,应该还没有包含这个修复。不知道你是否了解 1.13的发布计划。
>
>
>
> --
> Sent from:
Hi gen
我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。
Best,
Shengkai
Shengkai Fang 于2021年4月27日周二 上午11:46写道:
> 请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1]
>
> [1] https://github.com/apache/flink/pull/15548
>
> gen 于2021年4月27日周二 上午9:40写道:
>
>> Hi, all
>>
>&
是不是没有删除之前生成的类,手动删除冲突的类试试。
Best,
Shengkai
HunterXHunter <1356469...@qq.com> 于2021年4月27日周二 上午10:58写道:
> 查看发现
>
> org.apache.avro
> avro-maven-plugin
> ${avro.version}
>
hi, Colar.
Flink 使用的 Kafka 的版本是2.4.1,但是你的集群版本是1.1.1。看样子 作业运行时加载的是 集群上的
ByteArraySerializer,而不是 Flink 的
`flink-connector-kafka`中的。不太确定打成一个shade包能不能行。
Best,
Shengkai
Colar <523774...@qq.com> 于2021年4月26日周一 下午6:05写道:
> 使用Flink 1.12.2 消费Kafka报错:
>
> 2021-04-26 17:39:39,802 WARN
请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1]
[1] https://github.com/apache/flink/pull/15548
gen 于2021年4月27日周二 上午9:40写道:
> Hi, all
>
> 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。
>
> tEnv.executeSql(
> """
> | SELECT t.* FROM (
> | SELECT EvtParser(request) as t FROM parsed_nginx_log
>
Flink支持将DataStream 转换成一个 Table,然后通过API进行操作。如果想跟SQL相结合,可以将Table注册成一个
temporary view。
Best,
Shengkai
HunterXHunter <1356469...@qq.com> 于2021年4月27日周二 上午9:46写道:
> 你试过吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
1 - 100 of 148 matches
Mail list logo