Flink Kafka Connector Source Parallelism

2020-05-28 Thread Chen, Mason
Hi all, I’m currently trying to understand Flink’s Kafka connector and how parallelism affects it. I am running the Flink playground click-count job, where parallelism is set to 2 by default. However, I don’t see the 2nd subtask of the Kafka connector sending any records:

Re: Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-28 Thread Marta Paes Moreira
Thanks for sharing, Aizhamal - it was a great webinar! Marta On Wed, 27 May 2020 at 23:17, Aizhamal Nurmamat kyzy wrote: > Thank you all for attending today's session! Here is the YT recording: > https://www.youtube.com/watch?v=ZCV9aRDd30U > And link to the slides: >

Re: Flink Kafka Connector Source Parallelism

2020-05-28 Thread Chen, Mason
I think I may have just answered my own question. There’s only one Kafka partition, so the maximum parallelism is one and it doesn’t really make sense to add another Kafka consumer under the same group id. What threw me off is that there’s a 2nd subtask for the Kafka source created even though

Re: Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-28 Thread Maximilian Michels
Thanks to everyone who joined and asked questions. Really enjoyed this new format! -Max On 28.05.20 08:09, Marta Paes Moreira wrote: > Thanks for sharing, Aizhamal - it was a great webinar! > > Marta > > On Wed, 27 May 2020 at 23:17, Aizhamal Nurmamat kyzy wrote:

Re: Flink TTL for MapStates and Sideoutputs implementations

2020-05-28 Thread Jaswin Shah
Thanks for responding, Alexander. We have solved the problem with ValueState now. Basically, we are implementing outer-join logic here with a custom KeyedCoProcessFunction implementation. From: Alexander Fedulov Sent: 28 May 2020 17:24 To: Jaswin Shah Cc:

How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Hi! I want to use Flink SQL to process some JSON events. It is quite challenging to define a schema for the Flink SQL table. My data source's format is JSON like this { "top_level_key1": "some value", "nested_object": { "nested_key1": "abc", "nested_key2": 123, "nested_key3": ["element1",

Re: Multiple Sinks for a Single Soure

2020-05-28 Thread Alexander Fedulov
Hi Prasanna, if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Each side output produces a stream. Create all of the side output tags, associate each of them with one sink, and add conditional logic around `ctx.output(outputTag, ...
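
A minimal sketch of the approach described above — the Event type, the routing predicate, and the two sinks are illustrative assumptions, not from the thread:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // One side output tag per known sink; the anonymous subclass keeps the type information.
    final OutputTag<Event> sinkATag = new OutputTag<Event>("sink-a") {};
    final OutputTag<Event> sinkBTag = new OutputTag<Event>("sink-b") {};

    SingleOutputStreamOperator<Event> routed = source
        .process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) {
                // conditional logic around ctx.output(...) decides which sink gets the record
                if (value.isTypeA()) {
                    ctx.output(sinkATag, value);
                } else {
                    ctx.output(sinkBTag, value);
                }
            }
        });

    routed.getSideOutput(sinkATag).addSink(sinkA);
    routed.getSideOutput(sinkBTag).addSink(sinkB);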

New dates for Flink Forward Global Virtual 2020

2020-05-28 Thread Ana Vasiliuk
Hi everyone, Flink Forward Global Virtual 2020 is now a 4-day conference, featuring two training days on October 19 & 20! The organizers have decided to extend the training program for this event to ensure that you get the most out of your time with our team of Flink experts. *New dates:* Apache

Re: Flink TTL for MapStates and Sideoutputs implementations

2020-05-28 Thread Alexander Fedulov
Hi Jaswin, I would like to clarify something first - what do you key your streams by, when joining them? It seems that what you want to do is to match each CartMessage with a corresponding Payment that has the same orderId+mid. If this is the case, you probably do not need the MapState in the
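
If the streams are indeed keyed by orderId+mid, a minimal sketch of this matching pattern with ValueState (the CartMessage/Payment types and their fields are assumptions based on the thread):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Both streams keyed by orderId+mid; each side buffers at most one unmatched element.
    public class CartPaymentMatcher
            extends KeyedCoProcessFunction<String, CartMessage, Payment, Tuple2<CartMessage, Payment>> {

        private transient ValueState<CartMessage> cartState;
        private transient ValueState<Payment> paymentState;

        @Override
        public void open(Configuration parameters) {
            cartState = getRuntimeContext().getState(new ValueStateDescriptor<>("cart", CartMessage.class));
            paymentState = getRuntimeContext().getState(new ValueStateDescriptor<>("payment", Payment.class));
        }

        @Override
        public void processElement1(CartMessage cart, Context ctx, Collector<Tuple2<CartMessage, Payment>> out) throws Exception {
            Payment payment = paymentState.value();
            if (payment != null) {       // other side already arrived: emit the match
                out.collect(Tuple2.of(cart, payment));
                paymentState.clear();
            } else {                     // otherwise buffer and wait
                cartState.update(cart);
            }
        }

        @Override
        public void processElement2(Payment payment, Context ctx, Collector<Tuple2<CartMessage, Payment>> out) throws Exception {
            CartMessage cart = cartState.value();
            if (cart != null) {
                out.collect(Tuple2.of(cart, payment));
                cartState.clear();
            } else {
                paymentState.update(payment);
            }
        }
    }

For the "outer" part of the join, a timer would additionally be needed to emit elements that never find a match.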

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong, I think you almost have the answer. 1. Map type: it does not work with the current implementation. For example, with a map type, if the value is a non-string JSON object, then `JsonNode.asText()` may not work as you wish. 2. Listing all the fields you care about: IMO, this can fit your scenario. And you can set

Re: Installing Ververica, unable to write to file system

2020-05-28 Thread Marta Paes Moreira
Hi, Charlie. This is not the best place for questions about Ververica Platform CE. Please use community-edit...@ververica.com instead — someone will be able to support you there! If you have any questions related to Flink itself, feel free to reach out to this mailing list again in the future.

Re: Multiple Sinks for a Single Soure

2020-05-28 Thread Prasanna kumar
Alexander, Thanks for the reply. Will implement and come back in case of any questions. Prasanna. On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov wrote: > Hi Prasanna, > > if the set of all possible sinks is known in advance, side outputs will be > generic enough to express your

Custom trigger to trigger for late events

2020-05-28 Thread Poornapragna Ts
Hi, I have a simple requirement where I want a 10-second window with allowed lateness of up to 1 hour for late events. The existing TumblingEventTimeWindows with EventTimeTrigger will work for this. But the EventTimeTrigger fires for every incoming event after the watermark has passed the window's max time. I don't
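
One way to get this behavior is a variant of EventTimeTrigger that, for late elements, registers a processing-time timer rounded to an interval boundary instead of firing per event — a sketch under that assumption (the 10-second batching interval is arbitrary):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class BatchingLateTrigger extends Trigger<Object, TimeWindow> {
        private static final long LATE_FIRE_INTERVAL = 10_000L;

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // late element: timers with identical timestamps are deduplicated, so all
                // late elements within one interval collapse into a single firing
                long nextFire = (ctx.getCurrentProcessingTime() / LATE_FIRE_INTERVAL + 1) * LATE_FIRE_INTERVAL;
                ctx.registerProcessingTimeTimer(nextFire);
                return TriggerResult.CONTINUE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            // sketch only: leftover processing-time timers for purged windows are not cleaned up here
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }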

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler
If it were a class-loading issue I would think that we'd see an exception of some kind. Maybe double-check that flink-shaded-hadoop is not in the lib directory. (usually I would ask for the full classpath that the HS is started with, but as it turns out this isn't getting logged :(

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler
Looks like it is indeed stuck on downloading the archive. I searched a bit in the Hadoop JIRA and found several similar instances: https://issues.apache.org/jira/browse/HDFS-6999 https://issues.apache.org/jira/browse/HDFS-7005 https://issues.apache.org/jira/browse/HDFS-7145 It is supposed to be

Dropping messages based on timestamp.

2020-05-28 Thread Joe Malt
Hi, I'm working on a custom TimestampAssigner which will do different things depending on the value of the extracted timestamp. One of the actions I want to take is to drop messages entirely if their timestamp meets certain criteria. Of course there's no direct way to do this in the

Re: ClusterClientFactory selection

2020-05-28 Thread M Singh
Hi Kostas/Yang/Lake: I am looking at AWS EMR and did not see execution.target in the flink-conf.yaml file under the flink/conf directory. Is it defined somewhere else? I also searched the current Flink source code and found mentions of it in the md files but not in any property

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong, Does the RAW type meet your requirements? For example, you can specify a map type where the value for the map is the raw JsonNode parsed from Jackson. This is not supported yet; however, IMO it could be supported. Guodong Wang wrote on Thu, 28 May 2020 at 21:43: > Benchao, > > Thank you for your

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
Just created a dump, here's what I see: "Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at

Re: Apache Flink - Question about application restart

2020-05-28 Thread Till Rohrmann
Hi, Yarn won't resubmit the job. In case of a process failure where Yarn restarts the Flink Master, the Master will recover the submitted jobs from a persistent storage system. Cheers, Till On Thu, May 28, 2020 at 4:05 PM M Singh wrote: > Hi Till/Zhu/Yang: Thanks for your replies. > > So

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
Okay, I will look further to see if we're mistakenly using a version that's pre-2.6.0. However, I don't see flink-shaded-hadoop in my /lib directory for flink-1.9.1. flink-dist_2.11-1.9.1.jar flink-table-blink_2.11-1.9.1.jar flink-table_2.11-1.9.1.jar log4j-1.2.17.jar slf4j-log4j12-1.7.15.jar

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Guowei, What do we need to do to add support for it? How do I get started on that? On Wed, May 27, 2020 at 8:53 PM Guowei Ma wrote: > Hi, > I think the StreamingFileSink could not support Azure currently. > You could find more detailed info from here[1]. > > [1]

Streaming multiple csv files

2020-05-28 Thread Nikola Hrusov
Hello, I have multiple files (file1, file2, file3) each being CSV and having different columns and data. The column headers are finite and we know their format. I would like to take them and parse them based on the column structure. I already have the parsers e.g.: file1 has columns (id,

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Till Rohrmann
Hi Israel, thanks for reaching out to the Flink community. As Guowei said, the StreamingFileSink can currently only recover from faults if it writes to HDFS or S3. Other file systems are currently not supported if you need fault tolerance. Maybe Klou can tell you more about the background and

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Benchao, Thank you for your quick reply. As you mentioned, for the current scenario, approach 2 should work for me. But it is a little bit annoying that I have to modify the schema to add new field types whenever the upstream app changes the JSON format or adds new fields. Otherwise, my user cannot refer to the

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Leonard Xu
Hi, Guodong > I am wondering if Flink SQL can/will support the flexible schema in the > future, It's an interesting topic; this feature is closer to the scope of schema inference. Schema inference should come in the next few releases. Best, Leonard Xu > for example, register the

Flink Iterator Functions

2020-05-28 Thread Roderick Vincent
Hi, I am brand new to Apache Flink so please excuse any silly questions. I have an Iterator function defined as below, added as a source to a Flink stream. But when I try to pass configuration information to it (via a Spring env), what I notice is that one of the threads calls hasNext()

Re: Tumbling windows - increasing checkpoint size over time

2020-05-28 Thread Till Rohrmann
Hi Matt, when using tumbling windows, the checkpoint size is not only dependent on the number of keys (which is equivalent to the number of open windows) but also on how many events arrive for each open window, because the windows store every window event in their state. Hence, it can be the
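
An incremental aggregate keeps that state small: only the accumulator is checkpointed per window, not every buffered event. A minimal sketch (the Event type and key extractor are placeholders):

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    stream
        .keyBy(e -> e.getKey())
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new AggregateFunction<Event, Long, Long>() {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(Event value, Long acc) { return acc + 1; } // one counter, not a buffer
            @Override public Long getResult(Long acc) { return acc; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        });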

How do I make sure to place operator instances in specific Task Managers?

2020-05-28 Thread Felipe Gutierrez
For instance, say I have the following DAG with the respective parallelism in parentheses (I hope the DAG renders properly):

source01 -> map01(4) -> flatmap01(4) \
                                      |-> keyBy -> reducer(8)
source02 -> map02(4) -> flatmap02(4) /

And I have 4 TMs on 4 machines with 4 cores each. I would

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Hi Till, Thanks for your feedback and guidance. It seems similar work was done for S3 filesystem where relocations were removed for those file system plugins. https://issues.apache.org/jira/browse/FLINK-11956 It appears the same needs to be done for Azure File systems. I will attempt to

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Till Rohrmann
I think what needs to be done is to implement a org.apache.flink.core.fs.RecoverableWriter for the respective file system. Similar to HadoopRecoverableWriter and S3RecoverableWriter. Cheers, Till On Thu, May 28, 2020 at 6:00 PM Israel Ekpo wrote: > Hi Till, > > Thanks for your feedback and
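
For orientation, a bare skeleton of that interface as it looks around Flink 1.9/1.10 (method set reproduced from memory — verify against the actual RecoverableWriter javadoc before implementing):

    import java.io.IOException;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    public class AzureRecoverableWriter implements RecoverableWriter {
        @Override
        public RecoverableFsDataOutputStream open(Path path) throws IOException {
            throw new UnsupportedOperationException("TODO: open a stream whose state can be persisted");
        }
        @Override
        public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException {
            throw new UnsupportedOperationException("TODO: resume an in-progress part file");
        }
        @Override
        public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException {
            throw new UnsupportedOperationException("TODO: finalize a pending part file");
        }
        @Override
        public boolean requiresCleanupOfRecoverableState() { return false; }
        @Override
        public boolean cleanupRecoverableState(ResumeRecoverable resumable) { return false; }
        @Override
        public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
            throw new UnsupportedOperationException("TODO");
        }
        @Override
        public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
            throw new UnsupportedOperationException("TODO");
        }
        @Override
        public boolean supportsResume() { return false; }
    }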

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
Hi Till/Zhu/Yang: Thanks for your replies. So just to clarify - the job id remains the same if the job restarts have not been exhausted. Does Yarn also resubmit the job in case of failures, and if so, is the job id different? Thanks. On Wednesday, May 27, 2020, 10:05:40 AM EDT, Till

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Yes. Setting the value type as RAW is one possible approach. And I would like to vote for schema inference as well. Correct me if I am wrong: IMO schema inference means I can provide a method in the table source to infer the data schema based on the runtime computation. Just like some Calcite

Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-28 Thread LINZ, Arnaud
Hello, I would like to upgrade the performance of my Apache Kudu Sink by using the new “KuduPartitioner” of Kudu API to match Flink stream partitions with Kudu partitions to lower the network shuffling. For that, I would like to implement something like stream.partitionCustom(new
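
Since Partitioner is not a RichFunction, there are no open()/close() hooks; a common workaround is lazy initialization inside partition(), sketched here with a hypothetical Kudu wrapper (KuduPartitionerClient and its methods are placeholders, not the actual Kudu API):

    import org.apache.flink.api.common.functions.Partitioner;

    public class KuduAwarePartitioner implements Partitioner<MyKey> {
        // transient: rebuilt once per parallel task after the partitioner is deserialized
        private transient KuduPartitionerClient client;

        @Override
        public int partition(MyKey key, int numPartitions) {
            if (client == null) {
                client = KuduPartitionerClient.connect(); // "open()" substitute, runs on first record
            }
            return client.partitionFor(key) % numPartitions;
        }
    }

    // usage: stream.partitionCustom(new KuduAwarePartitioner(), record -> record.getKey());

There is no close() substitute in this pattern; if the client holds resources, a JVM shutdown hook is one option.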

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Thanks Till. I will take a look at that tomorrow and let you know if I hit any roadblocks. On Thu, May 28, 2020 at 12:11 PM Till Rohrmann wrote: > I think what needs to be done is to implement > a org.apache.flink.core.fs.RecoverableWriter for the respective file > system. Similar to

Flink Elastic Sink

2020-05-28 Thread aj
Hello All, I am getting many events in Kafka and I have written a Flink job that sinks Avro records from Kafka to S3 in Parquet format. Now, I want to sink these records into Elasticsearch, but the only challenge is that I want to sink records on time-based indices. Basically, in Elastic, I want

Question on stream joins

2020-05-28 Thread Sudan S
Hi, I have two use cases. 1. I have two streams, `leftSource` and `rightSource`, which I want to join without partitioning over a window, find the difference between the counts of elements of leftSource and rightSource, and emit the difference. Which is the appropriate join function I can

Re: Stateful functions Harness

2020-05-28 Thread Boris Lublinsky
Also I have noticed that a few statefun jars, including statefun-flink-core, statefun-flink-io, and statefun-flink-harness, are built for Scala 2.11. Is it possible to create versions of those for Scala 2.12? > On May 27, 2020, at 3:15 PM, Seth Wiesman wrote: > > Hi Boris, > > Example usage of

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
Thanks Till - in the case of a restart of the Flink Master, I believe the jobid will be different. Thanks. On Thursday, May 28, 2020, 11:33:38 AM EDT, Till Rohrmann wrote: Hi, Yarn won't resubmit the job. In case of a process failure where Yarn restarts the Flink Master, the Master will

Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread aj
Hi, I have implemented the below solution and it's working fine, but the biggest problem is that if no event comes for the user within 30 min, I am not able to trigger, because I am checking the time diff against upcoming events. So it triggers only when the next event comes, but I want it to

Re: Flink Elastic Sink

2020-05-28 Thread Yangze Guo
Hi, Anuj. From my understanding, you could send an IndexRequest to the indexer in `ElasticsearchSink`. It will create a document under the given index and type. So, it seems you only need to get the timestamp and concat the `date` to your index. Am I understanding that correctly? Or do you want to

Re: Flink Elastic Sink

2020-05-28 Thread Leonard Xu
Hi, aj. In the implementation of ElasticsearchSink, ElasticsearchSink won't create indices; it only starts an Elastic client for sending requests to the Elastic cluster. You can simply extract the index (the date value in your case) from your timestamp field and then set it on an IndexRequest [2],
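
A minimal sketch of such a time-based index (the Record type, its timestamp getter, and the JSON conversion are assumptions):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.client.Requests;

    ElasticsearchSinkFunction<Record> sinkFunction = new ElasticsearchSinkFunction<Record>() {
        @Override
        public void process(Record r, RuntimeContext ctx, RequestIndexer indexer) {
            // derive a daily index name from the record's own timestamp field
            String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(r.getTimestamp()));
            indexer.add(Requests.indexRequest()
                    .index("events-" + day)
                    .source(r.toJsonMap()));
        }
    };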

Re: Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread Yun Gao
Hi, I think you could use a timer to achieve that. In a ProcessFunction you can register a timer at a specific time (event time or processing time) and get called back at that point. It can be registered like ctx.timerService().registerEventTimeTimer(current.lastModified + 6); More
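
A minimal sketch of this suggestion for the 30-minute inactivity case from the thread above (the Event type is a placeholder; processing time is used so the timer fires even if no further events arrive for the key):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class InactivityTimeout extends KeyedProcessFunction<String, Event, Event> {
        private static final long TIMEOUT = 30 * 60 * 1000L;
        private transient ValueState<Long> lastSeen;

        @Override
        public void open(Configuration parameters) {
            lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSeen", Long.class));
        }

        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
            long now = ctx.timerService().currentProcessingTime();
            lastSeen.update(now);
            ctx.timerService().registerProcessingTimeTimer(now + TIMEOUT); // fires even with no more events
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
            Long last = lastSeen.value();
            if (last != null && timestamp >= last + TIMEOUT) {
                // 30 minutes of silence for this key: emit/close the session here
                lastSeen.clear();
            }
        }
    }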

Re: Apache Flink - Question about application restart

2020-05-28 Thread Zhu Zhu
Restarting the Flink Master does not change the jobId within one YARN application. To put it simply: in a YARN application that runs a Flink cluster, the job id of a job does not change once the job is submitted. You can even submit a Flink application multiple times to that cluster (if it is a session

RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
May I also ask what version of flink-hadoop you're using and the number of jobs you're storing the history for? As of writing we have roughly 101,000 application history files. I'm curious to know if we're encountering some kind of resource problem. // ah From: Hailu, Andreas [Engineering]

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong, After an offline discussion with Leonard, I think you got the right meaning of schema inference. But there are two cases here: 1. The schema of the data is fixed; schema inference can save you the effort of writing the schema explicitly. 2. The schema of the data is dynamic; in this case the

Re: ClusterClientFactory selection

2020-05-28 Thread Yang Wang
You can find more information about deployment targets here [1]. As you mentioned, it is not defined in flink-conf.yaml by default. In the code, it is defined in flink-core/DeploymentOptions. [1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#deployment-targets Best,
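
Programmatically, the same option can be set through flink-core's DeploymentOptions, as Yang notes — a sketch (the yarn-per-job value is an assumption for Flink 1.10):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;

    Configuration config = new Configuration();
    // equivalent to 'execution.target: yarn-per-job' in flink-conf.yaml
    config.setString(DeploymentOptions.TARGET, "yarn-per-job");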

Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-28 Thread Weihua Hu
Hi, Felipe. Flink does not support running tasks on a specified TM. You can use slotSharingGroup to keep tasks out of the same slot, but you cannot specify which TM. Could you give the reason for wanting to pin tasks to a specific TM? Best, Weihua Hu On 28 May 2020 at 21:37, Felipe Gutierrez wrote: > For instance, if I have
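
A minimal sketch of slot sharing groups (operator names are placeholders): operators in different groups never share a slot, which, with few slots per TM, tends to spread the pipelines across TaskManagers — though exact placement remains up to the scheduler:

    // pipeline 1 stays in its own slot sharing group
    source01.map(new Map01()).slotSharingGroup("pipeline-1")
            .flatMap(new FlatMap01()).slotSharingGroup("pipeline-1");

    // pipeline 2 is isolated from pipeline 1's slots
    source02.map(new Map02()).slotSharingGroup("pipeline-2")
            .flatMap(new FlatMap02()).slotSharingGroup("pipeline-2");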

Re: Cannot start native K8s

2020-05-28 Thread Yang Wang
A quick update on this issue. The root cause is the compatibility of kubernetes-client with Java 8u252 [1]. We have bumped the fabric8 kubernetes-client version from 4.5.2 to 4.9.2 in the master and release-1.11 branches. Now users can deploy Flink on K8s natively with Java 8u252. If you

Re: Re: flink sql fails to write to a Hive partitioned table

2020-05-28 Thread Zhou Zach
Thanks for the pointers — it works now. But switching to dynamic insert raises a problem: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58. Was expecting one of: "DATE" ... "FALSE" ... "INTERVAL" ... "NULL" ... "TIME" ... "TIMESTAMP"

Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread wind.fly....@outlook.com
Hi, all: We are on Flink 1.10.0 with the Blink planner. Suppose we have the following code: tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a")); tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'"); tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'"); where a is a Kafka table with connector properties:

Re: flink sql fails to write to a Hive partitioned table

2020-05-28 Thread Leonard Xu
> | INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, > `p_month` = p_month) > | select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and > `p_month` = 4 — Dynamic partitions are not specified like this; the syntax is the same as Hive's. Both variants below should work. The Flink docs are a bit thin here; see [1][2]. INSERT INTO dwdCatalog.dwd.t1_copy
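
In Table API form, the two variants would look roughly like this (a sketch assuming the dwdCatalog Hive catalog from the thread is registered):

    // static partition: partition values are literals in the PARTITION clause
    tEnv.sqlUpdate(
        "INSERT INTO dwdCatalog.dwd.t1_copy PARTITION (p_year = 2020, p_month = 4) " +
        "SELECT id, name FROM dwdCatalog.dwd.t1 WHERE p_year = 2020 AND p_month = 4");

    // dynamic partition: no PARTITION clause; the trailing SELECT columns supply the values
    tEnv.sqlUpdate(
        "INSERT INTO dwdCatalog.dwd.t1_copy " +
        "SELECT id, name, p_year, p_month FROM dwdCatalog.dwd.t1 WHERE p_year = 2020 AND p_month = 4");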

flink-sql watermark question

2020-05-28 Thread guaishushu1...@163.com
Flink 1.10 SQL only supports generating watermarks from TIMESTAMP(3) fields, but strangely a BIGINT converted like this can also generate a watermark? CREATE TABLE user_log ( response_size int, rowtime BIGINT, w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'yyyy-MM-dd HH:mm:ss'), WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND -- 5 second delay ) guaishushu1...@163.com

Question about using recursive.file.enumeration

2020-05-28 Thread 阿华田
With recursive.file.enumeration enabled to read HDFS directories recursively, the job fails with the following error before all data has been read: java.io.IOException: Error opening the InputSplit hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/ | | 王志华 | | a15733178...@163.com |

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread Benchao Li
Yes, exactly. This is an optimization specific to the Blink planner, which supports optimizing DAGs with multiple sinks. You can also check the job DAG in the Flink UI; it clearly shows a single source followed by two sinks. wind.fly@outlook.com wrote on Thu, 28 May 2020 at 17:02: > Hi, Benchao: > Thanks for the reply; testing confirms it. So under the same TableEnvironment, if multiple INSERT statements access the same table, the table's data is consumed only once? > > > > > Best, > Junbao Zhang >

RE: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread wind.fly....@outlook.com
Hi, Benchao: The DAG points to the same issue, but after switching to TableEnvironment, all times are off by 13 hours, e.g. when calling the LOCALTIMESTAMP function. Another question: how do you configure checkpointing when using TableEnvironment? Best, Junbao Zhang From: Benchao Li Sent: 28 May 2020 17:05 To: user-zh Subject: Re: Question: flink sql

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread Benchao Li
Hi, the 13-hour offset is odd; as I understand it, this shouldn't happen. Are you saying the problem does not occur when you don't use TableEnvironment? On the second question, TableEnvironment currently cannot configure checkpointing; that can only be configured via StreamExecutionEnvironment. wind.fly@outlook.com wrote on Thu, 28 May 2020 at 17:27: > Hi, Benchao: > >

Re: Re: flink sql fails to write to a Hive partitioned table

2020-05-28 Thread Zhou Zach
Such a detailed reply — it even points to the relevant test cases. Thanks very much! On 2020-05-28 14:23:33, "Leonard Xu" wrote: > >> | INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, >> `p_month` = p_month) >> | select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and >> `p_month` = 4 >

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread Benchao Li
If your tEnv is a TableEnvironment rather than a StreamTableEnvironment, the two INSERTs will share the upstream source, i.e. table a is read only once and then fed to both downstream c and d. wind.fly@outlook.com wrote on Thu, 28 May 2020 at 15:14: > Hi, all: > We are on Flink 1.10.0 with the Blink planner. Suppose we have the following code: > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a")); >

Re: Questions about native Kubernetes configuration

2020-05-28 Thread Yangze Guo
Hello, let me answer the first big question: 1. Both resources.requests.cpu and resources.limits.cpu will be set to kubernetes.jobmanager.cpu. 2. external-resource..kubernetes.config-key was added for the external resource framework [1], a new feature in 1.11; please do not use it to configure CPU or memory. [1] https://issues.apache.org/jira/browse/FLINK-17044 Best, Yangze Guo On Thu, May 28, 2020 at 3:48 PM wrote:

RE: Question about using recursive.file.enumeration

2020-05-28 Thread 阿华田
To clarify: the job had not yet read up to today's data, i.e. the directory reported as missing is xxx/ds=2020-05-28. | | 王志华 | | a15733178...@163.com | On 2020-05-28 16:36, 阿华田 wrote: With recursive.file.enumeration enabled to read HDFS directories recursively, the job fails before all data has been read with: java.io.IOException: Error opening the InputSplit

RE: flink-sql watermark question

2020-05-28 Thread 112039...@qq.com
w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'yyyy-MM-dd HH:mm:ss') — this expression already produces a TIMESTAMP value. Flink built-in functions: https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/functions/systemFunctions.html FROM_UNIXTIME(numeric[, string]): Returns a representation of the numeric argument as a

Flink writing to HBase fails with the error below, causing backpressure and slowing the job

2020-05-28 Thread air23
2020-05-28 16:54:23,867 INFO org.apache.hadoop.hbase.client.AsyncRequestFutureImpl - id=2, table=GC_SCHEM:mon1, attempt=7/16, failureCount=427ops, last exception=org.apache.hadoop.hbase.RegionTooBusyException: org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit,

RE: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread wind.fly....@outlook.com
Hi, Benchao: Thanks for the reply. Testing confirms it; so under the same TableEnvironment, if multiple INSERT statements access the same table, the table's data is consumed only once? Best, Junbao Zhang From: Benchao Li Sent: 28 May 2020 15:59 To: user-zh Subject: Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread zhisheng
Hi, Benchao http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-05-28-093940.jpg This screenshot says TableEnvironment does not support UDAF/UDTF. Is there any workaround for now if we want to use them? And roughly when will the community support this? Thanks! Benchao Li wrote on Thu, 28 May 2020 at 17:35: > Hi, > > The 13-hour offset is odd; as I understand it, this shouldn't happen. Are you saying it does not occur without TableEnvironment? > >

RE: Questions about native Kubernetes configuration

2020-05-28 Thread a511955993
Thanks to you both for the replies — looking forward to more native Kubernetes features. | | a511955993 | | Email: a511955...@163.com | On 2020-05-28 17:39, Yang Wang wrote: Hello, the native mode cannot mount volumes yet, including hostPath, persistent volumes, etc. There is a JIRA ticket [1] for this, but work has not started. If you are interested, you are welcome to join in. [1]. https://issues.apache.org/jira/browse/FLINK-15649

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread Benchao Li
Hi zhisheng, this area was heavily refactored and optimized in FLIP-84 [1], and most of that work is already done in 1.11 — take a look. [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 wind.fly@outlook.com wrote on Thu, 28 May 2020 at 17:45: > Hi, > >

Re: flink-sql watermark question

2020-05-28 Thread Benchao Li
Hi, I don't quite follow what your question is. Currently only TIMESTAMP(3) is supported as the event-time column. The reason BIGINT is not yet supported as an event-time column is mainly time zones, but the community is considering it; see [1]. [1] https://issues.apache.org/jira/browse/FLINK-16938 guaishushu1...@163.com wrote on Thu, 28 May 2020 at 16:22: > flink-1.10 SQL only supports generating watermarks from TIMESTAMP(3) fields, > but strangely a BIGINT converted like this can also generate a watermark? > CREATE TABLE

Re: Questions about native Kubernetes configuration

2020-05-28 Thread Yang Wang
Hello, the native mode cannot mount volumes yet, including hostPath, persistent volumes, etc. There is already a JIRA ticket [1] for this, but work has not started; if you are interested, feel free to join in. [1]. https://issues.apache.org/jira/browse/FLINK-15649 Best, Yang Yangze Guo wrote on Thu, 28 May 2020 at 16:11: > Hello, let me answer the first big question > > 1. >

RE: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread wind.fly....@outlook.com
Hi, is there a release plan for DAG optimization in StreamTableEnvironment similar to TableEnvironment's? Will TableEnvironment's DAG optimization and StreamTableEnvironment's richer API be unified in the future? Best, Junbao Zhang From: Benchao Li Sent: 28 May 2020 17:35 To: user-zh Subject: Re: Question: flink sql

Questions about native Kubernetes configuration

2020-05-28 Thread a511955993
hi all, I have questions about a few configuration options when using native Kubernetes. 1. Does the kubernetes.jobmanager.cpu option, which sets how much CPU a pod gets, also configure resources.requests.cpu or resources.limits.cpu? On https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes I saw a corresponding new option, external-resource..kubernetes.config-key.

Re: Flink writing to HBase fails with the error below, causing backpressure and slowing the job

2020-05-28 Thread Leonard Xu
Hi > org.apache.hadoop.hbase.RegionTooBusyException This exception suggests the HBase cluster is struggling under heavy writes; it does not look like a Flink-side problem. I suggest investigating the HBase side — there should be some parameters to tune. Best, Leonard Xu

Re: Re: flink-sql watermark question

2020-05-28 Thread 程龙
You can first convert the BIGINT field to TIMESTAMP and then generate the watermark from it. On 2020-05-28 17:00:53, "Benchao Li" wrote: >Hi, > >I don't quite follow what your question is. Currently only TIMESTAMP(3) is supported as the event-time column. >BIGINT is not yet supported mainly because of time zones, but the community is considering it; see [1] > >[1] https://issues.apache.org/jira/browse/FLINK-16938 > >guaishushu1...@163.com

Re: Flink writing to HBase fails with the error below, causing backpressure and slowing the job

2020-05-28 Thread 程龙
This is not a Flink problem; I've run into something similar before. It can happen while an HBase region is splitting — check the logs to see whether a region split was in progress at the time. On 2020-05-28 16:57:35, "air23" wrote: 2020-05-28 16:54:23,867 INFO org.apache.hadoop.hbase.client.AsyncRequestFutureImpl - id=2, table=GC_SCHEM:mon1, attempt=7/16, failureCount=427ops, last

FlinkKafkaConsumer consuming Avro-format messages from Kafka always fails with AvroDeserializationSchema; my own AbstractDeserializationSchema implementation works

2020-05-28 Thread hyangvv
I used the Avro code-generation tool to generate a UserView class from a schema and implemented Kafka's Serializer interface, as follows: import org.apache.kafka.common.serialization.Serializer; import java.io.IOException; public class UserViewSerializer implements Serializer { @Override public byte[] serialize(String topic, UserView data) { byte[]

Re: How to implement a custom connector in Flink

2020-05-28 Thread Peihui He
hello, trying it now — thanks for the explanation. best wishes 111 wrote on Thu, 28 May 2020 at 10:16: > Hi, > To use it in the SQL gateway, check the following: > 1. Satisfy the SPI requirements so Flink can discover the implementation class automatically > 2. Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib > 3. If integrating with Hive via HiveCatalog, register the table first > Then it can be used. > Best, > Xinghalo

Re: Re: RE: How to set up a flink 1.9/1.10 on YARN client on CDH

2020-05-28 Thread air23
Could you share your HADOOP_CONF? Mine is set to /etc/hadoop/conf, and the open-source Hadoop jar is in place as well. On 2020-05-28 09:36:10, "wangweigu...@stevegame.cn" wrote: > >Indeed — as long as you configure the CDH HADOOP_CONF environment variable and put the open-source Hadoop build (matching the CDH version) under flink/lib, you can access CDH YARN and submit jobs! > >Here we are on CDH 5.16.1 with Flink 1.10, submitting Flink on

flink 1.10 on YARN problem

2020-05-28 Thread air23
Running flink 1.10 on CDH YARN fails with the error below, while 1.7.2 has no problem. flink-shaded-hadoop-2-uber-2.6.5-10.0.jar is in place and the Hadoop environment variable export HADOOP_CONF_DIR=/etc/hadoop/conf is set. Any help appreciated. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error:

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread Benchao Li
My understanding is this: the TableEnvironment API deals mainly with Table concepts; it has no notion of streams (DataStream) or batch (DataSet). StreamTableEnvironment extends TableEnvironment but adds interfaces for interacting with DataStream, e.g. converting a DataStream to a Table and vice versa. BatchTableEnvironment likewise adds interfaces for working with DataSet. LakeShen wrote on Fri, 29 May 2020 at 10:16: > Hi Benchao, > > TableEnvironment and

Re: Build failures with native Kubernetes on different Kubernetes versions

2020-05-28 Thread Yang Wang
An update on this issue: the fix for Java 8u252 has been merged into the master and release-1.11 branches; you can build a Flink binary from either branch to verify. If you really want to use it on 1.10, you can set the environment variable HTTP2_DISABLE=true. On the Flink client, export HTTP2_DISABLE=true; for the JM/TM it can be set via the Flink option below (or directly when building the image): containerized.master.env.HTTP2_DISABLE=true

Re: FlinkKafkaConsumer consuming Avro-format messages from Kafka always fails with AvroDeserializationSchema; my own AbstractDeserializationSchema implementation works

2020-05-28 Thread Leonard Xu
Hi, > I used the Avro code-generation tool to generate a UserView class from a schema and implemented Kafka's Serializer interface — When you write Avro data to Kafka with a Kafka Serializer, you are probably writing it in the Confluent Schema Registry Avro format. The Confluent Schema Registry writes an extra MAGIC_BYTE when handling Avro data, which plain Avro does not have; try ConfluentRegistryAvroDeserializationSchema when consuming. Best, Leonard Xu [1]
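
On the consuming side this amounts to a one-line change — a sketch assuming a reachable Schema Registry and the Avro-generated UserView class (topic name, registry URL, and Kafka properties are placeholders):

    import java.util.Properties;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "broker:9092");
    kafkaProps.setProperty("group.id", "user-view-consumer");

    // handles the magic byte + schema id that the Confluent serializer prepends
    FlinkKafkaConsumer<UserView> consumer = new FlinkKafkaConsumer<>(
        "user-view-topic",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(UserView.class, "http://registry:8081"),
        kafkaProps);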

Re: Re: flink-sql watermark question

2020-05-28 Thread guaishushu1...@163.com
Also, doesn't Flink only support parsing the "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" format into a watermark? That's what confuses me. guaishushu1...@163.com From: guaishushu1...@163.com Sent: 2020-05-29 10:20 To: Benchao Li Cc: user-zh Subject: Re: Re: flink-sql watermark question What I mean is: my timestamp is a BIGINT, but after converting it with TO_TIMESTAMP to 'yyyy-MM-dd HH:mm:ss' it can still generate a watermark.

Re: Re: flink-sql watermark question

2020-05-28 Thread Benchao Li
Flink supports declaring a TIMESTAMP(3) column as the event-time column and generating a watermark for it. The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" you mention is not a data type; it is just one string representation of a timestamp, mainly needed by the JSON format when parsing a string into a timestamp type. So if you have fields of other types — varchar, long, int, etc. — you can convert them to TIMESTAMP(3) with built-in functions or a UDF and generate the watermark on top of that. guaishushu1...@163.com

Re: Question: in Flink SQL, when multiple INSERT statements under one env share a view, does it affect the offset commits of the view's Kafka source table

2020-05-28 Thread LakeShen
Hi Benchao, what exactly are the differences between TableEnvironment and StreamTableEnvironment? I see that StreamTableEnvironment extends TableEnvironment. I'm not very familiar with this area — is there any documentation? Thanks. Best, LakeShen Benchao Li wrote on Thu, 28 May 2020 at 17:52: > Hi zhisheng, > > This area was heavily refactored and optimized in FLIP-84 [1]; most of the work is done in 1.11 — take a look. > > [1] >

Re: Re: flink-sql watermark question

2020-05-28 Thread guaishushu1...@163.com
What I mean is: my timestamp is a BIGINT (long), but after converting it with TO_TIMESTAMP to 'yyyy-MM-dd HH:mm:ss' it can still generate a watermark. guaishushu1...@163.com From: Benchao Li Sent: 2020-05-28 17:00 To: user-zh Subject: Re: flink-sql watermark question Hi, I don't quite follow what your question is. Currently only TIMESTAMP(3) is supported as the event-time column. BIGINT is not yet supported mainly because of time zones, but the community is considering it; see [1] [1]