The code:
https://paste.ubuntu.com/p/gVGrj2V7ZF/
The error:
A group window expects a time attribute for grouping in a stream environment.
But the data source in the code already has a time attribute.
How should the code be modified?
Thanks.
Just found the answer by searching; thanks.
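In case it helps later readers: this error usually means the column used in the group window is a plain TIMESTAMP rather than a declared time attribute. A minimal sketch (table and column names are made up, not taken from the pasted code):

```sql
-- Hypothetical table: `ts` becomes an event-time attribute
-- via the WATERMARK clause, so it can be used in a group window.
CREATE TABLE orders (
    user_id BIGINT,
    amount  DOUBLE,
    ts      TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen'
);

SELECT
    TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
    COUNT(*) AS cnt
FROM orders
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND);
```

For processing time, a computed column such as `proctime AS PROCTIME()` in the DDL serves the same purpose.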
On 2020-12-09 15:20:07, "hailongwang" <18868816...@163.com> wrote:
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
This mailing-list thread has a similar problem; see whether it helps.
PS: 1.12 is about to be released and also supports the RAW type [1]; you can use that type and then post-process in your own UDF. A further benefit of RAW is that source consumption won't become the job's bottleneck due to slow format parsing, since source parallelism is at most the number of partitions in the middleware, e.g. Kafka.
[1]
Found the cause of the second problem, i.e. why there was nothing under the ha directory: that was my own mistake, since this container update itself changed the ha directory.
So the questions become: (1) why does this situation make the JobManager fail, and (2) in this case, do I need to delete the information in zk? Deletion looks rather involved, because zk does not support deleting a non-empty directory. Do I have to delete the child directories one by one?
赵一旦 wrote on Wed, Dec 9, 2020 at 3:07 PM:
Deployed on the company's in-house PaaS platform ("pass"), 3 machines, with the platform's own recovery.
While running normally, restarting the pass containers directly caused the job to fail; after the containers came back, all 3 machines ended up in a similar infinite loop.
Preliminary analysis: the JobManager fails to start, the pass platform then automatically restarts the container, and it loops forever.
From the logs, the JobManager recovers the job after starting, and then the process fails.
The log is as follows:
14:55:55.304 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
14:55:55.305 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Hi Jark
Sorry, it's 1.12.0; I made a typo.
Original Message
Sender: Jark Wu
Recipient: user-zh
Date: Wednesday, Dec 9, 2020 14:40
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型
Hi 赵一旦,
The jackson component already handles this logic automatically.
Hi xiaocai,
Which issue do you need 1.12.1 for? 1.12.0 will be released within the next couple of days.
Best,
Jark
On Wed, 9 Dec 2020 at 14:34, xiao cai wrote:
OK, I plan to upgrade and test next week. Also: when is 1.12.1 planned for release?
Original Message
Sender: Jark Wu
Recipient: user-zh
Date: Tuesday, Dec 8, 2020 13:41
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型
Defining the field as STRING, as hailong said, is supported in 1.12:
https://issues.apache.org/jira/browse/FLINK-18002. 1.12 will be released in the next couple of days; if you can upgrade, give it a try.
Best, Jark On Tue,
Hi Kien,
>From my point of view, RocksDB native metrics can be classified into the 5 groups
>below, and you can choose which ones to enable. Enabling those
>metrics can cause about a 10% performance regression, and this might impact
>the overall performance as not all jobs are
flink 1.10.1, error_1006_cnt_permillage
SQL:
SELECT
|DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10'
SECOND)), '-MM-dd') `day`,
|UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime,
INTERVAL '10' SECOND)), '-MM-dd
flink cep sql (blink planner): PATTERN (e1{3 } - e2{1 }?)?
For example, messages like these:
First message:
{"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]}
Second message:
{"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]}
Third message:
{"source":"transaction_2020202020200","data":[]}
I want a single field in the CREATE TABLE to hold all the values of the data attribute.
On 2020-12-09
Dynamic in what way?
On 2020-12-09 13:18:56, "破极" wrote:
Hello everyone:
I'm reading data from Kafka in Flink SQL, but the Kafka data is rather complex: the value of the json `data` attribute is an array whose contents are dynamic and not uniform. How should the schema be defined in CREATE TABLE? I defined it as an array and wanted to handle it with my own UDF, but asText on the JsonNode cannot retrieve the data.
Does anyone have a good approach? Thanks.
Sample kafka message (the value of data is dynamic):
{"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]}
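As suggested later in the thread (FLINK-18002, available from Flink 1.12), one option is to declare the dynamic `data` attribute as STRING and parse it yourself in a UDF. A rough sketch; the topic, servers, and the `parse_data` UDF are hypothetical:

```sql
CREATE TABLE kafka_source (
    `source` STRING,
    -- the whole array arrives as raw JSON text,
    -- e.g. '[{"name":"d"},{"age":18}]'
    `data`   STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- parse_data would be a user-defined function you implement yourself
SELECT `source`, parse_data(`data`) FROM kafka_source;
```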
The complete code is:
https://paste.ubuntu.com/p/hpWB87kT6P/
The result is:
2> (true,1,diaper,4)
7> (true,3,rubber,2)
4> (true,1,beer,3)
7> (false,3,rubber,2)
7> (true,3,rubber,8)
What is the meaning of true/false in the result
after running the above code?
Thanks for your help!
Scenario:
Currently we join MySQL tables with primary keys (hundreds of millions of rows) and run rule filtering, counting, etc. on the resulting real-time wide table (view); the state (join operators) is basically configured without TTL.
The MySQL data all has primary keys and its volume won't change much (foreseeably maybe ~2 million new rows a year), but the rows change frequently, so we need high throughput and low latency.
We tested a version of the flink sql job with RocksDB as the state backend and found both throughput and latency poor: a single data change can take about 10 seconds to take effect. With FsStateBackend it is fast and performs well; the difference is more than 10x.
just a data point. we actually enabled all RocksDb metrics by default
(including very large jobs in terms of parallelism and state size). We
didn't see any significant performance impact. There is probably a small
impact. At least, it didn't jump out for our workload.
On Tue, Dec 8, 2020 at 9:00
flink sql 1.11 kafka source: the Kafka topic carries MySQL binlog collected by canal, with 'format' = 'canal-json':
1. source MySQL schema: data [{}], table, ts
2. the topic carries MySQL binlog in Kafka
Is it related to this issue? https://issues.apache.org/jira/browse/FLINK-20537
赵一旦 wrote on Wed, Dec 9, 2020 at 10:17 AM:
> Marking this to learn from; waiting for replies.
After disabling RocksDB's memory control, should taskmanager.memory.managed.size be set to 0?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
*[Environment]*
Flink version: 1.11.2
Hadoop version: 2.6.0-cdh5.8.3
Java version: 1.8.0_144
-
*[Command]*
[jacob@localhost flink-1.11.2]$ ./bin/yarn-session.sh -jm 1024m -tm 2048m
*[Symptom]*
2020-12-08 18:06:00,134 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli
[]
Old style: 'connector.type' = 'jdbc'; new style: 'connector' = 'jdbc'.
The difference: with the old style the key is derived from the query; with the new style you just define a key and writes become upserts, without the query having to satisfy particular requirements.
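A sketch of the new-style options; the URL and credentials are placeholders. Declaring a PRIMARY KEY is what makes the sink write in upsert mode:

```sql
CREATE TABLE mysql_sink (
    id   BIGINT,
    name STRING,
    -- with a key defined, writes become upserts
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'my_table',
    'username'   = 'user',
    'password'   = 'pass'
);
```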
Leonard Xu wrote on Mon, Dec 7, 2020 at 5:11 PM:
> Hi,
> You may not have subscribed to the Flink user mailing list, which is why you don't see some of the mails.
> You can send a mail with any content to user-zh-subscr...@flink.apache.org
@Jark Wu, you mean that defining the field as STRING also works when the input data is a JSON object? Wouldn't that require customizing the deserialization of the json part?
For example, field a is a string, but in the data the a attribute looks like:
"a":{ .. }. In Java this is like first deserializing this part into a map, then serializing it back to json (a string) as the value of the string field a.
Is that right?
Jark Wu wrote on Tue, Dec 8, 2020 at 1:42 PM:
> Defining the field as STRING, as hailong said, is supported in 1.12:
>
Hi macia,
一旦's answer is already fairly complete.
The watermark mainly affects when rows like +(left, null) are emitted in the case where the left join finds no match.
If both sides have data, output works fine even when the watermark does not advance.
About the watermark: if your event times fluctuate, the underlying watermark simply takes the largest watermark the current source subtask has seen
as that source subtask's watermark. But your watermark computation itself delays event time by 10 hours, which by itself
already delays the emission of your unmatched rows by a lot.
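For illustration, the 10-hour delay corresponds to the offset in the DDL's WATERMARK clause; shrinking it makes the unmatched left-join rows emit sooner. Table and column names here are hypothetical:

```sql
CREATE TABLE transaction_tab (
    id          BIGINT,
    create_time TIMESTAMP(3),
    -- 10 hours of allowed out-of-orderness: unmatched rows can only
    -- be emitted after the watermark passes, i.e. up to 10 hours late
    WATERMARK FOR create_time AS create_time - INTERVAL '10' HOUR
) WITH (
    'connector' = 'kafka',
    'topic' = 't',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
```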
Marking this to learn from; waiting for replies.
莫失莫忘 wrote on Tue, Dec 8, 2020 at 6:49 PM:
> I'm migrating hive sql to the flink engine. Many of the original hive sql statements use double quotes for strings, e.g. select * from table1 where column1 =
> "word". How can I make flink SQL accept double-quoted strings without modifying the SQL?
> ps: I see that strings in flink SQL must use single quotes, e.g. select * from table1 where column1 =
> 'word'. How can strings be written with either single or double quotes?
I don't quite get what you're stuck on; is it that you can't remove the use of the redis SDK, or something else? The problem description is too terse.
It seems you can simply operate redis through a redis client inside the map function.
追梦的废柴 wrote on Tue, Dec 8, 2020 at 8:44 PM:
The key point is whether the watermark advances. If it does not, the left join has no way to know when the right side will have no more data, at which point it could emit the left-only rows.
(1) One issue I see: you define a maxOutOfOrderness of 10 hours for the watermark. Are you sure it needs to be that long? With such a large maxOutOfOrderness, matched rows are emitted promptly, but unmatched rows must wait 10 hours before the left-only output of the left join is emitted.
(2) Another point I'm not sure about: with the DataStream API the watermark propagates normally; I'm not sure whether flinkSQL propagates it the same way.
The simplest way is a custom source that periodically checks whether the config file has been updated; if so, it emits the file's contents, and downstream operators treat what this source emits as the broadcast variable.
熊云昆 <xiongyun...@163.com>
On Dec 8, 2020 at 17:36, Lei Wang wrote:
The flink program reads a config file and broadcasts its contents as a broadcast variable.
If the config file is updated, how can the contents of the broadcast variable be updated too?
Thanks,
王磊
https://issues.apache.org/jira/browse/FLINK-20533
There is no workaround in the current Flink releases, but you could
compile the reporter based on the PR that I opened.
On 12/8/2020 10:38 PM, Fanbin Bu wrote:
Great, thank you very much :)
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
After a bit more playing around with it today, I figured out that what I needed
to call was:
statementSet.execute().getJobClient().get().getJobExecutionResult(getClass().getClassLoader()).get()
The fact that getJobExecutionResult required a classloader is what threw me
off. Since I’m using an
thank you Chesnay. I did verify that count works with datadog. Please
link here the ticket once you create it. Meanwhile, is there any workaround
for now?
Fanbin
On Tue, Dec 8, 2020 at 2:56 AM Chesnay Schepler wrote:
> It appears that the datadog reporter does not report histograms. I'll
I need to elaborate on my use case. I would like the SQL api to do
aggregation for me in an SQL TUMBLING window.
But I want the next window to perform business logic on all the
records just aggregated in a DataStream ProcessWindowFunction.
This would be a mix of SQL and DataStream API.
On Tue,
GIVEN two windows (ProcessWindowFunction), window A, and window B,
AND window A is a tumbling processing time window of 15 minutes
AND 20 records entered window A, and performs its business logic.
How can I assure that Window B will process exactly all the records
that left window A within
Hi,
This exception looks like it was thrown by a downstream Task/TaskManager
when trying to read a message/packet from some upstream Task/TaskManager
and that connection between two TaskManagers was reset (closed abruptly).
So it's the case:
> involves communicating with other non-collocated
I believe it was solved in 1.11 by FLINK-15911 [1]
I tried setting taskmanager.rpc.port to 1 for 1.12 and got
tcp6 0 0 :::1:::*LISTEN
13768/java
[1]
https://issues.apache.org/jira/browse/FLINK-15911
Regards,
Roman
On Tue, Dec 8, 2020
Hello,
I'd like to better understand delete behavior of AggregateFunctions. Let's
assume there's an aggregate of `user_id` to a set of `group_ids` for groups
belonging to that user.
`user_id_1 -> [group_id_1, group_id_2, etc.]`
Now let's assume sometime later that deletes arrive for all rows
I set up the following lookup cache values:
'lookup.cache.max-rows' = '20'
'lookup.cache.ttl' = '1min'
for a jdbc connector.
This table currently only has about 2 records in it. However,
since I set the TTL to 1 minute, I expected the job to query that
table every minute.
The
I'm trying to figure out a way to make Flink jobmanager (in HA) connect to
zookeeper over SSL/TLS. It doesn't seem like there are native properties
like Kafka has that support this interaction yet. Is this true or is there
some way that I can go about doing this?
Hello, Piotr.
Thank you.
This is an error logged to the taskmanager just before it became "lost" to
the jobmanager (i.e., reported as "lost" in the jobmanager log just before
the job restart). In what context would this particular error (not the
root-root cause you referred to) be thrown from a
I've noticed that jobmanager ports all listen on all interfaces by default, as
well as data port on the taskmanager.
The only exception is the taskmanager RPC port,
```
bash-4.2$ netstat -lpn | grep 612
tcp0 0 172.20.54.176:6121 0.0.0.0:* LISTEN
54/java
tcp
@Benchao Li Thanks for the reply. This problem has troubled me for half a year and has kept me from migrating to
Flink; maybe my case is too special.
My input topic and schema are as follows. Note that this topic contains the binlog of two MySQL DBs; I need to
filter out the two tables main_db__tansaction_tab and merchant_db__transaction_tab from the two DBs,
so the fields I define are the union of the two tables' fields.
Also note that the event time is create_time, which is very problematic:
1. many tables have create
Hi Kien,
I am pulling in Yun who might know better.
Regards,
Roman
On Sun, Dec 6, 2020 at 3:52 AM Truong Duc Kien
wrote:
> Hi all,
>
> We are thinking about enabling RocksDB metrics to better monitor our
> pipeline. However, since they will have performance impact, we will have to
> be
Thank you for the clarification.
On Tue, Dec 8, 2020 at 8:14 AM Khachatryan Roman
wrote:
Thank you very much!
On Tue, Dec 8, 2020 at 8:26 AM Khachatryan Roman
wrote:
Hi Marco,
You can find the list of the supported time units in TimeUtils javadoc [1]:
DAYS: "d", "day"
HOURS: "h", "hour"
MINUTES: "min", "minute"
SECONDS: "s", "sec", "second"
MILLISECONDS: "ms", "milli", "millisecond"
MICROSECONDS: "µs", "micro", "microsecond"
NANOSECONDS: "ns", "nano",
Hi Marco,
Yes, if TTL is not configured then the state will never expire (will stay
forever until deleted explicitly).
Regards,
Roman
On Tue, Dec 8, 2020 at 5:09 PM Marco Villalobos
wrote:
Thanks, Randal,
Yes, I think the only way is to partition the stream the same way as
kinesis does (as I wrote before).
Regards,
Roman
On Tue, Dec 8, 2020 at 1:38 PM Randal Pitt wrote:
After reading
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html
It is unclear to me how long keyed state will exist if it has no TTL.
Is it cached forever, unless explicitly cleared or overwritten?
can somebody please explain to me?
Thank you.
Hi Narasimha,
I investigated your problem and it is caused by multiple issues. First, vvp in
general cannot really handle multi-job submissions per jar because the complete
deployment lifecycle in vvp is scoped around a single Flink job id.
Therefore vvp sets a generated Flink job id during
scenario:
kafka stream enriched with tableS in postgresql
Let's pretend that the postgres has an organizations, departments, and
persons table, and we want to join the full name of the kafka table
that has the person id. I also want to determine if the person id is
missing.
This requires a
In
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html
no allowed units are specified for lookup.cache.ttl.
Can somebody please provide a list of valid values and their meaning? I
know 's' for seconds is supported. How do I specify minutes?
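Based on the unit list Roman posted in this thread, minutes can be written as 'min' or 'minute'. A hedged sketch of a JDBC dimension table using it (connection values are placeholders):

```sql
CREATE TABLE dim_table (
    id   BIGINT,
    name STRING
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:postgresql://localhost:5432/mydb',
    'table-name' = 'dim',
    'lookup.cache.max-rows' = '20',
    -- '1 minute' or '60s' should work as well
    'lookup.cache.ttl'      = '1min'
);
```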
Everyone:
Good evening!
There are many examples online of using redis as a source or sink, but how do I, inside a single map operator, first write into a set structure and then read that set's length?
The business requires it and I'm stuck; I'd appreciate any pointers!
Best!
追梦的废柴 <zhuimeng...@163.com>
Hi Guowei,
1. Unfortunately the UDF and the job are not in the same fatjar. Essentially
there is only one "fatjar" containing the Flink environment + the job, the UDF
is separate.
2. Yes, that is correct.
3. As explained in 1. I don't submit job jars to the Flink environment,
Hi Roman,
We're using a custom watermarker that uses a histogram to calculate a "best
fit" event time as the data we receive can be very unordered.
As you can see we're using the timestamp from the first event in the batch,
so we're essentially sampling the timestamps rather than using them all.
Hi Randal,
Can you share the code for the 1st approach
(FlinkKinesisConsumer.setPeriodicWatermarkAssigner))?
I think the 2nd approach (flatMap) can be improved by partitioning the
stream the same way kinesis does (i.e. same partition key).
Regards,
Roman
On Mon, Dec 7, 2020 at 2:44 PM Randal
flink 1.10.1, pom:
Hi Deep,
It seems that the TypeInformation array in your code has 2 elements, but we
only need one here. This approach treats the entire csv file as a Row which has
only one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO`
in the array. And if you use the TextInputFormat
The error message:
Exception in thread "main" org.apache.flink.table.client.SqlClientException:
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.api.TableException: Unsupported conversion
Our project needs to insert data into postgres. After using a catalog, inserting data apparently has to match the database table definition exactly, and I haven't found a way to insert only some of the fields.
Converting a time to TIMESTAMP(6) WITH LOCAL TIME ZONE raises an error; this format is the timestamp definition in postgres:
select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
Is there a conversion method? Or a way to insert only part of the data?
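On inserting only some of the fields: standard SQL allows a column list on INSERT, and newer Flink versions accept it too, though whether the remaining columns may be omitted depends on the Flink version and the sink. Treat this as a sketch with made-up names:

```sql
-- write only two of the sink table's columns
INSERT INTO pg_sink (id, name)
SELECT id, name FROM source_table;
```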
Hi,
As far as I know, TableAggregateFunction is not supported yet in batch
mode[1]. You can try to use it in stream mode.
[1] https://issues.apache.org/jira/browse/FLINK-10978
Best,
Xingbo
Leonard Xu 于2020年12月8日周二 下午6:05写道:
> Hi, appleyuchi
>
> Sorry for the late reply,
> but could you
It appears that the datadog reporter does not report histograms. I'll
file an issue to fix that.
On 12/8/2020 4:42 AM, Fanbin Bu wrote:
Hi,
I followed [1] to define my own metric as:
val dropwizardHistogram = new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(500))
histogram =
The job's data flow is kafka -> flink -> http/prometheus. There are many such jobs, but only those few have the problem, and it reproduces every time; each time the only fix is a restart, after which we watch the thread count climb... I'll keep debugging.
Paul Lam wrote on Tue, Dec 8, 2020 at 6:00 PM:
Awesome, thanks!
On Tue, Dec 8, 2020 at 11:55 AM Xingbo Huang wrote:
> Hi,
>
> This problem has been fixed[1] in 1.12.0,1.10.3,1.11.3, but release-1.11.3
> and release-1.12.0 have not been released yet (VOTE has been passed). I run
> your job in release-1.12, and the plan is correct.
>
>
> [1]
Thanks for your answer!!
On 2020-12-08 15:57:32, "Leonard Xu" wrote:
>Hi,
>Flink's metadata is stored in a catalog, and multiple catalogs are supported (embedded,
>Hive, JDBC, custom). By default Flink uses the built-in GenericInMemoryCatalog, an in-memory
>catalog that holds all the metadata; in production you can use HiveCatalog.
>
>
>Best
>Leonard
>[1]
Hi, appleyuchi
Sorry for the late reply,
but could you describe your problem in more detail or post your exception stack? The doc
you posted contains the section on defining and registering functions.
And I suggest you post the entire code that can reproduce the problem directly in the
email, so that the
Hi,
The multiple-clusters situation I mentioned mainly refers to writing data to HDFS. If this happens when only checkpointing depends on HDFS, that is indeed very strange.
Best,
Paul Lam
> On Dec 8, 2020 at 11:03, zilong xiao wrote:
>
> Hi Paul,
> The thread names are identical, all user1@cluserA. The HDFS client version is transparent to users. The job uses Flink
> 1.11, whose HDFS version seems to be 2.8.1. The only thing in Flink that keeps interacting with the cluster I can think of is checkpointing; even with DEBUG logging enabled I couldn't find the root
>
OK, thanks for the explanation!
-
Best Wishes
Hi,
This problem has been fixed[1] in 1.12.0,1.10.3,1.11.3, but release-1.11.3
and release-1.12.0 have not been released yet (VOTE has been passed). I run
your job in release-1.12, and the plan is correct.
[1] https://issues.apache.org/jira/browse/FLINK-19675
Best,
Xingbo
László Ciople
flink 1.10.1, pom:
Following the official method, integrating hadoop via HADOOP_CLASSPATH=`hadoop classpath` succeeded.
Since flink on yarn runs on a CDH6 cluster, I wanted to reuse the hbase libraries already in the classpath, using
export
HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH
Then I created a flink runtime with yarn-session, connected to that container with sql-client, and created an hbase mapping table. This usage fails: the diagnosis says the hbase packages cannot be found.
Hello,
I am trying to use Flink v1.11.2 with Python and the Table API to read and
write back messages to kafka topics. I am trying to filter messages based
on the output of a udf which returns a boolean. It seems that Flink ignores
the WHERE clause in my queries and every input message is received
Hi Kye,
Almost for sure this error is not the primary cause of the failure. This
error means that the node reporting it, has detected some fatal failure on
the other side of the wire (connection reset by peer), but the original
error is somehow too slow or unable to propagate to the JobManager