Hello,
This can have several causes, such as back-pressure, large snapshots, or bugs.
Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for the sources
- the Flink version you are using
Regards,
Roman
On Thu, Mar 11, 2021 at 7:03 AM Alexey
Hey Sonam,
> I'm wondering whether it may be helpful to have a min and max parallelism,
> and the actual parallelism be determined by the scaling policy mentioned
> next?
Yes, that's certainly possible.
Thanks a lot for your input on the design of a scaling policy; it is very valuable.
Essentially: does this code leak state?
private static class SessionIdProcessWindowFunction<KEY, VALUE>
        extends ProcessWindowFunction<VALUE, KeyedSessionWithSessionID<
                KEY, VALUE>, KEY, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private final static ValueStateDescriptor sessionId = new
Hello folks,
The suggestion is to use windowState() for per-key, per-window state and to
clear the state explicitly. Also, it seems that getRuntimeContext().getState()
will return global state that is shared among windows with the same key. I of
course desire to have state
Hi,
We have multiple jobs that need to be deployed to a Flink cluster. Parallelism
varies per job and depends on the type of work being done, and so do the memory
requirements. All jobs currently use the same state backend. Since the
workloads handled by the jobs differ, the scaling
Let's close this issue; guys, please answer my questions. I am using Flink
1.8.1.
Thanks
Sri
On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:
> Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see
> ConfigConstants.ENV_FLINK_LIB_DIR. Will
Makes total sense. Thanks, I'll check it out.
On Wed, Mar 10, 2021 at 1:28 PM Till Rohrmann wrote:
> Hi Vishal,
>
> There is no specific reason why Flink does not have a Nomad HA
> implementation other than it has not been done yet. As long as Nomad
> supports leader election, service recovery
Thanks Arvid,
If too many jobs run in the same task manager JVM, will it cause excessive
metaspace memory usage?
Thanks,
Lei
On Thu, Mar 11, 2021 at 11:54 PM Arvid Heise wrote:
> Hi Lei,
>
> each application has its own classloader as such each static constant
> exists multiple times
on
/warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6
(inode 2792145632): File does not exist. [Lease. Holder:
DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2
Does anyone know how to insert NULL values in Flink SQL? Currently, writing
NULL directly in the VALUES clause of an INSERT statement raises an error in
Zeppelin.
Best,
Jimmy
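A workaround that generally works here: an untyped NULL in a VALUES clause cannot be type-inferred, so cast it to the target column type explicitly. A minimal sketch (the table name and column types are made up for illustration):

```sql
-- NULL alone cannot be type-inferred in VALUES; cast it to the column type
INSERT INTO sink_table
VALUES (1, CAST(NULL AS STRING), CAST(NULL AS INT));
```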
Zeppelin supports loading UDF jars; you can refer to the code below, though the architecture may differ somewhat from your existing setup.
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE
https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469
chenxyz wrote on Fri, Mar 12, 2021 at 9:42 AM:
>
This approach is not feasible at the moment; Flink jobs are submitted through the company's platform system, and the code is the only part under our control.
On 2021-03-11 16:39:24, "silence" wrote:
> Try adding it to the classpath with -C at startup.
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote:
> Hi ChangZhuo,
>
> Did you upgrade to Flink 1.12.2 and change the settings at the time? If so,
> could you maybe reset the settings to the old values on Flink 1.12.2 and
> check if the job still gets stuck? Especially, turning off
Hi Arvid,
Thanks for responding. I did check the configuration tab of the job
manager and the setting cluster.evenly-spread-out-slots: true is
there. However I'm still observing unevenness in the distribution of
source tasks. Perhaps this additional information could shed light.
Version:
I too think this would be a useful capability for the job manager to be able
to send metrics easily. Sometimes additional compute responsibilities are
placed in the job manager and having a convenient way to add telemetry data
into a metrics stream would be very useful.
Hey Till,
You are right.
I'm new to Flink and was looking for a Java way to deploy an application
cluster. I first tried the standalone approach and then changed to native
(although the official documents state that application mode is more suitable
for production, they only show the CLI way).
Hi Robert,
Thanks for getting back to me. We are currently assessing Flink Standalone on
Kubernetes and Native Flink on Kubernetes and haven't yet decided on which
model we intend to use. We want to ensure that whichever model we choose, we'll
be able to get the benefits of the new features
Hi folks,
Using the State Processor API, can I:
1. get the full state of a Flink application with the RocksDB backend in
cluster mode (as I understand it, from checkpoints or savepoints)?
2. update it?
3. get this state from another Flink application (another jar)?
4. query it with SQL (Table API & SQL) to get data I
Hi Arvid,
assuming that I have A0, B0, C0 parquet files with different schemas and a
common field *ID*, I want to write them to A1, B2, C3 files respectively. My
problem is that I do not want my code to know the full schema; I only want to
filter using the ID field and write the unfiltered lines to the
What the Flink client does for you when starting an application mode
cluster in native K8s mode is to generate the K8s job specification and to
submit it to the K8s cluster. Hence, you can also do it yourself if you
prefer to manage the K8s resources directly. Writing your own client should
also
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true, but
perhaps I hit FLINK-21028. This leads to a question: if the normal
take-savepoint-and-cancel-job via the API fails, what steps should be taken
outside Flink to be able to resume from a savepoint with the new job version? Is
Hi everyone,
It seems I'm having either the same problem, or a problem similar to the one
mentioned here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html
I have a POJO class that is
The default shard assigner:
public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER =
(shard, subtasks) -> shard.hashCode();
easily causes an uneven distribution when the number of shards is greater than the parallelism.
I am thinking of using this approach, in the main class:
static ConcurrentHashMap map = new ConcurrentHashMap<>();
static AtomicInteger counter = new AtomicInteger(0);
public static
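The idea sketched above can be checked in plain Java, independent of the connector. Here `Assigner` is a simplified stand-in for the connector's `KinesisShardAssigner` (whose return value the connector takes modulo the parallelism); the map remembers each shard's slot, so consecutive shards spread evenly even when shard count exceeds parallelism:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinAssigner {
    // Simplified stand-in for the connector's (shard, subtasks) -> int assigner
    interface Assigner { int assign(String shardId, int subtasks); }

    static final ConcurrentHashMap<String, Integer> ASSIGNED = new ConcurrentHashMap<>();
    static final AtomicInteger COUNTER = new AtomicInteger(0);

    // Round-robin: each distinct shard gets the next subtask in turn,
    // so shards spread evenly even when shardCount > parallelism
    static final Assigner ROUND_ROBIN = (shard, subtasks) ->
            ASSIGNED.computeIfAbsent(shard, s -> COUNTER.getAndIncrement() % subtasks);

    public static void main(String[] args) {
        int parallelism = 4;
        int[] load = new int[parallelism];
        for (int i = 0; i < 10; i++)
            load[ROUND_ROBIN.assign("shardId-" + i, parallelism)]++;
        // 10 shards over 4 subtasks: loads stay within one of each other
        System.out.println(java.util.Arrays.toString(load));
    }
}
```

Note one caveat with keeping the counter in a static field of the main class: it lives per JVM, so with multiple task managers each JVM starts its own counter, and the distribution is only guaranteed to be even if every subtask observes the shards in the same order.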
Hi Lei,
each application has its own classloader, so each static constant exists
multiple times (one per job). Hence there should be no interference. You could
verify this by logging the value of the constant and seeing for yourself.
Best,
Arvid
On Thu, Mar 11, 2021 at 7:11 AM Lei Wang wrote:
>
Hi Abdullah,
without specific logs, it's hard to diagnose what went wrong. Could you
check in your taskmanager logs if any error occurred and add it? In Flink
UI, you can also browse the latest exceptions and look at the checkpoint
history. That may give you (and us) additional insights.
On Thu,
Hi Alexis,
could you open a new thread and post your exception? It sounds as if it
should work, but it's not for some reason.
Did you double check that the PojoSerializer is used?
On Wed, Mar 10, 2021 at 10:27 PM sardaesp <
alexis.sarda-espin...@microfocus.com> wrote:
> I'm having the same
Hi Jin,
as Till already answered on the ticket: in general, there is no guarantee
that things work between different versions. Everything that builds on
public APIs is guaranteed to be forward compatible. However, in this case,
you want things to be backward compatible, which is impossible to
A couple of use cases: I have a metric representing the job version; currently
it is bound to a task, but I want to bind it to the job manager. Another
example: I have heap dumps on OOM configured, and on start I want to check the
contents of the dump directory and, if it is not empty, increase
Hi Arvid,
Thank you for your reply.
I am getting input using:
DataStream<String> data = env.socketTextStream("localhost", 9090);
It shows an error: The main method caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
Is there any particular rule to get
Thanks for the suggestion, Arvid.
Currently my job is using producer.kafka.request.timeout.ms=9
I will try to increase to 12.
Best regards
Rainie
On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise wrote:
> Hi Rainie,
>
> This looks like the record batching in Kafka producer timed out. At this
Jianyun,
I also ran into the problem of a job failing to start from a savepoint, after
we upgraded the Pulsar client from 2.2 to 2.5.2 and I started the job with -s.
Since the job was not very important and I had other tasks at hand at the time,
I did not follow up on it. You could check whether something changed in the
Pulsar source.
allanqinjy
allanqi...@163.com
On Mar 11, 2021 at 22:43, Kezhu Wang wrote:
> Is it possible that Flink deserializes while reading the savepoint file, and fails during that process?
That is indeed the case; the checkpoint
Hey Till,
Thank you for responding.
I've already read the links you sent, but they are not enough; they don't
provide a good solution for production.
Standalone Kubernetes is not a good approach for production, for three main
reasons (in my opinion):
* TMs are defined as a Deployment, which
> Is it possible that Flink deserializes while reading the savepoint file, and fails during that process?
That is indeed the case; the checkpoint snapshots the serializer as well.
I looked at the stack again; the error seems to occur when deserializing
`MessageId`. Could you check whether the Pulsar version changed as well? If so,
did some `MessageId` implementation change its fields between the two versions?
I think you should use `MessageId.toByteArray`.
On March 11, 2021 at 20:26:15, 赵 建云
I missed this in the documentation:
A KeyedProcessFunction is always a RichFunction. Therefore, access to the
RuntimeContext is always available and setup and teardown methods can be
implemented. See
RichFunction.open(org.apache.flink.configuration.Configuration) and
RichFunction.close().
Hi Abdullah,
You don't need to implement checkpointed functions for checkpointing to
work - but you may lose state if you manage it manually.
If you have enabled checkpointing, you should see it with any application
that is running. Make sure that the checkpointing interval is small enough
so
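To make the advice above concrete, checkpointing can be enabled via flink-conf.yaml in recent Flink versions; a minimal sketch (the 10 s interval is an arbitrary example, pick one well below your test's runtime):

```yaml
# flink-conf.yaml: trigger a checkpoint every 10 s in exactly-once mode
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
```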
Hi Jan,
10 MB sounds very tight. How much memory are you giving your JVM? Are you
loading big data structures in your user-defined functions?
You can read about Flink's memory here [1]. You may need to lower memory
fractions or set .max if you provide only little RAM to your JVM.
I'm also
Hey Dan,
I think the logic should be correct. Mind that in processElement we
are using *relative* upper/lower bounds, which are the inverted global bounds:
relativeUpperBound = upperBound for left and -lowerBound for right
relativeLowerBound = lowerBound for left and -upperBound for right
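The inversion above can be sanity-checked with plain arithmetic, independent of Flink. A sketch (all names are mine, not Flink's internals) verifying that the left side and the inverted right side accept exactly the same timestamp pairs:

```java
public class RelativeBounds {
    // Global interval-join condition: left.ts + LOWER <= right.ts <= left.ts + UPPER
    static final long LOWER = -5, UPPER = 10;

    // Per-side relative bounds, as described in the mail
    static final long LEFT_REL_LOWER = LOWER,   LEFT_REL_UPPER = UPPER;   // left elements
    static final long RIGHT_REL_LOWER = -UPPER, RIGHT_REL_UPPER = -LOWER; // right elements

    // A left element at leftTs accepts rights in [leftTs + LEFT_REL_LOWER, leftTs + LEFT_REL_UPPER]
    static boolean leftAccepts(long leftTs, long rightTs) {
        return rightTs >= leftTs + LEFT_REL_LOWER && rightTs <= leftTs + LEFT_REL_UPPER;
    }

    // A right element at rightTs accepts lefts in [rightTs + RIGHT_REL_LOWER, rightTs + RIGHT_REL_UPPER]
    static boolean rightAccepts(long rightTs, long leftTs) {
        return leftTs >= rightTs + RIGHT_REL_LOWER && leftTs <= rightTs + RIGHT_REL_UPPER;
    }

    public static void main(String[] args) {
        // Both sides must agree on every timestamp pair: the inversion is symmetric
        for (long l = 0; l < 30; l++)
            for (long r = 0; r < 30; r++)
                if (leftAccepts(l, r) != rightAccepts(r, l))
                    throw new AssertionError("asymmetry at " + l + "," + r);
        System.out.println("bounds are symmetric");
    }
}
```

Algebraically: `rightAccepts` says `rightTs - UPPER <= leftTs <= rightTs - LOWER`, which rearranges to exactly the global condition, so no pairs are gained or lost by the inversion.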
Hey Sonam,
I'm very happy to hear that you are interested in reactive mode. Your
understanding of the limitations for 1.13 is correct. Note that you can
deploy standalone Flink on Kubernetes [1]. I'm actually currently preparing
a demo for this [2].
We are certainly aware that support for active
Hi Aeden,
the option that you mentioned should have actually caused your desired
behavior. Can you double-check that it's set for the job (you can look at
the config in the Flink UI to be 100% sure).
Another option is to simply give all task managers 2 slots. In that way,
the scheduler can only
Hi Alexey,
could you describe what you want to achieve? Most metrics are bound to a
specific task (available in RuntimeContext). You can also access them in
custom operators and state backends.
Then there are some metrics bound to the task manager and even the Java
process, but I don't see an easy way
Hi ChangZhuo,
Did you upgrade to Flink 1.12.2 and change the settings at the time? If so,
could you maybe reset the settings to the old values on Flink 1.12.2 and
check if the job still gets stuck? Especially, turning off unaligned
checkpoints (UC) should clarify if it's a general issue in Flink
Hello,
I'm learning State Processor API:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
There is an example on this page where StatefulFunctionWithTime extends
KeyedProcessFunction, and we can see the open() method that we need to
implement to initialize state. But
Hi Maciek,
Thanks for reaching out. Only through these interactions do we know how
important certain features are to users.
Queryable State has some limitations and makes the whole system rather
fragile. Most users that try it out are disappointed that there is actually
no SQL support. If we could
Hi Hemant,
Yes, this looks like an issue with different library versions. You probably
have 3 solutions:
* use the netty version of Flink
* shade your netty into your jar with relocations
* ditch jasync and just use jdbc with a custom thread pool (little overhead)
On Wed, Mar 10, 2021 at 2:40 PM
Hi Avi,
I'm not entirely sure I understand the question. Let's say you have source
A, B, C all with different schema but all have an id. You could use the
ParquetMapInputFormat that provides a map of the records and just use a
map-lookup.
However, I'm not sure how you want to write these records
Hello, I rewrote the migration method following StatefulSinkWriterOperator and tested it, but I still hit the error above.
The error seems to happen before the source's initializeState. Is it possible that Flink deserializes while reading the savepoint file, and fails during that process? Is there any way you could help locate the cause?
Thanks~
On Mar 11, 2021 at 11:36 AM, Kezhu Wang <kez...@gmail.com> wrote:
Is the new cluster using the updated Pulsar connector? I looked at the pulsar-flink code; this update is breaking for state.
Hi Rainie,
This looks like the record batching in Kafka producer timed out. At this
point, the respective records are lost forever. You probably want to tweak
your Kafka settings [1].
Usually, Flink should fail and restart at this point and recover without
data loss. However, if the transactions
Hi Suxi,
to expand a bit on Yun's answer: it depends on what kind of algorithm
you have. If you want to apply your C++ function to each record, then
you can go Yun's way and use a RichMapFunction to load your library and
invoke it for each record in map. If you need more records, then
I suggest checking the cluster's remaining memory, to see whether the 140 TaskManagers ran out of memory.
The documentation for ValueStateDescriptor documents the name parameter as -
"name - The (unique) name for the state."
What is the scope of the uniqueness? Unique within a RichFunction instance?
Unique within a job? Unique within a session cluster?
I ask because I have several jobs that use a
Hi Alexey,
From your attached logs, it seems that the leader-related ConfigMap is
reused.
Then the Flink application is recovered instead of submitting a new one.
This is
the root cause: it is trying to recover from a wrong savepoint, which was
specified in
your last submission.
> So how to fix
Unsubscribe
What I mean is that you could create a K8s Deployment using the Flink image,
like the following.
Then you could use "kubectl exec -it {pod_name} bash" to tunnel in
and submit the Flink
Python application to the existing session cluster.
apiVersion: apps/v1
kind: Deployment
metadata:
name:
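The mail's snippet is cut off; a fuller version of what Yang sketches might look like this (the name, image tag, and command are hypothetical placeholders, not from the original message):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-client            # hypothetical name
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-client
  template:
    metadata:
      labels:
        app: flink-client
    spec:
      containers:
      - name: client
        image: flink:1.12.2               # any Flink image with the Python API installed
        command: ["sleep", "infinity"]    # keep the pod alive for kubectl exec
```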
Unsubscribe
Hi,
In short, [1] means whether the job will trigger checkpoints, and [2] means
which operators will take action when checkpoints are triggered.
If you use ExampleCountSource, flink-streaming-java should be a dependency in
pom.xml and classes such as ListState, ListStateDescriptor,
Hi,
I'm using Flink 1.11.3 and running a batch job. In the job manager's log I see
that all operators switched from RUNNING to FINISHED, and then the job manager
times out. After some pause, the overall status switches from RUNNING to
FINISHED.
Why is there a big gap in
Hi,
Yes, as Danny said, it is very hard work...
A suggestion: you can cherry-pick some bugfixes from the new Calcite
version into your own internal Calcite branch, if you just want to fix some
bugs.
Best,
Jingsong
On Thu, Mar 11, 2021 at 2:28 PM Danny Chan wrote:
> Hi Sheng ~
>
> It is a
StatementSet inserts = tableEnv.createStatementSet();
inserts.addInsertSql("insert into xxx select * from xxx"); // topic1 -> topic2 job
inserts.addInsertSql("insert into xxx select * from xxx"); // topic2 -> Postgres job
inserts.execute();
Please stop sending mail to user-zh.
--
From: silence
Sent: Thursday, March 11, 2021 16:39
To: user-zh
Subject: Re: How does Flink SQL load a UDF from a remote jar
Try adding it to the classpath with -C at startup.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Flink variables passed between operators need to be serialized. If the
DataStream<> generic type is referenced through a base class, will the complete
subclass information be preserved at a downstream operator, and can it be
force-cast there?
For example:
DataStream stream = source.from(SubClass);
stream.keyBy( ) {
// Can the code here check the type and downcast?
SubClass subObj = (SubClass) baseObj;
}
Thanks,
Wang Lei
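For the plain-Java half of this question: a runtime type survives an upcast inside one JVM, so an instanceof check plus a cast recovers the subclass. Whether subclass fields survive serialization *between* operators depends on the stream's TypeInformation (with a base-class type the chosen serializer may not write subclass fields), so declaring the stream with the subclass type is safer. A minimal sketch of the in-JVM part (class names are illustrative):

```java
public class DowncastDemo {
    static class Base { String id = "base"; }
    static class Sub extends Base { String extra = "kept"; }

    // Simulates an operator receiving the element through a base-class reference
    static String process(Base element) {
        if (element instanceof Sub) {   // the runtime type is preserved in-JVM
            Sub sub = (Sub) element;    // safe downcast after the check
            return sub.extra;
        }
        return element.id;
    }

    public static void main(String[] args) {
        Base b = new Sub();             // upcast, as in DataStream<Base>
        System.out.println(process(b)); // prints "kept"
    }
}
```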
1: With state.backend.incremental enabled, the Checkpointed Data Size keeps growing.
I checkpoint every 10 minutes and each checkpoint grows by 2 MB; over two days it grew to 400 MB, even though the actual state should be only about 20 MB (a single window computation; a savepoint is also only about 20 MB).
TTL is already set.
2: With state.backend.incremental disabled, each checkpoint is about 20 MB and does not grow.
My understanding is: with state.backend.incremental enabled, the Checkpointed Data