Re: flink sql deduplication algorithm

2020-03-19 Thread LakeShen
Hi zhisheng,

I ran into a similar problem before. Flink keeps state forever by default, so for this kind of SQL job the approach I came up with is to configure an idle state retention time.
For example, for a daily aggregation, set the minimum idle state retention to 26 hours and the maximum to 48 hours (tune the exact values to your business needs).
In any case you should always configure a minimum and a maximum idle state retention time; state cannot be kept forever.

For the Flink 1.10 Blink planner, the TTL is the configured minimum idle state retention time; the maximum idle state retention time does not seem to be used.
In Flink 1.10 the default state cleanup happens in the background; for RocksDBStateBackend the cleanup is done with a
compaction filter.

The second suggestion is to use incremental checkpoints.
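
For reference, a minimal sketch of both suggestions in code (Flink 1.10 Table API; the retention values and the checkpoint URI below are only placeholders):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints with RocksDB (the checkpoint URI is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Keep idle state for at least 26 hours and clean it up after at most 48 hours.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(26), Time.hours(48));
    }
}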

Best wishes,
LakeShen



lucas.wu wrote on Friday, March 20, 2020 at 11:50 AM:

> You could consider implementing your own UDF, using a bitmap or HyperLogLog.
>
>
> Original message
> From: zhisheng zhisheng2...@gmail.com
> To: user-zh user...@flink.apache.org
> Sent: Friday, March 20, 2020 11:44
> Subject: Re: flink sql deduplication algorithm
>
>
> hi, I've noticed that some of our production jobs use SQL count distinct for deduplication. Once a job
> has been running for a long time its Checkpoint state gets very large (this week I hit one of roughly
> 400G, which makes checkpoints time out easily and may also put some pressure on the NICs of the HDFS
> cluster). The official docs mention the query configuration
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> ; apart from that, I'm not sure whether anyone has other good solutions?
>
> Benchao Li libenc...@gmail.com wrote on Friday, March 20, 2020 at 9:50 AM: Hi hiliuxg,
> count distinct uses a MapView for deduplication:
> in the batch case, the MapView is backed by a HashMap;
> in the streaming case, the MapView is backed by MapState, because state + checkpointing is needed so that state is not lost after a job restart.
>
> hiliuxg 736742...@qq.com wrote on Thursday, March 19, 2020 at 11:31 PM: hi all: what algorithm does flink
> sql count(distinct) use under the hood? A bitmap?
> Or simply a Java Set for deduplication? -- Benchao Li School of Electronics
> Engineering and Computer Science, Peking University Tel: +86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: ddl

2020-03-19 Thread Jingsong Li
Hi,

For the underlying implementation you can take a look at [1].

[1] https://github.com/apache/bahir-flink/tree/master/flink-connector-kudu
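
If you want to wire a custom sink into DDL yourself (as hiliuxg mentions below), a rough, compile-only skeleton of a sink factory could look like the following; the connector name, property keys and the missing sink class are placeholders, not a real implementation:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

public class MyKuduTableSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        // Matched against 'connector.type' in the DDL WITH (...) clause.
        context.put("connector.type", "my-kudu");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.masters");   // connector-specific key (placeholder)
        properties.add("schema.#.name");       // table schema keys
        properties.add("schema.#.data-type");
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        // Build and return your actual StreamTableSink implementation here.
        throw new UnsupportedOperationException("plug in your Kudu StreamTableSink");
    }
}

The factory also needs to be registered for Java SPI in
META-INF/services/org.apache.flink.table.factories.TableFactory.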

Best,
Jingsong Lee

On Thu, Mar 19, 2020 at 11:30 PM hiliuxg <736742...@qq.com> wrote:

> You can define your own TableSinkFactory; Flink already exposes this interface.
>
>
>
>
> -- Original message --
> From: "LakeShen"  Sent: Saturday, March 14, 2020 11:13 AM
> To: "user-zh"
> Subject: Re: ddl
>
>
>
> Hi 志华,
>  You can fully extend the Flink SQL DDL syntax yourself to support your company's own real-time sources,
> sinks
> and so on. For the concrete implementation, please see the link jinhai
> posted above.
>
> Best wishes,
> 沈磊
>
> jinhai wang 
>  Page on “User-defined Sources & Sinks”. For flink 1.10:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
>  <
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
>  
> 
>  Best Regards
> 
>  jinhai...@gmail.com
> 
>   On March 13, 2020 at 7:17 PM, 王志华 wrote:
>   Regarding DDL in Flink, which kinds of systems can currently serve as source or sink tables? Online I have
> only seen ddl mysql
>  sink, ddl
> hbase sink and the like. Are other types supported? If not, does Flink expose an interface that would let me
>  add DDL syntax support for other systems, e.g. a ddl kudu sink?
>
>   王志华
>   a15733178...@163.com
>   (signature customized by NetEase Mail Master)
> 
> 



-- 
Best, Jingsong Lee


Re: rowtime type serialization issue

2020-03-19 Thread Jingsong Li
Hi lucas,

This looks like a bug when querying the event_time field, caused by TimeIndicatorTypeInfo.

If you are on 1.10, could you open a JIRA to track this issue?

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:40 AM lucas.wu  wrote:

> Hi all:
> DDL statement
> create table `source_table`(
> `SeqNo` varchar,
> `Type` varchar,
> `Table` varchar,
> `ServerId` varchar,
> `Database` varchar,
> `OldData` varchar,
> `GTID` varchar,
> `Offset` varchar,
> `event_ts` as
> to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
> WATERMARK FOR event_ts AS event_ts - interval '60' second
> ) with(…)
>
>
> Query statement
> insert into sinkTable from Select * from source_table;
>
>
>
> Error message:
> java.lang.ClassCastException: java.sql.Timestamp cannot be cast to
> java.lang.Long at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at SinkConversion$51.processElement(Unknown Source)
> ……
>
>
>
> Finally I looked at the code and found that for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while RowTypeInfo uses LongSerializer. The upstream and downstream then use different serializers (SqlTimestampSerializer upstream, LongSerializer downstream), which causes this error.
> Can this problem be avoided?



-- 
Best, Jingsong Lee


Re: flink sql 1.10.0 support for Hive GenericUDF (the images in the previous mail did not display, re-edited)

2020-03-19 Thread Jingsong Li
 Hi,

The GenericUDFUnixTimeStamp UDF uses Hive's SessionState, and our hive-integration currently lacks support for that part.
Flink also provides this function, so you could use the Flink built-in function for now.

I have created an issue for it and will try to get it fixed in 1.10.1.

[1] https://issues.apache.org/jira/browse/FLINK-16688
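
For example, the expression from your mail could probably be written with Flink's built-in functions instead (an untested sketch, assuming the blink planner; tableEnv, the table names and the format pattern are placeholders):

// Same intent as from_unixtime(unix_timestamp(current_timestamp()), ...),
// but using Flink's built-in DATE_FORMAT instead of the Hive UDFs.
tableEnv.sqlUpdate(
    "INSERT INTO sink_table " +
    "SELECT DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS ts_str " +
    "FROM source_table");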

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:33 AM Chief  wrote:

> hi all:
> I am currently migrating our existing offline Hive jobs to flink sql
> 1.10.0. The docs say Flink supports Hive GenericUDFs, but I found that a few of them fail in Flink. Is something wrong with my setup?
> For example current_timestamp()
> The statement in Hive:
> select
> from_unixtime(unix_timestamp(current_timestamp()),'MMddHHmmss'); runs fine,
> but submitting the flink sql job fails with
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> java.lang.reflect.InvocationTargetException
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
> at
> com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
> at
> com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at
> java.security.AccessController.doPrivileged(Native Method)
> at
> javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
> at
> org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
> at
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at
> org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> at
> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at
> org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> 

Re: Streaming kafka data sink to hive

2020-03-19 Thread Jingsong Li
Hi wanglei,

> 1  Is there any flink-hive-connector that i can use to write to hive
streamingly?

"Streaming kafka data sink to hive" is under discussion.[1]
And POC work is ongoing.[2] We want to support it in release-1.11.

> 2  Since HDFS is not friendly to frequently append and hive's data is
stored to hdfs,  is it  OK if the throughput is high?

We should be concerned about small files; it's better to have around 128MB per file.
If the throughput is high, I think you can try rolling files every 5 minutes
or 10 minutes.
You can learn more in [3].

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-115-Filesystem-connector-in-Table-td38870.html
[2]https://github.com/apache/flink/pull/11457
[3]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html
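
Until then, a rough sketch of keeping file sizes under control with StreamingFileSink (the path, bucket format and thresholds are only placeholders):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///warehouse/app_logs/"),
                new SimpleStringEncoder<String>("UTF-8"))
        // one directory per hour, which can later be registered as a Hive partition
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        .withRollingPolicy(DefaultRollingPolicy.create()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(10)) // roll at least every 10 minutes
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(128 * 1024 * 1024)                  // aim for ~128MB part files
                .build())
        .build();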

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:55 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> We have many app logs on our app server and want to parse the logs into a
> structured table format and then sink them to Hive.
> Seems it is good to use batch mode. The app log is hourly compressed and
> it is convenience to do partitioning.
>
> We want to use streaming mode. Tail the app logs to Kafka,  then use flink
> to read kafka topic  and then sink to Hive.
> I have several questions.
>
> 1  Is there any flink-hive-connector that i can use to write to hive
> streamingly?
> 2  Since HDFS is not friendly to frequently append and hive's data is
> stored to hdfs,  is it  OK if the throughput is high?
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>


-- 
Best, Jingsong Lee


Savepoint Location from Flink REST API

2020-03-19 Thread Aaron Langford
Hey Flink Community,

I'm combing through docs right now, and I don't see that a savepoint
location is returned or surfaced anywhere. When I do this in the CLI, I get
a nice message that tells me where in S3 it put my savepoint (unique
savepoint ID included). I'm looking for that same result to be available
via the REST API. Does this exist today?

Aaron


Re: Flink 1.10 JSON parsing

2020-03-19 Thread 宇张
hi,
OK, I tried it on my side. After changing the schema of data to
ARRAY(ROW(FIELD("tracking_numbrer", STRING), FIELD("invoice_no", STRING)))
and removing .jsonSchema(...), the data is now parsed correctly; but if I keep
.jsonSchema(...) it throws the following exception: Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
ARRAY> of table field
'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
type.
The reason I kept the jsonSchema is that I want to store the metadata of this kind of complex JSON source in Hive and then use it to infer the format of the statement below, because I don't know how to map the field information for such complex JSON when defining the SQL below. Or is there an example of mapping this kind of complex JSON in SQL? Thanks.
[image: image.png]
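
For reference, this is roughly the part of the connect() snippet below that I changed following Jark's suggestion (only the format and the "data" field differ; everything else stays the same):

.withFormat(
        new Json()   // no .jsonSchema(...): let flink-json derive it from the table schema
)
.withSchema(
        new Schema()
                .field("business", DataTypes.STRING())
                // "data" is a JSON array of objects, so it is declared as ARRAY(ROW(...))
                .field("data", DataTypes.ARRAY(DataTypes.ROW(
                        DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                        DataTypes.FIELD("invoice_no", DataTypes.STRING()))))
                .field("database", DataTypes.STRING())
                .field("table", DataTypes.STRING())
                .field("ts", DataTypes.DECIMAL(38, 18))
                .field("type", DataTypes.STRING())
                .field("putRowNum", DataTypes.DECIMAL(38, 18))
)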

On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:

> Hi,
>
> Looking at your data, "data" is an array type, so the schema of data needs to be changed to
> ARRAY(ROW(FIELD("tracking_numbrer", STRING), FIELD("invoice_no", STRING)))
> I also suggest removing .jsonSchema(...); since 1.10, flink-json can automatically derive the json
> schema from the table schema.
>
> Best,
> Jark
>
> On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:
>
> > hi:
> > 1. When parsing JSON data, why is decimal used here instead of bigint?
> > [image: image.png]
> > 2. When I use connect, parsing a JSON array element throws an exception. Is this caused by misuse, or is it a bug?
> >
> >
> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
> >
> >
> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> > connect:
> >
> > streamTableEnv
> > .connect(
> > new Kafka()
> > .version("0.11")
> > .topic("mysql_binlog_test_str")
> > .startFromEarliest()
> > .property("zookeeper.connect", "localhost:2181")
> > .property("bootstrap.servers", "localhost:9092")
> > )
> > .withFormat(
> > new Json()
> >
>  
> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> > )
> > .withSchema(
> > new Schema()
> > .field("business", DataTypes.STRING())
> > .field("data",
> DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
> > DataTypes.FIELD("tracking_number",
> DataTypes.STRING()),
> > DataTypes.FIELD("invoice_no",
> DataTypes.STRING())
> > .field("database", DataTypes.STRING())
> > .field("table", DataTypes.STRING())
> > .field("ts", DataTypes.DECIMAL(38, 18))
> > .field("type", DataTypes.STRING())
> > .field("putRowNum", DataTypes.DECIMAL(38, 18))
> > )
> > .createTemporaryTable("Test");
> >
> > Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
> >
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> > Caused by: java.lang.ClassCastException:
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
> > cannot be cast to
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > at
> >
> 

How can I set the value of taskmanager.network.numberOfBuffers?

2020-03-19 Thread forideal
Hi community


This parameter confuses me.

taskmanager.network.numberOfBuffers: 70

In my job I use 700 slots, but I have to set this parameter to
70. If not, I get an exception.


 java.io.IOException: Insufficient number of network buffers: required 700, 
but only 1 available. The total number of network buffers is currently set to 
8 of 32768 bytes each. You can increase this number by setting the 
configuration keys 'taskmanager.network.memory.fraction', 
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
 
But actually this wastes too many resources.
Memory Segments
| Type      | Count   |
| Available | 698,838 |
| Total     | 700,000 |

| Type   | Count   | Used    | Capacity |
| Direct | 700,103 | 21.4 GB | 21.4 GB  |
| Mapped | 0       | 0 B     | 0 B      |


Best Wishes
forideal



Abnormal pushgateway memory usage

2020-03-19 Thread yanggang_it_job
Hi:

I'd like to ask about abnormal pushgateway memory usage when pushing metrics to
pushgateway with org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter. The symptoms are:
 1. Our actual memory usage is around 8G, but the pushgateway memory keeps fluctuating around 35G.
 2. The pushgateway curve fluctuates a lot; it is not a smooth line and swings by about 8G.

I hope someone can help look into the cause of the above. Thanks...

Streaming kafka data sink to hive

2020-03-19 Thread wangl...@geekplus.com.cn

We have many app logs on our app server and want to parse the logs into a structured
table format and then sink them to Hive.
Seems it is good to use batch mode. The app log is hourly compressed and it is 
convenience to do partitioning.

We want to use streaming mode. Tail the app logs to Kafka,  then use flink to 
read kafka topic  and then sink to Hive.
I have several questions.

1  Is there any flink-hive-connector that i can use to write to hive 
streamingly?
2  Since HDFS is not friendly to frequently append and hive's data is stored to 
hdfs,  is it  OK if the throughput is high? 

Thanks,
Lei



wangl...@geekplus.com.cn 


Re: flink sql deduplication algorithm

2020-03-19 Thread lucas.wu
You could consider implementing your own UDF, using a bitmap or HyperLogLog.
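
As a starting point, a compile-only sketch of such a UDF as an AggregateFunction; the accumulator below uses a plain HashSet only to show the wiring, and the idea would be to swap it for a HyperLogLog or Roaring bitmap structure to keep the state small (all names here are made up):

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.table.functions.AggregateFunction;

public class ApproxCountDistinct extends AggregateFunction<Long, ApproxCountDistinct.Acc> {

    // Accumulator; a real implementation would hold a serialized
    // HyperLogLog / Roaring bitmap here instead of an exact Set.
    public static class Acc {
        public Set<String> seen = new HashSet<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called by the runtime for every input row.
    public void accumulate(Acc acc, String value) {
        if (value != null) {
            acc.seen.add(value);
        }
    }

    @Override
    public Long getValue(Acc acc) {
        return (long) acc.seen.size();
    }
}

After registering it, e.g. tableEnv.registerFunction("approx_count_distinct", new ApproxCountDistinct()), it can be used in place of count(distinct ...) in the query.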


Original message
From: zhisheng zhisheng2...@gmail.com
To: user-zh user...@flink.apache.org
Sent: Friday, March 20, 2020 11:44
Subject: Re: flink sql deduplication algorithm


hi, I've noticed that some of our production jobs use SQL count distinct for deduplication. Once a job has been
running for a long time its Checkpoint state gets very large (this week I hit one of roughly 400G, which makes
checkpoints time out easily and may also put some pressure on the NICs of the HDFS cluster). The official docs
mention the query configuration
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
; apart from that, I'm not sure whether anyone has other good solutions?

Benchao Li libenc...@gmail.com wrote on Friday, March 20, 2020 at 9:50 AM: Hi hiliuxg,
count distinct uses a MapView for deduplication:
in the batch case, the MapView is backed by a HashMap;
in the streaming case, the MapView is backed by MapState, because state + checkpointing is needed so that state is not lost after a job restart.

hiliuxg 736742...@qq.com wrote on Thursday, March 19, 2020 at 11:31 PM: hi all: what algorithm does flink sql
count(distinct) use under the hood? A bitmap? Or simply a Java Set for deduplication? --
Benchao Li School of Electronics Engineering and Computer Science, Peking
University Tel: +86-15650713730 Email: libenc...@gmail.com;
libenc...@pku.edu.cn

Re: flink sql deduplication algorithm

2020-03-19 Thread zhisheng
hi,

I've noticed that some of our production jobs use SQL count distinct for deduplication. Once a job has been running for a long time its Checkpoint state
gets very large (this week I hit one of roughly 400G, which makes checkpoints time out easily and may also put
some pressure on the NICs of the HDFS cluster). The official docs mention using the query configuration
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
; apart from that, I'm not sure whether anyone has other good solutions?

Benchao Li wrote on Friday, March 20, 2020 at 9:50 AM:

> Hi hiliuxg,
>
> count distinct uses a MapView for deduplication:
> in the batch case, the MapView is backed by a HashMap;
> in the streaming case, the MapView is backed by MapState, because state + checkpointing is needed so that state is not lost after a job restart.
>
> hiliuxg <736742...@qq.com> wrote on Thursday, March 19, 2020 at 11:31 PM:
>
> > hi all:
> > What algorithm does flink sql count(distinct) use under the hood? A bitmap?
> > Or simply a Java Set for deduplication?
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Flink 1.10 JSON parsing

2020-03-19 Thread Jark Wu
Hi,

Looking at your data, "data" is an array type, so the schema of data needs to be changed to
ARRAY(ROW(FIELD("tracking_numbrer", STRING), FIELD("invoice_no", STRING)))
I also suggest removing .jsonSchema(...); since 1.10, flink-json can automatically derive the json
schema from the table schema.

Best,
Jark

On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:

> hi:
> 1. When parsing JSON data, why is decimal used here instead of bigint?
> [image: image.png]
> 2. When I use connect, parsing a JSON array element throws an exception. Is this caused by misuse, or is it a bug?
>
> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
>
> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> connect:
>
> streamTableEnv
> .connect(
> new Kafka()
> .version("0.11")
> .topic("mysql_binlog_test_str")
> .startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")
> )
> .withFormat(
> new Json()
> 
> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> )
> .withSchema(
> new Schema()
> .field("business", DataTypes.STRING())
> .field("data", DataTypes.ROW(DataTypes.FIELD("f0", 
> DataTypes.ROW(
> DataTypes.FIELD("tracking_number", 
> DataTypes.STRING()),
> DataTypes.FIELD("invoice_no", 
> DataTypes.STRING())
> .field("database", DataTypes.STRING())
> .field("table", DataTypes.STRING())
> .field("ts", DataTypes.DECIMAL(38, 18))
> .field("type", DataTypes.STRING())
> .field("putRowNum", DataTypes.DECIMAL(38, 18))
> )
> .createTemporaryTable("Test");
>
> Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
>
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.lang.ClassCastException:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
> cannot be cast to
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
>
>
>


rowtime type serialization issue

2020-03-19 Thread lucas.wu
Hi all:
DDL statement
create table `source_table`(
`SeqNo` varchar,
`Type` varchar,
`Table` varchar,
`ServerId` varchar,
`Database` varchar,
`OldData` varchar,
`GTID` varchar,
`Offset` varchar,
`event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
) with(…)


Query statement
insert into sinkTable from Select * from source_table;



Error message:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
java.lang.Long at 
org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
 at SinkConversion$51.processElement(Unknown Source)
……


Finally I looked at the code and found that for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while RowTypeInfo uses LongSerializer. The upstream and downstream then use different serializers (SqlTimestampSerializer upstream, LongSerializer downstream), which causes this error.
Can this problem be avoided?

Flink 1.10 JSON parsing

2020-03-19 Thread 宇张
hi:
1. When parsing JSON data, why is decimal used here instead of bigint?
[image: image.png]
2. When I use connect, parsing a JSON array element throws an exception. Is this caused by misuse, or is it a bug?
json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
connect:

streamTableEnv
.connect(
new Kafka()
.version("0.11")
.topic("mysql_binlog_test_str")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(
new Json()

.jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
)
.withSchema(
new Schema()
.field("business", DataTypes.STRING())
.field("data",
DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
DataTypes.FIELD("tracking_number",
DataTypes.STRING()),
DataTypes.FIELD("invoice_no",
DataTypes.STRING())
.field("database", DataTypes.STRING())
.field("table", DataTypes.STRING())
.field("ts", DataTypes.DECIMAL(38, 18))
.field("type", DataTypes.STRING())
.field("putRowNum", DataTypes.DECIMAL(38, 18))
)
.createTemporaryTable("Test");

Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.

at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
cannot be cast to
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more


flink sql 1.10.0 support for Hive GenericUDF (the images in the previous mail did not display, re-edited)

2020-03-19 Thread Chief
hi all:
I am currently migrating our existing offline Hive jobs to flink sql
1.10.0. The docs say Flink supports Hive GenericUDFs, but I found that a
few of them fail in Flink. Is something wrong with my setup?
For example current_timestamp()
The statement in Hive:
select
from_unixtime(unix_timestamp(current_timestamp()),'MMddHHmmss');
runs fine, but submitting the flink sql job fails with
org.apache.flink.table.api.ValidationException: SQL validation failed. 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
at 
com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1865)
at 

flink sql 1.10.0 support for Hive GenericUDF

2020-03-19 Thread Chief
hi all:
I am currently migrating our existing offline Hive jobs to flink sql
1.10.0. The docs say Flink supports Hive GenericUDFs, but I found that a
few of them fail in Flink. Is something wrong with my setup?
For example current_timestamp()
The statement in Hive:

Submitting the flink sql job fails with
org.apache.flink.table.api.ValidationException: SQL validation failed. 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
at 
com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1865)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1853)
at 

RE: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-19 Thread B.Zhou
Hi,

Thanks for the reference, Jark. In the Pravega connector, the user defines the Schema
first and then creates the table with the descriptor using that schema, see [1];
the error also came from this test case. We also tried the recommended
`bridgedTo(Timestamp.class)` method in the schema construction, and it produced
the same error stack trace.
We are also considering switching to the Blink planner implementation; do you think
that change would get this issue fixed?

Here is the full stacktrace:

```
org.apache.flink.table.codegen.CodeGenException: Unsupported cast from 
'LocalDateTime' to 'Long'.

   at 
org.apache.flink.table.codegen.calls.ScalarOperators$.generateCast(ScalarOperators.scala:815)
   at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:941)
   at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
   at 
org.apache.flink.table.codegen.CodeGenerator.$anonfun$visitCall$1(CodeGenerator.scala:752)
   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
   at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
   at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
   at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)
   at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
   at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
   at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:742)
   at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
   at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
   at 
org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1(CodeGenerator.scala:273)
   at 
org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(CodeGenerator.scala:269)
   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
   at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
   at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
   at 
scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
   at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)
   at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
   at 
scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
   at 
org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
   at 
org.apache.flink.table.plan.nodes.dataset.BatchScan.generateConversionMapper(BatchScan.scala:95)
   at 
org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow(BatchScan.scala:59)
   at 
org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow$(BatchScan.scala:35)
   at 
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.convertToInternalRow(BatchTableSourceScan.scala:45)
   at 
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:165)
   at 
org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:114)
   at 
org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:92)
   at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
   at 
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
   at 
org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:87)
   at 
io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceBatchDescriptor(FlinkPravegaTableITCase.java:349)
   at 
io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceUsingDescriptor(FlinkPravegaTableITCase.java:246)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at 

Re: Help with flink hdfs sink

2020-03-19 Thread Jingsong Li
Hi Nick,

You can try "new Path("hdfs:///tmp/auditlog/")". There is one additional /
after hdfs://, which is a protocol name.
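
In other words, something like this (the namenode host/port is only a placeholder):

import org.apache.flink.core.fs.Path;

// Rely on fs.defaultFS from core-site.xml: empty authority, hence the third slash.
Path byDefaultFs = new Path("hdfs:///tmp/auditlog/");
// Or name the namenode explicitly.
Path byAuthority = new Path("hdfs://namenode-host:8020/tmp/auditlog/");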

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 3:13 AM Nick Bendtner  wrote:

> Hi guys,
> I am using flink version 1.7.2.
> I am trying to write to hdfs sink from my flink job. I setup HADOOP_HOME.
> Here is the debug log for this :
>
> 2020-03-19 18:59:34,316 DEBUG org.apache.flink.runtime.util.HadoopUtils   
>   - Cannot find hdfs-default configuration-file path in Flink 
> config.
> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils   
>   - Cannot find hdfs-site configuration-file path in Flink config.
> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils   
>   - Adding /home/was/HDFSConf/conf/core-site.xml to hadoop 
> configuration
> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils   
>   - Adding /home/was/HDFSConf/conf/hdfs-site.xml to hadoop 
> configuration
> 2020-03-19 18:59:34,344 INFO  
> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop user 
> set to kafka (auth:KERBEROS)
>
>
> This is what my streaming file sink code looks like.
>
>
> val sink: StreamingFileSink[String] = StreamingFileSink
>   .forRowFormat(new Path("hdfs://tmp/auditlog/"), new 
> SimpleStringEncoder[String]("UTF-8"))
>   .withRollingPolicy(DefaultRollingPolicy.create()
> .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
> .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
> .withMaxPartSize(1024
>   * 1024 * 1024)
> .build())
>   .build()
>
> result.addSink(sink).name("HDFSSink")
>
>
> When I run the job I get this error stack trace :
>
>  INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: 
> HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from RUNNING to 
> FAILED.
> java.io.IOException: Cannot instantiate file system for URI: 
> hdfs://tmp/auditlog
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: 
> tmp
> at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:687)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:628)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>
>
> Why is it trying to connect to /tmp ? Is it not supposed to get the namenodes 
> from the core-site.xml and hdfs-site.xml ?
>
> Can you please help with the correct way to configure hdfs sink.
>
>
> Best,
>
> Nick.
>
>
>
>

-- 
Best, Jingsong Lee


Re: Flink long state TTL Concerns

2020-03-19 Thread Matthew Rafael Magsombol
I see...
The way we run our setup is that we run these in a kubernetes cluster where
we have one cluster running one job.
The total parallelism of the whole cluster is equal to the number of
taskmanagers where each task manager has 1 core cpu accounting for 1 slot.
If we add a state ttl, do you have any recommendation as to how much I
should bump the cpu per task manager? 2 cores per task manager with 1 slot
per task manager ( and the other cpu core will be used for TTL state
cleanup? ).
Or is that overkill?

On Thu, Mar 19, 2020 at 12:56 PM Andrey Zagrebin 
wrote:

> Hi Matt,
>
> Generally speaking, using state with TTL in Flink should not differ a lot
> from just using Flink with state [1].
> You have to provision your system so that it can keep the state of size
> which is worth of 7 days.
>
> The existing Flink state backends provide background cleanup to
> automatically remove the expired state eventually,
> so that your application does not need to do any explicit access of the
> expired state to clean it.
> The background cleanup is active by default since Flink 1.10 [2].
>
> Enabling TTL for state, of course, comes for price because you need to
> store timestamp and spend CPU cycles for the background cleanup.
> This affects storage size and potentially processing latency per record.
> You can read about details and caveats in the docs: for heap state [3] and
> RocksDB [4].
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-of-expired-state
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#incremental-cleanup
> [4]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
> On Thu, Mar 19, 2020 at 6:48 PM Matt Magsombol 
> wrote:
>
>> Suppose I'm using state stored in-memory that has a TTL of 7 days max.
>> Should I run into any issues with state this long other than potential OOM?
>>
>> Let's suppose I extend this such that we add rocksdb...any concerns with
>> this with respect to maintenance?
>>
>> Most of the examples that I've been seeing seem to pair state with
>> timewindows but I'll only read from this state every 15 seconds ( or some
>> small timewindow ). After each timewindow, I *won't* be cleaning up the
>> data within the state b/c I'll need to re-lookup from this state on future
>> time windows. I'll effectively rely on TTL based on key expiration time and
>> I was wondering what potential issues I should watch out for this.
>>
>


Re: flink sql deduplication algorithm

2020-03-19 Thread Benchao Li
Hi hiliuxg,

count distinct uses a MapView for deduplication:
in the batch case, the MapView is backed by a HashMap;
in the streaming case, the MapView is backed by MapState, because state + checkpointing is needed so that state is not lost after a job restart.

hiliuxg <736742...@qq.com> wrote on Thursday, March 19, 2020 at 11:31 PM:

> hi all:
> What algorithm does flink sql count(distinct) use under the hood? A bitmap?
> Or simply a Java Set for deduplication?



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-19 Thread Becket Qin
Hi Rong,

The issue here is that the PartitionDiscoverer has an internal
KafkaConsumer which reuses the client.id set by the users for the actual
fetching KafkaConsumer. Different KafkaConsumers distinguish their metrics
by client.id, therefore if there are two KafkaConsumers in the same JVM
with the same client.id, their metrics will collide with each other. This
is why the exception was reported.
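
As a side note, and not a fix for the internal PartitionDiscoverer collision described above, giving each source an explicit, unique client.id at least avoids collisions between separate consumers in the same JVM. A sketch, assuming the universal Kafka connector (property values are placeholders):

import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
props.setProperty("group.id", "my-group");               // placeholder
// Unique per job/source so the JMX AppInfo MBeans do not collide.
props.setProperty("client.id", "my-job-consumer-" + UUID.randomUUID());

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);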

Thanks,

Jiangjie (Becket) Qin

On Thu, Mar 19, 2020 at 12:04 AM Rong Rong  wrote:

> Hi Becket/Till,
>
> Thanks for the detail explanation. Just to confirm:
> the issue in FLINK-8093 refers to multiple Kafka consumer within the same
> TM - thus the fix should be to make consumer client.id unique for
> different tasks ?
> and the issue here is an issue internal to the Kafka consumer, where both
> the polling consumer thread and the MBean JMX reporter thread share the
> same client.id - thus we should fix this in the Kafka level?
>
> If this is the correct understanding, I think we should separate them
> since they are in fact 2 different issues.
>
> --
> Rong
>
> On Tue, Mar 17, 2020 at 3:36 AM Becket Qin  wrote:
>
>> Actually it might be better to create another ticket, FLINK-8093 was
>> mainly complaining about the JMX bean collision when there are multiple
>> tasks running in the same TM.
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Mar 17, 2020 at 6:33 PM Becket Qin  wrote:
>>
>>> Hi Till,
>>>
>>> It looks FLINK-8093  
>>> reports
>>> the same issue, although the reported information is not exactly correct,
>>> as this should not cause the producer to fail. I'll take care of the ticket.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann 
>>> wrote:
>>>
 @Becket do we already have a JIRA ticket to track this effort?

 Cheers,
 Till

 On Mon, Mar 16, 2020 at 4:07 AM Becket Qin 
 wrote:

> Hi Sidney,
>
> The WARN logging you saw was from the AbstractPartitionDiscoverer
> which is created by FlinkKafkaConsumer itself. It has an internal consumer
> which shares the client.id of the actual consumer fetching data. This
> is a bug that we should fix.
>
> As Rong said, this won't affect the normal operation of the consumer.
> It is just an AppInfo MBean for reporting some information. There might be
> some slight impact on the accuracy of the consumer metrics, but should be
> almost ignorable because the partition discoverer is quite inactive
> compared with the actual consumer.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong 
> wrote:
>
>> We also had seen this issue before running Flink apps in a shared
>> cluster environment.
>>
>> Basically, Kafka is trying to register a JMX MBean[1] for application
>> monitoring.
>> This is only a WARN suggesting that you are registering more than one
>> MBean with the same client id "consumer-1", it should not affect your
>> normal application behavior.
>>
>> This is most likely occurring if you have more than one Kafka
>> consumer within the same JVM, are you using a session cluster[2]? can you
>> share more on your application configuration including parallelism and 
>> slot
>> configs?
>> Also based on the log, you are not configuring the "client.id"
>> correctly. which config key are you using? could you also share your fill
>> Kafka properties map?
>>
>>
>> --
>> Rong
>>
>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>>
>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <
>> sidney.fei...@startapp.com> wrote:
>>
>>> Hey,
>>> I've been using Flink for a while now without any problems when
>>> running apps with a FlinkKafkaConsumer.
>>> All my apps have the same overall logic (consume from kafka ->
>>> transform event -> write to file) and the only way they differ from each
>>> other is the topic they read (remaining kafka config remains identical) 
>>> and
>>> the way they transform the event.
>>> But suddenly, I've been starting to get the following error:
>>>
>>>
>>> 2020-03-15 12:13:56,911 WARN
>>>  org.apache.kafka.common.utils.AppInfoParser   - Error
>>> registering AppInfo mbean
>>> javax.management.InstanceAlreadyExistsException:
>>> kafka.consumer:type=app-info,id=consumer-1
>>>at
>>> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>>at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>
>>>at
>>> 

Re: Flink long state TTL Concerns

2020-03-19 Thread Andrey Zagrebin
Hi Matt,

Generally speaking, using state with TTL in Flink should not differ a lot
from just using Flink with state [1].
You have to provision your system so that it can keep the state of size
which is worth of 7 days.

The existing Flink state backends provide background cleanup to
automatically remove the expired state eventually,
so that your application does not need to do any explicit access of the
expired state to clean it.
The background cleanup is active by default since Flink 1.10 [2].

Enabling TTL for state, of course, comes for price because you need to
store timestamp and spend CPU cycles for the background cleanup.
This affects storage size and potentially processing latency per record.
You can read about details and caveats in the docs: for heap state [3] and
RocksDB [4].

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-of-expired-state
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#incremental-cleanup
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-during-rocksdb-compaction

On Thu, Mar 19, 2020 at 6:48 PM Matt Magsombol  wrote:

> Suppose I'm using state stored in-memory that has a TTL of 7 days max.
> Should I run into any issues with state this long other than potential OOM?
>
> Let's suppose I extend this such that we add rocksdb...any concerns with
> this with respect to maintenance?
>
> Most of the examples that I've been seeing seem to pair state with
> timewindows but I'll only read from this state every 15 seconds ( or some
> small timewindow ). After each timewindow, I *won't* be cleaning up the
> data within the state b/c I'll need to re-lookup from this state on future
> time windows. I'll effectively rely on TTL based on key expiration time and
> I was wondering what potential issues I should watch out for this.
>


Help with flink hdfs sink

2020-03-19 Thread Nick Bendtner
Hi guys,
I am using Flink version 1.7.2.
I am trying to write to an HDFS sink from my Flink job. I set up HADOOP_HOME.
Here is the debug log for this:

2020-03-19 18:59:34,316 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Cannot
find hdfs-default configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Cannot
find hdfs-site configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Adding
/home/was/HDFSConf/conf/core-site.xml to hadoop configuration
2020-03-19 18:59:34,317 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Adding
/home/was/HDFSConf/conf/hdfs-site.xml to hadoop configuration
2020-03-19 18:59:34,344 INFO
org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
user set to kafka (auth:KERBEROS)


This is what my streaming file sink code looks like.


val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs://tmp/auditlog/"), new
SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(DefaultRollingPolicy.create()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024
  * 1024 * 1024)
.build())
  .build()

result.addSink(sink).name("HDFSSink")


When I run the job I get this error stack trace :

 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph-
Sink: HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from
RUNNING to FAILED.
java.io.IOException: Cannot instantiate file system for URI: hdfs://tmp/auditlog
at 
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: tmp
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:687)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:628)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)


Why is it trying to connect to /tmp? Is it not supposed to get the
namenodes from core-site.xml and hdfs-site.xml?

Can you please help with the correct way to configure the HDFS sink?


Best,

Nick.


Re: How can flink sql support triggering, every 5 minutes, an aggregation over the data from midnight of the current day up to the current 5-minute mark

2020-03-19 Thread Jark Wu
Hi, you can take a look at this article and see whether it meets your needs:
http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql
(the section on counting the cumulative number of unique users every 10 minutes of the day)
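
Very roughly, the idea in that section is to group by the day and let the
result update continuously. An untested sketch, assuming the Blink planner, a
StreamTableEnvironment `tEnv`, a registered `t_user_log` source and a
hypothetical upsert sink `daily_uv` (rounding `time_str` down to 5-minute
buckets is left out):

tEnv.sqlUpdate(
    "INSERT INTO daily_uv " +
    "SELECT dt, MAX(time_str) AS etltime, COUNT(1) AS pv, COUNT(DISTINCT userid) AS uv " +
    "FROM ( " +
    "  SELECT " +
    "    DATE_FORMAT(logintime, 'yyyy-MM-dd') AS dt, " +
    "    DATE_FORMAT(logintime, 'yyyy-MM-dd HH:mm') AS time_str, " +
    "    userid " +
    "  FROM t_user_log) t " +
    "GROUP BY dt");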

Best,
Jark


On Thu, 19 Mar 2020 at 23:30, hiliuxg <736742...@qq.com> wrote:

> hi all: I have a scenario where, every 5 minutes, I want to compute the pv
> and uv from midnight of the current day up to the current 5-minute mark.
> Expressed roughly as a batch query:
> select 
> '2020-03-19' as dt ,
> '2020-03-19 12:05:00' as etltime ,
> count(1) as pv ,
> count(distinct userid) as uv
> from t_user_log
> where logintime >= '2020-03-19 00:00:00' and logintime <
> '2020-03-19 12:05:00'
>
>
> This can't be handled with flink sql directly, and there are especially many
> scenarios like this. Does anyone have a good way to deal with them?


java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-03-19 Thread Steve Whelan
Hi,

I am attempting to create a Key/Value serializer for the Kafka table
connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
classes, updating the serializer.

First, I created `JsonRowKeyedSerializationSchema` which implements
`KeyedSerializationSchema`[2], which is deprecated. The way it works is you
provide a list of indices in your Row output that are the Key. This works
successfully.

When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
`KafkaSerializationSchema`[3], I get a `java.lang.AbstractMethodError`
exception. Normally, this would mean I'm using an old interface; however, all
my Flink dependencies are version 1.9. I cannot find this abstract
`serialize()` function in the Flink codebase. Has anyone come across this
before?

When I print the methods of my `JsonRowKeyedSerializationSchema` class, I do
see the one below, which seems to be what the FlinkKafkaProducer calls,
but I do not see it in `KafkaSerializationSchema`:

public abstract
org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord
org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
java.lang.Object
java.lang.Long


*`JsonRowKeyedSerializationSchema` class*

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public class JsonRowKeyedSerializationSchema implements KafkaSerializationSchema<Row> {

  // constructors and helpers

  @Override
  public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long aLong) {
    return new ProducerRecord<>("some_topic", serializeKey(row), serializeValue(row));
  }
}


*Stacktrace:*

Caused by: java.lang.AbstractMethodError: Method
com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
is abstract
at
com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at

Flink long state TTL Concerns

2020-03-19 Thread Matt Magsombol
Suppose I'm using state stored in-memory that has a TTL of 7 days max. Should I 
run into any issues with state this long other than potential OOM?

Let's suppose I extend this such that we add rocksdb...any concerns with this 
with respect to maintenance?

Most of the examples that I've been seeing seem to pair state with timewindows 
but I'll only read from this state every 15 seconds ( or some small timewindow 
). After each timewindow, I *won't* be cleaning up the data within the state 
b/c I'll need to re-look up this state on future time windows. I'll 
effectively rely on TTL based on key expiration time, and I was wondering what 
potential issues I should watch out for with this approach.


Re: Issues with Watermark generation after join

2020-03-19 Thread Dominik Wosiński
I have created a simple minimal reproducible example that shows what I am
talking about:
https://github.com/DomWos/FlinkTTF/tree/sql-ttf

It contains a test showing that even when the output is in order (enforced
by multiple sleeps), there is no output for parallelism > 1, while for
parallelism == 1 the output is produced normally.

Best Regards,
Dom.


How can flink sql support triggering, every 5 minutes, an aggregation over the data from midnight of the current day up to the current 5-minute mark

2020-03-19 Thread hiliuxg
hi all: I have a scenario where, every 5 minutes, I want to compute the pv and uv
from midnight of the current day up to the current 5-minute mark. Expressed
roughly as a batch query:
select 
'2020-03-19' as dt ,
'2020-03-19 12:05:00' as etltime ,
count(1) as pv ,
count(distinct userid) as uv
from t_user_log
where logintime >= '2020-03-19 00:00:00' and logintime < 
'2020-03-19 12:05:00'


This can't be handled with flink sql directly, and there are especially many
scenarios like this. Does anyone have a good way to deal with them?

flink sql deduplication algorithm

2020-03-19 Thread hiliuxg
hi all:
What algorithm does flink sql's count(distinct) use under the hood? Is it a
bitmap, or simply deduplication via a Java Set container?

Re: ddl

2020-03-19 Thread hiliuxg
You can define your own TableSinkFactory; Flink has already reserved this
interface for extension.




------------------ Original message ------------------
From: "LakeShen"
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html

 Best Regards

 jinhai...@gmail.com

  On Mar 13, 2020, at 7:17 PM

Re: Can't create a savepoint with State Processor API

2020-03-19 Thread Dmitry Minaev
Yep, that works! Many thanks David, really appreciate it!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can't create a savepoint with State Processor API

2020-03-19 Thread David Anderson
You are very close. I got your example to work by switching from the
MemoryStateBackend to the FsStateBackend, and adding

bEnv.execute();

at the end of main().

I'm not sure why either of those might be necessary, but it doesn't seem to
work without both changes.

See https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf for
my version.
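
Very roughly, the shape of those two changes (this is not the gist itself;
the paths below are arbitrary):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

BootstrapTransformation<Integer> transform = OperatorTransformation
    .bootstrapWith(bEnv.fromElements(1, 2, 3))
    .keyBy(String::valueOf)
    .transform(new SimplestTransform());

Savepoint
    .create(new FsStateBackend("file:///tmp/savepoints"), 16)  // FsStateBackend instead of MemoryStateBackend
    .withOperator("my-operator-uid", transform)
    .write("file:///tmp/savepoints/savepoint-1");

bEnv.execute();  // Savepoint#write only adds a sink; execute() actually runs the job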

*David Anderson* | Training Coordinator

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


On Thu, Mar 19, 2020 at 2:54 AM Dmitry Minaev  wrote:

> Hi everyone,
>
> I'm looking for a way to modify state inside an operator in Flink. I found
> State Processor API
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html#modifying-savepoints>
>
> that allows to modify savepoints, which looks great. But I can't make it
> work.
>
> I can read an existing state from savepoint but if I try to create (or
> modify) a savepoint it doesn't write it by some reason.
>
> Questions:
> 1. Is State Processor API the right way to achieve what I'm looking for?
> Are
> there any other approaches?
> 2. can I ran this as a standalone java program or it has to be a part of a
> Flink job?
> 3. I expect to have a new savepoint in the provided location after running
> the code below, is that the right expectation?
> ```
> public static void main( String[] args ) throws Exception
> {
> ExecutionEnvironment bEnv =
> ExecutionEnvironment.getExecutionEnvironment();
>
> BootstrapTransformation<Integer> transform =
> OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
> .keyBy(String::valueOf)
> .transform(new SimplestTransform());
>
> Savepoint.create(new MemoryStateBackend(),
> 16).withOperator("my-operator-uid",
> transform).write("file:///tmp/savepoints/");
> }
>
> public class SimplestTransform extends KeyedStateBootstrapFunction<String, Integer>
> {
> ValueState<Integer> state;
>
> @Override
> public void open( Configuration parameters) {
> ValueStateDescriptor<Integer> descriptor = new
> ValueStateDescriptor<>("total", Types.INT);
> state = getRuntimeContext().getState(descriptor);
> }
>
> @Override
> public void processElement(Integer value, Context ctx) throws Exception
> {
> state.update(value);
> }
> }
> ```
>
> It finishes successfully but it doesn't write anything to the specified
> folder. I tried folder format with "file://" prefix and without it.
>
> I feel I'm missing something.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


The question about the FLIP-45

2020-03-19 Thread LakeShen
Hi community,

Now I am reading FLIP-45 (Reinforce Job Stop Semantic), and I have two
questions about it:
1. Which command should be used to stop a Flink job, stop or cancel?

2. When using the stop command, I see in the Flink source code that we can
set the savepoint dir; if we don't set it, the default savepoint dir is
used. If both the target savepoint dir and the default savepoint dir are
null, Flink throws an exception. But FLIP-45 says that if retained
checkpoints are enabled, we should always do a checkpoint when stopping the
job. I can't find this code.

Thanks in advance for your reply.

Best regards,
LakeShen


Re: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-19 Thread Jark Wu
This may be a similar issue to [1]; let's continue the discussion there.

Best,
Jark

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Timetamp-types-incompatible-after-migration-to-1-10-td33784.html#a33791


On Tue, 17 Mar 2020 at 18:05, Till Rohrmann  wrote:

> Thanks for reporting this issue Brian. I'm not a Table API expert but I
> know that there is some work on the type system ongoing. I've pulled Timo
> and Jingsong into the conversation who might be able to tell you what
> exactly changed and whether the timestamp issue might be caused by the
> changes.
>
> Cheers,
> Till
>
> On Mon, Mar 16, 2020 at 5:48 AM  wrote:
>
>> Hi community,
>>
>>
>>
>> Pravega connector is a connector that provides both Batch and Streaming
>> Table API implementation. We uses descriptor API to build Table source.
>> When we plan to upgrade to Flink 1.10, we found the unit tests are not
>> passing with our existing Batch Table API. There is a type conversion error
>> in the Timestamp with our descriptor Table API. The detail is in the issue
>> here: https://github.com/pravega/flink-connectors/issues/341 Hope
>> someone from Flink community can help us with some suggestions on this
>> issue. Thanks.
>>
>>
>>
>> Best Regards,
>>
>> Brian
>>
>>
>>
>


flink question (subject garbled)

2020-03-19 Thread 512348363
[Message body garbled in the archive; it concerned the DataStream API.]

Re: SQL Timetamp types incompatible after migration to 1.10

2020-03-19 Thread Jark Wu
Hi Brian,

Could you share the full exception stack of  `Unsupport cast from
LocalDateTime to Long` in the PR?

In 1.10 DDL, the conversion class or TypeInformation for TIMESTAMP becomes
`LocalDateTime`. Maybe your problem is related to this change?
If the connector doesn't support `LocalDateTime`, you should bridge back to
java.sql.Timestamp, see CsvTableSource [1].
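
For example, a rough sketch of such a bridged schema declaration (column
names borrowed from the example earlier in this thread; whether this is the
right place to apply it depends on how your source builds its schema):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

TableSchema schema = TableSchema.builder()
    .field("user_name", DataTypes.STRING())
    .field("user_id", DataTypes.INT())
    // keep java.sql.Timestamp as the conversion class instead of the 1.10 default LocalDateTime
    .field("login_time", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
    .build();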

Regarding batch table support in the 1.10 Blink planner: in the Blink
planner, batch and streaming are unified.
For sources, you can use `StreamTableSourceFactory` and return a bounded
`StreamTableSource` (via StreamTableSource#isBounded()) to support a batch
source.
For sinks, you can take JDBCTableSourceSinkFactory [2] as an example.

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java#L308
[2]:
https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java#L72

On Thu, 19 Mar 2020 at 19:32,  wrote:

> Hi Jark,
>
>
>
> I saw this mail and found this is a similar issue I raised to the
> community several days ago.[1] Can you have a look to see if it’s the same
> issue as this.
>
>
>
> If yes, there is a further question. From the Pravega connector side, the
> issue is raised in our Batch Table API which means users using the
> BatchTableEnvironment to create tables. Currently, BatchTableEnvironment
> does not support Blink planner. Any suggestions on how we can support Batch
> Tables in Flink 1.10?
>
>
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Need-help-on-timestamp-type-conversion-for-Table-API-on-Pravega-Connector-td33660.html
>
>
>
> Best Regards,
>
> Brian
>
>
>
> *From:* Jark Wu 
> *Sent:* Thursday, March 19, 2020 17:14
> *To:* Paul Lam
> *Cc:* user
> *Subject:* Re: SQL Timetamp types incompatible after migration to 1.10
>
>
>
> [EXTERNAL EMAIL]
>
> Hi Paul,
>
>
>
> Are you using old planner? Did you try blink planner? I guess it maybe a
> bug in old planner which doesn't work well on new types.
>
>
>
> Best,
>
> Jark
>
>
>
> On Thu, 19 Mar 2020 at 16:27, Paul Lam  wrote:
>
> Hi,
>
>
>
> Recently I upgraded a simple application that inserts static data into a
> table from 1.9.0 to 1.10.0, and
>
> encountered a timestamp type incompatibility problem during the table sink
> validation.
>
>
>
> The SQL is like:
>
> ```
>
> insert into kafka.test.tbl_a # schema: (user_name STRING, user_id INT,
> login_time TIMESTAMP)
>
> select ("ann", 1000, TIMESTAMP "2019-12-30 00:00:00")
>
> ```
>
>
>
> And the error thrown:
>
> ```
>
> Field types of query result and registered TableSink
> `kafka`.`test`.`tbl_a` do not match.
>
> Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: Timestamp]
>
> TableSink schema: [user_name: String, user_id: Integer, login_time:
> LocalDateTime]
>
> ```
>
>
>
> After some digging, I found the root cause might be that since FLINK-14645
> timestamp fields
>
> defined via TableFactory had been bridged to LocalDateTime, but timestamp
> literals are
>
> still backed by java.sql.Timestamp.
>
>
>
> Is my reasoning correct? And is there any workaround? Thanks a lot!
>
>
>
> Best,
>
> Paul Lam
>
>
>
>


RE: SQL Timetamp types incompatible after migration to 1.10

2020-03-19 Thread B.Zhou
Hi Jark,

I saw this mail and found this is a similar issue I raised to the community 
several days ago.[1] Can you have a look to see if it’s the same issue as this.

If yes, there is a further question. From the Pravega connector side, the issue 
is raised in our Batch Table API which means users using the 
BatchTableEnvironment to create tables. Currently, BatchTableEnvironment does 
not support Blink planner. Any suggestions on how we can support Batch Tables 
in Flink 1.10?

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Need-help-on-timestamp-type-conversion-for-Table-API-on-Pravega-Connector-td33660.html

Best Regards,
Brian

From: Jark Wu 
Sent: Thursday, March 19, 2020 17:14
To: Paul Lam
Cc: user
Subject: Re: SQL Timetamp types incompatible after migration to 1.10


[EXTERNAL EMAIL]
Hi Paul,

Are you using old planner? Did you try blink planner? I guess it maybe a bug in 
old planner which doesn't work well on new types.

Best,
Jark

On Thu, 19 Mar 2020 at 16:27, Paul Lam 
mailto:paullin3...@gmail.com>> wrote:
Hi,

Recently I upgraded a simple application that inserts static data into a table 
from 1.9.0 to 1.10.0, and
encountered a timestamp type incompatibility problem during the table sink 
validation.

The SQL is like:
```
insert into kafka.test.tbl_a # schema: (user_name STRING, user_id INT, 
login_time TIMESTAMP)
select ("ann", 1000, TIMESTAMP "2019-12-30 00:00:00")
```

And the error thrown:
```
Field types of query result and registered TableSink `kafka`.`test`.`tbl_a` do 
not match.
Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: Timestamp]
TableSink schema: [user_name: String, user_id: Integer, login_time: 
LocalDateTime]
```

After some digging, I found the root cause might be that since FLINK-14645 
timestamp fields
defined via TableFactory had been bridged to LocalDateTime, but timestamp 
literals are
still backed by java.sql.Timestamp.

Is my reasoning correct? And is there any workaround? Thanks a lot!

Best,
Paul Lam



Re: SQL Timetamp types incompatible after migration to 1.10

2020-03-19 Thread Jark Wu
Hi Paul,

Are you using the old planner? Did you try the Blink planner? I guess it may be
a bug in the old planner which doesn't work well with the new types.

Best,
Jark

On Thu, 19 Mar 2020 at 16:27, Paul Lam  wrote:

> Hi,
>
> Recently I upgraded a simple application that inserts static data into a
> table from 1.9.0 to 1.10.0, and
> encountered a timestamp type incompatibility problem during the table sink
> validation.
>
> The SQL is like:
> ```
> insert into kafka.test.tbl_a # schema: (user_name STRING, user_id INT,
> login_time TIMESTAMP)
> select ("ann", 1000, TIMESTAMP "2019-12-30 00:00:00")
> ```
>
> And the error thrown:
> ```
> Field types of query result and registered TableSink
> `kafka`.`test`.`tbl_a` do not match.
> Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: Timestamp]
> TableSink schema: [user_name: String, user_id: Integer, login_time:
> LocalDateTime]
> ```
>
> After some digging, I found the root cause might be that since FLINK-14645
> timestamp fields
> defined via TableFactory had been bridged to LocalDateTime, but timestamp
> literals are
> still backed by java.sql.Timestamp.
>
> Is my reasoning correct? And is there any workaround? Thanks a lot!
>
> Best,
> Paul Lam
>
>


Re: Timestamp Erasure

2020-03-19 Thread Jark Wu
Hi Dom,

The output elements emitted from a processing-time timer in a
BroadcastProcessFunction or KeyedCoProcessFunction have their timestamps
erased. So you have to apply a new `assignTimestampsAndWatermarks` after that
operator, or use an event-time timer instead.
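
A rough, untested sketch of the first option (the types, the 5-second bound,
`processed` — the output of your broadcast process function — and `tableEnv`
are all placeholders for your own code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;

DataStream<Tuple2<String, Long>> relabeled = processed
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                return element.f1;  // the event-time field carried through the process function
            }
        });

// the Long field can then be re-declared as the rowtime attribute
Table events = tableEnv.fromDataStream(relabeled, "key, ts.rowtime");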

Best,
Jark

On Thu, 19 Mar 2020 at 16:40, Dominik Wosiński  wrote:

> Yes, I understand this completely, but my question is a little bit
> different.
>
> The issue is that if I have something like :
> *val firstStream = dataStreamFromKafka*
> *.assignTimestampAndWatermarks(...)*
> *val secondStream = otherStreamFromKafka*
> *.assignTimestampsAndWatermarks(...)*
> *.broadcast(...)*
>
> So, now If I do something like:
> *firstStream.keyby(...).connect(secondStream)*
> *.process(someBroadcastProcessFunction)*
>
> Now, I only select one field from the second stream and this is *not the
> timestamp field *and from the first stream I select all fields *including
> timestamp *(in process function when creating a new record).
>
> Then everything works like a charm and no issues there. But If I register
> ProcessingTime timer in this *someBroadcastProcessFunction *and any
> element is produced from *onTimer* function, then I get the issue
> described above.
>
> Best Regards,
> Dom.
>
> On Thu, 19 Mar 2020 at 02:41, Jark Wu  wrote:
>
>> Hi  Dom,
>>
>> If you are converting a DataStream to a Table with a rowtime attribute,
>> then the  DataStream should hold event-time timestamp.
>> For example, call `assignTimestampsAndWatermarks` before converting to
>> table. You can find more details in the doc [1].
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>>
>> On Thu, 19 Mar 2020 at 02:38, Dominik Wosiński  wrote:
>>
>>> Hey,
>>> I just wanted to ask about one thing about timestamps. So, currently If
>>> I have a KeyedBroadcastProcess function followed by Temporal Table Join, it
>>> works like a charm. But, say I want to delay emitting some of the results
>>> due to any reason. So If I *registerProcessingTimeTimer*  and any
>>> elements are emitted in *onTimer* call then the timestamps are erased,
>>> meaning that I will simply get :
>>> *Caused by: java.lang.RuntimeException: Rowtime timestamp is null.
>>> Please make sure that a proper TimestampAssigner is defined and the stream
>>> environment uses the EventTime time characteristic.*
>>> * at DataStreamSourceConversion$10.processElement(Unknown Source)*
>>> * at
>>> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)*
>>> * at
>>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)*
>>> * at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)*
>>> * ... 23 more*
>>>
>>> Is that the expected behavior? I haven't seen it described anywhere
>>> before and I wasn't able to find any docs specifying this.
>>>
>>> Thanks in advance,
>>> Best Regards,
>>> Dom.
>>>
>>


Re: FlinkCEP - Detect absence of a certain event

2020-03-19 Thread Humberto Rodriguez Avila
Thanks David, I will look at your references 

> On 19 Mar 2020, at 09:51, David Anderson  wrote:
> 
> Humberto,
> 
> Although Flink CEP lacks notFollowedBy at the end of a Pattern, there is a 
> way to implement this by exploiting the timeout feature. 
> 
> The Flink training includes an exercise [1] where the objective is to 
> identify taxi rides with a START event that is not followed by an END event 
> within two hours. You'll find a solution to this exercise that uses CEP in 
> [2].
> 
> [1] https://training.ververica.com/exercises/longRides.html 
> 
> [2] 
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/cep/LongRidesCEPSolution.java
>  
> 
> 
> Best,
> David
> 
> 
> 
> On Thu, Mar 19, 2020 at 4:41 AM Zhijiang  > wrote:
> Hi Humberto,
> 
> I guess Fuji is familiar with Flink CEP and he can answer your proposed 
> question. I already cc him.
> 
> Best,
> Zhijiang
> 
> --
> From:Humberto Rodriguez Avila  >
> Send Time:2020 Mar. 18 (Wed.) 17:31
> To:user mailto:user@flink.apache.org>>
> Subject:FlinkCEP - Detect absence of a certain event
> 
> In the documentation of FlinkCEP, I found that I can enforce that a 
> particular event doesn't occur between two other events using notFollowedBy 
> or notNext.
> 
> However, I was wondering If I could detect the absence of a certain event 
> after a time X. For example, if an event A is not followed by another event A 
> within 10 seconds, fire an alert or do something.
> 
> Could be possible to define a FlinkCEP pattern to capture that situation?
> 
> Thanks in advance, Humberto
> 
> 



Re: FlinkCEP - Detect absence of a certain event

2020-03-19 Thread David Anderson
Humberto,

Although Flink CEP lacks notFollowedBy at the end of a Pattern, there is a
way to implement this by exploiting the timeout feature.

The Flink training includes an exercise [1] where the objective is to
identify taxi rides with a START event that is not followed by an END event
within two hours. You'll find a solution to this exercise that uses CEP in
[2].

[1] https://training.ververica.com/exercises/longRides.html
[2]
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/cep/LongRidesCEPSolution.java
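
Very roughly, the shape of that timeout trick looks like the following (an
untested sketch, not the training solution itself: the `Event` type, its key
accessor, the conditions, the `events` input stream and the 10-second window
are placeholders):

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// "A not followed by another A within 10 seconds": match A -> A with a 10s window
// and treat the timed-out partial matches as the absence alerts.
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return true;  // your condition for the first A
            }
        })
        .followedBy("second")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) {
                return true;  // your condition for the second A
            }
        })
        .within(Time.seconds(10));

OutputTag<Event> timedOut = new OutputTag<Event>("no-second-event") {};

SingleOutputStreamOperator<Event> completed = CEP
        .pattern(events.keyBy(e -> e.getKey()), pattern)
        .select(
            timedOut,
            new PatternTimeoutFunction<Event, Event>() {
                @Override
                public Event timeout(Map<String, List<Event>> match, long timeoutTimestamp) {
                    return match.get("first").get(0);  // the A that was never followed up
                }
            },
            new PatternSelectFunction<Event, Event>() {
                @Override
                public Event select(Map<String, List<Event>> match) {
                    return match.get("first").get(0);  // completed pairs, often ignored
                }
            });

DataStream<Event> alerts = completed.getSideOutput(timedOut);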

Best,
David



On Thu, Mar 19, 2020 at 4:41 AM Zhijiang  wrote:

> Hi Humberto,
>
> I guess Fuji is familiar with Flink CEP and he can answer your proposed
> question. I already cc him.
>
> Best,
> Zhijiang
>
> --
> From:Humberto Rodriguez Avila 
> Send Time:2020 Mar. 18 (Wed.) 17:31
> To:user 
> Subject:FlinkCEP - Detect absence of a certain event
>
> In the documentation of FlinkCEP, I found that I can enforce that a
> particular event doesn't occur between two other events using
> notFollowedBy or notNext.
>
> However, I was wondering If I could detect the absence of a certain event
> after a time X. For example, if an event *A* is not followed by another
> event *A* within 10 seconds, fire an alert or do something.
>
> Could be possible to define a FlinkCEP pattern to capture that situation?
>
> Thanks in advance, Humberto
>
>
>


Re: Timestamp Erasure

2020-03-19 Thread Dominik Wosiński
Yes, I understand this completely, but my question is a little bit
different.

The issue is that if I have something like :
*val firstStream = dataStreamFromKafka*
*.assignTimestampAndWatermarks(...)*
*val secondStream = otherStreamFromKafka*
*.assignTimestampsAndWatermarks(...)*
*.broadcast(...)*

So, now If I do something like:
*firstStream.keyby(...).connect(secondStream)*
*.process(someBroadcastProcessFunction)*

Now, I only select one field from the second stream and this is *not the
timestamp field *and from the first stream I select all fields *including
timestamp *(in process function when creating a new record).

Then everything works like a charm and no issues there. But If I register
ProcessingTime timer in this *someBroadcastProcessFunction *and any element
is produced from *onTimer* function, then I get the issue described above.

Best Regards,
Dom.

On Thu, 19 Mar 2020 at 02:41, Jark Wu  wrote:

> Hi  Dom,
>
> If you are converting a DataStream to a Table with a rowtime attribute,
> then the  DataStream should hold event-time timestamp.
> For example, call `assignTimestampsAndWatermarks` before converting to
> table. You can find more details in the doc [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>
> On Thu, 19 Mar 2020 at 02:38, Dominik Wosiński  wrote:
>
>> Hey,
>> I just wanted to ask about one thing about timestamps. So, currently If I
>> have a KeyedBroadcastProcess function followed by Temporal Table Join, it
>> works like a charm. But, say I want to delay emitting some of the results
>> due to any reason. So If I *registerProcessingTimeTimer*  and any
>> elements are emitted in *onTimer* call then the timestamps are erased,
>> meaning that I will simply get :
>> *Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please
>> make sure that a proper TimestampAssigner is defined and the stream
>> environment uses the EventTime time characteristic.*
>> * at DataStreamSourceConversion$10.processElement(Unknown Source)*
>> * at
>> org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)*
>> * at
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)*
>> * at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)*
>> * ... 23 more*
>>
>> Is that the expected behavior? I haven't seen it described anywhere
>> before and I wasn't able to find any docs specifying this.
>>
>> Thanks in advance,
>> Best Regards,
>> Dom.
>>
>


SQL Timetamp types incompatible after migration to 1.10

2020-03-19 Thread Paul Lam
Hi,

Recently I upgraded a simple application that inserts static data into a table 
from 1.9.0 to 1.10.0, and 
encountered a timestamp type incompatibility problem during the table sink 
validation.

The SQL is like:
```
insert into kafka.test.tbl_a # schema: (user_name STRING, user_id INT, 
login_time TIMESTAMP)
select ("ann", 1000, TIMESTAMP "2019-12-30 00:00:00")
```

And the error thrown:
```
Field types of query result and registered TableSink `kafka`.`test`.`tbl_a` do 
not match.
  Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: Timestamp]
  TableSink schema:[user_name: String, user_id: Integer, login_time: 
LocalDateTime]
```

After some digging, I found the root cause might be that since FLINK-14645 
timestamp fields 
defined via TableFactory had been bridged to LocalDateTime, but timestamp 
literals are 
still backed by java.sql.Timestamp.

Is my reasoning correct? And is there any workaround? Thanks a lot!

Best,
Paul Lam



Re: FlinkCEP - Detect absence of a certain event

2020-03-19 Thread Humberto Rodriguez Avila
Hello Fuji and Zhijiang


Thanks for your time!

Fuji, thanks for the links.

I hope that in the future similar scenarios can be supported in FlinkCEP.
Other CEP engines like Esper support this particular type of negated pattern.

Best regards,
Humberto

> On 19 Mar 2020, at 04:55, Shuai Xu  wrote:
> 
> Hi Humberto,
> Unfortunately, the answer is NO. Now FlinkCEP doesn't support notFollowedBy 
> in the end of a Pattern, even with a time window. I have proposed an 
> improvement to enable it. The detail can be found in 
> https://lists.apache.org/thread.html/rc505728048663d672ad379578ac67d3219f6076986c80a2362802ebb%40%3Cdev.flink.apache.org%3E
>  
> .
>  However, I didn't receive enough responses, so this work is suspended now.
> 
> Zhijiang <wangzhijiang...@aliyun.com> wrote on Thursday, March 19, 2020 at 11:41 AM:
> Hi Humberto,
> 
> I guess Fuji is familiar with Flink CEP and he can answer your proposed 
> question. I already cc him.
> 
> Best,
> Zhijiang
> 
> --
> From:Humberto Rodriguez Avila  >
> Send Time:2020 Mar. 18 (Wed.) 17:31
> To:user mailto:user@flink.apache.org>>
> Subject:FlinkCEP - Detect absence of a certain event
> 
> In the documentation of FlinkCEP, I found that I can enforce that a 
> particular event doesn't occur between two other events using notFollowedBy 
> or notNext.
> 
> However, I was wondering If I could detect the absence of a certain event 
> after a time X. For example, if an event A is not followed by another event A 
> within 10 seconds, fire an alert or do something.
> 
> Could be possible to define a FlinkCEP pattern to capture that situation?
> 
> Thanks in advance, Humberto
> 
>