Re: Adding a NULL-valued field in Flink SQL

2020-10-14 Thread Leonard Xu
Hi, you need to specify the data type; you can use: cast(null as varchar) as person_uuid Best Leonard > On Oct 15, 2020, at 12:18, Dream-底限 wrote: > > Hi, > I am trying to run the following Flink SQL statement, but the program fails to run. How can I implement this? > select null as person_uuid from tablename > Exception thrown: > org.apache.flink.client.program.ProgramInvocationException: The main method >
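A minimal sketch of the suggested fix, assuming a Scala StreamTableEnvironment and an already registered table named `tablename` (both placeholders); casting the NULL literal gives the planner an explicit type, which avoids the "Illegal use of 'NULL'" validation error:

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// The CAST supplies the column type that the NULL literal alone cannot provide.
val result = tEnv.sqlQuery(
  "SELECT CAST(NULL AS VARCHAR) AS person_uuid FROM tablename")
```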

Re: Large state RocksDb backend increases app start time

2020-10-14 Thread Yun Tang
Hi Arpith, If you use a savepoint to restore RocksDB state, the restore phase actually inserts the original binary key-value pairs into an empty RocksDB instance, which can be slow if the state is large. There have been several discussions about optimizing this phase [1] [2]. If you want to work around this

Adding a NULL-valued field in Flink SQL

2020-10-14 Thread Dream-底限
Hi, I am trying to run the following Flink SQL statement, but the program fails to run. How can I implement this? select null as person_uuid from tablename Exception thrown: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 47 to line 1, column 50: Illegal use of 'NULL' at

pyflink sql: UDFs in both SELECT and WHERE, one of the UDFs has no effect

2020-10-14 Thread whh_960101
Hello, my PyFlink code is as follows and I have the following problem: source = st_env.from_path('source') # st_env is a StreamTableEnvironment, source is the Kafka source # udf1 (input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN()) is only applied in the WHERE clause table =

Re: Setting JDBC connector options using JdbcCatalog

2020-10-14 Thread Leonard Xu
Hi Dylan, A table in the JdbcCatalog only contains basic options, so it is normal that a table from the JdbcCatalog does not carry some options. Flink provides the SQL Hints feature to specify or override table options [1]; you can give it a try. Best, Leonard [1]
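A hedged sketch of the hint Leonard mentions, assuming Flink 1.11, a TableEnvironment `tEnv`, and a JDBC-backed catalog table named `mytable` (placeholders); dynamic table options have to be enabled before the hint is accepted:

```
// Allow per-query overrides of table options via /*+ OPTIONS(...) */ hints.
tEnv.getConfig.getConfiguration
  .setString("table.dynamic-table-options.enabled", "true")

// Override the JDBC scan fetch size for this query only.
val result = tEnv.sqlQuery(
  "SELECT * FROM mytable /*+ OPTIONS('scan.fetch-size'='500') */")
```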

Re: Flink Kerberos authentication issue

2020-10-14 Thread naisili Yuan
Authentication can be configured through the configuration file of the client that submits the Flink job; see: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#auth-with-external-systems caozhen wrote on Mon, Oct 12, 2020 at 6:05 PM: > "The Kafka authenticated with BBB.keytab" - how is that configured? Is it a custom kafkaSink implementation? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: Re: Re: Flink 1.10.1 checkpoint failure issue

2020-10-14 Thread Storm☀️
Thanks a lot. I will keep an eye on this issue and share my findings with everyone for reference. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to control the output frequency of Flink SQL results

2020-10-14 Thread Benchao Li
Could you describe your problem in more detail? I don't quite understand it. smallwong wrote on Wed, Oct 14, 2020 at 6:57 PM: > Hello, what adjustment was made? The window is only 10 seconds, yet results are expected every second > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Setting JDBC connector options using JdbcCatalog

2020-10-14 Thread Dylan Forciea
I was experimenting with the JdbcCatalog, and I see that the options match some of the SQL WITH options. I looked at the source code, and even see that it directly references those options from JdbcDynamicTableFactory. However, I didn’t see any obvious way to set scan.fetch-size or any way to

Re: PyFlink :: Bootstrap UDF function

2020-10-14 Thread Dian Fu
Hi Rinat, It's called in single thread fashion and so there is no need for the synchronization. Besides, there is a pair of open/close methods in the ScalarFunction and you could also override them and perform the initialization work in the open method. Regards, Dian > On

Re: Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Hi Piotrek, Thank you for replying! I want to process each record as soon as it is ingested (or reaches an operator) without waiting for a window for records to arrive. However, by not using windows, I am not sure if each record gets emitted immediately upon processing. > You still can use

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
(Or small correction; a Row with a column of Array of Longs, but still) On Wed, Oct 14, 2020 at 4:46 PM Rex Fenley wrote: > I believe I found the issue: > new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG())) > vs > new RowTypeInfo(createTypeInformation[Array[Long]]) > I didn't quite understand

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
I believe I found the issue: new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG())) vs new RowTypeInfo(createTypeInformation[Array[Long]]) I didn't quite understand at the time whose type information I was meant to supply, now I do. However, I think my question still stands. Is there a way for this
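For readers following along, a small sketch contrasting the two declarations, using the core `Types` class and the Scala type-extraction macro (the surrounding job is omitted):

```
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._

// Declares the single Row column explicitly as a primitive long[] array.
val primitiveArrayRow = new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG))

// Lets the Scala type extractor derive the type information for Array[Long].
val derivedRow = new RowTypeInfo(createTypeInformation[Array[Long]])
```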

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Jeff Zhang
Could you share your code to reproduce it? Rex Fenley wrote on Thu, Oct 15, 2020 at 5:54 AM: > Hello, > > I've been playing with UDFs using the Scala API and have repeatedly run > into issues such as this: > ``` > flink-taskmanager_1| java.lang.ClassCastException: >

Re: Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
While trying to use a mutable.Set and later .asJava I receive the following flink-jobmanager_1 | Caused by: java.lang.ClassCastException: scala.collection.convert.Wrappers$MutableSetWrapper cannot be cast to [J flink-jobmanager_1 | at

Safer handling of Scala immutable collections

2020-10-14 Thread Rex Fenley
Hello, I've been playing with UDFs using the Scala API and have repeatedly run into issues such as this: ``` flink-taskmanager_1| java.lang.ClassCastException: scala.collection.immutable.Set$EmptySet$ cannot be cast to [J ``` Is there something that can be done on Flink's end, either to catch

PyFlink :: Bootstrap UDF function

2020-10-14 Thread Sharipov, Rinat
Hi mates! I keep moving forward in my research of the new PyFlink features and I'm really excited about that functionality. My main goal is to understand how to integrate our ML registry, powered by ML Flow, with PyFlink jobs and what restrictions we have. I need to bootstrap the UDF function on its

Re: [QUERY] Multiple elastic search sinks for a single flink pipeline

2020-10-14 Thread Chesnay Schepler
Is the number of sinks fixed? If so, then you can just take the output of your map function and apply multiple filters, writing the output of each filter into a sink. You could also use a process function with side outputs, and attach a sink to each output. On 10/14/2020 6:05 PM, Vignesh
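A rough sketch of the side-output variant, with a hypothetical `Event` type and `print()` standing in for the Elasticsearch sinks; each output can then be given its own parallelism:

```
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Event(kind: String, payload: String) // hypothetical record type

val sink2Tag = OutputTag[Event]("to-sink-2")

def route(events: DataStream[Event]): Unit = {
  // Main output goes to the first sink, the side output to the second.
  val routed = events.process(new ProcessFunction[Event, Event] {
    override def processElement(e: Event,
                                ctx: ProcessFunction[Event, Event]#Context,
                                out: Collector[Event]): Unit =
      if (e.kind == "a") out.collect(e) else ctx.output(sink2Tag, e)
  })
  routed.print().setParallelism(2)                         // replace with ES sink 1
  routed.getSideOutput(sink2Tag).print().setParallelism(4) // replace with ES sink 2
}
```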

Re: Broadcasting control messages to a sink

2020-10-14 Thread Jaffe, Julian
Thanks for the suggestion Piotr! The problem is that the sink needs to have access to the schema (so that it can write the schema only once per file instead of record) and thus needs to know when the schema has been updated. In this proposed architecture, I think the sink would still need to

Runtime Dependency Issues Upgrading to Flink 1.11.2 from 1.9.2

2020-10-14 Thread Hailu, Andreas
Hi team! We're trying to upgrade our applications from 1.9.2 to 1.11.2. After re-compiling and updating our runtime dependencies to use 1.11.2, we see this LinkageError: Caused by: java.lang.LinkageError: ClassCastException: attempting to

StatsD metric name prefix change for task manager after upgrading to Flink 1.11

2020-10-14 Thread Allen Wang
Hello, We noticed that after upgrading to Flink 1.11, the StatsD metric prefix is changed from the hostname to IP address of the task manager. The Flink job runs in a k8s cluster. Here is an example of metric reported to StatsD in Flink 1.10:

Large state RocksDb backend increases app start time

2020-10-14 Thread Arpith P
Hi, I'm currently storing around 70GB of data in map state backed by the RocksDB backend. Once I restore an application from a savepoint, it currently takes more than 4 minutes to start processing events. How can I speed this up, or is there any other recommended approach? I'm using the

Re: Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-14 Thread Piotr Nowojski
I'm glad to hear that :) Best regards, Piotrek On Wed, Oct 14, 2020 at 18:28, Vijayendra Yadav wrote: > Thank you, Piotrek. I moved the *flink-s3-fs-hadoop* library to the plugins directory. Now > it's good. > > > On Wed, Oct 14, 2020 at 6:23 AM Piotr Nowojski > wrote: > >> Hi, >> >> Are you sure you are loading

Re: Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-14 Thread Vijayendra Yadav
Thank you, Piotrek. I moved the *flink-s3-fs-hadoop* library to the plugins directory. Now it's good. On Wed, Oct 14, 2020 at 6:23 AM Piotr Nowojski wrote: > Hi, > > Are you sure you are loading the filesystems correctly? Are you using the > plugin mechanism? [1] Since Flink 1.10 plugins can only be loaded in

Re: what's the datasets used in flink sql document?

2020-10-14 Thread 大森林
Many thanks for your replies. I mean: where is the "france revenue" data in the following document? https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html Thanks for your help~ -- Original Message -- From:

[QUERY] Multiple elastic search sinks for a single flink pipeline

2020-10-14 Thread Vignesh Ramesh
My requirement is to send the data to a different ES sink (based on the data). Ex: if the data contains particular info, send it to sink1, else send it to sink2, etc. (basically send it dynamically to any one sink based on the data). I also want to set parallelism separately for ES sink1, ES sink2,

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Piotr Nowojski
Great! Please let us know if it solves the issue or not. Best, Piotrek On Wed, Oct 14, 2020 at 17:46, Vijayendra Yadav wrote: > Hi Piotrek, > > That is correct I was still in 1.10, I am upgrading to 1.11. > > Regards, > Vijay > > On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski > wrote: > >> Hi

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Vijayendra Yadav
Hi Piotrek, That is correct I was still in 1.10, I am upgrading to 1.11. Regards, Vijay On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski wrote: > Hi Yadav, > > What Flink version are you using? `getPartPrefix` and `getPartSufix` > methods were not public before 1.10.1/1.11.0, which might be

Re: what's the datasets used in flink sql document?

2020-10-14 Thread Piotr Nowojski
Hi, Can you link the document you have in mind? The documentation [1]? I don't think so. There are working examples, located in the binary distribution under the `examples/table/` directory. Their code is available in the repository [2]. Best regards, Piotrek [1]

Re: what's the datasets used in flink sql document?

2020-10-14 Thread 大森林
Sorry that I did not make it clear. I mean: is there a dataset that can be downloaded which satisfies all the examples in the document? Thanks for your help -- Original Message -- From:

Re: what's the datasets used in flink sql document?

2020-10-14 Thread Piotr Nowojski
Hi, It depends how you defined `orders` in your example. For example here [1] > Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) `orders` is obtained from the environment, from a table registered under the name "Orders". You would need to first register such table, or register a
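For completeness, a sketch of registering such a table first; the schema and the datagen connector below are illustrative placeholders, any connector works:

```
// Assumes a TableEnvironment `tEnv`.
tEnv.executeSql(
  """CREATE TABLE Orders (
    |  a STRING,
    |  b INT,
    |  c INT,
    |  rowtime TIMESTAMP(3)
    |) WITH (
    |  'connector' = 'datagen'
    |)""".stripMargin)

val orders = tEnv.from("Orders") // now resolvable by name
```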

Re: Processing single events for minimum latency

2020-10-14 Thread Piotr Nowojski
Hi Pankaj, I'm not entirely sure if I understand your question. If you want to minimize latency, you should avoid using windows or any other operators, that are buffering data for long periods of time. You still can use windowing, but you might want to emit updated value of the window per every
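One hedged way to get the per-element updates described above is to keep the window but attach a trigger that fires on every record; a sketch over an assumed keyed stream of (word, count) pairs:

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

// `words` is an assumed DataStream[(String, Int)] of (word, 1) pairs.
def perRecordCounts(words: DataStream[(String, Int)]): DataStream[(String, Int)] =
  words
    .keyBy(_._1)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(1)) // emit an updated count for every arriving element
    .sum(1)
```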

Re: Broadcasting control messages to a sink

2020-10-14 Thread Piotr Nowojski
Hi Julian, Have you seen Broadcast State [1]? I have never used it personally, but it sounds like something you want. Maybe your job should look like: 1. read raw messages from Kafka, without using the schema 2. read schema changes and broadcast them to 3. and 5. 3. deserialize kafka records in
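A compressed sketch of that shape with hypothetical `RawRecord` and `Schema` types; the schema stream is broadcast and the parsing step reads the latest schema from broadcast state:

```
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class RawRecord(bytes: Array[Byte])        // hypothetical raw Kafka payload
case class Schema(definition: String)           // hypothetical schema representation
case class Parsed(value: String, schemaUsed: String)

val schemaDescriptor =
  new MapStateDescriptor[String, Schema]("schema", classOf[String], classOf[Schema])

def wire(raw: DataStream[RawRecord], schemas: DataStream[Schema]): DataStream[Parsed] =
  raw.connect(schemas.broadcast(schemaDescriptor)).process(
    new BroadcastProcessFunction[RawRecord, Schema, Parsed] {
      override def processElement(
          r: RawRecord,
          ctx: BroadcastProcessFunction[RawRecord, Schema, Parsed]#ReadOnlyContext,
          out: Collector[Parsed]): Unit = {
        // Deserialize with the most recently broadcast schema, if any.
        val schema = ctx.getBroadcastState(schemaDescriptor).get("current")
        if (schema != null) out.collect(Parsed(new String(r.bytes), schema.definition))
      }
      override def processBroadcastElement(
          s: Schema,
          ctx: BroadcastProcessFunction[RawRecord, Schema, Parsed]#Context,
          out: Collector[Parsed]): Unit =
        ctx.getBroadcastState(schemaDescriptor).put("current", s)
    })
```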

Re: Upgrade to Flink 1.11 in EMR 5.31 Command line interface

2020-10-14 Thread Piotr Nowojski
Hi, Are you sure you are loading the filesystems correctly? Are you using the plugin mechanism? [1] Since Flink 1.10 plugins can only be loaded in this way [2], while there were some changes to plug some holes in Flink 1.11 [3]. Best, Piotrek [1]

what's the datasets used in flink sql document?

2020-10-14 Thread 大森林
Could anyone tell me what datasets are used in the Flink SQL documentation? For SQL like: val revenue = orders .filter($"cCountry" === "FRANCE") .groupBy($"cID", $"cName") .select($"cID", $"cName", $"revenue".sum AS "revSum") Thanks for your help

Re: Dynamic file name prefix - StreamingFileSink

2020-10-14 Thread Piotr Nowojski
Hi Yadav, What Flink version are you using? The `getPartPrefix` and `getPartSuffix` methods were not public before 1.10.1/1.11.0, which might be causing this problem for you. Other than that, if you are already using Flink 1.10.1 (or newer), please double-check which class you are extending? The
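On recent versions (1.10.1+/1.11) the part-file naming is usually configured through `OutputFileConfig` rather than by subclassing; a sketch with placeholder prefix, suffix, and path, in case it is useful:

```
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}

// Placeholder prefix/suffix and output path; adjust to your naming scheme.
val fileConfig = OutputFileConfig.builder()
  .withPartPrefix("myapp")
  .withPartSuffix(".txt")
  .build()

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder[String]("UTF-8"))
  .withOutputFileConfig(fileConfig)
  .build()
```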

Re: How to control the output frequency of Flink SQL results

2020-10-14 Thread smallwong
Hello, what adjustment was made? The window is only 10 seconds, yet results are expected every second -- Sent from: http://apache-flink.147419.n8.nabble.com/

Processing single events for minimum latency

2020-10-14 Thread Pankaj Chand
Hi all, What is the recommended way to make a Flink job that processes each event individually as soon as it arrives and without waiting for a window, in order to minimize latency in the entire DAG of operators? For example, here is some sample WordCount code (without windows), followed by some

Re: Flink 1.11 MySQL connection issue

2020-10-14 Thread superainbower
Hi, did you ever solve the connection timeout problem? Could you share how you solved it? | superainbower | superainbo...@163.com | On Aug 31, 2020 at 15:57, 酷酷的浑蛋 wrote: The key question is how to set this in SQL with connector=jdbc. On Aug 31, 2020 at 15:06, 13580506953<13580506...@163.com> wrote: This is essentially a connection liveness problem; set autoReconnect=true for the database connection timeout (for MySQL 5 and above, setting autoReconnect=true has no effect

Re: NPE when checkpointing

2020-10-14 Thread Piotr Nowojski
No worries, thanks for the update! It's good to hear that it worked for you. Best regards, Piotrek On Tue, Oct 13, 2020 at 22:43, Binh Nguyen Van wrote: > Hi, > > Sorry for the late reply. It took me quite a while to change the JDK > version to reproduce the issue. I confirmed that if I upgrade

Broadcasting control messages to a sink

2020-10-14 Thread Jaffe, Julian
Hey all, I’m building a Flink app that pulls in messages from a Kafka topic and writes them out to disk using a custom bucketed sink. Each message needs to be parsed using a schema that is also needed when writing in the sink. This schema is read from a remote file on a distributed file system

Flink SQL job submission fails after adding flink-parquet_2.11

2020-10-14 Thread 奔跑的小飞袁
Hello, I am using Flink SQL to read data from Kafka and write it out to HDFS in Parquet format. When I add the flink-parquet dependency, job submission fails, but writing with the built-in avro, json, etc. formats works fine. The error is as follows: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table

How long does the kafka table connector retain data?

2020-10-14 Thread marble.zh...@coinflex.com.INVALID
Hello, for data ingested through the kafka table connector, how long is it retained on the Flink side? I did not see such a setting in the parameter list. If it is kept too long, memory will be exhausted; for example, I only want to keep half an hour of data and clear anything older. -- Sent from: http://apache-flink.147419.n8.nabble.com/

pyflink sql: selecting a field name containing special characters

2020-10-14 Thread whh_960101
Hello, my PyFlink code is as follows and I have the following problem: source = st_env.from_path('source') # st_env is a StreamTableEnvironment, source is the Kafka source table = source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()

Re: Flink 1.11.2 web UI job detail page does not show the job's records received/sent details

2020-10-14 Thread cxydeve...@163.com
Got it, I see it now; I had not realized this information could be viewed there. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.11.2 web UI job detail page does not show the job's records received/sent details

2020-10-14 Thread cxydeve...@163.com
OK, thanks. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Required context properties mismatch in connecting the flink with mysql database

2020-10-14 Thread Dawid Wysakowicz
Hi, I think the problem is that you are using BatchTableEnvironment which is deprecated and does not support newer features such as e.g. FLIP-95 sources/sinks. I am sorry it is not more prominent in the documentation. I am not too familiar with the python API, and I am not sure if a unified
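As a rough illustration of the unified environment (shown here in the Scala/Java Table API; the Python API exposes an analogous `EnvironmentSettings`), a sketch for batch mode with the Blink planner:

```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Unified TableEnvironment in batch mode; unlike the deprecated
// BatchTableEnvironment, this entry point supports the newer FLIP-95 sources/sinks.
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

val tableEnv = TableEnvironment.create(settings)
```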

Re: Re: Re: NullPointerException on map-type values in Flink SQL 1.11

2020-10-14 Thread 奔跑的小飞袁
I had previously patched the source code and did not restore the original source before testing; I later found that the Map approach does not work. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-14 Thread Yun Gao
Hi Till, Many thanks for the feedback! > 1) When restarting all tasks independent of the status at checkpoint time > (finished, running, scheduled), we might allocate more resources than we > actually need to run the remaining job. From a scheduling perspective it > would be easier if we

Re: Rocksdb - Incremental vs full checkpoints

2020-10-14 Thread Yun Tang
Hi, The difference in data size between incremental and full checkpoints is due to the different implementations. The incremental checkpoint strategy uploads binary SST files, while the full checkpoint strategy scans the DB and writes all kv entries to the external DFS. As your state size is really small (only
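For context, incremental checkpointing is a constructor flag on the RocksDB backend; a sketch with a placeholder checkpoint URI:

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The second argument enables incremental checkpoints: only newly created SST
// files are uploaded instead of rewriting the full key/value state every time.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))
env.enableCheckpointing(60000)
```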

Re: Supporting subqueries for temporal table point lookups in Flink

2020-10-14 Thread Dream-底限
Hi, one implementation I can think of is: when creating the lookup table with DDL on the Flink side, one Flink table corresponds to the external subquery virtual table mentioned above, which is effectively Flink creating a view. Dream-底限 wrote on Wed, Oct 14, 2020 at 2:23 PM: > Hi, > >> "The point lookup on an SQL sub-table you mention can save overhead" - I don't quite understand; do you mean joining multiple dimension tables on the same key, so that one key brings back data from multiple tables when querying the external system? That does not quite match Flink's current implementation. > Yes, it can be understood as querying a view with one key, where the view comes from a multi-table join; without creating a view, directly point-querying the external system's subquery keeps the original query form on the Flink side > still: JOIN

Re: why this pyflink code has no output?

2020-10-14 Thread Xingbo Huang
Hi, Which version of PyFlink are you using? I think the API you are using is not the PyFlink API available since Flink 1.9. For detailed usage of PyFlink, you can refer to the doc [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table_api_tutorial.html Best, Xingbo 大森林

Re: Supporting subqueries for temporal table point lookups in Flink

2020-10-14 Thread Dream-底限
Hi, >> "The point lookup on an SQL sub-table you mention can save overhead" - I don't quite understand; do you mean joining multiple dimension tables on the same key, so that one key brings back data from multiple tables when querying the external system? That does not quite match Flink's current implementation. Yes, it can be understood as querying a view with one key, where the view comes from a multi-table join; without creating a view, directly point-querying the external system's subquery keeps the original query form on the Flink side, still: JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime, except that table2 is no longer a physical table, e.g. table2 = (select col from table). Leonard Xu wrote on Tue, Oct 13, 2020

Re: Flink 1.11.2 Table SQL: no data output when parallelism after aggregation is set to 2 or more

2020-10-14 Thread Peihui He
I tried one workaround, shown below, which makes it possible to adjust the SQL parallelism. val table1: Table = stenv.sqlQuery("select * from test") val schema = table1.getSchema val table2 = stenv.fromDataStream(table1.toAppendStream[Row].map(item => Row.of(item.getField(0), item.getField(1)))(new RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray,

Re: Flink 1.11.2 web UI job detail page does not show the job's records received/sent details

2020-10-14 Thread zhisheng
If you want this information, you can actually fetch it through the REST API at the metrics endpoint shown in the image below: the amount and rate of records the source sends downstream. Note this is per parallel subtask, so you can sum the values across all subtasks. http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-10-14-060508.png Best zhisheng Kevin Liu wrote on Wed, Oct 14, 2020 at 12:35 AM: > You can refer to