Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

2020-06-17 Thread Dmytro Dragan
Hi Jingsong,

Thank you for detailed clarification.

Best regards,
Dmytro Dragan | dd...@softserveinc.com | Lead Big Data Engineer | Big Data & 
Analytics | SoftServe


From: Jingsong Li 
Sent: Thursday, June 18, 2020 4:58:22 AM
To: Dmytro Dragan 
Cc: user@flink.apache.org 
Subject: Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

Hi Dmytro,

Yes, batch mode must have checkpointing disabled, so StreamingFileSink cannot be 
used in batch mode (StreamingFileSink requires checkpoints regardless of the 
format). We are refactoring it to be more generic so that it can be used in batch 
mode, but this is a future topic.
Currently, in batch mode, the sink must use an `OutputFormat` with 
`FinalizeOnMaster` instead of a `SinkFunction`.  The file committing should be 
implemented in the `FinalizeOnMaster` method. If you have enough time, you 
can implement a custom `OutputFormat`, but it is complicated.

Now the status quo is:
- For 1.10, Blink batch supports writing to Hive tables; if you can convert 
your table to a Hive table with Parquet and S3, that works. [1]
- For 1.11, there is a new connector named `filesystem connector` [2]; you can 
define a table with Parquet and S3 and write to the table with SQL.
- For 1.11, moreover, both the Hive and filesystem connectors support streaming 
writes by reusing StreamingFileSink internally.

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html#writing-to-hive
[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html

Best,
Jingsong

On Tue, Jun 16, 2020 at 10:50 PM Dmytro Dragan <dd...@softserveinc.com> wrote:
Hi guys,

In our use case we are considering writing data to AWS S3 in Parquet format using 
Blink batch mode.
As far as I can see, the valid approach for writing Parquet files is to use 
StreamingFileSink with the Parquet bulk-encoded format, but based on the 
documentation and tests it works only with OnCheckpointRollingPolicy,

while Blink batch mode requires checkpointing to be disabled.

Has anyone faced a similar issue?



--
Best, Jingsong Lee


Flink monitoring

2020-06-17 Thread wch...@163.com
We have an existing hadoop-2.8.0 cluster, and two of its nodes have the Flink client package.
Jobs are all submitted from one of those nodes, in per-job mode.
We now want to monitor the Flink jobs and plan to use the widely recommended pushgateway+prometheus+grafana.
Flink on YARN logs via logback. Hadoop log aggregation is not enabled.
The question is:

how should we set up exception alerting for the Hadoop-related logs, e.g. the 
resourcemanager, nodemanager and datanode logs, as well as the taskmanager logs under the userlogs directory?
Is there any recommended approach?



wch...@163.com


Re: How to align data from multiple sources

2020-06-17 Thread Jark Wu
I also think a dual-stream JOIN or an interval join should be able to meet your requirement.
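For example, an event-time interval join along these lines (just a sketch: the table names pv_log and uv_log, the key user_id and the event-time column ts are placeholders, and the interval is widened to cover the roughly 20-minute lag of the b stream):

SELECT a.user_id, a.ts AS pv_ts, b.ts AS uv_ts
FROM pv_log a, uv_log b
WHERE a.user_id = b.user_id
  AND b.ts BETWEEN a.ts - INTERVAL '25' MINUTE AND a.ts + INTERVAL '25' MINUTE;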

On Mon, 15 Jun 2020 at 19:41, Benchao Li  wrote:

> Hi,
> It sounds like what you need is a dual-stream join; you can do an event-time based dual-stream join [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#interval-joins
>
> 阿华田 wrote on Monday, June 15, 2020 at 18:31:
>
> > I suggest using a cache: since stream b arrives about 20 minutes late, cache stream a's data for 20 minutes and
> > join it with stream b once the time is up. For the cache I recommend Google's cache, com.google.common.cache;
> >
> >
> > 阿华田
> > a15733178...@163.com
> >
> >
> > On June 15, 2020 at 14:41, steven chen wrote:
> > hi:
> > 1. In our project we aggregate messages from different sources, then merge them for statistics and output the result.
> > 2. Topic a contains all user pv logs and topic b contains all user uv logs. One job now consumes both a and b
> > and writes the pv and uv results to the downstream kafka topic c.
> > Question: when messages on a arrive early and messages on b arrive 20 minutes late, how does the running job keep the data of the two topics aligned, i.e. how are the two sides joined and merged?
> > Effectively, two messages are merged into one record for the downstream sink; how do we make sure the records from a and b match up?
> >
> >
> >
> >
> >
>


Re: Trouble with large state

2020-06-17 Thread Yun Tang
Hi Jeff


  1.  "after around 50GB of state, I stop being able to reliably take 
checkpoints or savepoints."
What is the exact reason the job cannot complete checkpoints? Did they expire before 
completing, or were they declined by some tasks? The former is mainly caused by high 
back-pressure and the latter is mainly due to some internal error.
  2.  Have you checked why the remote task manager was lost?
If the remote task manager did not crash, it might be due to GC impact; I 
think you need to check the task manager logs and GC logs.

Best
Yun Tang

From: Jeff Henrikson 
Sent: Thursday, June 18, 2020 1:46
To: user 
Subject: Trouble with large state

Hello Flink users,

I have an application of around 10 enrichment joins.  All events are
read from kafka and have event timestamps.  The joins are built using
.cogroup, with a global window, triggering on every 1 event, plus a
custom evictor that drops records once a newer record for the same ID
has been processed.  Deletes are represented by empty events with
timestamp and ID (tombstones). That way, we can drop records when
business logic dictates, as opposed to when a maximum retention has been
attained.  The application runs RocksDBStateBackend, on Kubernetes on
AWS with local SSDs.

Unit tests show that the joins produce expected results.  On an 8 node
cluster, watermark output progress seems to indicate I should be able to
bootstrap my state of around 500GB in around 1 day.  I am able to save
and restore savepoints for the first half an hour of run time.

My current trouble is that after around 50GB of state, I stop being able
to reliably take checkpoints or savepoints.  Some time after that, I
start getting a variety of failures where the first suspicious log event
is a generic cluster connectivity error, such as:

 1) java.io.IOException: Connecting the channel failed: Connecting
 to remote task manager + '/10.67.7.101:38955' has failed. This
 might indicate that the remote task manager has been lost.

 2) org.apache.flink.runtime.io.network.netty.exception
 .RemoteTransportException: Connection unexpectedly closed by remote
 task manager 'null'. This might indicate that the remote task
 manager was lost.

 3) Association with remote system
 [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
 gated for [50] ms. Reason: [Association failed with
 [akka.tcp://flink@10.67.6.66:34987]] Caused by:
 [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum
savable state size.

I could enable HA, but for the time being I have been leaving it out to
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
 parallelism=8
 maxParallelism=64
 setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
 setTolerableCheckpointFailureNumber(1000)
 setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
 RocksDBStateBackend
 setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
 setNumberOfTransferThreads(25)
 setDbStoragePath points to a local nvme SSD

Configuration in flink-conf.yaml:

 jobmanager.rpc.address: localhost
 jobmanager.rpc.port: 6123
 jobmanager.heap.size: 28000m
 taskmanager.memory.process.size: 28000m
 taskmanager.memory.jvm-metaspace.size: 512m
 taskmanager.numberOfTaskSlots: 1
 parallelism.default: 1
 jobmanager.execution.failover-strategy: full

 cluster.evenly-spread-out-slots: false

 taskmanager.memory.network.fraction: 0.2   # default 0.1
 taskmanager.memory.framework.off-heap.size: 2GB
 taskmanager.memory.task.off-heap.size: 2GB
 taskmanager.network.memory.buffers-per-channel: 32 # default 2
 taskmanager.memory.managed.fraction: 0.4   # docs say
default 0.1, but something seems to set 0.4
 taskmanager.memory.task.off-heap.size: 2048MB  # default 128M

 state.backend.fs.memory-threshold: 1048576
 state.backend.fs.write-buffer-size: 1024
 state.backend.local-recovery: true
 state.backend.rocksdb.writebuffer.size: 64MB
 state.backend.rocksdb.writebuffer.count: 8
 state.backend.rocksdb.writebuffer.number-to-merge: 4
 state.backend.rocksdb.timer-service.factory: heap
 state.backend.rocksdb.block.cache-size: 6400 # default 8MB
 state.backend.rocksdb.write-batch-size: 1600 # default 2MB

 web.checkpoints.history: 250


Flink plugin File System for GCS

2020-06-17 Thread Alexander Filipchik
Hello,
I'm trying to implement a Flink-native FS for GCS which can be used with a
streaming file sink. I used the S3 one as a reference and made it work locally.
However, it fails to load when I deploy it to the cluster. If I put Hadoop
in the fat jar I get:

Caused by: java.lang.LinkageError: loader constraint violation: loader
(instance of org/apache/flink/core/plugin/PluginLoader$PluginClassLoader)
previously initiated loading for a different type with name
"org/apache/hadoop/conf/Configuration"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at
org.apache.flink.core.plugin.PluginLoader$PluginClassLoader.loadClass(PluginLoader.java:149)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
com.css.flink.fs.gcs.FlinkGcsFileSystemFactory.createInitializedGcsFS(FlinkGcsFileSystemFactory.java:63)

If I remove Hadoop from the fat jar but add the Hadoop uber jar to the lib folder I get:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

If I remove Hadoop from the fat jar and put
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into lib, I get:
java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem

I think it is some issue with classloaders, but not sure what exactly
causes it. It looks like some classes are loaded from lib and some are not.
Any advice?

Thank you,
Alex


Re: Problem querying a Kafka table with the SQL client integrated with HiveCatalog

2020-06-17 Thread Jark Wu
You can take the release-1.11 branch: https://github.com/apache/flink/tree/release-1.11/
and build it yourself: mvn clean install -DskipTests
The contents of the 1.11 distribution are then under build-target.

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote:

>
>
> Yes. Apart from the package I built from 1.11, the third-party jars are all the 1.10.1 versions, but the corresponding 1.11 ones have not been released yet. Where can I get them?
>
>
> Sun.Zhu
> 17626017...@163.com
>
>
> On June 17, 2020 at 13:25, Rui Li wrote:
> Do you mean using the 1.10.1 hive connector together with Flink 1.11? That combination definitely has problems. Try unifying all versions to 1.11.
>
> On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:
>
> That's all the error the SQL CLI shows; there is no more information. Are there logs where I could see more details?
>
>
>
>
> On June 17, 2020 at 10:27, Benchao Li wrote:
> The missing dependency used to live in the flink-table-runtime-blink module; it has now been refactored into the flink-table-common
> module.
> If only the connectors and formats use the old version, that should be fine.
> Could you send a more detailed error message? Let's check which module still depends on the old flink-table-runtime-blink.
>
> Sun.Zhu <17626017...@163.com> wrote on Wednesday, June 17, 2020 at 00:49:
>
> Yes. Apart from the jars built from 1.11, the dependency jars, such as the connector and HiveCatalog dependencies, are the 1.10.1
> versions because 1.11 has not been released yet. The two problems above do not occur with 1.10.1; after upgrading to 1.11 they appear and I don't know why. Is a dependency missing?
>
>
>
>
> On June 16, 2020 at 18:38, Benchao Li wrote:
> The underlying data structures were refactored in 1.11, so you cannot take 1.10 jars and use them in 1.11 directly.
> Using the jars built from 1.11 directly should work fine.
>
> Sun.Zhu <17626017...@163.com> wrote on Tuesday, June 16, 2020 at 18:11:
>
> I built the 1.11 package.
> Querying a Hive table in the SQL CLI reports the following error:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError:
> org/apache/flink/table/dataformat/BaseRow
>
>
> Querying the registered Kafka table reports:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
>
>
> The dependency jars were copied from 1.10.1.
> Sun.Zhu
> 17626017...@163.com
>
>
> On June 13, 2020 at 11:44, Sun.Zhu <17626017...@163.com> wrote:
> Got it!
> Thx,junbao
>
>
> Sun.Zhu
> 17626017...@163.com
>
>
> On June 13, 2020 at 09:32, zhangjunbao wrote:
> In 1.10.x, creating a table with proctime as PROCTIME() under HiveCatalog should hit a bug:
> https://issues.apache.org/jira/browse/FLINK-17189
>
> Best,
> Junbao Zhang
>
> On June 13, 2020 at 00:31, Sun.Zhu <17626017...@163.com> wrote:
>
> hi, all
> When integrating the SQL client with HiveCatalog, I registered a Kafka table in the HiveCatalog.
> The DDL is as follows:
> |
> CREATE TABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- generate a processing-time column via a computed column
> WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define a watermark on ts, making ts the event-time column
> ) WITH (
> 'connector.type' = 'kafka',  -- use the kafka connector
> 'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
> 'format.type' = 'json'  -- source format is json
> );
> |
> When querying with select * from user_behavior; the error is as follows:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> Flink version: 1.10.1
> blink planner, streaming mode
>
>
> Thx
> Sun.Zhu
> 17626017...@163.com
>
>
>
>
>
>
>
> --
> Best regards!
> Rui Li
>


Re: Help: computing cumulative UV in real time with Flink SQL 1.10

2020-06-17 Thread Jark Wu
Yes, I think that can work around it.

On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote:

> For 1.10, I implemented it by converting the table to a stream and then back to a table. Does that look reasonable to you?
> val resTmpTab: Table = tabEnv.sqlQuery(
>   """
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
> FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')""")
>
> val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
>   .filter(line=>line._1==true).map(line=>line._2)
>
> val res= tabEnv.fromDataStream(resTmpStream)
> tabEnv.sqlUpdate(
>   s"""
> INSERT INTO rt_totaluv
> SELECT _1,MAX(_2)
> FROM $res
> GROUP BY _1
> """)
>
>
> -- Original message --
> From: "Jark Wu"  Sent: Wednesday, June 17, 2020 at 13:55
> To: "user-zh"
> Subject: Re: Help: computing cumulative UV in real time with Flink SQL 1.10
>
>
>
> In Flink 1.11, you can try something like this:
>
> CREATE TABLE mysql (
>  time_str STRING,
>  uv BIGINT,
>  PRIMARY KEY (time_str) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>  'table-name' = 'myuv'
> );
>
> INSERT INTO mysql
> SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT
> user_id)
> FROM user_behavior;
>
> On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:
>
>  Thanks for your reply. I still don't quite understand the "approach #1" you mentioned. I need to output a cumulative UV every minute;
>  the sink table looks like this:
>  tm uv
>  2020/06/17 13:46:00 1
>  2020/06/17 13:47:00 2
>  2020/06/17 13:48:00 3
> 
> 
>  If I group by the date, how do I get the minute?
> 
> 
>  -- Original message --
>  From: "Benchao Li"  Sent: Wednesday, June 17, 2020 at 11:46
>  To: "user-zh"
>  Subject: Re: Help: computing cumulative UV in real time with Flink SQL 1.10
> 
> 
> 
>  Hi,
>  I think this scenario can be handled in two ways:
>  1. group by + mini batch
>  2. window aggregation + fast emit
> 
>  For #1, the group by fields can include a date field, e.g. the DATE_FORMAT(rowtm, 'yyyy-MM-dd') you mentioned above.
>  State cleanup in this case requires configuring the state retention time; see [1] for how to configure it. Also, mini batch has to be
>  enabled via the parameters in [2].
> 
>  For #2, simply use a day-level tumble window. State cleanup then needs no special configuration; it is cleaned up by default.
>  The fast emit settings are still an experimental feature, so they are not listed in the docs; I paste them here for reference:
>  table.exec.emit.early-fire.enabled = true
>  table.exec.emit.early-fire.delay = 60 s
> 
>  [1]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
>  [2]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> 
>  x <35907...@qq.com> wrote on Wednesday, June 17, 2020 at 11:14:
> 
>  > The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now. With the approach below,
>  > the cumulative UV is not reset when the day changes; it just keeps accumulating.
>  > CREATE VIEW uv_per_10min AS
>  > SELECT
>  >   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w
>  > AS time_str,
>  >   COUNT(DISTINCT user_id) OVER w AS uv
>  > FROM user_behavior
>  > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
>  > CURRENT ROW);
>  >
>  >
>  > How should I handle this?
>  > Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work? And how should the state be cleaned up?
>  > PS: 1.10's DDL doesn't seem to support CREATE VIEW, right?
>  > Thanks


Re: Convert flink table with field of type RAW to datastream

2020-06-17 Thread Jark Wu
Hi YI,

Flink doesn't have a TypeInformation for `java.util.Date`, but
only SqlTimeTypeInfo.DATE for `java.sql.Date`.
That's why TypeInformation.of(java.util.Date) is recognized as a
RAW type.

To resolve your problem, I think in `TypeInformation.of(..)` you should use
a concrete type instead of `java.util.Date`, e.g. `java.sql.Timestamp`,
`java.sql.Date`, or `java.sql.Time`.
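For example, something along these lines (just a sketch; the field names here are made up for illustration):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Declare the timestamp-like field with a supported SQL time type instead of
// letting java.util.Date fall through to a generic/RAW type.
TypeInformation<Row> rowType = Types.ROW_NAMED(
    new String[] {"_isComplete", "eventTime"},
    Types.BOOLEAN,          // _isComplete
    Types.SQL_TIMESTAMP);   // was java.util.Date -> map it to java.sql.Timestamp instead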

Best,
Jark

On Thu, 18 Jun 2020 at 10:32, YI  wrote:

> Hi all,
>
> I am using flink to process external data. The source format is json, and
> the underlying data types are defined in an external library.
> I generated table schema with `TableSchema.fromTypeInfo` and
> `TypeInformation.of[_]`. From what I read, this method is deprecated.
> But I didn't find any alternatives. Manually tweaking table schema is not
> viable as there are simply too many types.
>
> One of the fields in the source type is `java.util.Date`. I tried to
> convert the obtained table to a datastream with Table.toAppendStream.
> When I ran
> `tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`,
> the following exception occurred.
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Type
> is not supported: Date
> at
> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
> at
> org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
> at
> org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
> at
> org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
> at
> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
> at
> org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
> at
> org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
> at
> org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
> at
> org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
> at
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at
> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
> at
> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
> at
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
> at
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at
> org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
> at java.util.Collections$SingletonList.forEach(Collections.java:4824)
> at
> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
> at
> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
> at
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
> at
> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at
> org.apache.flink.table.calcite.FlinkRelBuilder.tableOperation(FlinkRelBuilder.scala:106)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:390)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
> at
> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableLike.map(TraversableLike.scala:273)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
> at
> 

Questions about broadcast streams

2020-06-17 Thread a511955993

Hi

I have some questions about the description of broadcast streams on the official site. The docs [1] say that the state backend backing 
broadcast state is in-memory only, with no RocksDB. In CoBroadcastWithKeyedOperator's open method, the state is created via getOperatorStateBackend().getBroadcastState(descriptor), and getOperatorStateBackend() creates the corresponding state
 backend via stateBackend.createOperatorStateBackend. Isn't the state backend created by the factory chosen according to the configuration? Where exactly is it restricted to in-memory?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html#important-considerations


Best

Looking forward to your reply and help.

a511955993
a511955...@163.com

Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

2020-06-17 Thread Jingsong Li
Hi Dmytro,

Yes, batch mode must have checkpointing disabled, so StreamingFileSink cannot be
used in batch mode (StreamingFileSink requires checkpoints regardless of the
format). We are refactoring it to be more generic so that it can be used in batch
mode, but this is a future topic.
Currently, in batch mode, the sink must use an `OutputFormat` with
`FinalizeOnMaster` instead of a `SinkFunction`. The file committing should be
implemented in the `FinalizeOnMaster` method. If you have enough time,
you can implement a custom `OutputFormat`, but it is complicated.
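For reference, a rough sketch of the shape such a sink could take (a sketch only, not a tested implementation; the staging/final paths and the actual Parquet writing are placeholders):

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

import java.io.IOException;

/** Each task writes into a staging directory; the files are committed once on the master. */
public class StagingCommitOutputFormat implements OutputFormat<Row>, FinalizeOnMaster {

    private final Path stagingDir;
    private final Path finalDir;
    private transient int taskNumber;

    public StagingCommitOutputFormat(Path stagingDir, Path finalDir) {
        this.stagingDir = stagingDir;
        this.finalDir = finalDir;
    }

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.taskNumber = taskNumber;
        // open a per-task (Parquet) writer on new Path(stagingDir, "part-" + taskNumber) here
    }

    @Override
    public void writeRecord(Row record) throws IOException {
        // hand the record to the per-task writer
    }

    @Override
    public void close() throws IOException {
        // flush and close the per-task writer
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // Runs once on the master after all tasks have finished successfully:
        // move the staged files into the final directory, then remove the staging directory.
        FileSystem fs = finalDir.getFileSystem();
        for (FileStatus file : fs.listStatus(stagingDir)) {
            fs.rename(file.getPath(), new Path(finalDir, file.getPath().getName()));
        }
        fs.delete(stagingDir, true);
    }
}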

Now the status quo is:
- For 1.10, Blink batch supports writing to Hive tables; if you can
convert your table to a Hive table with Parquet and S3, that works. [1]
- For 1.11, there is a new connector named `filesystem connector` [2]; you
can define a table with Parquet and S3 and write to the table with SQL.
- For 1.11, moreover, both the Hive and filesystem connectors support streaming
writes by reusing StreamingFileSink internally.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html#writing-to-hive
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html

Best,
Jingsong

On Tue, Jun 16, 2020 at 10:50 PM Dmytro Dragan 
wrote:

> Hi guys,
>
>
>
> In our use case we are considering writing data to AWS S3 in Parquet format
> using Blink batch mode.
>
> As far as I can see, the valid approach to writing Parquet files is to
> use *StreamingFileSink* with the Parquet bulk-encoded format, but
>
> based on the documentation and tests it works only with
> OnCheckpointRollingPolicy,
>
>
>
> while Blink batch mode requires checkpointing to be disabled.
>
>
>
> Has anyone faced a similar issue?
>
>
>


-- 
Best, Jingsong Lee


Re: Implications on incremental checkpoint duration based on RocksDBStateBackend, Number of Slots per TM

2020-06-17 Thread Yun Tang
Hi Sameer

If you only have one disk per TM, 10 TMs can use at most 10 disks 
while 100 TMs can use at most 100 disks.
The sync checkpoint phase of RocksDB needs to write to disk, and if you 
distribute the write pressure over more disks, you get better performance, 
which is what you observed.

The synchronous checkpoint phase actually means the task can only execute 
the checkpoint and cannot process elements at that time.
On the other hand, the asynchronous phase means the task uploads checkpoint data 
asynchronously and can still process elements at that time.
Moreover, flushing RocksDB in the sync phase is executed in the task's main thread, and 
one TM can have many task main threads.

Since the synchronous checkpoint phase is only triggered after barrier 
alignment has finished, we cannot assume that all RocksDB instances execute 
flushing at the same time.


Best
Yun Tang

From: Sameer W 
Sent: Thursday, June 18, 2020 3:34
To: user 
Subject: Implications on incremental checkpoint duration based on 
RocksDBStateBackend, Number of Slots per TM

Hi,

The number of RocksDB databases that Flink creates is equal to the number of 
operator states multiplied by the number of slots.

Assuming a parallelism of 100 for a job which is executed on 100 TM's with 1 
slot per TM as opposed to 10 TM's with 10 slots per TM I have noticed that the 
former configuration is more efficient for incremental checkpointing. In both 
cases the number of RocksDB databases is the same, except in the latter case 10 
times as many are created in one TM vs the former case.

Reading the link below, it says: "Flink uses this to figure out the state changes. To do this, 
Flink triggers a flush in RocksDB, forcing all memtables into sstables on disk, 
and hard-linked in a local temporary directory. This process is synchronous to 
the processing pipeline, and Flink performs all further steps asynchronously 
and does not block processing."

What does "synchronous to the processing pipeline" mean? Does it mean that 
flushing to the DB happens synchronously (serially) for all RocksDB databases in 
one TM? Is the flushing single-threaded per TM?

Thanks,
Sameer



Re: Help: computing cumulative UV in real time with Flink SQL 1.10

2020-06-17 Thread x
For 1.10, I implemented it by converting the table to a stream and then back to a table. Does that look reasonable to you?
val resTmpTab: Table = tabEnv.sqlQuery(
  """
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT 
userkey) uv
FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')""")

val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
  .filter(line=>line._1==true).map(line=>line._2)

val res= tabEnv.fromDataStream(resTmpStream)
tabEnv.sqlUpdate(
  s"""
INSERT INTO rt_totaluv
SELECT _1,MAX(_2)
FROM $res
GROUP BY _1
""")


-- Original message --
From: "Jark Wu"
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

 x <35907...@qq.com> wrote on Wednesday, June 17, 2020 at 11:14:

 > The requirement is to compute, every minute, the cumulative UV from 00:00 of the current day up to now.
 > With the approach below, the cumulative UV is not reset when the day changes; it just keeps accumulating.
 > CREATE VIEW uv_per_10min AS
 > SELECT
 >   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w
 > AS time_str,
 >   COUNT(DISTINCT user_id) OVER w AS uv
 > FROM user_behavior
 > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
 > CURRENT ROW);
 >
 >
 > How should I handle this?
 > Would PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') work? And how should the state be cleaned up?
 > PS: 1.10's DDL doesn't seem to support CREATE VIEW, right?
 > Thanks

Convert flink table with field of type RAW to datastream

2020-06-17 Thread YI
Hi all,

I am using flink to process external data. The source format is json, and the 
underlying data types are defined in an external library.
I generated table schema with `TableSchema.fromTypeInfo` and 
`TypeInformation.of[_]`. From what I read, this method is deprecated.
But I didn't find any alternatives. Manually tweaking table schema is not 
viable as there are simply too many types.

One of the fields in the source type is `java.util.Date`. I tried to convert the 
obtained table to a datastream with Table.toAppendStream.
When I ran 
`tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`, 
the following exception occurred.

Exception in thread "main" org.apache.flink.table.api.TableException: Type is 
not supported: Date
at 
org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
at 
org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
at 
org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
at 
org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
at 
org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
at 
org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
at 
org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
at 
org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
at 
org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at 
org.apache.flink.table.calcite.FlinkRelBuilder.tableOperation(FlinkRelBuilder.scala:106)
at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:390)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at 
org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
at io.redacted.test.package$.testJoin(package.scala:31)
at io.redacted.test.package$.process(package.scala:26)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)


Re: Blink Planner Retracting Streams

2020-06-17 Thread Jark Wu
Hi John,

Maybe I misunderstand something, but CRow doesn't have a `getSchema()`
method. You can call getSchema() on the Table, and this also works if you convert
the table into Tuple2.
Actually, there is no big difference between CRow and Tuple2;
they both wrap the change flag and the Row.
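For illustration, a minimal sketch (assuming a `table` and a `tableEnv` are already in scope):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

// Take the schema from the Table itself and carry it alongside the converted stream,
// instead of expecting the stream's element type to expose it (as CRow did).
TableSchema schema = table.getSchema();
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(table, Row.class);   // f0 = change flag, f1 = row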

Best,
Jark



On Thu, 18 Jun 2020 at 06:39, John Mathews  wrote:

> Hello Godfrey,
>
> Thanks for the response!
>
> I think the problem with Tuple2 is that, if my understanding of how CRow
> worked is correct, CRow's getSchema() would return the
> underlying schema of the row it contained. Tuple2 doesn't do that, so it
> changes/breaks a lot of our downstream code that relies on the
> TableSchema to return the underlying row's schema, and not a Tuple schema.
>
> Any thoughts on that issue?
>
>
> On Wed, Jun 17, 2020 at 12:16 AM godfrey he  wrote:
>
>> hi John,
>>
>> You can use Tuple2[Boolean, Row] to replace CRow, the
>> StreamTableEnvironment#toRetractStream method return DataStream[(Boolean,
>> T)].
>>
>> the code looks like:
>>
>> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
>>   override def map(value: (Boolean, Row)): R = ...
>> })
>>
>> Bests,
>> Godfrey
>>
>> John Mathews  于2020年6月17日周三 下午12:13写道:
>>
>>> Hello,
>>>
>>> I am working on migrating from the flink table-planner to the new blink
>>> one, and one problem I am running into is that it doesn't seem like Blink
>>> has a concept of a CRow, unlike the original table-planner.
>>>
>>> I am therefore struggling to figure out how to properly convert a
>>> retracting stream to a SingleOutputStreamOperator when using just the Blink
>>> planner libraries.
>>>
>>> E.g. in the old planner I could do something like this:
>>> SingleOutputStreamOperator stream =
>>> tableEnvironment.toRetractStream(table, typeInfo)
>>> .map(value -> new CRow(value.f1, value.f0);
>>>
>>> but without the CRow, I'm not sure how to accomplish this.
>>>
>>> Any suggestions?
>>>
>>> Thanks!
>>> John
>>>
>>>
>>>


Re: flink sql sink mysql requires primary keys

2020-06-17 Thread Jark Wu
Hi,

In Flink 1.10, the sink's primary key is derived from the query; if the query cannot derive a pk, you get the error you saw:
"UpsertStreamTableSink requires that Table has a full primary keys if it is
updated."
Your job is exactly such a case where the query pk cannot be derived.

In addition, declaring a PK in the DDL is not supported in 1.10 either.

These problems are all solved in 1.11; you can try building the release-1.11 branch yourself and give it a try.
In Flink 1.11, the sink's primary key is declared explicitly by the user in the DDL; it is no longer derived from the query.
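For example, in 1.11 the sink DDL could look roughly like this (a sketch; the url and table-name values are placeholders):

CREATE TABLE user_uv (
  `time` VARCHAR,
  cnt BIGINT,
  PRIMARY KEY (`time`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'user_uv'
);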

Best,
Jark


On Thu, 18 Jun 2020 at 09:39, Zhou Zach  wrote:

> Adding a primary key gives an error:
> Exception in thread "main"
> org.apache.flink.table.planner.operations.SqlConversionException: Primary
> key and unique key are not supported yet.
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
> at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
>
> Query:
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |CREATE TABLE user_uv(
> |`time` VARCHAR,
> |cnt bigint,
> |PRIMARY KEY (`time`)
> |) WITH (
> |'connector.type' = 'jdbc',
> |'connector.write.flush.max-rows' = '1'
> |)
> |""".stripMargin)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-06-17 20:59:35, "Zhou Zach"  wrote:
> >Exception in thread "main" org.apache.flink.table.api.TableException:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> >   at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >   at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> >   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> >   at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> >   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >   at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> >   at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> >   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >   at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
> >   at
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
> >
> >
> >
> >
> >
> >Query:
> >Flink: 1.10.0
> >CREATE TABLE user_uv(
> >|`time` VARCHAR,
> >|cnt bigint
> >|) WITH (
> >|'connector.type' = 'jdbc')
> >|insert into user_uv
> >|select  MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`,
> COUNT(DISTINCT  uid) as cnt
> >|from `user`
> >|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')
>


Re:flink sql sink mysql requires primary keys

2020-06-17 Thread Zhou Zach
Adding a primary key gives an error:
Exception in thread "main" 
org.apache.flink.table.planner.operations.SqlConversionException: Primary key 
and unique key are not supported yet.
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:169)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:52)
at 
org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)


Query:


streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_uv(
|`time` VARCHAR,
|cnt bigint,
|PRIMARY KEY (`time`)
|) WITH (
|'connector.type' = 'jdbc',
|'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin)

















At 2020-06-17 20:59:35, "Zhou Zach"  wrote:
>Exception in thread "main" org.apache.flink.table.api.TableException: 
>UpsertStreamTableSink requires that Table has a full primary keys if it is 
>updated.
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>   at 
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68)
>   at 
> org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
>
>
>
>
>
>Query:
>Flink: 1.10.0
>CREATE TABLE user_uv(
>|`time` VARCHAR,
>|cnt bigint
>|) WITH (
>|'connector.type' = 'jdbc')
>|insert into user_uv
>|select  MAX(DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')) as `time`, 
>COUNT(DISTINCT  uid) as cnt
>|from `user`
>|group by DATE_FORMAT(created_time, 'yyyy-MM-dd HH:mm:00')


Interact with different S3 buckets from a shared Flink cluster

2020-06-17 Thread Ricardo Cardante

Hi!





We are working on a use case where we have a shared Flink cluster used to deploy 
multiple jobs from different teams. With this strategy, we are facing a 
challenge regarding the interaction with S3. Given that we have already configured 
S3 for the state backend (through flink-conf.yaml), every time we use API 
functions that communicate with the file system (e.g., DataStream readFile) the 
application configurations appear to be overridden by those of the cluster 
when communicating with external S3 buckets. What we've considered so 
far:




1. Provide a core-site.xml resource file targeting the external S3 buckets we 
want to interact with. We've tested this, and the credentials ultimately seem to be 
ignored in favor of the IAM roles that are pre-loaded with the instances;

2. Load the cluster instances with multiple IAM roles. The problem with this is 
that we would allow each job to interact with out-of-scope buckets;

3. Spin multiple clusters with different configurations - we would like to 
avoid this since we started from the premise of sharing a single cluster per 
context;




What would be a clean/recommended solution for interacting with multiple S3 buckets 
with different security policies from a shared Flink cluster? 


Thanks in advance.


Re: Blink Planner Retracting Streams

2020-06-17 Thread John Mathews
Hello Godfrey,

Thanks for the response!

I think the problem with Tuple2 is that, if my understanding of how CRow
worked is correct, CRow's getSchema() would return the
underlying schema of the row it contained. Tuple2 doesn't do that, so it
changes/breaks a lot of our downstream code that relies on the
TableSchema to return the underlying row's schema, and not a Tuple schema.

Any thoughts on that issue?


On Wed, Jun 17, 2020 at 12:16 AM godfrey he  wrote:

> hi John,
>
> You can use Tuple2[Boolean, Row] to replace CRow, the
> StreamTableEnvironment#toRetractStream method return DataStream[(Boolean,
> T)].
>
> the code looks like:
>
> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
>   override def map(value: (Boolean, Row)): R = ...
> })
>
> Bests,
> Godfrey
>
> John Mathews  于2020年6月17日周三 下午12:13写道:
>
>> Hello,
>>
>> I am working on migrating from the flink table-planner to the new blink
>> one, and one problem I am running into is that it doesn't seem like Blink
>> has a concept of a CRow, unlike the original table-planner.
>>
>> I am therefore struggling to figure out how to properly convert a
>> retracting stream to a SingleOutputStreamOperator when using just the Blink
>> planner libraries.
>>
>> E.g. in the old planner I could do something like this:
>> SingleOutputStreamOperator stream =
>> tableEnvironment.toRetractStream(table, typeInfo)
>> .map(value -> new CRow(value.f1, value.f0);
>>
>> but without the CRow, I'm not sure how to accomplish this.
>>
>> Any suggestions?
>>
>> Thanks!
>> John
>>
>>
>>


Implications on incremental checkpoint duration based on RocksDBStateBackend, Number of Slots per TM

2020-06-17 Thread Sameer W
Hi,

The number of RocksDB databases that Flink creates is equal to the number of
operator states multiplied by the number of slots.

Assuming a parallelism of 100 for a job which is executed on 100 TM's with
1 slot per TM as opposed to 10 TM's with 10 slots per TM I have noticed
that the former configuration is more efficient for incremental
checkpointing. In both cases the number of RocksDB databases is the same,
except in the latter case 10 times as many are created in one TM vs the
former case.

Reading the link
below, it says: "Flink uses this to figure out the state changes. To do
this, Flink triggers a flush in RocksDB, forcing all memtables into
sstables on disk, and hard-linked in a local temporary directory. *This
process is synchronous to the processing pipeline*, and Flink performs all
further steps asynchronously and does not block processing."

What does "synchronous to the processing pipeline" mean? Does it mean that
flushing to the DB happens synchronously (serially) for all RocksDB databases
in one TM? Is the flushing single-threaded per TM?

Thanks,
Sameer



Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-17 Thread Till Rohrmann
Hi Sourabh,

do you have access to the cluster logs? They could be helpful for debugging
the problem. Which version of Flink are you using?

Cheers,
Till

On Wed, Jun 17, 2020 at 7:39 PM Sourabh Mehta 
wrote:

> No, I am not.
>
> On Wed, 17 Jun 2020 at 10:48 PM, Chesnay Schepler 
> wrote:
>
>> Are you by any chance creating a local environment via
>> (Stream)ExecutionEnvironment#createLocalEnvironment?
>>
>> On 17/06/2020 17:05, Sourabh Mehta wrote:
>>
>> Hi Team,
>>
>> I'm  exploring flink for one of my use case, I'm facing some issues
>> while running a flink job in cluster mode. Below are the steps I followed
>> to setup and run job in cluster mode :
>> 1. Setup flink on google cloud dataproc using
>> https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink
>>
>> 2. After setting up the cluster I could see the flink session started and
>> could see the UI for the same.
>>
>> 3 Submitted job from dataproc master node using below command
>>
>> sudo HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/flink run -m
>> yarn-cluster -yid application_1592311654771_0001 -class
>> com.sm.flink.FlinkDriver /usr/lib/flink/lib/flink-1.0.10-sm-SNAPSHOT.jar
>> hdfs://cluster-flink-poc-m:8020/user/flink/rocksdb/
>>
>> After running the job I see the job started successfully but created a
>> mini local cluster and ran in local mode. I don't see any jobs submitted to
>> JobManger and I also see 0 task managers on UI.
>>
>> Can someone please help me understand here?, do let me know what input
>> is required to investigate the same.
>>
>>
>>
>>
>>


Trouble with large state

2020-06-17 Thread Jeff Henrikson

Hello Flink users,

I have an application of around 10 enrichment joins.  All events are 
read from kafka and have event timestamps.  The joins are built using 
.cogroup, with a global window, triggering on every 1 event, plus a 
custom evictor that drops records once a newer record for the same ID 
has been processed.  Deletes are represented by empty events with 
timestamp and ID (tombstones). That way, we can drop records when 
business logic dictates, as opposed to when a maximum retention has been 
attained.  The application runs RocksDBStateBackend, on Kubernetes on 
AWS with local SSDs.


Unit tests show that the joins produce expected results.  On an 8 node 
cluster, watermark output progress seems to indicate I should be able to 
bootstrap my state of around 500GB in around 1 day.  I am able to save 
and restore savepoints for the first half an hour of run time.


My current trouble is that after around 50GB of state, I stop being able 
to reliably take checkpoints or savepoints.  Some time after that, I 
start getting a variety of failures where the first suspicious log event 
is a generic cluster connectivity error, such as:


1) java.io.IOException: Connecting the channel failed: Connecting
to remote task manager + '/10.67.7.101:38955' has failed. This
might indicate that the remote task manager has been lost.

2) org.apache.flink.runtime.io.network.netty.exception
.RemoteTransportException: Connection unexpectedly closed by remote
task manager 'null'. This might indicate that the remote task
manager was lost.

3) Association with remote system
[akka.tcp://flink@10.67.6.66:34987] has failed, address is now
gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@10.67.6.66:34987]] Caused by:
[java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum 
savable state size.


I could enable HA, but for the time being I have been leaving it out to 
avoid the possibility of masking deterministic faults.


Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
parallelism=8
maxParallelism=64
setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
setTolerableCheckpointFailureNumber(1000)
setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RocksDBStateBackend
setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
setNumberOfTransferThreads(25)
setDbStoragePath points to a local nvme SSD

Configuration in flink-conf.yaml:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 28000m
taskmanager.memory.process.size: 28000m
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: full

cluster.evenly-spread-out-slots: false

taskmanager.memory.network.fraction: 0.2   # default 0.1
taskmanager.memory.framework.off-heap.size: 2GB
taskmanager.memory.task.off-heap.size: 2GB
taskmanager.network.memory.buffers-per-channel: 32 # default 2
taskmanager.memory.managed.fraction: 0.4   # docs say 
default 0.1, but something seems to set 0.4

taskmanager.memory.task.off-heap.size: 2048MB  # default 128M

state.backend.fs.memory-threshold: 1048576
state.backend.fs.write-buffer-size: 1024
state.backend.local-recovery: true
state.backend.rocksdb.writebuffer.size: 64MB
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.number-to-merge: 4
state.backend.rocksdb.timer-service.factory: heap
state.backend.rocksdb.block.cache-size: 6400 # default 8MB
state.backend.rocksdb.write-batch-size: 1600 # default 2MB

web.checkpoints.history: 250


Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-17 Thread Sourabh Mehta
No, I am not.

On Wed, 17 Jun 2020 at 10:48 PM, Chesnay Schepler 
wrote:

> Are you by any chance creating a local environment via
> (Stream)ExecutionEnvironment#createLocalEnvironment?
>
> On 17/06/2020 17:05, Sourabh Mehta wrote:
>
> Hi Team,
>
> I'm  exploring flink for one of my use case, I'm facing some issues while
> running a flink job in cluster mode. Below are the steps I followed to
> setup and run job in cluster mode :
> 1. Setup flink on google cloud dataproc using
> https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink
>
> 2. After setting up the cluster I could see the flink session started and
> could see the UI for the same.
>
> 3 Submitted job from dataproc master node using below command
>
> sudo HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/flink run -m
> yarn-cluster -yid application_1592311654771_0001 -class
> com.sm.flink.FlinkDriver /usr/lib/flink/lib/flink-1.0.10-sm-SNAPSHOT.jar
> hdfs://cluster-flink-poc-m:8020/user/flink/rocksdb/
>
> After running the job I see the job started successfully but created a
> mini local cluster and ran in local mode. I don't see any jobs submitted to
> JobManger and I also see 0 task managers on UI.
>
> Can someone please help me understand here?, do let me know what input is
> required to investigate the same.
>
>
>
>
>


Re: flink-s3-fs-hadoop retry configuration

2020-06-17 Thread Jeff Henrikson

Robert,

Thanks for the tip!

Before you replied, I did figure out that the keys belong in flink-conf.yaml, 
using btrace. I instrumented the methods 
org.apache.hadoop.conf.Configuration.get for the keys, and 
org.apache.hadoop.conf.Configuration.substituteVars for the effective 
values.  (There is a btrace bug where you can't just observe the return 
value from .get directly.)


I did not see in the code any way to observe the effective configuration 
using logging.


Regards,


Jeff



On 5/8/20 7:29 AM, Robert Metzger wrote:

I validated my assumption. Putting

s3.connection.maximum: 123456

into the flink-conf.yaml file results in the following DEBUG log output:

2020-05-08 16:20:47,461 DEBUG 
org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding 
Flink config entry for s3.connection.maximum as 
fs.s3a.connection.maximum to Hadoop config


I guess that is the recommended way of passing configuration into the S3 
connectors of Flink.


You also asked how to detect retries: DEBUG-log level is helpful again. 
I just tried connecting against an invalid port, and got these messages:


2020-05-08 16:26:37,671 DEBUG 
org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - 
http-outgoing-7: Shutdown connection
2020-05-08 16:26:37,671 DEBUG 
org.apache.http.impl.execchain.MainClientExec                [] - 
Connection discarded
2020-05-08 16:26:37,671 DEBUG 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - 
Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total 
kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
2020-05-08 16:26:37,671 DEBUG com.amazonaws.request 
                    [] - Retrying Request: HEAD http://127.0.0.1:9000 
/test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 
Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 
scala/2.11.12, amz-sdk-invocation-id: 
051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: 
application/octet-stream, )
2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient   
                    [] - Retriable error detected, will retry in 4226ms, 
attempt number: 7



maybe it makes sense to set the log level only for 
"com.amazonaws.http.AmazonHttpClient" to DEBUG.


How to configure the log level depends on the deployment method. 
Usually, its done by replacing the first INFO with DEBUG in 
conf/log4j.properties. ("rootLogger.level = DEBUG")



Best,
Robert

On Fri, May 8, 2020 at 3:51 PM Robert Metzger wrote:


Hey Jeff,

Which Flink version are you using?
Have you tried configuring the S3 filesystem via Flink's  config
yaml? Afaik all config parameters prefixed with "s3." are mirrored
into the Hadoop file system connector.


On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <jehenri...@gmail.com> wrote:

  > 2) How can I tell if flink-s3-fs-hadoop is actually managing
to pick up
  > the hadoop configuration I have provided, as opposed to some
separate
  > default configuration?

I'm reading the docs and source of flink-fs-hadoop-shaded.  I
see that
core-default-shaded.xml has fs.s3a.connection.maximum set to
15.  I have
around 20 different DataStreams being instantiated from S3, so
if they
each require one connection to be healthy, then 15 is definitely
not a
good value.

However, I seem to be unable to override
fs.s3a.connection.maximum using
my core-site.xml.  I am also unable to see the DEBUG level
messages for
the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.

So now I'm wondering:

      1) Anybody know how to see DEBUG output for
flink-fs-hadoop-shaded?

      2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
      override the config?


Thanks in advance,


Jeff Henrikson




https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded


https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml

    
    <property>
      <name>fs.s3a.connection.maximum</name>
      <value>15</value>
      <description>Controls the maximum number of simultaneous
        connections to S3.</description>
    </property>




On 5/1/20 7:30 PM, Jeff Henrikson wrote:
 > Hello Flink users,
 >
 > I could use help with three related questions:
 >
 > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
 >
 > 2) How can I tell if flink-s3-fs-hadoop is actually managing
to pick up
 > the hadoop configuration I have provided, as opposed to some
separate
 > default configuration?  My job fails quickly when I read
larger or more
 > numerous objects from S3.  I conjecture the 

Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-17 Thread Chesnay Schepler
Are you by any chance creating a local environment via 
(Stream)ExecutionEnvironment#createLocalEnvironment?


On 17/06/2020 17:05, Sourabh Mehta wrote:

Hi Team,

I'm  exploring flink for one of my use case, I'm facing some issues 
while running a flink job in cluster mode. Below are the steps I 
followed to setup and run job in cluster mode :
1. Setup flink on google cloud dataproc using 
https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink


2. After setting up the cluster I could see the flink session started 
and could see the UI for the same.


3 Submitted job from dataproc master node using below command

sudo HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/flink run -m 
yarn-cluster -yid application_1592311654771_0001 -class 
com.sm.flink.FlinkDriver 
/usr/lib/flink/lib/flink-1.0.10-sm-SNAPSHOT.jar 
hdfs://cluster-flink-poc-m:8020/user/flink/rocksdb/


After running the job I see that the job started successfully but it created a 
mini local cluster and ran in local mode. I don't see any jobs 
submitted to the JobManager, and I also see 0 task managers on the UI.


Can someone please help me understand what is happening here? Do let me know 
what input is required to investigate.
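
For reference, a minimal sketch of the distinction being asked about: getExecutionEnvironment() picks up the context injected by flink run (YARN per-job or session), while createLocalEnvironment() always starts an embedded mini cluster inside the client JVM, which matches the "0 task managers, nothing on the JobManager UI" symptom. The class name and job body below are illustrative only.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnvironmentCheck {
        public static void main(String[] args) throws Exception {
            // Picks up whatever context the `flink run` client provides (YARN per-job, session, ...).
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // By contrast, this would always run inside an embedded mini cluster in the client JVM:
            // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

            env.fromElements(1, 2, 3)
               .map(i -> i * 2)
               .print();
            env.execute("environment-check");
        }
    }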








Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-17 Thread Sourabh Mehta
Hi Team,

I'm exploring Flink for one of my use cases and I'm facing some issues while
running a Flink job in cluster mode. Below are the steps I followed to
set up and run a job in cluster mode:
1. Set up Flink on Google Cloud Dataproc using
https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink

2. After setting up the cluster I could see that the Flink session had started
and could reach its UI.

3. Submitted the job from the Dataproc master node using the command below:

sudo HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/flink run -m
yarn-cluster -yid application_1592311654771_0001 -class
com.sm.flink.FlinkDriver /usr/lib/flink/lib/flink-1.0.10-sm-SNAPSHOT.jar
hdfs://cluster-flink-poc-m:8020/user/flink/rocksdb/

After running the job I see that the job started successfully but it created a
mini local cluster and ran in local mode. I don't see any jobs submitted to
the JobManager, and I also see 0 task managers on the UI.

Can someone please help me understand what is happening here? Do let me know
what input is required to investigate.


Re: Flink ML

2020-06-17 Thread Piotr Nowojski
Hi,

It looks like FLIP-39 is only partially implemented as of now [1], so I’m not 
sure which features are already done. I’m including Shaoxuan Wang in this 
thread, maybe he will be able to better answer your question.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-12470 


> On 16 Jun 2020, at 14:55, Dimitris Vogiatzidakis 
>  wrote:
> 
> Hello,
> I'm a cs student currently working on my Bachelor's thesis. I've used Flink 
> to extract features out of some datasets, and I would like to use them 
> together with another dataset of (1,0) (Node exists or doesn't) to perform a 
> logistic regresssion. I have found that FLIP-39 has been accepted and it is 
> running in version 1.10.0 that I also currently use, but I'm having trouble 
> implementing it. Are there any java examples currently up and running? Or if 
> you can propose a different way to perform the task? 
> Thank you.
> 
> -Dimitris Vogiatzidakis 



Re: Does Flink support reading files or CSV files from java.io.InputStream instead of file paths?

2020-06-17 Thread Marco Villalobos
While I still think it would be great for Flink to accept an InputStream and 
let the programmer decide whether it comes from a remote TCP call or a local 
file, for the sake of my demo I simply resolved the file path within Gradle 
and supplied it to the Gradle application run plugin like this:

run {
args = ["--input-file", file('timeseries.csv')]
}

and that launched my application with minimal configuration.

> On Jun 17, 2020, at 7:11 AM, Aljoscha Krettek  wrote:
> 
> Hi,
> 
> for simple demos you can also use env.fromElements() or env.fromCollection() 
> to create a source from some data that you have already available.
> 
> Does that help?
> 
> Best,
> Aljoscha
> 
> On 16.06.20 15:35, Marco Villalobos wrote:
>> Okay, it is not supported.
>> I understand such a feature is not needed in production systems, but it 
>> could make testing and demos more portable. I was writing a demo, and I 
>> wanted it to run without command-line arguments, which would have been very 
>> handy. I want the user to simply checkout the code and run it without having 
>> to supply a command line parameter declaring where the input file resides.
>> Thank you.
>>> On Jun 16, 2020, at 4:57 AM, Aljoscha Krettek  wrote:
>>> 
>>> Hi Marco,
>>> 
>>> this is not possible since Flink is designed mostly to read files from a 
>>> distributed filesystem, where paths are used to refer to those files. If 
>>> you read from files on the classpath you could just use plain old Java code 
>>> and won't need a distributed processing system such as Flink.
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> On 16.06.20 06:46, Marco Villalobos wrote:
 Does Flink support reading files or CSV files from java.io.InputStream 
 instead of file paths?
 I'd rather just store my file on the class path and load it with 
 java.lang.ClassLoader#getResourceAsStream(String).
 If there is a way, I'd appreciate an example.
>>> 
> 
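
For reference, a minimal sketch combining the two suggestions: plain Java reads the bundled resource from the classpath, and env.fromCollection() turns the parsed lines into a source. The resource name and the line-based handling are illustrative only.

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ClasspathCsvDemo {
        public static void main(String[] args) throws Exception {
            // Read the bundled CSV with plain Java before handing it to Flink.
            List<String> lines;
            try (InputStream in = ClasspathCsvDemo.class.getClassLoader()
                     .getResourceAsStream("timeseries.csv");
                 BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
                lines = reader.lines().collect(Collectors.toList());
            }

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // fromCollection() ships the records with the job graph -- fine for demos, not for large inputs.
            DataStream<String> records = env.fromCollection(lines);
            records.print();
            env.execute("classpath-csv-demo");
        }
    }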



Re: Running Kubernetes on Flink with Savepoint

2020-06-17 Thread Matt Magsombol
Yeah, our setup is a bit outdated (since Flink 1.7-ish) but we're 
effectively just using Helm templates... when upgrading to 1.10, I just ended up 
looking at diffs and changelogs for changes.
Anyway, thanks. I was hoping that Flink had a community-supported way of doing 
this, but I think I know what to do internally.

On 2020/06/15 15:11:32, Robert Metzger  wrote: 
> Hi Matt,
> 
> sorry for the late reply. Why are you using the "flink-docker" helm example
> instead of
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
>  or
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>  ?
> I don't think that the helm charts you are mentioning are actively
> maintained or recommended for production use.
> 
> If you want to create a savepoint in Flink, you'll need to trigger it via
> the JobManager's REST API (independent of how you deploy it). I guess
> you'll have to come up with some tooling that orchestrates triggering a
> savepoint before shutting down / upgrading the job.
> See also:
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jobs-jobid-savepoints
> 
> Best,
> Robert
> 
> 
> 
> On Wed, Jun 10, 2020 at 2:48 PM Matt Magsombol  wrote:
> 
> > We're currently using this template:
> > https://github.com/docker-flink/examples/tree/master/helm/flink for
> > running kubernetes flink for running a job specific cluster ( with a nit of
> > specifying the class as the main runner for the cluster ).
> >
> >
> > How would I go about setting up adding savepoints, so that we can edit our
> > currently existing running jobs to add pipes to the flink job without
> > having to restart our state? Reasoning is that our state has a 1 day TTL
> > and updating our code without state will have to restart this from scratch.
> >
> > Through documentation, I see that I'd need to run some sort of command.
> > This is not possible to be consistent if we're using the helm charts
> > specified in the link.
> >
> > I see this email thread talking about a certain problem with savepoints +
> > kubernetes but doesn't quite specify how to set this up with helm:
> > https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
> >
> >
> > According to hasun@zendesk from that thread, they mention that "We always
> > make a savepoint before we shutdown the job-cluster. So the savepoint is
> > always the latest. When we fix a bug or change the job graph, it can resume
> > well."
> >
> > This is the exact use case that I'm looking to appease. Other than
> > specifying configs, are there any other additional parameters that I'd need
> > to add within helm to specify that it needs to take in the latest savepoint
> > upon starting?
> >
> 
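
For reference, a minimal sketch of triggering a savepoint through the JobManager REST endpoint mentioned above, assuming Java 11's java.net.http client; host, port, job id, and target directory are placeholders.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class TriggerSavepoint {
        public static void main(String[] args) throws Exception {
            String jobManager = "http://flink-jobmanager:8081";  // placeholder address
            String jobId = "feedcafe000000000000000000000000";   // placeholder job id
            // POST /jobs/:jobid/savepoints starts an asynchronous savepoint.
            String body = "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": false}";

            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(jobManager + "/jobs/" + jobId + "/savepoints"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }

The response carries a trigger id that can then be polled under /jobs/:jobid/savepoints/:triggerid to obtain the final savepoint path.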


Re: Shared state between two process functions

2020-06-17 Thread Congxian Qiu
Hi

Maybe you can take a look at broadcast state[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/broadcast_state.html

Best,
Congxian


Robert Metzger  于2020年6月16日周二 上午2:18写道:

> Thanks for sharing some details on the use case: Are you able to move the
> common computation into one operator that runs before the ProcessFunctions,
> and you are sending the results there?
> You can build quite advanced dataflow graphs with Flink to model your
> problem.
>
> On Mon, Jun 15, 2020 at 9:01 AM Jaswin Shah 
> wrote:
>
>> Basically, I have multiple process functions and they are doing
>> some computations based on some historical results. The historical
>> events and results are common across the process functions, due to which I
>> have a lot of redundant processing in many process functions. So I have
>> been thinking of some shared state accessible between multiple
>> KeyedProcessFunctions.
>>
>> Get Outlook for Android 
>>
>> --
>> *From:* Yun Gao 
>> *Sent:* Monday, June 15, 2020 8:27:38 AM
>> *To:* Jaswin Shah ; user@flink.apache.org <
>> user@flink.apache.org>
>> *Subject:* Re: Shared state between two process functions
>>
>> Hi Jaswin,
>>
>>Currently the state belongs to single operators, thus it should be not
>> possible to share states between different operators. Could you also share
>> the original problem want to solve by sharing states ?
>>
>> Best,
>>  Yun
>>
>>
>> --Original Mail --
>> *Sender:*Jaswin Shah 
>> *Send Date:*Sun Jun 14 18:57:54 2020
>> *Recipients:*user@flink.apache.org 
>> *Subject:*Shared state between two process functions
>>
>> Hi,
>>
>> Is it possible to create the shared state(MapState) between two different
>> keyedProcessFunction? If it's possible, how can we do that in flink?
>>
>> Thanks,
>> Jaswin
>>
>>
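
A minimal sketch of the broadcast state pattern referenced above, assuming the shared historical results can be modelled as strings keyed by an id; all class, field, and key names are illustrative.

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class SharedHistoryExample {

        // Descriptor for the broadcast state that every parallel instance can read.
        static final MapStateDescriptor<String, String> HISTORY =
                new MapStateDescriptor<>("history", Types.STRING, Types.STRING);

        public static DataStream<String> enrich(DataStream<String> events,
                                                DataStream<String> history) {
            BroadcastStream<String> broadcastHistory = history.broadcast(HISTORY);

            return events
                    .keyBy(e -> e)  // illustrative key selector
                    .connect(broadcastHistory)
                    .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {

                        @Override
                        public void processElement(String event, ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            String hist = ctx.getBroadcastState(HISTORY).get(event);
                            out.collect(hist == null ? event : event + "|" + hist);
                        }

                        @Override
                        public void processBroadcastElement(String update, Context ctx,
                                                            Collector<String> out) throws Exception {
                            // Every parallel instance receives the same update.
                            ctx.getBroadcastState(HISTORY).put(update, update);
                        }
                    });
        }
    }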


Re: Is State TTL possible with event-time characteristics ?

2020-06-17 Thread Andrey Zagrebin
Hi Arti,

Any program can use State with TTL but the state can only expire in
processing time at the moment even if you configure event-time
characteristics.
As Congxian mentioned, the event time for TTL is planned.

The cleanup section says that it will not be removed 'by default'. The following
sections [1] describe background cleanup, which is not activated by
default in 1.9 but is in 1.10.
If you activate the background cleanup, you do not have to read the expired
state for it to be cleaned up, similar to the timer-based approach you mentioned.
See also the docs for details about background cleanup caveats.

The timers approach is a valid way but heavy-weight in terms of storage
because Flink will have to create a separate state for timers:
key/timestamp.
The timers approach is not implemented in Flink out-of-the-box at the
moment. It can be implemented in the application as a simple background
cleanup.

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background

On Wed, Jun 17, 2020 at 3:07 PM Congxian Qiu  wrote:

> Hi
>  Currently, Flink does not support event-time TTL state, there is an
> issue[1] tracking this.
> [1] https://issues.apache.org/jira/browse/FLINK-12005
> Best,
> Congxian
>
>
> Arti Pande  于2020年6月17日周三 下午7:37写道:
>
>> With Flink 1.9 is state TTL supported for event-time characteristics? This
>> part
>> 
>> of the documentation says that
>>
>>
>>-
>>
>>Only TTLs in reference to *processing time* are currently supported.
>>
>> Does this mean if a program uses event-time characteristics with stateful
>> operators, it can not use TTL ??
>>
>> Also clean up section
>> 
>>  of
>> the documentation says state values that are never read will never be
>> cleared.
>> [image: Screenshot 2020-06-17 at 5.00.41 PM.png]
>> The question is, when processing a stream with unique elements or
>> keys why would Flink framework expect the same key to be read in order for
>> it to be removed after its expiration time ? Why does it not simply clean
>> up the value for that key based on timers automatically without waiting for
>> read operation from user code?
>>
>> Thanks
>> Arti
>>
>
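
A minimal sketch of enabling TTL with the background cleanup described above, assuming Flink 1.9/1.10 and a one-day processing-time TTL; the state name and type are illustrative.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlExample {
        public static ValueStateDescriptor<String> descriptorWithTtl() {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(1))                                  // processing-time TTL
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .cleanupIncrementally(10, true)        // background cleanup for heap backends
                    .cleanupInRocksdbCompactFilter(1000)   // background cleanup for RocksDB
                    .build();

            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>("lastEvent", String.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }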


Re: Improved performance when using incremental checkpoints

2020-06-17 Thread Congxian Qiu
Hi Nick

The result is a bit weird. Did you compare the disk util/performance before
and after enabling checkpoint?

Best,
Congxian


Yun Tang  于2020年6月17日周三 下午8:56写道:

> Hi Nick
>
> I think this thread uses the same program as the thread "MapState bad
> performance" discussed.
> Please provide a simple program which could reproduce this so that we can
> help you more.
>
> Best
> Yun Tang
> --
> *From:* Aljoscha Krettek 
> *Sent:* Tuesday, June 16, 2020 19:53
> *To:* user@flink.apache.org 
> *Subject:* Re: Improved performance when using incremental checkpoints
>
> Hi,
>
> it might be that the operations that Flink performs on RocksDB during
> checkpointing will "poke" RocksDB somehow and make it clean up it's
> internal hierarchies of storage more. Other than that, I'm also a bit
> surprised by this.
>
> Maybe Yun Tang will come up with another idea.
>
> Best,
> Aljoscha
>
> On 16.06.20 12:42, nick toker wrote:
> > Hi,
> >
> > We used both flink versions 1.9.1 and 1.10.1
> > We used rocksDB default configuration.
> > The streaming pipeline is very simple.
> >
> > 1. Kafka consumer
> > 2. Process function
> > 3. Kafka producer
> >
> > The code of the process function is listed below:
> >
> > private transient MapState testMapState;
> >
> > @Override
> >  public void processElement(Map value, Context ctx,
> > Collector> out) throws Exception {
> >
> >  if (testMapState.isEmpty()) {
> >
> >  testMapState.putAll(value);
> >
> >  out.collect(value);
> >
> >  testMapState.clear();
> >  }
> >  }
> >
> > We used the same code with ValueState and observed the same results.
> >
> >
> > BR,
> >
> > Nick
> >
> >
> > ‫בתאריך יום ג׳, 16 ביוני 2020 ב-11:56 מאת ‪Yun Tang‬‏ <‪myas...@live.com
> > ‬‏>:‬
> >
> >> Hi Nick
> >>
> >> It's really strange that performance could improve when checkpoint is
> >> enabled.
> >> In general, enable checkpoint might bring a bit performance downside to
> >> the whole job.
> >>
> >> Could you give more details e.g. Flink version, configurations of
> RocksDB
> >> and simple code which could reproduce this problem.
> >>
> >> Best
> >> Yun Tang
> >> --
> >> *From:* nick toker 
> >> *Sent:* Tuesday, June 16, 2020 15:44
> >> *To:* user@flink.apache.org 
> >> *Subject:* Improved performance when using incremental checkpoints
> >>
> >> Hello,
> >>
> >> We are using RocksDB as the backend state.
> >> At first we didn't enable the checkpoints mechanism.
> >>
> >> We observed the following behaviour and we are wondering why ?
> >>
> >> When using RocksDB *without* checkpointing the performance was
> >> extremely bad.
> >> And when we enabled checkpointing the performance improved by a
> >> *factor of 10*.
> >>
> >> Could you please explain if this behaviour is expected ?
> >> Could you please explain why enabling the checkpoint significantly
> >> improves the performance ?
> >>
> >> BR,
> >> Nick
> >>
> >
>
>
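
The type parameters in the quoted process function were stripped by the archive; a minimal compilable reconstruction, assuming Map<String, String> elements and a keyed stream (both assumptions, not taken from the original), could look like this:

    import java.util.Map;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class PassThroughFunction
            extends KeyedProcessFunction<String, Map<String, String>, Map<String, String>> {

        // Keyed state holding the last map; filled, emitted and cleared within one invocation.
        private transient MapState<String, String> testMapState;

        @Override
        public void open(Configuration parameters) {
            testMapState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("testMapState", Types.STRING, Types.STRING));
        }

        @Override
        public void processElement(Map<String, String> value, Context ctx,
                                   Collector<Map<String, String>> out) throws Exception {
            if (testMapState.isEmpty()) {
                testMapState.putAll(value);
                out.collect(value);
                testMapState.clear();
            }
        }
    }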


Re: Improved performance when using incremental checkpoints

2020-06-17 Thread Yun Tang
Hi Nick

I think this thread uses the same program as the thread "MapState bad performance" 
discussed.
Please provide a simple program which could reproduce this so that we can help 
you more.

Best
Yun Tang

From: Aljoscha Krettek 
Sent: Tuesday, June 16, 2020 19:53
To: user@flink.apache.org 
Subject: Re: Improved performance when using incremental checkpoints

Hi,

it might be that the operations that Flink performs on RocksDB during
checkpointing will "poke" RocksDB somehow and make it clean up it's
internal hierarchies of storage more. Other than that, I'm also a bit
surprised by this.

Maybe Yun Tang will come up with another idea.

Best,
Aljoscha

On 16.06.20 12:42, nick toker wrote:
> Hi,
>
> We used both flink versions 1.9.1 and 1.10.1
> We used rocksDB default configuration.
> The streaming pipeline is very simple.
>
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
> private transient MapState testMapState;
>
> @Override
>  public void processElement(Map value, Context ctx,
> Collector> out) throws Exception {
>
>  if (testMapState.isEmpty()) {
>
>  testMapState.putAll(value);
>
>  out.collect(value);
>
>  testMapState.clear();
>  }
>  }
>
> We used the same code with ValueState and observed the same results.
>
>
> BR,
>
> Nick
>
>
> ‫בתאריך יום ג׳, 16 ביוני 2020 ב-11:56 מאת ‪Yun Tang‬‏ <‪myas...@live.com
> ‬‏>:‬
>
>> Hi Nick
>>
>> It's really strange that performance could improve when checkpoint is
>> enabled.
>> In general, enable checkpoint might bring a bit performance downside to
>> the whole job.
>>
>> Could you give more details e.g. Flink version, configurations of RocksDB
>> and simple code which could reproduce this problem.
>>
>> Best
>> Yun Tang
>> --
>> *From:* nick toker 
>> *Sent:* Tuesday, June 16, 2020 15:44
>> *To:* user@flink.apache.org 
>> *Subject:* Improved performance when using incremental checkpoints
>>
>> Hello,
>>
>> We are using RocksDB as the backend state.
>> At first we didn't enable the checkpoints mechanism.
>>
>> We observed the following behaviour and we are wondering why ?
>>
>> When using RocksDB *without* checkpointing the performance was
>> extremely bad.
>> And when we enabled checkpointing the performance improved by a
>> *factor of 10*.
>>
>> Could you please explain if this behaviour is expected ?
>> Could you please explain why enabling the checkpoint significantly
>> improves the performance ?
>>
>> BR,
>> Nick
>>
>



Re: Is State TTL possible with event-time characteristics ?

2020-06-17 Thread Congxian Qiu
Hi
 Currently, Flink does not support event-time TTL state, there is an
issue[1] tracking this.
[1] https://issues.apache.org/jira/browse/FLINK-12005
Best,
Congxian


Arti Pande  于2020年6月17日周三 下午7:37写道:

> With Flink 1.9 is state TTL supported for event-time characteristics? This
> part
> 
> of the documentation says that
>
>
>-
>
>Only TTLs in reference to *processing time* are currently supported.
>
> Does this mean if a program uses event-time characteristics with stateful
> operators, it can not use TTL ??
>
> Also clean up section
> 
>  of
> the documentation says state values that are never read will never be
> cleared.
> [image: Screenshot 2020-06-17 at 5.00.41 PM.png]
> The question is, when processing a stream with unique elements or
> keys why would Flink framework expect the same key to be read in order for
> it to be removed after its expiration time ? Why does it not simply clean
> up the value for that key based on timers automatically without waiting for
> read operation from user code?
>
> Thanks
> Arti
>


Is State TTL possible with event-time characteristics ?

2020-06-17 Thread Arti Pande
With Flink 1.9 is state TTL supported for event-time characteristics? This
part

of the documentation says that


   -

   Only TTLs in reference to *processing time* are currently supported.

Does this mean that if a program uses event-time characteristics with stateful
operators, it cannot use TTL?

Also, the cleanup section
of the documentation says that state values that are never read will never be
cleared.
[image: Screenshot 2020-06-17 at 5.00.41 PM.png]
The question is: when processing a stream with unique elements or
keys, why would the Flink framework expect the same key to be read in order for
it to be removed after its expiration time? Why does it not simply clean
up the value for that key automatically, based on timers, without waiting for
a read operation from user code?

Thanks
Arti


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-17 Thread Fabian Hueske
Congrats Yu!

Cheers, Fabian

Am Mi., 17. Juni 2020 um 10:20 Uhr schrieb Till Rohrmann <
trohrm...@apache.org>:

> Congratulations Yu!
>
> Cheers,
> Till
>
> On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li 
> wrote:
>
> > Congratulations Yu, well deserved!
> >
> > Best,
> > Jingsong
> >
> > On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:
> >
> >> Congrats, Yu!
> >>
> >> GXGX & well deserved!!
> >>
> >> Best Regards,
> >>
> >> Yuan
> >>
> >> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> >>> part of the Apache Flink Project Management Committee (PMC).
> >>>
> >>> Yu Li has been very active on Flink's Statebackend component, working on
> >>> various improvements, for example the RocksDB memory management for 1.10.
> >>> He keeps checking and voting for our releases, and has also successfully
> >>> produced two releases (1.10.0 & 1.10.1) as RM.
> >>>
> >>> Congratulations & Welcome Yu Li!
> >>>
> >>> Best,
> >>> Jincheng (on behalf of the Flink PMC)
> >>>
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-17 Thread Till Rohrmann
Congratulations Yu!

Cheers,
Till

On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li  wrote:

> Congratulations Yu, well deserved!
>
> Best,
> Jingsong
>
> On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:
>
>> Congrats, Yu!
>>
>> GXGX & well deserved!!
>>
>> Best Regards,
>>
>> Yuan
>>
>> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
>> wrote:
>>
>>> Hi all,
>>>
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>>
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> He keeps checking and voting for our releases, and has also successfully
>>> produced two releases (1.10.0 & 1.10.1) as RM.
>>>
>>> Congratulations & Welcome Yu Li!
>>>
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Kinesis ProvisionedThroughputExceededException

2020-06-17 Thread Roman Grebennikov
Hi,

It will occur if your job reaches SHARD_GETRECORDS_RETRIES consecutive 
failed attempts to pull data from Kinesis.
So if you scale up the stream in Kinesis and tune the backoff parameters a bit, you 
will lower the probability of this exception almost to zero (but with increased 
costs and worst-case latency).

But yes, this is the main drawback of managed solutions - once you reach a 
significant load, you need to pay more. Another managed option within AWS is to 
switch to MSK, managed Kafka, which has no such significant restrictions.

And the final option is to wait until FLINK-17688 
is implemented (using 
Kinesis enhanced fan-out, so that Kinesis pushes data to the consumer instead of 
the consumer periodically pulling it).

Roman Grebennikov | g...@dfdx.me


On Wed, Jun 17, 2020, at 04:39, M Singh wrote:
> 
> 
> Thanks Roman for your response and advice.
> 
> From my understanding increasing shards will increase throughput but still if 
> more than 5 requests are made per shard/per second, and since we have 20 apps 
> (and increasing) then the exception might occur. 
> 
> Please let me know if I have missed anything.
> 
> Mans
> On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov  
> wrote:
> 
> 
> Hi, 
> 
> usually this exception is thrown by aws-java-sdk and means that your kinesis 
> stream is hitting a throughput limit (what a surprise). We experienced the 
> same thing when we had a single "event-bus" style stream and multiple flink 
> apps reading from it.
> 
> Each Kinesis partition has a limit of 5 poll operations per second. If you 
> have a stream with 4 partitions and 30 jobs reading from it, I guess that 
> each job is constantly hitting op limit for kinesis with default kinesis 
> consumer settings and it does an exponential back-off (by just sleeping for a 
> small period of time and then retrying).
> 
> You have two options here:
> 1. scale up the kinesis stream, so there will be more partitions and higher 
> overall throughput limits
> 2. tune kinesis consumer backoff parameters:
> 
> Our current ones, for example, look like this:
> 
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") 
> // we poll every 2s
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // 
> in case of throughput error, initial timeout is 2s
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "1") // 
> we can go up to 10s pause
>  
> conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
>  "1.5") // multiplying pause to 1.5 on each next step
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and 
> make up to 100 retries
> 
> with best regards,
> Roman Grebennikov | g...@dfdx.me
> 
> 
> On Mon, Jun 15, 2020, at 13:45, M Singh wrote:
>> Hi:
>> 
>> I am using multiple (almost 30 and growing) Flink streaming applications 
>> that read from the same kinesis stream and get 
>> ProvisionedThroughputExceededException exception which fails the job.
>> I have seen a reference 
>> 
>> http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
>>  - which indicates there might be some solution perhaps in Flink 1.8/1.9. 
>> 
>> I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due 
>> to ProvisionedThroughputExceededException - ASF JIRA 
>>  is still open.
>> 
>> 
>> So i wanted to find out 
>> 
>> 1. If this issue has been resolved and if so in which version ?
>> 2. Is there any kinesis consumer with kinesis fanout available that can help 
>> address this issue ?
>> 3. Is there any specific parameter in kinesis consumer config that can 
>> address this issue ?
>> 
>> If there is any other pointer/documentation/reference, please let me know.
>> 
>> Thanks
>> 
> 
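
For reference, a minimal sketch of how backoff settings like the ones quoted above plug into the consumer, assuming the flink-connector-kinesis dependency; stream name, region, and the concrete values are placeholders.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisSourceJob {
        public static void main(String[] args) throws Exception {
            Properties conf = new Properties();
            conf.put(AWSConfigConstants.AWS_REGION, "us-east-1");                       // placeholder
            conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000"); // poll every 2s
            conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000");
            conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");
            conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
            conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> records = env.addSource(
                    new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), conf));
            records.print();
            env.execute("kinesis-consumer");
        }
    }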


Re: FLINKSQL 1.10: cumulative distinct UV per day

2020-06-17 Thread x


------------------ Original message ------------------
From: "Jark Wu"

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com> wrote on 2020-06-17 11:14:

> I need the distinct UV count to accumulate from 00:00 each day. At the moment
> the UV is computed like this:
> CREATE VIEW uv_per_10min AS
> SELECT
>   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
>   COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW);
>
> Would adding
> PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd')
> to the OVER window work?
> PS: does the DDL in 1.10 support CREATE VIEW?
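
For what it's worth on the PS: as far as I recall, CREATE VIEW is not part of the 1.10 SQL DDL, but the same view can be registered through the Table API. A minimal sketch, assuming a user_behavior table with a proctime attribute is already registered:

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class RegisterUvView {
        public static void register(StreamTableEnvironment tEnv) {
            // Programmatic equivalent of the CREATE VIEW statement above.
            Table uvPer10Min = tEnv.sqlQuery(
                    "SELECT "
                    + "  MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str, "
                    + "  COUNT(DISTINCT user_id) OVER w AS uv "
                    + "FROM user_behavior "
                    + "WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)");
            tEnv.createTemporaryView("uv_per_10min", uvPer10Min);
        }
    }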
env.readFile recursively reading HDFS: temporary file does not exist

2020-06-17 Thread 阿华田
I am using Flink to recursively read HDFS files and get a file-does-not-exist exception for files ending in .tmp. Normally Flink should not need to read these tmp files, right?
File does not exist: 
/recommender/success_fid_flow/ds=2020-06-16/hour=14/2020-06-16_14.success_fid_jarvis-01.1592287200578.tmp


阿华田
a15733178...@163.com
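
A minimal sketch of skipping in-progress files with a path filter, assuming a TextInputFormat source; the .tmp suffix check is illustrative and depends on how the upstream writer marks temporary files.

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    public class SkipTmpFiles {
        public static TextInputFormat formatFor(String dir) {
            TextInputFormat format = new TextInputFormat(new Path(dir));
            format.setNestedFileEnumeration(true);  // recurse into ds=.../hour=... sub-directories
            format.setFilesFilter(new FilePathFilter() {
                @Override
                public boolean filterPath(Path filePath) {
                    // Returning true means "filter out": drop hidden files and in-progress .tmp files.
                    String name = filePath.getName();
                    return name.startsWith(".") || name.startsWith("_") || name.endsWith(".tmp");
                }
            });
            return format;
        }
    }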



env.readFile with PROCESS_CONTINUOUSLY: checkpoint state for monitored files

2020-06-17 Thread star


env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)

The source uses a format pointed at directory A, whose layout is:

A/20200101/
A/20200102/
A/20200103/
...
...

The directory holds about 200 date sub-directories with around 500 files each.
When a checkpoint (ck) is taken, does it have to store the read offsets of all
200*500 files, and will that make the checkpoint very large?
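
A minimal sketch of the continuous file source being described, assuming a TextInputFormat over the monitored directory; the path and the 60-second scan interval are placeholders. For what it's worth, as far as I understand the monitoring source checkpoints the modification timestamp it has processed up to (plus the splits currently in flight), not a per-file offset list, so the checkpoint should not grow with the number of files already read.

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ContinuousDirectorySource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            String dir = "hdfs:///data/A";          // placeholder path
            TextInputFormat format = new TextInputFormat(new Path(dir));
            format.setNestedFileEnumeration(true);  // descend into A/20200101/ and so on

            // Re-scan the directory every 60s and read files whose modification time is new.
            DataStream<String> lines =
                    env.readFile(format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

            lines.print();
            env.execute("continuous-directory-source");
        }
    }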