Hi Kumar,
The way you've implemented your custom source sounds like the right approach:
having a "running" flag that is checked by the run() method and changed in
cancel().
Also, it is good that you are properly handling the interrupt set by Flink
(some people ignore InterruptedExceptions, which makes it
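For reference, a minimal sketch of that pattern (the poll() call is just a placeholder for the real source logic):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = poll(); // placeholder for the real fetch from the external system
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String poll() throws InterruptedException {
        Thread.sleep(100); // stands in for real blocking I/O
        return "record";
    }
}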
Hi Felipe,
the file is just 80 MB. It is probably cached in the Linux page cache, so
there should not be any disk IO involved.
So what you are saying is that you cannot further increase the throughput for
sleeps shorter than 2000 nanoseconds.
Have you tried running this without any sleep / nanoTime
Hi Arnaud,
Maybe I don't fully understand the constraints, but what about
stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with
open() and close() where you can handle the connection with Kudu's
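A rough sketch of what such a mapper could look like (the connection handling and the partition lookup are placeholders, not the actual Kudu client API):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class GetKuduPartitionMapper extends RichMapFunction<String, Tuple2<Integer, String>> {

    private transient AutoCloseable kuduConnection; // stands in for a real Kudu client

    @Override
    public void open(Configuration parameters) throws Exception {
        kuduConnection = () -> { }; // placeholder: build the real Kudu client here
    }

    @Override
    public Tuple2<Integer, String> map(String value) throws Exception {
        int partition = Math.abs(value.hashCode()) % 16; // placeholder for the real Kudu partition lookup
        return Tuple2.of(partition, value);
    }

    @Override
    public void close() throws Exception {
        if (kuduConnection != null) {
            kuduConnection.close();
        }
    }
}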
Hi Mason,
your understanding is correct.
On Thu, May 28, 2020 at 8:23 AM Chen, Mason wrote:
> I think I may have just answered my own question. There’s only one Kafka
> partition, so the maximum parallelism is one and it doesn’t really make
> sense to make another kafka consumer under the same
Hi Joe,
my gut feeling is that a flatMap() is what you are looking for.
Best,
Robert
On Thu, May 28, 2020 at 7:21 PM Joe Malt wrote:
> Hi,
>
> I'm working on a custom TimestampAssigner which will do different things
> depending on the value of the extracted timestamp. One of the actions I
>
Hi Sudan,
The first process() is used to tag the elements from the left and right
sources, so that they can then be merged into the same stream and assigned to
the same window. The next window(xxx).process(new
WindowProcessFunction) defines the window operator to process
I was losing something because I was reading the line of the
GZIPInputStream outside of the busy while loop. I changed it and now I
am getting more throughput. It is also a good idea to use VisualVM to
check whether the throughput is correct and where I am losing more cycles.
while
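For reference, a minimal sketch of reading the GZIP file with readLine() inside the busy loop (the running flag and ctx are assumed to come from the surrounding SourceFunction):

try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new GZIPInputStream(new FileInputStream("input.gz"))))) {
    String line;
    while (running && (line = reader.readLine()) != null) { // read inside the busy loop
        ctx.collect(line);
    }
}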
Hi Roderick,
Luckily there are no silly questions, just silly answers (so I have the
harder job here ;) )
It seems that you are trying to read data from an Arango Database, right?
What is important to understand is that the "flink job" that you are
implementing in your main() method gets
Thanks Yun. I was thinking along similar lines. I had one more question.
leftSource.connect(rightSource)
.process(new TagCoprocessFunction()) // In this function, tag the
left source with "0" and the right source with "1"
.window(xx)
.process(new XX())
In this case, when will the window
Hi Nikola,
you could implement a custom SourceFunction that handles this in some
way: if the files are small (< 10 MB), send each file as a record, then
process it in a subsequent flatMap operation. If the files are large, split
the work across the parallel sources and read them serially in the
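For the small-file case, a rough sketch of the idea (FileAsRecordSource is a hypothetical source that emits each whole file as one String):

DataStream<String> files = env.addSource(new FileAsRecordSource()); // hypothetical whole-file source
DataStream<String> lines = files
    .flatMap((String file, Collector<String> out) -> {
        for (String line : file.split("\n")) { // split the whole-file record into lines
            out.collect(line);
        }
    })
    .returns(Types.STRING); // the lambda needs an explicit return type hint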
Hey Prasanna,
(Side note: there is no need to send this email to multiple mailing lists.
The user@ list is the right one.)
Let me quickly go through your questions:
Is this use case suited for Flink?
Based on the details you've provided: Yes
What you also need to consider are the hardware
oh I'm not using the HistoryServer; I just wrote it ;)
Are these archives all in the same location? So we're roughly looking at
5 GB of archives then?
That could indeed "just" be a resource problem. The HistoryServer
eagerly downloads all archives, rather than on demand.
The next step would be to
Hi Sudan,
As far as I know, both join and coGroup require keys (namely partitioning),
so for the non-keyed scenario you may have to use the low-level connect operator
to achieve it. In my opinion it should be something like
leftSource.connect(rightSource)
.process(new
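A hedged sketch of that tagging step (String elements are just placeholders; only the connect -> tag -> window shape comes from this thread):

public class TagCoprocessFunction
        extends CoProcessFunction<String, String, Tuple2<Integer, String>> {

    @Override
    public void processElement1(String value, Context ctx,
                                Collector<Tuple2<Integer, String>> out) {
        out.collect(Tuple2.of(0, value)); // left source tagged with 0
    }

    @Override
    public void processElement2(String value, Context ctx,
                                Collector<Tuple2<Integer, String>> out) {
        out.collect(Tuple2.of(1, value)); // right source tagged with 1
    }
}

// Usage (non-keyed, hence windowAll):
// leftSource.connect(rightSource)
//     .process(new TagCoprocessFunction())
//     .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
//     .process(new MyProcessAllWindowFunction());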
I am currently learning the pyflink Table API and have a question:
1. When using the Table API to connect to Kafka, can the entire Kafka message be treated as a single table field? For example, if the messages in the Kafka topic are JSON strings, treating the whole string as one field would make it easy to use pyflink UDFs to transform the messages.
2. If the above is feasible, how should the data format of the Kafka connection be set, i.e. how should with_format be configured? The official documentation on this is currently sparse.
I am new to this, so any guidance is appreciated. Thanks.
Hi,
In Flink, the watermark requires the time field to be in milliseconds. Please check whether your watermark is normal; I suspect that is where the problem lies.
steven chen wrote on Friday, May 29, 2020 at 14:34:
> The data comes in every time and gets counted, but why are the insert results not saved to MySQL? Is it a problem with the SQL, or something else? Any help is appreciated.
> CREATE TABLE user_behavior (
>
> itemCode VARCHAR,
>
> ts BIGINT COMMENT 'timestamp',
>
> t as TO_TIMESTAMP(FROM_UNIXTIME(ts
Thanks Robert for the reply.
On Fri 29 May, 2020, 12:31 Robert Metzger, wrote:
> Hey Prasanna,
>
> (Side note: there is no need to send this email to multiple mailing
> lists. The user@ list is the right one.)
>
> Let me quickly go through your questions:
>
> Is this use case suited for Flink?
because I am measuring one operator (all instances) and I want to
place its downstream operators in another machine in order to use
network channels.
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
On Fri, May 29, 2020 at 4:59 AM Weihua Hu wrote:
>
>
Using slotSharingGroup I can do some placement. However, I am using
two different slotSharingGroups for the two different sources, yet
they are placed in the same TM. And this starts splitting the
downstream operators across different TMs as well.
stream01 = source01.slot1 -> map01(4).slot1 ->
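For reference, a minimal sketch of assigning slot sharing groups (host, port and the map logic are placeholders):

DataStream<String> stream01 = env
    .socketTextStream("host01", 9000)
    .slotSharingGroup("sources")          // the source goes into its own sharing group
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();   // placeholder transformation
        }
    })
    .slotSharingGroup("downstream");      // downstream operators are placed in a different group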
The data comes in every time and gets counted, but why are the insert results not saved to MySQL? Is it a problem with the SQL, or something else? Any help is appreciated.
CREATE TABLE user_behavior (
itemCode VARCHAR,
ts BIGINT COMMENT 'timestamp',
t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),
proctime as PROCTIME(),
WATERMARK FOR t as t - INTERVAL '5' SECOND
) WITH (
'connector.type' =
Hi Prasanna
As far as I know, Flink does not allow submitting a new jobgraph without
restarting it, and I actually don't understand what your 3rd question means.
From: Prasanna kumar
Sent: Friday, May 29, 2020 11:18
To: Yun Tang
Cc: user
Subject: Re: Running
Hi,
AFAIK, this feature was added because Hadoop MapReduce has it as well (
https://blog.cloudera.com/how-to-include-third-party-libraries-in-your-map-reduce-job/,
point 2.).
I don't remember having seen this anywhere in the wild. I believe it is a
good idea to simplify our codebase here.
If
Yes, these are all in the same directory, and we're at 67 GB right now. I'll try
with incrementally smaller directories and let you know what I find.
// ah
From: Chesnay Schepler
Sent: Friday, May 29, 2020 3:11 AM
To: Hailu, Andreas [Engineering] ;
user@flink.apache.org
Subject: Re: History
Hello,
Yes, that would definitely do the trick, with an extra mapper after keyBy to
remove the tuple so that it stays seamless. It’s less hacky that what I was
thinking of, thanks!
However, is there any plan in a future release to have rich partitioners ? That
would avoid adding overhead and
Hi Roderick,
adding to Robert's response: The easiest way is to have all needed
information injected only in the driver, from which you manually pass the
config in a serializable form to your iterator. The config could be, for
example, a Java Map with serializable elements such as Strings.
If you
Although flatMap() is a valid choice, it would be more idiomatic to use
filter(). I'd apply that even before running the TimestampAssigner, except when
extracting the timestamp is rather complicated. But if it's a simple field,
then it feels better to first filter out bad data, and then apply any kind of
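For example, a hedged sketch (the events stream, the Event type and its getTimestamp() accessor are assumptions):

DataStream<Event> cleaned = events
    .filter(e -> e.getTimestamp() > 0) // drop records with unusable timestamps first
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.getTimestamp();
            }
        });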
Till,
I’ll have to calculate the theoretical upper bound for our window state. Our
data distribution and rate have a predictable pattern, but the data rate pattern
didn’t match the checkpoint size growth.
Here is a screenshot of the checkpoint size for the
Benchao,
Thank you for your detailed explanation.
Schema inference can solve my problem partially. For example, starting from
some point in time, all the JSON records afterward will contain a new field; I
think schema inference will help in that case.
But if I need to handle all the JSON events with different
Hi,
I have a use case where I want to join two streams. I am using coGroup for
this.
KeyBuilder leftKey = new
KeyBuilder(jobConfiguration.getConnectStream().getLeftKey());
KeyBuilder rightKey = new
KeyBuilder(jobConfiguration.getConnectStream().getRightKey());
Thank you both for your answers and yes, that does explain what's going
on. I will have to refactor this code.
Thanks again for your help!
Rick
On Fri, May 29, 2020 at 2:29 PM Arvid Heise wrote:
> Hi Roderick,
>
> adding to Robert's response: The easiest way is to get all needed
> information
Hi Guo,
Thanks again for your input. If I periodically renew the Kerberos
cache using an external process (kinit) on all Flink nodes in standalone
mode, will the cluster still be short-lived, or will the new ticket in the
cache be used so that the cluster can live until the end of the new expiry?
Thanks, Xintong, for the detailed explanation of the memory fraction. I have
increased the memory fraction now.
As I increase the defaultParallelism, I keep getting this error:
org.apache.flink.runtime.io.network.partition.consumer.
PartitionConnectionException: Connection for partition
Hi there,
Currently I have a job pipeline reading data from more than 10 different kinds of
sources, each having different out-of-orderness characteristics. I am
currently working on adjusting the watermarks for each source "properly". I
work with BoundedOutOfOrdernessTimestampExtractor and, as
Hi Poornapragna,
I'll try to answer your questions:
1. You don't need to delete the timer manually (it will be deleted after it
fires), but you can delete timers manually if you want.
2. AFAIK, triggers are not snapshotted, but the timers are snapshotted.
3. Deleting a timer that was not
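To make (1) and (2) concrete, a small sketch of registering and optionally deleting a timer in a KeyedProcessFunction (the types are placeholders):

public class TimerExample extends KeyedProcessFunction<String, Long, String> {

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) {
        long timerTs = value + 60_000; // fire one minute after the event timestamp
        ctx.timerService().registerEventTimeTimer(timerTs);
        // Fired timers are removed automatically; an unfired timer can also be
        // removed explicitly with ctx.timerService().deleteEventTimeTimer(timerTs);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired at " + timestamp); // registered timers survive checkpoints
    }
}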
Hi Satyam,
Are you using the blink planner in streaming mode? AFAIK, the blink planner in
batch mode can sort on arbitrary columns.
Satyam Shekhar wrote on Saturday, May 30, 2020 at 6:19:
> Hello,
>
> I am using Flink as the streaming execution engine for building a
> low-latency alerting application. The use case
Hi Robert,
I would appreciate more insights, please.
What we are noticing: when the Flink job is issued a stop command,
Thread.sleep is not receiving the InterruptedException.
It certainly receives the exception when the Flink job is issued a cancel
command.
In both cases (cancel and stop)
Hi
Would storing histogram data in a custom
`BoundedOutOfOrdernessTimestampExtractor`
and adjusting the `maxOutOfOrderness` according to that histogram work for
your case? (Be careful: such histogram data would not be snapshotted when
checkpointing.)
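A rough sketch of that idea, using a simple running maximum instead of a full histogram (MyEvent and its accessor are assumptions, and, as noted, none of these fields survive a checkpoint/restore):

public class AdaptiveOutOfOrdernessExtractor implements AssignerWithPeriodicWatermarks<MyEvent> {

    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long observedOutOfOrderness = 1_000; // start with a 1 s bound (arbitrary)

    @Override
    public long extractTimestamp(MyEvent event, long previousElementTimestamp) {
        long ts = event.getEventTime(); // assumed accessor
        if (maxSeenTimestamp != Long.MIN_VALUE && ts < maxSeenTimestamp) {
            observedOutOfOrderness = Math.max(observedOutOfOrderness, maxSeenTimestamp - ts);
        }
        maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxSeenTimestamp - observedOutOfOrderness);
    }
}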
Best,
Congxian
Theo Diefenthal
Hello,
I am using Flink as the streaming execution engine for building a
low-latency alerting application. The use case also requires ad-hoc
querying on batch data, which I also plan to serve using Flink to avoid the
complexity of maintaining two separate engines.
My current understanding is
Hi
From the given picture:
1. There were some checkpoints that failed (but not because of a timeout); could you
please check why these checkpoints failed?
2. The checkpoint data size is the delta size for the current checkpoint [1],
assuming you are using incremental checkpoints.
3. In fig1 the checkpoint size is
The cause is probably the classloading order. Configure child-first classloading; for per-job mode on 1.10 I remember a specific configuration needs to be set.
See this document:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath
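If I remember correctly, the relevant flink-conf.yaml entry is the following (recent versions already default to child-first):

classloader.resolve-order: child-first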
Best,
tison.
Even <452232...@qq.com> wrote on Friday, May 29, 2020 at 18:48:
> Thanks. How should this be handled to avoid the problem?
>
>
>
>
>
Also, for general documentation on classloading, you can take a look at this:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
Best,
tison.
tison wrote on Friday, May 29, 2020 at 19:46:
> The cause is probably the classloading order. Configure child-first classloading; for per-job mode on 1.10 I remember a specific configuration needs to be set.
>
> See this document:
>
----
From: "zz zhang"
The code is just the example that ships with Flink.
public class WordCountStreamingByJava {
public static void main(String[] args) throws Exception {
// create the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// set up the socket data source
DataStreamSource source = env.socketTextStream("zongteng75", 9001,
Hi,
This work is still in the DOING state; you can refer to FLIP-90 [1].
[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
star <3149768...@qq.com> wrote on Friday, May 29, 2020 at 14:34:
> May I ask, does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.
>
> Sent from my iPhone
--
Best,
Benchao Li
Hi,
[...] 1.2.0-cdh [...] 1.4.3 [...] hbase connector
[...] jars:
flink-hbase_2.11-1.10.1.jar
hbase-client-1.4.3.jar
hbase-common-1.4.3.jar
hbase-protocol-1.4.3.jar
Best,
Xinghalo
Hello,
the Hadoop service is TestHACluster, but when I use the API with the path
hdfs://TestHACluster/user/flink/test
it goes to TestHACluster:8020, and I don't have that port.
[...] hive [...] TestHACluster:8020
StreamExecutionEnvironment
[...] error: Exception in thread "main"
java.lang.IllegalStateException: No operators defined in streaming topology.
Cannot execute. Actually the insert succeeded; I checked in the hive
cli [...] select [...] batch
table? EnvironmentSettings settings =
Also, if possible, please paste (e.g. via gist) the code snippets around execute, or even the entire main method.
Best,
tison.
tison wrote on Friday, May 29, 2020 at 14:21:
> This problem is really strange. Normally this would be caught at env.execute()
> time; it shouldn't actually get scheduled. Could you describe in detail how you submit the job and where this error is reported (client? cluster?)?
>
> Best,
> tison.
>
>
> air23 wrote on Friday, May 29, 2020 at 13:38:
>
>> Running flink1.10 on CDH YARN fails with the error below.
What command are you running? In which directory are you running it, and how does that directory relate to the directory where the Flink distribution was downloaded and extracted?
Best,
tison.
air23 wrote on Friday, May 29, 2020 at 14:35:
> The code is just the example that ships with Flink.
>
> public class WordCountStreamingByJava {
> public static void main(String[] args) throws Exception {
>
> // create the execution environment
> StreamExecutionEnvironment env =
>
This error: >>> Caused by: org.apache.flink.runtime.JobException: Recovery is
suppressed
>>> by NoRestartBackoffTimeStrategy
is probably because the flink-conf.yaml under the Flink conf directory was not read; it contains the task failure restart configuration parameter!
From: air23
Sent: 2020-05-29 14:34
To: user-zh
Subject: Re: Re: flink1.10 on yarn issue
The code is just the example that ships with Flink.
public class
Is the code below what you are running locally? If it is local, the simplest approach is to put the hdfs-site.xml and core-site.xml configuration files into the resources directory.
On 2020-05-29 15:06:21, "了不起的盖茨比" <573693...@qq.com> wrote:
> I'd like to ask a question:
> the Hadoop service is TestHACluster, but when I access it via the API with the path
> hdfs://TestHACluster/user/flink/test
> it goes to TestHACluster:8020, and I don't have that port. How should this be handled?
Thanks
Sent from my iPhone
-- Original message --
From: Benchao Li
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
star <3149768...@qq.com> wrote on Friday, May 29, 2020 at 14:34:
May I ask, does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.
Sent from my iPhone
--
Best,
Benchao Li
You could first rule out whether it is a jar conflict.
--
From: Even <452232...@qq.com>
Sent: Friday, May 29, 2020 16:17
To: user-zh
Subject: Kafka Consumer deserialization error
Hi!
I'd like to ask about a Kafka Consumer deserialization problem:
A Kafka consumer job runs stably when submitted to a Flink session cluster, but when it is submitted independently to a Flink
per-job cluster
Hi,
(1) Yes.
(2) Yes, you can; just implement it yourself.
Best,
Leonard Xu
> On May 29, 2020, at 16:44, op <520075...@qq.com> wrote:
>
> Hi all, I have two questions:
>
>
> 1. Does the existing hbase connector only support hbase version 1.4.3?
> 2. Can I implement a custom connector for version 1.2.0?
>
>
> Thanks!
Hi!
A question about Kafka Consumer deserialization:
a Kafka consumer job runs stably when submitted to a Flink session
cluster, but when submitted to a Flink per-job cluster
it reports a Kafka [deserialization error].
flink is 1.10, kafka is kafka_2.12-2.1.0; the consumer: val data =
env.addSource(new
I'm not entirely sure, but it looks related to these two lines of code:
List wrote on Friday, May 29, 2020 at 15:48:
> The code is as follows. This is the error: Exception in thread "main" java.lang.IllegalStateException:
> No operators defined in streaming topology. Cannot execute. Actually the insert succeeded; I confirmed it in the hive
> cli and also queried the data with select. Why does it still say no operators are defined? Do I need a batch table? EnvironmentSettings
> settings =
>
Hello, it happens on the cluster; the code does not report an error locally. The error message is pasted below.
It was fine with flink1.7.
Later I added the Flink environment variables:
#flink
export FLINK_HOME=/opt/module/flink-1.10.1
export PATH=${FLINK_HOME}/bin:$PATH
and this failing example then ran normally.
But another job, which works on 1.7 and locally, fails with the error below.
The program finished with the
May I ask, does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.
Sent from my iPhone
Your code: w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'yyyy-MM-dd HH:mm:ss')
The FROM_UNIXTIME function here accepts a BIGINT argument and returns a date value of type VARCHAR, with the default date format 'yyyy-MM-dd
HH:mm:ss'.
It is then converted via the TO_TIMESTAMP function into the TIMESTAMP type, i.e. timestamp(3); this only works with the blink planner.
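For reference, a minimal sketch of such a DDL fragment (table and column names are placeholders; most connector options omitted):

CREATE TABLE events (
  rowtime BIGINT,
  w_ts AS TO_TIMESTAMP(FROM_UNIXTIME(rowtime / 1000), 'yyyy-MM-dd HH:mm:ss'),
  WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'
  -- remaining connector options omitted
);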
In CDH, hdfs [uses port] 8020 [...]
hdfs://TestHACluster/user/flink/test
hdfs://TestHACluster:8020/user/flink/test
Flink [resolves] the TestHACluster namespace [via the] hdfs HA [configuration in] hive-site.xml / hdfs-site.xml
[...] OK [...]
----
From: <13162790...@163.com>
Sent: Friday, May 29, 2020 15:20
To: "user-zh"
It is probably a configuration problem on the maven-shade side.
[...]
----
From: "Benchao Li"
thanks very much [...]
[will put] hdfs-site.xml [under] resources
----
From: "wangweigu...@stevegame.cn"
[...]:
1. Does the existing hbase connector only support hbase 1.4.3?
2. Can a connector for 1.2.0 be implemented?
Thanks
Hello, I am not using the Confluent Schema Registry Avro format; I just
call toByteBuffer on the Avro-generated class to implement Kafka's Serializer.
> On May 29, 2020, at 12:31, Leonard Xu wrote:
>
> Hi,
>> I used the Avro code-generation tool to generate a UserView class from a schema, and implemented Kafka's Serializer interface,
> When you write Avro-format data into Kafka using Kafka's Serializer, the written format is presumably treated as the Confluent Schema
> Registry Avro format
> confluent
CC user-zh
Hi steven,
I just realized this reply only went to your personal mailbox and was not CC'ed to the community; I have now CC'ed the user-zh mailing list.
Second, I just realized the list you originally wrote to was user, not user-zh. Next time, if the email is in Chinese, you can send it directly to user-zh instead of user;
the user mailing list is meant for communication in English.
Regarding your question, I think the time zone of the watermark has no effect on your computation: whether or not there is a time-zone offset,
the watermark and the event time can be aligned. If you don't need to output the time to a sink, you can ignore the watermark offset for now.
steven chen
Hi
If the processing-time window works fine but the event-time window does not, then you need to check whether the watermark
generation logic under event time behaves as expected; if the watermark never passes the window end time, the window will never fire.
Best,
Congxian
李佳宸 wrote on Friday, May 22, 2020 at 15:27:
> Hi all,
>
> I've run into a problem whose cause I can't figure out, and would like to ask for help.
>
> My code does keyBy userid, then uses a session window to aggregate by userid and runs a topN method.
>
>
hi
Flink does not currently have the concept of global state; all state refers to state within a single parallel instance. However, you can use broadcast state [1]
to simulate global state.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/broadcast_state.html
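A small sketch of that workaround (stream names, the descriptor name and the String types are placeholders):

MapStateDescriptor<String, String> desc =
    new MapStateDescriptor<>("global-config", Types.STRING, Types.STRING);

BroadcastStream<String> broadcast = configStream.broadcast(desc);

mainStream.connect(broadcast)
    .process(new BroadcastProcessFunction<String, String, String>() {
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            String cfg = ctx.getBroadcastState(desc).get("latest"); // read the "global" state
            out.collect(value + "|" + cfg);
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            ctx.getBroadcastState(desc).put("latest", value); // every parallel instance receives this update
        }
    });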
Best,
Congxian
a773807...@gmail.com wrote on Wednesday, May 27, 2020 at 15:47:
> No, in the second keyBy, the key is id-name,