Hi,
This happens because StreamingFileSink does not support a finite input
stream.
In the docs it's mentioned under "Important Considerations":
[image: screenshot of the "Important Considerations" note from the StreamingFileSink documentation]
This behaviour often surprises users...
There's a FLIP addressing this; see the related commit:
https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774
Best,
tison.
On Tue, Mar 3, 2020 at 2:13 PM tison wrote:
> This issue has already been fixed in 1.9.2 and 1.10; for the change, see
>
> https://issues.apache.org/jira/browse/FLINK-13749
>
> Best,
> tison.
>
>
> On Tue, Mar 3, 2020 at 2:04 PM aven.wu wrote:
>
>> Component versions: Hadoop 2.7.3, Flink
This issue has already been fixed in 1.9.2 and 1.10; for the change, see
https://issues.apache.org/jira/browse/FLINK-13749
Best,
tison.
On Tue, Mar 3, 2020 at 2:04 PM aven.wu wrote:
> Component versions: Hadoop 2.7.3, Flink 1.9.1, Elasticsearch 6.5.
> The issue arose because my user program uses Jackson and also depends on the Elasticsearch rest client.
> When submitting the job to the YARN cluster, the following exception occurred:
> java.lang.NoSuchFieldError:
Component versions: Hadoop 2.7.3, Flink 1.9.1, Elasticsearch 6.5.
The issue arose because my user program uses Jackson and also depends on the Elasticsearch rest client. When submitting the job to the YARN cluster, the following exception occurred:
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
at
org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:57)
Thanks a lot, hope it will be fixed soon!
From: Jark Wu
Sent: March 3, 2020 11:25
To: Lu Weizheng
Cc: user@flink.apache.org
Subject: Re: Table API connect method timestamp watermark assignment problem
Hi Lu,
DDL and the Schema descriptor do not share the same code path. I guess the
reason the Schema descriptor doesn't work is FLINK-16160.
We will fix that in the next minor release. Please use DDL to define the
watermark, which is also the suggested way to do it.
The current Schema
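To make the DDL route concrete, here is a rough, hedged sketch (assuming Flink 1.10 and the Blink planner; the table name, fields, and connector options below are invented for illustration, not taken from the thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Illustrative only: register a Kafka table whose event-time attribute and
// watermark are declared directly in the DDL.
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.sqlUpdate(
    "CREATE TABLE user_events (" +
    "  user_id STRING," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = 'universal'," +
    "  'connector.topic' = 'events'," +
    "  'connector.properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format.type' = 'json'" +
    ")");

Queries over user_events can then use event_time as a rowtime attribute, e.g. in windowed aggregations.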
Hi flink users,
We have a problem and think Flink may be a good solution for it. But I'm
new to Flink and hope to get some insights from the Flink community :)
Here is the problem. Suppose we have a DynamoDB table which stores the
inventory data; the schema is like:
* vendorId (primary key)
*
Hey guys,
I have been using the Flink Table API recently. I want to use event time with a
Kafka table connector. It seems that Flink cannot recognize the event-time
timestamp field in my code. Here is my code:
public static void main(String[] args) throws Exception {
EnvironmentSettings fsSettings =
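For context, a descriptor-based event-time definition (the code path that FLINK-16160 mentioned above appears to affect) usually looks something like this hedged sketch; the topic, field names, and properties are placeholders, not the original code:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

// Illustrative only: declare the rowtime attribute via the Schema descriptor.
tableEnv.connect(
        new Kafka()
            .version("universal")
            .topic("events")
            .property("bootstrap.servers", "kafka:9092"))
    .withFormat(new Json())
    .withSchema(
        new Schema()
            .field("user_id", DataTypes.STRING())
            .field("event_time", DataTypes.TIMESTAMP(3))
            .rowtime(
                new Rowtime()
                    .timestampsFromField("event_time")
                    .watermarksPeriodicBounded(5000))) // 5s bounded out-of-orderness
    .createTemporaryTable("UserEvents");

As noted earlier in the thread, the DDL route is the recommended one until the descriptor issue is fixed.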
Hi,
Some previous discussion in [1], FYI
[1] https://issues.apache.org/jira/browse/FLINK-10230
Best,
Jingsong Lee
--
From:Jark Wu
Send Time: Mon, Mar 2, 2020 22:42
To:Jeff Zhang
Cc:"Gyula Fóra" ; user
Subject:Re: SHOW CREATE
Hi,
I've introduced LocalDateTime type information to flink-core.
But for compatibility reasons, I reverted the modification in TypeExtractor.
It seems that at present you can only use Types.LOCAL_DATE_TIME explicitly.
[1] http://jira.apache.org/jira/browse/FLINK-12850
Best,
Jingsong Lee
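As a small, hedged example of using Types.LOCAL_DATE_TIME explicitly (assuming input is a DataStream<String>; the parsing logic is made up):

import java.time.LocalDateTime;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

// Illustrative only: declare the type information explicitly so the
// TypeExtractor does not fall back to a generic (Kryo) type.
DataStream<LocalDateTime> parsed = input
        .map(value -> LocalDateTime.parse(value))
        .returns(Types.LOCAL_DATE_TIME);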
It may work. However, you need to set your own retry policy (similar to
`ConfiguredFailoverProxyProvider` in Hadoop).
Also, if you directly use the namenode address and do not load the HDFS
configuration, some HDFS client configuration (e.g.
dfs.client.*) will not take effect.
Best,
Yang
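As an illustration of the second point, the HDFS HA client settings that a bare "hdfs://namenode:port" path would bypass look roughly like this hedged sketch; the nameservice and host names are placeholders:

import org.apache.hadoop.conf.Configuration;

// Illustrative only: standard HDFS HA client configuration. Normally these come
// from hdfs-site.xml / core-site.xml rather than being set programmatically.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://mycluster");
conf.set("dfs.nameservices", "mycluster");
conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.mycluster.nn1", "namenode1.example.com:8020");
conf.set("dfs.namenode.rpc-address.mycluster.nn2", "namenode2.example.com:8020");
conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");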
Nick Bendtner
side note: this question has been asked on SO as well:
https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
(I'm mentioning this here so that we are not wasting support resources in
our community on double-debugging issues)
On
I've put some information about my situation in the ticket
https://issues.apache.org/jira/browse/FLINK-16142?focusedCommentId=17049679=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17049679
On Mon, Mar 2, 2020 at 2:55 PM Arvid Heise wrote:
> Hi Niels,
>
> to add to
Hi Tzu-Li,
I think you misunderstood Oskar's question.
The question was whether there are any plans to support Java's
LocalDateTime in Flink's "native" de/serialization mechanism. As we can read
in [1], for basic types, Flink supports all Java primitives and their boxed
forms, plus void, String,
Hi,
Flink currently performs a 128-bit murmur hash on the user-provided uids to
generate the final node hashes in the stream graph. Specifically, this
library is being used [1] as the hash function.
If what you are looking for is for Flink to use exactly the provided hash,
you can use
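(Presumably this refers to SingleOutputStreamOperator#setUidHash.) A hedged sketch of both options follows; events is assumed to be a DataStream<String>, and the operators and the hex string are made up:

import org.apache.flink.streaming.api.datastream.DataStream;

// Option 1 (illustrative): provide a stable uid and let Flink derive the node
// hash from it (128-bit Murmur3 of the uid).
DataStream<String> upper = events
        .map(String::toUpperCase)
        .uid("uppercase-mapper");

// Option 2 (illustrative): pin the node hash directly, e.g. to match the hash
// of an operator from a legacy job; must be a 32-character hex string.
DataStream<String> lower = events
        .map(String::toLowerCase)
        .setUidHash("2e588ce1c86a9d46e2e85186773ce4fd");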
Hi David,
Currently I am testing it with a single source and parallelism 1 only, so I am
not able to understand this behavior.
On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz
wrote:
> Hi Anuj,
>
> What parallelism has your source? Do all of your source tasks produce
> records? Watermark is always
Hi,
What that log message means (i.e. "must be processed as a Generic Type") is that
Flink will have to fall back to using Kryo for the serialization of that
type.
You should be concerned about that if:
1) That type is being used for some persisted state in snapshots. That would
be the case if you've
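A quick, hedged way to surface such fallbacks during development (rather than discovering them in the logs) is to disable generic types so the job fails fast:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Illustrative only: throws an exception whenever a type would have to be
// serialized with Kryo as a GenericType, instead of silently falling back.
env.getConfig().disableGenericTypes();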
Hi Kaymak,
To answer your last question:
there will be no data loss in that scenario you described, but there could
be duplicate processed records.
With checkpointing enabled, the Flink Kafka consumer does not commit
offsets back to Kafka until offsets in Flink checkpoints have been
persisted.
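For illustration, the relevant consumer-side settings look roughly like this hedged sketch (topic, group id, and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // offsets are snapshotted as part of each checkpoint

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
// Commit offsets back to Kafka only after the corresponding checkpoint completes;
// on recovery Flink uses the checkpointed offsets, not the committed ones.
consumer.setCommitOffsetsOnCheckpoints(true);
consumer.setStartFromGroupOffsets();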
Thanks a lot, Yang. What are your thoughts on catching the exception when a
namenode is down and retrying with the secondary namenode?
Best,
Nick.
On Sun, Mar 1, 2020 at 9:05 PM Yang Wang wrote:
> Hi Nick,
>
> Certainly you could directly use "namenode:port" as the schema of your HDFS
> path.
Hi there!
I am tracking the latency of my operators using
"setLatencyTrackingInterval(1)" and I can see the latency metrics on
the browser http://127.0.0.1:8081/jobs//metrics . For each logical
operator I set a .uid("operator_name") and I know that Flink uses the
UidHash to create a string
Hi Anuj,
What parallelism does your source have? Do all of your source tasks produce
records? The watermark is always the minimum of the watermarks received from all
the upstream operators. Therefore, if some of them do not produce records, the
watermark will not progress. You can read more about watermarks and how
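For reference, a hedged sketch of assigning timestamps and periodic bounded-out-of-orderness watermarks with the pre-1.11 API (the Event type and its timestamp getter are made up):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative only: every parallel source/assigner instance must see records
// for the downstream watermark (the minimum across them) to advance.
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event event) {
                return event.getTimestampMillis(); // hypothetical epoch-millis field
            }
        });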
I am trying to use a process function to do some processing on a set of events.
I am using event time and a keyed stream. The issue I am facing is that the
watermark value is always coming as 9223372036854725808. I have put print
statements to debug, and it shows this:
timestamp--1583128014000
Thank you! One last question regarding Gordon's response. When a pipeline
stops consuming and cleanly shuts down and there is no error during that
process, and then it gets started again and uses the last committed offset
in Kafka - there should be no data loss - or am I missing something?
In what
Hi Dawid and Kostas,
Sorry for the late reply + thank you for the troubleshooting. I put
together an example repo that reproduces the issue[1], because I did have
checkpointing enabled in my previous case -- still must be doing something
wrong with that config though.
Thanks!
Austin
[1]:
Thanks for the positive feedback and creating the JIRA ticket :)
Gyula
On Mon, Mar 2, 2020 at 3:15 PM Jark Wu wrote:
> big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many
> database systems also support this.
> We can also introduce "describe extended table" in the future but
big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many
database systems also support this.
We can also introduce "describe extended table" in the future, but that is
an orthogonal requirement.
Best,
Jark
[1]: https://issues.apache.org/jira/browse/FLINK-16384
On Mon, 2 Mar 2020 at
Hi all,
I wanted to use a LocalDateTime field in my POJO class used in Flink's
pipeline. However, when I run the job I can see the following statements in
the logs:
class java.time.LocalDateTime does not contain a getter for field date
class java.time.LocalDateTime does not contain a setter for field
+1 for this, maybe we can add 'describe extended table' like Hive.
On Mon, Mar 2, 2020 at 8:49 PM Gyula Fóra wrote:
> Hi All!
>
> I am looking for the functionality to show how a table was created or show
> all the properties (connector, etc.)
>
> I could only find DESCRIBE at this point which only shows the
Hi Niels,
to add to Yang. 96m is plenty of space and was heavily tested by Alibaba.
The most likely reason and the motivation for the change is that you
probably have a classloader leak in your pipeline, quite possibly by one of
our connectors. For example, see FLINK-16142 [1].
If you could give
Hi Arvid,
It’s actually the second case. I just wanted to build a scalable generic case
where I can pass a set of kafka topics and my consumer can use the same
AvroDeserializationSchema. But yeah, I think I’ll do the fetching latest schema
part in main() separately.
Thanks for the help!
> On
Hi KristoffSC,
Regarding your questions inline.
> 1. task deployment descriptor
The `TaskDeploymentDescriptor` is used by JobMaster to submit a task to
TaskManager.
Since the JobMaster knows all the operators and their locations, it will put
the upstream operator locations
in the
I just run it in my IDE.
On Mon, Mar 2, 2020 at 9:04 PM sunfulin wrote:
>
>
> Hi,
> Yep, I am using 1.10
> Did you submit the job in the cluster or just run it in your IDE? Because
> I can also run it successfully in my IDE, but cannot run it through cluster
> by a shading jar. So I think maybe the problem is
Hi,
Yep, I am using 1.10
Did you submit the job to the cluster or just run it in your IDE? I can also
run it successfully in my IDE, but I cannot run it on the cluster with a
shaded jar. So I think maybe the problem is related to the Maven jar classpath,
but I'm not sure about that.
If
Hi,
Sorry for my previous slightly confusing response, please take a look at the
response from Gordon.
Piotrek
> On 2 Mar 2020, at 12:05, Kaymak, Tobias wrote:
>
> Hi,
>
> let me refine my question: My pipeline is generated from Beam, so the Flink
> pipeline is a translated Beam pipeline.
Hi fulin,
I cannot reproduce your exception on current master using your SQLs. I
searched the error message; this issue [1] seems similar to
yours, but the current compile util does not seem to have this issue.
BTW, are you using 1.10?
[1]
Hi,
The connectors that are listed in the AWS documentation page that you
referenced are not provided by AWS. They are bundled connectors shipped by
the Apache Flink community as part of official Flink releases, and are
discoverable as artifacts from the Maven central repository. See the
Hi,
First of all, state is only managed by Flink (and therefore Flink's state
backends) if the state is registered by the user.
You can take a look at the documentation here [1] for details on how to register
state.
A state has to be registered for it to be persisted in checkpoints /
savepoints, and
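As a minimal, hedged sketch of registering keyed state (the class, state name, and types are made up), which is then included in checkpoints and savepoints:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: state registered through a descriptor is managed by the
// configured state backend and persisted in checkpoints/savepoints.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        out.collect(next);
    }
}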
From 1.10, Flink enables the metaspace limit via "-XX:MaxMetaspaceSize"
by default. The default value is 96m; loading too many classes will cause
"OutOfMemoryError: Metaspace" [1]. So you need to increase the configured
value.
[1].
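For example, in Flink 1.10 this can be raised in flink-conf.yaml (the 256m below is just an illustrative value, not a recommendation):

taskmanager.memory.jvm-metaspace.size: 256m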
Hi Laurent,
You can take a look at Flink's MiniClusterResource JUnit test rule, and its
usages in the codebase for that.
The rule launches a Flink MiniCluster within the same JVM, and submitting to
the mini cluster resembles submitting to an actual Flink
cluster, so you would
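A hedged sketch of what that usually looks like in a JUnit test (assuming the flink-test-utils dependency; the field name and sizes are arbitrary):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

// Illustrative only: starts an embedded Flink mini cluster for the test class;
// StreamExecutionEnvironment.getExecutionEnvironment() inside the tests then
// submits jobs to it.
@ClassRule
public static final MiniClusterWithClientResource FLINK_CLUSTER =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(2)
                        .build());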
I set sql.exec.runtime-filter.wait to true. HiveTableSource takes much
longer but I get the same result. I think the reason is that the
pre-aggregated accumulator is not committed, but I don't know why this happens.
On Mon, Mar 2, 2020 at 3:22 PM JingsongLee wrote:
> Hi,
>
> Does runtime filter probe side wait for building runtime
Hi All,
I am wondering how Flink serializes and deserializes state from RocksDB.
What is the format used?
For example, say I am doing some stateful streaming and an object of
my class below represents a state. How does Flink serialize and
deserialize the object of MyClass below? Is it just
Hello,
I would like to test a Flink application, including any problem that would
happen when deployed on a distributed cluster.
The way we do this currently is to launch a Flink cluster in Docker and run
the job on it. This setup seems heavy and might not be necessary.
Is there a way to
Hi,
Is using a session window to implement the above logic a good idea, or
should I use a process function?
On Sun, Mar 1, 2020 at 11:39 AM aj wrote:
> Hi ,
>
> I am working on a use case where I have a stream of events. I want to
> attach a unique id to all the events that happened in a session.
>
Hi,
I'm running a lot of batch jobs on Kubernetes, and once in a while I get this
exception.
What is causing this?
How can I fix this?
Niels Basjes
java.lang.OutOfMemoryError: Metaspace
at java.lang.ClassLoader.defineClass1(Native Method)
at
Hi all,
In the AWS documentation [1] we can see that AWS provides a set of connectors
for Flink. I would need to use an ActiveMQ one provided by [2]. Currently
I'm using a Docker-based standalone job cluster, not an AWS one.
What's up with those connectors provided by AWS? Will I be able to use my
Hi,
let me refine my question: My pipeline is generated from Beam, so the Flink
pipeline is a translated Beam pipeline. When I update my Apache Beam
pipeline code, working with a snapshot in Flink to stop the pipeline is not
an option, as the snapshot will use the old representation of the
I didn't get the use case completely. Are you using several sensors with
different schemas? Are you processing them jointly?
Let's assume some cases:
1) Only one format: it would be best to generate a case class with
avrohugger. That is especially true if your processing actually requires
specific
Hi Tobi,
No, FlinkKafkaConsumer does not use Kafka's committed offsets for recovery.
The offsets to start from are stored in the checkpoint itself. Updating the
offsets back to Kafka is an optional, purely cosmetic thing from Flink's
perspective, so the job will start from the correct
Hi Tobi,
In this case, the job would indeed continue from the last offset that has
been committed in Kafka (assuming that you are using the
`startFromGroupOffsets` start position) for the specified group id.
However, do keep in mind that those offsets are not consistent with the
offsets written
Hi,
So I am building a data pipeline that takes input from sensors via an MQTT broker
and passes it to Kafka. Before it goes to Kafka, I am filtering and serializing
the filtered data into Avro format and keeping the schema in the registry. Now
I want to get that data into Flink to process it using
Hi everyone,
I have recently been using Flink's built-in JDBC OutputFormat to write data
processed by Flink to MySQL. However, if my data has a problem, for example a value exceeds the size configured for the corresponding MySQL column, or the database has an issue that causes delays, the task throws an exception and the task
fails, which in turn causes the whole job to restart from the checkpoint.
My question is: if I am using the OutputFormat provided by Flink, can I catch the exception and ignore it? If not, is there any other good way to handle this?
Could you please give more background on your use case? It's hard to give
any advice with the little information you gave us.
Usually, the consumer should know the schema or else it's hard to do
meaningful processing.
If it's something completely generic, then there is no way around it, but
that
On Ubuntu 16.04, deploying Flink on minikube with:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=test \
-Dtaskmanager.memory.process.size=1024m \
-Dkubernetes.taskmanager.cpu=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dresourcemanager.taskmanager-timeout=360
Hi,
Thanks for the replies. I get that it is not wise to use GenericRecord and that
is what is causing the Kryo fallback, but then, if not this, how should I go
about writing an AvroSchemaRegistrySchema for when I don't know the schema?
Without knowledge of the schema, I can't create a class.
Hi, welcome,
For user side,
u...@flink.apache.org is for English.
user-zh@flink.apache.org is for Chinese.
d...@flink.apache.org is for development-related discussions, so please do not
send to it.
Best,
Jingsong Lee
--
From:王博迪
Thank you Piotr!
One last question - let's assume my source is a Kafka topic - if I stop via
the CLI with a savepoint in Flink 1.9, but do not use that savepoint when
restarting my job - the job would continue from the last offset that has
been committed in Kafka and thus I would also not
Hi Arvid,
Yes I got it..and it works as said in my previous email.
Thanks!
On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise wrote:
> Hi Kant,
>
> I think Dawid meant to not add the Kafka version number like this:
>
> flinkShadowJar
>
Hi Eleanore,
the flink runner is maintained by the Beam developers, so it's best to ask
on their user list.
The documentation is, however, very clear. "Flink runner is one of the
runners whose checkpoint semantics are not compatible with current
implementation (hope to provide a solution in near
Hello,
I am a new user of Flink and have some development-related questions to ask. Could you let me know which mailing list I can use to discuss them?
Thanks
Thanks for replying, Lee. I followed your method to debug the code and I found
that the build side only calls addPreAggregatedAccumulator but never calls the
commit method. Furthermore, I added a breakpoint at future.handleAsync in the
asyncGetBroadcastBloomFilter method. But when the program stops at if(e==null &&
Hi Nitish,
Just to slightly extend on Arvid's reply. As Arvid said the Kryo
serializer comes from the call to
TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is
not a pojo this call will produce a GenericTypeInfo which uses Kryo
serialization.
For a reference example I
I suggest using batch mode to read the Hive table.
Best,
Jingsong Lee
--
From:like
Send Time: Mon, Mar 2, 2020 16:35
To:lzljs3620...@aliyun.com
Subject: Re: Source parallelism when reading Hive with Flink 1.10.0
I am using StreamTableEnvironment, and I did indeed run into this problem.
On Mar 2, 2020
> Automatic inference may face the problem that there are not enough resources to start.
In theory that shouldn't happen, right? Batch jobs can run partially.
Best,
Jingsong Lee
--
From:like
Send Time: Mon, Mar 2, 2020 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com
Subject: Re: Source parallelism when reading Hive with Flink 1.10.0
If an exception is unhandled in connectors, it will eventually be handled
by the runtime, where it is logged and the task fails. Doing both logging
and throwing an exception is an anti-pattern as the consumer of an
exception should have the sole responsibility of handling it correctly.
In your
Hi Kant,
I think Dawid meant to not add the Kafka version number like this:
flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"
On Sun, Mar 1, 2020 at 7:31 PM kant kodali wrote:
> * What went wrong:
> Could not determine the dependencies of task ':shadowJar'.
> >
Hi Nitish,
Kryo is the fallback serializer of Flink when everything else fails. In
general, performance suffers quite a bit and it's not always applicable as
in your case. Especially, in production code, it's best to avoid it
completely.
In your case, the issue is that your provided type