Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
Hi Thomas,

The bug https://issues.apache.org/jira/browse/FLINK-21028 is still present in 1.12.1. You would need to upgrade to at least 1.11.4, 1.12.2, or 1.13.0. However, as I mentioned before, 1.11.4 hasn't been released yet. On the other hand, both 1.12.2 and 1.13.0 have already been superseded by more recent patch releases (1.12.4 and 1.13.1 respectively).

Piotrek

On Wed, 16 Jun 2021 at 06:01, Thomas Wang wrote:
> Thanks everyone. I'm using Flink on EMR. I just updated to EMR 6.3, which
> uses Flink 1.12.1. I will report back whether this resolves the issue.
>
> Thomas
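For anyone who wants to check a cluster version against the fix versions quoted in this thread (1.11.4, 1.12.2, 1.13.0), here is a minimal sketch. The function names are made up for illustration, and the handling of release lines outside 1.11 to 1.13 is an assumption (newer lines are assumed to carry the fix, older lines are assumed never to have received it), not something confirmed in the thread.

```python
from typing import Tuple

# Fix versions for FLINK-21028 as quoted in this thread: 1.11.4, 1.12.2, 1.13.0.
# Maps (major, minor) release line -> first patch level containing the fix.
FIXED_IN = {(1, 11): 4, (1, 12): 2, (1, 13): 0}


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split a plain 'major.minor.patch' string into integers."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def has_flink_21028_fix(version: str) -> bool:
    """Return True if the given Flink version should contain the fix (hypothetical helper)."""
    major, minor, patch = parse_version(version)
    line = (major, minor)
    if line in FIXED_IN:
        return patch >= FIXED_IN[line]
    # Assumption: lines newer than 1.13 inherit the fix, lines older than 1.11 do not have it.
    return line > (1, 13)


for candidate in ("1.12.1", "1.12.4", "1.13.1"):
    print(candidate, has_flink_21028_fix(candidate))
```

Under these assumptions, 1.12.1 (the version shipped with EMR 6.3, per Thomas) is reported as not containing the fix, while 1.12.4 and 1.13.1 are.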
Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
Thanks everyone. I'm using Flink on EMR. I just updated to EMR 6.3, which uses Flink 1.12.1. I will report back whether this resolves the issue.

Thomas

On Wed, Jun 9, 2021 at 11:15 PM Yun Gao wrote:
> Many thanks to Kezhu for the catch; it also looks like the same issue as
> FLINK-21028 to me.
Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
Many thanks to Kezhu for the catch; it also looks like the same issue as FLINK-21028 to me.

--
From: Piotr Nowojski
Send Time: 2021 Jun. 9 (Wed.) 22:12
To: Kezhu Wang
Cc: Thomas Wang; Yun Gao; user
Subject: Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Yes, good catch Kezhu, IllegalStateException sounds very much like FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't been released yet.)

Piotrek
Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
Yes, good catch Kezhu, IllegalStateException sounds very much like FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't been released yet.)

Piotrek

On Tue, 8 Jun 2021 at 17:18, Kezhu Wang wrote:
> Could it be the same as FLINK-21028 [1] (titled “Streaming application
> didn’t stop properly”, fixed in 1.11.4, 1.12.2, and 1.13.0)?
>
> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>
> Best,
> Kezhu Wang
Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
Could it be the same as FLINK-21028 [1] (titled “Streaming application didn’t stop properly”, fixed in 1.11.4, 1.12.2, and 1.13.0)?

[1]: https://issues.apache.org/jira/browse/FLINK-21028

Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
> Hi Thomas,
>
> I tried but have not been able to reproduce the exception yet. In the
> meantime, I have filed an issue for it [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-22928
Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
Hi Thomas,

I tried but have not been able to reproduce the exception yet. In the meantime, I have filed an issue for it [1].

[1] https://issues.apache.org/jira/browse/FLINK-22928

-- Original Mail --
Sender: Thomas Wang
Send Date: Tue Jun 8 07:45:52 2021
Recipients: Yun Gao
CC: user
Subject: Re: Re: Re: Failed to cancel a job using the STOP rest API

This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink with the Parquet format. I'm using only Flink's APIs, nothing custom.

Thomas
Re: Re: Re: Failed to cancel a job using the STOP rest API
This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink with the Parquet format. I'm using only Flink's APIs, nothing custom.

Thomas

On Sun, Jun 6, 2021 at 6:43 PM Yun Gao wrote:
> Hi Thomas,
>
> Many thanks for reporting the exceptions. This does not look like the
> expected behavior to me.
> Could you also show us the DAG of the job? And do any operators in the
> source task use multiple threads to emit records?
>
> Best,
> Yun
Re: Re: Re: Failed to cancel a job using the STOP rest API
Hi Thomas,

Many thanks for reporting the exceptions. This does not look like the expected behavior to me.
Could you also show us the DAG of the job? And do any operators in the source task use multiple threads to emit records?

Best,
Yun

-- Original Mail --
Sender: Thomas Wang
Send Date: Sun Jun 6 04:02:20 2021
Recipients: Yun Gao
CC: user
Subject: Re: Re: Failed to cancel a job using the STOP rest API

One thing I noticed is that if I set drain = true, the job can be stopped correctly. Maybe that's because I'm using a Parquet file sink, which uses a bulk-encoded format and only writes to disk during checkpoints?

Thomas

On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang wrote:

Hi Yun,

Thanks for the tips. Yes, I do see some exceptions, copied below. I'm not quite sure what they mean, though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90
```
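For reference, the stop call with drain = true that Thomas describes can be sketched as below. This assumes the JobManager REST endpoint `POST /jobs/:jobid/stop` with a JSON body carrying `targetDirectory` and `drain`; the host, job ID, and savepoint directory are placeholders invented for illustration, and `build_stop_request` is a hypothetical helper, not part of Flink. The sketch only builds the request and does not send it.

```python
import json
import urllib.request


def build_stop_request(rest_url: str, job_id: str, target_directory: str, drain: bool) -> urllib.request.Request:
    """Build (but do not send) a stop-with-savepoint request for a Flink job."""
    body = json.dumps({"targetDirectory": target_directory, "drain": drain}).encode("utf-8")
    return urllib.request.Request(
        url=f"{rest_url.rstrip('/')}/jobs/{job_id}/stop",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values, not taken from the thread.
request = build_stop_request(
    rest_url="http://localhost:8081",
    job_id="00000000000000000000000000000000",
    target_directory="s3://example-bucket/savepoints",
    drain=True,
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# To actually trigger the stop against a running cluster:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Setting `drain` to `False` in the same request corresponds to the case where the job failed to stop in this thread.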