[jira] [Created] (FLINK-20768) Support routing field for Elasticsearch connector

2020-12-25 Thread wangsan (Jira)
wangsan created FLINK-20768:
---

 Summary: Support routing field for Elasticsearch connector
 Key: FLINK-20768
 URL: https://issues.apache.org/jira/browse/FLINK-20768
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Reporter: wangsan


Routing in Elasticsearch can improve search efficiency for large-scale datasets, 
so we should support this feature as an optional config in the Elasticsearch 
connector.
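A rough sketch of the idea, expressed with the existing DataStream Elasticsearch connector (illustration only; the index name "my-index" and the routing field "user_id" are made up): a table connector option would essentially generate the `.routing(...)` call below from a configured field.

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Map;

// Sketch: take the routing value from a field of the record and set it on each
// IndexRequest; this is what the proposed option would do declaratively.
public class RoutedSinkFunction implements ElasticsearchSinkFunction<Map<String, Object>> {

    @Override
    public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
        IndexRequest request = Requests.indexRequest()
                .index("my-index")                                 // assumed index name
                .source(element)                                   // document body
                .routing(String.valueOf(element.get("user_id")));  // assumed routing field
        indexer.add(request);
    }
}
```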




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20624) Refactor StreamExecJoinRule, StreamExecIntervalJoinRule and StreamExecTemporalJoinRule

2020-12-16 Thread wangsan (Jira)
wangsan created FLINK-20624:
---

 Summary: Refactor StreamExecJoinRule, StreamExecIntervalJoinRule 
and StreamExecTemporalJoinRule
 Key: FLINK-20624
 URL: https://issues.apache.org/jira/browse/FLINK-20624
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: wangsan
 Fix For: 1.13.0


Currently, some code is duplicated in 
`StreamExecJoinRule`, `StreamExecIntervalJoinRule` and 
`StreamExecTemporalJoinRule`. This JIRA tries to eliminate the code 
duplication so the code is easier to maintain.

`StreamExecJoinRule`, `StreamExecIntervalJoinRule` and 
`StreamExecTemporalJoinRule` 
are rules for stream-stream joins with different match conditions, so we can add an 
abstract class `StreamExecJoinRuleBase` as a base implementation for them.
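A rough sketch of the idea (not the actual planner code, which is written in Scala and matches `FlinkLogicalJoin`; Calcite's `LogicalJoin` is used here only as a stand-in): shared matching logic lives in the base rule, and each concrete rule supplies only its join-kind specific predicate.

```java
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalJoin;

// Sketch of the proposed base class: common checks live here, and the concrete
// rules (regular, interval, temporal join) only implement matchesJoin().
public abstract class StreamExecJoinRuleBase extends RelOptRule {

    protected StreamExecJoinRuleBase(String description) {
        super(operand(LogicalJoin.class, any()), description);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        LogicalJoin join = call.rel(0);
        // shared validation (e.g. rejecting unsupported join types) would go here
        return matchesJoin(join);
    }

    /** Hook for the concrete rule: does this join belong to the rule's join kind? */
    protected abstract boolean matchesJoin(LogicalJoin join);
}
```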



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18596) Derive format schema from table schema may get error result

2020-07-14 Thread wangsan (Jira)
wangsan created FLINK-18596:
---

 Summary: Derive format schema from table schema may get error 
result
 Key: FLINK-18596
 URL: https://issues.apache.org/jira/browse/FLINK-18596
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: wangsan


If the rowtime attribute references a regular field, deriving the format schema from 
the table schema may produce a wrong result:

```java
Schema schema = new Schema()
    .field("f1", DataTypes.STRING())
    .field("f2", DataTypes.BIGINT()).from("t")
    .field("r", DataTypes.TIMESTAMP(3))
    .rowtime(new Rowtime()
        .timestampsFromField("t")
        .watermarksPeriodicBounded(3));

final Map<String, String> properties = schema.toProperties();
final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
```

This code snippet results in `ValidationException("Field names must be 
unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result 
should be:

```java
TableSchema expectedSchema = TableSchema.builder()
    .field("f1", Types.STRING)
    .field("t", Types.LONG)
    .build();
```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15548) Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator

2020-01-10 Thread wangsan (Jira)
wangsan created FLINK-15548:
---

 Summary: Make KeyedCoProcessOperatorWithWatermarkDelay extends 
KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator
 Key: FLINK-15548
 URL: https://issues.apache.org/jira/browse/FLINK-15548
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: wangsan
 Fix For: 1.10.0


`LegacyKeyedCoProcessOperator` is marked as deprecated, so we should make 
`KeyedCoProcessOperatorWithWatermarkDelay` extend `KeyedCoProcessOperator` 
instead.
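A rough sketch of what the changed class could look like (simplified; the constructor and the exact watermark-delay semantics here are assumptions, not the actual Flink source):

```java
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: extend the non-deprecated KeyedCoProcessOperator and keep the
// watermark-delay behavior by overriding processWatermark().
public class KeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
        extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {

    private final long watermarkDelay;

    public KeyedCoProcessOperatorWithWatermarkDelay(
            KeyedCoProcessFunction<K, IN1, IN2, OUT> function, long watermarkDelay) {
        super(function);
        this.watermarkDelay = watermarkDelay;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // hold back the emitted watermark by the configured delay (assumed semantics)
        super.processWatermark(new Watermark(mark.getTimestamp() - watermarkDelay));
    }
}
```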



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-13823) Incorrect debug log in CompileUtils

2019-08-23 Thread wangsan (Jira)
wangsan created FLINK-13823:
---

 Summary: Incorrect debug log in CompileUtils
 Key: FLINK-13823
 URL: https://issues.apache.org/jira/browse/FLINK-13823
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: wangsan
 Fix For: 1.9.1


There is a typo in `CompileUtils`:

```java
CODE_LOG.debug("Compiling: %s \n\n Code:\n%s", name, code);
```

The placeholders should be `{}` instead of `%s`, since the logger uses SLF4J-style 
`{}` placeholders rather than format strings.
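The corrected call would then look like this:

```java
CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
```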



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-11687) Remove useless code in StreamingJobGraphGenerator

2019-02-20 Thread wangsan (JIRA)
wangsan created FLINK-11687:
---

 Summary: Remove useless code in StreamingJobGraphGenerator
 Key: FLINK-11687
 URL: https://issues.apache.org/jira/browse/FLINK-11687
 Project: Flink
  Issue Type: Task
  Components: DataStream API
Affects Versions: 1.7.2, 1.6.3
Reporter: wangsan
Assignee: wangsan


`allOutputs` in `StreamingJobGraphGenerator#setVertexConfig` is unused, so we 
should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Questions about UDTF in flink SQL

2018-11-28 Thread wangsan
Hi all,

When using a user-defined table function (UDTF) in Flink SQL, it seems that the result 
type of a table function must be deterministic.

If I want a UDTF whose result type is determined by its input parameters, what 
should I do?

What I want to do is something like this:

```
SELECT input, f1, f2 length
FROM MyTable,
  LATERAL TABLE(unnest_udtf(input, v1, v2)) as T(f1, f2),
  LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, f5)
```

I can surely register the same UDTF under different names with different 
configurations, but I guess that's not a good idea :(.
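For illustration, that workaround would look roughly like this (the `UnnestUdtf` class below is hypothetical and only meant to show why the result type has to be fixed per registered instance):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical UDTF: the output arity is fixed per *instance* (constructor
// argument), because getResultType() cannot depend on the call-site arguments.
public class UnnestUdtf extends TableFunction<Row> {

    private final int arity;

    public UnnestUdtf(int arity) {
        this.arity = arity;
    }

    public void eval(String input, String... values) {
        Row row = new Row(arity);
        for (int i = 0; i < arity && i < values.length; i++) {
            row.setField(i, values[i]);
        }
        collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        TypeInformation<?>[] types = new TypeInformation<?>[arity];
        for (int i = 0; i < arity; i++) {
            types[i] = Types.STRING;
        }
        return Types.ROW(types);
    }
}
```

I would then have to register it twice, e.g. `tableEnv.registerFunction("unnest_udtf_2", new UnnestUdtf(2))` and `tableEnv.registerFunction("unnest_udtf_3", new UnnestUdtf(3))`.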

If we cannot do this in Flink SQL for now, maybe we should consider this 
feature in the future?

Best,
wangsan


[jira] [Created] (FLINK-10770) Some generated functions are not opened properly.

2018-11-03 Thread wangsan (JIRA)
wangsan created FLINK-10770:
---

 Summary: Some generated functions are not opened properly.
 Key: FLINK-10770
 URL: https://issues.apache.org/jira/browse/FLINK-10770
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.2, 1.7.0
Reporter: wangsan


Recently I found that sometimes UDFs are not opened properly. It turns out that when 
transforming SQL into an execution plan, the *open* methods of some generated 
functions are not called, e.g. in *NonWindowJoin*, *TimeBoundedStreamJoin*, and 
*FlatJoinRunner*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Reverse the order of fields in Flink SQL

2018-10-25 Thread wangsan
Hi Yinhua,

This is actually a bug in the Flink Table API; you can check this issue: 
https://issues.apache.org/jira/browse/FLINK-10290.

I opened a PR for this issue a couple of days ago, but there are still some 
problems, so it is not ready to be merged. We have used it in our internal Flink 
version, and so far it works well. Maybe you can take a look at it.

Best,
wangsan

> On Oct 24, 2018, at 9:31 AM, yinhua.dai  wrote:
> 
> Hi Timo,
> 
> I write simple testing code for the issue, please checkout
> https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1
> 
> I write a custom table source which just use RowCsvInputformat to create the
> dataset, and use the provided CsvTableSink, and can reproduce the issue.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



[jira] [Created] (FLINK-10553) Unified table sink and source name in SQL statement

2018-10-15 Thread wangsan (JIRA)
wangsan created FLINK-10553:
---

 Summary: Unified table sink and source name in SQL statement
 Key: FLINK-10553
 URL: https://issues.apache.org/jira/browse/FLINK-10553
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0, 1.7.0
Reporter: wangsan
Assignee: wangsan


Since a sink table can now be registered using an ExternalCatalog, just like a 
source table, source and sink names in SQL statements should be treated equally. 
Currently we can only use `catalog.database.table` for a sink table (the whole name 
enclosed in back-ticks as a single identifier), which is not consistent with source 
table names (where the whole name is not treated as a single identifier).

*INSERT INTO catalog.database.sinktable SELECT ... FROM 
catalog.database.sourcetable* should be supported.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10290) Conversion error in StreamScan and BatchScan

2018-09-06 Thread wangsan (JIRA)
wangsan created FLINK-10290:
---

 Summary: Conversion error in StreamScan and BatchScan
 Key: FLINK-10290
 URL: https://issues.apache.org/jira/browse/FLINK-10290
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0, 1.5.3
Reporter: wangsan


`RowTypeInfo#equals()` only compares field types; field names are not 
considered. When checking the equality of `inputType` and `internalType`, we 
should compare both field types and field names.

Behavior of this bug: given a table T with schema (a: Long, b: Long, c: Long), the query

SELECT b, c, a FROM T

expected field order: b, c, a
actual field order: a, b, c
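The root cause can be reproduced directly against `RowTypeInfo` (a small standalone sketch):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsDemo {
    public static void main(String[] args) {
        // Two RowTypeInfo instances with the same field types but different
        // field names compare as equal, so a pure field reordering goes undetected.
        RowTypeInfo abc = new RowTypeInfo(
                new TypeInformation<?>[] {Types.LONG, Types.LONG, Types.LONG},
                new String[] {"a", "b", "c"});
        RowTypeInfo bca = new RowTypeInfo(
                new TypeInformation<?>[] {Types.LONG, Types.LONG, Types.LONG},
                new String[] {"b", "c", "a"});
        System.out.println(abc.equals(bca)); // prints true: field names are ignored
    }
}
```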





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi Timo, 

I think this may not only affect the explain() method. DataStreamRel#translateToPlan 
is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, 
and it adds the desired operators to the execution environment. By side effect, I mean 
that if we call DataStreamRel#translateToPlan on the same RelNode several times, the 
same operators are added to the execution environment more than once, although we 
only need them once. Correct me if I misunderstood that.

I will open an issue later today if this is indeed a problem.

Best,
wangsan



> On Aug 21, 2018, at 10:16 PM, Timo Walther  wrote:
> 
> Hi,
> 
> this sounds like a bug to me. Maybe the explain() method is not implemented 
> correctly. Can you open an issue for it in Jira?
> 
> Thanks,
> Timo
> 
> 
> Am 21.08.18 um 15:04 schrieb wangsan:
>> Hi all,
>> 
>> I noticed that DataStreamRel#translateToPlan is non-idempotent, and that 
>> may cause the execution plan to differ from what we expected. Every time we call 
>> DataStreamRel#translateToPlan (in TableEnvironment#explain, 
>> TableEnvironment#writeToSink, etc.), we add the same operators to the execution 
>> environment repeatedly.
>> 
>> Should we eliminate the side effect of DataStreamRel#translateToPlan?
>> 
>> Best,  Wangsan
>> 
>> appendix
>> 
>> tenv.registerTableSource("test_source", sourceTable)
>> 
>> val t = tenv.sqlQuery("SELECT * from test_source")
>> println(tenv.explain(t))
>> println(tenv.explain(t))
>> 
>> implicit val typeInfo = TypeInformation.of(classOf[Row])
>> tenv.toAppendStream(t)
>> println(tenv.explain(t))
>> We call explain three times, and the Physical Execution Plans are all 
>> different.
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 5 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 6 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 
>> == Abstract Syntax Tree ==
>> LogicalProject(f1=[$0], f2=[$1])
>>   LogicalTableScan(table=[[test_source]])
>> 
>> == Optimized Logical Plan ==
>> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
>> source=[CsvTableSource(read fields: f1, f2)])
>> 
>> == Physical Execution Plan ==
>> Stage 1 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 2 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 3 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 4 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 5 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 6 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 7 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 8 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 9 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> Stage 10 : Operator
>> content : to: Row
>> ship_strategy : FORWARD
>> 
>> Stage 11 : Data Source
>> content : collect elements with CollectionInputFormat
>> 
>> Stage 12 : Operator
>> content : CsvTableSource(read fields: f1, f2)
>> ship_strategy : FORWARD
>> 
>> Stage 13 : Operator
>> content : Map
>> ship_strategy : FORWARD
>> 
>> 



Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread wangsan
Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, and that 
may cause the execution plan to differ from what we expected. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), we add the same operators to the execution 
environment repeatedly.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,  Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat

Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 3 : Operator
content : Map
ship_strategy : FORWARD

Stage 4 : Data Source
content : collect elements with CollectionInputFormat

Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 6 : Operator
content : Map
ship_strategy : FORWARD

Stage 7 : Data Source
content : collect elements with CollectionInputFormat

Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 9 : Operator
content : Map
ship_strategy : FORWARD

Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD

Stage 11 : Data Source
content : collect elements with CollectionInputFormat

Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD

Stage 13 : Operator
content : Map
ship_strategy : FORWARD



Re: Confusions About JDBCOutputFormat

2018-07-11 Thread wangsan
Well, I see. If the connection is established when writing data into the DB, we 
need to cache the rows received since the last write.

IMO, maybe we do not need to open connections repeatedly or introduce 
connection pools. Testing and refreshing the connection periodically can simply solve 
this problem. I've implemented this at 
https://github.com/apache/flink/pull/6301; it would be kind of you to review it.

Best,
wangsan


> On Jul 11, 2018, at 2:25 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> What I mean is establishing a connection each time we write data into JDBC,
> i.e. establishing the connection in the flush() function. I think this will make
> sure the connection is OK. What do you think?
> 
> On Wed, Jul 11, 2018 at 12:12 AM, wangsan wrote:
> 
>> Hi Hequn,
>> 
>> Establishing a connection for each batch write may also have the idle-connection
>> problem, since we are not sure when the connection will be closed. We call the
>> flush() method when a batch is finished or when snapshotting state, but what if
>> snapshotting is not enabled and the batch size is not reached before the
>> connection is closed?
>> 
>> Maybe we could use a Timer to test the connection periodically and keep
>> it alive. What do you think?
>> 
>> I will open a jira and try to work on that issue.
>> 
>> Best,
>> wangsan
>> 
>> 
>> 
>> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
>> 
>> Hi wangsan,
>> 
>> I agree with you. It would be kind of you to open a jira to check the
>> problem.
>> 
>> For the first problem, I think we need to establish a connection each time
>> we execute a batch write. And it is better to get the connection from a
>> connection pool.
>> For the second problem, to avoid multithreading problems, I think we should
>> synchronize on the batch object in the flush() method.
>> 
>> What do you think?
>> 
>> Best, Hequn
>> 
>> 
>> 
>> On Tue, Jul 10, 2018 at 2:36 PM, wangsan wrote:
>>> Hi all,
>>> 
>>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink
>>> application. But I am confused with the implementation of JDBCOutputFormat.
>>> 
>>> 1. The Connection was established when JDBCOutputFormat is opened, and
>>> will be used all the time. But if this connection lies idle for a long time,
>>> the database will force close the connection, thus errors may occur.
>>> 2. The flush() method is called when batchCount exceeds the threshold,
>>> but it is also called while snapshotting state. So two threads may modify
>>> upload and batchCount, but without synchronization.
>>> 
>>> Please correct me if I am wrong.
>>> 
>>> ——
>>> wangsan



[jira] [Created] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization

2018-07-10 Thread wangsan (JIRA)
wangsan created FLINK-9794:
--

 Summary: JDBCOutputFormat does not consider idle connection and 
multithreads synchronization
 Key: FLINK-9794
 URL: https://issues.apache.org/jira/browse/FLINK-9794
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.5.0, 1.4.0
Reporter: wangsan


The current implementation of JDBCOutputFormat has two potential problems:

1. The Connection is established when JDBCOutputFormat is opened and is then used 
all the time. But if this connection lies idle for a long time, the database will 
force-close it, and errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it is 
also called while snapshotting state. So two threads may modify upload and 
batchCount without synchronization.

We need to fix these two problems to make JDBCOutputFormat more reliable.
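A minimal sketch of the direction discussed in the mailing list thread (not the actual fix; the buffering scheme, placeholder SQL, and validation timeout are assumptions): buffer rows on the Java side, validate the connection before each batch write, and synchronize flush() so the writing thread and the checkpointing thread do not race.

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: guards against idle connections and concurrent flush() calls.
public class ValidatingJdbcBatchWriter {

    private static final String URL = "jdbc:mysql://localhost:3306/test";      // placeholder
    private static final String INSERT = "INSERT INTO t (a, b) VALUES (?, ?)"; // placeholder
    private static final int VALIDATION_TIMEOUT_SECONDS = 5;

    private final List<Object[]> buffer = new ArrayList<>();
    private Connection connection;

    /** Buffers a row; called from the writing thread. */
    public synchronized void write(Object a, Object b) {
        buffer.add(new Object[] {a, b});
    }

    /** Writes the buffered rows; called on batch-size overflow and on snapshot. */
    public synchronized void flush() throws IOException {
        try {
            // JDBC 4.0 Connection#isValid pings the server with a short timeout,
            // so a connection the database dropped while idle gets re-opened.
            if (connection == null || !connection.isValid(VALIDATION_TIMEOUT_SECONDS)) {
                connection = DriverManager.getConnection(URL);
            }
            try (PreparedStatement upload = connection.prepareStatement(INSERT)) {
                for (Object[] row : buffer) {
                    upload.setObject(1, row[0]);
                    upload.setObject(2, row[1]);
                    upload.addBatch();
                }
                upload.executeBatch();
            }
            buffer.clear();
        } catch (SQLException e) {
            throw new IOException("Writing batch to JDBC failed.", e);
        }
    }
}
```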



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Confusions About JDBCOutputFormat

2018-07-10 Thread wangsan
Hi Hequn,

Establishing a connection for each batch write may also have the idle-connection 
problem, since we are not sure when the connection will be closed. We call the 
flush() method when a batch is finished or when snapshotting state, but what if 
snapshotting is not enabled and the batch size is not reached before the connection 
is closed?

Maybe we could use a Timer to test the connection periodically and keep it 
alive. What do you think?

I will open a jira and try to work on that issue.

Best, 
wangsan



> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> I agree with you. It would be kind of you to open a jira to check the problem.
> 
> For the first problem, I think we need to establish a connection each time 
> we execute a batch write. And it is better to get the connection from a 
> connection pool.
> For the second problem, to avoid multithreading problems, I think we should 
> synchronize on the batch object in the flush() method.
> 
> What do you think?
> 
> Best, Hequn
> 
> 
> 
> On Tue, Jul 10, 2018 at 2:36 PM, wangsan wrote:
> Hi all,
> 
> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
> application. But I am confused with the implementation of JDBCOutputFormat.
> 
> 1. The Connection was established when JDBCOutputFormat is opened, and will 
> be used all the time. But if this connection lies idle for a long time, the 
> database will force close the connection, thus errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount, but without synchronization.
> 
> Please correct me if I am wrong.
> 
> ——
> wangsan
> 



Confusions About JDBCOutputFormat

2018-07-09 Thread wangsan
Hi all,

I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
application, but I am confused by the implementation of JDBCOutputFormat.

1. The Connection is established when JDBCOutputFormat is opened and is then used 
all the time. But if this connection lies idle for a long time, the database will 
force-close it, and errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it is 
also called while snapshotting state. So two threads may modify upload and 
batchCount without synchronization.

Please correct me if I am wrong.

——
wangsan


Question about job canceling in Flink

2017-09-26 Thread wangsan
Hi all,

We are currently using BucketingSink to save data into HDFS in Parquet format. 
But when the Flink job is canceled, we always get an exception in BucketingSink's 
close method. The detailed exception info is below:
[ERROR] [2017-09-26 20:51:58,893] 
[org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be 
acknowledged by pipeline
   at 
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
   at 
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
   at 
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
   at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
   at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
   at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
   at 
org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
   at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
   at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
   …….
   at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
   at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
   at java.lang.Thread.run(Thread.java:745)

After digging into the source code, we found that when a Flink job is canceled, a 
TaskCanceler thread is created. The TaskCanceler thread calls cancel() on the 
invokable and then periodically interrupts the task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}
// ...
executer.interrupt();
try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}
// ...

Notice that TaskCanceler first sends an interrupt signal to the task thread, and then 
calls the join method. Since the task thread is now trying to close the 
DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown in 
the task thread.

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}

I am confused about why TaskCanceler calls executer.interrupt() before 
executer.join(interruptInterval). Can anyone help?

Best,

wangsan