Re: Flink Task Manager GC overhead limit exceeded

2020-04-29 post Xintong Song
Then I would suggest the following.
- Check the task manager log to see if the '-D' properties are properly
loaded. They should be located at the beginning of the log file.
- You can also try to log into the pod and check the JVM launch command
with "ps -ef | grep TaskManagerRunner". I suspect there might be an
argument-passing problem involving the spaces and double quotation marks;
a sketch for verifying this from inside the JVM follows.
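
As a minimal sketch (plain Java, not specific to Flink; drop it anywhere in
your user code where a log line is possible), you can print the JVM's input
arguments from inside the task manager:

import java.lang.management.ManagementFactory;

public class PrintJvmArgs {
    public static void main(String[] args) {
        // Prints the raw JVM arguments; -XX:+HeapDumpOnOutOfMemoryError and
        // -XX:HeapDumpPath=/dumps/oom.bin should appear here if the options
        // were passed through intact.
        System.out.println(ManagementFactory.getRuntimeMXBean().getInputArguments());
    }
}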


Thank you~

Xintong Song



On Thu, Apr 30, 2020 at 11:39 AM Eleanore Jin wrote:

> Hi Xintong,
>
> Thanks for the detailed explanation!
>
> as for the 2nd question: I mounted it to an emptyDir. I assume a pod restart
> will not cause the pod to be rescheduled to another node, so the dump should
> stay? I verified by directly adding this to flink-conf.yaml, in which case I
> do see the heap dump taken and staying in the directory: env.java.opts:
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dumps
>
> In addition, I also don't see the log print something like "Heap dump
> file created [5220997112 bytes in 73.464 secs]", which I do see when directly
> adding the options in flink-conf.yaml.
>
> containers:
>   - volumeMounts:
>       - mountPath: /dumps
>         name: heap-dumps
> volumes:
>   - emptyDir: {}
>     name: heap-dumps
>
>
> Thanks a lot!
>
> Eleanore
>
> On Wed, Apr 29, 2020 at 7:55 PM Xintong Song 
> wrote:
>
>> Hi Eleanore,
>>
>> I'd like to explain about 1 & 2. For 3, I have no idea either.
>>
>> 1. I don't see the heap size for the task manager shown correctly in the UI
>>>
>>
>> Despite the 'heap' in the key, 'taskmanager.heap.size' accounts for the
>> total memory of a Flink task manager, rather than only the heap memory. A
>> Flink task manager process consumes not only java heap memory, but also
>> direct memory (e.g., network buffers) and native memory (e.g., JVM
>> overhead). That's why the JVM heap size shown on the UI is much smaller
>> than the configured 'taskmanager.heap.size'. Please refer to this document
>> [1] for more details. This document comes from Flink 1.9 and has not been
>> back-ported to 1.8, but the contents should apply to 1.8 as well.
>>
>> 2. I don't see the heap dump file /dumps/oom.bin in the restarted pod; did
>>> I set the java opts wrong?
>>>
>>
>> The java options look good to me. Is the configured path '/dumps/oom.bin'
>> a local path of the pod, or a path on the host mounted into the pod? The
>> restarted pod is a completely new pod. Everything you wrote to the old pod
>> goes away when the pod terminates, unless it was written to the host
>> through mounted storage.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>>
>> On Thu, Apr 30, 2020 at 7:41 AM Eleanore Jin wrote:
>>
>>> Hi All,
>>>
>>> Currently I am running a Flink job cluster (v1.8.2) on Kubernetes with 4
>>> pods, each pod with a parallelism of 4.
>>>
>>> The Flink job reads from a source topic with 96 partitions and does a
>>> per-element filter; the filter criteria come from a broadcast topic,
>>> always using the latest message as the criteria, then publishes to a
>>> sink topic.
>>>
>>> There is no checkpointing or state involved.
>>>
>>> I am continuously seeing a 'GC overhead limit exceeded' error and the
>>> pods keep restarting.
>>>
>>> So I tried to increase the heap size for the task manager with:
>>>
>>> containers:
>>>   - args:
>>>       - task-manager
>>>       - -Djobmanager.rpc.address=service-job-manager
>>>       - -Dtaskmanager.heap.size=4096m
>>>       - -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
>>>         -XX:HeapDumpPath=/dumps/oom.bin"
>>>
>>>
>>> Three things I noticed:
>>>
>>>
>>> 1. I don't see the heap size for the task manager shown correctly in the UI.
>>>
>>> [image: image.png]
>>>
>>> 2. I don't see the heap dump file /dumps/oom.bin in the restarted pod;
>>> did I set the java opts wrong?
>>>
>>> 3. I continuously see the log below from all pods; not sure if it causes
>>> any issue:
>>> {"@timestamp":"2020-04-29T23:39:43.387Z","@version":"1","message":"[Consumer
>>> clientId=consumer-1, groupId=aba774bc] Node 6 was unable to process the
>>> fetch request with (sessionId=2054451921, epoch=474):
>>> FETCH_SESSION_ID_NOT_FOUND.","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"pool-6-thread-1","level":"INFO","level_value":2}
>>>
>>> Thanks a lot for any help!
>>>
>>> Best,
>>> Eleanore
>>>
>>


Re: Flink Task Manager GC overhead limit exceeded

2020-04-29 post Xintong Song
Hi Eleanore,

I'd like to explain about 1 & 2. For 3, I have no idea either.

1. I don't see the heap size for the task manager shown correctly in the UI
>

Despite the 'heap' in the key, 'taskmanager.heap.size' accounts for the
total memory of a Flink task manager, rather than only the heap memory. A
Flink task manager process consumes not only java heap memory, but also
direct memory (e.g., network buffers) and native memory (e.g., JVM
overhead). That's why the JVM heap size shown on the UI is much smaller
than the configured 'taskmanager.heap.size'. Please refer to this document
[1] for more details. This document comes from Flink 1.9 and has not been
back-ported to 1.8, but the contents should apply to 1.8 as well.
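
A quick, generic way to see what the JVM actually received (a sketch in plain
Java, not from the Flink API) is to print the max heap and compare it with the
configured 'taskmanager.heap.size':

public class PrintMaxHeap {
    public static void main(String[] args) {
        // Roughly the effective -Xmx; expect it to be noticeably smaller than
        // taskmanager.heap.size, since network buffers and native memory are
        // carved out of the configured total first.
        long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("JVM max heap (MB): " + maxHeapMb);
    }
}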

2. I don't see the heap dump file /dumps/oom.bin in the restarted pod; did I
> set the java opts wrong?
>

The java options look good to me. Is the configured path '/dumps/oom.bin' a
local path of the pod, or a path on the host mounted into the pod? The
restarted pod is a completely new pod. Everything you wrote to the old pod
goes away when the pod terminates, unless it was written to the host through
mounted storage.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html

On Thu, Apr 30, 2020 at 7:41 AM Eleanore Jin  wrote:

> Hi All,
>
> Currently I am running a Flink job cluster (v1.8.2) on Kubernetes with 4
> pods, each pod with a parallelism of 4.
>
> The Flink job reads from a source topic with 96 partitions and does a
> per-element filter; the filter criteria come from a broadcast topic, always
> using the latest message as the criteria, then publishes to a sink topic.
>
> There is no checkpointing or state involved.
>
> I am continuously seeing a 'GC overhead limit exceeded' error and the
> pods keep restarting.
>
> So I tried to increase the heap size for the task manager with:
>
> containers:
>   - args:
>       - task-manager
>       - -Djobmanager.rpc.address=service-job-manager
>       - -Dtaskmanager.heap.size=4096m
>       - -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
>         -XX:HeapDumpPath=/dumps/oom.bin"
>
>
> Three things I noticed:
>
>
> 1. I don't see the heap size for the task manager shown correctly in the UI.
>
> [image: image.png]
>
> 2. I don't see the heap dump file /dumps/oom.bin in the restarted pod; did
> I set the java opts wrong?
>
> 3. I continuously see the log below from all pods; not sure if it causes any
> issue:
> {"@timestamp":"2020-04-29T23:39:43.387Z","@version":"1","message":"[Consumer
> clientId=consumer-1, groupId=aba774bc] Node 6 was unable to process the
> fetch request with (sessionId=2054451921, epoch=474):
> FETCH_SESSION_ID_NOT_FOUND.","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"pool-6-thread-1","level":"INFO","level_value":2}
>
> Thanks a lot for any help!
>
> Best,
> Eleanore
>


Re: Reply: flink1.9, state process api reading MapState, error

2020-04-29 post guanyq
I replaced the static class with a List and it still reports the same error.
The code is attached.

On 2020-04-30 10:32:26, "shao.hongxiao" <17611022...@163.com> wrote:
>Could it be a problem with the static inner class?
>Rules for POJO types
>
>Flink recognizes a data type as a POJO type (and allows “by-name” field 
>referencing) if the following conditions are fulfilled:
>
>The class is public and standalone (no non-static inner class)
>The class has a public no-argument constructor
>All non-static, non-transient fields in the class (and all superclasses) are 
>either public (and non-final) or have a public getter- and a setter- method 
>that follows the Java beans naming conventions for getters and setters.
>
>Note that when a user-defined data type can’t be recognized as a POJO type, it 
>must be processed as GenericType and serialized with Kryo.
>
>
>
>邵红晓
>Email: 17611022...@163.com
>On 2020-04-30 09:40, shx <17611022...@163.com> wrote:
>Could you share the code that writes the state? One more question about keyed
>state access: does your code read out the MapState for all keys? Thanks.
>
>
>
>
>邵红晓
>Email: 17611022...@163.com
>
>On 2020-04-30 09:04, guanyq wrote:
>The code does not specify any Serializer explicitly; everything uses default
>serialization.
>On 2020-04-29 18:20:22, "Congxian Qiu" wrote:
>Hi
>From the error log, this is a StateMigration-related problem.
>You need to confirm that the Serializer in your code is the same as, or
>compatible with, the serializer of the state in the savepoint; you can refer
>to this document [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>
>Best,
>Congxian
>
>
>guanyq wrote on Wed, Apr 29, 2020 at 6:09 PM:
>
>
>The code and error log are attached. I don't know how to investigate at the
>moment. Please help take a look, thanks.
package com.data.processing.utils;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ReadListState {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String checkpointDataUri = "hdfs://audit-dp02:8020/flink/checkpoints/OnlineOrderDispatchDuration/";
        String stateDataUri = "hdfs://audit-dp02:8020/flink/savepoints/OnlineOrderUncompletedDataProcess/20200429/savepoint-a7e717-a512a56609b4";
        String uid = "OnlineOrderUncompletedDataProcess_1";

        // Reading state
        ExistingSavepoint savepoint = Savepoint.load(env, stateDataUri, new FsStateBackend(checkpointDataUri));

        DataSet<List<String>> keyedMapStateDataSet = savepoint.readKeyedState(uid, new ReaderFunction());
        keyedMapStateDataSet.flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public void flatMap(List<String> lst, Collector<String> collector) throws Exception {
                for (String s : lst) {
                    collector.collect(s);
                }
            }
        }).print();
        // print() already triggers execution in the DataSet API; a subsequent
        // env.execute() would fail with "No new data sinks have been defined".
    }

    public static class ReaderFunction extends KeyedStateReaderFunction<String, List<String>> {

        private transient MapState<String, Long> dayUnComputeCnt;

        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
                    "dayUnComputeCnt",
                    TypeInformation.of(String.class),
                    TypeInformation.of(Long.class)
            );
            dayUnComputeCnt = getRuntimeContext().getMapState(descriptor);
        }

        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<List<String>> out) throws Exception {

            List<String> unCompleteLst = new ArrayList<>();
            for (String s : dayUnComputeCnt.keys()) {
                unCompleteLst.add(s);
            }
            out.collect(unCompleteLst);
        }
    }
}

window with an 8h offset

2020-04-29 post (sender name garbled by encoding)
[Message garbled by encoding; recoverable fragments:] a question about the 8h
offset in
window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))), i.e. the -8,
versus
window(TumblingProcessingTimeWindows.of(Time.seconds(60L)))
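
For context, a minimal sketch of what the offset does (assuming record
timestamps in UTC+8; watermark/timestamp assignment omitted for brevity):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DailyWindowOffset {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of("k", 1L))
                .keyBy(t -> t.f0)
                // A 1-day tumbling window with a -8h offset: for UTC+8 timestamps the
                // window then covers local [00:00, 24:00) instead of local
                // [08:00, next day 08:00). A short processing-time window such as
                // TumblingProcessingTimeWindows.of(Time.seconds(60L)) does not align
                // to day boundaries, so it needs no such offset.
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                .sum(1)
                .print();
        env.execute("daily-window-offset");
    }
}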

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 post Jingsong Li
Sorry, my mistake.

I propose:

connector: 'filesystem'
path: '...'
format: 'json'
format.option:
  option1: '...'
  option2: '...'
  option3: '...'

And I think in most cases users just need to configure the 'format' key, so
we should make it convenient for them. There is no big problem in making the
format options more complex.

And for Kafka key and value, we can:

connector: 'kafka'
key.format: 'json'
key.format.option:
  option1: '...'
  option2: '...'
value.format: 'json'
value.format.option:
  option1: '...'
  option2: '...'

Best,
Jingsong Lee

On Thu, Apr 30, 2020 at 10:16 AM Jingsong Li  wrote:

> Thanks Timo for starting the discussion.
>
> I am +1 for "format: 'json'".
> Take a look at Dawid's yaml case:
>
> connector: 'filesystem'
> path: '...'
> format: 'json'
> format:
>   option1: '...'
>   option2: '...'
>   option3: '...'
>
> Does this work?
> According to my understanding, the 'format' key is an attribute of the
> connector, which can be configured separately outside. The entries in the
> 'format' block are attributes of the format.
> So this json-style block can only contain the properties excluding the
> format itself.
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:
>
>> Thanks Timo for starting the discussion.
>>
>> Generally I like the idea to keep the config align with a standard like
>> json/yaml.
>>
>> From a user's perspective, I don't load table configs from a config file
>> like yaml or json for now,
>> and it's ok to change it to a yaml-like style. Actually we didn't know that
>> this could be a yaml-like
>> configuration hierarchy. If it has a hierarchy, we may consider loading the
>> config from a yaml/json file in the future.
>>
>> Regarding the name,
>> 'format.kind' looks fine to me. However there is another name from the
>> top of my head:
>> 'format.name', WDYT?
>>
>> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>>
>>> Hi all,
>>>
>>> I also wanted to share my opinion.
>>>
>>> When talking about a ConfigOption hierarchy we use for configuring Flink
>>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>>> compatible style. Those options are primarily read from a file and thus
>>> should at least try to follow common practices for nested formats if we
>>> ever decide to switch to one.
>>>
>>> Here the question is about the properties we use in SQL statements. The
>>> origin/destination of these usually will be external catalog, usually in a
>>> flattened(key/value) representation so I agree it is not as important as in
>>> the aforementioned case. Nevertheless having a yaml based catalog or being
>>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>>> appealing. At the same time, the cost of being able to have a nice
>>> yaml/hocon/json representation is just adding a single suffix to a
>>> single (at most 2 key + value) property. The question is between `format` =
>>> `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
>>> doing it.
>>>
>>> Just to have a full picture. Both cases can be represented in yaml, but
>>> the difference is significant:
>>> format: 'json'
>>> format.option: 'value'
>>>
>>> vs
>>> format:
>>>   kind: 'json'
>>>   option: 'value'
>>>
>>> Best,
>>> Dawid
>>>
>>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>>
>>> Personally I don't have any preference here. Compliance with a standard
>>> YAML parser is probably more important
>>>
>>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>>
 From a user's perspective, I prefer the shorter one "format=json",
 because
 it's more concise and straightforward. The "kind" is redundant for
 users.
 Is there a real case that requires representing the configuration in JSON
 style?
 As far as I can see, I don't see such a requirement, and everything works
 fine by now.

 So I'm in favor of "format=json". But if the community insists on following
 code style on this, I'm also fine with the longer one.

 Btw, I also CC the user mailing list to gather more user feedback, because
 I think this is related to usability.

 Best,
 Jark

 On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler wrote:

 >  > Therefore, should we advocate instead:
 >  >
 >  > 'format.kind' = 'json',
 >  > 'format.fail-on-missing-field' = 'false'
 >
 > Yes. That's pretty much it.
 >
 > This is reasonably important to nail down, as with such violations I
 > believe we could not actually switch to a standard YAML parser.
 >
 > On 29/04/2020 16:05, Timo Walther wrote:
 > > Hi everyone,
 > >
 > > discussions around ConfigOption seem to be very popular recently.
 So I
 > > would also like to get some opinions on a different topic.
 > >
 > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
 > > agreed on the following DDL syntax:
 > >
 > > CREATE TABLE fs_table (
 > >  ...
 > > ) WITH (

Reply: flink1.9, state process api reading MapState, error

2020-04-29 post shao.hongxiao
Could it be a problem with the static inner class?
Rules for POJO types

Flink recognizes a data type as a POJO type (and allows “by-name” field 
referencing) if the following conditions are fulfilled:

The class is public and standalone (no non-static inner class)
The class has a public no-argument constructor
All non-static, non-transient fields in the class (and all superclasses) are 
either public (and non-final) or have a public getter- and a setter- method 
that follows the Java beans naming conventions for getters and setters.

Note that when a user-defined data type can’t be recognized as a POJO type, it 
must be processed as GenericType and serialized with Kryo.



邵红晓
Email: 17611022...@163.com
On 2020-04-30 09:40, shx <17611022...@163.com> wrote:
Could you share the code that writes the state? One more question about keyed
state access: does your code read out the MapState for all keys? Thanks.




邵红晓
Email: 17611022...@163.com

On 2020-04-30 09:04, guanyq wrote:
The code does not specify any Serializer explicitly; everything uses default
serialization.
On 2020-04-29 18:20:22, "Congxian Qiu" wrote:
Hi
From the error log, this is a StateMigration-related problem.
You need to confirm that the Serializer in your code is the same as, or
compatible with, the serializer of the state in the savepoint; you can refer
to this document [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html

Best,
Congxian


guanyq wrote on Wed, Apr 29, 2020 at 6:09 PM:


The code and error log are attached. I don't know how to investigate at the
moment. Please help take a look, thanks.


Re: Reply: flink1.9, state process api reading MapState, error

2020-04-29 post guanyq
The code is attached.
As for the other question about keyed state access (whether my code reads out
the mapstate for all keys):
-- the code reads out all keys of the map state.

On 2020-04-30 09:40:45, "shx" <17611022...@163.com> wrote:
>Could you share the code that writes the state? One more question about keyed
>state access: does your code read out the MapState for all keys? Thanks.
>
>邵红晓
>Email: 17611022...@163.com
>
>On 2020-04-30 09:04, guanyq wrote:
>The code does not specify any Serializer explicitly; everything uses default
>serialization.
>On 2020-04-29 18:20:22, "Congxian Qiu" wrote:
>>Hi
>>From the error log, this is a StateMigration-related problem.
>>You need to confirm that the Serializer in your code is the same as, or
>>compatible with, the serializer of the state in the savepoint; you can refer
>>to this document [1].
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>>
>>Best,
>>Congxian
>>
>>
>>guanyq wrote on Wed, Apr 29, 2020 at 6:09 PM:
>>
>>>
>>> The code and error log are attached. I don't know how to investigate at
>>> the moment. Please help take a look, thanks.
package com.data.processing.entity;

/**
 * Newly added
 *
 * @author guanyq
 * @date 2020/4/5
 */
public class OnlineOrderUncompleted {
private String provinceCode;
private String inModeCode;
private String acceptType;
private String siteSelectionType;
private String resPrejudgeFlag;
private Long step;
private String orderId;
private String receiveDate;

public OnlineOrderUncompleted() {}

public OnlineOrderUncompleted(String provinceCode, String inModeCode,
        String acceptType, String siteSelectionType, String resPrejudgeFlag,
        Long step, String orderId, String receiveDate) {
this.provinceCode = provinceCode;
this.inModeCode = inModeCode;
this.acceptType = acceptType;
this.siteSelectionType = siteSelectionType;
this.resPrejudgeFlag = resPrejudgeFlag;
this.step = step;
this.orderId = orderId;
this.receiveDate = receiveDate;
}

@Override
public String toString() {
return "OnlineOrderUncompleted{" +
"provinceCode='" + provinceCode + '\'' +
", inModeCode='" + inModeCode + '\'' +
", acceptType='" + acceptType + '\'' +
", siteSelectionType='" + siteSelectionType + '\'' +
", resPrejudgeFlag='" + resPrejudgeFlag + '\'' +
", step=" + step +
", orderId='" + orderId + '\'' +
", receiveDate='" + receiveDate + '\'' +
'}';
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public String getProvinceCode() {
return provinceCode;
}

public void setProvinceCode(String provinceCode) {
this.provinceCode = provinceCode;
}

public String getInModeCode() {
return inModeCode;
}

public void setInModeCode(String inModeCode) {
this.inModeCode = inModeCode;
}

public String getAcceptType() {
return acceptType;
}

public void setAcceptType(String acceptType) {
this.acceptType = acceptType;
}

public String getSiteSelectionType() {
return siteSelectionType;
}

public void setSiteSelectionType(String siteSelectionType) {
this.siteSelectionType = siteSelectionType;
}

public String getResPrejudgeFlag() {
return resPrejudgeFlag;
}

public void setResPrejudgeFlag(String resPrejudgeFlag) {
this.resPrejudgeFlag = resPrejudgeFlag;
}

public String getReceiveDate() {
return receiveDate;
}

public void setReceiveDate(String receiveDate) {
this.receiveDate = receiveDate;
}

public Long getStep() {
return step;
}

public void setStep(Long step) {
this.step = step;
}
}
package com.data.processing.unconditionalacceptance;

import com.alibaba.fastjson.JSONObject;
import com.data.processing.entity.OnlineOrderUncompleted;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 post Kurt Young
IIUC, FLIP-122 already delegates the responsibility for designing and parsing
connector properties to connector developers.
So frankly speaking, no matter which style we choose, there is no strong
guarantee for either of these. So it's also possible
for developers to choose a totally different way to express properties,
such as:

'format' = 'csv',
'csv.allow-comments' = 'true',
'csv.ignore-parse-errors' = 'true'

which also seems quite straightforward and easy to use. So my opinion on
this would be: since there is no guarantee that developers
choose "format" as the common prefix of all format-related properties, there
is not much value in extending 'format' to 'format.kind'.


Best,
Kurt


On Thu, Apr 30, 2020 at 10:17 AM Jingsong Li  wrote:

> Thanks Timo for starting the discussion.
>
> I am +1 for "format: 'json'".
> Take a look at Dawid's yaml case:
>
> connector: 'filesystem'
> path: '...'
> format: 'json'
> format:
>   option1: '...'
>   option2: '...'
>   option3: '...'
>
> Does this work?
> According to my understanding, the 'format' key is an attribute of the
> connector, which can be configured separately outside. The entries in the
> 'format' block are attributes of the format.
> So this json-style block can only contain the properties excluding the
> format itself.
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:
>
>> Thanks Timo for starting the discussion.
>>
>> Generally I like the idea to keep the config align with a standard like
>> json/yaml.
>>
>> From a user's perspective, I don't load table configs from a config file
>> like yaml or json for now,
>> and it's ok to change it to a yaml-like style. Actually we didn't know that
>> this could be a yaml-like
>> configuration hierarchy. If it has a hierarchy, we may consider loading the
>> config from a yaml/json file in the future.
>>
>> Regarding the name,
>> 'format.kind' looks fine to me. However there is another name from the
>> top of my head:
>> 'format.name', WDYT?
>>
>> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>>
>>> Hi all,
>>>
>>> I also wanted to share my opinion.
>>>
>>> When talking about a ConfigOption hierarchy we use for configuring Flink
>>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>>> compatible style. Those options are primarily read from a file and thus
>>> should at least try to follow common practices for nested formats if we
>>> ever decide to switch to one.
>>>
>>> Here the question is about the properties we use in SQL statements. The
>>> origin/destination of these usually will be external catalog, usually in a
>>> flattened(key/value) representation so I agree it is not as important as in
>>> the aforementioned case. Nevertheless having a yaml based catalog or being
>>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>>> appealing. At the same time, the cost of being able to have a nice
>>> yaml/hocon/json representation is just adding a single suffix to a
>>> single (at most 2 key + value) property. The question is between `format` =
>>> `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
>>> doing it.
>>>
>>> Just to have a full picture. Both cases can be represented in yaml, but
>>> the difference is significant:
>>> format: 'json'
>>> format.option: 'value'
>>>
>>> vs
>>> format:
>>>   kind: 'json'
>>>   option: 'value'
>>>
>>> Best,
>>> Dawid
>>>
>>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>>
>>> Personally I don't have any preference here. Compliance with a standard
>>> YAML parser is probably more important
>>>
>>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>>
 From a user's perspective, I prefer the shorter one "format=json",
 because
 it's more concise and straightforward. The "kind" is redundant for
 users.
 Is there a real case that requires representing the configuration in JSON
 style?
 As far as I can see, I don't see such a requirement, and everything works
 fine by now.

 So I'm in favor of "format=json". But if the community insists on following
 code style on this, I'm also fine with the longer one.

 Btw, I also CC the user mailing list to gather more user feedback, because
 I think this is related to usability.

 Best,
 Jark

 On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler wrote:

 >  > Therefore, should we advocate instead:
 >  >
 >  > 'format.kind' = 'json',
 >  > 'format.fail-on-missing-field' = 'false'
 >
 > Yes. That's pretty much it.
 >
 > This is reasonably important to nail down, as with such violations I
 > believe we could not actually switch to a standard YAML parser.
 >
 > On 29/04/2020 16:05, Timo Walther wrote:
 > > Hi everyone,
 > >
 > > discussions around ConfigOption seem to be very popular recently.
 So I
 > > would also like to get some opinions on a different topic.
 > >
 > > How do we represent hierarchies in ConfigOption? In 

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 post Jingsong Li
Thanks Timo for starting the discussion.

I am +1 for "format: 'json'".
Take a look at Dawid's yaml case:

connector: 'filesystem'
path: '...'
format: 'json'
format:
  option1: '...'
  option2: '...'
  option3: '...'

Does this work?
According to my understanding, the 'format' key is an attribute of the
connector, which can be configured separately outside. The entries in the
'format' block are attributes of the format.
So this json-style block can only contain the properties excluding the
format itself.

Best,
Jingsong Lee
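
For illustration, a minimal sketch (assuming SnakeYAML, which is not something
used in this thread) of the duplicate-key problem in the yaml case above:

import java.util.Map;

import org.yaml.snakeyaml.Yaml;

public class DuplicateFormatKey {
    public static void main(String[] args) {
        // In YAML, 'format' cannot be both a scalar and a mapping: with duplicate
        // keys SnakeYAML by default lets the last value win, so the scalar 'json'
        // is silently lost.
        Map<String, Object> parsed = new Yaml().load(
                "format: 'json'\n"
              + "format:\n"
              + "  option1: '...'\n");
        System.out.println(parsed.get("format")); // prints {option1=...}, not json
    }
}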

On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:

> Thanks Timo for starting the discussion.
>
> Generally I like the idea to keep the config align with a standard like
> json/yaml.
>
> From a user's perspective, I don't load table configs from a config file
> like yaml or json for now,
> and it's ok to change it to a yaml-like style. Actually we didn't know that
> this could be a yaml-like
> configuration hierarchy. If it has a hierarchy, we may consider loading the
> config from a yaml/json file in the future.
>
> Regarding the name,
> 'format.kind' looks fine to me. However there is another name from the top
> of my head:
> 'format.name', WDYT?
>
> Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:
>
>> Hi all,
>>
>> I also wanted to share my opinion.
>>
>> When talking about a ConfigOption hierarchy we use for configuring Flink
>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>> compatible style. Those options are primarily read from a file and thus
>> should at least try to follow common practices for nested formats if we
>> ever decide to switch to one.
>>
>> Here the question is about the properties we use in SQL statements. The
>> origin/destination of these usually will be external catalog, usually in a
>> flattened(key/value) representation so I agree it is not as important as in
>> the aforementioned case. Nevertheless having a yaml based catalog or being
>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>> appealing. At the same time, the cost of being able to have a nice
>> yaml/hocon/json representation is just adding a single suffix to a
>> single (at most 2 key + value) property. The question is between `format` =
>> `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
>> doing it.
>>
>> Just to have a full picture. Both cases can be represented in yaml, but
>> the difference is significant:
>> format: 'json'
>> format.option: 'value'
>>
>> vs
>> format:
>>   kind: 'json'
>>   option: 'value'
>>
>> Best,
>> Dawid
>>
>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>
>> Personally I don't have any preference here. Compliance with a standard
>> YAML parser is probably more important
>>
>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>
>>> From a user's perspective, I prefer the shorter one "format=json",
>>> because
>>> it's more concise and straightforward. The "kind" is redundant for users.
>>> Is there a real case that requires representing the configuration in JSON
>>> style?
>>> As far as I can see, I don't see such a requirement, and everything works
>>> fine by now.
>>>
>>> So I'm in favor of "format=json". But if the community insists on
>>> following code style on this, I'm also fine with the longer one.
>>>
>>> Btw, I also CC the user mailing list to gather more user feedback,
>>> because I
>>> think this is related to usability.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler wrote:
>>>
>>> >  > Therefore, should we advocate instead:
>>> >  >
>>> >  > 'format.kind' = 'json',
>>> >  > 'format.fail-on-missing-field' = 'false'
>>> >
>>> > Yes. That's pretty much it.
>>> >
>>> > This is reasonably important to nail down, as with such violations I
>>> > believe we could not actually switch to a standard YAML parser.
>>> >
>>> > On 29/04/2020 16:05, Timo Walther wrote:
>>> > > Hi everyone,
>>> > >
>>> > > discussions around ConfigOption seem to be very popular recently. So
>>> I
>>> > > would also like to get some opinions on a different topic.
>>> > >
>>> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
>>> > > agreed on the following DDL syntax:
>>> > >
>>> > > CREATE TABLE fs_table (
>>> > >  ...
>>> > > ) WITH (
>>> > >  'connector' = 'filesystem',
>>> > >  'path' = 'file:///path/to/whatever',
>>> > >  'format' = 'csv',
>>> > >  'format.allow-comments' = 'true',
>>> > >  'format.ignore-parse-errors' = 'true'
>>> > > );
>>> > >
>>> > > Of course this is slightly different from regular Flink core
>>> > > configuration but a connector still needs to be configured based on
>>> > > these options.
>>> > >
>>> > > However, I think this FLIP violates our code style guidelines because
>>> > >
>>> > > 'format' = 'json',
>>> > > 'format.fail-on-missing-field' = 'false'
>>> > >
>>> > > is an invalid hierarchy. `format` cannot be a string and a top-level
>>> > > object at the same time.
>>> > >
>>> > > We have similar problems in our runtime configuration:
>>> > >
>>> > > 

Reply: flink writes parquet to hdfs; how to configure when hdfs is HA

2020-04-29 post Jiacheng Jiang
[Message garbled by encoding; recoverable fragment:] configure fs.defaultFS
(the HA nameservice).
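
A minimal sketch of that idea (the nameservice name 'mycluster' is
illustrative, not from this thread; it must match fs.defaultFS /
dfs.nameservices in the hdfs-site.xml on the classpath):

import org.apache.flink.core.fs.Path;

public class HaPathExample {
    public static void main(String[] args) {
        // A fixed NameNode address stops working when HA fails over:
        Path fixed = new Path("hdfs://xx:8020/warehouse/out");
        // The HA nameservice URI lets the HDFS client locate the active NameNode:
        Path ha = new Path("hdfs://mycluster/warehouse/out");
        System.out.println(fixed + " -> " + ha);
    }
}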



----Original Message----
From: "196371551" <196371...@qq.com>
Sent: 2020-04-30 (Thu) 8:50
To: "user"

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 post Benchao Li
Thanks Timo for starting the discussion.

Generally I like the idea to keep the config align with a standard like
json/yaml.

From a user's perspective, I don't load table configs from a config file
like yaml or json for now,
and it's ok to change it to a yaml-like style. Actually we didn't know that
this could be a yaml-like
configuration hierarchy. If it has a hierarchy, we may consider loading the
config from a yaml/json file in the future.

Regarding the name,
'format.kind' looks fine to me. However there is another name from the top
of my head:
'format.name', WDYT?

Dawid Wysakowicz wrote on Wed, Apr 29, 2020 at 11:56 PM:

> Hi all,
>
> I also wanted to share my opinion.
>
> When talking about a ConfigOption hierarchy we use for configuring Flink
> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
> compatible style. Those options are primarily read from a file and thus
> should at least try to follow common practices for nested formats if we
> ever decide to switch to one.
>
> Here the question is about the properties we use in SQL statements. The
> origin/destination of these usually will be external catalog, usually in a
> flattened(key/value) representation so I agree it is not as important as in
> the aforementioned case. Nevertheless having a yaml based catalog or being
> able to have e.g. yaml based snapshots of a catalog in my opinion is
> appealing. At the same time, the cost of being able to have a nice
> yaml/hocon/json representation is just adding a single suffix to a
> single (at most 2 key + value) property. The question is between `format` =
> `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
> doing it.
>
> Just to have a full picture. Both cases can be represented in yaml, but
> the difference is significant:
> format: 'json'
> format.option: 'value'
>
> vs
> format:
>   kind: 'json'
>   option: 'value'
>
> Best,
> Dawid
>
> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>
> Personally I don't have any preference here. Compliance with a standard YAML
> parser is probably more important
>
> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>
>> From a user's perspective, I prefer the shorter one "format=json", because
>> it's more concise and straightforward. The "kind" is redundant for users.
>> Is there a real case that requires representing the configuration in JSON
>> style?
>> As far as I can see, I don't see such a requirement, and everything works
>> fine by now.
>>
>> So I'm in favor of "format=json". But if the community insists on following
>> code style on this, I'm also fine with the longer one.
>>
>> Btw, I also CC the user mailing list to gather more user feedback, because
>> I think this is related to usability.
>>
>> Best,
>> Jark
>>
>> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler wrote:
>>
>> >  > Therefore, should we advocate instead:
>> >  >
>> >  > 'format.kind' = 'json',
>> >  > 'format.fail-on-missing-field' = 'false'
>> >
>> > Yes. That's pretty much it.
>> >
>> > This is reasonably important to nail down, as with such violations I
>> > believe we could not actually switch to a standard YAML parser.
>> >
>> > On 29/04/2020 16:05, Timo Walther wrote:
>> > > Hi everyone,
>> > >
>> > > discussions around ConfigOption seem to be very popular recently. So I
>> > > would also like to get some opinions on a different topic.
>> > >
>> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
>> > > agreed on the following DDL syntax:
>> > >
>> > > CREATE TABLE fs_table (
>> > >  ...
>> > > ) WITH (
>> > >  'connector' = 'filesystem',
>> > >  'path' = 'file:///path/to/whatever',
>> > >  'format' = 'csv',
>> > >  'format.allow-comments' = 'true',
>> > >  'format.ignore-parse-errors' = 'true'
>> > > );
>> > >
>> > > Of course this is slightly different from regular Flink core
>> > > configuration but a connector still needs to be configured based on
>> > > these options.
>> > >
>> > > However, I think this FLIP violates our code style guidelines because
>> > >
>> > > 'format' = 'json',
>> > > 'format.fail-on-missing-field' = 'false'
>> > >
>> > > is an invalid hierarchy. `format` cannot be a string and a top-level
>> > > object at the same time.
>> > >
>> > > We have similar problems in our runtime configuration:
>> > >
>> > > state.backend=
>> > > state.backend.incremental=
>> > > restart-strategy=
>> > > restart-strategy.fixed-delay.delay=
>> > > high-availability=
>> > > high-availability.cluster-id=
>> > >
>> > > The code style guide states "Think of the configuration as nested
>> > > objects (JSON style)". So such hierarchies cannot be represented in a
>> > > nested JSON style.
>> > >
>> > > Therefore, should we advocate instead:
>> > >
>> > > 'format.kind' = 'json',
>> > > 'format.fail-on-missing-field' = 'false'
>> > >
>> > > What do you think?
>> > >
>> > > Thanks,
>> > > Timo
>> > >
>> > > [1]
>> > >
>> >
>> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
>> > >
>> >
>> >
>
>

-- 

Reply: flink1.9, state process api reading MapState, error

2020-04-29 post shx
Could you share the code that writes the state? One more question about keyed
state access: does your code read out the MapState for all keys? Thanks.




邵红晓
Email: 17611022...@163.com

On 2020-04-30 09:04, guanyq wrote:
The code does not specify any Serializer explicitly; everything uses default
serialization.
On 2020-04-29 18:20:22, "Congxian Qiu" wrote:
>Hi
>From the error log, this is a StateMigration-related problem.
>You need to confirm that the Serializer in your code is the same as, or
>compatible with, the serializer of the state in the savepoint; you can refer
>to this document [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>
>Best,
>Congxian
>
>
>guanyq wrote on Wed, Apr 29, 2020 at 6:09 PM:
>
>>
>> The code and error log are attached. I don't know how to investigate at the
>> moment. Please help take a look, thanks.


Re: Re: flink1.9, state process api reading MapState, error

2020-04-29 post guanyq
The code does not specify any Serializer explicitly; everything uses default
serialization.
On 2020-04-29 18:20:22, "Congxian Qiu" wrote:
>Hi
>From the error log, this is a StateMigration-related problem.
>You need to confirm that the Serializer in your code is the same as, or
>compatible with, the serializer of the state in the savepoint; you can refer
>to this document [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>
>Best,
>Congxian
>
>
>guanyq wrote on Wed, Apr 29, 2020 at 6:09 PM:
>
>>
>> The code and error log are attached. I don't know how to investigate at the
>> moment. Please help take a look, thanks.


flink writes parquet to hdfs; how to configure when hdfs is HA

2020-04-29 post 196371551
Hi,
[Message garbled by encoding; recoverable fragments:] the job reads kafka and
writes to es and hdfs; hdfs is HA with NameNodes; flink currently writes with
a fixed address of the form hdfs://xx:8020/...; how should this be configured
for the HA NameNode?


Flink Task Manager GC overhead limit exceeded

2020-04-29 post Eleanore Jin
Hi All,

Currently I am running a Flink job cluster (v1.8.2) on Kubernetes with 4
pods, each pod with a parallelism of 4.

The Flink job reads from a source topic with 96 partitions and does a
per-element filter; the filter criteria come from a broadcast topic, always
using the latest message as the criteria, then publishes to a sink topic.

There is no checkpointing or state involved.

I am continuously seeing a 'GC overhead limit exceeded' error and the pods
keep restarting.

So I tried to increase the heap size for the task manager with:

containers:
  - args:
      - task-manager
      - -Djobmanager.rpc.address=service-job-manager
      - -Dtaskmanager.heap.size=4096m
      - -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
        -XX:HeapDumpPath=/dumps/oom.bin"


Three things I noticed:


1. I don't see the heap size for the task manager shown correctly in the UI.

[image: image.png]

2. I don't see the heap dump file /dumps/oom.bin in the restarted pod; did I
set the java opts wrong?

3. I continuously see the log below from all pods; not sure if it causes any
issue:
{"@timestamp":"2020-04-29T23:39:43.387Z","@version":"1","message":"[Consumer
clientId=consumer-1, groupId=aba774bc] Node 6 was unable to process the
fetch request with (sessionId=2054451921, epoch=474):
FETCH_SESSION_ID_NOT_FOUND.","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"pool-6-thread-1","level":"INFO","level_value":2}

Thanks a lot for any help!

Best,
Eleanore


Re: flink dataset: concatenating column contents after grouping

2020-04-29 post 祝尚
Hi hery168,
You can write it like this:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple3.of(1.0, 2.0, 1), Tuple3.of(2.0, 2.0, 1), Tuple3.of(3.0, 2.0, 1),
        Tuple3.of(1.0, 2.0, 2), Tuple3.of(2.0, 2.0, 2), Tuple3.of(3.0, 2.0, 2))
        .map((MapFunction<Tuple3<Double, Double, Integer>, Tuple3<String, Double, Integer>>) t ->
                Tuple3.of(String.valueOf(t.f0), t.f1, t.f2))
        .groupBy(2)
        .reduce((ReduceFunction<Tuple3<String, Double, Integer>>) (tuple, t1) ->
                Tuple3.of(tuple.f0 + "#" + t1.f0, tuple.f1, tuple.f2))
        .print();
// print() already triggers execution in the DataSet API, so no env.execute() is needed.

Result:
(1.0#2.0#3.0,2.0,1)
(1.0#2.0#3.0,2.0,2)

> On Apr 28, 2020 at 5:24 PM, hery168 wrote:
> 
> col1 col2 pid 
> 
> 1.0  2.0  1
> 
> 2.0  2.0  1
> 
> 1.0  2.0  1
> 
> 3.0  2.0  1
> 
> 1.0  2.0  1
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> 
> 1.0  2.0  2
> Hi all, I'd like to ask: using a flink dataset, how do I group by the pid
> column and then concatenate the contents of the col1 column within each
> group, e.g. 1.0#2.0#1.0#3.0?
> How can this be implemented?



Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 post Dawid Wysakowicz
Hi all,

I also wanted to share my opinion.

When talking about a ConfigOption hierarchy we use for configuring Flink
cluster I would be a strong advocate for keeping a yaml/hocon/json/...
compatible style. Those options are primarily read from a file and thus
should at least try to follow common practices for nested formats if we
ever decide to switch to one.

Here the question is about the properties we use in SQL statements. The
origin/destination of these usually will be external catalog, usually in
a flattened(key/value) representation so I agree it is not as important
as in the aforementioned case. Nevertheless having a yaml based catalog
or being able to have e.g. yaml based snapshots of a catalog in my
opinion is appealing. At the same time, the cost of being able to have a
nice yaml/hocon/json representation is just adding a single suffix to a
single (at most 2 key + value) property. The question is between `format`
= `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
doing it.

Just to have a full picture. Both cases can be represented in yaml, but
the difference is significant:

format: 'json'
format.option: 'value'

vs

format:
    kind: 'json'
    option: 'value'


Best,
Dawid
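
For illustration, a minimal sketch of how the two candidate keys compared
above would look as Flink ConfigOptions (using the typed builder from
FLIP-77; the names are the ones debated in this thread):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FormatOptionsSketch {
    // 'format.kind' leaves 'format' free to be a pure prefix, i.e. a nested
    // object in YAML/JSON terms:
    public static final ConfigOption<String> FORMAT_KIND =
            ConfigOptions.key("format.kind").stringType().noDefaultValue();

    // whereas a scalar 'format' option plus 'format.*' children is exactly
    // the hierarchy violation discussed above:
    public static final ConfigOption<String> FORMAT =
            ConfigOptions.key("format").stringType().noDefaultValue();
}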

On 29/04/2020 17:13, Flavio Pompermaier wrote:
> Personally I don't have any preference here. Compliance with a standard
> YAML parser is probably more important
>
> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu wrote:
>
> From a user's perspective, I prefer the shorter one "format=json",
> because
> it's more concise and straightforward. The "kind" is redundant for
> users.
> Is there a real case that requires representing the configuration in
> JSON style?
> As far as I can see, I don't see such a requirement, and everything
> works
> fine by now.
>
> So I'm in favor of "format=json". But if the community insists on
> following
> code style on this, I'm also fine with the longer one.
>
> Btw, I also CC the user mailing list to gather more user feedback,
> because I
> think this is related to usability.
>
> Best,
> Jark
>
> On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler wrote:
>
> >  > Therefore, should we advocate instead:
> >  >
> >  > 'format.kind' = 'json',
> >  > 'format.fail-on-missing-field' = 'false'
> >
> > Yes. That's pretty much it.
> >
> > This is reasonably important to nail down, as with such violations I
> > believe we could not actually switch to a standard YAML parser.
> >
> > On 29/04/2020 16:05, Timo Walther wrote:
> > > Hi everyone,
> > >
> > > discussions around ConfigOption seem to be very popular
> recently. So I
> > > would also like to get some opinions on a different topic.
> > >
> > > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > > agreed on the following DDL syntax:
> > >
> > > CREATE TABLE fs_table (
> > >  ...
> > > ) WITH (
> > >  'connector' = 'filesystem',
> > >  'path' = 'file:///path/to/whatever',
> > >  'format' = 'csv',
> > >  'format.allow-comments' = 'true',
> > >  'format.ignore-parse-errors' = 'true'
> > > );
> > >
> > > Of course this is slightly different from regular Flink core
> > > configuration but a connector still needs to be configured
> based on
> > > these options.
> > >
> > > However, I think this FLIP violates our code style guidelines
> because
> > >
> > > 'format' = 'json',
> > > 'format.fail-on-missing-field' = 'false'
> > >
> > > is an invalid hierarchy. `format` cannot be a string and a
> top-level
> > > object at the same time.
> > >
> > > We have similar problems in our runtime configuration:
> > >
> > > state.backend=
> > > state.backend.incremental=
> > > restart-strategy=
> > > restart-strategy.fixed-delay.delay=
> > > high-availability=
> > > high-availability.cluster-id=
> > >
> > > The code style guide states "Think of the configuration as nested
> > > objects (JSON style)". So such hierarchies cannot be
> represented in a
> > > nested JSON style.
> > >
> > > Therefore, should we advocate instead:
> > >
> > > 'format.kind' = 'json',
> > > 'format.fail-on-missing-field' = 'false'
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > Timo
> > >
> > > [1]
> > >
> >
> 
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> > >
> >
> >
>




Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 post Jark Wu
From a user's perspective, I prefer the shorter one "format=json", because
it's more concise and straightforward. The "kind" is redundant for users.
Is there a real case that requires representing the configuration in JSON style?
As far as I can see, I don't see such a requirement, and everything works
fine by now.

So I'm in favor of "format=json". But if the community insists on following
code style on this, I'm also fine with the longer one.

Btw, I also CC the user mailing list to gather more user feedback, because I
think this is related to usability.

Best,
Jark

On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler  wrote:

>  > Therefore, should we advocate instead:
>  >
>  > 'format.kind' = 'json',
>  > 'format.fail-on-missing-field' = 'false'
>
> Yes. That's pretty much it.
>
> This is reasonably important to nail down, as with such violations I
> believe we could not actually switch to a standard YAML parser.
>
> On 29/04/2020 16:05, Timo Walther wrote:
> > Hi everyone,
> >
> > discussions around ConfigOption seem to be very popular recently. So I
> > would also like to get some opinions on a different topic.
> >
> > How do we represent hierarchies in ConfigOption? In FLIP-122, we
> > agreed on the following DDL syntax:
> >
> > CREATE TABLE fs_table (
> >  ...
> > ) WITH (
> >  'connector' = 'filesystem',
> >  'path' = 'file:///path/to/whatever',
> >  'format' = 'csv',
> >  'format.allow-comments' = 'true',
> >  'format.ignore-parse-errors' = 'true'
> > );
> >
> > Of course this is slightly different from regular Flink core
> > configuration but a connector still needs to be configured based on
> > these options.
> >
> > However, I think this FLIP violates our code style guidelines because
> >
> > 'format' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > is an invalid hierarchy. `format` cannot be a string and a top-level
> > object at the same time.
> >
> > We have similar problems in our runtime configuration:
> >
> > state.backend=
> > state.backend.incremental=
> > restart-strategy=
> > restart-strategy.fixed-delay.delay=
> > high-availability=
> > high-availability.cluster-id=
> >
> > The code style guide states "Think of the configuration as nested
> > objects (JSON style)". So such hierarchies cannot be represented in a
> > nested JSON style.
> >
> > Therefore, should we advocate instead:
> >
> > 'format.kind' = 'json',
> > 'format.fail-on-missing-field' = 'false'
> >
> > What do you think?
> >
> > Thanks,
> > Timo
> >
> > [1]
> >
> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
> >
>
>


Re: flink1.9, state process api reading MapState, error

2020-04-29 post Congxian Qiu
Hi
From the error log, this is a StateMigration-related problem.
You need to confirm that the Serializer in your code is the same as, or
compatible with, the serializer of the state in the savepoint; you can refer
to this document [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html

Best,
Congxian
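
To make the advice above concrete, a minimal sketch (reusing the names from
the attached ReadListState code) of a reader-side descriptor whose types
match what the writing job used:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

public class CompatibleDescriptor {
    // The descriptor used for reading must produce serializers compatible
    // with those that wrote the savepoint; declaring the same key/value types
    // as the writing job is the usual way to ensure that.
    static final MapStateDescriptor<String, Long> DAY_UN_COMPUTE_CNT =
            new MapStateDescriptor<>("dayUnComputeCnt", Types.STRING, Types.LONG);
}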


guanyq wrote on Wed, Apr 29, 2020 at 6:09 PM:

>
> The code and error log are attached. I don't know how to investigate at the
> moment. Please help take a look, thanks.


Re: flink memory settings issue - metaspace overflow

2020-04-29 post 了不起的盖茨比
Apart from increasing memory, the other configs are still the defaults. I just
don't understand why, after I increased the memory, full GC appeared, the
taskmanager then lost its heartbeat, and after submitting the job again
metaspace overflowed.
With the default memory configuration there was no full GC at all. I'd like to
understand the reason.


----Original Message----
From: "Xintong Song"

Re: flink memory settings issue - metaspace overflow

2020-04-29 post Xintong Song
>
> This is the report I analyzed with gceasy. Looking at the log, it did appear
> when the memory was increased. This is my first time reading such a log;
> please help take a look. The log is attached as well.


You only have the log for process.size=65536m here. What I meant is that full
GC should also exist when process.size=1568m, only each occurrence would take
less time.


Also, this doesn't look like the log from when the metaspace OOM happened: in
this log, metaspace uses less than 80m, with a maximum of 256m.


Thank you~

Xintong Song



On Wed, Apr 29, 2020 at 2:57 PM 了不起的盖茨比 <573693...@qq.com> wrote:

> OK, thanks, I'll go take a look.
>
>
> ----Original Message----
> From: "Michael Ran"
> Sent: 2020-04-29 (Wed) 2:55 PM
> To: "user-zh"
> Subject: Re: Reply: flink memory settings issue - metaspace overflow
>
>
> You can read the introduction:
> https://flink.apache.org/news/2020/04/21/memory-management-improvements-flink-1.10.html
> Follow-up improvement: https://issues.apache.org/jira/browse/FLINK-16406
>
>
> On 2020-04-29 14:09:21, "了不起的盖茨比" <573693...@qq.com> wrote:
>
> https://gceasy.io/my-gc-report.jsp?p=YXJjaGl2ZWQvMjAyMC8wNC8yOC8tLWdjLmxvZy0tNC0xMC01Ng==channel=WEB
>
>
> This is the report I analyzed with gceasy. Looking at the log, it did appear
> when the memory was increased. This is my first time reading such a log;
> please help take a look. The log is attached as well.
> ---- Original Message ----
> From: "Xintong Song"
> Sent: 2020-04-29 (Wed) 12:45 PM
> To: "user-zh"
> Subject: Re: flink memory settings issue - metaspace overflow
>
>
> The full GC should not have appeared only after increasing the memory; you
> can confirm this in the GC log. But increasing the memory may make a single
> full GC take longer, which can cause the TM heartbeat to time out.
> Likewise, the metaspace OOM may also be caused by GC becoming slower. The
> JVM has dedicated threads for GC; usually GC starts once usage of the
> heap/direct/metaspace areas reaches some threshold, before they are full.
> If GC is slower than the allocation rate, an OOM can still happen.
> In our experience, 64G for a single TM is already quite large; if it is
> mostly java heap memory, you may need to tune the GC strategy specifically.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Apr 29, 2020 at 12:27 PM 了不起的盖茨比 <573693...@qq.com> wrote:
>
>  Apart from increasing memory, the other configs are still the defaults. I
>  just don't understand why, after I increased the memory, full GC appeared,
>  the taskmanager then lost its heartbeat, and after submitting the job
>  again metaspace overflowed.
>  With the default memory configuration there was no full GC at all. I'd
>  like to understand the reason.
>
>
>  ----Original Message----
>  From: "Xintong Song"
>  Sent: 2020-04-29 (Wed) 11:00 AM
>  To: "user-zh"
>  Subject: Re: flink memory settings issue - metaspace overflow
>
>
>  Metaspace OOM is usually caused by the JVM loading too many classes. When
>  TM memory was increased from 1568m to 65536m, did you also increase the
>  number of slots? That can increase the number of classes loaded at
>  runtime, and with the metaspace size unchanged it may trigger an OOM.
>  The community has already received a lot of feedback that the default
>  metaspace size in 1.10.0 may not be reasonable; the default will be
>  increased in 1.10.1. You can also try setting
>  taskmanager.memory.metaspace.size to 256m first.
>
>  Thank you~
>
>  Xintong Song
>
>
>
>  On Tue, Apr 28, 2020 at 7:21 PM 出发 <573693...@qq.com> wrote:
>
>  > Physical machine memory: 124G. With the following setting:
>  > taskmanager.memory.process.size: 65536m
>  >
>  > and the es5-connector sinking data,
>  > frequent full GC appears, with java.lang.OutOfMemoryError: Metaspace.
>  >
>  >
>  > When I set
>  > taskmanager.memory.process.size: 1568m
>  > with the es5-connector sinking data,
>  > the overflow problem does not appear.
>  >
>  >
>  > This problem is very puzzling. I really don't understand: increasing the
>  > memory should not affect metaspace, and yet with the default memory
>  > there is no error at all. Baffled. Hope someone can help me understand.
>  > The experiment was repeated many times with the same result.

Re: flink sql job parallelism

2020-04-29 post Benchao Li
Hi lucas,

As far as I know, this is not possible yet.
In the future this will probably need hints to control the parallelism of each
operator; a rough sketch of today's uniform-parallelism knob follows.
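
(A sketch under stated assumptions: the Blink planner in Flink 1.10 and its
table.exec.resource.default-parallelism option; it sets one uniform
parallelism for all SQL operators, not per-operator control.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // All operators of subsequently submitted SQL share this parallelism.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 8);
    }
}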

lucas.wu wrote on Wed, Apr 29, 2020 at 11:16 AM:

> Hi all:
> I have recently been using flink sql for some computation tasks and found
> that after one sql statement is parsed into an execution plan there may be
> several jobs, but these jobs all share the same parallelism. For now it
> seems the parallelism of these jobs cannot be adjusted. Is there any way to
> adjust the parallelism of the jobs produced from parsing sql?



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Reply: flink memory settings issue - metaspace overflow

2020-04-29 post 了不起的盖茨比
OK, thanks, I'll go take a look.


----Original Message----
From: "Michael Ran"
> You can read the introduction:
> https://flink.apache.org/news/2020/04/21/memory-management-improvements-flink-1.10.html
> Follow-up improvement: https://issues.apache.org/jira/browse/FLINK-16406
>
> On 2020-04-29 14:09:21, "了不起的盖茨比" <573693...@qq.com> wrote:
>
> https://gceasy.io/my-gc-report.jsp?p=YXJjaGl2ZWQvMjAyMC8wNC8yOC8tLWdjLmxvZy0tNC0xMC01Ng==channel=WEB
>
> This is the report I analyzed with gceasy. Looking at the log, it did appear
> when the memory was increased. This is my first time reading such a log;
> please help take a look. The log is attached as well.
> ---- Original Message ----
> From: "Xintong Song"
Re: Reply: flink memory settings issue - metaspace overflow

2020-04-29 post Michael Ran
You can read the introduction:
https://flink.apache.org/news/2020/04/21/memory-management-improvements-flink-1.10.html
Follow-up improvement: https://issues.apache.org/jira/browse/FLINK-16406



On 2020-04-29 14:09:21, "了不起的盖茨比" <573693...@qq.com> wrote:

https://gceasy.io/my-gc-report.jsp?p=YXJjaGl2ZWQvMjAyMC8wNC8yOC8tLWdjLmxvZy0tNC0xMC01Ng==channel=WEB


This is the report I analyzed with gceasy. Looking at the log, it did appear
when the memory was increased. This is my first time reading such a log;
please help take a look. The log is attached as well.
---- Original Message ----
From: "Xintong Song"
Sent: 2020-04-29 (Wed) 12:45 PM
To: "user-zh"
Subject: Re: flink memory settings issue - metaspace overflow


The full GC should not have appeared only after increasing the memory; you
can confirm this in the GC log. But increasing the memory may make a single
full GC take longer, which can cause the TM heartbeat to time out.
Likewise, the metaspace OOM may also be caused by GC becoming slower. The JVM
has dedicated threads for GC; usually GC starts once usage of the
heap/direct/metaspace areas reaches some threshold, before they are full. If
GC is slower than the allocation rate, an OOM can still happen.
In our experience, 64G for a single TM is already quite large; if it is
mostly java heap memory, you may need to tune the GC strategy specifically.

Thank you~

Xintong Song



On Wed, Apr 29, 2020 at 12:27 PM 了不起的盖茨比 <573693...@qq.com> wrote:

> Apart from increasing memory, the other configs are still the defaults. I
> just don't understand why, after I increased the memory, full GC appeared,
> the taskmanager then lost its heartbeat, and after submitting the job again
> metaspace overflowed.
> With the default memory configuration there was no full GC at all. I'd like
> to understand the reason.
>
>
> ---- Original Message ----
> From: "Xintong Song"
> Sent: 2020-04-29 (Wed) 11:00 AM
> To: "user-zh"
> Subject: Re: flink memory settings issue - metaspace overflow
>
>
> Metaspace OOM is usually caused by the JVM loading too many classes. When
> TM memory was increased from 1568m to 65536m, did you also increase the
> number of slots? That can increase the number of classes loaded at runtime,
> and with the metaspace size unchanged it may trigger an OOM.
> The community has already received a lot of feedback that the default
> metaspace size in 1.10.0 may not be reasonable; the default will be
> increased in 1.10.1. You can also try setting
> taskmanager.memory.metaspace.size to 256m first.
>
> Thank you~
>
> Xintong Song
>
>
> On Tue, Apr 28, 2020 at 7:21 PM 出发 <573693...@qq.com> wrote:
>
> > Physical machine memory: 124G. With the following setting:
> > taskmanager.memory.process.size: 65536m
> >
> > and the es5-connector sinking data,
> > frequent full GC appears, with java.lang.OutOfMemoryError: Metaspace.
> >
> > When I set
> > taskmanager.memory.process.size: 1568m
> > with the es5-connector sinking data,
> > the overflow problem does not appear.
> >
> > This problem is very puzzling. I really don't understand: increasing the
> > memory should not affect metaspace, and yet with the default memory there
> > is no error at all. The experiment was repeated many times with the same
> > result.

Re: Reply: flink memory settings issue - metaspace overflow

2020-04-29 post 了不起的盖茨比
https://gceasy.io/my-gc-report.jsp?p=YXJjaGl2ZWQvMjAyMC8wNC8yOC8tLWdjLmxvZy0tNC0xMC01Ng==channel=WEB


This is the report I analyzed with gceasy. Looking at the log, it did appear
when the memory was increased. This is my first time reading such a log;
please help take a look. The log is attached as well.
----Original Message----
From: "Xintong Song"