Re: How should an Array nested in a Row be defined with DDL in Flink SQL?

2020-11-23 Thread Benchao Li
Your DDL looks fine to me.

Which Flink version are you using?
Also, could you send a more complete exception stack trace?

zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:

> Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~
>
> Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
>
> > Your image didn't come through; please upload it to a third-party image host and resend it, or just paste the text directly.
> >
> > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> >
> > > [image: image.png]
> > > As the subject says, I run into an exception when trying to define it as shown below; could someone in the community point me to the correct way to do this?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Unsubscribe

2020-11-23 Thread Xingbo Huang
Hi,

To unsubscribe, please send an email to  user-zh-unsubscr...@flink.apache.org

See [1] for details.

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

李军 wrote on Tue, Nov 24, 2020 at 2:49 PM:

>
>


Re: Watermarks under ProcessingTime

2020-11-23 Thread Xingbo Huang
Hi,

A watermark exists to help decide when to trigger computation when event-time data does not arrive in order. If you use processing time, processing time is always monotonically increasing, so there is no notion of out-of-order data and hence no watermark is needed.
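For illustration, a minimal DDL sketch (table and field names are made up, connector options omitted): a processing-time attribute is declared with PROCTIME() and takes no WATERMARK clause, whereas an event-time attribute does.

    CREATE TABLE proc_time_example (
      id INT,
      proc_time AS PROCTIME()   -- processing-time attribute: no WATERMARK clause needed (or allowed)
    ) WITH (...);

    CREATE TABLE event_time_example (
      id INT,
      ts TIMESTAMP(3),
      WATERMARK FOR ts AS ts - INTERVAL '3' SECOND   -- watermarks apply only to event-time attributes
    ) WITH (...);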

Best,
Xingbo


Kyle Zhang wrote on Tue, Nov 24, 2020 at 1:34 PM:

> Hi,
> Using Flink 1.11, declaring a watermark based on processing time in a SQL DDL fails with:
>
> SQL validation failed. Watermark can not be defined for a processing time
> attribute column.
>
> The documentation also discusses watermarks almost entirely together with event time [1].
> What I'd like to know is: does processing-time stream processing simply not need watermarks, or is this optimized away by Flink so that we don't need to care?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks
>
> Best
>


Flink SQL causes Prometheus memory to blow up

2020-11-23 Thread Luna Wong
The metrics generated by Flink SQL jobs have extremely long task_name values, which makes Prometheus memory blow up at query time; that is unacceptable in production.
The example below is just the simplest case; complex SQL statements generate task_name values of up to 9000 bytes. This blows up Prometheus memory. What can I do about it?

task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"
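One workaround that is sometimes used (a sketch, not an official recommendation): drop the long <task_name> variable from the metric scope formats in flink-conf.yaml and rely on the shorter <task_id>/<operator_id> instead; please verify the exact scope variables against the metrics documentation of your Flink version.

    # flink-conf.yaml (sketch)
    metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_id>.<subtask_index>
    metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_id>.<subtask_index>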


Re: How should an Array nested in a Row be defined with DDL in Flink SQL?

2020-11-23 Thread zilong xiao
Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~

Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:

> Your image didn't come through; please upload it to a third-party image host and resend it, or just paste the text directly.
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
>
> > [image: image.png]
> > As the subject says, I run into an exception when trying to define it as shown below; could someone in the community point me to the correct way to do this?
> >
>
>
> --
>
> Best,
> Benchao Li
>


Unsubscribe

2020-11-23 Thread 李军



Re: How should an Array nested in a Row be defined with DDL in Flink SQL?

2020-11-23 Thread Benchao Li
Your image didn't come through; please upload it to a third-party image host and resend it, or just paste the text directly.

zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:

> [image: image.png]
> As the subject says, I run into an exception when trying to define it as shown below; could someone in the community point me to the correct way to do this?
>


-- 

Best,
Benchao Li


Using custom stateful methods from the Flink Table API or SQL

2020-11-23 Thread 戴嘉诚
Hi all:
Since Flink SQL / the Flink Table API currently does not support stateful logic inside user-defined
UDFs, whenever one of our jobs needs aggregation-style metrics we cannot use Flink
SQL and have to hand-code the logic with the Flink DataStream API. Is there any other way in Flink
SQL to call our own stateful UDFs, and to keep them from being evaluated multiple times during parsing and execution? Or is the
only option one metric per Flink job?
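For aggregation-style metrics specifically, one supported way to keep managed per-key state in Flink SQL is a user-defined AggregateFunction, whose accumulator Flink stores and restores for you. A minimal sketch (class, field and registration names are made up):

    import org.apache.flink.table.functions.AggregateFunction;

    public class WeightedSum extends AggregateFunction<Long, WeightedSum.Acc> {
        public static class Acc { public long sum = 0L; }   // the accumulator is the managed "state"

        @Override
        public Acc createAccumulator() { return new Acc(); }

        public void accumulate(Acc acc, Long value, Integer weight) {
            acc.sum += value * weight;                       // updated once per input row
        }

        @Override
        public Long getValue(Acc acc) { return acc.sum; }
    }
    // registration: tableEnv.createTemporarySystemFunction("weighted_sum", WeightedSum.class);
    // usage:        SELECT key, weighted_sum(v, w) FROM t GROUP BY key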


Re: Suggestions regarding Catalogs

2020-11-23 Thread admin
Thanks Jark, I tried it and it indeed works.
I first created the Hive table with the Hive catalog + Hive dialect,
then switched to the default catalog and created the Kafka source table.
When doing insert into hive select from
kafka, the Hive table has to be qualified as hive_catalog.hive_db.hive_table, otherwise it reports that the table does not exist, because we are currently in the default
catalog. Something to keep in mind.
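A sketch of that flow (catalog, database and table names are made up; the dialect switch is written as in the SQL client, adjust for your environment):

    USE CATALOG hive_catalog;
    SET table.sql-dialect=hive;
    CREATE TABLE hive_db.hive_table (id INT, name STRING) STORED AS parquet;

    USE CATALOG default_catalog;
    SET table.sql-dialect=default;
    CREATE TABLE kafka_source (id INT, name STRING) WITH ('connector' = 'kafka', ...);

    -- still in default_catalog, so the Hive sink must be fully qualified:
    INSERT INTO hive_catalog.hive_db.hive_table SELECT id, name FROM kafka_source;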

> On Nov 24, 2020, at 11:41 AM, Jark Wu wrote:
> 
> 1. Yes, that works.
> 2. Yes. See the docs:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag
> 3. Yes.
> 
> The Hive metastore catalog is the general-purpose catalog officially provided by Flink (it can store tables of any connector type).
> 
> Best,
> Jark
> 
> 
> On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote:
> 
>> Hi Rui Li,
>>> Flink SQL allows one session to use multiple catalogs, so the catalog choice is not exclusive; they can be mixed.
>> 
>> So within one job I can switch catalogs, right? For example, read from Kafka and write to Hive's db1.hive_table.
>> A few questions:
>> 1. Create the Kafka source with the memory catalog and the Hive table with the Hive catalog: is that OK?
>> 2. What is the syntax for switching catalogs in SQL? I didn't find it in [1]; is it USE CATALOG
>> catalogName (default_catalog/hive_catalog)?
>> 
>> 3. When registering the HiveCatalog a default database has to be specified, say test; to write to db1's hive_table, is switching the database enough, i.e.
>>    USE db1;
>> Thanks
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html
>> 
>>> On Nov 23, 2020, at 8:52 PM, Rui Li wrote:
>>> 
>>> Hi,
>>> 
>>> Flink SQL allows one session to use multiple catalogs, so the catalog choice is not exclusive; they can be mixed.
>>> 
>>> Regarding your two questions:
>>> 1. My understanding is that the JDBC catalog is mainly meant to make it convenient to query JDBC tables; the current implementation is basically a read-only catalog
>>> [1]. The docs could probably be more explicit about this.
>>> 2.
>>> Building a complete, production-ready metadata management system is never really "simple": reading and writing schemas is only the most basic requirement, and concurrent access, HA and metadata security all need to be considered. The Hive
>>> metastore already has a sizeable user base, so relying on it to persist metadata is a choice with a good cost/benefit ratio.
>>> 
>>> [1]
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
>>> 
>>> On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
>>> 
 Flink currently provides three kinds of catalog: memory, jdbc and hive.
 In practice, the following options seem available.
 
 (1) Use the memory catalog and have every SQL statement carry its own DDL.
 (2) Use a catalog that supports "persisted" DDL definitions, so individual SQL statements no longer need to carry their own DDL.
 
 Options 1 and 2 each have pros and cons.
 Advantage of option 1:
   For example, sql1 and sql2 may each target only part of a Kafka topic's time range; in that case it is awkward to hard-code (persist) the DDL for that Kafka
 topic, and each SQL statement should rather carry its own definition. (That said, with option 2 the per-statement DDL of sql1 and sql2 could also be simplified by overriding options.)
 Disadvantage of option 1:
   Obviously, the lack of "persistence" is itself the drawback, and that is exactly option 2's advantage.
 
 - And now my questions.
 
 The Flink documentation says the HiveCatalog serves both as storage for Flink table metadata and as an interface for reading Hive table metadata, while for the JdbcCatalog it does not say which table (connector) types are supported.
 Question 1 (as above): the docs do not state clearly, for each catalog, which connector types, i.e. table types, it supports.
 
 Question 2: could a simpler, more convenient catalog implementation be provided that supports persistence and all connector types? "Simple" meaning e.g. backed by MySQL/PostgreSQL, or even a plain JSON file as storage; "persistent" meaning the definitions can be persisted.
 
 Of course, since persisting Hive-style metadata in other storage may require extra, non-trivial conversion, I feel there should at least be a reasonably general catalog, e.g. one supporting (MySQL tables, Kafka tables (Kafka metadata is simple and can surely be stored in MySQL or the like), ...).
 
>>> 
>>> 
>>> --
>>> Best regards!
>>> Rui Li
>> 
>> 



Watermarks under ProcessingTime

2020-11-23 Thread Kyle Zhang
Hi,
Using Flink 1.11, declaring a watermark based on processing time in a SQL DDL fails with:

SQL validation failed. Watermark can not be defined for a processing time
attribute column.

The documentation also discusses watermarks almost entirely together with event time [1].
What I'd like to know is: does processing-time stream processing simply not need watermarks, or is this optimized away by Flink so that we don't need to care?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#event-time-and-watermarks

Best


Re: How does Flink SQL handle late data?

2020-11-23 Thread hailongwang
Hi,
  As far as I know, Flink SQL does not support emitting late data to a side output.
If your downstream operator is a window, you can set
`table.exec.emit.late-fire.enabled` and `table.exec.emit.late-fire.delay` to trigger again for data that arrives after the
watermark.
How long data later than the watermark is still accepted is controlled by `table.exec.state.ttl`, which is equivalent to allowedLateness in the
DataStream API,
so the maximum waiting time of the window is the watermark's out-of-orderness + allowedLateness.
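A sketch of how those options might be set on the TableConfig (the emit options are experimental and the variable names here are made up; verify the option keys against your Flink version):

    TableConfig conf = tableEnv.getConfig();
    conf.getConfiguration().setBoolean("table.exec.emit.late-fire.enabled", true);
    conf.getConfiguration().setString("table.exec.emit.late-fire.delay", "5 min");
    conf.getConfiguration().setString("table.exec.state.ttl", "1 h");   // how long late data is still accepted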


Best,
Hailong
On 2020-11-24 09:03:13, "jy l" wrote:
>Hi:
>A question: in Flink SQL I defined a watermark with a maximum out-of-orderness when creating the table, but some data still arrives later than that. We don't want to simply drop it, so how can I collect this still-late data? Is there an equivalent of the DataStream API's ability to split late data off into a side stream?
>
>Best regards!


Re: Frequent operation-timeout errors when calling flush() while writing to HBase from Flink in real time

2020-11-23 Thread hailongwang



This error looks like it comes from HBase itself. For the implementation, you can refer to the community's HBaseSinkFunction [1].
[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
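One thing worth checking (a sketch, not a diagnosis): the client timeouts, retry count and write buffer used by the BufferedMutator. The option keys and values below are assumptions to be verified against your HBase client version.

    org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
    hconf.set("hbase.rpc.timeout", "60000");                // per-RPC timeout in ms (assumed key)
    hconf.set("hbase.client.operation.timeout", "120000");  // per-operation timeout in ms (assumed key)
    hconf.set("hbase.client.retries.number", "5");
    Connection connection = ConnectionFactory.createConnection(hconf);

    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("my_table"))
        .writeBufferSize(4 * 1024 * 1024)                   // flush automatically once 4 MB are buffered
        .listener((ex, m) -> {                              // called for failed mutations instead of throwing later
            // log ex.getNumExceptions() and decide whether to rethrow
        });
    BufferedMutator mutator = connection.getBufferedMutator(params);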
Best,
Hailong




On 2020-11-24 09:32:55, "bradyMk" wrote:
>Dear all:
>I write to HBase from Flink in real time, extending RichSinkFunction and using HBase's BufferedMutator; every time a certain amount of data has been written I call flush(), roughly like this:
> 
>but the job keeps reporting the following error:
> 
>It feels like something in my code is causing this, but I don't know why. Any guidance would be greatly appreciated~
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Print on screen DataStream content

2020-11-23 Thread Pankaj Chand
Please correct me if I am wrong. `DataStream#print()` only prints to the
screen when running from the IDE, but does not work (print to the screen)
when running on a cluster (even a local cluster).

Thanks,

Pankaj

On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Simone,
>
> I'd suggest trying out the `DataStream#print()` function to start, but
> there are a few other easy-to-integrate sinks for testing that you can
> check out in the docs here[1]
>
> Best,
> Austin
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks
>
> On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin 
> wrote:
>
>> Hi All,
>>
>> On my code I have a DataStream that I would like to access. I need to
>> understand what I'm getting for each transformation to check if the data
>> that I'm working on make sense. How can I print into the console or get a
>> file (csv, txt) for the variables: "stream", "enriched" and "result"?
>>
>> I have tried different way but no way to get the data.
>>
>> Thanks!
>>
>>
>> FlinkKafkaConsumer kafkaData =
>> new FlinkKafkaConsumer("CorID_1", new
>> EventDeserializationSchema(), p);
>> WatermarkStrategy wmStrategy =
>> WatermarkStrategy
>> .forMonotonousTimestamps()
>> .withIdleness(Duration.ofMinutes(1))
>> .withTimestampAssigner((event, timestamp) -> {
>> return event.get_Time();
>> });
>> DataStream stream = env.addSource(
>> kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>> DataStream> enriched = stream
>> .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
>> .map(new StatefulSessionCalculator());
>>
>> WindowedStream, String, TimeWindow> result =
>> enriched
>> .keyBy(new MyKeySelector())
>> .window(EventTimeSessionWindows.withDynamicGap(new
>> DynamicSessionWindows()));
>>
>


Re: Can MySQL stored procedures and functions be used in Flink SQL?

2020-11-23 Thread hailongwang
Hi,
  No, they cannot. Link [1] lists all the built-in functions supported by Flink SQL, and link [2] explains how Flink SQL lets you define your own functions to cover custom needs.
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html
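As a sketch of option [2], a scalar UDF that mimics MySQL's bin_to_uuid could look roughly like this (the class, the registration call shown in the comment, and the fact that the swap flag is ignored are illustrative assumptions):

    import org.apache.flink.table.functions.ScalarFunction;
    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class BinToUuid extends ScalarFunction {
        public String eval(byte[] bytes) {
            if (bytes == null || bytes.length != 16) {
                return null;                               // not a binary(16) uuid
            }
            ByteBuffer bb = ByteBuffer.wrap(bytes);
            return new UUID(bb.getLong(), bb.getLong()).toString();
        }
    }
    // registration: tableEnv.createTemporarySystemFunction("bin_to_uuid", BinToUuid.class);
    // usage:        SELECT bin_to_uuid(id) AS text_uuid FROM usertable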


Best,
Hailong

On 2020-11-24 00:41:49, "macdoor" wrote:
>The requirement is this: MySQL stores a uuid as binary(16), and when reading it into Flink it needs to be converted into the textual uuid. The SQL is:
>
>select bin_to_uuid(id, true) as text_uuid from usertable
>
>When I try this, I get an error saying bin_to_uuid cannot be found
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-23 Thread Jark Wu
AFAIK, FLINK-10886 is not implemented yet.
cc @Becket may know more plans about this feature.

Best,
Jark

On Sat, 21 Nov 2020 at 03:46,  wrote:

> Hi Timo,
>
> One more question, the blog also mentioned a jira task to solve this
> issue. https://issues.apache.org/jira/browse/FLINK-10886.  Will this
> feature be available in 1.12? Thanks!
>
> Best,
>
> Fuyao
> On 11/20/20 11:37, fuyao...@oracle.com wrote:
>
> Hi Timo,
>
> Thanks for your reply! I think your suggestions are really helpful! The
> good news is that I had managed to figure out some of it by myself a few
> days ago.
>
> 1. Thanks for the update about the table parallelism issue!
>
> 2. After trying out the idleness setting: it prevents idle subtasks
> from blocking the pipeline's overall watermark, and it works for me. Based
> on my observation and reading of the source code, I have summarized some
> notes. Please correct me if I am wrong.
>
>    1. Watermarks are independent within each subtask of a Flink
>    operator.
>    2. The watermark of a multi-parallelism table operator is always
>    dominated by the least watermark of the currently *ACTIVE* subtasks.
>    3. With withIdleness() configured, a subtask will be marked as idle
>    if it hasn't received a message for the configured period of time. It will NOT
>    execute onPeriodicEmit() and emit watermarks after reaching the idle state.
>    Between [the start of the application / receiving a new message] and [reaching
>    the idle state], onPeriodicEmit() will still emit watermarks and
>    dominate the overall context watermark if it holds the smallest watermark
>    among the subtasks.
>    4. Once an idle subtask receives a new message, it will switch its
>    status from idle to active and start to influence the overall context
>    watermark.
>
> 3. In order to route the correct information to the subtask in the join
> step, I have added the keyed() logic in the source based on the join key used in
> the join step. It seems to work correctly and routes the messages to the
> correct place.
>
> 4. For the interval join, I think I can't use it directly since I need to
> use a full outer join so as not to lose any information from any upstream
> datastream. I think the interval join is an inner join, so it can't do this task. I
> guess my only option is to do a full outer join with a query configuration.
>
> 5. One more question about the data replay issue. I read the ververica
> blog (
> https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers)
> and I think with replay use case, we will face some similar issues. I think
> the suggested approach mentioned
>
>   (1). Puts each incoming track record in a map keyed by its timestamp
>
>   (2). creates an event timer to process that record once the watermark
> hits that point.
>
> I kind of understand the idea here: buffer all the data (maybe delete some
> of the old track once processed) in a track ordered by timestamp and trigger
> the event timers sequentially over this buffered track.
>
> Based on my understanding, this buffered design is only suitable for
> *offline* data processing, right? (It is a waste of resources to buffer
> this in real time.)
>
> Also, from the article, I think they are using the periodic watermark
> strategy [1]. How can they process the last batch of data records with a
> periodic watermark strategy, since there is no more incoming data to advance
> the watermark? So the last piece of data will never be processed here? Is
> there a way to gracefully handle this? My use case doesn't allow me to lose
> any information.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
>
> Best,
>
> Fuyao
>
>
> On 11/20/20 08:55, Timo Walther wrote:
>
> Hi Fuyao,
>
> sorry for not replying earlier.
>
> You posted a lot of questions. I scanned the thread quickly, let me try to
> answer some of them and feel free to ask further questions afterwards.
>
> "is it possible to configure the parallelism for Table operation at
> operator level"
>
> No, this is not possible at the moment. The reasons are 1) we don't know how
> to expose such functionality in a nice way; maybe we will use SQL hints
> in the future [1]. 2) Sometimes the planner sets the parallelism of
> operators explicitly to 1. All other operators will use the globally
> defined parallelism for the pipeline (also to not mess up retraction
> messages internally). You will be able to set the parallelism of the sink
> operation in Flink 1.12.
>
> "BoundedOutOfOrderness Watermark Generator is NOT making the event time to
> advance"
>
> Have you checked if you can use an interval join instead of a full join
> with state retention? Table/SQL pipelines that don't preserve a time
> attribute in the end might also erase the underlying watermarks. Thus,
> event time triggers will not work after your join.
>
> "Why can't I update the watermarks for all 8 parallelisms?"
>
> You could play around with 

Using Hive UDFs in Flink

2020-11-23 Thread 酷酷的浑蛋
Flink-1.11.1,  hive-2.2.0
When using the current_timestamp or current_date function, it throws:
Caused by: java.lang.NullPointerException
at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51)
at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)





Re: A Flink SQL mystery: the query fails as soon as a certain column of the table is added to it.

2020-11-23 Thread jy l
I'm using release-1.12.0-rc1.

Best

Jark Wu wrote on Tue, Nov 24, 2020 at 11:42 AM:

> The error looks like a bug. Which version are you using?
> Feel free to open an issue in JIRA.
>
> Best,
> Jark
>
> On Tue, 24 Nov 2020 at 11:27, jy l  wrote:
>
> > Hi:
> > Something very strange happened to me while using Flink SQL. Details below:
> >
> > My DDL:
> > create table if not exists t_order(
> > id int PRIMARY KEY comment '订单id',
> > timestamps bigint comment '订单创建时间',
> > orderInformationId string comment '订单信息ID',
> > userId string comment '用户ID',
> > categoryId int comment '商品类别',
> > productId int comment '商品ID',
> > price decimal(10,2) comment '单价',
> > productCount int comment '购买数量',
> > priceSum decimal(10,2) comment '订单总价',
> > shipAddress string comment '商家地址',
> > receiverAddress string comment '收货地址',
> > ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> > WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> > )with(
> > 'connector' = 'kafka',
> > 'format' = 'debezium-avro-confluent',
> > 'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
> 
> > 
> > ',
> > 'topic' = 'ods.userAnalysis.order',
> > 'properties.bootstrap.servers' = '手动打码ip:9092',
> > 'properties.group.id' = 'flink-analysis',
> > 'scan.startup.mode' = 'latest-offset'
> > )
> >
> > When querying this table, the following statements all work fine:
> >
> >- select * from t_order
> >- select receiverAddress from t_order
> >- select
> >id,
> >timestamps,
> >orderInformationId,
> >userId,
> >categoryId,
> >productId,
> >price,
> >productCount,
> >priceSum,
> >shipAddress
> >from t_order
> >
> > But when I add the receiverAddress column to the third statement, the query no longer works. The SQL is:
> > select
> > id,
> > timestamps,
> > orderInformationId,
> > userId,
> > categoryId,
> > productId,
> > price,
> > productCount,
> > priceSum,
> > shipAddress,
> > receiverAddress
> > from t_order,
> > The error is:
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> This
> > calc has no useful projection and no filter. It should be removed by
> > CalcRemoveRule.
> > at
> >
> >
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
> > at
> >
> >
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> > ...
> >
> > The receiverAddress column is clearly declared in my DDL, and selecting it on its own also works.
> > What exactly is the reason for this? Any insight would be appreciated.
> >
> >
> > Best regards!
> >
>


Re: A Flink SQL mystery: the query fails as soon as a certain column of the table is added to it.

2020-11-23 Thread Jark Wu
The error looks like a bug. Which version are you using?
Feel free to open an issue in JIRA.

Best,
Jark

On Tue, 24 Nov 2020 at 11:27, jy l  wrote:

> Hi:
> Something very strange happened to me while using Flink SQL. Details below:
>
> My DDL:
> create table if not exists t_order(
> id int PRIMARY KEY comment '订单id',
> timestamps bigint comment '订单创建时间',
> orderInformationId string comment '订单信息ID',
> userId string comment '用户ID',
> categoryId int comment '商品类别',
> productId int comment '商品ID',
> price decimal(10,2) comment '单价',
> productCount int comment '购买数量',
> priceSum decimal(10,2) comment '订单总价',
> shipAddress string comment '商家地址',
> receiverAddress string comment '收货地址',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> )with(
> 'connector' = 'kafka',
> 'format' = 'debezium-avro-confluent',
> 'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
> 
> ',
> 'topic' = 'ods.userAnalysis.order',
> 'properties.bootstrap.servers' = '手动打码ip:9092',
> 'properties.group.id' = 'flink-analysis',
> 'scan.startup.mode' = 'latest-offset'
> )
>
> When querying this table, the following statements all work fine:
>
>- select * from t_order
>- select receiverAddress from t_order
>- select
>id,
>timestamps,
>orderInformationId,
>userId,
>categoryId,
>productId,
>price,
>productCount,
>priceSum,
>shipAddress
>from t_order
>
> But when I add the receiverAddress column to the third statement, the query no longer works. The SQL is:
> select
> id,
> timestamps,
> orderInformationId,
> userId,
> categoryId,
> productId,
> price,
> productCount,
> priceSum,
> shipAddress,
> receiverAddress
> from t_order,
> The error is:
> Exception in thread "main" org.apache.flink.table.api.TableException: This
> calc has no useful projection and no filter. It should be removed by
> CalcRemoveRule.
> at
>
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
> at
>
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> ...
>
> The receiverAddress column is clearly declared in my DDL, and selecting it on its own also works.
> What exactly is the reason for this? Any insight would be appreciated.
>
>
> Best regards!
>


Re: Suggestions regarding Catalogs

2020-11-23 Thread Jark Wu
1. Yes, that works.
2. Yes. See the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag
3. Yes.

The Hive metastore catalog is the general-purpose catalog officially provided by Flink (it can store tables of any connector type).

Best,
Jark


On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote:

> Hi Rui Li,
> > Flink SQL allows one session to use multiple catalogs, so the catalog choice is not exclusive; they can be mixed.
>
> So within one job I can switch catalogs, right? For example, read from Kafka and write to Hive's db1.hive_table.
> A few questions:
> 1. Create the Kafka source with the memory catalog and the Hive table with the Hive catalog: is that OK?
> 2. What is the syntax for switching catalogs in SQL? I didn't find it in [1]; is it USE CATALOG
> catalogName (default_catalog/hive_catalog)?
>
> 3. When registering the HiveCatalog a default database has to be specified, say test; to write to db1's hive_table, is switching the database enough, i.e.
> USE db1;
> Thanks
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html
>
> > On Nov 23, 2020, at 8:52 PM, Rui Li wrote:
> >
> > Hi,
> >
> > Flink SQL allows one session to use multiple catalogs, so the catalog choice is not exclusive; they can be mixed.
> >
> > Regarding your two questions:
> > 1. My understanding is that the JDBC catalog is mainly meant to make it convenient to query JDBC tables; the current implementation is basically a read-only catalog
> > [1]. The docs could probably be more explicit about this.
> > 2.
> > Building a complete, production-ready metadata management system is never really "simple": reading and writing schemas is only the most basic requirement, and concurrent access, HA and metadata security all need to be considered. The Hive
> > metastore already has a sizeable user base, so relying on it to persist metadata is a choice with a good cost/benefit ratio.
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
> >
> > On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
> >
> >> Flink currently provides three kinds of catalog: memory, jdbc and hive.
> >> In practice, the following options seem available.
> >>
> >> (1) Use the memory catalog and have every SQL statement carry its own DDL.
> >> (2) Use a catalog that supports "persisted" DDL definitions, so individual SQL statements no longer need to carry their own DDL.
> >>
> >> Options 1 and 2 each have pros and cons.
> >> Advantage of option 1:
> >>    For example, sql1 and sql2 may each target only part of a Kafka topic's time range; in that case it is awkward to hard-code (persist) the DDL for that Kafka
> >> topic, and each SQL statement should rather carry its own definition. (That said, with option 2 the per-statement DDL of sql1 and sql2 could also be simplified by overriding options.)
> >> Disadvantage of option 1:
> >>    Obviously, the lack of "persistence" is itself the drawback, and that is exactly option 2's advantage.
> >>
> >> - And now my questions.
> >>
> >> The Flink documentation says the HiveCatalog serves both as storage for Flink table metadata and as an interface for reading Hive table metadata, while for the JdbcCatalog it does not say which table (connector) types are supported.
> >> Question 1 (as above): the docs do not state clearly, for each catalog, which connector types, i.e. table types, it supports.
> >>
> >> Question 2: could a simpler, more convenient catalog implementation be provided that supports persistence and all connector types? "Simple" meaning e.g. backed by MySQL/PostgreSQL, or even a plain JSON file as storage; "persistent" meaning the definitions can be persisted.
> >>
> >> Of course, since persisting Hive-style metadata in other storage may require extra, non-trivial conversion, I feel there should at least be a reasonably general catalog, e.g. one supporting (MySQL tables, Kafka tables (Kafka metadata is simple and can surely be stored in MySQL or the like), ...).
> >>
> >
> >
> > --
> > Best regards!
> > Rui Li
>
>


Re: Frequent operation-timeout errors when calling flush() while writing to HBase from Flink in real time

2020-11-23 Thread bradyMk
Adding the text version of the images from my previous question:
Figure 1:

 if (count > 300) {
   mutator.flush()   // flush the BufferedMutator every ~300 writes
   count = 0
 }
 count = count + 1

Figure 2:
Caused by:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed
101 actions: Operation Timeout: 101 times, servers with issues:
172.xx.x.x,16020,1601173606933
at
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:297)
at
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2300(AsyncProcess.java:273)
at
org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1906)
at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:250)
at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:213)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


A Flink SQL mystery: the query fails as soon as a certain column of the table is added to it.

2020-11-23 Thread jy l
Hi:
Something very strange happened to me while using Flink SQL. Details below:

My DDL:
create table if not exists t_order(
id int PRIMARY KEY comment '订单id',
timestamps bigint comment '订单创建时间',
orderInformationId string comment '订单信息ID',
userId string comment '用户ID',
categoryId int comment '商品类别',
productId int comment '商品ID',
price decimal(10,2) comment '单价',
productCount int comment '购买数量',
priceSum decimal(10,2) comment '订单总价',
shipAddress string comment '商家地址',
receiverAddress string comment '收货地址',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
)with(
'connector' = 'kafka',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
',
'topic' = 'ods.userAnalysis.order',
'properties.bootstrap.servers' = '手动打码ip:9092',
'properties.group.id' = 'flink-analysis',
'scan.startup.mode' = 'latest-offset'
)

When querying this table, the following statements all work fine:

   - select * from t_order
   - select receiverAddress from t_order
   - select
   id,
   timestamps,
   orderInformationId,
   userId,
   categoryId,
   productId,
   price,
   productCount,
   priceSum,
   shipAddress
   from t_order

But when I add the receiverAddress column to the third statement, the query no longer works. The SQL is:
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress,
receiverAddress
from t_order,
The error is:
Exception in thread "main" org.apache.flink.table.api.TableException: This
calc has no useful projection and no filter. It should be removed by
CalcRemoveRule.
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
...

The receiverAddress column is clearly declared in my DDL, and selecting it on its own also works.
What exactly is the reason for this? Any insight would be appreciated.


Best regards!


Re: Hi I'm having problems with self-signed certificate trust with Native K8S

2020-11-23 Thread Yang Wang
Hi Kevin,

Let me try to understand your problem. You have added the trusted keystore
to the Flink app image (my-flink-app:0.0.1)
and it could not be loaded, right? And even when you tunnel into the pod, you
cannot find the keystore. That is strange.

I know it is not very convenient to bundle the keystore in the image.
Mounting it from a ConfigMap could make things
easier. The reason why we still do not have this (making
ConfigMaps/PersistentVolumes mountable via a Flink config option) is that
a pod template [1] may be a more common way to get this done.

[1]. https://issues.apache.org/jira/browse/FLINK-15656
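For reference, a sketch of what the pod-template approach tracked in [1] could look like once it is available (resource names, the container name and the mount path are assumptions):

    # pod-template.yaml (sketch): mount the truststore from a Kubernetes Secret instead of baking it into the image
    apiVersion: v1
    kind: Pod
    metadata:
      name: taskmanager-pod-template
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: s3-truststore
              mountPath: /opt/flink/certs
              readOnly: true
      volumes:
        - name: s3-truststore
          secret:
            secretName: s3-e2e-truststore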

Best,
Yang


Till Rohrmann wrote on Mon, Nov 23, 2020 at 11:01 PM:

> Thanks for reaching out to the Flink community Kevin. Yes, with Flink
> 1.12.0 it should be possible to mount secrets with your K8s deployment.
> From the posted stack trace it is not possible to see what exactly is going
> wrong. Could you maybe post the complete logs? I am also pulling in Yang
> Wang who is most knowledgeable about Flink's K8s integration.
>
> Cheers,
> Till
>
> On Sun, Nov 22, 2020 at 12:49 PM Kevin Kwon  wrote:
>
>> I think what we need in the Native Kubernetis Config is to mount custom
>> ConfigMap, Secrets, and Volumes
>>
>> I see that in the upcoming release, Secrets are able to get mounted
>>
>> https://github.com/apache/flink/pull/14005 <- also can maintainers look
>> into this PR so we can mount other custom K8S resources?
>>
>> On Fri, Nov 20, 2020 at 9:23 PM Kevin Kwon  wrote:
>>
>>> Hi I am using MinIO as a S3 mock backend for Native K8S
>>>
>>> Everything seems to be fine except that it cannot connect to S3 since
>>> self-signed certificates' trusted store are not cloned in Deployment
>>> resources
>>>
>>> Below is in order, how I add the trusted keystore by using keytools and
>>> how I run my app with the built image
>>>
>>> FROM registry.local/mde/my-flink-app:0.0.1
>>> COPY s3/certs/public.crt $FLINK_HOME/s3-e2e-public.crt
>>> RUN keytool \
>>>   -noprompt \
>>>   -alias s3-e2e-public \
>>>   -importcert \
>>>   -trustcacerts \
>>>   -keystore $JAVA_HOME/lib/security/cacerts \
>>>   -storepass changeit \
>>>   -file $FLINK_HOME/s3-e2e-public.crt
>>>
>>> $FLINK_HOME/bin/flink run-application \
>>>   -t kubernetes-application \
>>> -Denv.java.opts="-Dkafka.brokers=kafka-external:9092 
>>> -Dkafka.schema-registry.url=kafka-schemaregistry:8081" \
>>> -Dkubernetes.container-start-command-template="%java% %classpath% 
>>> %jvmmem% %jvmopts% %logging% %class% %args%" \
>>> -Dkubernetes.cluster-id=${K8S_CLUSTERID} \
>>> 
>>> -Dkubernetes.container.image=${DOCKER_REPO}/${ORGANISATION}/${APP_NAME}:${APP_VERSION}
>>>  \
>>> -Dkubernetes.namespace=${K8S_NAMESPACE} \
>>> -Dkubernetes.rest-service.exposed.type=${K8S_RESTSERVICE_EXPOSED_TYPE} \
>>> -Dkubernetes.taskmanager.cpu=${K8S_TASKMANAGER_CPU} \
>>> -Dresourcemanager.taskmanager-timeout=360 \
>>> -Dtaskmanager.memory.process.size=${TASKMANAGER_MEMORY_PROCESS_SIZE} \
>>> -Dtaskmanager.numberOfTaskSlots=${TASKMANAGER_NUMBEROFTASKSLOTS} \
>>> -Ds3.endpoint=s3:443 \
>>> -Ds3.access-key=${S3_ACCESSKEY} \
>>> -Ds3.secret-key=${S3_SECRETKEY} \
>>> -Ds3.path.style.access=true \
>>> -Dstate.backend=filesystem \
>>> -Dstate.checkpoints.dir=s3://${ORGANISATION}/${APP_NAME}/checkpoint \
>>> -Dstate.savepoints.dir=s3://${ORGANISATION}/${APP_NAME}/savepoint \
>>> local://${FLINK_HOME}/usrlib/${APP_NAME}-assembly-${APP_VERSION}.jar
>>>
>>> However, I get the following error and I don't see my trusted key in 
>>> keytools when I login to the pod (seems the trustedstore is not cloned)
>>>
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create 
>>> checkpoint storage at checkpoint coordinator side.
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:305)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:224)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
>>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>>> at 
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
>>>  

Re: Suggestions regarding Catalogs

2020-11-23 Thread admin
Hi Rui Li,
> Flink SQL allows one session to use multiple catalogs, so the catalog choice is not exclusive; they can be mixed.

So within one job I can switch catalogs, right? For example, read from Kafka and write to Hive's db1.hive_table.
A few questions:
1. Create the Kafka source with the memory catalog and the Hive table with the Hive catalog: is that OK?
2. What is the syntax for switching catalogs in SQL? I didn't find it in [1]; is it USE CATALOG
catalogName (default_catalog/hive_catalog)?
3. When registering the HiveCatalog a default database has to be specified, say test; to write to db1's hive_table, is switching the database enough, i.e.
USE db1;
Thanks

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html

> On Nov 23, 2020, at 8:52 PM, Rui Li wrote:
> 
> Hi,
> 
> Flink SQL allows one session to use multiple catalogs, so the catalog choice is not exclusive; they can be mixed.
> 
> Regarding your two questions:
> 1. My understanding is that the JDBC catalog is mainly meant to make it convenient to query JDBC tables; the current implementation is basically a read-only catalog
> [1]. The docs could probably be more explicit about this.
> 2.
> Building a complete, production-ready metadata management system is never really "simple": reading and writing schemas is only the most basic requirement, and concurrent access, HA and metadata security all need to be considered. The Hive
> metastore already has a sizeable user base, so relying on it to persist metadata is a choice with a good cost/benefit ratio.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
> 
> On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
> 
>> Flink currently provides three kinds of catalog: memory, jdbc and hive.
>> In practice, the following options seem available.
>> 
>> (1) Use the memory catalog and have every SQL statement carry its own DDL.
>> (2) Use a catalog that supports "persisted" DDL definitions, so individual SQL statements no longer need to carry their own DDL.
>> 
>> Options 1 and 2 each have pros and cons.
>> Advantage of option 1:
>>    For example, sql1 and sql2 may each target only part of a Kafka topic's time range; in that case it is awkward to hard-code (persist) the DDL for that Kafka
>> topic, and each SQL statement should rather carry its own definition. (That said, with option 2 the per-statement DDL of sql1 and sql2 could also be simplified by overriding options.)
>> Disadvantage of option 1:
>>    Obviously, the lack of "persistence" is itself the drawback, and that is exactly option 2's advantage.
>> 
>> - And now my questions.
>> 
>> The Flink documentation says the HiveCatalog serves both as storage for Flink table metadata and as an interface for reading Hive table metadata, while for the JdbcCatalog it does not say which table (connector) types are supported.
>> Question 1 (as above): the docs do not state clearly, for each catalog, which connector types, i.e. table types, it supports.
>> 
>> Question 2: could a simpler, more convenient catalog implementation be provided that supports persistence and all connector types? "Simple" meaning e.g. backed by MySQL/PostgreSQL, or even a plain JSON file as storage; "persistent" meaning the definitions can be persisted.
>> 
>> Of course, since persisting Hive-style metadata in other storage may require extra, non-trivial conversion, I feel there should at least be a reasonably general catalog, e.g. one supporting (MySQL tables, Kafka tables (Kafka metadata is simple and can surely be stored in MySQL or the like), ...).
>> 
> 
> 
> -- 
> Best regards!
> Rui Li



How should an Array nested in a Row be defined with DDL in Flink SQL?

2020-11-23 Thread zilong xiao
[image: image.png]
As the subject says, I run into an exception when trying to define it as shown below; could someone in the community point me to the correct way to do this?


Learn flink source code book recommendation

2020-11-23 Thread ????
Excuse me, I want to learn the Flink source code. Do you have any good
learning material or up-to-date book recommendations?

Frequent operation-timeout errors when calling flush() while writing to HBase from Flink in real time

2020-11-23 Thread bradyMk
Dear all:
I write to HBase from Flink in real time, extending RichSinkFunction and using HBase's BufferedMutator; every time a certain amount of data has been written I call flush(), roughly like this:
 
but the job keeps reporting the following error:
 
It feels like something in my code is causing this, but I don't know why. Any guidance would be greatly appreciated~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


How does Flink SQL handle late data?

2020-11-23 Thread jy l
Hi:
A question: in Flink SQL I defined a watermark with a maximum out-of-orderness when creating the table, but some data still arrives later than that. We don't want to simply drop it, so how can I collect this still-late data? Is there an equivalent of the DataStream API's ability to split late data off into a side stream?

Best regards!


Can MySQL stored procedures and functions be used in Flink SQL?

2020-11-23 Thread macdoor
The requirement is this: MySQL stores a uuid as binary(16), and when reading it into Flink it needs to be converted into the textual uuid. The SQL is:

select bin_to_uuid(id, true) as text_uuid from usertable

When I try this, I get an error saying bin_to_uuid cannot be found





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Incorrect results when querying a Hive table self-join (self inner join) with Flink 1.11.2

2020-11-23 Thread macdoor
Answering my own question, for others' reference.

After switching to Flink 1.12.0-rc1 and running the same SQL on the same data, the result matches what Hive computes, so this is confirmed to be a bug in 1.11.2
that appears to be fixed in 1.12.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Print on screen DataStream content

2020-11-23 Thread Austin Cawley-Edwards
Hey Simone,

I'd suggest trying out the `DataStream#print()` function to start, but
there are a few other easy-to-integrate sinks for testing that you can
check out in the docs here[1]

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks
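For what it's worth, a small sketch using the variables from the code quoted below (note that on a cluster, print() goes to the TaskManagers' stdout, e.g. the *.out files or the web UI, not to the client console):

    stream.print();                                          // IDE: console; cluster: TaskManager stdout
    enriched.writeAsText("/tmp/enriched-debug",              // simple text file, one per parallel subtask
        FileSystem.WriteMode.OVERWRITE).setParallelism(1);   // org.apache.flink.core.fs.FileSystem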

On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin 
wrote:

> Hi All,
>
> On my code I have a DataStream that I would like to access. I need to
> understand what I'm getting for each transformation to check if the data
> that I'm working on make sense. How can I print into the console or get a
> file (csv, txt) for the variables: "stream", "enriched" and "result"?
>
> I have tried different way but no way to get the data.
>
> Thanks!
>
>
> FlinkKafkaConsumer kafkaData =
> new FlinkKafkaConsumer("CorID_1", new
> EventDeserializationSchema(), p);
> WatermarkStrategy wmStrategy =
> WatermarkStrategy
> .forMonotonousTimestamps()
> .withIdleness(Duration.ofMinutes(1))
> .withTimestampAssigner((event, timestamp) -> {
> return event.get_Time();
> });
> DataStream stream = env.addSource(
> kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
> DataStream> enriched = stream
> .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
> .map(new StatefulSessionCalculator());
>
> WindowedStream, String, TimeWindow> result =
> enriched
> .keyBy(new MyKeySelector())
> .window(EventTimeSessionWindows.withDynamicGap(new
> DynamicSessionWindows()));
>


Print on screen DataStream content

2020-11-23 Thread Simone Cavallarin
Hi All,

In my code I have a DataStream that I would like to inspect. I need to
understand what I'm getting after each transformation, to check whether the data
I'm working on makes sense. How can I print to the console, or get a file (csv,
txt), for the variables: "stream", "enriched" and "result"?

I have tried different way but no way to get the data.

Thanks!


FlinkKafkaConsumer kafkaData =
new FlinkKafkaConsumer("CorID_1", new 
EventDeserializationSchema(), p);
WatermarkStrategy wmStrategy =
WatermarkStrategy
.forMonotonousTimestamps()
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner((event, timestamp) -> {
return event.get_Time();
});
DataStream stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));

DataStream> enriched = stream
.keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
.map(new StatefulSessionCalculator());

WindowedStream, String, TimeWindow> result = 
enriched
.keyBy(new MyKeySelector())
.window(EventTimeSessionWindows.withDynamicGap(new 
DynamicSessionWindows()));


Re: Dynamic ad hoc query deployment strategy

2020-11-23 Thread lalala
Hi Till,

Thank you for your comment. I am looking forward to hearing from Timo and
Dawid as well.

Best regards,



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Logs of JobExecutionListener

2020-11-23 Thread Flavio Pompermaier
For the sake of simplification (so everybody looking for missing methods
in RestClusterClient) I just shared the new methods at [1].
In this way you can add them to the RestClusterClient when you want (if you
want to).
I also had to change the visibility of some variables and methods in order
to make it work.
Probably it would be useful to put DTOs of flink-webmonitor in a standalone
project in order to be "importable" in the client project..

Best,
Flavio

[1]
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java

On Mon, Nov 23, 2020 at 4:38 PM Flavio Pompermaier 
wrote:

> I don't know if they need to be added also to the ClusterClient but for
> sure they are missing in the RestClusterClient
>
> On Mon, Nov 23, 2020 at 4:31 PM Aljoscha Krettek 
> wrote:
>
>> On 23.11.20 16:26, Flavio Pompermaier wrote:
>> > Thank you Aljosha,.now that's more clear!
>> > I didn't know that jobGraph.getJobID() was the solution for my use
>> case..I
>> > was convinced that the job ID was assigned by the cluster!
>> > And to me it's really weird that the job listener was not called by the
>> > submitJob...Probably this should be documented at least.
>> > In the meanwhile I extended a little bit the RestClusterClient..do you
>> > think it could be worth issuing a PR to add some unimplemented methods?
>> >
>> > For example I added:
>> > - public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
>> > - public EmptyResponseBody deleteJar(String jarFileName);
>> > - public boolean isJobRunning(JobID fjid)
>> > - public JarUploadResponseBody uploadJar(Path uploadedFile);
>> >
>> > and I was also going to add jarRun..
>>
>> I would be OK with adding these. But you would also need to add them to
>> the base ClusterClient, right?
>>
>> @Till or @Chesnay, any concerns with this?
>
>


Re: Logs of JobExecutionListener

2020-11-23 Thread Flavio Pompermaier
I don't know if they need to be added also to the ClusterClient but for
sure they are missing in the RestClusterClient

On Mon, Nov 23, 2020 at 4:31 PM Aljoscha Krettek 
wrote:

> On 23.11.20 16:26, Flavio Pompermaier wrote:
> > Thank you Aljosha,.now that's more clear!
> > I didn't know that jobGraph.getJobID() was the solution for my use
> case..I
> > was convinced that the job ID was assigned by the cluster!
> > And to me it's really weird that the job listener was not called by the
> > submitJob...Probably this should be documented at least.
> > In the meanwhile I extended a little bit the RestClusterClient..do you
> > think it could be worth issuing a PR to add some unimplemented methods?
> >
> > For example I added:
> > - public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
> > - public EmptyResponseBody deleteJar(String jarFileName);
> > - public boolean isJobRunning(JobID fjid)
> > - public JarUploadResponseBody uploadJar(Path uploadedFile);
> >
> > and I was also going to add jarRun..
>
> I would be OK with adding these. But you would also need to add them to
> the base ClusterClient, right?
>
> @Till or @Chesnay, any concerns with this?


Re: Logs of JobExecutionListener

2020-11-23 Thread Aljoscha Krettek

On 23.11.20 16:26, Flavio Pompermaier wrote:

Thank you Aljosha,.now that's more clear!
I didn't know that jobGraph.getJobID() was the solution for my use case..I
was convinced that the job ID was assigned by the cluster!
And to me it's really weird that the job listener was not called by the
submitJob...Probably this should be documented at least.
In the meanwhile I extended a little bit the RestClusterClient..do you
think it could be worth issuing a PR to add some unimplemented methods?

For example I added:
- public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
- public EmptyResponseBody deleteJar(String jarFileName);
- public boolean isJobRunning(JobID fjid)
- public JarUploadResponseBody uploadJar(Path uploadedFile);

and I was also going to add jarRun..


I would be OK with adding these. But you would also need to add them to 
the base ClusterClient, right?


@Till or @Chesnay, any concerns with this?


Re: Logs of JobExecutionListener

2020-11-23 Thread Flavio Pompermaier
Thank you Aljoscha, now that's much clearer!
I didn't know that jobGraph.getJobID() was the solution for my use case..I
was convinced that the job ID was assigned by the cluster!
And to me it's really weird that the job listener was not called by the
submitJob...Probably this should be documented at least.
In the meanwhile I extended a little bit the RestClusterClient..do you
think it could be worth issuing a PR to add some unimplemented methods?

For example I added:
- public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
- public EmptyResponseBody deleteJar(String jarFileName);
- public boolean isJobRunning(JobID fjid)
- public JarUploadResponseBody uploadJar(Path uploadedFile);

and I was also going to add jarRun..

Let me know,
Flavio

On Mon, Nov 23, 2020 at 3:57 PM Aljoscha Krettek 
wrote:

> On 20.11.20 22:09, Flavio Pompermaier wrote:
> > To achieve this, I was using the
> > RestClusterClient because with that I can use the
> > following code and retrieve the JobID:
> >
> >  (1) JobID flinkJobId =
> >
> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();
>
> All you want to do is get the JobID, correct? If yes, you can just do a
> `jobGraph.getJobID()`. The job id is not set on the cluster but it's
> actually set client side, on the JobGraph object.
>
> Does that help in your case?
>
> A general comment on your other questions: yes, the listener logic if
> only used when using the environments. It's not integrated with the
> RestClusterClient, which is considered more of an internal
> implementation detail.
>
> Aljoscha
>
>

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Hi I'm having problems with self-signed certificate trust with Native K8S

2020-11-23 Thread Till Rohrmann
Thanks for reaching out to the Flink community Kevin. Yes, with Flink
1.12.0 it should be possible to mount secrets with your K8s deployment.
From the posted stack trace it is not possible to see what exactly is going
wrong. Could you maybe post the complete logs? I am also pulling in Yang
Wang who is most knowledgeable about Flink's K8s integration.

Cheers,
Till

On Sun, Nov 22, 2020 at 12:49 PM Kevin Kwon  wrote:

> I think what we need in the Native Kubernetis Config is to mount custom
> ConfigMap, Secrets, and Volumes
>
> I see that in the upcoming release, Secrets are able to get mounted
>
> https://github.com/apache/flink/pull/14005 <- also can maintainers look
> into this PR so we can mount other custom K8S resources?
>
> On Fri, Nov 20, 2020 at 9:23 PM Kevin Kwon  wrote:
>
>> Hi I am using MinIO as a S3 mock backend for Native K8S
>>
>> Everything seems to be fine except that it cannot connect to S3 since
>> self-signed certificates' trusted store are not cloned in Deployment
>> resources
>>
>> Below is in order, how I add the trusted keystore by using keytools and
>> how I run my app with the built image
>>
>> FROM registry.local/mde/my-flink-app:0.0.1
>> COPY s3/certs/public.crt $FLINK_HOME/s3-e2e-public.crt
>> RUN keytool \
>>   -noprompt \
>>   -alias s3-e2e-public \
>>   -importcert \
>>   -trustcacerts \
>>   -keystore $JAVA_HOME/lib/security/cacerts \
>>   -storepass changeit \
>>   -file $FLINK_HOME/s3-e2e-public.crt
>>
>> $FLINK_HOME/bin/flink run-application \
>>   -t kubernetes-application \
>> -Denv.java.opts="-Dkafka.brokers=kafka-external:9092 
>> -Dkafka.schema-registry.url=kafka-schemaregistry:8081" \
>> -Dkubernetes.container-start-command-template="%java% %classpath% 
>> %jvmmem% %jvmopts% %logging% %class% %args%" \
>> -Dkubernetes.cluster-id=${K8S_CLUSTERID} \
>> 
>> -Dkubernetes.container.image=${DOCKER_REPO}/${ORGANISATION}/${APP_NAME}:${APP_VERSION}
>>  \
>> -Dkubernetes.namespace=${K8S_NAMESPACE} \
>> -Dkubernetes.rest-service.exposed.type=${K8S_RESTSERVICE_EXPOSED_TYPE} \
>> -Dkubernetes.taskmanager.cpu=${K8S_TASKMANAGER_CPU} \
>> -Dresourcemanager.taskmanager-timeout=360 \
>> -Dtaskmanager.memory.process.size=${TASKMANAGER_MEMORY_PROCESS_SIZE} \
>> -Dtaskmanager.numberOfTaskSlots=${TASKMANAGER_NUMBEROFTASKSLOTS} \
>> -Ds3.endpoint=s3:443 \
>> -Ds3.access-key=${S3_ACCESSKEY} \
>> -Ds3.secret-key=${S3_SECRETKEY} \
>> -Ds3.path.style.access=true \
>> -Dstate.backend=filesystem \
>> -Dstate.checkpoints.dir=s3://${ORGANISATION}/${APP_NAME}/checkpoint \
>> -Dstate.savepoints.dir=s3://${ORGANISATION}/${APP_NAME}/savepoint \
>> local://${FLINK_HOME}/usrlib/${APP_NAME}-assembly-${APP_VERSION}.jar
>>
>> However, I get the following error and I don't see my trusted key in 
>> keytools when I login to the pod (seems the trustedstore is not cloned)
>>
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create 
>> checkpoint storage at checkpoint coordinator side.
>> at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:305)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.(CheckpointCoordinator.java:224)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272) 
>> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>> at 
>> 

Re: Logs of JobExecutionListener

2020-11-23 Thread Aljoscha Krettek

On 20.11.20 22:09, Flavio Pompermaier wrote:

To achieve this, I was using the
RestClusterClient because with that I can use the
following code and retrieve the JobID:

 (1) JobID flinkJobId =
client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();


All you want to do is get the JobID, correct? If yes, you can just do a 
`jobGraph.getJobID()`. The job id is not set on the cluster but it's 
actually set client side, on the JobGraph object.
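For completeness, a sketch with the names used above (jobGraph, client):

    JobID flinkJobId = jobGraph.getJobID();            // assigned client-side, no cluster round trip
    CompletableFuture<JobID> ack = client.submitJob(jobGraph);
    ack.get();                                         // only waits for the submission acknowledgement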


Does that help in your case?

A general comment on your other questions: yes, the listener logic if 
only used when using the environments. It's not integrated with the 
RestClusterClient, which is considered more of an internal 
implementation detail.


Aljoscha



Re: Concise example of how to deploy flink on Kubernetes

2020-11-23 Thread Till Rohrmann
Hi George,

Here is some documentation about how to deploy a stateful function job [1].
In a nutshell, you need to deploy a Flink cluster on which you can run the
stateful function job. This can either happen before (e.g. by spawning a
session cluster on K8s [2]) or you can combine your job into a Docker image
[3] which you can use to spin up a cluster.

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/deployment-and-operations/packaging.html#packaging-for-deployment
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
[3] https://hub.docker.com/r/ververica/flink-statefun
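A minimal sketch of option [3], bundling a StateFun module into an image (the base image tag, module path and file names are assumptions; the packaging docs in [1] have the authoritative layout):

    FROM ververica/flink-statefun:2.2.0
    RUN mkdir -p /opt/statefun/modules/my-module
    COPY module.yaml /opt/statefun/modules/my-module/module.yaml
    COPY target/my-functions.jar /opt/statefun/modules/my-module/my-functions.jar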

Cheers,
Till

On Mon, Nov 23, 2020 at 3:39 PM George Costea  wrote:

> Sorry.  Forgot to reply to all.
>
> On Sun, Nov 22, 2020 at 9:24 PM George Costea  wrote:
> >
> > Hi Xingbo,
> >
> > I’m interested in using stateful functions to build an application on
> Kubernetes. Don’t I need to deploy the flink cluster on Kubernetes first
> before deploying my stateful functions?
> >
> > Thanks,
> > George
> >
> > On Sun, Nov 22, 2020 at 9:01 PM Xingbo Huang  wrote:
> >>
> >> Hi, George
> >>
> >> >>>  Is zookeeper needed to run the flink cluster?
> >> No. zookeeper is needed by the kafka.
> >>
> >> >>>  Or is it just the master and worker deployments?
> >> This example uses the kafka connector, so some kafka and zookeeper
> configurations are added.
> >>
> >> >>>  Is the fink cluster included in the master and worker stateful
> functions?
> >> In fact, I don’t really understand how this problem is related to
> stateful functions.
> >>
> >> Best,
> >> Xingbo
> >>
> >>> George Costea wrote on Sat, Nov 21, 2020 at 11:09 AM:
> >>>
> >>> I looked at that.
> >>>
> >>> The docker-compose file for the greeter example deploys a worker, a
> >>> master, zookeeper, and kafka.  Is zookeeper needed to run the flink
> >>> cluster?  Or is it just the master and worker deployments?  Is the
> >>> fink cluster included in the master and worker stateful functions?
> >>>
> >>> Thanks in advance.
> >>>
> >>> On Fri, Nov 20, 2020 at 9:32 PM Xingbo Huang 
> wrote:
> >>> >
> >>> > Hi George,
> >>> > Have you referred to the official document[1]?
> >>> >
> >>> > [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
> >>> >
> >>> > Best,
> >>> > Xingbo
> >>> >
> >>> > On Saturday, Nov 21, 2020, George Costea wrote:
> >>> > > Hi there,
> >>> > >
> >>> > > Is there an example of how to deploy a flink cluster on Kubernetes?
> >>> > > I'd like to deploy the flink cluster, a kafka-broker, and then the
> >>> > > greeter example to give it a try.
> >>> > >
> >>> > > Thanks,
> >>> > > George
> >>> > >
>


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Till Rohrmann
In all cases (session and per-job mode cluster) except for the JM recovery
of the application mode [1], the main() function only runs once in order to
generate the JobGraph which is sent to the cluster and which is also used
for recoveries.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/#application-mode

Cheers,
Till
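For reference, a minimal sketch of the co-location constraint that comes up further down in this thread (Transformation.setCoLocationGroupKey is an internal API and may change; the source and map functions here are made up):

    DataStream<String> a = env.addSource(new MySourceA()).name("A");
    DataStream<String> t = a.map(new TimestampCalculator()).name("T");
    a.getTransformation().setCoLocationGroupKey("same-slot-group");   // A_i and T_i share slot i
    t.getTransformation().setCoLocationGroupKey("same-slot-group");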

On Mon, Nov 23, 2020 at 3:24 PM Si-li Liu  wrote:

> Thanks for your reply.
>
> The source will poll the state of the T operator periodically. If it finds that the
> offset is 0, it can fall back to the latest committed offset.
>
> Till Rohrmann wrote on Mon, Nov 23, 2020 at 9:35 PM:
>
>> Hi Si-li Liu,
>>
>> if you want to run T with a parallelism of 1, then your parallelism of A
>> should be limited by the total number of slots on your TM. Otherwise you
>> would have some A_i which are not running on a machine with T.
>>
>> For the approach with the colocation constraint, you can take a look at
>> Transformation.setCoLocationGroupKey() [1]. Using this API one can define
>> operators whose sub tasks need to run on the same machine (e.g. A_i runs
>> together with B_i on the same machine, even in the same slot). However,
>> this is pretty much an internal feature which might change in future
>> versions.
>>
>> What I did not fully understand is what should happen if your TM dies.
>> Wouldn't then the information of T be lost and the sources would start from
>> offset 0 again? According to your explanation, this should be intolerable
>> given the business requirements.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426
>>
>> Cheers,
>> Till
>>
>> On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise  wrote:
>>
>>> If you would prefer to have T with parallelism 1, one complete
>>> alternative solution would be to leave the timestamp in the state of T and
>>> extract the timestamp from the savepoint/checkpoint upon start of the
>>> application using the state processor API [1]. Unfortunately, it may be a
>>> bit hacky when you do a normal recovery as there is not a single entrypoint
>>> (if you start new you could just extract that timestamp from main()). Of
>>> course, you could also store the information in an external storage but
>>> that would also make the architecture more complicated.
>>>
>>> Let's see if anyone has an idea on the co-location topic.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu  wrote:
>>>
 Thanks for your reply!

 Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator
 should have a parallelism of 1 in the topology, and all A_i should start from the same
 timestamp, but a minor difference in the resume timestamp between different A_i
 sources is also acceptable. So multiple T operators are also OK for me
 here. But the prerequisite for this topology to work is that I can make sure T and A
 always reside on the same TM.

 The problem is that both stream A and stream B are very large: 200k ~ 300k
 messages per second in each stream, at 1k ~ 2k bytes (after
 compression) per message, and I have to keep the whole message in the cache. So
 it's hard to fit into Flink state.



 Arvid Heise wrote on Sat, Nov 21, 2020 at 3:35 AM:

> Your topology is definitively interesting and makes sense to me on a
> high level. The main question remaining is the parallelism. I'm assuming
> you run your pipeline with parallelism p and both source A and
> timestampcalculator T are run with parallelism p. You want to create a
> situation where for A_i, there is an T_i which run in the same slot. Am I
> right?
>
> If so, then as you have noticed that there is currently no way to
> express that in Flink on a high level. One more idea before trying to 
> solve
> it in a hacky way: How large is B? Could use a broadcast to avoid the
> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
> because then it's easy to produce an operator chain, where everything even
> runs within the same thread.
>
> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:
>
>> Thanks for your reply.
>>
>> I want to join two stream A and stream B. Items in stream A come in
>> first then I keep them in memory cache, as join key and item, then serval
>> minutes later the items in stream B come in then the join work is
>> performed. The timestamp of the latest expired item in memory cache is 
>> the
>> safe rollback timestamp, I can resume source A from that timestamp when I
>> restart.
>>
>> It's not very percise, maybe lost same items or send same items
>> twice, but seems useful to me in my situation. But if job restart, both
>> source A and source B resume from last consumed offset, it will make the
>> absense of serval minutes join result, which is unacceptable.

Re: Concise example of how to deploy flink on Kubernetes

2020-11-23 Thread George Costea
Sorry.  Forgot to reply to all.

On Sun, Nov 22, 2020 at 9:24 PM George Costea  wrote:
>
> Hi Xingbo,
>
> I’m interested in using stateful functions to build an application on 
> Kubernetes. Don’t I need to deploy the flink cluster on Kubernetes first 
> before deploying my stateful functions?
>
> Thanks,
> George
>
> On Sun, Nov 22, 2020 at 9:01 PM Xingbo Huang  wrote:
>>
>> Hi, George
>>
>> >>>  Is zookeeper needed to run the flink cluster?
>> No. zookeeper is needed by the kafka.
>>
>> >>>  Or is it just the master and worker deployments?
>> This example uses the kafka connector, so some kafka and zookeeper 
>> configurations are added.
>>
>> >>>  Is the fink cluster included in the master and worker stateful 
>> >>> functions?
>> In fact, I don’t really understand how this problem is related to stateful 
>> functions.
>>
>> Best,
>> Xingbo
>>
>> George Costea  于2020年11月21日周六 上午11:09写道:
>>>
>>> I looked at that.
>>>
>>> The docker-compose file for the greeter example deploys a worker, a
>>> master, zookeeper, and kafka.  Is zookeeper needed to run the flink
>>> cluster?  Or is it just the master and worker deployments?  Is the
>>> fink cluster included in the master and worker stateful functions?
>>>
>>> Thanks in advance.
>>>
>>> On Fri, Nov 20, 2020 at 9:32 PM Xingbo Huang  wrote:
>>> >
>>> > Hi George,
>>> > Have you referred to the official document[1]?
>>> >
>>> > [1] 
>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
>>> >
>>> > Best,
>>> > Xingbo
>>> >
>>> > 在 2020年11月21日星期六,George Costea  写道:
>>> > > Hi there,
>>> > >
>>> > > Is there an example of how to deploy a flink cluster on Kubernetes?
>>> > > I'd like to deploy the flink cluster, a kafka-broker, and then the
>>> > > greeter example to give it a try.
>>> > >
>>> > > Thanks,
>>> > > George
>>> > >


Re: Dynamic ad hoc query deployment strategy

2020-11-23 Thread Till Rohrmann
Hi Lalala,

I think this approach can work as long as the generated query plan contains
the same sub plan for the previous queries as before. Otherwise Flink won't
be able to match the state to the operators of the plan. I think Timo and
Dawid should know definitely whether this is possible or not.

Cheers,
Till

On Mon, Nov 23, 2020 at 10:33 AM lalala  wrote:

> Hi Kostas,
>
> Yes, that would satisfy my use case as the platform is always
> future-oriented. Any arbitrary query is executed on the latest data.
>
> From your comment, I understand that even the session mode does not
> optimize
> our readers. I wish Flink could support arbitrary job submission and graph
> generation in runtime, so we could submit jobs dynamically from main() as
> we
> do in Spark.
>
> If we want to group similar jobs, what would you recommend us for arbitrary
> long-running jobs? Can we somehow take a snapshot of the queries running
> under a job graph then resubmit them with the new query?
>
> I assume if we do the following under a single job(main method);
>
> ’’’
> Source: create table A...
> Query1: select * from A
> Query 2: select * from A
> ’’’
>
> Both queries will share the same reader as they are part of a single job
> graph. Can we somehow take a snapshot of this and submit another query with
> them again under the same job graph?
>
> I really appreciate your time for answering my questions,
>
> Best.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Si-li Liu
Thanks for your reply.

The source will poll the state of the T operator periodically. If it finds that
the offset is 0, it can fall back to the latest committed offset.

Till Rohrmann  于2020年11月23日周一 下午9:35写道:

> Hi Si-li Liu,
>
> if you want to run T with a parallelism of 1, then your parallelism of A
> should be limited by the total number of slots on your TM. Otherwise you
> would have some A_i which are not running on a machine with T.
>
> For the approach with the colocation constraint, you can take a look at
> Transformation.setCoLocationGroupKey() [1]. Using this API one can define
> operators whose sub tasks need to run on the same machine (e.g. A_i runs
> together with B_i on the same machine, even in the same slot). However,
> this is pretty much an internal feature which might change in future
> versions.
>
> What I did not fully understand is what should happen if your TM dies.
> Wouldn't then the information of T be lost and the sources would start from
> offset 0 again? According to your explanation, this should be intolerable
> given the business requirements.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426
>
> Cheers,
> Till
>
> On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise  wrote:
>
>> If you would prefer to have T with parallelism 1, one complete
>> alternative solution would be to leave the timestamp in the state of T and
>> extract the timestamp from the savepoint/checkpoint upon start of the
>> application using the state processor API [1]. Unfortunately, it may be a
>> bit hacky when you do a normal recovery as there is not a single entrypoint
>> (if you start new you could just extract that timestamp from main()). Of
>> course, you could also store the information in an external storage but
>> that would also make the architecture more complicated.
>>
>> Let's see if anyone has an idea on the co-location topic.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu  wrote:
>>
>>> Thanks for your reply!
>>>
>>> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator
>>> should have 1 parallism in topo, also all A_i can start from the same
>>> timestamp, but some minor difference of resume timestamp in different A_i
>>> source is also acceptable. So I think multiple T operator is also ok to me
>>> here. But the prerequisite of this topo can work is I can make sure T and A
>>> always reside same TM.
>>>
>>> The problem here both stream A and stream B is very huge. 200k ~ 300k
>>> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after
>>> compressed) per messages, and I have to keep the whole message in cache. So
>>> it's hard to fit into Flink state.
>>>
>>>
>>>
>>> Arvid Heise  于2020年11月21日周六 上午3:35写道:
>>>
 Your topology is definitively interesting and makes sense to me on a
 high level. The main question remaining is the parallelism. I'm assuming
 you run your pipeline with parallelism p and both source A and
 timestampcalculator T are run with parallelism p. You want to create a
 situation where for A_i, there is an T_i which run in the same slot. Am I
 right?

 If so, then as you have noticed that there is currently no way to
 express that in Flink on a high level. One more idea before trying to solve
 it in a hacky way: How large is B? Could use a broadcast to avoid the
 shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
 because then it's easy to produce an operator chain, where everything even
 runs within the same thread.

 On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:

> Thanks for your reply.
>
> I want to join two stream A and stream B. Items in stream A come in
> first then I keep them in memory cache, as join key and item, then serval
> minutes later the items in stream B come in then the join work is
> performed. The timestamp of the latest expired item in memory cache is the
> safe rollback timestamp, I can resume source A from that timestamp when I
> restart.
>
> It's not very percise, maybe lost same items or send same items twice,
> but seems useful to me in my situation. But if job restart, both source A
> and source B resume from last consumed offset, it will make the absense of
> serval minutes join result, which is unacceptable.
>
> The topo I consider is like
>
> source A -> parser --shuffle--> join -> sink
> source B -> parser ...(parallel)  |--->timestampcalculator
>
> Memory cache aside in join operator, the join operator will broadcast
> the timestamp of latest expired cache item to the timestampcalculator. 
> Then
> timestampcalculator will use them to calculate a safe rollback timestamp 
> (a
> moving minimum) that source A can resume from that timestamp, source B 
> will

statementset下source怎么完全复用

2020-11-23 Thread Jeff
A quick question: with Flink 1.11 statement sets, how can the exact same source be reused?
I want the different sinks in one job to consume exactly the same data rather than the default hash-based splitting. Is there somewhere to configure this?
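
For reference, a minimal sketch of grouping the INSERTs into a single StatementSet so that they end up in one job graph, where the planner can reuse the common scan of the source (the 'datagen'/'print' tables and column names below are placeholders, not the original job):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetReuseSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder tables; replace them with the real source and sinks.
        tEnv.executeSql("CREATE TABLE src (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_a (id BIGINT, name STRING) WITH ('connector' = 'print')");
        tEnv.executeSql("CREATE TABLE sink_b (id BIGINT, name STRING) WITH ('connector' = 'print')");

        // Both INSERTs are added to one StatementSet and submitted as a single job,
        // so the optimizer can serve both sinks from one scan of `src`.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT id, name FROM src");
        set.addInsertSql("INSERT INTO sink_b SELECT id, name FROM src");
        set.execute();
    }
}

Whether the scan is actually shared is a planner decision (the blink planner's table.optimizer.reuse-source-enabled option, which is on by default as far as I know), not something configured per statement.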

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Si-li Liu
Thanks for your reply!

It seems the State Processor API can solve my problem: the state written by the T
operator's checkpoint can be read by the main function when the job restarts. My
question is, when the streaming job restarts for some reason, will the main
function also run again?

Arvid Heise  于2020年11月23日周一 下午6:00写道:

> If you would prefer to have T with parallelism 1, one complete alternative
> solution would be to leave the timestamp in the state of T and extract the
> timestamp from the savepoint/checkpoint upon start of the application using
> the state processor API [1]. Unfortunately, it may be a bit hacky when you
> do a normal recovery as there is not a single entrypoint (if you start new
> you could just extract that timestamp from main()). Of course, you could
> also store the information in an external storage but that would also make
> the architecture more complicated.
>
> Let's see if anyone has an idea on the co-location topic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu  wrote:
>
>> Thanks for your reply!
>>
>> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator
>> should have 1 parallism in topo, also all A_i can start from the same
>> timestamp, but some minor difference of resume timestamp in different A_i
>> source is also acceptable. So I think multiple T operator is also ok to me
>> here. But the prerequisite of this topo can work is I can make sure T and A
>> always reside same TM.
>>
>> The problem here both stream A and stream B is very huge. 200k ~ 300k
>> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after
>> compressed) per messages, and I have to keep the whole message in cache. So
>> it's hard to fit into Flink state.
>>
>>
>>
>> Arvid Heise  于2020年11月21日周六 上午3:35写道:
>>
>>> Your topology is definitively interesting and makes sense to me on a
>>> high level. The main question remaining is the parallelism. I'm assuming
>>> you run your pipeline with parallelism p and both source A and
>>> timestampcalculator T are run with parallelism p. You want to create a
>>> situation where for A_i, there is an T_i which run in the same slot. Am I
>>> right?
>>>
>>> If so, then as you have noticed that there is currently no way to
>>> express that in Flink on a high level. One more idea before trying to solve
>>> it in a hacky way: How large is B? Could use a broadcast to avoid the
>>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
>>> because then it's easy to produce an operator chain, where everything even
>>> runs within the same thread.
>>>
>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:
>>>
 Thanks for your reply.

 I want to join two stream A and stream B. Items in stream A come in
 first then I keep them in memory cache, as join key and item, then serval
 minutes later the items in stream B come in then the join work is
 performed. The timestamp of the latest expired item in memory cache is the
 safe rollback timestamp, I can resume source A from that timestamp when I
 restart.

 It's not very percise, maybe lost same items or send same items twice,
 but seems useful to me in my situation. But if job restart, both source A
 and source B resume from last consumed offset, it will make the absense of
 serval minutes join result, which is unacceptable.

 The topo I consider is like

 source A -> parser --shuffle--> join -> sink
 source B -> parser ...(parallel)  |--->timestampcalculator

 Memory cache aside in join operator, the join operator will broadcast
 the timestamp of latest expired cache item to the timestampcalculator. Then
 timestampcalculator will use them to calculate a safe rollback timestamp (a
 moving minimum) that source A can resume from that timestamp, source B will
 also restart from that timestamp. I will add a bloomfilter in sink's state
 to avoid duplicate items.

 So I want to let timestampcalculator operator and source A are located
 in one TM, then I can send this timestamp from timestampcalculator to
 source A by static variable.

 Hope I make my problem clear with my poor English, it seems a little
 tricky. But I think it's the only way to do two streams join and avoid to
 store very huge state.



 Arvid Heise  于2020年11月20日周五 下午2:58写道:

> I still haven't fully understood. Do you mean you can't infer the
> timestamp in source A because it depends on some internal field of source 
> B?
>
> How is that actually working in a parallel setting? Which timestamp is
> used in the different instances of a source?
>
> Say, we have task A1 which is the first subtask of source A and task
> B2 as the second subtask of source B. How would you like them to be
> located? How does that correlate to the third subtask of the 

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Till Rohrmann
Hi Si-li Liu,

if you want to run T with a parallelism of 1, then your parallelism of A
should be limited by the total number of slots on your TM. Otherwise you
would have some A_i which are not running on a machine with T.

For the approach with the colocation constraint, you can take a look at
Transformation.setCoLocationGroupKey() [1]. Using this API one can define
operators whose sub tasks need to run on the same machine (e.g. A_i runs
together with B_i on the same machine, even in the same slot). However,
this is pretty much an internal feature which might change in future
versions.

What I did not fully understand is what should happen if your TM dies.
Wouldn't then the information of T be lost and the sources would start from
offset 0 again? According to your explanation, this should be intolerable
given the business requirements.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426

Cheers,
Till
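
For completeness, a minimal sketch of what wiring this through the DataStream API could look like. The pipeline (a generated sequence plus an identity map standing in for the timestamp calculator T) is purely illustrative; only setCoLocationGroupKey() is the internal API mentioned above, and both operators should run with the same parallelism:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> sourceA = env.generateSequence(0, 1_000_000).name("source-A");

        SingleOutputStreamOperator<Long> calculatorT = sourceA
                .rebalance() // breaks the chain, as in the real topology where a shuffle sits in between
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value; // stand-in for the real timestamp calculator T
                    }
                })
                .name("timestamp-calculator");

        // Both transformations share the same co-location group key, so subtask i of
        // the source and subtask i of the calculator are scheduled into the same slot.
        sourceA.getTransformation().setCoLocationGroupKey("source-and-timestamp");
        calculatorT.getTransformation().setCoLocationGroupKey("source-and-timestamp");

        calculatorT.print();
        env.execute("co-location sketch");
    }
}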

On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise  wrote:

> If you would prefer to have T with parallelism 1, one complete alternative
> solution would be to leave the timestamp in the state of T and extract the
> timestamp from the savepoint/checkpoint upon start of the application using
> the state processor API [1]. Unfortunately, it may be a bit hacky when you
> do a normal recovery as there is not a single entrypoint (if you start new
> you could just extract that timestamp from main()). Of course, you could
> also store the information in an external storage but that would also make
> the architecture more complicated.
>
> Let's see if anyone has an idea on the co-location topic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu  wrote:
>
>> Thanks for your reply!
>>
>> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator
>> should have 1 parallism in topo, also all A_i can start from the same
>> timestamp, but some minor difference of resume timestamp in different A_i
>> source is also acceptable. So I think multiple T operator is also ok to me
>> here. But the prerequisite of this topo can work is I can make sure T and A
>> always reside same TM.
>>
>> The problem here both stream A and stream B is very huge. 200k ~ 300k
>> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after
>> compressed) per messages, and I have to keep the whole message in cache. So
>> it's hard to fit into Flink state.
>>
>>
>>
>> Arvid Heise  于2020年11月21日周六 上午3:35写道:
>>
>>> Your topology is definitively interesting and makes sense to me on a
>>> high level. The main question remaining is the parallelism. I'm assuming
>>> you run your pipeline with parallelism p and both source A and
>>> timestampcalculator T are run with parallelism p. You want to create a
>>> situation where for A_i, there is an T_i which run in the same slot. Am I
>>> right?
>>>
>>> If so, then as you have noticed that there is currently no way to
>>> express that in Flink on a high level. One more idea before trying to solve
>>> it in a hacky way: How large is B? Could use a broadcast to avoid the
>>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
>>> because then it's easy to produce an operator chain, where everything even
>>> runs within the same thread.
>>>
>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:
>>>
 Thanks for your reply.

 I want to join two stream A and stream B. Items in stream A come in
 first then I keep them in memory cache, as join key and item, then serval
 minutes later the items in stream B come in then the join work is
 performed. The timestamp of the latest expired item in memory cache is the
 safe rollback timestamp, I can resume source A from that timestamp when I
 restart.

 It's not very percise, maybe lost same items or send same items twice,
 but seems useful to me in my situation. But if job restart, both source A
 and source B resume from last consumed offset, it will make the absense of
 serval minutes join result, which is unacceptable.

 The topo I consider is like

 source A -> parser --shuffle--> join -> sink
 source B -> parser ...(parallel)  |--->timestampcalculator

 Memory cache aside in join operator, the join operator will broadcast
 the timestamp of latest expired cache item to the timestampcalculator. Then
 timestampcalculator will use them to calculate a safe rollback timestamp (a
 moving minimum) that source A can resume from that timestamp, source B will
 also restart from that timestamp. I will add a bloomfilter in sink's state
 to avoid duplicate items.

 So I want to let timestampcalculator operator and source A are located
 in one TM, then I can send this timestamp from timestampcalculator to
 source A by static variable.

 Hope I make my problem clear 

Re: Logs of JobExecutionListener

2020-11-23 Thread Flavio Pompermaier
My final analysis is that the RestClusterClient lacks many methods
(jarUpload, jarRun, getExceptions, for example) and that submitJob (and
the JobSubmitHandler endpoint) is buggy or should be deprecated (because
it does not call the job listeners).
Indeed, if the JarRunHandler endpoint is invoked (e.g. from the Web UI), the
job listeners are invoked correctly.
In order to do what I want I see the following options (correct me if I'm
wrong):

   1. Wait for JobSubmitHandler to be fixed (if the fact that the job
   listeners are not called is a bug; maybe it is not, but at least this
   should be documented)
   2. Extend the RestClusterClient in order to replace the call to
   submitJob with jarUpload + jarRun (+ jarDelete maybe)
   3. Obtain a JobID from ClientUtils.executeProgram()... but how to do that
   is not clear at all to me. Is that possible?

Thanks in advance for any support,
Flavio
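
For what it's worth, a minimal sketch of option 3 in the case where the program is executed through the environment itself: executeAsync() notifies the registered listeners (onJobSubmitted) and returns a JobClient from which the JobID can be read. This is only an API sketch and does not cover the separate REST-service setup described above:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitWithListener {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                System.out.println(" SUBMITTED");
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                System.out.println(" EXECUTED");
            }
        });

        env.fromElements("a", "b", "c").print();

        // onJobSubmitted fires here; onJobExecuted is only triggered by the blocking
        // execute(), so with executeAsync() you would hook the result future yourself.
        JobClient jobClient = env.executeAsync("submit-with-listener");
        JobID jobId = jobClient.getJobID();
        System.out.println("Submitted job " + jobId);
    }
}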


On Fri, Nov 20, 2020 at 10:09 PM Flavio Pompermaier 
wrote:

> I think that the problem is that my REST service submits the job to
> the Flink standalone cluster and responds to the client with the
> submitted job ID.
> To achieve this, I was using the
> RestClusterClient because with that I can use the
> following code and retrieve the JobID:
>
> (1) JobID flinkJobId =
>
> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();
>
> Unfortunately this does not activate the job listener (that is quite
> surprising to me...I thought that such a listener was triggered by the
> JobManager).
> So, after Aljoscha answer I take a deeper look into the Flink CLI code
> and what it does is basically this:
>
> (2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(),
> flinkConf, packagedProgram, false, false);
>
> That works as expected (I wasn't aware of the ThreadLocal mechanism
> used by the ContextEnvironment and StreamContextEnvironment: a very
> advanced programming technique) but it does not allow to get back the
> job id that I need..I can live with that because I can save the Flink
> Job ID in an external service when the job listener triggers the
> onJobSubmitted method but I think this mechanism is quite weird..
>
> So my question is: is there a simple way to achieve my goal? Am I
> doing something wrong?
> At the moment I had to implement a job-status polling thread after the
> line (1) but this looks like a  workaround to me..
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier 
> wrote:
> >
> > You're right..I removed my flink dir and I re-extracted it and now it
> > works. Unfortunately I didn't keep the old version to understand what
> > were the difference but the error was probably caused by the fact that
> > I had a previous version of the WordCount.jar (without the listener)
> > in the flink lib dir.. (in another dev session I was experimenting in
> > running the job having the user jar in the lib dir). Sorry for the
> > confusion.
> > Just one last question: is the listener executed on the client or on
> > the job server? This is not entirely clear to me..
> >
> > Best,
> > Flavio
> >
> > On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin 
> wrote:
> > >
> > > I also tried 1.11.0 and 1.11.2, both work for me.
> > >
> > > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek 
> wrote:
> > >>
> > >> Hmm, there was this issue:
> > >> https://issues.apache.org/jira/browse/FLINK-17744 But it should be
> fixed
> > >> in your version.
> > >>
> > >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> > >> > Which version are you using?
> > >> > I used the exact same commands on Flink 1.11.0 and I didn't get the
> job
> > >> > listener output..
> > >> >
> > >> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin 
> ha scritto:
> > >> >
> > >> >> Hi Flavio and Aljoscha,
> > >> >>
> > >> >> Sorry for the late heads up. I could not actually reproduce the
> reported
> > >> >> problem with 'flink run' and local standalone cluster on master.
> > >> >> I get the expected output with the suggested modification of
> WordCount
> > >> >> program:
> > >> >>
> > >> >> $ bin/start-cluster.sh
> > >> >>
> > >> >> $ rm -rf out; bin/flink run
> > >> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar
> --output
> > >> >> flink/build-target/out
> > >> >>
> > >> >> Executing WordCount example with default input data set.
> > >> >> Use --input to specify file input.
> > >> >>  SUBMITTED
> > >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> > >> >> Program execution finished
> > >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> > >> >> Job Runtime: 139 ms
> > >> >>
> > >> >>  EXECUTED
> > >> >>
> > >> >> Best,
> > >> >> Andrey
> > >> >>
> > >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <
> aljos...@apache.org>
> > >> >> wrote:
> > >> >>
> > >> >>> JobListener.onJobExecuted() is only invoked in
> > >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute().
> If none
> > 

关于Catalog的建议

2020-11-23 Thread 赵一旦
Flink currently provides three kinds of catalogs: memory, jdbc and hive.
In practice, I see roughly two ways of using them:

(1) Use the memory catalog and attach the relevant DDL to every SQL job.
(2) Use a catalog that supports "persisted" DDL definitions, so that individual SQL jobs no longer need to carry their own DDL.

Both approaches have advantages and drawbacks.
Advantage of approach 1:
For example, when sql1 and sql2 each only target part of the time range of a Kafka
topic, it is inconvenient to hard-code (persist) the DDL for that topic; each SQL job should carry its own definition. (Of course, with approach 2 the DDL carried by sql1 and sql2 can also be simplified by overriding options.)
Drawback of approach 1:
Obviously, the lack of "persistence" is itself the drawback, and that is exactly the advantage of approach 2.

Now to my questions.
In the Flink documentation, HiveCatalog is described both as storage for Flink table metadata and as the interface for reading Hive table metadata, while for JdbcCatalog the supported table (connector) types are not documented at all.
Question 1 (as above): the supported connector types, i.e. table types, are not clearly documented for each catalog.

Question 2: could a simpler, persistence-capable catalog implementation that supports all connector types be provided? By "simple" I mean something backed by MySQL/PostgreSQL or even a plain JSON file; by "persistent" I just mean that the definitions are persisted.

Of course, since Hive-style metadata may need extra, complex conversion to be kept in another store, I feel there should at least be a relatively general-purpose catalog, e.g. one that supports (MySQL tables, Kafka tables (Kafka metadata is simple; storing it in MySQL or the like should surely work), ...).
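
Regarding the "persisted DDL" side, a small sketch of what already works today: HiveCatalog can also store generic (non-Hive) tables, for example a Kafka table, so the DDL only has to be executed once and later jobs can reference the table directly. The hive-conf path, topic and broker address below are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class PersistentCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The hive-site.xml directory is an assumption; adjust it to the actual setup.
        HiveCatalog catalog = new HiveCatalog("hive", "default", "/path/to/hive-conf");
        tEnv.registerCatalog("hive", catalog);
        tEnv.useCatalog("hive");

        // This Kafka table is persisted in the Hive Metastore as a generic table,
        // so later jobs can use it without repeating the DDL.
        tEnv.executeSql(
                "CREATE TABLE kafka_events (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'events'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");
    }
}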


Re: flink读mysql分库分表

2020-11-23 Thread Leonard Xu
Hi,
If I understand correctly, you want to read the data of all the tables (sharded databases and tables) at once, handled by a single CREATE TABLE DDL statement. That is not supported yet.

Best,
Leonard
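
As a workaround sketch for the "read them one by one" route: declare one JDBC table per shard and put a UNION ALL view on top, so downstream queries see a single logical table. All URLs, table names and credentials below are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShardedMysqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // One DDL per physical shard.
        tEnv.executeSql(
                "CREATE TABLE orders_shard_0 (id BIGINT, amount DECIMAL(10, 2)) WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://db0:3306/shop_0'," +
                "  'table-name' = 'orders_0'," +
                "  'username' = 'user'," +
                "  'password' = 'pass')");
        tEnv.executeSql(
                "CREATE TABLE orders_shard_1 (id BIGINT, amount DECIMAL(10, 2)) WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://db1:3306/shop_1'," +
                "  'table-name' = 'orders_1'," +
                "  'username' = 'user'," +
                "  'password' = 'pass')");

        // A view over the shards gives downstream queries one logical table.
        tEnv.createTemporaryView("orders", tEnv.sqlQuery(
                "SELECT * FROM orders_shard_0 UNION ALL SELECT * FROM orders_shard_1"));

        tEnv.executeSql("SELECT COUNT(*) AS cnt FROM orders").print();
    }
}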

> 在 2020年11月23日,17:22,酷酷的浑蛋  写道:
> 
> 
> 
> flink读mysql分库分表可以自动识别吗? 还是只能一个一个读?
> 



Re: Is possible that make two operators always locate in same taskmanager?

2020-11-23 Thread Arvid Heise
If you would prefer to have T with parallelism 1, one complete alternative
solution would be to leave the timestamp in the state of T and extract the
timestamp from the savepoint/checkpoint upon start of the application using
the state processor API [1]. Unfortunately, it may be a bit hacky when you
do a normal recovery as there is not a single entrypoint (if you start new
you could just extract that timestamp from main()). Of course, you could
also store the information in an external storage but that would also make
the architecture more complicated.

Let's see if anyone has an idea on the co-location topic.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
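
A minimal sketch of reading such a timestamp back out of a savepoint with the State Processor API. The operator uid ("timestamp-calculator"), the state name ("safe-rollback-ts"), the key type and the savepoint path are all assumptions and have to match what the T operator actually registers:

import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadRollbackTimestamp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // The state backend must match the one the streaming job used for the savepoint.
        ExistingSavepoint savepoint = Savepoint.load(
                batchEnv, "hdfs:///flink/savepoints/savepoint-xyz", new MemoryStateBackend());

        DataSet<Long> timestamps = savepoint.readKeyedState(
                "timestamp-calculator",
                new KeyedStateReaderFunction<String, Long>() {
                    private transient ValueState<Long> state;

                    @Override
                    public void open(Configuration parameters) {
                        state = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("safe-rollback-ts", Types.LONG));
                    }

                    @Override
                    public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
                        out.collect(state.value());
                    }
                });

        List<Long> values = timestamps.collect();
        long safeRollbackTimestamp = values.stream().min(Long::compare).orElse(0L);
        System.out.println("safe rollback timestamp: " + safeRollbackTimestamp);
    }
}

The main() of the restarted streaming job (or an external trigger) could run such a small batch program against the latest savepoint and hand the result to the sources as a start offset.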

On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu  wrote:

> Thanks for your reply!
>
> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator
> should have 1 parallism in topo, also all A_i can start from the same
> timestamp, but some minor difference of resume timestamp in different A_i
> source is also acceptable. So I think multiple T operator is also ok to me
> here. But the prerequisite of this topo can work is I can make sure T and A
> always reside same TM.
>
> The problem here both stream A and stream B is very huge. 200k ~ 300k
> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after
> compressed) per messages, and I have to keep the whole message in cache. So
> it's hard to fit into Flink state.
>
>
>
> Arvid Heise  于2020年11月21日周六 上午3:35写道:
>
>> Your topology is definitively interesting and makes sense to me on a high
>> level. The main question remaining is the parallelism. I'm assuming you run
>> your pipeline with parallelism p and both source A and timestampcalculator
>> T are run with parallelism p. You want to create a situation where for A_i,
>> there is an T_i which run in the same slot. Am I right?
>>
>> If so, then as you have noticed that there is currently no way to express
>> that in Flink on a high level. One more idea before trying to solve it in a
>> hacky way: How large is B? Could use a broadcast to avoid the shuffle on A?
>> I'm thinking of creating a pipeline A->J(side input B)->T, because then
>> it's easy to produce an operator chain, where everything even runs within
>> the same thread.
>>
>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:
>>
>>> Thanks for your reply.
>>>
>>> I want to join two stream A and stream B. Items in stream A come in
>>> first then I keep them in memory cache, as join key and item, then serval
>>> minutes later the items in stream B come in then the join work is
>>> performed. The timestamp of the latest expired item in memory cache is the
>>> safe rollback timestamp, I can resume source A from that timestamp when I
>>> restart.
>>>
>>> It's not very percise, maybe lost same items or send same items twice,
>>> but seems useful to me in my situation. But if job restart, both source A
>>> and source B resume from last consumed offset, it will make the absense of
>>> serval minutes join result, which is unacceptable.
>>>
>>> The topo I consider is like
>>>
>>> source A -> parser --shuffle--> join -> sink
>>> source B -> parser ...(parallel)  |--->timestampcalculator
>>>
>>> Memory cache aside in join operator, the join operator will broadcast
>>> the timestamp of latest expired cache item to the timestampcalculator. Then
>>> timestampcalculator will use them to calculate a safe rollback timestamp (a
>>> moving minimum) that source A can resume from that timestamp, source B will
>>> also restart from that timestamp. I will add a bloomfilter in sink's state
>>> to avoid duplicate items.
>>>
>>> So I want to let timestampcalculator operator and source A are located
>>> in one TM, then I can send this timestamp from timestampcalculator to
>>> source A by static variable.
>>>
>>> Hope I make my problem clear with my poor English, it seems a little
>>> tricky. But I think it's the only way to do two streams join and avoid to
>>> store very huge state.
>>>
>>>
>>>
>>> Arvid Heise  于2020年11月20日周五 下午2:58写道:
>>>
 I still haven't fully understood. Do you mean you can't infer the
 timestamp in source A because it depends on some internal field of source 
 B?

 How is that actually working in a parallel setting? Which timestamp is
 used in the different instances of a source?

 Say, we have task A1 which is the first subtask of source A and task B2
 as the second subtask of source B. How would you like them to be located?
 How does that correlate to the third subtask of the join (let's call it 
 J3).

 Remember that through the shuffling before the join there is no clear
 correlation between any subtask of A or B to J...

 On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:

> Thanks for your help!
>
> Now the timestamps already go with the items in streaming. My
> streaming pipeline is like this:
>
> source -> parser --shuffle--> join -> sink

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-23 Thread Benchao Li
Hi Dongwon,

You are hitting a known bug [1], which is fixed in 1.11.3 and 1.12.0.

Another tip: currently the LIKE clause cannot work with Hive tables (generic
tables stored in the Hive metastore should work).

[1] https://issues.apache.org/jira/browse/FLINK-19281
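
Once on a version with the fix (1.11.3+ or 1.12.0), referencing a table from another catalog/database via its fully qualified path in the LIKE clause should work, as long as the referenced table is a generic table rather than a genuine Hive table. A sketch with placeholder names:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class LikeAcrossCatalogs {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.registerCatalog("hive", new HiveCatalog("hive", null, "/path/to/hive-conf"));

        // `hive.mydb.kafka_orders` is assumed to be a generic (non-Hive) table that was
        // created through Flink and stored in the Hive metastore earlier. A column list
        // and a WITH clause can additionally be used to add or override options.
        tEnv.executeSql("CREATE TABLE copied LIKE hive.mydb.kafka_orders");
    }
}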

Dongwon Kim  于2020年11月17日周二 上午12:04写道:

> Hi Danny~
> Sorry for late reply,
>
> Let's take a look at a running example:
>
>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>   .inBatchMode()
>>   .build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>
>> HiveCatalog hiveCatalog = new HiveCatalog("hive",null, args[1]);
>> tEnv.registerCatalog("hive", hiveCatalog);
>>
>> GenericInMemoryCatalog inmemCatalog = new GenericInMemoryCatalog("inmem");
>> tEnv.registerCatalog("inmem", inmemCatalog);
>> tEnv.useCatalog("inmem");
>>
>> TableResult result = tEnv.executeSql(
>>   "CREATE TABLE copied LIKE hive.navi.gps"
>> );
>>
>
> I've got the following log messages:
>
>> 00:50:22,157 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>[] - Setting hive conf dir as /Users/eastcirclek/hive-conf
>> 00:50:22,503 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>[] - Created HiveCatalog 'hive'
>> 00:50:22,515 INFO  hive.metastore
>>   [] - Trying to connect to metastore with URI thrift://...:9083
>> 00:50:22,678 INFO  hive.metastore
>>   [] - Connected to metastore.
>> 00:50:22,678 INFO  org.apache.flink.table.catalog.hive.HiveCatalog
>>[] - Connected to Hive metastore
>> 00:50:22,799 INFO  org.apache.flink.table.catalog.CatalogManager
>>[] - Set the current default catalog as [inmem] and the current
>> default database as [default].
>> *Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Source table
>> '`inmem`.`default`.`hive.navi.gps`' of the LIKE clause not found in the
>> catalog, at line 1, column 26*
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:147)
>> at java.util.Optional.orElseThrow(Optional.java:290)
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:147)
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:96)
>> at
>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> at Test.main(Test.java:53)
>>
>
> It seems like hive.navi.gps is recognized as a table name as a whole.
> I currently declare such a table by specifying all fields without the LIKE
> clause.
>
> Do I miss something?
>
> FYI, I'm working with Flink-1.11.2.
>
> Thank you~
>
> Best,
>
> Dongwon
>
>
> On Fri, Nov 13, 2020 at 5:19 PM Danny Chan  wrote:
>
>> Hi Dongwon ~
>>
>> Table from different catalog/db is supported, you need to specify the
>> full path of the source table:
>>
>> CREATE TABLE Orders_with_watermark (
>> *...*) WITH (
>> *...*)LIKE my_catalog.my_db.Orders;
>>
>>
>> Dongwon Kim  于2020年11月11日周三 下午2:53写道:
>>
>>> Hi,
>>>
>>> Is it disallowed to refer to a table from different databases or
>>> catalogs when someone creates a table?
>>>
>>> According to [1], there's no way to refer to tables belonging to
>>> different databases or catalogs.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>

-- 

Best,
Benchao Li


Re: Dynamic ad hoc query deployment strategy

2020-11-23 Thread lalala
Hi Kostas,

Yes, that would satisfy my use case as the platform is always
future-oriented. Any arbitrary query is executed on the latest data.

From your comment, I understand that even the session mode does not optimize
our readers. I wish Flink could support arbitrary job submission and graph
generation at runtime, so we could submit jobs dynamically from main() as we
do in Spark.

If we want to group similar jobs, what would you recommend us for arbitrary
long-running jobs? Can we somehow take a snapshot of the queries running
under a job graph then resubmit them with the new query?

I assume if we do the following under a single job (main method):

’’’
Source: create table A...
Query1: select * from A
Query 2: select * from A
’’’

Both queries will share the same reader as they are part of a single job
graph. Can we somehow take a snapshot of this and submit another query with
them again under the same job graph?

I really appreciate your time for answering my questions,

Best.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 Thread 赵一旦
Oh, I see. I had assumed that by "old vs. new" you meant whether update-mode was
specified; I misunderstood. Good, then it should be fine. I'll go and change it.




Jark Wu  于2020年11月23日周一 下午5:18写道:

> 你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query
> 是否有更新来决定工作模式的。
> 如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 17:14, 赵一旦  wrote:
>
> > duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> > values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
> >
> > 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> > 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
> >
> >
> > 赵一旦  于2020年11月23日周一 下午5:09写道:
> >
> > > 总结下:
> > > (1)group
> > >
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> > >
> > >
> > >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> > >
> > >
> > > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> > update方式输出。
> > > 甚至DDL中推荐可以搞个自定义on
> > > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on
> duplicate
> > > update功能。
> > >
> > >
> > >
> > >
> > > 赵一旦  于2020年11月23日周一 下午4:48写道:
> > >
> > >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> > >> 发现这种方式也不行,但是加了group by之后是可以的。
> > >>
> > >> (1)
> > >> 所以说是否还需要query带有key的语义才行呢?
> > >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> > >>
> > >> 那么group by + tumble
> window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> > >>
> > >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> > >>
> > >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> > >>
> > >> Jark Wu  于2020年11月23日周一 下午4:28写道:
> > >>
> > >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> > >>>
> > >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
> > >>>
> > >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> > >>> >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> > >>> > 页面。
> > >>> >
> > >>> >
> > >>> >
> > >>>
> >
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> > >>> >
> > >>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> > >>> >
> > >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> > >>> > >
> > >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> > >>> > >
> > >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > >>> > > > <
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > >>> > > > >
> > >>> > > >
> > >>> > > > Flink uses the primary key that defined in DDL when writing
> data
> > to
> > >>> > > > external databases. The connector operate in upsert mode if the
> > >>> primary
> > >>> > > key
> > >>> > > > was defined, otherwise, the connector operate in append mode.
> > >>> > > >
> > >>> > > > In upsert mode, Flink will insert a new row or update the
> > existing
> > >>> row
> > >>> > > > according to the primary key, Flink can ensure the idempotence
> in
> > >>> this
> > >>> > > way.
> > >>> > > > To guarantee the output result is as expected, it’s recommended
> > to
> > >>> > define
> > >>> > > > primary key for the table and make sure the primary key is one
> of
> > >>> the
> > >>> > > > unique key sets or primary key of the underlying database
> table.
> > In
> > >>> > > append
> > >>> > > > mode, Flink will interpret all records as INSERT messages, the
> > >>> INSERT
> > >>> > > > operation may fail if a primary key or unique constraint
> > violation
> > >>> > > happens
> > >>> > > > in the underlying database.
> > >>> > > >
> > >>> > > > See CREATE TABLE DDL
> > >>> > > > <
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > >>> > > > >
> > >>> > > > for
> > >>> > > > more details about PRIMARY KEY syntax.
> > >>> > > >
> > >>> > > >
> > >>> > > > 这里也有一点,In append mode, Flink will interpret all records as
> INSERT
> > >>> > > messages,
> > >>> > > > the INSERT operation may fail if a primary key or unique
> > constraint
> > >>> > > > violation happens in the underlying database.  什么叫append
> > >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >>> > > >
> > >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > >>> > > >
> > >>> > > > > 补充sql:
> > >>> > > > >
> > >>> > > > > DDL:
> > >>> > > > >
> > >>> > > > > CREATE TABLE flink_recent_pv_subid
> > >>> > > > > (
> > >>> > > > > `supply_id` STRING,
> > >>> > > > > `subid` STRING,
> > >>> > > > > `mark`  STRING,
> > >>> > > > > `time`  STRING,
> > >>> > > > > `pv`BIGINT,
> > >>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, 

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 Thread 赵一旦
@Yun Tang, that should indeed be the problem.
Could you explain what these parameters mean exactly?

What is the logic behind "backoff in milliseconds for partition requests of input
channels", and what do initial and max stand for, respectively?


akka.ask.timeout is comparatively obvious; it is a timeout. I have run into it before: cancel/submit/savepoint and similar operations can make the cluster's slots drop out one after another and then gradually come back (in our PaaS environment this is basically the case where some machines lose their connection and then reconnect).
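
For reference, these two options bound an exponentially growing retry delay: the input channel re-triggers its partition request starting at the initial value and doubling until the max is reached, after which it gives up with PartitionNotFoundException. A stand-alone illustration of how the defaults bound the retry window (plain arithmetic based on a reading of the network stack's backoff logic, not Flink code):

public class PartitionRequestBackoffSketch {
    public static void main(String[] args) {
        // Defaults of taskmanager.network.request-backoff.initial / .max, in milliseconds.
        int initialBackoff = 100;
        int maxBackoff = 10_000;

        int retries = 0;
        long totalWaitMs = 0;
        int backoff = initialBackoff;
        while (true) {
            retries++;
            totalWaitMs += backoff;
            if (backoff >= maxBackoff) {
                break; // the next failed request would raise PartitionNotFoundException
            }
            backoff = Math.min(backoff * 2, maxBackoff);
        }
        // With the defaults this prints roughly: retries=8, totalWaitMs=22700
        System.out.println("retries=" + retries + ", totalWaitMs=" + totalWaitMs);
    }
}

Raising the max therefore adds further doubling steps, i.e. more retries (and a longer window) before the exception is thrown.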



Yun Tang  于2020年11月23日周一 下午5:11写道:

> Hi
>
> 集群负载比较大的时候,下游一直收不到request的partition,就会导致PartitionNotFoundException,建议增大
> taskmanager.network.request-backoff.max [1][2] 以增大重试次数
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-request-backoff-max
> [2] https://juejin.cn/post/6844904185347964942#heading-8
>
>
> 祝好
> 唐云
> 
> From: 赵一旦 
> Sent: Monday, November 23, 2020 13:08
> To: user-zh@flink.apache.org 
> Subject: Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。
>
> 这个报错和kafka没有关系的哈,我大概理解是提交任务的瞬间,jobManager/taskManager机器压力较大,存在机器之间心跳超时什么的?
> 这个partition应该是指flink运行图中的数据partition,我感觉。没有具体细看,每次提交的瞬间可能遇到这个问题,然后会自动重试成功。
>
> zhisheng  于2020年11月18日周三 下午10:51写道:
>
> > 是不是有 kafka 机器挂了?
> >
> > Best
> > zhisheng
> >
> > hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:
> >
> > > 感觉还有其它 root cause,可以看下还有其它日志不?
> > >
> > >
> > > Best,
> > > Hailong
> > >
> > > At 2020-11-18 15:52:57, "赵一旦"  wrote:
> > > >2020-11-18 16:51:37
> > > >org.apache.flink.runtime.io
> > .network.partition.PartitionNotFoundException:
> > > >Partition
> > > b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> > > >not found.
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > > >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >
> > >
> >
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > > >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> > > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >
> > >
> >
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> > > >)
> > > >at
> > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> > > >.java:670)
> > > >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> > > >CompletableFuture.java:646)
> > > >at java.util.concurrent.CompletableFuture$Completion.run(
> > > >CompletableFuture.java:456)
> > > >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > > >at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > > >ForkJoinExecutorConfigurator.scala:44)
> > > >at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > >at
> > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> > > >.java:1339)
> > > >at
> > > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > >at
> > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> > > >.java:107)
> > > >
> > > >
> > > >请问这是什么问题呢?
> > >
> >
>


flink读mysql分库分表

2020-11-23 Thread 酷酷的浑蛋


Can Flink automatically recognize sharded MySQL databases and tables when reading, or can they only be read one by one?



Re: pyflink 1.11.1 调用包含第三方依赖库的udf时报错

2020-11-23 Thread Xingbo Huang
Hi,

Could you post the taskmanager log? From this log we can only see that the Python process died while starting up; no other information is visible.

Best,
Xingbo

fengmuqi...@ruqimobility.com  于2020年11月23日周一
下午4:11写道:

> hi.
> pyflink 1.11.1 调用包含第三方依赖库的udf时报错 :
>
> 运行环境:
> windows10
> python==3.7.9
> apache-flink==1.11.1
> apache-beam==2.19.0
>
> udf 依赖第三方库:
> h3==3.7.0
>
> pytest 通过。
>
> 运行时报错,报错信息如下
> 2020-11-23 14:20:51,656 WARN  org.apache.flink.runtime.taskmanager.Task
> [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, order_info]], fields=[source, order_id, user_id, car_id,
> driver_name, driver_id, time_dispatch_done, time_start, time_cancel,
> status, start_city, end_city, start_ad_code, end_ad_code, cancel_reason_id,
> realStartLatitude, realStartLongitude]) -> StreamExecPythonCalc -> Sink:
> Sink(table=[default_catalog.default_database.print_table],
> fields=[hash_h3]) (3/8) (056a0a0cdf3838794f4023e61d04a690) switched from
> RUNNING to FAILED.
> java.lang.RuntimeException: Failed to create stage bundle factory!
>at
> org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.IllegalStateException: Process died with exit code 0
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:331)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
>at
> 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 Thread Jark Wu
You are still using the old connector, i.e. 'connector.type' = 'jdbc', right? That one decides its working mode based on whether the query produces updates.
With the new connector, i.e. 'connector' = 'jdbc', the behavior is exactly the second behavior (2) you described.

Best,
Jark
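
A minimal sketch of the table from this thread declared with the new connector; with the PRIMARY KEY present, the sink then runs in upsert mode. URL and credentials are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NewJdbcUpsertSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE flink_recent_pv_subid (" +
                "  `supply_id` STRING," +
                "  `subid`     STRING," +
                "  `mark`      STRING," +
                "  `time`      STRING," +
                "  `pv`        BIGINT," +
                "  PRIMARY KEY (`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED" +
                ") WITH (" +
                // New connector: 'connector' = 'jdbc' instead of 'connector.type' = 'jdbc'.
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://mysql-host:3306/mydb'," +
                "  'table-name' = 'flink_recent_pv_subid'," +
                "  'username' = 'user'," +
                "  'password' = 'pass'" +
                ")");
    }
}

Note that, as far as I know, the upsert statement generated for MySQL overwrites the columns (INSERT ... ON DUPLICATE KEY UPDATE pv = VALUES(pv)); an accumulating pv = pv + VALUES(pv) is not something the connector generates out of the box.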

On Mon, 23 Nov 2020 at 17:14, 赵一旦  wrote:

> duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
>
> 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
>
>
> 赵一旦  于2020年11月23日周一 下午5:09写道:
>
> > 总结下:
> > (1)group
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> >
> >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> >
> >
> > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> update方式输出。
> > 甚至DDL中推荐可以搞个自定义on
> > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> > update功能。
> >
> >
> >
> >
> > 赵一旦  于2020年11月23日周一 下午4:48写道:
> >
> >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> >> 发现这种方式也不行,但是加了group by之后是可以的。
> >>
> >> (1)
> >> 所以说是否还需要query带有key的语义才行呢?
> >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> >>
> >> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> >>
> >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> >>
> >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> >>
> >> Jark Wu  于2020年11月23日周一 下午4:28写道:
> >>
> >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> >>>
> >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
> >>>
> >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >>> >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> >>> > 页面。
> >>> >
> >>> >
> >>> >
> >>>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >>> >
> >>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> >>> >
> >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >>> > >
> >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> >>> > >
> >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >>> > > > >
> >>> > > >
> >>> > > > Flink uses the primary key that defined in DDL when writing data
> to
> >>> > > > external databases. The connector operate in upsert mode if the
> >>> primary
> >>> > > key
> >>> > > > was defined, otherwise, the connector operate in append mode.
> >>> > > >
> >>> > > > In upsert mode, Flink will insert a new row or update the
> existing
> >>> row
> >>> > > > according to the primary key, Flink can ensure the idempotence in
> >>> this
> >>> > > way.
> >>> > > > To guarantee the output result is as expected, it’s recommended
> to
> >>> > define
> >>> > > > primary key for the table and make sure the primary key is one of
> >>> the
> >>> > > > unique key sets or primary key of the underlying database table.
> In
> >>> > > append
> >>> > > > mode, Flink will interpret all records as INSERT messages, the
> >>> INSERT
> >>> > > > operation may fail if a primary key or unique constraint
> violation
> >>> > > happens
> >>> > > > in the underlying database.
> >>> > > >
> >>> > > > See CREATE TABLE DDL
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >>> > > > >
> >>> > > > for
> >>> > > > more details about PRIMARY KEY syntax.
> >>> > > >
> >>> > > >
> >>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> >>> > > messages,
> >>> > > > the INSERT operation may fail if a primary key or unique
> constraint
> >>> > > > violation happens in the underlying database.  什么叫append
> >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> >>> > > >
> >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> >>> > > >
> >>> > > > > 补充sql:
> >>> > > > >
> >>> > > > > DDL:
> >>> > > > >
> >>> > > > > CREATE TABLE flink_recent_pv_subid
> >>> > > > > (
> >>> > > > > `supply_id` STRING,
> >>> > > > > `subid` STRING,
> >>> > > > > `mark`  STRING,
> >>> > > > > `time`  STRING,
> >>> > > > > `pv`BIGINT,
> >>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
> >>> ENFORCED
> >>> > > > > ) WITH (
> >>> > > > >   'connector.type' = 'jdbc',
> >>> > > > >
> >>> > > > >   ..
> >>> > > > >
> >>> > > > > );
> >>> > > > >
> >>> > > > >
> >>> > > > > 查询SQL:
> >>> > > > >
> >>> > > > > INSERT INTO
> >>> > > > > flink_recent_pv_subid
> >>> > > > > SELECT
> >>> > > > > `sid`,
> >>> > > > > `subid`,
> >>> > > > > `mark`,
> >>> > > > > 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 Thread 赵一旦
In the duplicate case it might be update pv = VALUES(pv), or it might be update pv = pv +
VALUES(pv); that depends on the aggregation logic of the concrete job, so leaving it directly to the user to configure would be quite nice.

Besides, the current jdbc connector already supports a custom SQL on the source side, I mean the
'connector.read.query' option. So a similar 'connector.write.sql' option could be added, with ? placeholders in the SQL to specify the fields.


赵一旦  于2020年11月23日周一 下午5:09写道:

> 总结下:
> (1)group
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
>
>
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
>
>
> 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate update方式输出。
> 甚至DDL中推荐可以搞个自定义on
> duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> update功能。
>
>
>
>
> 赵一旦  于2020年11月23日周一 下午4:48写道:
>
>> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
>> 发现这种方式也不行,但是加了group by之后是可以的。
>>
>> (1)
>> 所以说是否还需要query带有key的语义才行呢?
>> 比如group by的结果是可能update的,并且基于group by key也指出了key。
>>
>> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
>>
>> (2)如JarkWu所说,是mysql表的DDL部分决定。
>>
>> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
>>
>> Jark Wu  于2020年11月23日周一 下午4:28写道:
>>
>>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>>>
>>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
>>>
>>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>>> >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
>>> > 页面。
>>> >
>>> >
>>> >
>>> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>>> >
>>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
>>> >
>>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>>> > >
>>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
>>> > >
>>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
>>> > > > <
>>> > > >
>>> > >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>> > > > >
>>> > > >
>>> > > > Flink uses the primary key that defined in DDL when writing data to
>>> > > > external databases. The connector operate in upsert mode if the
>>> primary
>>> > > key
>>> > > > was defined, otherwise, the connector operate in append mode.
>>> > > >
>>> > > > In upsert mode, Flink will insert a new row or update the existing
>>> row
>>> > > > according to the primary key, Flink can ensure the idempotence in
>>> this
>>> > > way.
>>> > > > To guarantee the output result is as expected, it’s recommended to
>>> > define
>>> > > > primary key for the table and make sure the primary key is one of
>>> the
>>> > > > unique key sets or primary key of the underlying database table. In
>>> > > append
>>> > > > mode, Flink will interpret all records as INSERT messages, the
>>> INSERT
>>> > > > operation may fail if a primary key or unique constraint violation
>>> > > happens
>>> > > > in the underlying database.
>>> > > >
>>> > > > See CREATE TABLE DDL
>>> > > > <
>>> > > >
>>> > >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
>>> > > > >
>>> > > > for
>>> > > > more details about PRIMARY KEY syntax.
>>> > > >
>>> > > >
>>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
>>> > > messages,
>>> > > > the INSERT operation may fail if a primary key or unique constraint
>>> > > > violation happens in the underlying database.  什么叫append
>>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>>> > > >
>>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>>> > > >
>>> > > >
>>> > > >
>>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
>>> > > >
>>> > > > > 补充sql:
>>> > > > >
>>> > > > > DDL:
>>> > > > >
>>> > > > > CREATE TABLE flink_recent_pv_subid
>>> > > > > (
>>> > > > > `supply_id` STRING,
>>> > > > > `subid` STRING,
>>> > > > > `mark`  STRING,
>>> > > > > `time`  STRING,
>>> > > > > `pv`BIGINT,
>>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
>>> ENFORCED
>>> > > > > ) WITH (
>>> > > > >   'connector.type' = 'jdbc',
>>> > > > >
>>> > > > >   ..
>>> > > > >
>>> > > > > );
>>> > > > >
>>> > > > >
>>> > > > > 查询SQL:
>>> > > > >
>>> > > > > INSERT INTO
>>> > > > > flink_recent_pv_subid
>>> > > > > SELECT
>>> > > > > `sid`,
>>> > > > > `subid`,
>>> > > > > `mark`,
>>> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
>>> > > > 'MMddHHmm') as `time`,
>>> > > > > count(1) AS `pv`
>>> > > > > FROM baidu_log_view
>>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
>>> > > MINUTE);
>>> > > > >
>>> > > > >
>>> > > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
>>> > > > >
>>> > > > >> @hailongwang 一样的。
>>> > > > >>
>>> > > > >> 有个情况说明下,我是tumble
>>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>>> > > > >> 

Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 Thread Yun Tang
Hi

When the cluster is under heavy load, a downstream task can repeatedly fail to obtain the result partition it has requested, which results in the PartitionNotFoundException. It is recommended to increase
taskmanager.network.request-backoff.max [1][2] so that the partition request is retried more times.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-request-backoff-max
[2] https://juejin.cn/post/6844904185347964942#heading-8
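
For example, in flink-conf.yaml (the numbers below are only illustrative; per [1], the defaults at the time of writing are 100 ms and 10000 ms):

# illustrative flink-conf.yaml snippet, tune the values to your cluster
# delay in ms before the first retry of a partition request
taskmanager.network.request-backoff.initial: 100
# maximum backoff in ms; raising it above the default allows more retries before PartitionNotFoundException is thrown
taskmanager.network.request-backoff.max: 30000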


Best
Yun Tang

From: 赵一旦 
Sent: Monday, November 23, 2020 13:08
To: user-zh@flink.apache.org 
Subject: Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

This error has nothing to do with Kafka. My rough understanding is that at the moment the job is submitted, the JobManager/TaskManager machines are under relatively heavy load and there may be heartbeat timeouts or the like between machines.
The partition here should refer to a data partition in Flink's execution graph, I think. I have not looked into the details; the problem can show up right at submission time and then recovers automatically after a retry.

zhisheng  于2020年11月18日周三 下午10:51写道:

> 是不是有 kafka 机器挂了?
>
> Best
> zhisheng
>
> hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:
>
> > 感觉还有其它 root cause,可以看下还有其它日志不?
> >
> >
> > Best,
> > Hailong
> >
> > At 2020-11-18 15:52:57, "赵一旦"  wrote:
> > >2020-11-18 16:51:37
> > >org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > >Partition
> > b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> > >not found.
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> > >)
> > >at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> > >.java:670)
> > >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> > >CompletableFuture.java:646)
> > >at java.util.concurrent.CompletableFuture$Completion.run(
> > >CompletableFuture.java:456)
> > >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > >at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > >ForkJoinExecutorConfigurator.scala:44)
> > >at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> > >.java:1339)
> > >at
> > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >at
> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> > >.java:107)
> > >
> > >
> > >请问这是什么问题呢?
> >
>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 Thread 赵一旦
To summarize:
(1) A plain GROUP BY query produces an update stream; together with the MySQL sink it can do updates. (No update-mode needs to be defined in the DDL; 1.11 itself no longer supports the update-mode option anyway.)
(2) A GROUP BY + tumble window query produces an append stream; together with the MySQL sink it cannot do updates.

Given the above, what should one do if case (2) still needs to write to MySQL in update fashion? I feel Flink should not decide how to write based only on the characteristics of the stream; since external storage is involved, the right behavior depends on the concrete business logic, so deciding purely from the stream type is not appropriate.
It would be better to decide from the MySQL table's DDL (an update-mode option), or implicitly from whether a primary key is defined, whether to write in update fashion.


A hypothetical example: if I restart a job and let Kafka replay from some point in the past, part of the output will certainly be duplicated, and users will generally want it written with duplicate-update semantics.
The DDL could even allow a custom ON DUPLICATE handling clause, because some large-window (day-level) statistics are sometimes built by stacking smaller windows, which also relies on MySQL's ON DUPLICATE KEY UPDATE.
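
For reference, a minimal sketch of case (1) using the tables from earlier in this thread: dropping the tumble window and grouping directly on the sink's key columns (the `time` column is bucketed with DATE_FORMAT purely for illustration) produces an update stream, so the PK-defined MySQL sink upserts instead of failing on duplicates:

INSERT INTO flink_recent_pv_subid
SELECT
    `sid`,
    `subid`,
    `mark`,
    DATE_FORMAT(`event_time`, 'MMddHHmm') AS `time`,
    COUNT(1) AS `pv`
FROM baidu_log_view
GROUP BY `sid`, `subid`, `mark`, DATE_FORMAT(`event_time`, 'MMddHHmm');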




赵一旦  于2020年11月23日周一 下午4:48写道:

> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> 发现这种方式也不行,但是加了group by之后是可以的。
>
> (1)
> 所以说是否还需要query带有key的语义才行呢?
> 比如group by的结果是可能update的,并且基于group by key也指出了key。
>
> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
>
> (2)如JarkWu所说,是mysql表的DDL部分决定。
>
> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
>
> Jark Wu  于2020年11月23日周一 下午4:28写道:
>
>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>>
>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>>
>> Best,
>> Jark
>>
>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
>>
>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
>> > 页面。
>> >
>> >
>> >
>> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>> >
>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
>> >
>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>> > >
>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
>> > >
>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>> > > > >
>> > > >
>> > > > Flink uses the primary key that defined in DDL when writing data to
>> > > > external databases. The connector operate in upsert mode if the
>> primary
>> > > key
>> > > > was defined, otherwise, the connector operate in append mode.
>> > > >
>> > > > In upsert mode, Flink will insert a new row or update the existing
>> row
>> > > > according to the primary key, Flink can ensure the idempotence in
>> this
>> > > way.
>> > > > To guarantee the output result is as expected, it’s recommended to
>> > define
>> > > > primary key for the table and make sure the primary key is one of
>> the
>> > > > unique key sets or primary key of the underlying database table. In
>> > > append
>> > > > mode, Flink will interpret all records as INSERT messages, the
>> INSERT
>> > > > operation may fail if a primary key or unique constraint violation
>> > > happens
>> > > > in the underlying database.
>> > > >
>> > > > See CREATE TABLE DDL
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
>> > > > >
>> > > > for
>> > > > more details about PRIMARY KEY syntax.
>> > > >
>> > > >
>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
>> > > messages,
>> > > > the INSERT operation may fail if a primary key or unique constraint
>> > > > violation happens in the underlying database.  什么叫append
>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>> > > >
>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>> > > >
>> > > >
>> > > >
>> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
>> > > >
>> > > > > 补充sql:
>> > > > >
>> > > > > DDL:
>> > > > >
>> > > > > CREATE TABLE flink_recent_pv_subid
>> > > > > (
>> > > > > `supply_id` STRING,
>> > > > > `subid` STRING,
>> > > > > `mark`  STRING,
>> > > > > `time`  STRING,
>> > > > > `pv`BIGINT,
>> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
>> > > > > ) WITH (
>> > > > >   'connector.type' = 'jdbc',
>> > > > >
>> > > > >   ..
>> > > > >
>> > > > > );
>> > > > >
>> > > > >
>> > > > > 查询SQL:
>> > > > >
>> > > > > INSERT INTO
>> > > > > flink_recent_pv_subid
>> > > > > SELECT
>> > > > > `sid`,
>> > > > > `subid`,
>> > > > > `mark`,
>> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
>> > > > 'MMddHHmm') as `time`,
>> > > > > count(1) AS `pv`
>> > > > > FROM baidu_log_view
>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
>> > > MINUTE);
>> > > > >
>> > > > >
>> > > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
>> > > > >
>> > > > >> @hailongwang 一样的。
>> > > > >>
>> > > > >> 有个情况说明下,我是tumble
>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
>> > > > >>
>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
>> > > > >>>
>> > > > >>>
>> > > > >>> Best,
>> > > > >>> Hailong
>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
>> > > > >>> >如题,按照官方文档,当mysql表定义了primary
>> key的时候,会使用UpsertTableSink,并且会使用insert
>> > 

Re: Dynamic ad hoc query deployment strategy

2020-11-23 Thread Kostas Kloudas
Hi Lalala,

Even in session mode, the jobgraph is created before the job is
executed, so all of the above still holds.
Although I am not super familiar with the catalogs, what you want is
for two or more jobs to share the same readers of a source. This is not
done automatically in DataStream or DataSet, and I am pretty sure that
Table API and SQL do not perform any cross-query optimization either.

In addition, even if they did, are you sure that this would be enough
for your queries? The users will submit their queries at any point in
time and this would mean that each query would start processing from
where the reader is at that point in time, which is arbitrary. Is this
something that satisfies your requirements?

I will also include Dawid in the discussion to see if he has anything
to add about the Table API and SQL.

Cheers,
Kostas

On Fri, Nov 20, 2020 at 7:47 PM lalala  wrote:
>
> Hi Kostas,
>
> Thank you for your response.
>
> Is what you are saying valid for session mode? If I submit my jobs to the
> existing Flink session, will they be able to share the sources?
>
> We do register our Kafka tables to `GenericInMemoryCatalog`, and the
> documentation says `The GenericInMemoryCatalog is an in-memory
> implementation of a catalog. All objects will be available only for the
> lifetime of the session.`. I presume, in session mode, we can share Kafka
> source for multiple SQL jobs?
>
> That is not what we wanted for the best isolation, but if it is not possible
> with Flink, we are also good with session mode.
>
> Best regards,
>
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 Thread 赵一旦
OK. I just ran a simplified test: read from Kafka and write straight to MySQL.
It turns out that this does not work either, but after adding a GROUP BY it does.

(1)
So does the query itself also need to carry key semantics?
For example, the result of a GROUP BY may be updated, and the GROUP BY key also identifies the key.

Then in the GROUP BY + tumble window case the output appears to be append-only; can it still be written to MySQL in upsert fashion?

(2) As Jark Wu said, it is decided by the DDL of the MySQL table.

Which of (1) and (2) is correct? Personally I expect (2), but if it is (2), why does reading from Kafka and writing directly to MySQL not work?

Jark Wu  于2020年11月23日周一 下午4:28写道:

> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>
> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
>
> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> > 页面。
> >
> >
> >
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >
> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> >
> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> > >
> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> > >
> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > > >
> > > >
> > > > Flink uses the primary key that defined in DDL when writing data to
> > > > external databases. The connector operate in upsert mode if the
> primary
> > > key
> > > > was defined, otherwise, the connector operate in append mode.
> > > >
> > > > In upsert mode, Flink will insert a new row or update the existing
> row
> > > > according to the primary key, Flink can ensure the idempotence in
> this
> > > way.
> > > > To guarantee the output result is as expected, it’s recommended to
> > define
> > > > primary key for the table and make sure the primary key is one of the
> > > > unique key sets or primary key of the underlying database table. In
> > > append
> > > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > > operation may fail if a primary key or unique constraint violation
> > > happens
> > > > in the underlying database.
> > > >
> > > > See CREATE TABLE DDL
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > > >
> > > > for
> > > > more details about PRIMARY KEY syntax.
> > > >
> > > >
> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > > messages,
> > > > the INSERT operation may fail if a primary key or unique constraint
> > > > violation happens in the underlying database.  什么叫append
> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > > >
> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > > >
> > > >
> > > >
> > > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > > >
> > > > > 补充sql:
> > > > >
> > > > > DDL:
> > > > >
> > > > > CREATE TABLE flink_recent_pv_subid
> > > > > (
> > > > > `supply_id` STRING,
> > > > > `subid` STRING,
> > > > > `mark`  STRING,
> > > > > `time`  STRING,
> > > > > `pv`BIGINT,
> > > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > > ) WITH (
> > > > >   'connector.type' = 'jdbc',
> > > > >
> > > > >   ..
> > > > >
> > > > > );
> > > > >
> > > > >
> > > > > 查询SQL:
> > > > >
> > > > > INSERT INTO
> > > > > flink_recent_pv_subid
> > > > > SELECT
> > > > > `sid`,
> > > > > `subid`,
> > > > > `mark`,
> > > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > > 'MMddHHmm') as `time`,
> > > > > count(1) AS `pv`
> > > > > FROM baidu_log_view
> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > > MINUTE);
> > > > >
> > > > >
> > > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
> > > > >
> > > > >> @hailongwang 一样的。
> > > > >>
> > > > >> 有个情况说明下,我是tumble
> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> > > > >>
> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > > >>>
> > > > >>>
> > > > >>> Best,
> > > > >>> Hailong
> > > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> > > > >>> >如题,按照官方文档,当mysql表定义了primary
> key的时候,会使用UpsertTableSink,并且会使用insert
> > on
> > > > >>> >duplicate方式写入。
> > > > >>> >
> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
> 'uniq_ssmt'
> > > > >>> >at
> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > > >>> Method)
> > > > >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > > > >>> >NativeConstructorAccessorImpl.java:62)
> > > > >>> >at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > > > >>> >at
> > > 

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 Thread dpzhoufengdev
After sinking the result of a Flink SQL sliding-window (HOP) GROUP BY aggregation to Kafka, there are duplicates: every record shows up as two completely identical rows. What causes this?
The aggregation logic:

Table tableoneHour = tableEnv.sqlQuery(

"select  appname" +

",productCode" +

",link" +

 ",count(case when nodeName = 'FailTerminateEndEvent' 
then 1 else null end) as errNum" +

",count(case when nodeName = 'EndEvent' and passStatus 
= 'Accept' then 1 else null end ) as passNum " +

",count(case when nodeName = 'EndEvent' and passStatus 
= 'Reject' then 1 else null end) as refNum " +

",count(case when nodeName = 'EndEvent' and passStatus 
<> 'Reject' and passStatus <> 'Accept' then 1 else null end) as processNum " +

",sum(case when nodeName = 'EndEvent' then loansum else 
0 end ) as loansum" +

",count(1) as allNum " +

",'OneHour' as windowType " +

",HOP_END(rowtime, INTERVAL '1'  HOUR, INTERVAL '1' 
HOUR) as inputtime " +

"from  table1_2 WHERE link in ('1','2','5')  GROUP BY 
HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) " +

",appname,productCode,link");


Convert the Tables into DataStreams:


// union the computed tables together
Table tablesql = tableHalfHour.unionAll(tableoneHour).unionAll(tableoneDay);
// toRetractStream yields Tuple2<Boolean, Row>: f0 is the insert(true)/retract(false) flag
DataStream<Tuple2<Boolean, Row>> dataStream2 = tableEnv.toRetractStream(tablesql, Row.class);
DataStream<Tuple2<Boolean, Row>> dataStream7Day = tableEnv.toRetractStream(table7Day, Row.class);


// convert the Table to a DataStream of JSON strings
DataStream<String> reslut1 = dataStream2.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
    @Override
    public String map(Tuple2<Boolean, Row> tuple2) throws Exception {
        // note: tuple2.f0 (the insert/retract flag) is not checked here
        Map<String, Object> json = new HashMap<>();
        json.put("appname", tuple2.f1.getField(0));
        json.put("productCode", tuple2.f1.getField(1));
        json.put("link", tuple2.f1.getField(2));
        json.put("errNum", tuple2.f1.getField(3));
        json.put("passNum", tuple2.f1.getField(4));
        json.put("refNum", tuple2.f1.getField(5));
        json.put("processNum", tuple2.f1.getField(6));
        json.put("loansum", tuple2.f1.getField(7));
        json.put("allNum", tuple2.f1.getField(8));
        json.put("windowType", tuple2.f1.getField(9));
        json.put("inputtime", tuple2.f1.getField(10));
        return JSON.toJSONString(json);
    }
});






Sink the results to Kafka:
reslut1.addSink(new FlinkKafkaProducer08<>("", new SimpleStringSchema(), props1));
reslut2.addSink(new FlinkKafkaProducer08<>("", new SimpleStringSchema(), props1));


The data sunk to Kafka contains two completely identical copies of each record.




dpzhoufengdev
dpzhoufeng...@163.com


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 Thread Jark Wu
That page is the legacy connector I mentioned above; it has been deprecated, which is why there is no entry for it in the sidebar navigation.

The new jdbc sink decides whether to work in upsert mode based on whether the DDL defines a PK.
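
For example, a rough sketch of the sink table from this thread declared with the new connector options instead of the legacy 'connector.type' style (the URL and table name below are placeholders):

CREATE TABLE flink_recent_pv_subid (
    `supply_id` STRING,
    `subid`     STRING,
    `mark`      STRING,
    `time`      STRING,
    `pv`        BIGINT,
    -- a primary key in the DDL makes the new jdbc sink work in upsert mode
    PRIMARY KEY (`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'flink_recent_pv_subid'
);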

Best,
Jark

On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:

> 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> 页面。
>
>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>
> Jark Wu  于2020年11月23日周一 下午3:32写道:
>
> > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >
> > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> >
> > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > >
> > >
> > > Flink uses the primary key that defined in DDL when writing data to
> > > external databases. The connector operate in upsert mode if the primary
> > key
> > > was defined, otherwise, the connector operate in append mode.
> > >
> > > In upsert mode, Flink will insert a new row or update the existing row
> > > according to the primary key, Flink can ensure the idempotence in this
> > way.
> > > To guarantee the output result is as expected, it’s recommended to
> define
> > > primary key for the table and make sure the primary key is one of the
> > > unique key sets or primary key of the underlying database table. In
> > append
> > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > operation may fail if a primary key or unique constraint violation
> > happens
> > > in the underlying database.
> > >
> > > See CREATE TABLE DDL
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > >
> > > for
> > > more details about PRIMARY KEY syntax.
> > >
> > >
> > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > messages,
> > > the INSERT operation may fail if a primary key or unique constraint
> > > violation happens in the underlying database.  什么叫append
> > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >
> > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >
> > >
> > >
> > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > >
> > > > 补充sql:
> > > >
> > > > DDL:
> > > >
> > > > CREATE TABLE flink_recent_pv_subid
> > > > (
> > > > `supply_id` STRING,
> > > > `subid` STRING,
> > > > `mark`  STRING,
> > > > `time`  STRING,
> > > > `pv`BIGINT,
> > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > ) WITH (
> > > >   'connector.type' = 'jdbc',
> > > >
> > > >   ..
> > > >
> > > > );
> > > >
> > > >
> > > > 查询SQL:
> > > >
> > > > INSERT INTO
> > > > flink_recent_pv_subid
> > > > SELECT
> > > > `sid`,
> > > > `subid`,
> > > > `mark`,
> > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > 'MMddHHmm') as `time`,
> > > > count(1) AS `pv`
> > > > FROM baidu_log_view
> > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > MINUTE);
> > > >
> > > >
> > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
> > > >
> > > >> @hailongwang 一样的。
> > > >>
> > > >> 有个情况说明下,我是tumble
> window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> > > >>
> > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > >>>
> > > >>>
> > > >>> Best,
> > > >>> Hailong
> > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> > > >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert
> on
> > > >>> >duplicate方式写入。
> > > >>> >
> > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> > > >>> >at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > >>> Method)
> > > >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > > >>> >NativeConstructorAccessorImpl.java:62)
> > > >>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > > >>> >at
> > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > > >>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > > >>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > > >>> >at
> > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > > >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > > >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > > >>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > > >>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > > >>> >at
> > 

pyflink 1.11.1 调用包含第三方依赖库的udf时报错

2020-11-23 Thread fengmuqi...@ruqimobility.com
hi.
pyflink 1.11.1 throws an error when calling a UDF that depends on a third-party library:

Runtime environment:
windows10
python==3.7.9
apache-flink==1.11.1
apache-beam==2.19.0

Third-party library the UDF depends on:
h3==3.7.0

pytest passes.

The job fails at runtime; the error message is as follows:
2020-11-23 14:20:51,656 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, order_info]], fields=[source, order_id, user_id, car_id, 
driver_name, driver_id, time_dispatch_done, time_start, time_cancel, status, 
start_city, end_city, start_ad_code, end_ad_code, cancel_reason_id, 
realStartLatitude, realStartLongitude]) -> StreamExecPythonCalc -> Sink: 
Sink(table=[default_catalog.default_database.print_table], fields=[hash_h3]) 
(3/8) (056a0a0cdf3838794f4023e61d04a690) switched from RUNNING to FAILED.
java.lang.RuntimeException: Failed to create stage bundle factory! 
   at 
org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-dist_2.11-1.11.1.jar:1.11.1]
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalStateException: Process died with exit code 0
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:331)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:320)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:250)
 ~[flink-python_2.11-1.11.1.jar:1.11.1]
   at