Re: toAppendStream type mismatch issue

2020-05-03 Thread Sun.Zhu
OK, I'll give it a try. Thanks!




Sun.Zhu
Email: 17626017...@163.com

Signature is customized by Netease Mail Master

On May 4, 2020 at 11:22, Jark Wu wrote:
This looks like a bug that has already been fixed (FLINK-16108).
Could you try again with release-1.10.1, which is currently in RC?
https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/

Best,
Jark

On Mon, 4 May 2020 at 01:01, 祝尚 <17626017...@163.com> wrote:

> Following the demo in Jark's blog, I wrote a Table API/SQL program, and it fails when converting the table to an append stream.
> Flink version: 1.10
> The code is as follows:
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // the old planner will be removed in a future version
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> settings);
> tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> "proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> 在ts上定义watermark,ts成为事件时间列\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
> "'connector.version' = 'universal',  -- kafka 版本,universal
> 支持 0.11 以上的版本\n" +
> "'connector.topic' = 'user_behavior',  -- kafka topic\n" +
> "'connector.startup-mode' = 'earliest-offset',  -- 从起始
> offset 开始读取\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',  -- zookeeper 地址\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',  -- kafka broker 地址\n" +
> "'format.type' = 'json'  -- 数据源格式为 json\n" +
> ")");
> Table table1 = tableEnv.sqlQuery("select
> user_id,item_id,category_id,behavior,ts," +
> "proctime from user_behavior where behavior='buy'");
> tableEnv.toAppendStream(table1, Behavior.class).print();
> env.execute();
>
> }
>
> public class Behavior {
> public Long user_id;
> public Long item_id;
> public Long category_id;
> public String behavior;
> public Timestamp ts;
> public Timestamp proctime;
>
>
> @Override
> public String toString() {
> return "Behavior{" +
> "user_id=" + user_id +
> ", item_id=" + item_id +
> ", category_id=" + category_id +
> ", behavior='" + behavior + '\'' +
> ", ts=" + ts +
> ", proctime=" + proctime +
> '}';
> }
> }
> The error is as follows:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink  do not match.
> Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT,
> behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT
> NULL *PROCTIME*]
> Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT,
> proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)
>
> The POJO field types are consistent with the source table field types,
> so why does validation still check NOT NULL *PROCTIME* and *ROWTIME*?
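
A possible workaround sketch until 1.10.1 is out, assuming Flink 1.10 with the Blink planner and reusing the tableEnv and Behavior POJO defined above: materialize the time attributes by casting them to plain TIMESTAMP(3) in the query, so the query schema no longer carries the *ROWTIME* / *PROCTIME* markers. This is only illustrative and is not the fix shipped in FLINK-16108; converting with Row.class instead of the POJO is another way to sidestep the mapping.

Table table1 = tableEnv.sqlQuery(
    "select user_id, item_id, category_id, behavior, " +
    // casting drops the *ROWTIME* / NOT NULL *PROCTIME* attributes
    "CAST(ts AS TIMESTAMP(3)) AS ts, CAST(proctime AS TIMESTAMP(3)) AS proctime " +
    "from user_behavior where behavior = 'buy'");
tableEnv.toAppendStream(table1, Behavior.class).print();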


Re: toAppendStream type mismatch issue

2020-05-03 Thread Jark Wu
This looks like a bug that has already been fixed (FLINK-16108).
Could you try again with release-1.10.1, which is currently in RC?
https://dist.apache.org/repos/dist/dev/flink/flink-1.10.1-rc2/

Best,
Jark

On Mon, 4 May 2020 at 01:01, 祝尚 <17626017...@163.com> wrote:

> Following the demo in Jark's blog, I wrote a Table API/SQL program, and it fails when converting the table to an append stream.
> Flink version: 1.10
> The code is as follows:
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // the old planner will be removed in a future version
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
> settings);
> tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
> "user_id BIGINT,\n" +
> "item_id BIGINT,\n" +
> "category_id BIGINT,\n" +
> "behavior STRING,\n" +
> "ts TIMESTAMP(3),\n" +
> "proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
> "WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> 在ts上定义watermark,ts成为事件时间列\n" +
> ") WITH (\n" +
> "'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
> "'connector.version' = 'universal',  -- kafka 版本,universal
> 支持 0.11 以上的版本\n" +
> "'connector.topic' = 'user_behavior',  -- kafka topic\n" +
> "'connector.startup-mode' = 'earliest-offset',  -- 从起始
> offset 开始读取\n" +
> "'connector.properties.zookeeper.connect' =
> 'localhost:2181',  -- zookeeper 地址\n" +
> "'connector.properties.bootstrap.servers' =
> 'localhost:9092',  -- kafka broker 地址\n" +
> "'format.type' = 'json'  -- 数据源格式为 json\n" +
> ")");
> Table table1 = tableEnv.sqlQuery("select
> user_id,item_id,category_id,behavior,ts," +
> "proctime from user_behavior where behavior='buy'");
> tableEnv.toAppendStream(table1, Behavior.class).print();
> env.execute();
>
> }
>
> public class Behavior {
> public Long user_id;
> public Long item_id;
> public Long category_id;
> public String behavior;
> public Timestamp ts;
> public Timestamp proctime;
>
>
> @Override
> public String toString() {
> return "Behavior{" +
> "user_id=" + user_id +
> ", item_id=" + item_id +
> ", category_id=" + category_id +
> ", behavior='" + behavior + '\'' +
> ", ts=" + ts +
> ", proctime=" + proctime +
> '}';
> }
> }
> The error is as follows:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink  do not match.
> Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT,
> behavior: STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT
> NULL *PROCTIME*]
> Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT,
> proctime: TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)
>
> The POJO field types are consistent with the source table field types,
> so why does validation still check NOT NULL *PROCTIME* and *ROWTIME*?


Re: Flink Task Manager GC overhead limit exceeded

2020-05-03 Thread Xintong Song
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html

Thank you~

Xintong Song



On Fri, May 1, 2020 at 8:35 AM shao.hongxiao <17611022...@163.com> wrote:

> Hello, Song
> Please refer to this document [1] for more details
> Could you send the exact link? I've also noticed that the memory parameters shown on the Flink UI don't look right, and I'd like to read the relevant documentation in detail.
>
> Thanks!
>
>
>
>
> 邵红晓
> Email: 17611022...@163.com
>
> Signature customized by Netease Mail Master
>
> On 04/30/2020 12:08, Xintong Song wrote:
> Then I would suggest the following.
> - Check the task manager log to see if the '-D' properties are properly
> loaded. They should be located at the beginning of the log file.
> - You can also try to log into the pod and check the JVM launch command
> with "ps -ef | grep TaskManagerRunner". I suspect there might be some
> argument passing problem regarding the spaces and double quotation marks.
>
>
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Thu, Apr 30, 2020 at 11:39 AM Eleanore Jin 
> wrote:
>
> Hi Xintong,
>
>
> Thanks for the detailed explanation!
>
>
> As for the 2nd question: I mount it to an emptyDir. I assume a pod restart
> will not cause the pod to be rescheduled to another node, so the dump should
> stay? I verified by directly adding this to flink-conf.yaml, in which case I
> see the heap dump is taken and stays in the directory:  env.java.opts:
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dumps
>
>
> In addition, I also don't see the log print out something like: Heap dump
> file created [5220997112 bytes in 73.464 secs], which I see when directly
> adding the options in the flink-conf.yaml
>
>
> containers:
>
> - volumeMounts:
>
> - mountPath: /dumps
>
>   name: heap-dumps
>
> volumes:
>
>   - emptyDir: {}
>
> name: heap-dumps
>
>
>
>
> Thanks a lot!
>
> Eleanore
>
>
>
> On Wed, Apr 29, 2020 at 7:55 PM Xintong Song 
> wrote:
>
> Hi Eleanore,
>
>
> I'd like to explain about 1 & 2. For 3, I have no idea either.
>
>
>
> 1. I don't see the heap size from UI for task manager shown correctly
>
>
>
> Despite the 'heap' in the key, 'taskmanager.heap.size' accounts for the
> total memory of a Flink task manager, rather than only the heap memory. A
> Flink task manager process consumes not only java heap memory, but also
> direct memory (e.g., network buffers) and native memory (e.g., JVM
> overhead). That's why the JVM heap size shown on the UI is much smaller
> than the configured 'taskmanager.heap.size'. Please refer to this document
> [1] for more details. This document comes from Flink 1.9 and has not been
> back-ported to 1.8, but the contents should apply to 1.8 as well.
>
>
> 2. I don't see the heap dump file in the restarted pod /dumps/oom.bin, did
> I set the java opts wrong?
>
>
>
> The java options look good to me. Is the configured path '/dumps/oom.bin'
> a local path of the pod or a path of the host mounted onto the pod? The
> restarted pod is a completely new pod. Everything you write to
> the old pod goes away when the pod terminates, unless it is written to the
> host through mounted storage.
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>
>
> On Thu, Apr 30, 2020 at 7:41 AM Eleanore Jin 
> wrote:
>
> Hi All,
>
>
>
> Currently I am running a flink job cluster (v1.8.2) on kubernetes with 4
> pods, each pod with 4 parallelism.
>
>
> The flink job reads from a source topic with 96 partitions, and does per
> element filter, the filtered value comes from a broadcast topic and it
> always use the latest message as the filter criteria, then publish to a
> sink topic.
>
>
> There is no checkpointing and state involved.
>
>
> Then I am seeing GC overhead limit exceeded error continuously and the
> pods keep on restarting
>
>
> So I tried to increase the heap size for task manager by
>
> containers:
>
>   - args:
>
> - task-manager
>
> - -Djobmanager.rpc.address=service-job-manager
>
> - -Dtaskmanager.heap.size=4096m
>
> - -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/dumps/oom.bin"
>
>
>
>
> 3 things I noticed,
>
>
>
>
> 1. I don't see the heap size from UI for task manager shown correctly
>
>
>
>
>
> 2. I don't see the heap dump file in the restarted pod /dumps/oom.bin, did
> I set the java opts wrong?
>
>
> 3. I continuously see the below logs from all pods, not sure if it causes any
> issue
> {"@timestamp":"2020-04-29T23:39:43.387Z","@version":"1","message":"[Consumer
> clientId=consumer-1, groupId=aba774bc] Node 6 was unable to process the
> fetch request with (sessionId=2054451921, epoch=474):
> FETCH_SESSION_ID_NOT_FOUND.","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"pool-6-thread-1","level":"INFO","level_value":2}
>
>
> Thanks a lot for any help!
>
>
> Best,
> Eleanore


Re: Publishing Sink Task watermarks outside flink

2020-05-03 Thread Shubham Kumar
Following up on this,

I tried tweaking the Jdbc Sink as Timo suggested and was successful.
Basically I added a member *long maxWatermarkSeen* in JDBCOutputFormat,
so whenever a new record is added to the batch it updates the
*maxWatermarkSeen* for this subtask with
*org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark*
(if it is greater).
 So whenever *JDBCOutputFormat.flush()* is called I can be sure that,
after executing the batch, all records having a timestamp below *maxWatermarkSeen*
have been pushed to JDBC.

Now, the actual answer I am looking for is the minimum of *maxWatermarkSeen*
across all subtasks. I can constantly update this in the DB as <*Subtask Index,
Watermark*> and take the minimum in the DB.
 I guess the aggregation can't be done inside Flink amongst subtasks?

Now, I have two questions:

1) Should I update this in the DB using the async I/O feature of Flink, or just
perform a blocking query in the *JDBCOutputFormat.flush()* function after
executing the batch?
2) If I end up using a Kafka sink (or any other sink for that matter), do I
have to again tweak its SinkFunction for this functionality?
   The general idea being that this is common functionality for users to know
up to what timestamp the sink is complete, so there could be simpler solutions.

Thanks
Shubham

On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar 
wrote:

> Hi Timo,
>
> Yeah, I got the idea of getting access to timers through the process function
> and reached the same conclusion which you explained,
> that is, a side output doesn't guarantee that the data has been written out to
> the sink (so maybe Fabian in that post pointed out something
> else which I am missing). If I am correct, then data is written to the side
> output as soon as it is processed in the ProcessFunction (maybe in the
> process function itself, or in the onTimer call if a timer has been set), right?
>
> I am doing all computation in the DataStream API and then adding a mapper
> to convert the DataStream before sinking it through JdbcAppendTableSink,
> which is part of the Table API I think. I will definitely try exploring the
> JDBC sink function and context to get the watermark.
>
> Thinking out of the box, is it possible to add some extra operator after the
> sink which will always have a watermark greater than the sink function's
> watermark,
> as it is a downstream operator?
> Also, does the problem simplify if we have a Kafka sink?
>
> On Tue, Apr 28, 2020 at 10:35 PM Timo Walther  wrote:
>
>> Hi Shubham,
>>
>> you can call stream.process(...). The context of ProcessFunction gives
>> you access to the TimerService, which lets you access the current watermark.
>>
>> I'm assuming you are using the Table API? As far as I remember,
>> watermarks travel through the stream even if there is no
>> time-based operation happening. But you should double-check that.
>>
>> However, a side output does not guarantee that the data has already been
>> written out to the sink. So I would recommend to customize the JDBC sink
>> instead and look into the row column for getting the current timestamp.
>>
>> Or even better, there should also be
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context with
>> access to watermark.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>> On 28.04.20 13:07, Shubham Kumar wrote:
>> > Hi everyone,
>> >
>> > I have a Flink application with Kafka sources which calculates some
>> > stats based on them and pushes the results to JDBC. Now, I want to know up to what
>> > timestamp the data is completely pushed to JDBC (i.e. no more data will
>> > be pushed with a timestamp smaller than or equal to it). There doesn't seem
>> > to be any direct programmatic way to do so.
>> >
>> > I came across the following thread which seemed most relevant to my
>> > problem:
>> >
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
>> >
>> > However, I can't seem to understand how to chain a process function
>> > before the sink task so as to put watermarks to a side output. (I
>> > suspect it might have something to do with datastream.addSink in
>> regular
>> > datastream sinks vs sink.consumeDataStream(stream) in
>> JDBCAppendTableSink).
>> >
>> > Further what happens if there are no windows, how to approach the
>> > problem then?
>> >
>> > Please share any pointers or relevant solution to tackle this.
>> >
>> > --
>> > Thanks & Regards
>> >
>> > Shubham Kumar
>> >
>>
>>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>
>

-- 
Thanks & Regards

Shubham Kumar
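
A minimal sketch of the SinkFunction.Context approach Timo mentions above, assuming Flink 1.10; the class name WatermarkTrackingSink and the reporting hook are hypothetical, and the actual write and publish logic would have to be filled in:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WatermarkTrackingSink<T> extends RichSinkFunction<T> {

    // Highest watermark observed by this subtask so far.
    private long maxWatermarkSeen = Long.MIN_VALUE;

    @Override
    public void invoke(T value, Context context) throws Exception {
        // ... write `value` to the real sink (JDBC batch, Kafka producer, ...) here ...
        maxWatermarkSeen = Math.max(maxWatermarkSeen, context.watermark());
    }

    // Hypothetical hook: publish <subtaskIndex, maxWatermarkSeen> externally after each
    // flush/batch, so the minimum across subtasks can be taken outside Flink (e.g. in the DB).
    private void reportWatermark() {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        System.out.printf("subtask=%d maxWatermarkSeen=%d%n", subtask, maxWatermarkSeen);
    }
}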


Re: multiple joins in one job

2020-05-03 Thread lec ssmi
Thanks for your reply.
But as far as I know, if the time attribute is retained and the time
attribute fields of both streams are selected in the result after joining,
which one becomes the final time attribute?

Benchao Li  wrote on Thu, Apr 30, 2020 at 8:25 PM:

> Hi lec,
>
> AFAIK, time attribute will be preserved after time interval join.
> Could you share your DDL and SQL queries with us?
>
> lec ssmi  wrote on Thu, Apr 30, 2020 at 5:48 PM:
>
>> Hi:
>>    I need to join multiple stream tables using time interval joins. The
>> problem is that the time attribute disappears after the join, and
>> pure SQL cannot declare the time attribute field again. So, to make it
>> work, I need to insert the result of the join into Kafka, then consume
>> it and join it with another stream table in another Flink job. This seems
>> troublesome.
>> Any good ideas?
>>
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-03 Thread Chesnay Schepler

yes, exactly; I want to rule out that (somehow) HDFS is the problem.

I couldn't reproduce the issue locally myself so far.

On 01/05/2020 22:31, Hailu, Andreas wrote:


Hi Chesnay, yes – they were created using Flink 1.9.1 as we’ve only 
just started to archive them in the past couple weeks. Could you 
clarify on how you want to try local filesystem archives? As in 
changing jobmanager.archive.fs.dir and historyserver.web.tmpdir to the 
same local directory?


*// *ah

*From:*Chesnay Schepler 
*Sent:* Wednesday, April 29, 2020 8:26 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

hmm...let's see if I can reproduce the issue locally.

Are the archives from the same version the history server runs on? 
(Which I suppose would be 1.9.1?)


Just for the sake of narrowing things down, it would also be 
interesting to check if it works with the archives residing in the 
local filesystem.


On 27/04/2020 18:35, Hailu, Andreas wrote:

bash-4.1$ ls -l /local/scratch/flink_historyserver_tmpdir/

total 8

drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:43
flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9

drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:22
flink-web-history-95b3f928-c60f-4351-9926-766c6ad3ee76

There are just two directories in here. I don’t see cache
directories from my attempts today, which is interesting. Looking
a little deeper into them:

bash-4.1$ ls -lr

/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9

total 1756

drwxrwxr-x 2 p2epdlsuat p2epdlsuat 1789952 Apr 21 10:44 jobs

bash-4.1$ ls -lr

/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9/jobs

total 0

-rw-rw-r-- 1 p2epdlsuat p2epdlsuat 0 Apr 21 10:43 overview.json

There are indeed archives already in HDFS – I’ve included some in
my initial mail, but here they are again just for reference:

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 44282 items

-rw-r- 3 delp datalake_admin_dev  50569 2020-03-21 23:17
/user/p2epda/lake/delp_qa/flink_hs/000144dba9dc0f235768a46b2f26e936

-rw-r- 3 delp datalake_admin_dev  49578 2020-03-03 08:45
/user/p2epda/lake/delp_qa/flink_hs/000347625d8128ee3fd0b672018e38a5

-rw-r- 3 delp datalake_admin_dev  50842 2020-03-24 15:19
/user/p2epda/lake/delp_qa/flink_hs/0004be6ce01ba9677d1eb619ad0fa757

...

*// *ah

*From:*Chesnay Schepler 

*Sent:* Monday, April 27, 2020 10:28 AM
*To:* Hailu, Andreas [Engineering] 
; user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

If historyserver.web.tmpdir is not set then java.io.tmpdir is
used, so that should be fine.

What are the contents of /local/scratch/flink_historyserver_tmpdir?

I assume there are already archives in HDFS?

On 27/04/2020 16:02, Hailu, Andreas wrote:

My machine’s /tmp directory is not large enough to support the
archived files, so I changed my java.io.tmpdir to be in some
other location which is significantly larger. I hadn’t set
anything for historyserver.web.tmpdir, so I suspect it was
still pointing at /tmp. I just tried setting
historyserver.web.tmpdir to the same location as my
java.io.tmpdir location, but I’m afraid I’m still seeing the
following issue:

2020-04-27 09:37:42,904 [nioEventLoopGroup-3-4] DEBUG
HistoryServerStaticFileServerHandler - Unable to load
requested file /overview.json from classloader

2020-04-27 09:37:42,906 [nioEventLoopGroup-3-6] DEBUG
HistoryServerStaticFileServerHandler - Unable to load
requested file /jobs/overview.json from classloader

flink-conf.yaml for reference:

jobmanager.archive.fs.dir:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

historyserver.archive.fs.dir:
hdfs:///user/p2epda/lake/delp_qa/flink_hs/

historyserver.web.tmpdir:
/local/scratch/flink_historyserver_tmpdir/

Did you have anything else in mind when you said pointing
somewhere funny?

*// *ah

*From:*Chesnay Schepler 

*Sent:* Monday, April 27, 2020 5:56 AM
*To:* Hailu, Andreas [Engineering]

; user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not
Found?

overview.json is a generated file that is placed in the local
directory controlled by /historyserver.web.tmpdir/.

Have you configured this option to point to some 

toAppendStream type mismatch issue

2020-05-03 Thread 祝尚
Following the demo in Jark's blog, I wrote a Table API/SQL program, and it fails when converting the table to an append stream.
Flink version: 1.10
The code is as follows:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// the old planner will be removed in a future version
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
tableEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
"user_id BIGINT,\n" +
"item_id BIGINT,\n" +
"category_id BIGINT,\n" +
"behavior STRING,\n" +
"ts TIMESTAMP(3),\n" +
"proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
"WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 
在ts上定义watermark,ts成为事件时间列\n" +
") WITH (\n" +
"'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
"'connector.version' = 'universal',  -- kafka 版本,universal 支持 
0.11 以上的版本\n" +
"'connector.topic' = 'user_behavior',  -- kafka topic\n" +
"'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 
开始读取\n" +
"'connector.properties.zookeeper.connect' = 'localhost:2181',  
-- zookeeper 地址\n" +
"'connector.properties.bootstrap.servers' = 'localhost:9092',  
-- kafka broker 地址\n" +
"'format.type' = 'json'  -- 数据源格式为 json\n" +
")");
Table table1 = tableEnv.sqlQuery("select 
user_id,item_id,category_id,behavior,ts," +
"proctime from user_behavior where behavior='buy'");
tableEnv.toAppendStream(table1, Behavior.class).print();
env.execute();

}

public class Behavior {
public Long user_id;
public Long item_id;
public Long category_id;
public String behavior;
public Timestamp ts;
public Timestamp proctime;


@Override
public String toString() {
return "Behavior{" +
"user_id=" + user_id +
", item_id=" + item_id +
", category_id=" + category_id +
", behavior='" + behavior + '\'' +
", ts=" + ts +
", proctime=" + proctime +
'}';
}
}
The error is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Field types of query result and registered TableSink  do not match.
Query schema: [user_id: BIGINT, item_id: BIGINT, category_id: BIGINT, behavior: 
STRING, ts: TIMESTAMP(3) *ROWTIME*, proctime: TIMESTAMP(3) NOT NULL *PROCTIME*]
Sink schema: [behavior: STRING, category_id: BIGINT, item_id: BIGINT, proctime: 
TIMESTAMP(3), ts: TIMESTAMP(3), user_id: BIGINT]
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
at sql.KafkaSourceTable.main(KafkaSourceTable.java:35)

The POJO field types are consistent with the source table field types,
so why does validation still check NOT NULL *PROCTIME* and *ROWTIME*?

Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Gowri Sundaram
Hi Congxian,
Thank you so much for your response, that really helps!

From your experience, how long does it take for Flink to redistribute
terabytes of state data on node addition / node failure?

Thanks!

On Sun, May 3, 2020 at 6:56 PM Congxian Qiu  wrote:

> Hi
>
> 1. From my experience, Flink can support such big state, you can set
> appropriate parallelism for the stateful operator. for RocksDB you may need
> to care about the disk performance.
> 2. Inside Flink, the state is separated by key-group, each
> task/parallelism contains multiple key-groups.  Flink does not need to
> restart when you add a node to the cluster, but every time restart from
> savepoint/checkpoint(or failover), Flink needs to redistribute the
> checkpoint data, this can be omitted if it's failover and local recovery[1]
> is enabled
> 3. for upload/download state, you can ref to the multiple thread
> upload/download[2][3] for speed up them
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
> [2] https://issues.apache.org/jira/browse/FLINK-10461
> [3] https://issues.apache.org/jira/browse/FLINK-11008
>
> Best,
> Congxian
>
>
Gowri Sundaram  wrote on Fri, May 1, 2020 at 6:29 PM:
>
>> Hello all,
>> We have read in multiple sources that Flink has been used
>> for use cases with terabytes of application state.
>>
>> We are considering using Flink for a similar use case with* keyed state
>> in the range of 20 to 30 TB*. We had a few questions regarding the same.
>>
>>
>>- *Is Flink a good option for this kind of scale of data* ? We are
>>considering using RocksDB as the state backend.
>>- *What happens when we want to add a node to the cluster *?
>>   - As per our understanding, if we have 10 nodes in our cluster,
>>   with 20TB of state, this means that adding a node would require the 
>> entire
>>   20TB of data to be shipped again from the external checkpoint remote
>>   storage to the taskmanager nodes.
>>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>>   their respective 2TB state parallely, this would mean a *minimum
>>   downtime of half an hour*. And this is assuming the throughput of
>>   the remote storage does not become the bottleneck.
>>   - Is there any way to reduce this estimated downtime ?
>>
>>
>> Thank you!
>>
>
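
A minimal setup sketch for this kind of large keyed state, assuming Flink 1.10 with the RocksDB backend discussed above; the checkpoint URI, interval and parallelism are placeholders, and incremental checkpointing is an extra knob not explicitly mentioned in the thread:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LargeStateJobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keeps keyed state on local disk, so tens of TB do not have to fit in memory.
        // The second argument enables incremental checkpoints (only changed files are uploaded).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(10 * 60 * 1000); // placeholder: checkpoint every 10 minutes

        // Key-groups are redistributed across this parallelism when restoring from a
        // savepoint/checkpoint after rescaling.
        env.setParallelism(10); // placeholder

        // state.backend.local-recovery: true (set in flink-conf.yaml) additionally keeps a
        // local copy of the state so failover avoids re-downloading everything from remote storage.

        // ... build sources, the stateful operators and sinks here ...
        // env.execute("large-keyed-state-job");
    }
}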


Re: execution.checkpointing.tolerable-failed-checkpoints has no effect

2020-05-03 Thread Congxian Qiu
Hi
In principle, configuring this in flink-conf should work in 1.10; see [1] for the related issue. Which version are you using?
[1] https://issues.apache.org/jira/browse/FLINK-14788
Best,
Congxian


zhisheng  wrote on Thu, Apr 30, 2020 at 6:51 PM:

> It seems this parameter can also be set per job; you could try:
>
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
>
> 蒋佳成 (Jiacheng Jiang) <920334...@qq.com> wrote on Thu, Apr 30, 2020 at 3:07 PM:
>
> > hi
> > I configured execution.checkpointing.tolerable-failed-checkpoints: 100 in
> > flink-conf.yaml, but it has no effect. The default is 0, i.e. no failed
> > checkpoints are tolerated, so the job has to restart whenever a checkpoint fails.
> > How should this value be set?
> > best
> > jungglge
>
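
A minimal sketch of the per-job setting zhisheng mentions above, placed in the job's main(); the checkpoint interval and the value 100 (mirroring the flink-conf.yaml example) are placeholders:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // placeholder: checkpoint every 60 s
// Tolerate up to 100 checkpoint failures before the job is failed/restarted.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);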


Re: Savepoint memory overhead

2020-05-03 Thread Congxian Qiu
Hi

From the given figure, it seems that the end-to-end duration of the two failed
checkpoints is small (so they did not fail due to a timeout); could you please
check why they failed?
Maybe you can find something in the jm log such as "Decline checkpoint {} by
task {} of job {} at {}.",
then you can go to the tm log to find out the root cause of the
checkpoint failure.

Best,
Congxian


Lasse Nedergaard  wrote on Thu, Apr 30, 2020 at 4:37 PM:

> Hi
>
> Thanks for the reply.
> The link you provided made us think of some old RocksDB config we were
> still using; it could be causing our container-kill problems, so I will do
> a test without the specific RocksDB config.
> But we also see RocksDbExceptions “cannot allocate memory” while appending
> to a file, and that makes me think the managed memory is too small for the state
> size. Please see below for a specific job with parallelism 4
>
>
>
> So task managers as we have them can’t handle unlimited state size, so I was looking
> for an understanding of, and guidelines for, getting the config right in
> relation to the state size.
> For now we run in session mode and the settings are shared between all jobs,
> and we have jobs that don’t require many resources, hence the low
> settings.
>
> 1.
> JVM heap size 734 MB
> Flink managed memory 690 MB
> 1 slot for each task manager.
>
> 2.
> By mistake we had some RocksDB settings from a previous version. I have
> removed this configuration and will test again.
>
> 3.
> For jobs with restarts and failed checkpoints/savepoints there is a common
> trend that they have larger state than the jobs without problems. On some of
> the failing jobs we have reduced our retention so our state got smaller, and then
> they run ok. We have done tests where we increase parallelism, which reduces
> the state size for each task manager, and then they run ok.
>
> 4.
> We don’t use windows functions and the jobs use standard value, list and
> map state.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 30. apr. 2020 kl. 08.55 skrev Yun Tang :
>
> 
> Hi Lasse
>
> Would you please give more details?
>
>1. What is the memory configuration of your task manager? e.g the
>memory size of process, managed memory. And how large the memory would
>increase to once you meet problem.
>2. Did you use managed memory to control RocksDB? [1]
>3. Why you give the assumption that memory problem has relationship
>with savepoint?
>4. What is the topology of you streaming job: how many states and
>window you use, how many slots per task manager?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_tuning.html#rocksdb-state-backend
>
> Best
> Yun Tang
> --
> *From:* Lasse Nedergaard 
> *Sent:* Thursday, April 30, 2020 12:39
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: Savepoint memory overhead
>
> We using Flink 1.10 running on Mesos.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 30. apr. 2020 kl. 04.53 skrev Yun Tang :
>
> 
> Hi Lasse
>
> Which version of Flink did you use? Before Flink 1.10, there might be a
> memory problem when RocksDB executes a savepoint with a write batch [1].
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-12785
>
> Best
> Yun Tang
> --
> *From:* Lasse Nedergaard 
> *Sent:* Wednesday, April 29, 2020 21:17
> *To:* user 
> *Subject:* Savepoint memory overhead
>
> Hi.
>
> I would like to know if there are any guidelines/recommendations for the
> memory overhead we need to account for when doing a savepoint to S3. We use
> the RocksDB state backend.
>
> We run our jobs on relatively small task managers and we can see that we get
> memory problems if the state size for each task manager gets "big" (we
> haven't found the rule of thumb yet), and we can remove the problem if we
> reduce the state size or increase parallelism; jobs with no or small
> state don't have any problems.
> So I see a relation between the memory allocated to a task manager and
> the state it can handle.
>
> So does anyone have any recommendations/best practices for this, and can
> someone explain why a savepoint requires memory?
>
> Thanks
>
> In advance
>
> Lasse Nedergaard
>
>


Re: Flink: For terabytes of keyed state.

2020-05-03 Thread Congxian Qiu
Hi

1. From my experience, Flink can support such big state, you can set
appropriate parallelism for the stateful operator. for RocksDB you may need
to care about the disk performance.
2. Inside Flink, the state is separated by key-group, each
task/parallelism contains multiple key-groups.  Flink does not need to
restart when you add a node to the cluster, but every time restart from
savepoint/checkpoint(or failover), Flink needs to redistribute the
checkpoint data, this can be omitted if it's failover and local recovery[1]
is enabled
3. for upload/download state, you can ref to the multiple thread
upload/download[2][3] for speed up them

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/large_state_tuning.html#task-local-recovery
[2] https://issues.apache.org/jira/browse/FLINK-10461
[3] https://issues.apache.org/jira/browse/FLINK-11008

Best,
Congxian


Gowri Sundaram  wrote on Fri, May 1, 2020 at 6:29 PM:

> Hello all,
> We have read in multiple sources that Flink has been used
> for use cases with terabytes of application state.
>
> We are considering using Flink for a similar use case with* keyed state
> in the range of 20 to 30 TB*. We had a few questions regarding the same.
>
>
>- *Is Flink a good option for this kind of scale of data* ? We are
>considering using RocksDB as the state backend.
>- *What happens when we want to add a node to the cluster *?
>   - As per our understanding, if we have 10 nodes in our cluster,
>   with 20TB of state, this means that adding a node would require the 
> entire
>   20TB of data to be shipped again from the external checkpoint remote
>   storage to the taskmanager nodes.
>   - Assuming 1Gb/s network speed, and assuming all nodes can read
>   their respective 2TB state parallely, this would mean a *minimum
>   downtime of half an hour*. And this is assuming the throughput of
>   the remote storage does not become the bottleneck.
>   - Is there any way to reduce this estimated downtime ?
>
>
> Thank you!
>


[ANNOUNCE] Weekly Community Update 2020/18

2020-05-03 Thread Konstantin Knauf
Dear community,

happy to share - a brief - community update this week with an update on
Flink 1.10.1, our application to Google Season of Docs 2020, a discussion
to support Hadoop 3, a recap of Flink Forward Virtual 2020 and a bit more.

Flink Development
==

* [releases] Yu has published a RC #2 for Flink 1.10.1. No -1s so far. [1]

* [docs] Apache Flink's application to Google Season of Docs 2020 is about
to be finalized. Marta has opened a PR for the announcement, and Seth &
Aljoscha volunteered as mentors. Apache Flink is pitching a project to
improve the documentation of Table API & SQL. [2]

* [hadoop] Robert has started a discussion on adding support for Hadoop 3.
In particular, the thread discusses the question of whether Hadoop 3 would
be supported via flink-shaded-hadoop or not. [3]

* [configuration] Timo has started a discussion on how we represent
configuration hierarchies in properties (Flink configuration as well as
Connector properties), so that the resulting files would be valid
JSON/YAML. [4]

* [connectors] Leonard Xu proposes to refactor the package, module and class
names of the Flink JDBC connector to be consistent with other connectors.
Details in [5].

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-10-1-release-candidate-2-tp41019.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/PROPOSAL-Google-Season-of-Docs-2020-tp40264.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-support-for-Hadoop-3-and-removing-flink-shaded-hadoop-tp40570p40601.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Hierarchies-in-ConfigOption-tp40920.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Refactor-flink-jdbc-connector-structure-tp40984.html

flink-packages.org
==

* Alibaba has published a preview version of its SpillableHeapStateBackend
on flink-packages.org. [6] This state backend is being contributed to Apache Flink in
FLINK-12692 [7]. The SpillableHeapStateBackend is a Java heap-based
state backend (like the FilesystemStateBackend) that spills the coldest
state to disk before the heap is exhausted.

[6] https://flink-packages.org/packages/spillable-state-backend-for-flink
[7] https://issues.apache.org/jira/browse/FLINK-12692

Notable Bugs
==

I did not encounter anything particularly worth sharing.

Events, Blog Posts, Misc
===

* Fabian has published a recap of Flink Foward Virtual 2020 on the
Ververica blog. [8]

* All recordings of Flink Forward Virtual 2020 have been published on
Youtube. [9]

[8] https://www.ververica.com/blog/flink-forward-virtual-2020-recap
[9]
https://www.youtube.com/watch?v=NF0hXZfUyqE=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk