Re: User / Job Manager (permissions) for Flink

2020-06-05 Thread Arvid Heise
If you are running in K8s, you could also directly use its ingress layer. That's especially convenient if you have managed to connect it to your company's SSO. On Tue, Jun 2, 2020 at 9:38 PM Robert Metzger wrote: > Hi David, > > I guess you could also "just" put an nginx in front of

Re: Getting Window information from coGroup function

2020-06-05 Thread Arvid Heise
Hi Sudan, it seems this is not supported directly. You can use a hacky workaround: replicate apply [1] in your code and adjust the last line to call your CoGroupWindowFunction. [1]

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-05 Thread Arvid Heise
A common approach is to use a dead letter queue, which is an extra output for bad input. So the result of the read operation would look like Tuple2&lt;TBase, byte[]&gt; (or use Either in Scala) and return the parsed TBase on success or else put in the invalid record byte[]. Then in your DAG, split the handling of
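A minimal sketch of this pattern, using a side output for the dead letter queue instead of the Tuple2 return value (MyEvent, parseRecord, and the sink are placeholders, not from the thread):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Side output that collects the raw bytes of records that failed to parse.
    final OutputTag<byte[]> deadLetters = new OutputTag<byte[]>("dead-letters") {};

    SingleOutputStreamOperator<MyEvent> parsed = rawStream
        .process(new ProcessFunction<byte[], MyEvent>() {
            @Override
            public void processElement(byte[] raw, Context ctx, Collector<MyEvent> out) {
                try {
                    out.collect(parseRecord(raw));   // hypothetical deserializer
                } catch (Exception e) {
                    ctx.output(deadLetters, raw);    // route bad input to the DLQ
                }
            }
        });

    DataStream<byte[]> dlq = parsed.getSideOutput(deadLetters);
    // dlq.addSink(...)  e.g. write invalid records to a separate topic for inspection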

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-06-05 Thread Arvid Heise
Hi Arnaud, just to add on: the overhead of this additional map is negligible if you enable object reuse [1]. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger wrote: > I'm not 100% sure about this answer,
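A sketch of the map-then-partition workaround under discussion (SomeClient and Event are hypothetical; the point is that open()/close() live in the RichMapFunction, while the partitioner itself stays stateless):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    public class PartitionAssigner extends RichMapFunction<Event, Tuple2<Integer, Event>> {
        private transient SomeClient client;   // hypothetical resource needing setup/teardown

        @Override
        public void open(Configuration parameters) { client = SomeClient.connect(); }

        @Override
        public void close() { client.close(); }

        @Override
        public Tuple2<Integer, Event> map(Event e) {
            // Precompute the target partition while the resource is open.
            return Tuple2.of(client.lookupPartition(e), e);
        }
    }

    // The custom partitioner just forwards the precomputed index (field 0 of the tuple).
    stream.map(new PartitionAssigner())
          .partitionCustom((Partitioner<Integer>) (key, numPartitions) -> key % numPartitions, 0);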

Re: Flink s3 streaming performance

2020-06-05 Thread Kostas Kloudas
Hi all, @Venkata, Do you have many small files being created as Arvid suggested? If yes, then I tend to agree that S3 is probably not the best sink. Although I did not get that from your description. In addition, instead of PrintStream you can have a look at the code of the SimpleStringEncoder in

Re: Stopping a job

2020-06-05 Thread Arvid Heise
Hi, could you check if this SO thread [1] helps you already? [1] https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable On Thu, Jun 4, 2020 at 7:43 PM M Singh wrote: > Hi: > > I am running a job which consumes data from Kinesis and send data to >
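For reference, the relevant CLI commands (a sketch; the savepoint path is a placeholder, and exact flags depend on the Flink version):

    # Flink 1.9+: stop with a savepoint; on older versions `stop` fails with
    # "this job is not stoppable" unless all sources implement StoppableFunction.
    ./bin/flink stop -p hdfs:///savepoints <jobId>

    # Alternative that works on older versions: cancel with a savepoint.
    ./bin/flink cancel -s hdfs:///savepoints <jobId>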

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Instead of changing the query, I used to embed the query in a larger context for similar work. So if you get an arbitrary query X which produces exactly one result (e.g. X = select sum(revenue) from lineorder group by 1), then you can craft a query where you add a dummy pk to the result. Table

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Arvid Heise
Hi Chris, could you also try what happens when you turn incremental checkpoints off? Incremental checkpoints may create many small files, which are a bad fit for HDFS. You could also evaluate other storage options (net drive, S3) if you find incremental checkpoints to be better. On Tue, Jun 2,

Re: Flink s3 streaming performance

2020-06-05 Thread Arvid Heise
Hi Venkata, are the many small files intended, or are they rather an artifact of committing on checkpointing? If the latter, FLINK-11499 [1] should help you. The design is close to done; unfortunately, the implementation will not make it into 1.11. In any case, I'd look at the parameter fs.s3a.connection.maximum,
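For reference, a flink-conf.yaml sketch (assuming the flink-s3-fs-hadoop plugin, which forwards fs.s3a.* keys to the underlying S3A filesystem; 256 is an arbitrary example, not a recommendation):

    # Raise the S3A HTTP connection pool to reduce timeouts when many
    # part files are uploaded in parallel.
    fs.s3a.connection.maximum: 256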

Re: Dynamically merge multiple upstream sources

2020-06-05 Thread Arvid Heise
Hi Yi, one option is to use Avro, where you define one global Avro schema as the source of truth. Then you add aliases [1] to this schema for each source where the fields are named differently. You use the same schema to read the Avro messages from Kafka and Avro automatically converts the data
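A toy example of such a schema, where the canonical field userId also matches sources that name the field uid or user_id (all names here are made up):

    {
      "type": "record",
      "name": "Event",
      "fields": [
        { "name": "userId", "type": "string", "aliases": ["uid", "user_id"] },
        { "name": "amount", "type": "double" }
      ]
    }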

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Hi Satyam, you are right, there seems to be a disconnect between javadoc and implementation. Jark probably knows more. In your case, couldn't you just add a dummy column containing a constant key? select 'revenue' AS name, sum(revenue) from lineorder and then set the dummy field as PK? On
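Spelled out, the two variants discussed in this thread (lineorder and revenue come from the thread itself):

    -- Constant key column next to the aggregate; declare `name` as the PK.
    SELECT 'revenue' AS name, SUM(revenue) FROM lineorder;

    -- Grouping by a constant, so the planner can infer a unique key for the upsert sink.
    SELECT 1, SUM(revenue) FROM lineorder GROUP BY 1;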

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hey Arvid, Thanks for the reply. As you suggested, rewriting the query to add a dummy output column and a GROUP BY clause, "select 1, sum(revenue) from lineorder group by 1", does add a unique key column to the output, and the pipeline succeeds. However, the application may receive arbitrary SQL from

Re: Creating Kafka Topic dynamically in Flink

2020-06-05 Thread Arvid Heise
Hi Prasanna, auto.create.topics.enable is only recommended for development clusters, not for production use cases (as one programming error could potentially flood the whole broker with a large number of topics). I have experienced first-hand the mess it makes. I'd suggest finding a
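A sketch of explicit topic creation with the plain Kafka AdminClient (broker address, topic name, partition and replication counts are all placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");

    try (AdminClient admin = AdminClient.create(props)) {
        // 3 partitions, replication factor 2; block until the broker confirms.
        NewTopic topic = new NewTopic("my-output-topic", 3, (short) 2);
        admin.createTopics(Collections.singleton(topic)).all().get();
    }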

Re: Tumbling windows - increasing checkpoint size over time

2020-06-05 Thread Wissman, Matt
Guowei, I had a different Flink app that was using 10 or 15s intervals; it had similar behavior but not nearly as bad as the 2s-interval pipeline. Both have much longer checkpoint intervals now. Here is the state config: state.backend: rocksdb state.checkpoints.dir: {{

Re: Flink s3 streaming performance

2020-06-05 Thread venkata sateesh` kolluru
Hi Kostas and Arvid, Thanks for your suggestions. The small files were already created, and I am trying to roll a few of them into a big file while sinking. But due to the custom bucket assigner, it is hard to get more files within the same prefix in the specified checkpointing time. For example:

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Congxian Qiu
Hi Chris, From the given exception, it seems there is something wrong with the FileSystem; one possible reason is the one Arvid gave (incremental checkpoints may generate too many small files). You can turn off incremental checkpoints or try to increase the config `state.backend.fs.memory-threshold` to see if things
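The two settings mentioned, as they would appear in flink-conf.yaml (values are examples only; in 1.10 the threshold is given in bytes and is capped at 1 MB):

    # Disable incremental checkpoints for the RocksDB backend.
    state.backend.incremental: false

    # Inline state chunks below this size into the _metadata file instead of
    # writing separate small files (default is 1024 bytes).
    state.backend.fs.memory-threshold: 1048576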

Re: Auto adjusting watermarks?

2020-06-05 Thread Arvid Heise
Hi Theo, The general idea is interesting. I'd probably start with some initial out-of-orderness bound, and after collecting X elements, switch to the histogram. It sounds very valid to snapshot it. I'd probably use a union state to also support rescaling in a meaningful way. However, tbh for a
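A rough sketch of that idea on the 1.10 API; the sample size, the percentile, and the Event type are placeholders, and the union-state snapshotting is left out:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class AdaptiveWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {
        private static final int SAMPLE_SIZE = 10_000;
        private final long[] lateness = new long[SAMPLE_SIZE];
        private long maxTimestamp = Long.MIN_VALUE;
        private long bound = 5_000;   // initial out-of-orderness bound in ms
        private int seen = 0;

        @Override
        public long extractTimestamp(Event e, long previousElementTimestamp) {
            long ts = e.getTimestamp();   // hypothetical accessor
            if (seen < SAMPLE_SIZE) {
                lateness[seen++] = Math.max(0, maxTimestamp - ts);
                if (seen == SAMPLE_SIZE) {
                    // Switch from the initial bound to the observed 99th percentile.
                    long[] sorted = lateness.clone();
                    java.util.Arrays.sort(sorted);
                    bound = sorted[(int) (SAMPLE_SIZE * 0.99)];
                }
            }
            maxTimestamp = Math.max(maxTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestamp - bound);
        }
    }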

Re: Native K8S not creating TMs

2020-06-05 Thread kb
Thanks Yang for the suggestion; I have tried it and I'm still getting the same exception. Is it possible it's due to the null pod name? Operation: [create] for kind: [Pod] with name: [null] in namespace: [default] failed. Best, kevin -- Sent from:

Re: Avro Arrat type validation error

2020-06-05 Thread Dawid Wysakowicz
Hi Ramana, What connector do you use, or how do you instantiate the TableSource? Also, which catalog do you use and how do you register your table in that catalog? The problem is that conversion from TypeInformation to DataType produces legacy types (because they cannot be mapped exactly 1-1 to

UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hello, I am using Flink as the query engine to build an alerting/monitoring application. One of the use cases in our product requires continuously tracking and charting the output of an aggregate only SQL query, for example, select sum(revenue) from lineorder. A desirable property from the output

Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?

2020-06-05 Thread Vijay Balakrishnan
Hi, Resolved the issue by using a Custom Partitioner and setting RequestTimeout properties. kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner()); private static final class SerializableCustomPartitioner extends KinesisPartitioner> { private static final long
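The truncated snippet above presumably continued roughly like this (a sketch; the hash-based key choice is a guess, any field that spreads well across shards would do):

    import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

    // Derive the Kinesis partition key from the record itself so that
    // records spread across shards instead of all landing on one.
    private static final class SerializableCustomPartitioner
            extends KinesisPartitioner<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getPartitionId(String element) {
            return String.valueOf(element.hashCode());
        }
    }

    kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());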

Re: [External Sender] Re: Flink sql nested elements

2020-06-05 Thread Ramana Uppala
Hi Leonard, We are using Flink 1.10 and I cannot share the complete schema, but it looks like the below in the Hive Catalog: flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647),

Re: Creating TableSchema from the Avro Schema

2020-06-05 Thread Dawid Wysakowicz
First of all, to give a backstory for the deprecation: we do not want to depend on TypeInformation anymore for the types in the Table API, as it binds both the on-wire representation and the logical types of the SQL API. The goal is to use the DataType exclusively in the Table API (including for

Re: [External Sender] Re: Avro Arrat type validation error

2020-06-05 Thread Ramana Uppala
Hi Dawid, We are using a custom connector that is very similar to the Flink Kafka connector, and instantiating TableSchema using a custom class which maps Avro types to Flink's DataTypes using TableSchema.Builder. For the Array type, we have the below mapping: case ARRAY: return
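For the array branch, a non-legacy mapping would presumably look like this (a sketch; convertType is the hypothetical recursive entry point of the custom mapper):

    import org.apache.avro.Schema;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    static DataType convertType(Schema schema) {
        switch (schema.getType()) {
            case STRING:
                return DataTypes.STRING();
            case ARRAY:
                // Recurse into the element type so nested arrays/rows also
                // come out as proper DataTypes rather than legacy type infos.
                return DataTypes.ARRAY(convertType(schema.getElementType()));
            // ... remaining Avro types elided
            default:
                throw new UnsupportedOperationException(schema.getType().toString());
        }
    }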

Run command after Batch is finished

2020-06-05 Thread Mark Davis
Hi there, I am running a Batch job with several outputs. Is there a way to run some code (e.g. release a distributed lock) after all outputs are finished? Currently I do this in a try-finally block around the ExecutionEnvironment.execute() call, but I have to switch to the detached execution mode

Re: Run command after Batch is finished

2020-06-05 Thread Jeff Zhang
You can try JobListener, which you can register with the ExecutionEnvironment. https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java Mark Davis wrote on Sat, Jun 6, 2020 at 12:00 AM: > Hi there, > > I am running a Batch job with several outputs. > Is
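A minimal sketch of that approach (releaseDistributedLock is a placeholder for whatever cleanup the job needs):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.execution.JobListener;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.registerJobListener(new JobListener() {
        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            // nothing to do on submission
        }

        @Override
        public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
            // Called once the job finishes, whether it succeeded or failed.
            releaseDistributedLock();
        }
    });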

Reply: In Flink SQL, conditions on null values always evaluate to false

2020-06-05 Thread whirly
OK, thank you very much. Best, Whirly. On Jun 6, 2020, at 11:45, Benchao Li wrote: Wow, great! I also looked this up: in the SQL standard, boolean expressions have three kinds of values [1]: true, false, and unknown. And null is normally not equal to any value, including another null [2]. So executing `SELECT null <> null` should return unknown; in Flink, this should be null, not true or false.

Re: Re: In Flink SQL, conditions on null values always evaluate to false

2020-06-05 Thread Benchao Li
Wow, great! I also looked this up: in the SQL standard, boolean expressions have three kinds of values [1]: true, false, and unknown. And null is normally not equal to any value, including another null [2]. So executing `SELECT null <> null` should return unknown; in Flink, this should be null, not true or false. And when this situation appears in a WHERE condition, the comparison result should also be unknown [3], but the default handling treats it as false. As for `IS [NOT] DISTINCT

Re: Joining Flink SQL with a MySQL dimension table

2020-06-05 Thread Px New
Hi, I have a related question about this. Question: what if the rules I put in open() may change and need to be updated again? Michael Ran wrote on Thu, Jun 4, 2020 at 5:22 PM: > Can it be put into the open() method? > On 2020-06-04 14:15:05, "小屁孩" <932460...@qq.com> wrote: > > dear: I have a question I would like to ask about joining flinksql with a mysql dimension table, regarding mysql updates >

Re: In Flink SQL, conditions on null values always evaluate to false

2020-06-05 Thread Benchao Li
Hi, I thought about this problem again, and I do think this behavior is somewhat unreasonable; I created an issue [1] to track it. [1] https://issues.apache.org/jira/browse/FLINK-18164 whirly wrote on Fri, Jun 5, 2020 at 11:20 PM: > OK, it may just be a display issue in my mail client; thanks for the reply. > > About adding an extra is not null check: po and I both feel it is somewhat redundant, and sometimes there are many fields in the condition, so adding is not null before every field is quite hard and error-prone. > > If there could be a config option controlling null <>

Seeking advice on a Flink 1.10 SQL error

2020-06-05 Thread hb
A Flink SQL job starts out running normally and checkpoints are fine, with no large state (the state is 19.7 KB), but after running for a while it starts throwing errors; it then recovers, runs for a few seconds, immediately hits the same error again, and keeps looping between failing and recovering. Could someone take a look? Much appreciated. 2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->

In Flink SQL, conditions on null values always evaluate to false

2020-06-05 Thread whirly
Hi all: In Flink SQL, e.g. SELECT * from order where product <> 'rubber', if the product field value in the data is null, the condition product <> 'rubber' cannot be matched, even though null is indeed not equal to 'rubber'. Only by changing the condition to where product is Null or product <> 'rubber' can it be matched. But I want null <> 'rubber' to also evaluate to True, without adding a product is Null check in front of the condition. What can I do?
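One standard-SQL workaround (a sketch against the table from the question) is IS DISTINCT FROM, the null-aware comparison that the follow-up messages allude to:

    -- NULL IS DISTINCT FROM 'rubber' evaluates to TRUE, so rows with a
    -- null product are kept without an extra IS NULL check.
    SELECT * FROM `order` WHERE product IS DISTINCT FROM 'rubber';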

Reply: In flink 1.9 SQL, aren't registered intermediate temporary tables automatically stored in state?

2020-06-05 Thread star
---- ??:"star"<3149768...@qq.com; :2020??6??5??(??) 10:40 ??:"user-zh@flink.apache.org"

Feedback on FlinkKafkaProducer transactions and the transactionId issue

2020-06-05 Thread 李杰
*1. Scenario:* Flink job logic: source (kafka) -> data process (wordCount logic) -> sink (kafka). 1. Job A: source_topic: word_count_topic sink_topic: result_01 group_id: test-group01 2. Job B: source_topic: word_count_topic sink_topic: result_02 group_id:

Kafka Consumer InstanceAlreadyExistsException in Flink-1.10.0 standalone mode

2020-06-05 Thread zhang...@lakala.com
Flink-1.10.0 standalone deployment, Kafka-0.11, JDK8. When I deploy two applications on the same Flink cluster and they are assigned to the same TaskManager, consuming the same topic with different group.ids, the first application starts normally, but when the second starts I get the warning below. What causes this warning, can it be ignored, or how can it be resolved? WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

Re: Can a TTL be set in Flink SQL DDL?

2020-06-05 Thread Leonard Xu
An update on the second question: the Flink sql-client does support configuring the TTL (ms) via SET: Flink SQL> set execution.min-idle-state-retention = 2; [INFO] Session property has been set. Best, Leonard Xu > On Jun 5, 2020, at 13:39, Leonard Xu wrote: > > Hi, > > On the first question, the upcoming 1.11 release can declare a primary key, with no more need to infer the primary >
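For completeness, the same retention can be set from code via TableConfig on the 1.10 API (the times are examples):

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env, EnvironmentSettings.newInstance().inStreamingMode().build());

    // Idle state is kept for at least 12 and at most 24 hours.
    tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));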

Re: In Flink SQL, conditions on null values always evaluate to false

2020-06-05 Thread Benchao Li
Hi, I only received the one email you sent the first time, so it looks fine (maybe it is a local display issue in your mail client). As for your question, this is indeed how it is handled today. May I ask what the problem is with adding an extra IS NOT NULL? whirly wrote on Fri, Jun 5, 2020 at 9:54 PM: > Sorry, I just found that my 163 mailbox automatically resent this question several times; I don't know why, maybe a mailbox bug? Apologies for the noise, and I don't know whether it will keep resending. > On 2020-06-05 14:25:10, "whirly" wrote: > > Hi all: > > In flink sql

Re: Some questions arising from the checkpoint dir

2020-06-05 Thread Px New
Thanks for the reply; now I understand the details of state restoration and how the other files are produced and used. Weihua Hu wrote on Fri, Jun 5, 2020 at 1:48 PM: > Hi, Px New > > 1. The number of retained checkpoints can be controlled with the parameter state.checkpoints.num-retained; the default is 1. > 2. _metadata is just metadata holding the state handles; the other files are the state data, uploaded by each Task when a checkpoint is triggered. Conversely, when restoring a checkpoint the JM reads _metadata and hands the corresponding handles down to the Tasks, and each Task then reads from the remote

Re: Reply: Integrating Flink with HBase

2020-06-05 Thread xueaohui_...@163.com
There is a native one; another poster answered this too. Does connecting directly with Java also fail? xueaohui_...@163.com From: liunaihua521 Sent: 2020-06-05 09:31 To: user-zh@flink.apache.org Cc: user-zh@flink.apache.org Subject: Reply: Reply: Integrating Flink with HBase hi, the sink is my own; I haven't found a native HBase sink in Flink yet and am still looking into it. On Jun 5, 2020, at 09:06, xueaohui_...@163.com wrote: Is the sink your own or the official one?

Re: Some questions arising from the checkpoint dir

2020-06-05 Thread Px New
Oh, my apologies for that. Image 1: https://i.loli.net/2020/06/05/SAfpnkqlOUM9hD3.png Image 2: https://imgkr.cn-bj.ufileos.com/aed4cb64-dd24-4076-ba4c-a0e07bc356bf.png zhiyezou <1530130...@qq.com> wrote on Fri, Jun 5, 2020 at 1:58 PM: > Hi > Please use a third-party image host and paste the link; images pasted directly do not display. > > ---- Original message ---- > From: "Weihua

Re: In flink 1.9 SQL, aren't registered intermediate temporary tables automatically stored in state?

2020-06-05 Thread Benchao Li
Hi, from your description we may have hit a similar problem. In our case, after restoring from a checkpoint, some keys no longer matched the keys in the previous state, so it looked as if part of the state had been lost. We never found the exact cause: on the one hand, the problem is hard to reproduce; with our production data only some records were affected, and we could not find any pattern in them. On the other hand, the blink planner uses binary data structures throughout, which makes debugging difficult. If you can provide a reliably reproducible dataset and test procedure, I think we can push this issue forward. star <3149768...@qq.com> wrote on Fri, Jun 5, 2020

Re: Using Kafka's built-in timestamp as the event time in Flink SQL

2020-06-05 Thread Jark Wu
Accessing all the data on a Kafka message (timestamp, partition, key, etc.) is a very important feature, and the community recognized this early on. There is already a FLIP [1] under discussion; support is expected in 1.12. Best, Jark [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records On Fri, 5 Jun 2020 at 19:19, sunfulin wrote: >
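For readers on later versions: the FLIP-107 syntax that eventually shipped in 1.12 exposes the Kafka record timestamp as a metadata column, roughly like this (a sketch, not available in the releases discussed in this thread; connector options are illustrative):

    CREATE TABLE user_behavior (
        user_id STRING,
        behavior STRING,
        -- virtual column backed by the Kafka record timestamp
        ts TIMESTAMP(3) METADATA FROM 'timestamp',
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'broker:9092',
        'format' = 'json'
    );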

Using Kafka's built-in timestamp as the event time in Flink SQL

2020-06-05 Thread sunfulin
Hi, I would like to ask: when creating a Kafka source with DDL in Flink SQL, is reading Kafka's built-in timestamp supported? We have a scenario where we want to use the Kafka timestamp, and in that case the message stream may not contain any time attribute. If it is supported, could you share the exact syntax? The SQL below fails for me: CREATE TABLE user_behavior ( test_time TIMESTAMP(3), user_id STRING, item_id STRING, category_id STRING, behavior STRING, ts STRING, proctime as

Re: In Flink SQL, conditions on null values always evaluate to false

2020-06-05 Thread whirly
Sorry, I just found that my 163 mailbox automatically resent this question several times; I don't know why, maybe a mailbox bug? Apologies for the noise, and I don't know whether it will keep resending. On 2020-06-05 14:25:10, "whirly" wrote: > Hi all: > In Flink SQL, e.g. SELECT * from order where product <> 'rubber', if the > product field value in the data is null, the condition product <> 'rubber' cannot be matched, even though null is indeed not equal to 'rubber'. > Only by changing the condition to where

Re: Re: Using Kafka's built-in timestamp as the event time in Flink SQL

2020-06-05 Thread sunfulin
Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp' to the properties is what I need. I noticed that timestampFromSource can currently be obtained through Java code; can this feature retrieve the source's timestamp? When I tested it, the parsed value seemed to be empty. My Kafka version is 0.10.2. On 2020-06-05 19:31:37, "Jark Wu" wrote: > Accessing all the data on a Kafka message (timestamp, partition, key, etc.) is a very important feature, and the community recognized this early on. > There is already a

Re: Re: Using Kafka's built-in timestamp as the event time in Flink SQL

2020-06-05 Thread Jark Wu
As far as I know, no connector currently supports `timestampFromSource`... On Fri, 5 Jun 2020 at 22:29, sunfulin wrote: > Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp' to the properties is what I need. > I noticed that timestampFromSource can currently be obtained through Java code; can this feature retrieve the source's timestamp? When I tested it, the parsed value seemed to be empty. My Kafka version is 0.10.2.