If you are running on K8s, you could also directly use its ingress layer.
That's especially convenient if you have managed to connect it to your
company's SSO.
On Tue, Jun 2, 2020 at 9:38 PM Robert Metzger wrote:
> Hi David,
>
> I guess you could also "just" put a nginx in front of
Hi Sudan,
it seems to be unsupported directly.
You can use a hacky workaround: replicate apply [1] in your code and
adjust the last line to call your CoGroupWindowFunction.
[1]
A common approach is to use a dead letter queue, which is an extra output
for bad input.
So the result of the read operation would look like Tuple2<TBase, byte[]>
(or use Either in Scala), returning the parsed TBase on success or else the
invalid record's byte[].
Then in your DAG, split the handling of
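The Tuple2/Either idea above can be sketched in plain Java. This is illustrative only: `ParseResult` and `safeParse` are made-up names, not Flink API; in the real job this wrapper would be produced by the DeserializationSchema and the stream split on it.

```java
import java.util.function.Function;

// Illustrative sketch of the dead-letter wrapper; `ParseResult` and
// `safeParse` are invented names, not Flink API.
public class DeadLetter {
    // Either-style holder: exactly one of `parsed` / `raw` is set.
    public static final class ParseResult<T> {
        public final T parsed;   // non-null on success
        public final byte[] raw; // non-null on failure (goes to the DLQ)
        ParseResult(T parsed, byte[] raw) { this.parsed = parsed; this.raw = raw; }
        public boolean isSuccess() { return parsed != null; }
    }

    // Run the real deserializer; on any exception keep the raw record
    // instead of failing the job.
    public static <T> ParseResult<T> safeParse(byte[] record, Function<byte[], T> parser) {
        try {
            return new ParseResult<>(parser.apply(record), null);
        } catch (Exception e) {
            return new ParseResult<>(null, record);
        }
    }

    public static void main(String[] args) {
        Function<byte[], Integer> parser = b -> Integer.parseInt(new String(b));
        System.out.println(safeParse("42".getBytes(), parser).isSuccess());   // true
        System.out.println(safeParse("oops".getBytes(), parser).isSuccess()); // false
    }
}
```

Downstream, the successful branch continues the normal pipeline while the failure branch is written to the dead-letter sink.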
Hi Arnaud,
just to add on: the overhead of this additional map is negligible if you
enable object reuse [1].
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger wrote:
> I'm not 100% sure about this answer,
Hi all,
@Venkata, Do you have many small files being created as Arvid suggested? If
yes, then I tend to agree that S3 is probably not the best sink. Although I
did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the
SimpleStringEncoder in
Hi,
could you check if this SO thread [1] helps you already?
[1]
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
On Thu, Jun 4, 2020 at 7:43 PM M Singh wrote:
> Hi:
>
> I am running a job which consumes data from Kinesis and send data to
>
Instead of changing the query, I used to embed the query in a larger
context for similar use cases.
So if you get an arbitrary query X which produces exactly one result (e.g.
X = select sum(revenue) from lineorder group by 1) then you can craft a
query where you add a dummy pk to the result.
Table
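That embedding can be sketched as a small wrapper; the method name `withDummyKey` and the `'result'` literal are illustrative choices, not anything from Flink:

```java
// Sketch: wrap an arbitrary single-row query X in an outer query that
// prepends a constant column usable as a primary key.
public class WrapQuery {
    public static String withDummyKey(String query) {
        return "SELECT 'result' AS pk, t.* FROM (" + query + ") t";
    }

    public static void main(String[] args) {
        System.out.println(withDummyKey("select sum(revenue) from lineorder group by 1"));
    }
}
```

Because the wrapped query produces exactly one row, the constant column is trivially unique and can be declared as the primary key.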
Hi Chris,
could you also try what happens when you turn incremental checkpoints off?
Incremental checkpoints may create many small files which are a bad fit for
HDFS. You could also evaluate other storage options (net drive, S3) if you
find incremental checkpoints to be better.
On Tue, Jun 2,
Hi Venkata,
are the many small files intended, or is it rather an issue of how commits
happen on checkpointing? If so, FLINK-11499 [1] should help you. The design is close
to done, unfortunately implementation will not make it into 1.11.
In any case, I'd look at the parameter fs.s3a.connection.maximum,
Hi Yi,
one option is to use Avro, where you define one global Avro schema as the
source of truth. Then you add aliases [1] to this schema for each source
where the fields are named differently. You use the same schema to read the
Avro messages from Kafka and Avro automatically converts the data
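A minimal sketch of such a schema (the record, field, and alias names are invented for illustration): the canonical schema names the field `revenue`, and aliases cover sources that call it `rev` or `turnover`; during schema resolution Avro maps a writer field with one of those names onto the canonical field.

```java
// Hypothetical global Avro schema with aliases; a reader using this schema
// resolves a writer field named "rev" or "turnover" onto "revenue".
public class GlobalSchema {
    public static final String SCHEMA_JSON =
        "{\"type\": \"record\", \"name\": \"Order\", \"fields\": ["
      + "{\"name\": \"revenue\", \"type\": \"double\","
      + " \"aliases\": [\"rev\", \"turnover\"]}"
      + "]}";
}
```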
Hi Satyam,
you are right, there seems to be a disconnect between javadoc and
implementation. Jark probably knows more.
In your case, couldn't you just add a dummy column containing a constant
key?
select 'revenue' AS name, sum(revenue) from lineorder
and then set the dummy field as PK?
On
Hey Arvid,
Thanks for the reply.
As you suggested, rewriting the query to add a dummy output and group by
the clause - "select 1, sum(revenue) from lineorder group by 1" does add a
unique key column to the output, and the pipeline succeeds.
However, the application may get arbitrary SQL from
Hi Prasanna,
auto.create.topics.enable is only recommended for development clusters and
not for production use cases (as one programming error could potentially
flood the whole broker with a large number of topics). I have experienced
first hand the mess it makes.
I'd suggest finding a
Guowei,
I had a different Flink app that was using 10 or 15 s intervals; it had a
similar behavior, but not nearly as bad as the 2 s interval pipeline. Both have
much longer checkpoint intervals now.
Here is the state config:
state.backend: rocksdb
state.checkpoints.dir: {{
Hi Kostas and Arvid,
Thanks for your suggestions.
The small files were already created, and I am trying to roll a few of them
into a big file while sinking. But due to the custom bucket assigner, it is
hard to get more files within the same prefix in the specified checkpointing
time.
For example:
Hi Chris
From the given exception, it seems there is something wrong with the
FileSystem; one possible reason is the one Arvid gave (incremental
checkpoints may generate too many small files).
You can turn off incremental checkpoints or try to increase the config
`state.backend.fs.memory-threshold` to see if things
Hi Theo,
The general idea is interesting. I'd probably start with some initial
out-of-orderness bound and, after collecting X elements, switch to the
histogram. It sounds very valid to snapshot it. I'd probably use a union
state to also support rescaling in a meaningful way.
However, tbh for a
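That switch-over can be sketched standalone (all names here are invented, and the union-state snapshotting is only indicated in comments, not implemented):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch, not Flink API: use a fixed initial out-of-orderness
// bound, then switch to a percentile of the observed lateness once enough
// elements were collected. In a real operator the samples would be
// snapshotted (e.g. as union state) so they survive rescaling.
public class AdaptiveBound {
    private final long initialBoundMs;
    private final int warmup;
    private final List<Long> latenessSamples = new ArrayList<>();

    public AdaptiveBound(long initialBoundMs, int warmup) {
        this.initialBoundMs = initialBoundMs;
        this.warmup = warmup;
    }

    public void record(long latenessMs) { latenessSamples.add(latenessMs); }

    public long currentBoundMs() {
        if (latenessSamples.size() < warmup) {
            return initialBoundMs; // still warming up
        }
        List<Long> sorted = new ArrayList<>(latenessSamples);
        Collections.sort(sorted);
        // 99th percentile of observed lateness as the new bound
        return sorted.get((int) ((sorted.size() - 1) * 0.99));
    }
}
```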
Thanks Yang for the suggestion. I have tried it and I'm still getting the
same exception. Is it possible it's due to the null pod name? Operation:
[create] for kind: [Pod] with name: [null] in namespace: [default]
failed.
Best,
kevin
Hi Ramana,
What connector do you use or how do you instantiate the TableSource?
Also which catalog do you use and how do you register your table in that
catalog?
The problem is that the conversion from TypeInformation to DataType produces
legacy types (because they cannot be mapped exactly 1:1 to
Hello,
I am using Flink as the query engine to build an alerting/monitoring
application. One of the use cases in our product requires continuously
tracking and charting the output of an aggregate only SQL query,
for example, select sum(revenue) from lineorder. A desirable property from
the output
Hi,
Resolved the issue by using a Custom Partitioner and setting RequestTimeout
properties.
kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());
private static final class SerializableCustomPartitioner extends
KinesisPartitioner<…> {
private static final long
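The snippet above is truncated, but the key property of such a partitioner is determinism. A plain-Java sketch of that idea (the class, method name, and bucket count are invented; the real Flink KinesisPartitioner API is not reproduced here):

```java
// Illustrative only: derive the partition key deterministically from the
// record, so retried records map to the same shard.
public class StablePartitionKey {
    public static String forRecord(String record, int buckets) {
        // floorMod keeps the bucket non-negative even for negative hash codes
        return Integer.toString(Math.floorMod(record.hashCode(), buckets));
    }

    public static void main(String[] args) {
        System.out.println(forRecord("order-42", 128));
    }
}
```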
Hi Leonard,
We are using Flink version 1.10 and I cannot share the complete schema, but
it looks like the following in the Hive Catalog:
flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
`postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
VARCHAR(2147483647),
First of all, to give some backstory for the deprecation: we do not want to
depend on TypeInformation anymore for the types in the Table API, as it
binds the on-wire representation to the logical types of the SQL
API. The goal is to use DataType exclusively in the Table API
(including for
Hi Dawid,
We are using a custom connector that is very similar to the Flink Kafka
connector, and we instantiate the TableSchema using a custom class which
maps Avro types to Flink's DataTypes using TableSchema.Builder.
For the Array type, we have the mapping below:
case ARRAY:
return
Hi there,
I am running a Batch job with several outputs.
Is there a way to run some code (e.g. release a distributed lock) after all
outputs are finished?
Currently I do this in a try-finally block around the
ExecutionEnvironment.execute() call, but I have to switch to the detached
execution mode
You can try the JobListener, which you can register with the ExecutionEnvironment.
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
Mark Davis wrote on Sat, Jun 6, 2020 at 12:00 AM:
> Hi there,
>
> I am running a Batch job with several outputs.
> Is
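The callback shape of that interface can be sketched in plain Java. This mirrors the idea of JobListener#onJobExecuted, but everything below is a stand-in, not the actual Flink types:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch of the JobListener pattern: the environment invokes the
// listener after the job finishes, so cleanup (e.g. releasing a distributed
// lock) runs even in detached mode. Names are illustrative, not Flink API.
public class CleanupAfterJob {
    interface Listener {
        void onJobExecuted(String result, Throwable failure);
    }

    static final List<String> log = new ArrayList<>();

    static void runJob(List<Listener> listeners) {
        try {
            log.add("job ran"); // the batch job itself
            for (Listener l : listeners) l.onJobExecuted("OK", null);
        } catch (RuntimeException e) {
            for (Listener l : listeners) l.onJobExecuted(null, e);
        }
    }

    public static void main(String[] args) {
        runJob(List.of((result, failure) -> log.add("released lock, result=" + result)));
        System.out.println(log); // [job ran, released lock, result=OK]
    }
}
```

The advantage over try-finally is that the environment, not your main method, drives the callback, which is what makes it work with detached submission.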
OK, thank you very much.
Best,
Whirly
On 2020-06-06 11:45, Benchao Li wrote:
Wow, very nice!
I also looked it up: in the SQL standard, boolean expressions have three
possible values [1]: true, false, and unknown.
And null normally is not equal to any value, including another null [2].
So if you run `SELECT null <> null`, the result should be unknown; in Flink,
this should be null, not true or false.
And when this situation appears in a WHERE condition, the comparison result
should also be unknown [3], but by default it is handled as false.
As for `IS [NOT] DISTINCT
Hi, I have a question about a related operation.
Question: what if the rules I put in open() may change and need to be updated again?
Michael Ran wrote on Thu, Jun 4, 2020 at 5:22 PM:
> Can it be put into the open() method?
> On 2020-06-04 14:15:05, "小屁孩" <932460...@qq.com> wrote:
> > Dear all: I have a question about joining Flink SQL with a MySQL dimension table, specifically about how MySQL updates are handled.
>
Hi,
I thought about this problem again, and I think this behavior is indeed
somewhat unreasonable; I created an issue [1] to track it.
[1] https://issues.apache.org/jira/browse/FLINK-18164
whirly wrote on Fri, Jun 5, 2020 at 11:20 PM:
> OK, it may just be a display issue of my mail client. Thanks for the reply.
>
> As for adding an extra IS NOT NULL check: both the OP and I feel it is somewhat
> redundant, and sometimes there are many fields in the condition, so adding
> IS NOT NULL before every field is laborious and error-prone.
>
> If there could be a configuration option controlling null <>
A Flink SQL job starts fine and checkpoints normally, with no large state (the state is 19.7 KB).
But after running for a while it starts failing; after recovering it runs for a few
seconds, immediately hits the same error again, and keeps looping between failure and recovery.
I'd be very grateful if someone could take a look.
2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task -
Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
Hi all:
In Flink SQL, take e.g. SELECT * from order where product <> 'rubber': if the
product field of a record is null, it cannot match the condition product <> 'rubber',
even though null is indeed not equal to 'rubber'.
Only by changing the condition to where product is null or product <> 'rubber' does it match.
But I want null <> 'rubber' to also evaluate to true, without adding a product is null
check in front of the condition. What can I do?
----
From: "star" <3149768...@qq.com>
Date: Fri, Jun 5, 2020, 10:40
To: "user-zh@flink.apache.org"
*1. Scenario:*
Flink job logic: source (Kafka) -> data processing (wordCount logic) -> sink (Kafka)
1) Job A:
source_topic: word_count_topic
sink_topic: result_01
group_id: test-group01
2) Job B:
source_topic: word_count_topic
sink_topic: result_02
group_id:
Flink 1.10.0 standalone deployment
Kafka 0.11
JDK 8
When I deploy two applications on the same Flink cluster, and both are assigned to the same TaskManager and consume the same topic with different group.ids, the first application starts normally, but when the second one starts I see the warning below. What causes this warning, can it be ignored, and if not, how can it be resolved?
WARN org.apache.kafka.common.utils.AppInfoParser - Error registering
AppInfo mbean
An update on the second question: the Flink SQL client does support SET for configuring the TTL (ms):
Flink SQL> set execution.min-idle-state-retention = 2;
[INFO] Session property has been set.
Best,
Leonard Xu
> On Jun 5, 2020, at 13:39, Leonard Xu wrote:
>
> Hi,
>
> On the first question: the upcoming 1.11 release lets you declare a primary key, so there is no need to infer the primary
>
Hi,
I only received the one email you sent the first time, so it looks like there is
no problem. (Not sure whether it is a local display issue of your mail client.)
As for your question, that is indeed how it is handled at the moment. May I ask,
what is the problem with adding an extra IS NOT NULL?
whirly wrote on Fri, Jun 5, 2020 at 9:54 PM:
> Sorry, I just noticed that my 163 mailbox automatically resent this question several times; I don't know why, maybe a mailbox bug? Apologies for the noise, and I am not sure whether it will keep auto-resending.
>
> On 2020-06-05 14:25:10, "whirly" wrote:
> > Hi all:
> > In Flink SQL
Thanks for the reply; now I understand the details of state restoration, and how the other files are produced and what they are used for.
Weihua Hu wrote on Fri, Jun 5, 2020 at 1:48 PM:
> Hi, Px New
>
> 1. The number of retained checkpoints can be controlled with the parameter state.checkpoints.num-retained; the default is 1.
> 2. _metadata only contains metadata, namely handles to the state; the other files are the state data, uploaded by each Task when the checkpoint is triggered. Conversely, when restoring a checkpoint, the JM reads _metadata and hands the corresponding handles down to the Tasks, and each Task then reads from the remote
There is a native one, as the other guy also answered.
Does connecting directly with Java also fail?
xueaohui_...@163.com
From: liunaihua521
Sent: 2020-06-05 09:31
To: user-zh@flink.apache.org
Cc: user-zh@flink.apache.org
Subject: Re: Re: flink + HBase integration
hi,
The sink is my own; I have not yet found a native HBase sink in Flink and am still investigating.
On Jun 5, 2020 09:06, xueaohui_...@163.com wrote:
Is the sink your own or an official one?
Oh, my apologies for that:
Image 1: https://i.loli.net/2020/06/05/SAfpnkqlOUM9hD3.png
Image 2:
https://imgkr.cn-bj.ufileos.com/aed4cb64-dd24-4076-ba4c-a0e07bc356bf.png
zhiyezou <1530130...@qq.com> wrote on Fri, Jun 5, 2020 at 1:58 PM:
> Hi
> Please use a third-party image host and link the images; directly pasted images do not show up.
>
> -- Original message --
> From: "Weihua
Hi,
From your description, we may have run into a similar problem.
In our case, after recovering from a checkpoint some keys no longer matched
the keys in the previous state, so it looked a bit like part of the state had been lost.
But we never found the exact cause. On the one hand, the problem was hard to
reproduce: with our production data only part of the records were affected, and we
could not see any pattern in the affected data. On the other hand, the blink planner
internally uses binary data structures everywhere, which makes debugging difficult.
If you can provide a reasonably stable dataset and test procedure that reproduces
it, I think we could push on solving this problem.
star <3149768...@qq.com> wrote on Fri, Jun 5, 2020
Accessing all the data on a Kafka message (timestamp, partition, key, etc.) is a very important feature, and the community realized this early on.
There is already a FLIP [1] under discussion; support is expected in 1.12.
Best,
Jark
[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
On Fri, 5 Jun 2020 at 19:19, sunfulin wrote:
>
Hi,
I'd like to ask: when creating a Kafka source via DDL in Flink SQL, is it supported to read the timestamp carried by Kafka itself? We have a scenario where we want to use Kafka's own timestamp, and in that case the message stream may not contain a time attribute.
If it is supported, could you share the concrete syntax? I tried the SQL below and got an error:
CREATE TABLE user_behavior (
test_time TIMESTAMP(3),
user_id STRING ,
item_id STRING ,
category_id STRING ,
behavior STRING,
ts STRING,
proctime as
Sorry, I just noticed that my 163 mailbox automatically resent this question several times; I don't know why, maybe a mailbox bug? Apologies for the noise, and I am not sure whether it will keep auto-resending.
On 2020-06-05 14:25:10, "whirly" wrote:
> Hi all:
> In Flink SQL, take e.g. SELECT * from order where product <> 'rubber': if the
> product field of a record is null, it cannot match the condition product <> 'rubber', even though null is indeed not equal to 'rubber'.
> Only by changing the condition to where
Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp' to the properties is what I need.
I noticed that timestampFromSource can currently be obtained via Java code; can this feature actually fetch the source's timestamp? I tested it and the parsed value seems to be empty. My Kafka version is 0.10.2.
On 2020-06-05 19:31:37, "Jark Wu" wrote:
> Accessing all the data on a Kafka message (timestamp, partition, key, etc.) is a very important feature, and the community realized this early on.
> There is already a
As far as I know, `timestampFromSource` is not supported by any connector at the moment...
On Fri, 5 Jun 2020 at 22:29, sunfulin wrote:
>
> Thanks for the reply, Jark. It looks like adding 'timestamp.field' = 'timestamp' to the properties is what I need.
> I noticed that timestampFromSource can currently be obtained via Java code; can this feature actually fetch the source's timestamp? I tested it and the parsed value seems to be empty. My Kafka version is 0.10.2.
>