Re: Age old stop vs cancel debate

2020-05-29 Thread Robert Metzger
Hi Kumar, The way you've implemented your custom source sounds like the right way: Having a "running" flag checked by the run() method and changing it in cancel(). Also, it is good that you are properly handling the interrupt set by Flink (some people ignore InterruptedExceptions, which makes it
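As an illustration of the pattern discussed in this thread, here is a minimal sketch of a "running" flag that run() checks and cancel() clears, with the interrupt preserved rather than swallowed. It uses only the JDK: FlagControlledSource and its emitted counter are hypothetical stand-ins, and a real Flink source would implement SourceFunction and emit through its SourceContext instead.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a custom source; not Flink's actual interface.
class FlagControlledSource implements Runnable {
    // volatile so that a cancel() call from another thread is visible to run()
    private volatile boolean running = true;
    // counts "emitted" records; stands in for collecting into a SourceContext
    final AtomicLong emitted = new AtomicLong();

    @Override
    public void run() {
        while (running) {
            emitted.incrementAndGet();
            try {
                Thread.sleep(10); // simulated wait for more input
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void cancel() {
        running = false;
    }
}

class FlagControlledSourceDemo {
    public static void main(String[] args) throws InterruptedException {
        FlagControlledSource source = new FlagControlledSource();
        Thread worker = new Thread(source);
        worker.start();
        Thread.sleep(100);
        source.cancel(); // the loop exits on its next check of the flag
        worker.join();
        System.out.println("emitted " + source.emitted.get() + " records before cancel");
    }
}
```

The loop ends either because cancel() cleared the flag or because the sleeping thread was interrupted, so the sketch terminates in both cases.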

Re: Executing a controllable benchmark in Flink

2020-05-29 Thread Robert Metzger
Hi Felipe, the file is just 80 MB. It is probably cached in the Linux page cache, so there should not be any disk IO involved. So what you are saying is that you cannot further increase the throughput for sleeps shorter than 2000 nanoseconds. Have you tried running this w/o any Sleep / nano.time

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-29 Thread Robert Metzger
Hi Arnaud, Maybe I don't fully understand the constraints, but what about stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink()); The map(new GetKuduPartitionMapper) will be a regular RichMapFunction with open() and close() where you can handle the connection with Kudu's

Re: Flink Kafka Connector Source Parallelism

2020-05-29 Thread Robert Metzger
Hi Mason, your understanding is correct. On Thu, May 28, 2020 at 8:23 AM Chen, Mason wrote: > I think I may have just answered my own question. There’s only one Kafka > partition, so the maximum parallelism is one and it doesn’t really make > sense to make another kafka consumer under the same

Re: Dropping messages based on timestamp.

2020-05-29 Thread Robert Metzger
Hi Joe, my gut feeling is that a flatMap() is what you are looking for. Best, Robert On Thu, May 28, 2020 at 7:21 PM Joe Malt wrote: > Hi, > > I'm working on a custom TimestampAssigner which will do different things > depending on the value of the extracted timestamp. One of the actions I >

Re: Re: Question on stream joins

2020-05-29 Thread Yun Gao
Hi Sudan, The first process is used to tag the elements from the left and right windows, so that they can then be merged into the same stream and assigned to the same window. Then the next window(xxx).process(new WindowProcessFunction) defines the window operator to process

Re: Executing a controllable benchmark in Flink

2020-05-29 Thread Felipe Gutierrez
I was losing something because I was reading the line of the GZIPInputStream outside of the busy while loop. I changed it and now I am getting more throughput. It is also a good idea to use VisualVM to check if the throughput is correct and where I am losing more cycles. while

Re: Flink Iterator Functions

2020-05-29 Thread Robert Metzger
Hi Roderick, Luckily there are no silly questions, just silly answers (so I have the harder job here ;) ) It seems that you are trying to read data from an Arango Database, right? What is important to understand is that the "flink job" that you are implementing in your main() method gets

Re: Question on stream joins

2020-05-29 Thread Sudan S
Thanks Yun. Was thinking a similar way. I had one more question. leftSource.connect(rightSource) .process(new TagCoprocessFunction()) // In this function, tag the left source with "0" and the right source with "1" .window(xx) .process(new XX()) In this when will the window

Re: Streaming multiple csv files

2020-05-29 Thread Robert Metzger
Hi Nikola, you could implement a custom SourceFunction that implements this in some way: If the files are small (< 10 MB) send each file as a record, then process it in a subsequent flatMap operation. If the files are large, split the work across the parallel sources and read them serially in the

Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Robert Metzger
Hey Prasanna, (Side note: there is no need to send this email to multiple mailing lists. The user@ list is the right one) Let me quickly go through your questions: Is this usecase suited for flink ? Based on the details you've provided: Yes What you also need to consider are the hardware

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Chesnay Schepler
oh I'm not using the HistoryServer; I just wrote it ;) Are these archives all in the same location? So we're roughly looking at 5 GB of archives then? That could indeed "just" be a resource problem. The HistoryServer eagerly downloads all archives, and not on-demand. The next step would be to

Re: Question on stream joins

2020-05-29 Thread Yun Gao
Hi Sudan, As far as I know, both join and cogroup require keys (namely partitioning), thus for the non-keyed scenario, you may have to use the low-level connect operator to achieve it. In my opinion it should be something like leftSource.connect(rightSource) .process(new

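pyflink Table API: question about connecting to external systems

2020-05-29 Thread 刘亚坤
I am currently learning to use the pyflink Table API and have a question: 1. When the Table API connects to Kafka, can an entire Kafka message be treated as a single table field? For example, if each message in the Kafka topic is a JSON string, treating the whole string as one field would make it easy to process and transform messages with pyflink UDFs. 2. If this is feasible, how should the data format for the Kafka connection be set, i.e. how should with_format be configured? The official website currently has little material on this. I am a beginner, so any guidance is appreciated. Thanks.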

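Re: Flink SQL tumbling window does not output the result set

2020-05-29 Thread Benchao Li
Hi, Flink requires the time field used for watermarks to be in milliseconds; you can check whether your watermark is behaving normally. I suspect that is the problem here. steven chen wrote on Fri, May 29, 2020 at 2:34 PM: > Data arrives every time and is aggregated, but why is the result of the insert not saved to mysql? Is it a problem with the sql, or something else? Any help would be appreciated. > CREATE TABLE user_behavior ( > > itemCode VARCHAR, > > ts BIGINT COMMENT '时间戳', > > t as TO_TIMESTAMP(FROM_UNIXTIME(ts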

Re: Need Help on Flink suitability to our usecase

2020-05-29 Thread Prasanna kumar
Thanks Robert for the reply. On Fri 29 May, 2020, 12:31 Robert Metzger, wrote: > Hey Prasanna, > > (Side note: there is no need to send this email to multiple mailing > lists. The user@ list is the right one) > > Let me quickly go through your questions: > > Is this usecase suited for flink ?

Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-29 Thread Felipe Gutierrez
because I am measuring one operator (all instances) and I want to place its downstream operators in another machine in order to use network channels. -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Fri, May 29, 2020 at 4:59 AM Weihua Hu wrote: > >

Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-29 Thread Felipe Gutierrez
Using slotSharingGroup I can do some placement. However, I am using two different slotSharingGroups for two different sources, even though they are placed in the same TM. And this starts splitting the downstream operators across different TMs as well. stream01 = source01.slot1 -> map01(4).slot1 ->

Flink SQL tumbling window does not output the result set

2020-05-29 Thread steven chen
Data arrives every time and is aggregated, but why is the result of the insert not saved to mysql? Is it a problem with the sql, or something else? Any help would be appreciated. CREATE TABLE user_behavior ( itemCode VARCHAR, ts BIGINT COMMENT '时间戳', t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')), proctime as PROCTIME(), WATERMARK FOR t as t - INTERVAL '5' SECOND ) WITH ( 'connector.type' =

Re: Running and Maintaining Multiple Jobs

2020-05-29 Thread Yun Tang
Hi Prasanna, As far as I know, Flink does not allow submitting a new jobgraph without restarting it, and I actually do not understand what your 3rd question means. From: Prasanna kumar Sent: Friday, May 29, 2020 11:18 To: Yun Tang Cc: user Subject: Re: Running

Re: [DISCUSS] Remove dependency shipping through nested jars during job submission.

2020-05-29 Thread Robert Metzger
Hi, afaik, this feature was added because Hadoop MapReduce has it as well ( https://blog.cloudera.com/how-to-include-third-party-libraries-in-your-map-reduce-job/, point 2.). I don't remember having seen this anywhere in the wild. I believe it is a good idea to simplify our codebase here. If

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Hailu, Andreas
Yes, these are all in the same directory, and we're at 67G right now. I'll try with incrementally smaller directories and let you know what I find. // ah From: Chesnay Schepler Sent: Friday, May 29, 2020 3:11 AM To: Hailu, Andreas [Engineering] ; user@flink.apache.org Subject: Re: History

RE: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-29 Thread LINZ, Arnaud
Hello, Yes, that would definitely do the trick, with an extra mapper after keyBy to remove the tuple so that it stays seamless. It’s less hacky than what I was thinking of, thanks! However, is there any plan in a future release to have rich partitioners? That would avoid adding overhead and

Re: Flink Iterator Functions

2020-05-29 Thread Arvid Heise
Hi Roderick, adding to Robert's response: The easiest way is to get all needed information injected only in the driver from which you manually pass the config in a serializable form to your iterator. Configs could be for example a Java Map using serializable elements, such as Strings. If you

Re: Dropping messages based on timestamp.

2020-05-29 Thread Arvid Heise
Although flatMap() is a valid choice, it would be more idiomatic to use filter(). I'd apply that even before running TimestampAssigner, except when extracting the timestamp is rather complicated. But if it's a simple field, then it feels better to first filter bad data, and then apply any kind of
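To make the filter() versus flatMap() comparison concrete, here is a small sketch using plain JDK streams rather than Flink's DataStream API. The class TimestampFilterSketch, the isValid rule and the [minTs, maxTs] bounds are all hypothetical; the only point is that both operators can drop records, while filter() states the intent directly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TimestampFilterSketch {
    // Hypothetical validity rule: keep timestamps inside [minTs, maxTs].
    static boolean isValid(long ts, long minTs, long maxTs) {
        return ts >= minTs && ts <= maxTs;
    }

    // filter(): the predicate alone decides whether a record survives
    static List<Long> dropWithFilter(List<Long> timestamps, long minTs, long maxTs) {
        return timestamps.stream()
                .filter(ts -> isValid(ts, minTs, maxTs))
                .collect(Collectors.toList());
    }

    // flatMap(): same result, but each record is mapped to zero or one outputs
    static List<Long> dropWithFlatMap(List<Long> timestamps, long minTs, long maxTs) {
        return timestamps.stream()
                .flatMap(ts -> isValid(ts, minTs, maxTs) ? Stream.of(ts) : Stream.<Long>empty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(5L, 100L, 250L, 999L);
        System.out.println(dropWithFilter(input, 100L, 500L));
        System.out.println(dropWithFlatMap(input, 100L, 500L));
    }
}
```

In a Flink job the same predicate would sit in a filter() placed ahead of the timestamp assignment, as suggested in the message above.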

Re: Tumbling windows - increasing checkpoint size over time

2020-05-29 Thread Wissman, Matt
Till, I’ll have to calculate the theoretical upper bound for our window state. Our data distribution and rate have a predictable pattern but the data rate pattern didn’t match the checkpoint size growth. Here is a screenshot of the checkpoint size for the

Re: How to create schema for flexible json data in Flink SQL

2020-05-29 Thread Guodong Wang
Benchao, Thank you for your detailed explanation. Schema inference can solve my problem partially. For example, starting from some time, all the json afterward will contain a new field. I think for this case, schema inference will help. But if I need to handle all the json events with different

Getting Window information from coGroup function

2020-05-29 Thread Sudan S
Hi, I have a use case where I want to join two streams. I am using coGroup for this: KeyBuilder leftKey = new KeyBuilder(jobConfiguration.getConnectStream().getLeftKey()); KeyBuilder rightKey = new KeyBuilder(jobConfiguration.getConnectStream().getRightKey());

Re: Flink Iterator Functions

2020-05-29 Thread Roderick Vincent
Thank you both for your answers and yes, that does explain what's going on. I will have to refactor this code. Thanks again for your help! Rick On Fri, May 29, 2020 at 2:29 PM Arvid Heise wrote: > Hi Roderick, > > adding to Robert's response: The easiest way is to get all needed > information

Re: kerberos integration with flink

2020-05-29 Thread Nick Bendtner
Hi Guo, Thanks again for your inputs. If I periodically renew the kerberos cache using an external process(kinit) on all flink nodes in standalone mode, will the cluster still be short lived or will the new ticket in the cache be used and the cluster can live till the end of the new expiry ?

Re: Flink Dashboard UI Tasks hard limit

2020-05-29 Thread Vijay Balakrishnan
Thx, Xintong for the detailed explanation of memory fraction. I increased the mem fraction now. As I increase the defaultParallelism, I keep getting this error: org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition

Auto adjusting watermarks?

2020-05-29 Thread Theo Diefenthal
Hi there, Currently I have a job pipeline reading data from more than 10 different kinds of sources, each with different out-of-orderness characteristics. I am currently working on adjusting the watermarks for each source "properly". I work with BoundedOutOfOrdernessTimestampExtractor and, as

Re: Custom trigger to trigger for late events

2020-05-29 Thread Congxian Qiu
Hi Poornapragna, I'll try to answer your questions 1. you don't need to delete the timer manually (it is deleted after it fires), but you can delete timers manually if you want. 2. AFAIK, triggers are not included in snapshots, but the timers are 3. delete timer that was not

Re: Sorting Bounded Streams

2020-05-29 Thread Benchao Li
Hi Satyam, Are you using the blink planner in streaming mode? AFAIK, the blink planner in batch mode can sort on arbitrary columns. Satyam Shekhar wrote on Sat, May 30, 2020 at 6:19 AM: > Hello, > > I am using Flink as the streaming execution engine for building a > low-latency alerting application. The use case

Re: Age old stop vs cancel debate

2020-05-29 Thread Senthil Kumar
Hi Robert, Would appreciate more insights please. What we are noticing: When the flink job is issued a stop command, the Thread.sleep is not receiving the InterruptedException. It certainly receives the exception when the flink job is issued a cancel command. In both cases (cancel and stop)

Re: Auto adjusting watermarks?

2020-05-29 Thread Congxian Qiu
Hi, would it work for your case to store histogram data in a custom `BoundedOutOfOrdernessTimestampExtractor` and adjust `maxOutOfOrderness` according to that histogram? (Be careful: such histogram data would not be included in snapshots when checkpointing.) Best, Congxian Theo Diefenthal
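One way to picture the histogram idea is the following JDK-only sketch. LatenessHistogram, its bucket layout and the quantile-based suggestion are hypothetical and not part of Flink; a real job would keep such a structure inside its custom timestamp extractor and, as noted above, would lose it on recovery unless it is stored in state.

```java
// Hypothetical helper: tracks how late events arrive relative to the
// highest timestamp seen so far, in fixed-width buckets.
class LatenessHistogram {
    private final long bucketWidthMs;
    private final long[] buckets;
    private long count;
    private long maxTimestampSeen = Long.MIN_VALUE;

    LatenessHistogram(long bucketWidthMs, int numBuckets) {
        this.bucketWidthMs = bucketWidthMs;
        this.buckets = new long[numBuckets];
    }

    void observe(long eventTimestampMs) {
        if (eventTimestampMs > maxTimestampSeen) {
            maxTimestampSeen = eventTimestampMs;
        }
        long lateness = maxTimestampSeen - eventTimestampMs; // 0 for in-order events
        // the last bucket collects everything beyond the covered range
        int idx = (int) Math.min(lateness / bucketWidthMs, buckets.length - 1);
        buckets[idx]++;
        count++;
    }

    // Upper edge of the first bucket at which the given fraction of events is covered.
    long suggestedMaxOutOfOrdernessMs(double quantile) {
        long target = (long) Math.ceil(quantile * count);
        long seen = 0;
        for (int i = 0; i < buckets.length; i++) {
            seen += buckets[i];
            if (seen >= target) {
                return (i + 1) * bucketWidthMs;
            }
        }
        return buckets.length * bucketWidthMs;
    }

    public static void main(String[] args) {
        LatenessHistogram h = new LatenessHistogram(100, 10);
        long[] timestamps = {1000, 2000, 1950, 1500, 3000, 2990};
        for (long ts : timestamps) {
            h.observe(ts);
        }
        System.out.println(h.suggestedMaxOutOfOrdernessMs(0.5));
        System.out.println(h.suggestedMaxOutOfOrdernessMs(1.0));
    }
}
```

Choosing a quantile below 1.0 trades a smaller watermark delay for a share of events that arrive late.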

Sorting Bounded Streams

2020-05-29 Thread Satyam Shekhar
Hello, I am using Flink as the streaming execution engine for building a low-latency alerting application. The use case also requires ad-hoc querying on batch data, which I also plan to serve using Flink to avoid the complexity of maintaining two separate engines. My current understanding is

Re: Inconsistent checkpoint durations vs state size

2020-05-29 Thread Congxian Qiu
Hi, From the given picture, 1. there were some failed checkpoints (but not because of timeout); could you please check why these checkpoints failed? 2. The checkpoint data size is the delta size for the current checkpoint[1], assuming you are using incremental checkpoints 3. In fig1 the checkpoint size is

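Re: Kafka Consumer deserialization error

2020-05-29 Thread tison
The cause is probably the class loading order. Try configuring child-first class loading; for per-job mode on 1.10 I recall that a specific option has to be set. See this document https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath Best, tison. Even <452232...@qq.com> wrote on Fri, May 29, 2020 at 6:48 PM: > Thanks. What do I need to do to avoid this problem? > > > > >

Re: Kafka Consumer deserialization error

2020-05-29 Thread tison
Also, for general documentation on class loading, see https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html Best, tison. tison wrote on Fri, May 29, 2020 at 7:46 PM: > The cause is probably the class loading order. Try configuring child-first class loading; for per-job mode on 1.10 I recall that a specific option has to be set. > > See this document >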

Re: Kafka Consumer deserialization error

2020-05-29 Thread Even
---- From: "zz zhang"

Re:Re: flink1.10 on yarn problem

2020-05-29 Thread air23
The code is just the example that ships with Flink. public class WordCountStreamingByJava { public static void main(String[] args) throws Exception { // create the execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set up the socket data source DataStreamSource source = env.socketTextStream("zongteng75", 9001,

Re: JSON-parsing UDF for flink sql

2020-05-29 Thread Benchao Li
Hi, this work is still in progress; see FLIP-90[1] [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550 star <3149768...@qq.com> wrote on Fri, May 29, 2020 at 2:34 PM: > Does flink sql 1.10 have a UDF for parsing JSON? I could not find one. > > Sent from my iPhone -- Best, Benchao Li

Re: flink sql hbase connector question

2020-05-29 Thread 111
Hi, [...] 1.2.0-cdh [...] 1.4.3 [...] hbase connector [...] jar: flink-hbase_2.11-1.10.1.jar hbase-client-1.4.3.jar hbase-common-1.4.3.jar hbase-protocol-1.4.3.jar Best, Xinghalo

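flink accessing a hadoop cluster

2020-05-29 Thread 了不起的盖茨比
A question for everyone: the hadoop service is TestHACluster, but when I access it through the api with the path hdfs://TestHACluster/user/flink/test, it tries to access TestHACluster:8020, and I do not have that port. How should I handle this situation? [...] hive [...] TestHACluster:8020 [...] StreamExecutionEnvironment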

flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 Thread ??????????????
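The code is below. This is the error message: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. The insert actually succeeded; I confirmed it in the hive cli, and select also returned the data. Why am I still told that no operators are defined? Do I need a batch table? EnvironmentSettings settings =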

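Re: flink1.10 on yarn problem

2020-05-29 Thread tison
Also, if possible, please post the code around execute, or even the whole main, via a gist (x) Best, tison. tison wrote on Fri, May 29, 2020 at 2:21 PM: > This problem is really strange. Normally compilation is intercepted at env.execute > and the job should not actually get scheduled. Can you describe in detail how you submitted the job and where this error is reported (client? cluster?)? > > Best, > tison. > > > air23 wrote on Fri, May 29, 2020 at 1:38 PM: > >> Running flink1.10 on cdh yarn reports the following error.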

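Re: Re: flink1.10 on yarn problem

2020-05-29 Thread tison
What command did you run? In which directory did you run it, and how does that directory relate to the directory where the flink download was unpacked? Best, tison. air23 wrote on Fri, May 29, 2020 at 2:35 PM: > The code is just the example that ships with Flink. > > public class WordCountStreamingByJava { > public static void main(String[] args) throws Exception { > > // create the execution environment > StreamExecutionEnvironment env = >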

Re: Re: flink1.10 on yarn problem

2020-05-29 Thread wangweigu...@stevegame.cn
This error: >>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed >>> by NoRestartBackoffTimeStrategy probably means that the flink-conf.yaml configuration file under flink conf was not read; it contains a configuration parameter for restarting on task failure! From: air23 Sent: 2020-05-29 14:34 To: user-zh Subject: Re:Re: flink1.10 on yarn problem The code is just the example that ships with Flink. public class

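Re: flink accessing a hadoop cluster

2020-05-29 Thread 程龙
Is the code below being run locally? If so, the simplest approach is to put the hdfs-site.xml and core-site.xml configuration files in the resources directory. At 2020-05-29 15:06:21, "了不起的盖茨比" <573693...@qq.com> wrote: >A question for everyone: >the hadoop service is TestHACluster, but when I access it through the api with the path >hdfs://TestHACluster/user/flink/test >it tries to access TestHACluster:8020, and I do not have that port. How should I handle this situation?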

Re: JSON-parsing UDF for flink sql

2020-05-29 Thread star
Thanks. Sent from my iPhone -- Original message -- From: Benchao Li https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550 star <3149768...@qq.com> wrote on Fri, May 29, 2020 at 2:34 PM: Does flink sql 1.10 have a UDF for parsing JSON? I could not find one. Sent from my iPhone -- Best, Benchao Li

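Re: Kafka Consumer deserialization error

2020-05-29 Thread 夏帅
You could check whether there is a jar conflict -- From: Even <452232...@qq.com> Sent: Friday, May 29, 2020 16:17 To: user-zh Subject: Kafka Consumer deserialization error Hi! I have a question about a Kafka Consumer deserialization problem: a kafkaconsumerjob runs stably when submitted to a Flink session cluster, but when submitted on its own to a Flink per-job cluster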

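Re: flink sql hbase connector question

2020-05-29 Thread Leonard Xu
Hi, (1) Yes. (2) Yes, you can; just implement it yourself. Best regards, Leonard Xu > On May 29, 2020, at 16:44, op <520075...@qq.com> wrote: > > Hi everyone, I have two questions: > > > 1. Does the existing hbase connector only support hbase version 1.4.3? > 2. Can I write a custom connector for version 1.2.0? > > > Thanks!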

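Kafka Consumer deserialization error

2020-05-29 Thread Even
Hi! I have a question about a Kafka Consumer deserialization problem: a kafkaconsumerjob runs stably when submitted to a Flink session cluster, but when submitted on its own to a Flink per-job cluster [...] kafka [...] flink [...] 1.10, kafka [...] kafka_2.12-2.1.0 [...] consumer [...] val data = env.addSource(new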

Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 Thread Benchao Li
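I am not entirely sure, but it looks related to these two lines of code: List wrote on Fri, May 29, 2020 at 3:48 PM: > The code is below. This is the error message: Exception in thread "main" java.lang.IllegalStateException: > No operators defined in streaming topology. Cannot execute. The insert actually succeeded; I confirmed it in the hive > cli, and select also returned the data. Why am I still told that no operators are defined? Do I need a batch table? EnvironmentSettings > settings = >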

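Re:Re: flink1.10 on yarn problem

2020-05-29 Thread air23
Hello, it is on the cluster; the local code reports no error. The error message is pasted below. It works fine on flink1.7. Later I added the flink environment variables #flink export FLINK_HOME=/opt/module/flink-1.10.1 export PATH=${FLINK_HOME}/bin:$PATH and the example that had failed then ran fine. But I switched to another job, which works both on 1.7 and locally, and it reports the following error: The program finished with the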

JSON-parsing UDF for flink sql

2020-05-29 Thread star
Does flink sql 1.10 have a UDF for parsing JSON? I could not find one. Sent from my iPhone

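Re: Re: flink-sql watermark question

2020-05-29 Thread wangweigu...@stevegame.cn
Your code: w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'yyyy-MM-dd HH:mm:ss') Here the FROM_UNIXTIME function takes a BIGINT argument and returns a date value of type VARCHAR, with the default date format yyyy-MM-dd HH:mm:ss. The TO_TIMESTAMP function then converts it to the TIMESTAMP type, specifically timestamp(3); this can only be used with the blink planner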
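The two-step conversion described in this message can be mirrored with java.time as a rough analogy. This is not Flink's implementation: UnixTimeSketch and its method names are hypothetical, and the sketch assumes UTC, whereas Flink SQL applies its own time zone handling.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class UnixTimeSketch {
    // Same pattern as the default format named in the message above
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Analogue of FROM_UNIXTIME: epoch seconds in, formatted string out (UTC assumed)
    static String fromUnixTime(long epochSeconds) {
        return LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC).format(FMT);
    }

    // Analogue of TO_TIMESTAMP: formatted string in, timestamp value out
    static LocalDateTime toTimestamp(String text) {
        return LocalDateTime.parse(text, FMT);
    }

    public static void main(String[] args) {
        long rowtimeMillis = 1590710400000L;                  // event time in milliseconds
        String text = fromUnixTime(rowtimeMillis / 1000);     // divide by 1000 to get seconds
        LocalDateTime ts = toTimestamp(text);
        System.out.println(text);                             // prints 2020-05-29 00:00:00
        System.out.println(ts.toEpochSecond(ZoneOffset.UTC)); // prints 1590710400
    }
}
```

Because the value passes through whole seconds and a second-resolution string, any sub-second part of the original millisecond timestamp is dropped along the way.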

Re: flink accessing a hadoop cluster

2020-05-29 Thread wangweigu...@stevegame.cn
[...] cdh [...] hdfs [...] 8020 [...] hdfs://TestHACluster/user/flink/test [...] hdfs://TestHACluster:8020/user/flink/test [...] flink [...] TestHACluster [...] Namespace [...] hdfs HA [...] hive-site.xml [...] hdfs-site.xml

Re: flink accessing a hadoop cluster

2020-05-29 Thread 了不起的盖茨比
[...] OK [...] ---- From: <13162790...@163.com>; Sent: May 29, 2020 3:20 To: "user-zh"

Re: Kafka Consumer deserialization error

2020-05-29 Thread zz zhang
It is probably a problem with the maven-shade configuration,

Re: flink-1.10.0 hive-1.2.1 No operators defined in streaming topology

2020-05-29 Thread kcz
[...] ---- From: "Benchao Li"

Re: flink accessing a hadoop cluster

2020-05-29 Thread 了不起的盖茨比
thanks very much [...] resource [...] hdfs-site.xml ---- From: "wangweigu...@stevegame.cn"

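flink sql hbase connector question

2020-05-29 Thread op
Hi everyone, I have two questions: 1. Does the existing hbase connector only support hbase version 1.4.3? 2. Can I write a custom connector for version 1.2.0? Thanks!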

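Re: FlinkKafkaConsumer consuming Avro-format messages from Kafka: using AvroDeserializationSchema directly always fails, while a self-implemented AbstractDeserializationSchema runs successfully.

2020-05-29 Thread hyangvv
Hello, I am not using the confluent schema registry avro format; I simply implemented kafka's Serializer by calling toByteBuffer from the avro-generated class. > On May 29, 2020, at 12:31 PM, Leonard Xu wrote: > > Hi, >> I used the avro code generation tool to generate a UserView class from a schema, and implemented kafka's Serializer interface, > When you write avro-format data to kafka using kafka's Serializer, the data is probably written in the confluent schema > registry avro format > confluent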

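Re: Flink SQL tumbling window does not output the result set

2020-05-29 Thread Benchao Li
CC user-zh Hi steven, I just realized that this reply went only to your private mailbox and was not copied to the community. I have now CC'd the user-zh mailing list. Second, I only now realized that you originally sent your email to the user list rather than user-zh. Next time, if the email is in Chinese, please send it directly to user-zh rather than user. English is recommended for the user mailing list. Regarding your question, I think the time zone of this watermark has no effect on your computation; whether or not there is a time zone offset, the watermark and the event time stay aligned with each other. If you do not need to output the time to the sink, you can ignore the watermark offset for now. steven chen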

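Re: Session window with event time has very high latency

2020-05-29 Thread Congxian Qiu
Hi, if the processing time window works fine but the event time window has problems, you need to check whether the watermark generation logic under event time behaves as expected. If the watermark does not pass the window end time, the window will never fire. Best, Congxian 李佳宸 wrote on Fri, May 22, 2020 at 3:27 PM: > Hi all, > > I have run into a problem whose cause I cannot figure out, and would like to ask for your help > > My code does keyby userid and then uses a session window to aggregate per userid and run a topN method. > >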

Re: Re: global state

2020-05-29 Thread Congxian Qiu
hi, Flink currently has no concept of global state; all state refers to state within a single parallel instance. However, you can use broadcast state[1] to emulate global state [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/broadcast_state.html Best, Congxian a773807...@gmail.com wrote on Wed, May 27, 2020 at 3:47 PM: > No; at the second keyby, use id-name as the key,