[jira] [Commented] (FLINK-25932) Introduce ExecNodeContext.generateUid()

2022-09-09 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17602192#comment-17602192
 ] 

Wei-Che Wei commented on FLINK-25932:
-

Hi [~twalthr], got it. I was not aware of this fix. Thanks for pointing it 
out. 

> Introduce ExecNodeContext.generateUid()
> ---
>
> Key: FLINK-25932
> URL: https://issues.apache.org/jira/browse/FLINK-25932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> FLINK-25387 introduced an {{ExecNodeContext}} which contains all information 
> to generate unique and deterministic identifiers for all created 
> {{Transformation}}.
> This issue includes:
> - Add {{ExecNodeContext.generateUid(operatorName: String): String}}
> - Go through all ExecNodes and give transformations a uid. The name can be 
> constant static field within the ExecNode such that both annotation and 
> method can use it.
> - We give all transformations a uid, including stateless ones.
> - The final UID should look like: 
> {{13_stream-exec-sink_1_upsert-materializer}}
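
For illustration, the UID format described above could be assembled as follows. This is a minimal sketch; the method shape and argument names are assumptions for illustration, not the actual planner code:

```java
public class UidSketch {
    // Builds "<nodeId>_<nodeName>_<version>_<operatorName>", matching the
    // example UID 13_stream-exec-sink_1_upsert-materializer from the issue.
    public static String generateUid(int nodeId, String nodeName, int version, String operatorName) {
        return nodeId + "_" + nodeName + "_" + version + "_" + operatorName;
    }

    public static void main(String[] args) {
        System.out.println(generateUid(13, "stream-exec-sink", 1, "upsert-materializer"));
        // → 13_stream-exec-sink_1_upsert-materializer
    }
}
```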



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25932) Introduce ExecNodeContext.generateUid()

2022-09-09 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17602168#comment-17602168
 ] 

Wei-Che Wei commented on FLINK-25932:
-

Hi [~slinkydeveloper], [~twalthr],

I found that {{ExecNodeContext#generateUid}} relies on the id generated by 
{{generateUid}}, which is a singleton.

I have a use case where multiple threads submit multiple Flink jobs, and each 
thread creates its own {{StreamExecutionEnvironment}} and 
{{StreamTableEnvironment}}. I'm wondering whether the id generator will still 
be deterministic in such a use case. What do you think?
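
To make the concern concrete: a JVM-wide singleton counter hands out ids across every environment in the process, so with concurrent job submission the ids a given job receives depend on thread scheduling. A minimal illustration follows (this is not the actual Flink id generator; the class and method names are hypothetical):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedIdGenerator {
    // One counter for the whole JVM, shared by all threads/environments.
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    public static int nextId() {
        return COUNTER.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        // Two "jobs" submitted concurrently: which ids each one receives
        // depends on the interleaving, so the assignment is not
        // deterministic per job, even though ids are globally unique.
        Runnable job = () -> System.out.println(
                Thread.currentThread().getName() + " got " + nextId() + " and " + nextId());
        Thread t1 = new Thread(job, "job-1");
        Thread t2 = new Thread(job, "job-2");
        t1.start(); t2.start();
        t1.join(); t2.join();
    }
}
```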

> Introduce ExecNodeContext.generateUid()
> ---
>
> Key: FLINK-25932
> URL: https://issues.apache.org/jira/browse/FLINK-25932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> FLINK-25387 introduced an {{ExecNodeContext}} which contains all information 
> to generate unique and deterministic identifiers for all created 
> {{Transformation}}.
> This issue includes:
> - Add {{ExecNodeContext.generateUid(operatorName: String): String}}
> - Go through all ExecNodes and give transformations a uid. The name can be 
> constant static field within the ExecNode such that both annotation and 
> method can use it.
> - We give all transformations a uid, including stateless ones.
> - The final UID should look like: 
> {{13_stream-exec-sink_1_upsert-materializer}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25048) Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data

2021-11-24 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448904#comment-17448904
 ] 

Wei-Che Wei edited comment on FLINK-25048 at 11/25/21, 2:41 AM:


Hi [~liangxinli], try this

{code:java}
SingleOutputStreamOperator<String> process = lineDSS.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        String[] s = value.split(" ");
        String word = s[0];
        String ts = s[1];
        if (word.startsWith("a")) {
            out.collect(value);
        } else {
            ctx.output(outputTag, value);
        }
    }
});
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

The problem is that you tried to get the side output from the 
`SingleOutputStreamOperator` returned by `assignTimestampsAndWatermarks`. 
However, the operator that actually produces the side output is the one created 
by `lineDSS.process`.

If you want to generate timestamps and watermarks for both streams, you should 
consider calling `assignTimestampsAndWatermarks` before `process`.
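
The suggested reordering could look like the following sketch (same variable names as the snippet above; this assumes the goal is to have timestamps and watermarks on both the main stream and the side output):

{code:java}
SingleOutputStreamOperator<String> process = lineDSS
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (value.split(" ")[0].startsWith("a")) {
                out.collect(value);
            } else {
                ctx.output(outputTag, value);
            }
        }
    });
process.print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

Both prints now read from the operator created by `process`, so the side output is available and both streams carry watermarks.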


was (Author: tonywei):
Hi [~liangxinli], try this

{code:java}
SingleOutputStreamOperator<String> process = lineDSS.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        String[] s = value.split(" ");
        String word = s[0];
        String ts = s[1];
        if (word.startsWith("a")) {
            out.collect(value);
        } else {
            ctx.output(outputTag, value);
        }
    }
});
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

The problem is that you tried to get the side output from the 
`SingleOutputStreamOperator` returned by `assignTimestampsAndWatermarks`. 
However, the operator that actually produces the side output is the one created 
by `lineDSS.process`.

> Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data
> ---
>
> Key: FLINK-25048
> URL: https://issues.apache.org/jira/browse/FLINK-25048
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: xinli liang
>Priority: Major
> Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the file
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", );
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new 
> ProcessFunction<String, String>() {
> @Override
> public void processElement(String value, Context ctx, Collector<String> out) 
> throws Exception {
> String[] s = value.split(" ");
> String word = s[0];
> String ts = s[1];
> if (word.startsWith("a")) {
> out.collect(value);
> } else {
> ctx.output(outputTag, value);
> }
> }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
> .forBoundedOutOfOrderness(Duration.ofSeconds(4))
> .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
> ;
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25048) Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data

2021-11-24 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448904#comment-17448904
 ] 

Wei-Che Wei edited comment on FLINK-25048 at 11/25/21, 2:37 AM:


Hi [~liangxinli], try this

{code:java}
SingleOutputStreamOperator<String> process = lineDSS.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        String[] s = value.split(" ");
        String word = s[0];
        String ts = s[1];
        if (word.startsWith("a")) {
            out.collect(value);
        } else {
            ctx.output(outputTag, value);
        }
    }
});
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

The problem is that you tried to get the side output from the 
`SingleOutputStreamOperator` returned by `assignTimestampsAndWatermarks`. 
However, the operator that actually produces the side output is the one created 
by `lineDSS.process`.


was (Author: tonywei):
[~liangxinli] try this

{code:java}
SingleOutputStreamOperator<String> process = lineDSS.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        String[] s = value.split(" ");
        String word = s[0];
        String ts = s[1];
        if (word.startsWith("a")) {
            out.collect(value);
        } else {
            ctx.output(outputTag, value);
        }
    }
});
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

The problem is that you tried to get the side output from the 
`SingleOutputStreamOperator` returned by `assignTimestampsAndWatermarks`. 
However, the operator that actually produces the side output is the one created 
by `process`.

> Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data
> ---
>
> Key: FLINK-25048
> URL: https://issues.apache.org/jira/browse/FLINK-25048
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: xinli liang
>Priority: Major
> Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the file
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", );
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new 
> ProcessFunction<String, String>() {
> @Override
> public void processElement(String value, Context ctx, Collector<String> out) 
> throws Exception {
> String[] s = value.split(" ");
> String word = s[0];
> String ts = s[1];
> if (word.startsWith("a")) {
> out.collect(value);
> } else {
> ctx.output(outputTag, value);
> }
> }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
> .forBoundedOutOfOrderness(Duration.ofSeconds(4))
> .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
> ;
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25048) Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data

2021-11-24 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448904#comment-17448904
 ] 

Wei-Che Wei edited comment on FLINK-25048 at 11/25/21, 2:35 AM:


[~liangxinli] try this

{code:java}
SingleOutputStreamOperator<String> process = lineDSS.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        String[] s = value.split(" ");
        String word = s[0];
        String ts = s[1];
        if (word.startsWith("a")) {
            out.collect(value);
        } else {
            ctx.output(outputTag, value);
        }
    }
});
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

The problem is that you tried to get the side output from the 
`SingleOutputStreamOperator` returned by `assignTimestampsAndWatermarks`. 
However, the operator that actually produces the side output is the one created 
by `process`.


was (Author: tonywei):
[~liangxinli] try this

{code:java}
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}

> Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data
> ---
>
> Key: FLINK-25048
> URL: https://issues.apache.org/jira/browse/FLINK-25048
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: xinli liang
>Priority: Major
> Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the file
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", );
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new 
> ProcessFunction<String, String>() {
> @Override
> public void processElement(String value, Context ctx, Collector<String> out) 
> throws Exception {
> String[] s = value.split(" ");
> String word = s[0];
> String ts = s[1];
> if (word.startsWith("a")) {
> out.collect(value);
> } else {
> ctx.output(outputTag, value);
> }
> }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
> .forBoundedOutOfOrderness(Duration.ofSeconds(4))
> .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
> ;
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25048) Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data

2021-11-24 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448904#comment-17448904
 ] 

Wei-Che Wei edited comment on FLINK-25048 at 11/25/21, 2:32 AM:


[~liangxinli] try this

{code:java}
process.assignTimestampsAndWatermarks(WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])))
    .print("main stream>>>");
process.getSideOutput(outputTag).print("side output>>>");
{code}


was (Author: tonywei):
[~liangxinli] try this

> Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data
> ---
>
> Key: FLINK-25048
> URL: https://issues.apache.org/jira/browse/FLINK-25048
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: xinli liang
>Priority: Major
> Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the file
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", );
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new 
> ProcessFunction<String, String>() {
> @Override
> public void processElement(String value, Context ctx, Collector<String> out) 
> throws Exception {
> String[] s = value.split(" ");
> String word = s[0];
> String ts = s[1];
> if (word.startsWith("a")) {
> out.collect(value);
> } else {
> ctx.output(outputTag, value);
> }
> }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
> .forBoundedOutOfOrderness(Duration.ofSeconds(4))
> .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
> ;
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25048) Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data

2021-11-24 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17448904#comment-17448904
 ] 

Wei-Che Wei commented on FLINK-25048:
-

[~liangxinli] try this

> Setting a watermark after an operation that uses an outputTag causes the outputTag to emit no data; only the main stream outputs data
> ---
>
> Key: FLINK-25048
> URL: https://issues.apache.org/jira/browse/FLINK-25048
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: xinli liang
>Priority: Major
> Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the file
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", );
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new 
> ProcessFunction<String, String>() {
> @Override
> public void processElement(String value, Context ctx, Collector<String> out) 
> throws Exception {
> String[] s = value.split(" ");
> String word = s[0];
> String ts = s[1];
> if (word.startsWith("a")) {
> out.collect(value);
> } else {
> ctx.output(outputTag, value);
> }
> }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
> .forBoundedOutOfOrderness(Duration.ofSeconds(4))
> .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
> ;
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-17 Thread Wei-Che Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-22970:

Affects Version/s: 1.14.0

> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> -
>
> Key: FLINK-22970
> URL: https://issues.apache.org/jira/browse/FLINK-22970
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
>  Labels: pull-request-available
>
> According to this ML discussion 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> The description of the `TO_TIMESTAMP` UDF is incorrect: it uses the UTC+0 
> time zone instead of the session time zone. We should fix this documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-16 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364645#comment-17364645
 ] 

Wei-Che Wei commented on FLINK-22970:
-

Hi [~Leonard Xu]
Could you assign this Jira issue to me and help merge the PR if it looks good to 
you? Thank you.

> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> -
>
> Key: FLINK-22970
> URL: https://issues.apache.org/jira/browse/FLINK-22970
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Reporter: Wei-Che Wei
>Priority: Minor
>  Labels: pull-request-available
>
> According to this ML discussion 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> The description of the `TO_TIMESTAMP` UDF is incorrect: it uses the UTC+0 
> time zone instead of the session time zone. We should fix this documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23012) Add v1.13 docs link in "Pick Docs Version" for master branch

2021-06-16 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364644#comment-17364644
 ] 

Wei-Che Wei edited comment on FLINK-23012 at 6/17/21, 2:46 AM:
---

Hi [~jark]
I have created PR [1], which also addresses this issue. Would you mind 
reviewing it? Thank you.

[1] https://github.com/apache/flink/pull/16162


was (Author: tonywei):
Hi [~jark]

> Add v1.13 docs link in "Pick Docs Version" for master branch
> 
>
> Key: FLINK-23012
> URL: https://issues.apache.org/jira/browse/FLINK-23012
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Jark Wu
>Priority: Major
> Attachments: image-2021-06-17-10-26-29-125.png
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/
>  !image-2021-06-17-10-26-29-125.png|thumbnail! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23012) Add v1.13 docs link in "Pick Docs Version" for master branch

2021-06-16 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364644#comment-17364644
 ] 

Wei-Che Wei commented on FLINK-23012:
-

Hi [~jark]

> Add v1.13 docs link in "Pick Docs Version" for master branch
> 
>
> Key: FLINK-23012
> URL: https://issues.apache.org/jira/browse/FLINK-23012
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.14.0
>Reporter: Jark Wu
>Priority: Major
> Attachments: image-2021-06-17-10-26-29-125.png
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/
>  !image-2021-06-17-10-26-29-125.png|thumbnail! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-10 Thread Wei-Che Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-22970:

Description: 
According to this ML discussion 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]

The description of the `TO_TIMESTAMP` UDF is incorrect: it uses the UTC+0 
time zone instead of the session time zone. We should fix this documentation.

  was:
According to this ML discussion 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]

The description of the `TO_TIMESTAMP` UDF is incorrect: it uses the UTC+0 
time zone instead of the session time zone.


> The documentation for `TO_TIMESTAMP` UDF has an incorrect description
> -
>
> Key: FLINK-22970
> URL: https://issues.apache.org/jira/browse/FLINK-22970
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Wei-Che Wei
>Priority: Minor
>
> According to this ML discussion 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]
> The description of the `TO_TIMESTAMP` UDF is incorrect: it uses the UTC+0 
> time zone instead of the session time zone. We should fix this documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22970) The documentation for `TO_TIMESTAMP` UDF has an incorrect description

2021-06-10 Thread Wei-Che Wei (Jira)
Wei-Che Wei created FLINK-22970:
---

 Summary: The documentation for `TO_TIMESTAMP` UDF has an incorrect 
description
 Key: FLINK-22970
 URL: https://issues.apache.org/jira/browse/FLINK-22970
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Wei-Che Wei


According to this ML discussion 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/confused-about-TO-TIMESTAMP-document-description-td44352.html]

The description of the `TO_TIMESTAMP` UDF is incorrect: it uses the UTC+0 
time zone instead of the session time zone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22796) Update mem_setup_tm documentation

2021-05-28 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17353014#comment-17353014
 ] 

Wei-Che Wei commented on FLINK-22796:
-

Hi [~jark]
could you assign this issue to me? thanks.

> Update mem_setup_tm documentation
> -
>
> Key: FLINK-22796
> URL: https://issues.apache.org/jira/browse/FLINK-22796
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Wei-Che Wei
>Priority: Minor
>
> In [FLINK-20860], two config options were introduced; we should update the 
> corresponding docs as well.
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#managed-memory
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/#%e6%b6%88%e8%b4%b9%e8%80%85%e6%9d%83%e9%87%8d



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22796) Update mem_setup_tm documentation

2021-05-28 Thread Wei-Che Wei (Jira)
Wei-Che Wei created FLINK-22796:
---

 Summary: Update mem_setup_tm documentation
 Key: FLINK-22796
 URL: https://issues.apache.org/jira/browse/FLINK-22796
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Wei-Che Wei


In [FLINK-20860], two config options were introduced; we should update the 
corresponding docs as well.

https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#managed-memory
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/#%e6%b6%88%e8%b4%b9%e8%80%85%e6%9d%83%e9%87%8d



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22476) Extend the description of the config option `execution.target`

2021-04-27 Thread Wei-Che Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-22476:

Affects Version/s: 1.13.0

> Extend the description of the config option `execution.target`
> --
>
> Key: FLINK-22476
> URL: https://issues.apache.org/jira/browse/FLINK-22476
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
>  Labels: pull-request-available
>
> The current description for `execution.target` only lists `remote`, `local`, 
> `yarn-per-job`, `yarn-session`, and `kubernetes-session`. [1]
> We should add `kubernetes-application` and `yarn-application` as well.
>  
>  [1] 
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22476) Extend the description of the config option `execution.target`

2021-04-26 Thread Wei-Che Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-22476:

Description: 
The current description for `execution.target` only lists `remote`, `local`, 
`yarn-per-job`, `yarn-session`, and `kubernetes-session`. [1]

We should add `kubernetes-application` and `yarn-application` as well.

 

 [1] 
[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46]

  was:
The current description for `execution.target` only lists `remote`, `local`, 
`yarn-per-job`, `yarn-session`, and `kubernetes-session`.

We should add `kubernetes-application` and `yarn-application` as well.

 

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46


> Extend the description of the config option `execution.target`
> --
>
> Key: FLINK-22476
> URL: https://issues.apache.org/jira/browse/FLINK-22476
> Project: Flink
>  Issue Type: Improvement
>Reporter: Wei-Che Wei
>Priority: Minor
>
> The current description for `execution.target` only lists `remote`, `local`, 
> `yarn-per-job`, `yarn-session`, and `kubernetes-session`. [1]
> We should add `kubernetes-application` and `yarn-application` as well.
>  
>  [1] 
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22476) Extend the description of the config option `execution.target`

2021-04-26 Thread Wei-Che Wei (Jira)
Wei-Che Wei created FLINK-22476:
---

 Summary: Extend the description of the config option 
`execution.target`
 Key: FLINK-22476
 URL: https://issues.apache.org/jira/browse/FLINK-22476
 Project: Flink
  Issue Type: Improvement
Reporter: Wei-Che Wei


The current description for `execution.target` only lists `remote`, `local`, 
`yarn-per-job`, `yarn-session`, and `kubernetes-session`.

We should add `kubernetes-application` and `yarn-application` as well.

 

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19358) when submit job on application mode with HA,the jobid will be 0000000000

2021-04-20 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325611#comment-17325611
 ] 

Wei-Che Wei commented on FLINK-19358:
-

Hi [~trohrmann], [~kkl0u]

I think this issue might also affect history server. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/historyserver.html#available-requests

> when submit job on application mode with HA,the jobid will be 0000000000
> 
>
> Key: FLINK-19358
> URL: https://issues.apache.org/jira/browse/FLINK-19358
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: Jun Zhang
>Priority: Major
>  Labels: usability
> Fix For: 1.13.0
>
>
> When submitting a Flink job in application mode with HA, the Flink job id is 
> always 0000000000. When I have many jobs, they all share the same job id, 
> which leads to a checkpoint error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-08 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947309#comment-16947309
 ] 

Wei-Che Wei edited comment on FLINK-14302 at 10/9/19 1:52 AM:
--

Hi [~becket_qin] 

I think the error came from `txnManager.getTransactionState(transactionalId)` 
[1].

You are right. Maybe we can report this issue back to the Kafka community as 
well. For now, could we settle on the temporary solution of preventing empty 
transactions from being committed? I have already opened a pull request. Could 
you help review it and give me advice? I had trouble figuring out how to add an 
integration test for it. Thanks.

[1] 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L239]


was (Author: tonywei):
Hi [~becket_qin] 

I think the error came from `txnManager.getTransactionState(transactionalId)`.

You are right. Maybe we can report this issue back to the Kafka community as 
well. For now, could we settle on the temporary solution of preventing empty 
transactions from being committed? I have already opened a pull request. Could 
you help review it and give me advice? I had trouble figuring out how to add an 
integration test for it. Thanks.

[1] 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L239]

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As the survey in this mailing list thread [1] shows, the Kafka server binds 
> the error to the topic-partition list when it handles 
> `AddPartitionsToTxnRequest`. So when the request body contains no 
> topic-partitions, the error won't be sent back to the Kafka producer client. 
> Moreover, the producer's internal API always checks whether 
> `newPartitionsInTransaction` is empty before sending an ADD_PARTITIONS_TO_TXN 
> request to the Kafka cluster. We should apply the same check if we need to 
> explicitly call it in the first commit phase of the two-phase commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-08 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947309#comment-16947309
 ] 

Wei-Che Wei commented on FLINK-14302:
-

Hi [~becket_qin] 

I think the error came from `txnManager.getTransactionState(transactionalId)`.

You are right. Maybe we can report this issue back to the Kafka community as 
well. For now, could we settle on the temporary solution of preventing empty 
transactions from being committed? I have already opened a pull request. Could 
you help me review it and give me advice? I'm having trouble adding an 
integration test for it. Thanks.

[1] 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L239]

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As surveyed in this mailing list thread [1], the Kafka server binds errors to 
> the topic-partition list when it handles `AddPartitionsToTxnRequest`. So when 
> the request body contains no topic-partitions, the error won't be sent back 
> to the Kafka producer client. Moreover, the producer's internal API always 
> checks whether `newPartitionsInTransaction` is empty before sending an 
> ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
> check when explicitly calling it in the first commit phase of the two-phase 
> commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-07 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16941640#comment-16941640
 ] 

Wei-Che Wei edited comment on FLINK-14302 at 10/7/19 6:59 AM:
--

Sorry, I didn't notice that the contribution flow had changed. I have already 
sent a PR for this issue, but couldn't assign it to myself.

Hi [~becket_qin]
If you have free time, please help me verify whether the issue I mentioned in 
the mailing list thread is a bug, and whether it should be fixed as I 
suggested.
If this is a bug, and the solution looks good to you, please assign this Jira 
issue to me. Thanks.


was (Author: tonywei):
Sorry, I didn't notice that the contribution flow had changed. I have already 
sent a PR for this issue, but couldn't assign it to myself.

Hi [~jqin],
If you have free time, please help me verify whether the issue I mentioned in 
the mailing list thread is a bug, and whether it should be fixed as I 
suggested.
If this is a bug, and the solution looks good to you, please assign this Jira 
issue to me. Thanks.

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As surveyed in this mailing list thread [1], the Kafka server binds errors to 
> the topic-partition list when it handles `AddPartitionsToTxnRequest`. So when 
> the request body contains no topic-partitions, the error won't be sent back 
> to the Kafka producer client. Moreover, the producer's internal API always 
> checks whether `newPartitionsInTransaction` is empty before sending an 
> ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
> check when explicitly calling it in the first commit phase of the two-phase 
> commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-06 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945515#comment-16945515
 ] 

Wei-Che Wei commented on FLINK-14302:
-

Hi [~becket_qin] 

The root cause of NOT_COORDINATOR is not the empty partition list.

It is that the broker hosting the original transaction coordinator was 
restarted. The error should be propagated to the client so it can fetch the 
new coordinator, but the error is bound to each element of the partition 
list. Since the list was empty, the error got lost in the response message, 
and that led to the subsequent commit failing.

For more information, you can refer to my replies in the mailing thread. Thanks.
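The failure mode above can be illustrated with a small sketch (plain Java, not the actual Kafka server code; the string-keyed maps are illustrative): the response carries errors keyed by partition, so with an empty partition list a coordinator-level error such as NOT_COORDINATOR has no entry to attach to and the client sees an empty, seemingly successful response.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LostErrorDemo {
    // Mimics the per-partition error binding in the AddPartitionsToTxn response.
    static Map<String, String> buildResponse(List<String> partitions, String brokerError) {
        Map<String, String> perPartitionErrors = new HashMap<>();
        for (String p : partitions) {
            perPartitionErrors.put(p, brokerError); // error is bound per partition
        }
        return perPartitionErrors; // empty input -> error silently dropped
    }

    public static void main(String[] args) {
        Map<String, String> ok = buildResponse(List.of("topic-0"), "NOT_COORDINATOR");
        Map<String, String> lost = buildResponse(List.of(), "NOT_COORDINATOR");
        System.out.println(ok);   // {topic-0=NOT_COORDINATOR} -> client refetches coordinator
        System.out.println(lost); // {} -> client assumes success; the later commit fails
    }
}
```

With at least one partition the client can see the error and refetch the coordinator; with none, the stale coordinator is reused and the commit fails later.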

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As surveyed in this mailing list thread [1], the Kafka server binds errors to 
> the topic-partition list when it handles `AddPartitionsToTxnRequest`. So when 
> the request body contains no topic-partitions, the error won't be sent back 
> to the Kafka producer client. Moreover, the producer's internal API always 
> checks whether `newPartitionsInTransaction` is empty before sending an 
> ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
> check when explicitly calling it in the first commit phase of the two-phase 
> commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-01 Thread Wei-Che Wei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16941640#comment-16941640
 ] 

Wei-Che Wei commented on FLINK-14302:
-

Sorry, I didn't notice that the contribution flow had changed. I have already 
sent a PR for this issue, but couldn't assign it to myself.

Hi [~jqin],
If you have free time, please help me verify whether the issue I mentioned in 
the mailing list thread is a bug, and whether it should be fixed as I 
suggested.
If this is a bug, and the solution looks good to you, please assign this Jira 
issue to me. Thanks.

> FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if 
> `newPartitionsInTransaction` is empty when enable EoS
> ---
>
> Key: FLINK-14302
> URL: https://issues.apache.org/jira/browse/FLINK-14302
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Wei-Che Wei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As surveyed in this mailing list thread [1], the Kafka server binds errors to 
> the topic-partition list when it handles `AddPartitionsToTxnRequest`. So when 
> the request body contains no topic-partitions, the error won't be sent back 
> to the Kafka producer client. Moreover, the producer's internal API always 
> checks whether `newPartitionsInTransaction` is empty before sending an 
> ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
> check when explicitly calling it in the first commit phase of the two-phase 
> commit sink.
> [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14302) FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

2019-10-01 Thread Wei-Che Wei (Jira)
Wei-Che Wei created FLINK-14302:
---

 Summary: FlinkKafkaInternalProducer should not send 
`ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when 
enable EoS
 Key: FLINK-14302
 URL: https://issues.apache.org/jira/browse/FLINK-14302
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Wei-Che Wei


As surveyed in this mailing list thread [1], the Kafka server binds errors to 
the topic-partition list when it handles `AddPartitionsToTxnRequest`. So when 
the request body contains no topic-partitions, the error won't be sent back to 
the Kafka producer client. Moreover, the producer's internal API always checks 
whether `newPartitionsInTransaction` is empty before sending an 
ADD_PARTITIONS_TO_TXN request to the Kafka cluster. We should apply the same 
check when explicitly calling it in the first commit phase of the two-phase 
commit sink.

[1] 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-producer-failed-with-InvalidTxnStateException-when-performing-commit-transaction-td29384.html]
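The proposed guard can be sketched as follows (a minimal illustration, not the actual Flink or Kafka producer code; the class and method names are made up): mirror the producer's internal check and skip ADD_PARTITIONS_TO_TXN entirely when `newPartitionsInTransaction` is empty, since an empty partition list leaves broker-side errors with no entry to attach to.

```java
import java.util.HashSet;
import java.util.Set;

public class TxnGuard {
    private final Set<String> newPartitionsInTransaction = new HashSet<>();

    void addPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    /** The request should only go out when there is at least one new partition. */
    boolean shouldSendAddPartitionsRequest() {
        return !newPartitionsInTransaction.isEmpty();
    }

    public static void main(String[] args) {
        TxnGuard guard = new TxnGuard();
        System.out.println(guard.shouldSendAddPartitionsRequest()); // false -> skip the request
        guard.addPartition("rt_event-0");
        System.out.println(guard.shouldSendAddPartitionsRequest()); // true -> safe to send
    }
}
```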



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-11251) Incompatible metric name on prometheus reporter

2019-01-02 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-11251:
---

 Summary: Incompatible metric name on prometheus reporter
 Key: FLINK-11251
 URL: https://issues.apache.org/jira/browse/FLINK-11251
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.7.0, 1.6.3, 1.5.6
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


{code}
# HELP 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets
 currentOffsets (scope: 
taskmanager_job_task_operator_KafkaConsumer_topic_partition_4)
# TYPE 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets
 gauge
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets{task_attempt_id="5137e35cf7319787f6cd627621fd2ea7",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="4",topic="rt_lookback_state",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="d7b1ad914351f9ee527267f51160",operator_id="d7b1ad914351f9ee527267f51160",operator_name="Source:_kafka_lookback_state_source",task_name="Source:_kafka_lookback_state_source",job_name="FlinkRuleMatchPipeline",subtask_index="7",}
 1.456090927E9
# HELP 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets
 committedOffsets (scope: 
taskmanager_job_task_operator_KafkaConsumer_topic_partition_24)
# TYPE 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets
 gauge
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets{task_attempt_id="9b666af68ec4734b25937b8b94cc5c84",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="24",topic="rt_event",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="61252f73469d3ffba207c548d29a0267",operator_id="61252f73469d3ffba207c548d29a0267",operator_name="Source:_kafka_source",task_name="Source:_kafka_sourcesamplingparse_and_filter",job_name="FlinkRuleMatchPipeline",subtask_index="27",}
 3.001186523E9
{code}

This is a snippet from my Flink Prometheus reporter output. It shows that the 
Kafka current-offsets and committed-offsets metric names changed after I 
migrated my Flink job from 1.6.0 to 1.6.3.

The original metric names should not contain the {{partition index}}, i.e. 
the metric names should be 
{{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_currentOffsets}}
 and 
{{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_committedOffsets}}.

After digging into the source code, I found that the incompatibility started 
with this [PR|https://github.com/apache/flink/pull/7095], because it 
introduced an overloaded {{getLogicalScope(CharacterFilter, char, int)}} 
without overriding it in the {{GenericValueMetricGroup}} class.
When the tail metric group of a metric is a {{GenericValueMetricGroup}} and 
this new {{getLogicalScope}} is called, i.e. via 
{{FrontMetricGroup#getLogicalScope}}, the value group name is no longer 
ignored, although it was in previously released versions.
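The regression can be illustrated with a simplified sketch (the {{Group}} class here is hypothetical, not Flink's {{AbstractMetricGroup}}): key groups contribute their names to the logical scope, while value groups, like the partition index "4", must be filtered out. The 1.6.3 behavior effectively stopped skipping them, which is how the value leaked into the metric name above.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LogicalScopeDemo {
    static final class Group {
        final String name;
        final boolean isValueGroup; // true for GenericValueMetricGroup-like groups

        Group(String name, boolean isValueGroup) {
            this.name = name;
            this.isValueGroup = isValueGroup;
        }
    }

    // Correct behavior: value groups never contribute to the logical scope.
    static String logicalScope(List<Group> groups, char delimiter) {
        return groups.stream()
                .filter(g -> !g.isValueGroup)
                .map(g -> g.name)
                .collect(Collectors.joining(String.valueOf(delimiter)));
    }

    public static void main(String[] args) {
        List<Group> groups = List.of(
                new Group("KafkaConsumer", false),
                new Group("topic", false),
                new Group("rt_event", true),  // value of key "topic"
                new Group("partition", false),
                new Group("4", true));        // value of key "partition"
        System.out.println(logicalScope(groups, '.')); // KafkaConsumer.topic.partition
    }
}
```

Dropping the filter in the overloaded path would produce {{KafkaConsumer.topic.rt_event.partition.4}}, which matches the incompatible names in the snippet.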




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11034) Provide "rewriting config” to file system factory

2018-11-29 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-11034:
---

 Summary: Provide "rewriting config” to file system factory 
 Key: FLINK-11034
 URL: https://issues.apache.org/jira/browse/FLINK-11034
 Project: Flink
  Issue Type: Improvement
Reporter: Wei-Che Wei


The discussion in this mailing thread 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-to-override-s3-key-config-in-flink-job-td24606.html]
 showed that it is not possible to overwrite the file system factory's config 
when submitting a Flink job.

That means multiple jobs in a session cluster share the same config, and a 
user can't use different configurations for checkpointing and the file sink. 
For example, a user might have different s3 buckets for checkpointing and the 
file sink, and each bucket might require a different s3 access key for 
management reasons.

We might need to provide a way to overwrite the configuration when calling 
the file system factory's "get" method, so that user-facing components, like 
checkpointing or the file sink, can take an overriding config from the user 
and create a file system with those changes applied.
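A hypothetical sketch of the proposal (the method name {{getWithOverrides}} and the string-map config are made up, not Flink's {{FileSystem}} API): the factory merges per-use overrides on top of the shared cluster config, so the file sink and checkpointing can each supply their own s3 access key.

```java
import java.util.HashMap;
import java.util.Map;

public class FsFactorySketch {
    private final Map<String, String> clusterConfig;

    FsFactorySketch(Map<String, String> clusterConfig) {
        this.clusterConfig = new HashMap<>(clusterConfig);
    }

    /** Effective config = shared cluster config overlaid with user overrides. */
    Map<String, String> getWithOverrides(Map<String, String> overrides) {
        Map<String, String> effective = new HashMap<>(clusterConfig);
        effective.putAll(overrides);
        return effective;
    }

    public static void main(String[] args) {
        FsFactorySketch factory =
                new FsFactorySketch(Map.of("s3.access-key", "shared-key"));
        // Checkpointing keeps the shared key; the file sink overrides it.
        System.out.println(factory.getWithOverrides(Map.of()).get("s3.access-key"));
        System.out.println(
                factory.getWithOverrides(Map.of("s3.access-key", "sink-key"))
                        .get("s3.access-key"));
    }
}
```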



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer

2018-10-15 Thread Wei-Che Wei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4574:
--

Assignee: (was: Wei-Che Wei)

> Strengthen fetch interval implementation in Kinesis consumer
> 
>
> Key: FLINK-4574
> URL: https://issues.apache.org/jira/browse/FLINK-4574
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
>
> As pointed out by [~rmetzger], right now the fetch interval implementation in 
> the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer 
> interval times than specified by the user, e.g. if the specified fetch 
> interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and 
> {{y}} to complete processing the fetched records for emitting, then the 
> actual interval between fetches is {{f+x+y}}.
> The main problem with this is that we can never guarantee how much time has 
> passed since the last {{getRecords}} call, and thus cannot guarantee that 
> returned shard iterators will not have expired the next time we use them, 
> even if we limit the user-given value for {{f}} to not be longer than the 
> iterator expiry time.
> I propose to improve this by, per {{ShardConsumer}}, using a 
> {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, 
> and a separate blocking queue that collects the fetched records for emitting.
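The timing drift described above reduces to simple arithmetic, sketched here in plain Java (no Kinesis code involved): a sequential "sleep {{f}}, fetch for {{x}}, process for {{y}}" loop has a real period of {{f+x+y}}, while fixed-rate scheduling keeps the period at {{f}} whenever {{x+y <= f}}.

```java
public class FetchIntervalDemo {
    // Sequential loop: interval sleep + fetch time + processing time.
    static long sequentialPeriodMillis(long f, long x, long y) {
        return f + x + y;
    }

    // Fixed-rate scheduling: the scheduler absorbs work time up to f.
    static long fixedRatePeriodMillis(long f, long x, long y) {
        return Math.max(f, x + y);
    }

    public static void main(String[] args) {
        long f = 1000, x = 200, y = 300;
        System.out.println(sequentialPeriodMillis(f, x, y)); // 1500
        System.out.println(fixedRatePeriodMillis(f, x, y));  // 1000
    }
}
```

This is why decoupling fetching (on a fixed-rate timer) from emitting (via a blocking queue) keeps the interval bounded and the shard iterators fresh.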



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2018-10-15 Thread Wei-Che Wei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4816:
--

Assignee: (was: Wei-Che Wei)

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Priority: Major
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend

2018-09-11 Thread Wei-Che Wei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610615#comment-16610615
 ] 

Wei-Che Wei commented on FLINK-9486:


It's clear to me now. Thanks!

> Introduce TimerState in keyed state backend
> ---
>
> Key: FLINK-9486
> URL: https://issues.apache.org/jira/browse/FLINK-9486
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> Goal of this PR is to introduce a timer state that is registered with the 
> keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the 
> {{StateTable}} that holds other forms of keyed state, and the implementation 
> is basically backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this 
> state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an 
> intermediate state, and we will later also implement the alternative of 
> storing the timers inside column families in RocksDB. However, by taking 
> this step, we can still offer the option to have RocksDB state with 
> heap-based timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend

2018-09-11 Thread Wei-Che Wei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610597#comment-16610597
 ] 

Wei-Che Wei commented on FLINK-9486:


Hi [~srichter],
If I didn't misunderstand, the timer has been re-implemented for both the 
RocksDB and heap state backends, so timers from earlier Flink versions will be 
migrated to either of them. And I need to choose one of them before migration 
because I can't switch between them afterwards. Am I right?
i.e. I can migrate timers from Flink 1.4 to the RocksDB timer in Flink 1.6 or 
to the heap timer in Flink 1.6, but I can't switch from the RocksDB timer to 
the heap timer in Flink 1.6, or vice versa.

> Introduce TimerState in keyed state backend
> ---
>
> Key: FLINK-9486
> URL: https://issues.apache.org/jira/browse/FLINK-9486
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> Goal of this PR is to introduce a timer state that is registered with the 
> keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the 
> {{StateTable}} that holds other forms of keyed state, and the implementation 
> is basically backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this 
> state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an 
> intermediate state, and we will later also implement the alternative of 
> storing the timers inside column families in RocksDB. However, by taking 
> this step, we can still offer the option to have RocksDB state with 
> heap-based timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend

2018-09-11 Thread Wei-Che Wei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610365#comment-16610365
 ] 

Wei-Che Wei commented on FLINK-9486:


Hi [~srichter]
Can the timer service be migrated from heap-based to RocksDB-based? Or do I 
need to re-submit the job in order to change to the RocksDB timer service? 
Thank you.

> Introduce TimerState in keyed state backend
> ---
>
> Key: FLINK-9486
> URL: https://issues.apache.org/jira/browse/FLINK-9486
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> Goal of this PR is to introduce a timer state that is registered with the 
> keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the 
> {{StateTable}} that holds other forms of keyed state, and the implementation 
> is basically backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this 
> state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an 
> intermediate state, and we will later also implement the alternative of 
> storing the timers inside column families in RocksDB. However, by taking 
> this step, we can still offer the option to have RocksDB state with 
> heap-based timers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4714) Set task state to RUNNING after state has been restored

2018-03-19 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404461#comment-16404461
 ] 

Wei-Che Wei commented on FLINK-4714:


Hi [~till.rohrmann]
Since FLINK-5982 has been merged, I would like to go back to this work.
As we discussed before, I planned to move the initialization parts that run 
before {{isRunning = true;}} into the {{StreamTask}} constructor.

However, I found that it didn't work as I thought.
First, I can't move {{StreamTask#initializeState()}} to the constructor, 
because it would fail {{InterruptSensitiveRestoreTest}}.
I think the reason is that the constructor blocked before the instance was 
fully created, so {{TaskCanceler}} couldn't stop it by calling 
{{StreamTask#cancel()}}.
Second, {{StreamTask#init()}} can't be moved to the constructor either, 
because {{spotbugs-maven-plugin}} fails the compilation with 
{{UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR}}.

I have no idea how to make all of them work in the {{StreamTask}} 
constructor, so I would prefer to return to the original discussion: adding 
an {{initialization}} or {{open}} method.
What do you think? Please correct me if I made any mistake. Thank you.

This is my [repo|https://github.com/tony810430/flink/tree/FLINK-4714-WIP].

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Wei-Che Wei
>Priority: Major
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4817) Checkpoint Coordinator should be called to restore state with a specific checkpoint ID

2018-03-13 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4817:
--

Assignee: (was: Wei-Che Wei)

> Checkpoint Coordinator should be called to restore state with a specific 
> checkpoint ID
> --
>
> Key: FLINK-4817
> URL: https://issues.apache.org/jira/browse/FLINK-4817
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
>
> Rather than being called to restore the "latest" checkpoint, the Checkpoint 
> Coordinator should be called to restore a specific checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2018-01-09 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei closed FLINK-5982.
--
   Resolution: Done
Fix Version/s: 1.5.0

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
> Fix For: 1.5.0
>
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs {{setInitialState(state)}}. That makes it 
> difficult to do eager initialization on the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} and 
> {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every subclass of {{AbstractInvokable}} a two-argument constructor 
> that calls the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then we can simplify running an invokable to just the constructor and 
> {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.
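The steps above can be sketched as follows (a minimal illustration; {{Environment}} and {{TaskStateHandles}} are stand-in stubs, not the real Flink classes): a single two-argument constructor replaces {{setEnvironment()}}/{{setInitialState()}}, so eager initialization can happen while the task is still DEPLOYING.

```java
public class InvokableSketch {
    static class Environment {}
    static class TaskStateHandles {}

    abstract static class AbstractInvokable {
        protected final Environment env;
        protected final TaskStateHandles initialState; // null on a fresh start

        AbstractInvokable(Environment env, TaskStateHandles initialState) {
            this.env = env;                   // replaces setEnvironment(env)
            this.initialState = initialState; // replaces setInitialState(state)
        }

        abstract void invoke() throws Exception;
    }

    // A not-yet-stateful subclass must reject restored state, per the steps above.
    static class BatchTask extends AbstractInvokable {
        BatchTask(Environment env, TaskStateHandles initialState) {
            super(env, initialState);
            if (initialState != null) {
                throw new IllegalStateException("BatchTask cannot restore state");
            }
        }

        @Override
        void invoke() {}
    }

    public static void main(String[] args) throws Exception {
        new BatchTask(new Environment(), null).invoke(); // fresh start: fine
    }
}
```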



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-03 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16310618#comment-16310618
 ] 

Wei-Che Wei commented on FLINK-7935:


Hi [~elevy]
What you described is almost correct. FLINK-7692 lets users expose their own 
variables to a {{MetricGroup}}, but how to map the metric name and the 
metric's variables to a third-party metric system is the reporter's 
responsibility.
You can use {{MetricGroup#getAllVariables()}} to get {{type:messageType}} and 
the other system scope variables. These can be mapped to tags in the DataDog 
reporter.
{{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} can get 
{{.messages.type}} back, so use this function to expose the 
metric name, which will be {{.messages.type.counts}}. For 
example, the Prometheus reporter uses it to expose the metric name. 
[[1|https://github.com/apache/flink/blob/beb11976fe63c20a5dc9f22ea713c05b4d5e9585/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java#L217]]
However, {{MetricGroup#getMetricIdentifier(String)}} will still return 
{{.messages.type.}}. It seems that the DataDog reporter uses 
this function to get the metric name. 
[[2|https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java#L63]]
I think that is a limitation in the DataDog reporter; maybe we can make 
{{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} a public API and 
update the DataDog reporter.

cc [~Zentol]
Do you have any suggestions or comments? If I made any mistake in my comment, 
please correct me. Thank you.
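The name-vs-tags split discussed above can be sketched in plain Java (no real Flink classes; the method names and example values here are illustrative): a reporter builds the metric name from the logical scope only, and emits the scope variables separately as tags, which is what makes a single metric with many tag values possible in systems like DataDog.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class NameVsTags {
    // The name comes from the logical scope; variables never enter it.
    static String metricName(String logicalScope, String metric) {
        return logicalScope + "." + metric;
    }

    // Variables like {type=messageType, tm_id=abc} become "tm_id:abc,type:messageType".
    static String tagString(Map<String, String> variables) {
        return new TreeMap<>(variables).entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of("type", "messageType", "tm_id", "abc");
        System.out.println(metricName("taskmanager.job.operator.messages", "counts"));
        System.out.println(tagString(vars)); // sorted by key: tm_id:abc,type:messageType
    }
}
```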

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
>  the System scope for operator metrics is 
> {{.taskmanager}}.  
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metric's plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user-defined metrics, but you can't define 
> variables/scopes when adding a metric group or metric with the user API, so 
> that in DD we could have a single metric with a tag taking many different 
> values, rather than hundreds of metrics just to track the one value we want 
> to measure across different event types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-02 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16309048#comment-16309048
 ] 

Wei-Che Wei commented on FLINK-7935:


Hi [~elevy]
Since FLINK-7692 was solved, users can use 
{{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} and 
{{MetricGroup#getAllVariables()}} to get the same name but with different tags 
for user-defined variables. Does that solve what you need in this issue, or is 
there any improvement we can still make here? Thank you.
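As a rough standalone sketch (a toy helper, not the real Flink API) of what a reporter can do once it has both the logical scope and the variables map:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Toy sketch of how a reporter might combine a logical scope such as
// "taskmanager.job.operator" with the variables map from
// MetricGroup#getAllVariables(): one generic metric name, distinguished
// only by its tags. Invented helper, not Flink code.
public class TaggedMetric {

    static String format(String logicalScope, String metricName, Map<String, String> variables) {
        String tags = variables.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
        return logicalScope + "." + metricName + "{" + tags + "}";
    }

    public static void main(String[] args) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("tm_id", "foo");
        vars.put("job_name", "bar");
        // Two operator instances would share this metric name and
        // differ only in their tag values.
        System.out.println(format("taskmanager.job.operator", "numRecordsIn", vars));
    }
}
```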

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-4817) Checkpoint Coordinator should be called to restore state with a specific checkpoint ID

2017-12-28 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4817:
--

Assignee: Wei-Che Wei

> Checkpoint Coordinator should be called to restore state with a specific 
> checkpoint ID
> --
>
> Key: FLINK-4817
> URL: https://issues.apache.org/jira/browse/FLINK-4817
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Wei-Che Wei
>
> Rather than being called to restore the "latest" checkpoint, the Checkpoint 
> Coordinator should be called to restore a specific checkpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function

2017-12-28 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-8326:
--

 Summary: 
CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't 
use the correct parameter to trigger test function
 Key: FLINK-8326
 URL: https://issues.apache.org/jira/browse/FLINK-8326
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei
Priority: Minor


{{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} 
did not use the correct parameter, so it only tested scale-in.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2017-12-27 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-8324:
---
Priority: Trivial  (was: Major)

> Expose another offsets metrics by using new metric API to specify user 
> defined variables
> 
>
> Key: FLINK-8324
> URL: https://issues.apache.org/jira/browse/FLINK-8324
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
> Fix For: 1.5.0
>
>
> The {{current-offsets}} and {{committed-offsets}} metrics are now attached 
> with topic name and partition id in the metric identity.
> It is not convenient to use these metrics in Prometheus, because users 
> usually keep the same metric group name for metrics that have the same 
> meaning and use tags to select an individual metric.
> For example, I would prefer to access the {{current-offsets}} metric group 
> and use the {{partition-x}} tag to get the offset of partition x, instead of 
> reading the metric directly from {{current-offsets-partition-x}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2017-12-27 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-8324:
---
Affects Version/s: (was: 1.5.0)

> Expose another offsets metrics by using new metric API to specify user 
> defined variables
> 
>
> Key: FLINK-8324
> URL: https://issues.apache.org/jira/browse/FLINK-8324
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
> Fix For: 1.5.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2017-12-27 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-8324:
---
Fix Version/s: 1.5.0

> Expose another offsets metrics by using new metric API to specify user 
> defined variables
> 
>
> Key: FLINK-8324
> URL: https://issues.apache.org/jira/browse/FLINK-8324
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
> Fix For: 1.5.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2017-12-27 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16304988#comment-16304988
 ] 

Wei-Che Wei commented on FLINK-8324:


Hi [~tzulitai]
Looking forward to hearing your comment. Thank you.

> Expose another offsets metrics by using new metric API to specify user 
> defined variables
> 
>
> Key: FLINK-8324
> URL: https://issues.apache.org/jira/browse/FLINK-8324
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables

2017-12-27 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-8324:
--

 Summary: Expose another offsets metrics by using new metric API to 
specify user defined variables
 Key: FLINK-8324
 URL: https://issues.apache.org/jira/browse/FLINK-8324
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.5.0
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


The {{current-offsets}} and {{committed-offsets}} metrics are now attached with 
topic name and partition id in the metric identity.
It is not convenient to use these metrics in Prometheus, because users usually 
keep the same metric group name for metrics that have the same meaning and use 
tags to select an individual metric.
For example, I would prefer to access the {{current-offsets}} metric group and 
use the {{partition-x}} tag to get the offset of partition x, instead of 
reading the metric directly from {{current-offsets-partition-x}}.
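A minimal sketch of the two styles (invented names, not the actual Kafka connector code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the two ways to expose per-partition offsets:
// one metric per partition with the partition id baked into the name, versus
// a single "current-offsets" metric distinguished by a partition tag.
public class OffsetMetrics {

    // Name-per-partition style: e.g. "current-offsets-partition-3".
    static String perPartitionName(int partition) {
        return "current-offsets-partition-" + partition;
    }

    // Tagged style: one generic name, partition carried as a tag value.
    static Map<String, String> taggedMetric(int partition) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("name", "current-offsets");
        m.put("partition", "partition-" + partition);
        return m;
    }

    public static void main(String[] args) {
        System.out.println(perPartitionName(3));
        System.out.println(taggedMetric(3));
    }
}
```

In the tagged style, Prometheus can aggregate or filter over all partitions of the one `current-offsets` metric rather than matching hundreds of distinct metric names.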



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics

2017-11-28 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269921#comment-16269921
 ] 

Wei-Che Wei commented on FLINK-7692:


[~Zentol]
This idea is much better. I would like to keep working on it based on your 
scaffold.
One more thing needs to be confirmed: I need to distinguish when to add 
{{GenericKeyMetricGroup}} or {{GenericMetricGroup}} if a group {{name}} doesn't 
exist, since it will always create {{GenericKeyMetricGroup}} in your scaffold. 
Am I right?

> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> Reporters that identify metrics with a set of key-value pairs are currently 
> limited to the variables defined by Flink, like the taskmanager ID, with 
> users not being able to supply their own.
> This is inconsistent with reporters that use metric identifiers that freely 
> include user-defined groups constructed via {{MetricGroup#addGroup(String 
> name)}}.
> I propose adding a new method {{MetricGroup#addGroup(String key, String 
> name)}} that adds a new key-value pair to the {{variables}} map in its 
> constructor. When constructing the metric identifier the key should be 
> included as well, yielding the same result as constructing the 
> metric group tree via {{group.addGroup(key).addGroup(value)}}.
> For this a new {{KeyedGenericMetricGroup}} should be created that resembles 
> the unkeyed version, with slight modifications to the constructor and 
> {{getScopeComponents}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7692) Support user-defined variables in Metrics

2017-11-06 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235098#comment-16235098
 ] 

Wei-Che Wei edited comment on FLINK-7692 at 11/7/17 5:36 AM:
-

Hi [~Zentol]

I proposed a draft for this issue and want to get some feedback from you.

A new {{KeyedGenericMetricGroup}} to support a new method 
{{MetricGroup#addGroup(String name, String value)}}.
{code}
class KeyedGenericMetricGroup extends AbstractMetricGroup {
  /** appends name and value after parent.getScopeComponents() */
  public KeyedGenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name, String value)

  /** appends (name -> value) to the variables map */
  @Override
  public Map<String, String> getAllVariables()
}
{code}

The Comparison between {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}}
| |group.addGroup(name, value)|group.addGroup(name).addGroup(value)|
|depth from parent| 1 | 2 |
|getMetricIdentifier()|\{parentIdentifier\}.name.value|\{parentIdentifier\}.name.value|
|getLogicalScope()|\{parentScope\}.name|\{parentScope\}.name.value|
|getAllVariables()|\{parentVariables\} ++ (name -> value)|\{parentVariables\}|

This can benefit the reporter such as Prometheus to use logical scope to 
aggregate the same type of metrics and distinguish each by variables.

There are some problems I met while designing this draft.
- Since {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}} produce metric groups with the same 
identifier, only one of them will be registered. However, because they are not 
in the same layer, it is not easy to check whether the other exists; you have 
to check it from the parent's or child's metric group. Is it acceptable to 
reject registering a metric group from {{group.addGroup(name)}} when 
{{group.addGroup(name, value)}} has already been called, and vice versa?
- If the above is acceptable, what should the return value be when we hit this 
conflict? We don't want to throw a RuntimeException from the {{Metrics}} API 
and make the user program fail. I would prefer to return 
{{group.addGroup(name, null)}} when {{group.addGroup(name)}} is called after 
{{group.addGroup(name, some_value)}}; conversely, return 
{{group.addGroup(name).addGroup(value)}} and log a warning message.

What do you think? Are there other, better approaches whose pros and cons we 
could compare? Thank you.
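The semantics in the comparison table above can be sketched with toy classes (invented for illustration; this is not the proposed Flink implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of the compared semantics: addGroup(key, value) adds one group
// whose identifier contains key and value, whose logical scope contains only
// the key, and whose variables map gains (key -> value); the unkeyed
// addGroup(name).addGroup(value) chain adds two plain groups instead.
public class GroupSketch {
    final String identifier;
    final String logicalScope;
    final Map<String, String> variables;

    GroupSketch(String identifier, String logicalScope, Map<String, String> variables) {
        this.identifier = identifier;
        this.logicalScope = logicalScope;
        this.variables = variables;
    }

    // group.addGroup(key, value): depth 1 from the parent.
    GroupSketch addGroup(String key, String value) {
        Map<String, String> vars = new LinkedHashMap<>(variables);
        vars.put(key, value);
        return new GroupSketch(identifier + "." + key + "." + value,
                logicalScope + "." + key, vars);
    }

    // group.addGroup(name): the existing unkeyed variant, depth 1 per call.
    GroupSketch addGroup(String name) {
        return new GroupSketch(identifier + "." + name,
                logicalScope + "." + name, new LinkedHashMap<>(variables));
    }

    public static void main(String[] args) {
        GroupSketch root = new GroupSketch("job", "job", new LinkedHashMap<>());
        GroupSketch keyed = root.addGroup("partition", "7");
        GroupSketch unkeyed = root.addGroup("partition").addGroup("7");
        System.out.println(keyed.identifier + " | " + keyed.logicalScope + " | " + keyed.variables);
        System.out.println(unkeyed.identifier + " | " + unkeyed.logicalScope);
    }
}
```

Both paths end up with the identifier {{job.partition.7}}, which is exactly why the registration conflict described above arises.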



> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
> 

[jira] [Commented] (FLINK-7692) Support user-defined variables in Metrics

2017-11-01 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235098#comment-16235098
 ] 

Wei-Che Wei commented on FLINK-7692:


Hi [~Zentol]

I proposed a draft for this issue and want to get some feedback from you.

A new {{KeyedGenericMetricGroup}} to support a new method 
{{MetricGroup#addGroup(String name, String value)}}.
{code}
class KeyedGenericMetricGroup extends AbstractMetricGroup {
  /** appends name and value after parent.getScopeComponents() */
  public KeyedGenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name, String value)

  /** appends (name -> value) to the variables map */
  @Override
  public Map<String, String> getAllVariables()
}
{code}

The Comparison between {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}}
| |group.addGroup(name, value)|group.addGroup(name).addGroup(value)|
|depth from parent| 1 | 2 |
|getMetricIdentifier()|\{parentIdentifier\}.name.value|\{parentIdentifier\}.name.value|
|getLogicalScope()|\{parentScope\}.name|\{parentScope\}.name.value|
|getAllVariables()|\{parentVariables\} ++ (name -> value)|\{parentVariables\}|

This can benefit the reporter such as Prometheus to use logical scope to 
aggregate the same type of metrics and distinguish each by variables.

There are some problems I met while designing this draft.
- Since {{group.addGroup(name, value)}} and 
{{group.addGroup(name).addGroup(value)}} produce metric groups with the same 
identifier, only one of them will be registered. However, because they are not 
in the same layer, it is not easy to check whether the other exists; you have 
to check it from the parent's or child's metric group. Is it acceptable to 
reject registering a metric group from {{group.addGroup(name)}} when 
{{group.addGroup(name, value)}} has already been called, and vice versa?
- If the above is acceptable, what should the return value be when we hit this 
conflict? We don't want to throw a RuntimeException from the {{Metrics}} API 
and make the user program fail. I would prefer to return 
{{group.addGroup(name, null)}} when {{group.addGroup(name)}} is called after 
{{group.addGroup(name, some_value)}}; conversely, return 
{{group.addGroup(name).addGroup(value)}} and log a warning message.

What do you think? Are there other, better approaches whose pros and cons we 
could compare? Thank you.

> Support user-defined variables in Metrics
> -
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2017-10-26 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221566#comment-16221566
 ] 

Wei-Che Wei commented on FLINK-7935:


Hi [~elevy]
Is this issue the same as FLINK-7692?

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-10-16 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4816:
--

Assignee: Wei-Che Wei

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Wei-Che Wei
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7692) Support user-defined variables

2017-09-26 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-7692:
--

Assignee: Wei-Che Wei

> Support user-defined variables
> --
>
> Key: FLINK-7692
> URL: https://issues.apache.org/jira/browse/FLINK-7692
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7675) LatestCompletedCheckpointExternalPathGauge should check if external path is exist

2017-09-23 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-7675:
---
Description: 
For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
be {{null}}.
This will lead to {{LatestCompletedCheckpointExternalPathGauge}} returning a 
{{null}} value to {{MetricDumpSerialization}}, which will then throw a 
{{NullPointerException}} in the {{serializeGauge}} function.

  was:
For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
be {{null}}.
This will leads to {{LatestCompletedCheckpointExternalPathGauge}} to return 
{{null}} value to {{MetricDumpSerialization}}, then it will throw 
{{NullPointerException}} when serialize it.


> LatestCompletedCheckpointExternalPathGauge should check if external path is 
> exist
> -
>
> Key: FLINK-7675
> URL: https://issues.apache.org/jira/browse/FLINK-7675
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
> be {{null}}.
> This will lead to {{LatestCompletedCheckpointExternalPathGauge}} returning a 
> {{null}} value to {{MetricDumpSerialization}}, which will then throw a 
> {{NullPointerException}} in the {{serializeGauge}} function.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7675) LatestCompletedCheckpointExternalPathGauge should check if external path is exist

2017-09-23 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-7675:
---
Description: 
For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
be {{null}}.
This will lead to {{LatestCompletedCheckpointExternalPathGauge}} returning a 
{{null}} value to {{MetricDumpSerialization}}, which will then throw a 
{{NullPointerException}} when serializing it.

  was:
For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
be {{null}}.
This will leads to {{LatestCompletedCheckpointExternalPathGauge}} to return 
{{null}} value to {{MetricDumpSerialization}}, which will throw 
{{NullPointerException}} when serialize {{Gauge}}.


> LatestCompletedCheckpointExternalPathGauge should check if external path is 
> exist
> -
>
> Key: FLINK-7675
> URL: https://issues.apache.org/jira/browse/FLINK-7675
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
> be {{null}}.
> This will lead to {{LatestCompletedCheckpointExternalPathGauge}} returning a 
> {{null}} value to {{MetricDumpSerialization}}, which will then throw a 
> {{NullPointerException}} when serializing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7675) LatestCompletedCheckpointExternalPathGauge should check if external path is exist

2017-09-23 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-7675:
--

Assignee: Wei-Che Wei

> LatestCompletedCheckpointExternalPathGauge should check if external path is 
> exist
> -
>
> Key: FLINK-7675
> URL: https://issues.apache.org/jira/browse/FLINK-7675
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
> be {{null}}.
> This will lead to {{LatestCompletedCheckpointExternalPathGauge}} returning a 
> {{null}} value to {{MetricDumpSerialization}}, which will throw a 
> {{NullPointerException}} when serializing the {{Gauge}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7675) LatestCompletedCheckpointExternalPathGauge should check if external path is exist

2017-09-23 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-7675:
--

 Summary: LatestCompletedCheckpointExternalPathGauge should check 
if external path is exist
 Key: FLINK-7675
 URL: https://issues.apache.org/jira/browse/FLINK-7675
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.0, 1.3.3
Reporter: Wei-Che Wei


For internal checkpoint, {{CompletedCheckpointStats.getExternalPath()}} could 
be {{null}}.
This will lead to {{LatestCompletedCheckpointExternalPathGauge}} returning a 
{{null}} value to {{MetricDumpSerialization}}, which will throw a 
{{NullPointerException}} when serializing the {{Gauge}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-17 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-7630:
---
 Priority: Minor  (was: Major)
Fix Version/s: 1.3.3
   1.4.0

> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more kinds of parameter types, such as a {{File}} or an {{InputStream}}.





[jira] [Closed] (FLINK-7557) Fix typo for s3a config in AWS deployment documentation

2017-09-17 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei closed FLINK-7557.
--
Resolution: Not A Bug

> Fix typo for s3a config in AWS deployment documentation
> ---
>
> Key: FLINK-7557
> URL: https://issues.apache.org/jira/browse/FLINK-7557
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> The property name {{fs.s3.buffer.dir}} for s3a in {{core-site.xml}} should be 
> {{fs.s3a.buffer.dir}}





[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-15 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167955#comment-16167955
 ] 

Wei-Che Wei commented on FLINK-7630:


Hi [~aljoscha]
FYI. If there is any feedback, please let me know. Thank you.

> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
> more kinds of parameter types, such as a {{File}} or an {{InputStream}}.





[jira] [Created] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-15 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-7630:
--

 Summary: Allow passing a File or an InputStream to 
ParameterTool.fromPropertiesFile()
 Key: FLINK-7630
 URL: https://issues.apache.org/jira/browse/FLINK-7630
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
From this discussion, it seems that the current functionality of 
{{ParameterTool.fromPropertiesFile}} is not enough.
It would be more useful if {{ParameterTool.fromPropertiesFile}} could accept 
more kinds of parameter types, such as a {{File}} or an {{InputStream}}.
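A hedged sketch of what the requested overloads could look like. The class and method names below are illustrative, not Flink's actual API: a path, {{File}}, and {{InputStream}} variant all delegate to one {{Properties}}-based loader, and the {{InputStream}} variant is what makes resources packaged inside a jar readable.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Illustrative sketch only: overloads accepting a path, a File, or an
// InputStream, all funneling into one Properties-based implementation.
public class PropertiesLoader {

    public static Properties fromPropertiesFile(String path) throws IOException {
        return fromPropertiesFile(new File(path));
    }

    public static Properties fromPropertiesFile(File file) throws IOException {
        try (InputStream in = new FileInputStream(file)) {
            return fromPropertiesFile(in);
        }
    }

    // The InputStream variant allows reading a properties file bundled in a
    // jar, e.g. via getClass().getResourceAsStream("/config.properties").
    public static Properties fromPropertiesFile(InputStream in) throws IOException {
        Properties props = new Properties();
        props.load(in);
        return props;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("host=localhost\nport=8081\n".getBytes());
        System.out.println(fromPropertiesFile(in).getProperty("port"));  // 8081
    }
}
```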





[jira] [Created] (FLINK-7557) Fix typo for s3a config in AWS deployment documentation

2017-08-29 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-7557:
--

 Summary: Fix typo for s3a config in AWS deployment documentation
 Key: FLINK-7557
 URL: https://issues.apache.org/jira/browse/FLINK-7557
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


The property name {{fs.s3.buffer.dir}} for s3a in {{core-site.xml}} should be 
{{fs.s3a.buffer.dir}}





[jira] [Assigned] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-24 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-6653:
--

Assignee: Wei-Che Wei

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> disintegrate the information in {{Shard}}. Ideally, it would be best to make 
> {{KinesisStreamShard}} and {{SequenceNumber}} to be non-serializable, hence 
> avoiding Java serialization in the checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-24 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022800#comment-16022800
 ] 

Wei-Che Wei edited comment on FLINK-6653 at 5/24/17 12:32 PM:
--

Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Description:
# The consumer will read two possible states, {{KinesisStreamShardV2}} or 
{{KinesisStreamShard}}, and merge them into {{KinesisStreamShardV2}}.
# Convert {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}} for the 
internal classes that interact with the AWS library.
# Convert {{KinesisStreamShardHandle}} back to {{KinesisStreamShardV2}} and 
write the new state.

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information 
from {{Shard}} to decouple from the AWS library; it will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}} but not 
serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to 
distinguish it from the legacy {{KinesisStreamShard}} state, so that it can 
change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two util functions to convert between {{KinesisStreamShardV2}} and 
{{KinesisStreamShardHandle}}.
# Add a util function to convert {{KinesisStreamShard}} to 
{{KinesisStreamShardV2}}.

Test Plan:
# Update all tests for the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy 
state exists in the state backend.
# Add unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.


was (Author: tonywei):
Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Proposed Changes:
# Introduces two models :
## {{KinesisStreamShardV2}}: It stores the stream name and all the information 
in {{Shard}} to decouple with AWS library, and will be the new state.
## {{KinesisStreamShardHandle}}: It is same as {{KinesisStreamShard}} but will 
not be able to be serialized. It is used in {{KinesisFetcher}}
 and {{ShardConsumer}} to distinguish from the legacy {{KinesisStreamShard}} 
state so that it can be changed along with any update in AWS library.
# Add {{KinesisStreamShardHandle}} in {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} be POJO type.
# Two util functions to convert between {{KinesisStreamShardV2}} and 
{{KinesisStreamShardHandle}}
# An util function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}

Test Plan:
# Update all tests with the whole new models and modified models.
# Migrate test that makes sure states will be restored if there is only legacy 
state in state backend.
# Unit test for those util functions.
# Test the new state will be serialized by POJO serializer.

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> disintegrate the information in {{Shard}}. Ideally, it would be best to make 
> {{KinesisStreamShard}} and {{SequenceNumber}} to be non-serializable, hence 
> avoiding Java serialization in the checkpoints.





[jira] [Comment Edited] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-24 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022800#comment-16022800
 ] 

Wei-Che Wei edited comment on FLINK-6653 at 5/24/17 12:33 PM:
--

Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Description:
# The consumer will read two possible states, {{KinesisStreamShardV2}} or 
{{KinesisStreamShard}}, and merge them into {{KinesisStreamShardV2}}.
# Convert {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}} for the 
internal classes that interact with the AWS library.
# Convert {{KinesisStreamShardHandle}} back to {{KinesisStreamShardV2}} and 
write the new state.

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information 
from {{Shard}} to decouple from the AWS library; it will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}} but not 
serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to 
distinguish it from the legacy {{KinesisStreamShard}} state, so that it can 
change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two util functions to convert between {{KinesisStreamShardV2}} and 
{{KinesisStreamShardHandle}}.
# Add a util function to convert {{KinesisStreamShard}} to 
{{KinesisStreamShardV2}}.

Test Plan:
# Update all tests for the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy 
state exists in the state backend.
# Add unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.


was (Author: tonywei):
Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Description:
# Consumer will read two possible states: {{KinesisStreamShardV2}} or 
{KinesisStreamShard}} and merge to {{KinesisStreamShardV2}}
# Convert {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}} for the 
internal class to interact with AWS library.
# Convert {{KinesisStreamShardHandle}} to {{KinesisStreamShardV2}} and write 
the new states.

Proposed Changes:
# Introduces two models :
## {{KinesisStreamShardV2}}: It stores the stream name and all the information 
in {{Shard}} to decouple with AWS library, and will be the new state.
## {{KinesisStreamShardHandle}}: It is same as {{KinesisStreamShard}} but will 
not be able to be serialized. It is used in {{KinesisFetcher}}
 and {{ShardConsumer}} to distinguish from the legacy {{KinesisStreamShard}} 
state so that it can be changed along with any update in AWS library.
# Add {{KinesisStreamShardHandle}} in {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} be POJO type.
# Two util functions to convert between {{KinesisStreamShardV2}} and 
{{KinesisStreamShardHandle}}
# An util function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}

Test Plan:
# Update all tests with the whole new models and modified models.
# Migrate test that makes sure states will be restored if there is only legacy 
state in state backend.
# Unit test for those util functions.
# Test the new state will be serialized by POJO serializer.

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> disintegrate the information in {{Shard}}. Ideally, it would be best to make 
> {{KinesisStreamShard}} and {{SequenceNumber}} to be non-serializable, hence 
> avoiding Java serialization in the checkpoints.





[jira] [Commented] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-24 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022800#comment-16022800
 ] 

Wei-Che Wei commented on FLINK-6653:


Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information 
from {{Shard}} to decouple from the AWS library; it will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}} but not 
serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to 
distinguish it from the legacy {{KinesisStreamShard}} state, so that it can 
change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two util functions to convert between {{KinesisStreamShardV2}} and 
{{KinesisStreamShardHandle}}.
# Add a util function to convert {{KinesisStreamShard}} to 
{{KinesisStreamShardV2}}.

Test Plan:
# Update all tests for the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy 
state exists in the state backend.
# Add unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.
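The decoupling proposed above can be sketched roughly as follows. All class and field names here are placeholders, not the real AWS SDK or Flink types: a serializable POJO copies the fields out of AWS's {{Shard}}, so checkpoints no longer depend on the AWS class.

```java
// Rough sketch with placeholder types: a POJO-style state class plus a util
// function that converts the AWS-coupled object into the checkpointed POJO.
public class ShardModels {

    // Stand-in for the AWS SDK's Shard, reduced to the fields used here.
    static class Shard {
        String shardId;
        String startingHashKey;
    }

    // POJO state: public default constructor plus getters/setters and no AWS
    // types, so a POJO serializer can handle it.
    public static class StreamShardMetadata {
        private String streamName;
        private String shardId;
        private String startingHashKey;

        public String getStreamName() { return streamName; }
        public void setStreamName(String v) { streamName = v; }
        public String getShardId() { return shardId; }
        public void setShardId(String v) { shardId = v; }
        public String getStartingHashKey() { return startingHashKey; }
        public void setStartingHashKey(String v) { startingHashKey = v; }
    }

    // Util function: copy the AWS object's fields into the checkpointed POJO.
    static StreamShardMetadata toMetadata(String streamName, Shard shard) {
        StreamShardMetadata m = new StreamShardMetadata();
        m.setStreamName(streamName);
        m.setShardId(shard.shardId);
        m.setStartingHashKey(shard.startingHashKey);
        return m;
    }

    public static void main(String[] args) {
        Shard s = new Shard();
        s.shardId = "shardId-000000000000";
        StreamShardMetadata m = toMetadata("my-stream", s);
        System.out.println(m.getStreamName() + " / " + m.getShardId());
    }
}
```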

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> disintegrate the information in {{Shard}}. Ideally, it would be best to make 
> {{KinesisStreamShard}} and {{SequenceNumber}} to be non-serializable, hence 
> avoiding Java serialization in the checkpoints.





[jira] [Commented] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-24 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16022751#comment-16022751
 ] 

Wei-Che Wei commented on FLINK-6653:


Hi~ [~tzulitai]

I would like to take over this issue and here is my proposal.

Proposed Changes:
# Introduces two models :
## `KinesisStreamShardV2`

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> disintegrate the information in {{Shard}}. Ideally, it would be best to make 
> {{KinesisStreamShard}} and {{SequenceNumber}} to be non-serializable, hence 
> avoiding Java serialization in the checkpoints.





[jira] [Issue Comment Deleted] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

2017-05-24 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-6653:
---
Comment: was deleted

(was: Hi~ [~tzulitai]

I would like to take over this issue and here is my proposal.

Proposed Changes:
# Introduces two models :
## `KinesisStreamShardV2`)

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> --
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's 
> {{Shard}} instances in checkpoints. This makes bumping AWS library versions 
> hard, since any change to the {{Shard}} class by AWS will break checkpoint 
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or 
> disintegrate the information in {{Shard}}. Ideally, it would be best to make 
> {{KinesisStreamShard}} and {{SequenceNumber}} to be non-serializable, hence 
> avoiding Java serialization in the checkpoints.





[jira] [Commented] (FLINK-6328) Savepoints must not be counted as retained checkpoints

2017-05-18 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016776#comment-16016776
 ] 

Wei-Che Wei commented on FLINK-6328:


Hi all,

Is this the same issue as FLINK-6071?

> Savepoints must not be counted as retained checkpoints
> --
>
> Key: FLINK-6328
> URL: https://issues.apache.org/jira/browse/FLINK-6328
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.3.0, 1.2.2
>
>
> The Checkpoint Store retains the *n* latest checkpoints.
> Savepoints are counted as well, meaning that for settings with 1 retained 
> checkpoint, there are sometimes no retained checkpoints at all, only a 
> savepoint.
> That is dangerous, because savepoints must be assumed to disappear at any 
> point in time - their lifecycle is out of control of the 
> CheckpointCoordinator.





[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API

2017-03-29 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948326#comment-15948326
 ] 

Wei-Che Wei commented on FLINK-5624:


Sorry for putting the wrong JIRA task id. > <

> Support tumbling window on streaming tables in the SQL API
> --
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clauses, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.





[jira] [Assigned] (FLINK-5625) Let Date format for timestamp-based start position in Kinesis consumer be configurable.

2017-03-29 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-5625:
--

Assignee: Wei-Che Wei

> Let Date format for timestamp-based start position in Kinesis consumer be 
> configurable.
> ---
>
> Key: FLINK-5625
> URL: https://issues.apache.org/jira/browse/FLINK-5625
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
>
> Currently, the Kinesis consumer's Date format for timestamp-based start 
> positions is fixed. It'll be nice to make this format configurable.





[jira] [Assigned] (FLINK-6211) Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position

2017-03-29 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-6211:
--

Assignee: Wei-Che Wei

> Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position
> --
>
> Key: FLINK-6211
> URL: https://issues.apache.org/jira/browse/FLINK-6211
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> {code}
> private static void validateOptionalDateProperty(Properties config, String key, String message) {
>     if (config.containsKey(key)) {
>         try {
>             initTimestampDateFormat.parse(config.getProperty(key));
>             double value = Double.parseDouble(config.getProperty(key));
>             if (value < 0) {
>                 throw new NumberFormatException();
>             }
>         } catch (ParseException | NumberFormatException e) {
>             throw new IllegalArgumentException(message);
>         }
>     }
> }
> {code}
> This validation always fails: the value would have to parse both as a date 
> string and as a double, and no value satisfies both.





[jira] [Commented] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2017-03-29 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946622#comment-15946622
 ] 

Wei-Che Wei commented on FLINK-4523:


[~tsriharsha]

I have opened a task (FLINK-6211) to solve it. Thank you again for finding this 
problem.

> Allow Kinesis Consumer to start from specific timestamp / Date
> --
>
> Key: FLINK-4523
> URL: https://issues.apache.org/jira/browse/FLINK-4523
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
> Fix For: 1.3.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-





[jira] [Updated] (FLINK-6211) Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position

2017-03-29 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-6211:
---
Fix Version/s: 1.3.0

> Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position
> --
>
> Key: FLINK-6211
> URL: https://issues.apache.org/jira/browse/FLINK-6211
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Wei-Che Wei
> Fix For: 1.3.0
>
>
> {code}
> private static void validateOptionalDateProperty(Properties config, String key, String message) {
>     if (config.containsKey(key)) {
>         try {
>             initTimestampDateFormat.parse(config.getProperty(key));
>             double value = Double.parseDouble(config.getProperty(key));
>             if (value < 0) {
>                 throw new NumberFormatException();
>             }
>         } catch (ParseException | NumberFormatException e) {
>             throw new IllegalArgumentException(message);
>         }
>     }
> }
> {code}
> This validation always fails: the value would have to parse both as a date 
> string and as a double, and no value satisfies both.





[jira] [Updated] (FLINK-6211) Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position

2017-03-29 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-6211:
---
Component/s: Kinesis Connector

> Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position
> --
>
> Key: FLINK-6211
> URL: https://issues.apache.org/jira/browse/FLINK-6211
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Wei-Che Wei
> Fix For: 1.3.0
>
>
> {code}
> private static void validateOptionalDateProperty(Properties config, String key, String message) {
>     if (config.containsKey(key)) {
>         try {
>             initTimestampDateFormat.parse(config.getProperty(key));
>             double value = Double.parseDouble(config.getProperty(key));
>             if (value < 0) {
>                 throw new NumberFormatException();
>             }
>         } catch (ParseException | NumberFormatException e) {
>             throw new IllegalArgumentException(message);
>         }
>     }
> }
> {code}
> This validation always fails: the value would have to parse both as a date 
> string and as a double, and no value satisfies both.





[jira] [Created] (FLINK-6211) Validation error in Kinesis Consumer when using AT_TIMESTAMP as start position

2017-03-29 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-6211:
--

 Summary: Validation error in Kinesis Consumer when using 
AT_TIMESTAMP as start position
 Key: FLINK-6211
 URL: https://issues.apache.org/jira/browse/FLINK-6211
 Project: Flink
  Issue Type: Bug
Reporter: Wei-Che Wei


{code}
private static void validateOptionalDateProperty(Properties config, String key, String message) {
    if (config.containsKey(key)) {
        try {
            initTimestampDateFormat.parse(config.getProperty(key));
            double value = Double.parseDouble(config.getProperty(key));
            if (value < 0) {
                throw new NumberFormatException();
            }
        } catch (ParseException | NumberFormatException e) {
            throw new IllegalArgumentException(message);
        }
    }
}
{code}
This validation always fails: the value would have to parse both as a date 
string and as a double, and no value satisfies both.
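One possible fix, sketched under the assumption that the property should be accepted when it parses either as a date string or as a non-negative double (the broken version requires both parses to succeed). The date format pattern and class names below are assumptions for illustration, not the actual Flink code:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

// Sketch of a possible fix: accept the value if it parses EITHER as a date
// string OR as a non-negative double, instead of requiring both.
public class DatePropertyValidator {

    // Assumed pattern; the real connector's format may differ.
    private static final SimpleDateFormat INIT_TIMESTAMP_DATE_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    static void validateOptionalDateProperty(Properties config, String key, String message) {
        if (!config.containsKey(key)) {
            return;
        }
        String raw = config.getProperty(key);
        try {
            INIT_TIMESTAMP_DATE_FORMAT.parse(raw);  // well-formed date string is fine
            return;
        } catch (ParseException ignored) {
            // not a date; fall through to the numeric check
        }
        try {
            if (Double.parseDouble(raw) >= 0) {     // non-negative epoch value is fine
                return;
            }
        } catch (NumberFormatException ignored) {
            // not a number either
        }
        throw new IllegalArgumentException(message);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("ts", "1459799926.480");
        validateOptionalDateProperty(config, "ts", "invalid timestamp");  // passes
        config.setProperty("ts", "2016-04-04T19:58:46");
        validateOptionalDateProperty(config, "ts", "invalid timestamp");  // passes
    }
}
```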





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943360#comment-15943360
 ] 

Wei-Che Wei commented on FLINK-5982:


Hi [~till.rohrmann],

I prefer not to pass only {{Environment}} to {{BatchTask}}.
Because this task tries to merge {{AbstractInvokable}} and {{StatefulTask}}, 
that would add complexity to distinguishing {{BatchTask}} from {{StreamTask}} 
when calling {{loadAndInstantiateInvokable()}} in {{Task}}.
Making all subclasses share the same constructor simplifies the behavior of 
{{AbstractInvokable}} in {{Task}}.

For that reason, I prefer the suggestion from [~StephanEwen]: it temporarily 
ensures {{BatchTask}} won't get a non-null state before {{BatchTask}} 
actually handles recovery state.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization on the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor with {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every subclass of {{AbstractInvokable}} a two-argument constructor 
> that calls the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed after {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic of running an invokable to the constructor 
> plus {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.
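The constructor-based design described above can be sketched as follows, using simplified stand-ins rather than Flink's real classes:

```java
// Simplified stand-ins sketching the two-argument constructor design:
// construction replaces setEnvironment/setInitialState, and BatchTask
// temporarily rejects restore state until it becomes stateful.
public class InvokableRefactor {

    static class Environment {}
    static class TaskStateHandles {}

    abstract static class AbstractInvokable {
        protected final Environment environment;
        protected final TaskStateHandles initialState;

        // The single constructor every subclass must expose and call.
        protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
            this.environment = environment;
            this.initialState = initialState;
        }

        public abstract void invoke() throws Exception;
    }

    static class BatchTask extends AbstractInvokable {
        BatchTask(Environment environment, TaskStateHandles initialState) {
            super(environment, initialState);
            // Temporary guard until BatchTask itself handles recovery state.
            if (initialState != null) {
                throw new IllegalStateException(
                        "Found operator state for a non-stateful task invokable");
            }
        }

        @Override
        public void invoke() {
            // batch processing would go here
        }
    }

    public static void main(String[] args) throws Exception {
        new BatchTask(new Environment(), null).invoke();  // fine: no restore state
    }
}
```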





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-27 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942972#comment-15942972
 ] 

Wei-Che Wei commented on FLINK-5982:


Hi [~till.rohrmann]

Ok, I see. Thanks for your suggestion.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs to call {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization on the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to 
> make it easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor with {{Environment}} 
> and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Give every subclass of {{AbstractInvokable}} a two-argument constructor 
> that calls the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed after {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic of running an invokable to the constructor 
> plus {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-23 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939521#comment-15939521
 ] 

Wei-Che Wei commented on FLINK-5982:


Hi [~till.rohrmann]

Thanks for your reply. For the first question, I still feel a little confused 
and would like to get your suggestion. Thank you.

# It's a good choice to check the passed environment in {{AbstractInvokable}} and 
pass a {{DummyEnvironment}} instead of {{null}}. I am not sure it is good to check 
the state instances as well, since sometimes the state instances will be {{null}} 
in {{Task}}. If we check that the state instances are not {{null}}, we might need 
to add one more constructor or pass an empty state object in {{Task}}. What do you 
think?
# The exception is {{IllegalStateException("Found operator state for a 
non-stateful task invokable")}}. For now, I only put it in {{BatchTask}}, not in 
the testing classes. I think it will not change their behavior. I will check it 
once again before I finish this task.
# What I wanted to do was leave them empty and let the classes that need to 
support the state methods override them, but I think your way is better. I will 
implement it like that and make the state methods in {{AbstractInvokable}} 
abstract methods.
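Point 2 above can be sketched as follows. This is a minimal illustration of the eager state check, assuming a simplified base class; the real {{BatchTask}} has many more responsibilities:

```java
// Illustrative stand-ins, not Flink's actual classes.
class Environment { }
class TaskStateHandles { }

abstract class AbstractInvokable {
    protected AbstractInvokable(Environment env, TaskStateHandles state) { }
    abstract void invoke() throws Exception;
}

// BatchTask is not yet stateful, so restored state is rejected eagerly in
// the constructor instead of being silently ignored.
class BatchTask extends AbstractInvokable {
    BatchTask(Environment env, TaskStateHandles initialState) {
        super(env, initialState);
        if (initialState != null) {
            throw new IllegalStateException(
                    "Found operator state for a non-stateful task invokable");
        }
    }

    @Override
    void invoke() { /* run the batch drivers */ }
}
```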

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} and 
> {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of {{AbstractInvokable}} have a two-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic to run an invokable to just the constructor 
> and {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-11 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906135#comment-15906135
 ] 

Wei-Che Wei commented on FLINK-5982:


I see. I will just focus on refactoring {{setEnvironment(env)}}, 
{{setInitialState(state)}} and the new constructor. Thanks for your friendly 
reminder.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} and 
> {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of {{AbstractInvokable}} have a two-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic to run an invokable to just the constructor 
> and {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-08 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901331#comment-15901331
 ] 

Wei-Che Wei commented on FLINK-5982:


[~StephanEwen], [~till.rohrmann]
I have updated the description. Are there any suggestions? If not, I will start 
working on it based on the approach described above.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} and 
> {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of {{AbstractInvokable}} have a two-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has become stateful.)
> # Change the creation of the invokable to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic to run an invokable to just the constructor 
> and {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.





[jira] [Updated] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-08 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-5982:
---
Description: 
Currently, running an invokable in {{Task}} requires calling 
{{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
{{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That makes 
it difficult to do eager initialization of the invokable during the 
{{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
{{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
running it in {{Task}}.

This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
easier to construct and run an invokable.

# Refactor the abstract class to have one default constructor.
#* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} are 
stateful.
#* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
{{AbstractInvokable}} a two-argument constructor taking {{Environment}} and 
{{TaskStateHandles}}.
# Update all subclasses.
#* Make every subclass of {{AbstractInvokable}} have a two-argument constructor 
and call the constructor in {{AbstractInvokable}}.
#* Throw an error in {{BatchTask}} if the initial state is not null. (This will 
be removed once {{BatchTask}} has become stateful.)
# Change the creation of the invokable to call that constructor, and update all 
the tests.

Then, we can simplify the logic to run an invokable to just the constructor and 
{{invoke()}}. The eager initialization can easily be placed in the constructor 
to fulfill requirements such as FLINK-4714.

  was:
Currently, running an invokable in {{Task}} requires calling 
{{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
{{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That makes 
it difficult to do eager initialization of the invokable during the 
{{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
{{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
running it in {{Task}}.

This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
easier to construct and run an invokable.

# Refactor the abstract classes to have one default constructor.
#* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
one-argument constructor taking {{Environment}}.
#* Remove {{setInitialState}} and create a new abstract class 
{{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and implements 
{{StatefulTask}}. Give {{AbstractStatefulInvokable}} a two-argument constructor 
taking {{Environment}} and {{TaskStateHandles}}.
# Update all subclasses.
#* Make every subclass of only {{AbstractInvokable}} have a one-argument 
constructor and call the constructor in {{AbstractInvokable}}.
#* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
# Change the creation of the invokables to call that constructor, and update 
all the tests.

Then, we can simplify the logic to run an invokable to just the constructor and 
{{invoke()}}. The eager initialization can easily be placed in the constructor 
to fulfill requirements such as FLINK-4714.


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract class to have one default constructor.
> #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} 
> are stateful.
> #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Give 
> {{AbstractInvokable}} a two-argument constructor taking {{Environment}} and 
> {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of {{AbstractInvokable}} have a two-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Throw an error in {{BatchTask}} if the initial state is not null. (This 
> will be removed once {{BatchTask}} has been 

[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-08 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901309#comment-15901309
 ] 

Wei-Che Wei commented on FLINK-5982:


If handling recovery state on {{BatchTask}} is needed in the future, it makes 
sense for me to assume all tasks in Flink are stateful. I will update the 
description. Thank you.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract classes to have one default constructor.
> #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
> one-argument constructor taking {{Environment}}.
> #* Remove {{setInitialState}} and create a new abstract class 
> {{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and 
> implements {{StatefulTask}}. Give {{AbstractStatefulInvokable}} a 
> two-argument constructor taking {{Environment}} and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of only {{AbstractInvokable}} have a one-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
> extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
> # Change the creation of the invokables to call that constructor, and update 
> all the tests.
> Then, we can simplify the logic to run an invokable to just the constructor 
> and {{invoke()}}. The eager initialization can easily be placed in the 
> constructor to fulfill requirements such as FLINK-4714.





[jira] [Updated] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-07 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-5982:
---
Description: 
Currently, running an invokable in {{Task}} requires calling 
{{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
{{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That makes 
it difficult to do eager initialization of the invokable during the 
{{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
{{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
running it in {{Task}}.

This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
easier to construct and run an invokable.

# Refactor the abstract classes to have one default constructor.
#* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
one-argument constructor taking {{Environment}}.
#* Remove {{setInitialState}} and create a new abstract class 
{{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and implements 
{{StatefulTask}}. Give {{AbstractStatefulInvokable}} a two-argument constructor 
taking {{Environment}} and {{TaskStateHandles}}.
# Update all subclasses.
#* Make every subclass of only {{AbstractInvokable}} have a one-argument 
constructor and call the constructor in {{AbstractInvokable}}.
#* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
# Change the creation of the invokables to call that constructor, and update 
all the tests.

Then, we can simplify the logic to run an invokable to just the constructor and 
{{invoke()}}. The eager initialization can easily be placed in the constructor 
to fulfill requirements such as FLINK-4714.

  was:
Currently, running an invokable in {{Task}} requires calling 
{{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
{{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That makes 
it difficult to do eager initialization of the invokable during the 
{{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
{{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
running it in {{Task}}.

This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to make 
it easier to construct and run an invokable.

# Refactor the abstract classes to have one default constructor.
#* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
one-argument constructor taking {{Environment}}.
#* Remove {{setInitialState}} and create a new abstract class 
{{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and implements 
{{StatefulTask}}. Give {{AbstractStatefulInvokable}} a two-argument constructor 
taking {{Environment}} and {{TaskStateHandles}}.
# Update all subclasses.
#* Make every subclass of only {{AbstractInvokable}} have a one-argument 
constructor and call the constructor in {{AbstractInvokable}}.
#* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
# Change the creation of the invokables to call that constructor, and update 
all the tests.


> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract classes to have one default constructor.
> #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
> one-argument constructor taking {{Environment}}.
> #* Remove {{setInitialState}} and create a new abstract class 
> {{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and 
> implements {{StatefulTask}}. Give {{AbstractStatefulInvokable}} a 
> two-argument constructor taking {{Environment}} and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of only {{AbstractInvokable}} have a one-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Make every subclass of 

[jira] [Comment Edited] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-07 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899368#comment-15899368
 ] 

Wei-Che Wei edited comment on FLINK-5982 at 3/7/17 12:37 PM:
-

FYI [~StephanEwen], [~till.rohrmann]

Please let me know if you have any suggestions. Thank you.


was (Author: tonywei):
FYI [~StephanEwen], [~till.rohrmann]

Please let me know if you have any suggestions.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract classes to have one default constructor.
> #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
> one-argument constructor taking {{Environment}}.
> #* Remove {{setInitialState}} and create a new abstract class 
> {{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and 
> implements {{StatefulTask}}. Give {{AbstractStatefulInvokable}} a 
> two-argument constructor taking {{Environment}} and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of only {{AbstractInvokable}} have a one-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
> extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
> # Change the creation of the invokables to call that constructor, and update 
> all the tests.





[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-07 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899368#comment-15899368
 ] 

Wei-Che Wei commented on FLINK-5982:


FYI [~StephanEwen], [~till.rohrmann]

Please let me know if you have any suggestions.

> Refactor AbstractInvokable and StatefulTask
> ---
>
> Key: FLINK-5982
> URL: https://issues.apache.org/jira/browse/FLINK-5982
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> Currently, running an invokable in {{Task}} requires calling 
> {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
> {{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That 
> makes it difficult to do eager initialization of the invokable during the 
> {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
> {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
> running it in {{Task}}.
> This task refactors {{AbstractInvokable}} and {{StatefulTask}} to make it 
> easier to construct and run an invokable.
> # Refactor the abstract classes to have one default constructor.
> #* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
> one-argument constructor taking {{Environment}}.
> #* Remove {{setInitialState}} and create a new abstract class 
> {{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and 
> implements {{StatefulTask}}. Give {{AbstractStatefulInvokable}} a 
> two-argument constructor taking {{Environment}} and {{TaskStateHandles}}.
> # Update all subclasses.
> #* Make every subclass of only {{AbstractInvokable}} have a one-argument 
> constructor and call the constructor in {{AbstractInvokable}}.
> #* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
> extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
> # Change the creation of the invokables to call that constructor, and update 
> all the tests.





[jira] [Created] (FLINK-5982) Refactor AbstractInvokable and StatefulTask

2017-03-07 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-5982:
--

 Summary: Refactor AbstractInvokable and StatefulTask
 Key: FLINK-5982
 URL: https://issues.apache.org/jira/browse/FLINK-5982
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination, State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


Currently, running an invokable in {{Task}} requires calling 
{{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a 
{{StatefulTask}}, it also needs a call to {{setInitialState(state)}}. That makes 
it difficult to do eager initialization of the invokable during the 
{{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate 
{{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity to 
running it in {{Task}}.

This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to make 
it easier to construct and run an invokable.

# Refactor the abstract classes to have one default constructor.
#* Remove {{setEnvironment(env)}} and change {{AbstractInvokable}} to have a 
one-argument constructor taking {{Environment}}.
#* Remove {{setInitialState}} and create a new abstract class 
{{AbstractStatefulInvokable}} that extends {{AbstractInvokable}} and implements 
{{StatefulTask}}. Give {{AbstractStatefulInvokable}} a two-argument constructor 
taking {{Environment}} and {{TaskStateHandles}}.
# Update all subclasses.
#* Make every subclass of only {{AbstractInvokable}} have a one-argument 
constructor and call the constructor in {{AbstractInvokable}}.
#* Make every subclass of both {{AbstractInvokable}} and {{StatefulTask}} 
extend {{AbstractStatefulInvokable}} and have a two-argument constructor.
# Change the creation of the invokables to call that constructor, and update 
all the tests.
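The two-class hierarchy this original proposal describes can be sketched in Java. All types are illustrative stand-ins following the ticket's naming, not Flink's actual classes:

```java
// Illustrative stand-ins for Flink's Environment and TaskStateHandles.
class Environment { }
class TaskStateHandles { }

// Stateless side: only the environment, supplied at construction time.
abstract class AbstractInvokable {
    private final Environment environment;

    protected AbstractInvokable(Environment env) {
        this.environment = env;
    }

    Environment getEnvironment() { return environment; }

    abstract void invoke() throws Exception;
}

// Contract for tasks that restore checkpointed state (sketched).
interface StatefulTask {
    TaskStateHandles getInitialState();
}

// Stateful side: adds the initial state as a second constructor argument,
// replacing the old setInitialState(state) call.
abstract class AbstractStatefulInvokable extends AbstractInvokable
        implements StatefulTask {
    private final TaskStateHandles initialState;

    protected AbstractStatefulInvokable(Environment env, TaskStateHandles state) {
        super(env);
        this.initialState = state;
    }

    @Override
    public TaskStateHandles getInitialState() { return initialState; }
}
```

Under this design, {{Task}} would construct stateless invokables with one argument and stateful ones with two, then call `invoke()` in both cases.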





[jira] [Assigned] (FLINK-4817) Checkpoint Coordinator should be called to restore state with a specific checkpoint ID

2017-03-05 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4817:
--

Assignee: (was: Wei-Che Wei)

> Checkpoint Coordinator should be called to restore state with a specific 
> checkpoint ID
> --
>
> Key: FLINK-4817
> URL: https://issues.apache.org/jira/browse/FLINK-4817
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>
> Rather than being called to restore the "latest" checkpoint, the Checkpoint 
> Coordinator should be called to restore a specific checkpoint.





[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-03-05 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896187#comment-15896187
 ] 

Wei-Che Wei commented on FLINK-4816:


Hi [~ram_krish]

If you have been working on it, just keep going. I will look for another 
task. Thank you.

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.





[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-03-04 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895720#comment-15895720
 ] 

Wei-Che Wei commented on FLINK-4816:


Hi [~ram_krish]

How about making {{restoreLatestCheckpointedState()}} return the checkpoint id 
and setting it in {{ExecutionGraph}}?
Then, when we call {{fail()}} in {{Execution}}, we could use 
{{this.vertex.getExecutionGraph()}} to access the {{ExecutionGraph}} and check 
whether the checkpoint id is null to determine which exception should be thrown. 
What do you think?

BTW, if you don't have time, I would really like to take care of this task. 
Thank you.

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.





[jira] [Assigned] (FLINK-4817) Checkpoint Coordinator should be called to restore state with a specific checkpoint ID

2017-03-04 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4817:
--

Assignee: Wei-Che Wei

> Checkpoint Coordinator should be called to restore state with a specific 
> checkpoint ID
> --
>
> Key: FLINK-4817
> URL: https://issues.apache.org/jira/browse/FLINK-4817
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Wei-Che Wei
>
> Rather than being called to restore the "latest" checkpoint, the Checkpoint 
> Coordinator should be called to restore a specific checkpoint.





[jira] [Assigned] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-03-04 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4816:
--

Assignee: (was: Wei-Che Wei)

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information

2017-03-04 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4816:
--

Assignee: Wei-Che Wei

> Executions failed from "DEPLOYING" should retain restored checkpoint 
> information
> 
>
> Key: FLINK-4816
> URL: https://issues.apache.org/jira/browse/FLINK-4816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Wei-Che Wei
>
> When an execution fails from state {{DEPLOYING}}, it should wrap the failure 
> to better report the failure cause:
>   - If no checkpoint was restored, it should wrap the exception in a 
> {{DeployTaskException}}
>   - If a checkpoint was restored, it should wrap the exception in a 
> {{RestoreTaskException}} and record the id of the checkpoint that was 
> attempted to be restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4714) Set task state to RUNNING after state has been restored

2017-03-03 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893937#comment-15893937
 ] 

Wei-Che Wei commented on FLINK-4714:


Hi [~till.rohrmann]

Thanks for your feedback. I will try to implement the second approach and 
leave the others as they are, to be picked up as future tasks if any need 
arises.

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Wei-Che Wei
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (FLINK-4714) Set task state to RUNNING after state has been restored

2017-03-01 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei updated FLINK-4714:
---
Comment: was deleted

(was: Hi [~uce]

I saw that you implemented {{ZooKeeperCompletedCheckpointStore}} in FLINK-2354, 
and I found that its recover() method removes all checkpoints except the latest 
one.
That means the recover() method would need to be ignored or refined in order to 
support this feature, am I right?
I am not sure whether there would be any side effects if I changed that 
implementation. As far as I know, it should be okay to retain all of these 
completed checkpoints, so I am confused about the comment you wrote for 
recover() in {{ZooKeeperCompletedCheckpointStore}}. Could you please explain it 
to me? Looking forward to your feedback. Thank you.)

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Wei-Che Wei
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4815) Automatic fallback to earlier checkpoints when checkpoint restore fails

2017-03-01 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891711#comment-15891711
 ] 

Wei-Che Wei commented on FLINK-4815:


Hi [~uce]

I saw that you implemented {{ZooKeeperCompletedCheckpointStore}} in FLINK-2354, 
and I found that its recover() method removes all checkpoints except the latest 
one.
That means the recover() method would need to be ignored or refined in order to 
support this feature, am I right?
I am not sure whether there would be any side effects if I changed that 
implementation. As far as I know, it should be okay to retain all of these 
completed checkpoints, so I am confused about the comment you wrote for 
recover() in {{ZooKeeperCompletedCheckpointStore}}. Could you please explain it 
to me? Looking forward to your feedback. Thank you.

> Automatic fallback to earlier checkpoints when checkpoint restore fails
> ---
>
> Key: FLINK-4815
> URL: https://issues.apache.org/jira/browse/FLINK-4815
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>
> Flink should keep multiple completed checkpoints.
> When the restore of one completed checkpoint fails for a certain number of 
> times, the CheckpointCoordinator should fall back to an earlier checkpoint to 
> restore.
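The fallback behavior described above could look roughly like the following 
sketch; the {{Restorer}} type and all names here are simplified stand-ins, not 
the actual Flink API:

```java
import java.util.List;

// Hedged sketch: try the newest retained checkpoint first, retry a bounded
// number of times, then fall back to the next older one.
class CheckpointFallback {
    interface Restorer {
        void restore(long checkpointId);
    }

    // Returns the id of the checkpoint that was successfully restored, or
    // throws if every retained checkpoint failed.
    static long restoreWithFallback(List<Long> retainedNewestFirst,
                                    Restorer restorer,
                                    int maxAttemptsPerCheckpoint) {
        RuntimeException last = null;
        for (long id : retainedNewestFirst) {
            for (int attempt = 0; attempt < maxAttemptsPerCheckpoint; attempt++) {
                try {
                    restorer.restore(id);
                    return id; // restored successfully
                } catch (RuntimeException e) {
                    last = e; // retry, then fall back to an older checkpoint
                }
            }
        }
        throw new RuntimeException("all retained checkpoints failed to restore", last);
    }
}
```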



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4714) Set task state to RUNNING after state has been restored

2017-03-01 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891709#comment-15891709
 ] 

Wei-Che Wei commented on FLINK-4714:


Hi [~uce]

I saw that you implemented {{ZooKeeperCompletedCheckpointStore}} in FLINK-2354, 
and I found that its recover() method removes all checkpoints except the latest 
one.
That means the recover() method would need to be ignored or refined in order to 
support this feature, am I right?
I am not sure whether there would be any side effects if I changed that 
implementation. As far as I know, it should be okay to retain all of these 
completed checkpoints, so I am confused about the comment you wrote for 
recover() in {{ZooKeeperCompletedCheckpointStore}}. Could you please explain it 
to me? Looking forward to your feedback. Thank you.

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Wei-Che Wei
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4714) Set task state to RUNNING after state has been restored

2017-03-01 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891545#comment-15891545
 ] 

Wei-Che Wei commented on FLINK-4714:


Hi [~till.rohrmann]

I have some ideas about this issue and would like to get some feedback from 
you.
As I understand it, this issue wants the task state to become {{RUNNING}} only 
after {{StreamTask}} has set {{StreamTask.isRunning}} to true (i.e. all 
restored state and preparation work is done), so that checkpoints won't be 
aborted.

The following are possible solutions I thought of:
1. Run a separate thread that monitors {{StreamTask.isRunning}} and changes the 
task state to {{RUNNING}}. This would be a workaround and I don't like it, 
because having the {{Task}} itself change the state is more proactive, while 
this implementation is rather passive.
2. Add a prepare() method to {{AbstractInvokable}} and override it in 
{{StreamTask}} only. Move all preparation work from invoke() to prepare(), and 
call prepare() before the state transition in {{Task}}.
3. Like the second approach, but also redefine the invoke() method for every 
class that extends {{AbstractInvokable}}. The current invoke() method includes 
all operations and setup, such as setting up the I/O streams.
The second approach seems sub-optimal to me, because it merely moves the 
initialization from the {{RUNNING}} state to the {{DEPLOYING}} state for 
{{StreamTask}} alone. Therefore, I think it is better to redefine invoke() in 
general, not just customize it for {{StreamTask}}.

What do you think?
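A minimal sketch of the second approach, using simplified stand-ins for the 
Flink classes (the prepare() hook and the state strings are illustrative, not 
the actual Flink API):

```java
// The Task restores state via prepare() while still DEPLOYING, and only then
// switches to RUNNING, so checkpoints are not triggered too early.
abstract class AbstractInvokable {
    // Default: nothing to prepare; only StreamTask would override this.
    public void prepare() {}

    public abstract void invoke();
}

class StreamTask extends AbstractInvokable {
    boolean isRunning = false;

    @Override
    public void prepare() {
        // restore operator state, open operators, etc. (elided)
        isRunning = true;
    }

    @Override
    public void invoke() {
        // main processing loop; state is already restored at this point
    }
}

class Task {
    private String state = "DEPLOYING";

    void run(AbstractInvokable invokable) {
        invokable.prepare(); // restore while still DEPLOYING
        state = "RUNNING";   // only now is the task ready for checkpoints
        invokable.invoke();
    }

    String getState() { return state; }
}
```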

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Wei-Che Wei
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-4714) Set task state to RUNNING after state has been restored

2017-02-24 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882562#comment-15882562
 ] 

Wei-Che Wei edited comment on FLINK-4714 at 2/25/17 5:10 AM:
-

Hi [~Andrew Efimov]
Are you still working on this task? If not, I can also work on it, because the 
task I want to work on is blocked by this issue.


was (Author: tonywei):
Hi Andrew, are you still working on the task? If not, I can also work on it, 
because the task I want to work on is blocked by this issue.

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Andrew Efimov
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4714) Set task state to RUNNING after state has been restored

2017-02-24 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882562#comment-15882562
 ] 

Wei-Che Wei commented on FLINK-4714:


Hi Andrew, are you still working on the task? If not, I can also work on it, 
because the task I want to work on is blocked by this issue.

> Set task state to RUNNING after state has been restored
> ---
>
> Key: FLINK-4714
> URL: https://issues.apache.org/jira/browse/FLINK-4714
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Andrew Efimov
>
> The task state is set to {{RUNNING}} as soon as the {{Task}} is executed. 
> That, however, happens before the state of the {{StreamTask}} invokable has 
> been restored. As a result, the {{CheckpointCoordinator}} starts to trigger 
> checkpoints even though the {{StreamTask}} is not ready.
> In order to avoid aborting checkpoints and properly start it, we should 
> switch the task state to {{RUNNING}} after the state has been restored.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-4754) Make number of retained checkpoints user configurable

2017-02-20 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4754:
--

Assignee: Wei-Che Wei

> Make number of retained checkpoints user configurable
> -
>
> Key: FLINK-4754
> URL: https://issues.apache.org/jira/browse/FLINK-4754
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Wei-Che Wei
>
> The number of retained successful checkpoints is fixed to 1. Expose this 
> setting via the {{CheckpointConfig}} instead of having it fixed as a static 
> field in the {{CheckpointRecoveryFactory}}.
> With the current state of things, this would require to set this value lazily 
> in the checkpoint store implementations instead of setting it when creating 
> the store.
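The lazy-setting idea from the description can be sketched as follows; 
{{CompletedCheckpointStore}} here is a simplified stand-in, and 
setMaxRetainedCheckpoints() is a hypothetical name, not the actual Flink API:

```java
import java.util.ArrayDeque;

// Hedged sketch: the retained-checkpoint limit is a mutable field applied
// lazily on the store, instead of a static constant in the recovery factory.
class CompletedCheckpointStore {
    private final ArrayDeque<Long> completed = new ArrayDeque<>();
    private int maxRetained = 1; // the currently hard-coded default

    // Called lazily, after the store has been created, e.g. from a value the
    // user supplied via a hypothetical CheckpointConfig setting.
    void setMaxRetainedCheckpoints(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        this.maxRetained = n;
    }

    void add(long checkpointId) {
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            completed.removeFirst(); // evict the oldest retained checkpoint
        }
    }

    boolean contains(long checkpointId) { return completed.contains(checkpointId); }

    int size() { return completed.size(); }
}
```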



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5632) Typo in StreamGraph

2017-01-24 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-5632:
--

 Summary: Typo in StreamGraph
 Key: FLINK-5632
 URL: https://issues.apache.org/jira/browse/FLINK-5632
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei
Priority: Trivial


Typo in StreamGraph field: virtuaPartitionNodes -> virtualPartitionNodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4821) Implement rescalable non-partitioned state for Kinesis Connector

2016-12-10 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4821:
--

Assignee: Wei-Che Wei

> Implement rescalable non-partitioned state for Kinesis Connector
> 
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
> Fix For: 1.2.0
>
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the 
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement 
> it too. This ticket tracks progress for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

