Hi
You need to specify the data type explicitly; you can use: cast(null as varchar) as person_uuid
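For example, a minimal sketch (assuming a table named `tablename` is already registered in the environment):
```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical environment setup; the fix is the explicit CAST, which gives
// the NULL literal a concrete type the SQL validator can accept.
val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build())
val result = tEnv.sqlQuery(
  "SELECT CAST(NULL AS VARCHAR) AS person_uuid FROM tablename")
```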
Best
Leonard
> On Oct 15, 2020, at 12:18, Dream-底限 wrote:
>
> hi,
> I'm trying to run the following SQL statement with Flink SQL, but the program fails to run. How can I implement this:
> select null as person_uuid from tablename
> 抛出异常:
> org.apache.flink.client.program.ProgramInvocationException: The main method
>
Hi Arpith
If you use a savepoint to restore RocksDB state, the actual restore phase
inserts the original binary key-value pairs into an empty RocksDB instance,
which can be slow if the state is large. There have been several discussions
about optimizing this phase [1] [2].
If you want to work around this
hi,
I'm trying to run the following SQL statement with Flink SQL, but the program fails to run. How can I implement this:
select null as person_uuid from tablename
The exception thrown:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: SQL validation failed. From line 1, column 47 to line 1,
column 50: Illegal use of 'NULL'
at
Hello, my PyFlink code is as follows, and I have the following problem:
source = st_env.from_path('source')
# st_env is a StreamTableEnvironment; 'source' is the Kafka source
# udf1 is only added in the WHERE clause (input_types=DataTypes.STRING(), result_type=
DataTypes.BOOLEAN())
table =
Hi, Dylan
A table in the JdbcCatalog only contains basic options, so it's expected that a
table from the JdbcCatalog does not carry some options.
Flink provides the SQL hints feature to specify or override table options [1]; you
can give it a try.
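For example, a minimal sketch (the table name is hypothetical; `scan.fetch-size` is the JDBC connector option mentioned in the original question):
```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build())
// Dynamic table options are disabled by default in Flink 1.11:
tEnv.getConfig.getConfiguration.setBoolean(
  "table.dynamic-table-options.enabled", true)
// The OPTIONS hint overrides/adds connector options for this query only;
// 'my_jdbc_table' stands in for a table coming from the JdbcCatalog.
val t = tEnv.sqlQuery(
  "SELECT * FROM my_jdbc_table /*+ OPTIONS('scan.fetch-size'='500') */")
```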
Best,
Leonard
[1]
You can configure authentication via the configuration file of the client that submits the Flink job; see:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#auth-with-external-systems
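For example, a minimal flink-conf.yaml sketch (keytab path and principal are placeholders):
```
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/BBB.keytab
security.kerberos.login.principal: bbb@EXAMPLE.COM
# expose the credentials to the Kafka client's JAAS context as well
security.kerberos.login.contexts: Client,KafkaClient
```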
caozhen wrote on Mon, Oct 12, 2020 at 6:05 PM:
> How did you set "the Kafka side authenticates with BBB.keytab"? Is that a Kafka sink you implemented yourself?
Thanks a lot.
I'll keep following this issue and report back to everyone once I have a conclusion, for reference.
Could you describe your problem in more detail? I don't quite understand it.
smallwong wrote on Wed, Oct 14, 2020 at 6:57 PM:
> Hello, may I ask what adjustment was made? The window is only 10 seconds, yet results are expected to be emitted every second.
--
Best,
Benchao Li
I was experimenting with the JdbcCatalog, and I see that the options match some
of the SQL WITH options. I looked at the source code, and even see that it
directly references those options from JdbcDynamicTableFactory. However, I
didn’t see any obvious way to set scan.fetch-size or any way to
Hi Rinat,
It's called in a single-threaded fashion, so there is no need for
synchronization.
Besides, there is a pair of open/close methods in the ScalarFunction; you
could override them and perform the initialization work in the open method.
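For example, a minimal sketch of a JVM ScalarFunction (PyFlink's ScalarFunction exposes the same open/close pair; the model loading below is a hypothetical stand-in):
```
import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}

class ModelUdf extends ScalarFunction {
  // loaded once per parallel instance, not once per record
  @transient private var model: String = _

  override def open(context: FunctionContext): Unit = {
    // hypothetical heavyweight initialization (e.g. fetching a model)
    model = "model-v1"
  }

  // eval() is invoked single-threaded, so no synchronization is needed
  def eval(input: String): Boolean = input.contains(model)

  override def close(): Unit = {
    // release resources acquired in open()
  }
}
```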
Regards,
Dian
> On
Hi Piotrek,
Thank you for replying! I want to process each record as soon as it is
ingested (or reaches an operator) without waiting for a window for records
to arrive. However, by not using windows, I am not sure if each record gets
emitted immediately upon processing.
> You still can use
(Or small correction; a Row with a column of Array of Longs, but still)
On Wed, Oct 14, 2020 at 4:46 PM Rex Fenley wrote:
> I believe I found the issue:
> new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG()))
> vs
> new RowTypeInfo(createTypeInformation[Array[Long]])
> I didn't quite understand
I believe I found the issue:
new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG()))
vs
new RowTypeInfo(createTypeInformation[Array[Long]])
I didn't quite understand at the time whose type information I was meant to
supply; now I do.
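For reference, a minimal sketch of the two constructions (using the Java `Types` class from `org.apache.flink.api.common.typeinfo`):
```
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.createTypeInformation

// Both describe a Row with one column of primitive long[] ([J on the JVM);
// the function must then actually return Array[Long], not a Scala collection.
val viaTypes = new RowTypeInfo(Types.PRIMITIVE_ARRAY(Types.LONG))
val viaMacro = new RowTypeInfo(createTypeInformation[Array[Long]])
```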
However, I think my question still stands. Is there a way for this
Could you share your code to reproduce it?
Rex Fenley wrote on Thu, Oct 15, 2020 at 5:54 AM:
> Hello,
>
> I've been playing with UDFs using the Scala API and have repeatedly run
> into issues such as this:
> ```
> flink-taskmanager_1| java.lang.ClassCastException:
>
While trying to use a mutable.Set and later .asJava, I receive the following:
flink-jobmanager_1 | Caused by: java.lang.ClassCastException:
scala.collection.convert.Wrappers$MutableSetWrapper cannot be cast to [J
flink-jobmanager_1 | at
Hello,
I've been playing with UDFs using the Scala API and have repeatedly run
into issues such as this:
```
flink-taskmanager_1| java.lang.ClassCastException:
scala.collection.immutable.Set$EmptySet$ cannot be cast to [J
```
Is there something that can be done on Flink's end, either to catch
Hi mates!
I keep moving forward in my exploration of the new features of PyFlink, and I'm
really excited about that functionality.
My main goal is to understand how to integrate our ML registry, powered by
MLflow, with PyFlink jobs, and what restrictions we have.
I need to bootstrap the UDF function on its
Is the number of sinks fixed? If so, then you can just take the output
of your map function and apply multiple filters, writing the output of
each filter into a sink. You could also use a process function with
side-outputs, and attach a sink to each output.
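A minimal sketch of the side-output variant (tags and the routing predicate are made up):
```
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tagA = OutputTag[String]("sink-a")
val tagB = OutputTag[String]("sink-b")

// Route each record to one of two side outputs based on its content.
val routed = env.fromElements("a:1", "b:2").process(
  new ProcessFunction[String, String] {
    override def processElement(value: String,
                                ctx: ProcessFunction[String, String]#Context,
                                out: Collector[String]): Unit = {
      if (value.startsWith("a")) ctx.output(tagA, value)
      else ctx.output(tagB, value)
    }
  })

routed.getSideOutput(tagA).print() // stand-in for ES sink 1
routed.getSideOutput(tagB).print() // stand-in for ES sink 2, own parallelism possible
env.execute("side-output routing")
```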
On 10/14/2020 6:05 PM, Vignesh
Thanks for the suggestion Piotr!
The problem is that the sink needs to have access to the schema (so that it can
write the schema only once per file instead of once per record) and thus needs
to know when the schema has been updated. In this proposed architecture, I think
the sink would still need to
Hi team! We're trying to upgrade our applications from 1.9.2 to 1.11.2. After
re-compiling and updating our runtime dependencies to use 1.11.2, we see this
LinkageError:
Caused by: java.lang.LinkageError: ClassCastException: attempting to
Hello,
We noticed that after upgrading to Flink 1.11, the StatsD metric prefix changed
from the hostname to the IP address of the task manager.
The Flink job runs in a k8s cluster.
Here is an example of metric reported to StatsD in Flink 1.10:
Hi,
I'm currently storing around 70GB of data in map state backed by the RocksDB
backend. Once I restore the application from a savepoint, it currently takes
more than 4 minutes to start processing events. How can I speed this up, or
is there another recommended approach?
I'm using the
I'm glad to hear that :)
Best regards,
Piotrek
On Wed, Oct 14, 2020 at 18:28, Vijayendra Yadav wrote:
> Thank you Piotr. I moved the *flink-s3-fs-hadoop* library to the plugins
> directory. Now it's good.
>
>
> On Wed, Oct 14, 2020 at 6:23 AM Piotr Nowojski
> wrote:
>
>> Hi,
>>
>> Are you sure you are loading
Thank you Piotr. I moved the *flink-s3-fs-hadoop* library to the plugins
directory. Now it's good.
On Wed, Oct 14, 2020 at 6:23 AM Piotr Nowojski wrote:
> Hi,
>
> Are you sure you are loading the filesystems correctly? Are you using the
> plugin mechanism? [1] Since Flink 1.10 plugins can only be loaded in
Many thanks for your replies.
I mean: where is the "france revenue" data
in the following document?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html
Thanks for your help~
-- Original message --
From:
My requirement is to send the data to a different ES sink (based on the
data). E.g., if the data contains particular info, send it to sink1; otherwise
send it to sink2, etc. (basically, send it dynamically to any one sink based on
the data). I also want to set parallelism separately for ES sink1, ES
sink2,
Great! Please let us know if it solves the issue or not.
Best,
Piotrek
On Wed, Oct 14, 2020 at 17:46, Vijayendra Yadav wrote:
> Hi Piotrek,
>
> That is correct I was still in 1.10, I am upgrading to 1.11.
>
> Regards,
> Vijay
>
> On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski
> wrote:
>
>> Hi
Hi Piotrek,
That is correct I was still in 1.10, I am upgrading to 1.11.
Regards,
Vijay
On Wed, Oct 14, 2020 at 6:12 AM Piotr Nowojski wrote:
> Hi Yadav,
>
> What Flink version are you using? `getPartPrefix` and `getPartSuffix`
> methods were not public before 1.10.1/1.11.0, which might be
Hi,
Can you link to the document you have in mind? The documentation [1]? I
don't think so.
There are working examples, located in the binary distribution under the
`examples/table/` directory. Their code is available in the repository [2].
Best regards,
Piotrek
[1]
Sorry that I did not make it clear.
I mean:
is there a dataset that can be downloaded
to run all the examples in the document?
Thanks for your help
-- Original message --
From:
Hi,
It depends on how you defined `orders` in your example. For example, here [1]
> Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime)
`orders` is obtained from the environment, from a table registered under
the name "Orders". You would need to first register such a table, or register
a
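For example, a minimal sketch of registering such a table first (the schema and the datagen connector are just stand-ins for your real source):
```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build())
// Once "Orders" is registered, tEnv.from("Orders") resolves.
tEnv.executeSql(
  """CREATE TABLE Orders (
    |  cID BIGINT,
    |  cName STRING,
    |  cCountry STRING,
    |  revenue DOUBLE
    |) WITH ('connector' = 'datagen')""".stripMargin)
val orders = tEnv.from("Orders")
```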
Hi Pankaj,
I'm not entirely sure if I understand your question.
If you want to minimize latency, you should avoid using windows or any
other operators that buffer data for long periods of time. You can
still use windowing, but you might want to emit an updated value of the
window for every
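For instance, a minimal sketch of a window that fires on every element (the assigner and trigger choices are just an illustration):
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromElements(("word", 1), ("word", 1))
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  // fire on every element, so downstream sees an updated aggregate
  // per record instead of once per 10s window
  .trigger(CountTrigger.of[TimeWindow](1))
  .sum(1)
  .print()
env.execute("early-firing window")
```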
Hi Julian,
Have you seen Broadcast State [1]? I have never used it personally, but it
sounds like something you want. Maybe your job should look like:
1. read raw messages from Kafka, without using the schema
2. read schema changes and broadcast them to 3. and 5.
3. deserialize Kafka records in
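A minimal sketch of the broadcast pattern described above (all names, types, and the parsing step are hypothetical):
```
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val schemaDesc = new MapStateDescriptor[String, String](
  "schema", classOf[String], classOf[String])

val raw: DataStream[Array[Byte]] = env.fromElements("x".getBytes)   // stand-in for Kafka
val schemas: DataStream[String] = env.fromElements("schema-v1")     // stand-in for schema changes

raw
  .connect(schemas.broadcast(schemaDesc))
  .process(new BroadcastProcessFunction[Array[Byte], String, String] {
    override def processElement(
        record: Array[Byte],
        ctx: BroadcastProcessFunction[Array[Byte], String, String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      // read the latest broadcast schema (may be null before the first update)
      val schema = ctx.getBroadcastState(schemaDesc).get("current")
      out.collect(s"parsed with $schema: ${new String(record)}")
    }
    override def processBroadcastElement(
        schema: String,
        ctx: BroadcastProcessFunction[Array[Byte], String, String]#Context,
        out: Collector[String]): Unit = {
      ctx.getBroadcastState(schemaDesc).put("current", schema)
    }
  })
  .print()
env.execute("broadcast schema")
```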
Hi,
Are you sure you are loading the filesystems correctly? Are you using the
plugin mechanism? [1] Since Flink 1.10 plugins can only be loaded in this
way [2], while there were some changes to plug some holes in Flink 1.11 [3].
Best,
Piotrek
[1]
Could anyone tell me
what dataset is used in the Flink SQL documentation?
For SQL like:
val revenue = orders
  .filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  .select($"cID", $"cName", $"revenue".sum AS "revSum")
Thanks for your help
Hi Yadav,
What Flink version are you using? `getPartPrefix` and `getPartSuffix`
methods were not public before 1.10.1/1.11.0, which might be causing
this problem for you. Other than that, if you are already using Flink
1.10.1 (or newer), please double-check which class you are extending.
The
Hello, may I ask what adjustment was made? The window is only 10 seconds, yet results are expected to be emitted every second.
Hi all,
What is the recommended way to make a Flink job that processes each event
individually as soon as it comes and without waiting for a window, in order
to minimize latency in the entire DAG of operators?
For example, here is some sample WordCount code (without windows),
followed by some
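For illustration, a minimal window-free sketch along those lines (source and sink are stand-ins); keyBy + sum emits an updated count on every record:
```
import org.apache.flink.streaming.api.scala._

object ContinuousWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("to be or not to be")
      .flatMap(_.toLowerCase.split("\\s+"))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)          // emits the running count on every element
      .print()
    env.execute("continuous word count")
  }
}
```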
Hi
Did you ever solve the connection timeout problem? Could you share how you solved it?
superainbower
superainbo...@163.com
On Aug 31, 2020 at 15:57, 酷酷的浑蛋 wrote:
The key question is how to set this in SQL, with connector=jdbc
On Aug 31, 2020 at 15:06, 13580506953<13580506...@163.com> wrote:
This problem is essentially a connection liveness problem.
For database connection timeouts, set autoReconnect=true (for MySQL 5 and above, setting autoReconnect=true has no effect
No worries, thanks for the update! It's good to hear that it worked for you.
Best regards,
Piotrek
On Tue, Oct 13, 2020 at 22:43, Binh Nguyen Van wrote:
> Hi,
>
> Sorry for the late reply. It took me quite a while to change the JDK
> version to reproduce the issue. I confirmed that if I upgrade
Hey all,
I’m building a Flink app that pulls in messages from a Kafka topic and writes
them out to disk using a custom bucketed sink. Each message needs to be parsed
using a schema that is also needed when writing in the sink. This schema is
read from a remote file on a distributed file system
hello,
I'm using Flink SQL to read data from Kafka and write it out to HDFS in Parquet format. When I add the flink-parquet dependency, the job submission fails, but writing with built-in formats such as avro and json works fine. Below is my error message:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Unable to create a source for reading table
Hi, how long is data received through the Kafka table connector retained on the Flink side? I don't see such a setting in the parameter list. If it's retained too long, memory will be exhausted. For example, I only want to keep half an hour of data; anything older can be cleared.
Hello, my PyFlink code is as follows, and I have the following problem:
source = st_env.from_path('source')
# st_env is a StreamTableEnvironment; 'source' is the Kafka source
table =
source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()
Got it, I see it now; I hadn't realized this was a place to check.
OK, thanks.
Hi,
I think the problem is that you are using BatchTableEnvironment, which is
deprecated and does not support newer features such as the FLIP-95
sources/sinks. I am sorry it is not more prominent in the documentation.
I am not too familiar with the Python API, and I am not sure if a
unified
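On the JVM side at least, a minimal sketch of creating the unified TableEnvironment in batch mode (which does support the newer sources/sinks):
```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Blink planner + batch mode; unlike BatchTableEnvironment, this
// environment supports FLIP-95 sources/sinks.
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val tEnv = TableEnvironment.create(settings)
```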
I had previously patched the source code and didn't restore the original state before testing; I later found that the Map approach does not work.
Hi Till,
Many thanks for the feedback!
> 1) When restarting all tasks independent of the status at checkpoint time
> (finished, running, scheduled), we might allocate more resources than we
> actually need to run the remaining job. From a scheduling perspective it
> would be easier if we
Hi
The difference in data size between incremental and full checkpoints is due to
their different implementations.
The incremental checkpoint strategy uploads binary SST files, while the full
checkpoint strategy scans the DB and writes all key-value entries to the external DFS.
As your state size is really small (only
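For reference, a minimal sketch of enabling the incremental strategy (the checkpoint URI is a placeholder):
```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The second argument enables incremental checkpoints: changed SST files
// are uploaded instead of rewriting every key-value entry on each checkpoint.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))
```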
hi,
One implementation approach I can think of is that, when creating the lookup table with DDL on the Flink side, one Flink table corresponds to the external subquery virtual table mentioned above; it is effectively Flink creating a view.
Dream-底限 wrote on Wed, Oct 14, 2020 at 2:23 PM:
> hi,
>
> >> Regarding "point-querying the SQL subtable can save overhead": I don't quite understand this; do you mean joining multiple dimension tables on the same key, so that one key brings back data from multiple tables when querying the external system? That doesn't seem consistent with Flink's current implementation.
> Yes, you can think of it as querying a view with one key, where the view comes from a multi-table join; without a view, point-querying the external system's subquery directly still keeps the original query shape on the Flink side
> It is still: JOIN
Hi,
Which version of PyFlink are you using? I think the API you are using is
not the PyFlink API provided since Flink 1.9. For detailed usage of PyFlink,
you can refer to the doc [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table_api_tutorial.html
Best,
Xingbo
大森林
hi,
>> Regarding "point-querying the SQL subtable can save overhead": I don't quite understand this; do you mean joining multiple dimension tables on the same key, so that one key brings back data from multiple tables when querying the external system? That doesn't seem consistent with Flink's current implementation.
Yes, you can think of it as querying a view with one key, where the view comes from a multi-table join; without a view, point-querying the external system's subquery directly still keeps the original query shape on the Flink side. It is still:
JOIN table2 FOR SYSTEM_TIME AS OF
table1.proctime, except that table2 is no longer a physical table, e.g.: table2 = (select
col from table)
Leonard Xu wrote on Tue, Oct 13, 2020
I tried one solution, shown below, which makes it possible to adjust the SQL parallelism.
val table1: Table = stenv.sqlQuery("select * from test")
val schema = table1.getSchema
val table2 = stenv.fromDataStream(
  table1.toAppendStream[Row].map(item =>
    Row.of(item.getField(0), item.getField(1)))(
    new RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray,
If you want to get that, you can use the REST API to read, from the metrics shown
in the image below, the amount and rate of data the job's source sends downstream.
Note that this is per parallel subtask, so you can sum the values across all subtasks.
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-10-14-060508.png
Best
zhisheng
Kevin Liu wrote on Wed, Oct 14, 2020 at 12:35 AM:
> You can refer to