[jira] [Created] (FLINK-20768) Support routing field for Elasticsearch connector
wangsan created FLINK-20768: --- Summary: Support routing field for Elasticsearch connector Key: FLINK-20768 URL: https://issues.apache.org/jira/browse/FLINK-20768 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: wangsan Routing in Elasticsearch can improve search efficiency for large-scale datasets; we should support this feature as an optional config in the Elasticsearch connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)
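For context, the DataStream Elasticsearch connector already lets a user set routing manually on each request; the proposal is about exposing the same capability as a connector option. Below is a minimal sketch of the manual workaround, assuming Elasticsearch 7 and a row whose first field is the routing key; the index name, field layout, and class name are illustrative assumptions, not part of the proposal.

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.types.Row;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Hypothetical sink function that sets the routing field on every index request.
public class RoutingAwareSinkFunction implements ElasticsearchSinkFunction<Row> {

    @Override
    public void process(Row row, RuntimeContext ctx, RequestIndexer indexer) {
        String routingKey = String.valueOf(row.getField(0));
        IndexRequest request = new IndexRequest("my_index")
                .source("{\"user_id\":\"" + routingKey + "\"}", XContentType.JSON)
                // route documents with the same key to the same shard
                .routing(routingKey);
        indexer.add(request);
    }
}
```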
[jira] [Created] (FLINK-20624) Refactor StreamExecJoinRule, StreamExecIntervalJoinRule and StreamExecTemporalJoinRule
wangsan created FLINK-20624: --- Summary: Refactor StreamExecJoinRule, StreamExecIntervalJoinRule and StreamExecTemporalJoinRule Key: FLINK-20624 URL: https://issues.apache.org/jira/browse/FLINK-20624 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.13.0 Reporter: wangsan Fix For: 1.13.0 Currently, some code is duplicated across `StreamExecJoinRule`, `StreamExecIntervalJoinRule` and `StreamExecTemporalJoinRule`. This JIRA tries to eliminate the duplication so the code is easier to maintain. The three rules handle stream-stream joins with different match conditions, so we can add an abstract class `StreamExecJoinRuleBase` as a base implementation for them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
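The actual planner rules are Scala classes; the Java sketch below only illustrates the intended shape of the proposed base class, under the assumption that shared matching logic moves into `matches()` while each subclass keeps its own condition check. Every member name other than `StreamExecJoinRuleBase` is an assumption for illustration.

```java
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;

// Hypothetical base rule: shared stream-stream join checks live here, and
// subclasses only decide whether the join condition matches their case.
public abstract class StreamExecJoinRuleBase extends RelOptRule {

    protected StreamExecJoinRuleBase(RelOptRuleOperand operand, String description) {
        super(operand, description);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        // common validation shared by all three rules (e.g. rejecting unsupported
        // join types) would live here, followed by the subclass-specific check
        return matchesJoinCondition(call);
    }

    /** Subclasses decide whether the join condition fits this rule (regular, interval, temporal). */
    protected abstract boolean matchesJoinCondition(RelOptRuleCall call);
}
```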
[jira] [Created] (FLINK-18596) Deriving format schema from table schema may produce a wrong result
wangsan created FLINK-18596: --- Summary: Deriving format schema from table schema may produce a wrong result Key: FLINK-18596 URL: https://issues.apache.org/jira/browse/FLINK-18596 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.0 Reporter: wangsan If the rowtime attribute references a regular field, deriving the format schema from the table schema may produce a wrong result:

```java
Schema schema = new Schema()
    .field("f1", DataTypes.STRING())
    .field("f2", DataTypes.BIGINT()).from("t")
    .field("r", DataTypes.TIMESTAMP(3))
    .rowtime(
        new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(3));
final Map<String, String> properties = schema.toProperties();
final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
```

This code snippet results in `ValidationException("Field names must be unique. Duplicate field: '" + fullFieldName + "'")`, but the expected result should be:

```java
TableSchema expectedSchema = TableSchema.builder()
    .field("f1", Types.STRING)
    .field("t", Types.LONG)
    .build();
```

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15548) Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator
wangsan created FLINK-15548: --- Summary: Make KeyedCoProcessOperatorWithWatermarkDelay extends KeyedCoProcessOperator instead of LegacyKeyedCoProcessOperator Key: FLINK-15548 URL: https://issues.apache.org/jira/browse/FLINK-15548 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.10.0 Reporter: wangsan Fix For: 1.10.0 `LegacyKeyedCoProcessOperator` is marked as deprecated, so we should make `KeyedCoProcessOperatorWithWatermarkDelay` extend `KeyedCoProcessOperator` instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
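A minimal sketch of the proposed change is below; it only illustrates switching the superclass, and the constructor signature, field name, and delayed-watermark handling are assumptions rather than the actual operator in the Table runtime.

```java
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical sketch: extend the non-deprecated KeyedCoProcessOperator and
// forward watermarks with a configurable delay subtracted.
public class KeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
        extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {

    private final long watermarkDelay;

    public KeyedCoProcessOperatorWithWatermarkDelay(
            KeyedCoProcessFunction<K, IN1, IN2, OUT> function, long watermarkDelay) {
        super(function);
        this.watermarkDelay = watermarkDelay;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // emit and process the watermark held back by the configured delay
        super.processWatermark(new Watermark(mark.getTimestamp() - watermarkDelay));
    }
}
```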
[jira] [Created] (FLINK-13823) Incorrect debug log in CompileUtils
wangsan created FLINK-13823: --- Summary: Incorrect debug log in CompileUtils Key: FLINK-13823 URL: https://issues.apache.org/jira/browse/FLINK-13823 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0 Reporter: wangsan Fix For: 1.9.1 There is a typo in `CompileUtils`:

```java
CODE_LOG.debug("Compiling: %s \n\n Code:\n%s", name, code);
```

The placeholder should be `{}` instead of `%s`. -- This message was sent by Atlassian Jira (v8.3.2#803003)
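For reference, the corrected call would presumably be the following (assuming `CODE_LOG` is an SLF4J logger, which is what the `{}` placeholder syntax implies):

```java
// SLF4J substitutes {} placeholders; %s is String.format syntax and is left as-is
CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
```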
[jira] [Created] (FLINK-11687) Remove useless code in StreamingJobGraphGenerator
wangsan created FLINK-11687: --- Summary: Remove useless code in StreamingJobGraphGenerator Key: FLINK-11687 URL: https://issues.apache.org/jira/browse/FLINK-11687 Project: Flink Issue Type: Task Components: DataStream API Affects Versions: 1.7.2, 1.6.3 Reporter: wangsan Assignee: wangsan `allOutputs` in `StreamingJobGraphGenerator#setVertexConfig` is unused; we should remove it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Questions about UDTF in Flink SQL
Hi all, When using a user-defined table function in Flink SQL, it seems that the result type of a table function must be deterministic. If I want a UDTF whose result type is determined by its input parameters, what should I do? What I want to do is like this:

```
SELECT input, f1, f2 length
FROM MyTable,
  LATERAL TABLE(unnest_udtf(input, v1, v2)) as T(f1, f2),
  LATERAL TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, f5)
```

I can surely register the same UDTF with a different name and configuration, but I guess that's not a good idea :(. If we cannot do this in Flink SQL for now, maybe we should consider this feature in the future? Best, wangsan
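For what it's worth, here is a sketch of the registration workaround mentioned above: a TableFunction whose result type is fixed per instance, so each arity must be registered under its own name. The class name, field types, and eval signature are assumptions for illustration, and it uses the legacy `getResultType()` API.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical UDTF: the output row width is fixed when the function is constructed,
// so a 2-field and a 3-field variant must be registered as separate functions.
public class UnnestUdtf extends TableFunction<Row> {

    private final int numFields;

    public UnnestUdtf(int numFields) {
        this.numFields = numFields;
    }

    public void eval(String input, String... values) {
        Row row = new Row(numFields);
        for (int i = 0; i < numFields && i < values.length; i++) {
            row.setField(i, values[i]);
        }
        collect(row);
    }

    @Override
    public TypeInformation<Row> getResultType() {
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[numFields];
        for (int i = 0; i < numFields; i++) {
            fieldTypes[i] = Types.STRING;
        }
        return Types.ROW(fieldTypes);
    }
}
```

Usage would then be, e.g., `tableEnv.registerFunction("unnest_udtf_2", new UnnestUdtf(2))` and `tableEnv.registerFunction("unnest_udtf_3", new UnnestUdtf(3))`.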
[jira] [Created] (FLINK-10770) Some generated functions are not opened properly.
wangsan created FLINK-10770: --- Summary: Some generated functions are not opened properly. Key: FLINK-10770 URL: https://issues.apache.org/jira/browse/FLINK-10770 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.2, 1.7.0 Reporter: wangsan Recently I found that sometimes UDFs are not opened properly. It turns out that when transforming SQL to an execution plan, the *open* method of some generated functions is never called, e.g. *NonWindowJoin*, *TimeBoundedStreamJoin*, *FlatJoinRunner*. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
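To make the expectation concrete, below is a sketch of what "opening properly" means for a generated join function; this is not the actual fix, just an illustration under the assumption that the wrapping runner should route the lifecycle call through `FunctionUtils`, which forwards `open()` only to rich functions.

```java
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;

// Hypothetical helper: the wrapping operator/runner should call this from its own
// open(), so the code-generated function (and any UDF it calls) gets open() invoked.
public final class GeneratedFunctionOpener {

    private GeneratedFunctionOpener() {}

    public static <IN1, IN2, OUT> void open(
            FlatJoinFunction<IN1, IN2, OUT> generatedFunction,
            Configuration parameters) throws Exception {
        // no-op for plain functions, calls open() when the function is a RichFunction
        FunctionUtils.openFunction(generatedFunction, parameters);
    }
}
```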
Re: Reverse the order of fields in Flink SQL
Hi Yinhua, This is actually a bug in Flink Table; you can check this issue: https://issues.apache.org/jira/browse/FLINK-10290. I opened a PR for this issue a couple of days ago, but there are still some problems, so it's not ready to be merged. We have used it in our internal Flink version, and so far it works well. Maybe you can take a look at it. Best, wangsan > On Oct 24, 2018, at 9:31 AM, yinhua.dai wrote: > > Hi Timo, > > I write simple testing code for the issue, please checkout > https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1 > > I write a custom table source which just use RowCsvInputformat to create the > dataset, and use the provided CsvTableSink, and can reproduce the issue. > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
[jira] [Created] (FLINK-10553) Unify table sink and source names in SQL statements
wangsan created FLINK-10553: --- Summary: Unify table sink and source names in SQL statements Key: FLINK-10553 URL: https://issues.apache.org/jira/browse/FLINK-10553 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0, 1.7.0 Reporter: wangsan Assignee: wangsan Since a sink table can now be registered using ExternalCatalog, just like a source table, source and sink names in SQL statements should be treated equally. Currently we can only use `catalog.database.table` for the sink table (the whole name enclosed in back-ticks as a single identifier), which is not consistent with the source table name (where the whole name is not treated as a single identifier). *INSERT INTO catalog.database.sinktable SELECT ... FROM catalog.database.sourcetable* should be supported. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10290) Conversion error in StreamScan and BatchScan
wangsan created FLINK-10290: --- Summary: Conversion error in StreamScan and BatchScan Key: FLINK-10290 URL: https://issues.apache.org/jira/browse/FLINK-10290 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0, 1.5.3 Reporter: wangsan `RowTypeInfo#equals()` only compares field types; field names are not considered. When checking the equality of `inputType` and `internalType`, we should compare both field types and field names. Behavior of this bug: given a table T with schema (a: Long, b: Long, c: Long), `SELECT b, c, a FROM T` is expected to return the fields in the order b, c, a but actually returns a, b, c. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
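A small sketch of the stricter check described above (the helper name is an assumption; the real fix lives in the scan conversion, not in a utility class):

```java
import java.util.Arrays;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Hypothetical helper: treat two row types as equal only if both the field
// types (checked by RowTypeInfo#equals) and the field names match.
public final class RowSchemaEquality {

    private RowSchemaEquality() {}

    public static boolean schemaEquals(RowTypeInfo left, RowTypeInfo right) {
        return left.equals(right)
                && Arrays.equals(left.getFieldNames(), right.getFieldNames());
    }
}
```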
Re: Side effect of DataStreamRel#translateToPlan
Hi Timo, I think this may not only affect the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the desired operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, but we actually need them only once. Correct me if I misunderstood that. I will open an issue later today, if this is indeed a problem. Best, wangsan > On Aug 21, 2018, at 10:16 PM, Timo Walther wrote: > > Hi, > > this sounds like a bug to me. Maybe the explain() method is not implemented > correctly. Can you open an issue for it in Jira? > > Thanks, > Timo > > > Am 21.08.18 um 15:04 schrieb wangsan: >> Hi all, >> >> I noticed that the DataStreamRel#translateToPlan is non-idempotent, and that >> may cause the execution plan not as what we expected. Every time we call >> DataStreamRel#translateToPlan (in TableEnvironment#explain, >> TableEnvironment#writeToSink, etc), we add same operators in execution >> environment repeatedly. >> >> Should we eliminate the side effect of DataStreamRel#translateToPlan ? >> >> Best, Wangsan >> >> appendix >> >> tenv.registerTableSource("test_source", sourceTable) >> >> val t = tenv.sqlQuery("SELECT * from test_source") >> println(tenv.explain(t)) >> println(tenv.explain(t)) >> >> implicit val typeInfo = TypeInformation.of(classOf[Row]) >> tenv.toAppendStream(t) >> println(tenv.explain(t)) >> We call explain three times, and the Physical Execution Plan are all >> different. >> >> == Abstract Syntax Tree == >> LogicalProject(f1=[$0], f2=[$1]) >> LogicalTableScan(table=[[test_source]]) >> >> == Optimized Logical Plan == >> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], >> source=[CsvTableSource(read fields: f1, f2)]) >> >> == Physical Execution Plan == >> Stage 1 : Data Source >> content : collect elements with CollectionInputFormat >> >> Stage 2 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 3 : Operator >> content : Map >> ship_strategy : FORWARD >> >> >> == Abstract Syntax Tree == >> LogicalProject(f1=[$0], f2=[$1]) >> LogicalTableScan(table=[[test_source]]) >> >> == Optimized Logical Plan == >> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], >> source=[CsvTableSource(read fields: f1, f2)]) >> >> == Physical Execution Plan == >> Stage 1 : Data Source >> content : collect elements with CollectionInputFormat >> >> Stage 2 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 3 : Operator >> content : Map >> ship_strategy : FORWARD >> >> Stage 4 : Data Source >> content : collect elements with CollectionInputFormat >> >> Stage 5 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 6 : Operator >> content : Map >> ship_strategy : FORWARD >> >> >> == Abstract Syntax Tree == >> LogicalProject(f1=[$0], f2=[$1]) >> LogicalTableScan(table=[[test_source]]) >> >> == Optimized Logical Plan == >> StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], >> source=[CsvTableSource(read fields: f1, f2)]) >> >> == Physical Execution Plan == >> Stage 1 : Data Source >> content : collect elements with CollectionInputFormat >> >> Stage 2 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 3 : Operator >> content : Map >> ship_strategy : FORWARD >> >> Stage 4 : Data
Source >> content : collect elements with CollectionInputFormat >> >> Stage 5 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 6 : Operator >> content : Map >> ship_strategy : FORWARD >> >> Stage 7 : Data Source >> content : collect elements with CollectionInputFormat >> >> Stage 8 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 9 : Operator >> content : Map >> ship_strategy : FORWARD >> >> Stage 10 : Operator >> content : to: Row >> ship_strategy : FORWARD >> >> Stage 11 : Data Source >> content : collect elements with CollectionInputFormat >> >> Stage 12 : Operator >> content : CsvTableSource(read fields: f1, f2) >> ship_strategy : FORWARD >> >> Stage 13 : Operator >> content : Map >> ship_strategy : FORWARD >> >>
Side effect of DataStreamRel#translateToPlan
Hi all, I noticed that DataStreamRel#translateToPlan is non-idempotent, and that may cause the execution plan to differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again. Should we eliminate the side effect of DataStreamRel#translateToPlan? Best, Wangsan

Appendix:

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))

We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
Stage 2 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 3 : Operator
  content : Map
  ship_strategy : FORWARD

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
Stage 2 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 3 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 4 : Data Source
  content : collect elements with CollectionInputFormat
Stage 5 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 6 : Operator
  content : Map
  ship_strategy : FORWARD

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
Stage 2 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 3 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 4 : Data Source
  content : collect elements with CollectionInputFormat
Stage 5 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 6 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 7 : Data Source
  content : collect elements with CollectionInputFormat
Stage 8 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 9 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 10 : Operator
  content : to: Row
  ship_strategy : FORWARD
Stage 11 : Data Source
  content : collect elements with CollectionInputFormat
Stage 12 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 13 : Operator
  content : Map
  ship_strategy : FORWARD
Re: Confusions About JDBCOutputFormat
Well, I see. If the connection is established each time we write data into the DB, we need to cache the rows received since the last write. IMO, maybe we do not need to open connections repeatedly or introduce connection pools; testing and refreshing the connection periodically can solve this problem simply. I've implemented this at https://github.com/apache/flink/pull/6301, it would be kind of you to review it. Best, wangsan > On Jul 11, 2018, at 2:25 PM, Hequn Cheng wrote: > > Hi wangsan, > > What I mean is establishing a connection each time write data into JDBC, > i.e. establish a connection in flush() function. I think this will make > sure the connection is ok. What do you think? > > On Wed, Jul 11, 2018 at 12:12 AM, wangsan <mailto:wamg...@163.com>> wrote: > >> Hi Hequn, >> >> Establishing a connection for each batch write may also have idle >> connection problem, since we are not sure when the connection will be >> closed. We call flush() method when a batch is finished or snapshot state, >> but what if the snapshot is not enabled and the batch size not reached >> before the connection is closed? >> >> May be we could use a Timer to test the connection periodically and keep >> it alive. What do you think? >> >> I will open a jira and try to work on that issue. >> >> Best, >> wangsan >> >> >> >> On Jul 10, 2018, at 8:38 PM, Hequn Cheng wrote: >> >> Hi wangsan, >> >> I agree with you. It would be kind of you to open a jira to check the >> problem. >> >> For the first problem, I think we need to establish connection each time >> execute batch write. And, it is better to get the connection from a >> connection pool. >> For the second problem, to avoid multithread problem, I think we should >> synchronized the batch object in flush() method. >> >> What do you think? >> >> Best, Hequn >> >> >> >> On Tue, Jul 10, 2018 at 2:36 PM, wangsan > <mailto:wamg...@163.com>> wrote: >> >>> Hi all, >>> >>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink >>> application. But I am confused with the implementation of JDBCOutputFormat. >>> >>> 1. The Connection was established when JDBCOutputFormat is opened, and >>> will be used all the time. But if this connection lies idle for a long time, >>> the database will force close the connection, thus errors may occur. >>> 2. The flush() method is called when batchCount exceeds the threshold, >>> but it is also called while snapshotting state. So two threads may modify >>> upload and batchCount, but without synchronization. >>> >>> Please correct me if I am wrong. >>> >>> —— >>> wangsan
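For illustration, a minimal sketch of the keep-alive idea (this is not the code in the PR above; the class name, timeout, and structure are assumptions): before flushing a batch, check that the JDBC connection is still valid and reopen it if the server has dropped it.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical writer that re-establishes the JDBC connection when the database
// has closed it during an idle period, instead of opening a new one per batch.
public class ReconnectingJdbcWriter {

    private final String url;
    private final String user;
    private final String password;
    private Connection connection;

    public ReconnectingJdbcWriter(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    private Connection validConnection() throws SQLException {
        // isValid() sends a lightweight check; reconnect if the server dropped us
        if (connection == null || !connection.isValid(5)) {
            connection = DriverManager.getConnection(url, user, password);
        }
        return connection;
    }

    public synchronized void flush(Iterable<String> bufferedStatements) throws SQLException {
        try (Statement stmt = validConnection().createStatement()) {
            for (String sql : bufferedStatements) {
                stmt.addBatch(sql);
            }
            stmt.executeBatch();
        }
    }
}
```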
[jira] [Created] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization
wangsan created FLINK-9794: -- Summary: JDBCOutputFormat does not consider idle connection and multithreads synchronization Key: FLINK-9794 URL: https://issues.apache.org/jira/browse/FLINK-9794 Project: Flink Issue Type: Bug Components: Streaming Connectors Affects Versions: 1.5.0, 1.4.0 Reporter: wangsan The current implementation of JDBCOutputFormat has two potential problems:

1. The Connection is established when JDBCOutputFormat is opened and is then used the whole time. But if the connection lies idle for a long time, the database will force-close it, and errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state. So two threads may modify upload and batchCount without synchronization.

We need to fix these two problems to make JDBCOutputFormat more reliable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
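To illustrate the second problem, here is a sketch of the kind of guarding that is needed; the field and method names mirror the description above but are not the actual JDBCOutputFormat internals, and the real fix would synchronize the existing writeRecord()/flush() paths rather than introduce a new class.

```java
// Hypothetical batching writer: flush() can be triggered both by reaching the
// batch threshold and by the snapshot path, so the shared batch state is guarded.
public class SynchronizedBatchWriter {

    private final Object lock = new Object();
    private final int batchInterval;
    private int batchCount = 0;

    public SynchronizedBatchWriter(int batchInterval) {
        this.batchInterval = batchInterval;
    }

    public void writeRecord(Object row) throws Exception {
        synchronized (lock) {
            addToBatch(row);
            batchCount++;
            if (batchCount >= batchInterval) {
                flush();
            }
        }
    }

    public void flush() throws Exception {
        synchronized (lock) {
            executeBatch(); // corresponds to upload.executeBatch() in JDBCOutputFormat
            batchCount = 0;
        }
    }

    private void addToBatch(Object row) {
        // buffer the row into the prepared statement (elided)
    }

    private void executeBatch() {
        // send the buffered rows to the database (elided)
    }
}
```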
Re: Confusions About JDBCOutputFormat
Hi Hequn, Establishing a connection for each batch write may also have the idle connection problem, since we are not sure when the connection will be closed. We call the flush() method when a batch is finished or state is snapshotted, but what if snapshotting is not enabled and the batch size is not reached before the connection is closed? Maybe we could use a Timer to test the connection periodically and keep it alive. What do you think? I will open a jira and try to work on that issue. Best, wangsan > On Jul 10, 2018, at 8:38 PM, Hequn Cheng wrote: > > Hi wangsan, > > I agree with you. It would be kind of you to open a jira to check the problem. > > For the first problem, I think we need to establish connection each time > execute batch write. And, it is better to get the connection from a > connection pool. > For the second problem, to avoid multithread problem, I think we should > synchronized the batch object in flush() method. > > What do you think? > > Best, Hequn > > > > On Tue, Jul 10, 2018 at 2:36 PM, wangsan <mailto:wamg...@163.com>> wrote: > Hi all, > > I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink > application. But I am confused with the implementation of JDBCOutputFormat. > > 1. The Connection was established when JDBCOutputFormat is opened, and will > be used all the time. But if this connection lies idle for a long time, the > database will force close the connection, thus errors may occur. > 2. The flush() method is called when batchCount exceeds the threshold, but it > is also called while snapshotting state. So two threads may modify upload and > batchCount, but without synchronization. > > Please correct me if I am wrong. > > —— > wangsan >
Confusions About JDBCOutputFormat
Hi all, I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink application, but I am confused by the implementation of JDBCOutputFormat.

1. The Connection is established when JDBCOutputFormat is opened and is then used the whole time. But if this connection lies idle for a long time, the database will force-close it, and errors may occur.
2. The flush() method is called when batchCount exceeds the threshold, but it is also called while snapshotting state. So two threads may modify upload and batchCount without synchronization.

Please correct me if I am wrong. —— wangsan
Question about job canceling in Flink
Hi all, We are currently using BucketingSink to save data into HDFS in parquet format, but when the Flink job is canceled, we always get an exception in BucketingSink's close method. The detailed exception info is as below:

[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
  at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
  at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
  at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
  at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
  at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
  at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
  at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
  at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
  at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
  ...
  at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
  at java.lang.Thread.run(Thread.java:745)

After digging into the source code, we found that when a Flink job is canceled, a TaskCanceler thread is created. The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the task thread until it has terminated:

try {
    invokable.cancel();
} catch (Throwable t) {
    logger.error("Error while canceling the task {}.", taskName, t);
}
// ...
executer.interrupt();
try {
    executer.join(interruptInterval);
} catch (InterruptedException e) {
    // we can ignore this
}
// ...

Notice that TaskCanceler first sends the interrupt signal to the task thread and then calls join. Since the task thread is at that moment trying to close the DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown in the task thread and converted into the InterruptedIOException seen above:

synchronized (dataQueue) {
    while (!streamerClosed) {
        checkClosed();
        if (lastAckedSeqno >= seqno) {
            break;
        }
        try {
            dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
        } catch (InterruptedException ie) {
            throw new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
        }
    }
}

I am confused about why TaskCanceler calls executer.interrupt() before executer.join(interruptInterval). Can anyone help? Best, wangsan
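Not an answer to the scheduling question, but for anyone hitting the same trace: one common mitigation is to tolerate the interrupt-triggered IO error on the dispose path. A minimal sketch under that assumption follows (this is not BucketingSink code; the helper name is made up):

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.parquet.hadoop.ParquetWriter;

// Hypothetical helper: close the Parquet writer while tolerating the interrupt
// that TaskCanceler sends to the task thread during cancellation.
public final class QuietParquetCloser {

    private QuietParquetCloser() {}

    public static void closeQuietly(ParquetWriter<?> writer) {
        try {
            writer.close();
        } catch (InterruptedIOException e) {
            // the task thread was interrupted during cancellation; restore the flag and move on
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // log and continue; the task is already being torn down
        }
    }
}
```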