[jira] [Created] (FLINK-19745) Supplement micro-benchmark for bounded blocking partition in remote channel case
Zhijiang created FLINK-19745: Summary: Supplement micro-benchmark for bounded blocking partition in remote channel case Key: FLINK-19745 URL: https://issues.apache.org/jira/browse/FLINK-19745 Project: Flink Issue Type: Task Components: Benchmarks, Runtime / Network Reporter: Zhijiang Assignee: Zhijiang The current `BlockingPartitionBenchmark` for batch jobs only measures the scenario where producer and consumer are deployed in the same process, which corresponds to a local input channel on the consumer side. We want to add another common scenario that measures the effect of reading data via network shuffle, which corresponds to a remote input channel on the consumer side. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19551) Follow up improvements for shuffle service
Zhijiang created FLINK-19551: Summary: Follow up improvements for shuffle service Key: FLINK-19551 URL: https://issues.apache.org/jira/browse/FLINK-19551 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Network Reporter: Zhijiang After resolving the core architecture and functionality of the pluggable shuffle service proposed by FLINK-10653, some follow-up issues are still pending. This umbrella ticket tracks them for future work, with low priority.
[jira] [Created] (FLINK-19003) Add micro-benchmark for unaligned checkpoints
Zhijiang created FLINK-19003: Summary: Add micro-benchmark for unaligned checkpoints Key: FLINK-19003 URL: https://issues.apache.org/jira/browse/FLINK-19003 Project: Flink Issue Type: Task Components: Benchmarks, Runtime / Checkpointing Reporter: Zhijiang Assignee: Zhijiang We need an unaligned checkpoint benchmark to verify our upcoming improvements and to measure the effect of any future changes. The benchmark should cover remote and local channels separately, since they take different code paths, and it must guarantee that there are in-flight buffers during the checkpoint so that the channel state snapshot is actually measured.
[jira] [Created] (FLINK-18612) WordCount example failure when using relative output path
Zhijiang created FLINK-18612: Summary: WordCount example failure when using relative output path Key: FLINK-18612 URL: https://issues.apache.org/jira/browse/FLINK-18612 Project: Flink Issue Type: Bug Components: fs Affects Versions: 1.11.0, 1.11.1 Reporter: Zhijiang Fix For: 1.12.0, 1.11.2 The failure log can be found here: [log|https://pipelines.actions.githubusercontent.com/revSbsLpzrFApLL6BmCvScWt72tRe3wYUv7fCdCtThtI5bydk7/_apis/pipelines/1/runs/27244/signedlogcontent/21?urlExpires=2020-07-16T06%3A35%3A49.4559813Z=HMACV1=%2FfAsJgIlIf%2BDitViRJYh0DAGJZjJwhsCGS219ZyniAA%3D]. The problem can be reproduced locally by executing the following commands:
* bin/start-cluster.sh
* bin/flink run -p 1 examples/streaming/WordCount.jar --input input --output result

It is caused by this [commit|https://github.com/apache/flink/commit/a2deff2967b7de423b10f7f01a41c06565c37e62#diff-2010e422f5e43a971cd7134a9e0b9a5f ].
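The ticket attributes the failure to the linked commit and does not spell out the root cause. Purely as a general illustration of why relative output paths are fragile, the sketch below uses `java.nio` (not Flink's own `org.apache.flink.core.fs.Path`): a bare name such as `result` has no parent element, so code that derives the output directory from it must resolve the path against the working directory first. The class and method names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class RelativeOutputPathSketch {

    // Naive variant: a bare relative name such as "result" has no parent
    // element, so java.nio returns null here.
    static Path parentDirectoryNaive(String output) {
        return Paths.get(output).getParent();
    }

    // Hypothetical fix: resolve against the current working directory first,
    // so the parent directory is always defined.
    static Path parentDirectory(String output) {
        return Paths.get(output).toAbsolutePath().getParent();
    }
}
```

Any code that later calls a method on the naive parent (for example to create missing directories) would fail with a `NullPointerException` for `--output result`, while an absolute path such as `/tmp/result` works either way.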
[jira] [Created] (FLINK-18591) Fix the format issue for metrics web page
Zhijiang created FLINK-18591: Summary: Fix the format issue for metrics web page Key: FLINK-18591 URL: https://issues.apache.org/jira/browse/FLINK-18591 Project: Flink Issue Type: Bug Components: Documentation, Runtime / Metrics Affects Versions: 1.11.0 Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.12.0, 1.11.0 The formatting issue can be seen at https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing
[jira] [Created] (FLINK-18552) Update migration tests in master to cover migration from release-1.11
Zhijiang created FLINK-18552: Summary: Update migration tests in master to cover migration from release-1.11 Key: FLINK-18552 URL: https://issues.apache.org/jira/browse/FLINK-18552 Project: Flink Issue Type: Bug Components: Tests Reporter: Zhijiang Fix For: 1.12.0 We should update the following tests to cover migration from release-1.11:
* {{CEPMigrationTest}}
* {{BucketingSinkMigrationTest}}
* {{FlinkKafkaConsumerBaseMigrationTest}}
* {{ContinuousFileProcessingMigrationTest}}
* {{WindowOperatorMigrationTest}}
* {{StatefulJobSavepointMigrationITCase}}
* {{StatefulJobWBroadcastStateMigrationITCase}}
[jira] [Created] (FLINK-18088) Umbrella for features testing in release-1.11.0
Zhijiang created FLINK-18088: Summary: Umbrella for features testing in release-1.11.0 Key: FLINK-18088 URL: https://issues.apache.org/jira/browse/FLINK-18088 Project: Flink Issue Type: Test Affects Versions: 1.11.0 Reporter: Zhijiang Fix For: 1.11.0 This is the umbrella issue for tracking the testing progress of all related features in release-1.11.0, either via end-to-end tests or via manual testing on a cluster, to confirm that the features work well in practice.
[jira] [Created] (FLINK-18063) Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner#processEndOfPartition
Zhijiang created FLINK-18063: Summary: Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner#processEndOfPartition Key: FLINK-18063 URL: https://issues.apache.org/jira/browse/FLINK-18063 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0, 1.12.0 CheckpointBarrierUnaligner#processEndOfPartition only aborts the current checkpoint if a checkpoint is pending from the task thread's point of view, so it misses the case where the checkpoint was triggered by #notifyBarrierReceived from the netty thread. The proper fix is to also check for a pending checkpoint inside ThreadSafeUnaligner, so that such a checkpoint is aborted and the internal variables are reset accordingly.
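A minimal sketch of the proposed check, assuming a simplified model: the task thread tracks the checkpoint it triggered itself, while a monitor-guarded object (a hypothetical stand-in for ThreadSafeUnaligner) tracks the one triggered by the netty thread. `processEndOfPartition` consults both, so a checkpoint triggered via `notifyBarrierReceived` is no longer missed. None of these types are Flink's real classes.

```java
final class EndOfPartitionAbortSketch {

    // Hypothetical stand-in for ThreadSafeUnaligner: state written by the netty
    // thread through notifyBarrierReceived, guarded by its own monitor.
    static final class ThreadSafeState {
        private long pendingCheckpointId = -1;

        synchronized void notifyBarrierReceived(long checkpointId) {
            pendingCheckpointId = Math.max(pendingCheckpointId, checkpointId);
        }

        // Returns the pending checkpoint id (or -1) and resets it atomically.
        synchronized long takePendingCheckpoint() {
            long id = pendingCheckpointId;
            pendingCheckpointId = -1;
            return id;
        }
    }

    final ThreadSafeState threadSafeState = new ThreadSafeState();

    // Only touched by the task thread.
    private long taskThreadPendingCheckpointId = -1;

    void processBarrier(long checkpointId) {
        taskThreadPendingCheckpointId = checkpointId;
    }

    // Returns the id of the checkpoint to abort, or -1 if none is pending.
    long processEndOfPartition() {
        long fromTaskThread = taskThreadPendingCheckpointId;
        taskThreadPendingCheckpointId = -1;
        // Checking only fromTaskThread would miss a checkpoint that the netty
        // thread triggered, so the thread-safe state is consulted as well.
        long fromNettyThread = threadSafeState.takePendingCheckpoint();
        return Math.max(fromTaskThread, fromNettyThread);
    }
}
```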
[jira] [Created] (FLINK-18050) Fix the bug of recycling buffer twice once exception in ChannelStateWriteRequestDispatcher#dispatch
Zhijiang created FLINK-18050: Summary: Fix the bug of recycling buffer twice once exception in ChannelStateWriteRequestDispatcher#dispatch Key: FLINK-18050 URL: https://issues.apache.org/jira/browse/FLINK-18050 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0, 1.12.0 In ChannelStateWriteRequestDispatcherImpl#dispatch, `request.cancel(e)` is called to recycle the request's internal buffers when an exception occurs. However, for output write requests, ChannelStateCheckpointWriter#write also recycles the buffers in the end, whether or not an exception is thrown. As a result, the buffers in the request are recycled twice when an exception happens, which causes further exceptions in the network shuffle process when it later references the same buffer. This bug can be easily reproduced by running UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointOnParallelRemoteChannel.
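The double recycle can be illustrated with a hypothetical reference-counted buffer: once the buffer is handed to a writer that always recycles it in a `finally` block, the dispatcher's error path must not recycle it again. This is a simplified model of the ownership rule, not the code of `ChannelStateWriteRequestDispatcherImpl`.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RecycleOnceSketch {

    // Minimal stand-in for a reference-counted network buffer (hypothetical).
    static final class Buffer {
        private final AtomicInteger refCount = new AtomicInteger(1);

        void recycle() {
            if (refCount.decrementAndGet() < 0) {
                throw new IllegalStateException("buffer recycled twice");
            }
        }

        boolean isRecycled() {
            return refCount.get() <= 0;
        }
    }

    // Like ChannelStateCheckpointWriter#write as described above: the buffer is
    // always recycled, whether or not writing fails.
    static void write(Buffer buffer, boolean fail) {
        try {
            if (fail) {
                throw new RuntimeException("write failed");
            }
        } finally {
            buffer.recycle();
        }
    }

    // Ownership of the buffer passes to write(), so the error path must not
    // recycle it again (calling buffer.recycle() in the catch block would
    // reproduce the double recycle). Returns false if the write failed.
    static boolean dispatch(Buffer buffer, boolean fail) {
        try {
            write(buffer, fail);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```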
[jira] [Created] (FLINK-17994) Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived
Zhijiang created FLINK-17994: Summary: Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived Key: FLINK-17994 URL: https://issues.apache.org/jira/browse/FLINK-17994 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0 The race condition happens as follows:
* Barrier ch1 is received from the network by the netty thread, which schedules ch1 into the mailbox via #notifyBarrierReceived.
* Barrier ch2 is received from the network by the netty thread, but it is inserted into the channel's data queue before #notifyBarrierReceived is called. As a result, the task thread may process ch2 before the netty thread calls #notifyBarrierReceived for it.
* The task thread executes the checkpoint for ch2 directly because ch2 > ch1.
* Afterwards, the task thread runs the previously scheduled ch1 from the mailbox, which causes an IllegalArgumentException inside SubtaskCheckpointCoordinatorImpl#checkpointState because it breaks the assumption that checkpoints are executed in increasing order.

One possible solution is to insert the received barrier into the channel's data queue only after calling #notifyBarrierReceived. Then we can assume that the checkpoint is always triggered by the netty thread, which simplifies the current situation where a checkpoint might be triggered either by the task thread or by the netty thread. This also avoids calling #notifyBarrierReceived from the task thread while processing the barrier, which simplifies the logic inside CheckpointBarrierUnaligner.
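A single-threaded sketch of the proposed ordering, under simplifying assumptions: the mailbox and the channel's data queue are plain queues, and the hypothetical `Coordinator` only enforces the increasing-id assumption described above. Because the netty thread schedules the checkpoint before enqueueing the barrier, checkpoints are triggered only from the mailbox and run in arrival order. A real implementation would need thread-safe queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class BarrierOrderingSketch {

    // Hypothetical coordinator enforcing the assumption from the description:
    // checkpoints must be executed in increasing id order.
    static final class Coordinator {
        private long lastCheckpointId = -1;
        final List<Long> executed = new ArrayList<>();

        void checkpointState(long checkpointId) {
            if (checkpointId <= lastCheckpointId) {
                throw new IllegalArgumentException(
                        "checkpoint " + checkpointId + " is not newer than " + lastCheckpointId);
            }
            lastCheckpointId = checkpointId;
            executed.add(checkpointId);
        }
    }

    final Coordinator coordinator = new Coordinator();
    // Single-threaded stand-ins for the task mailbox and a channel's data queue.
    final Queue<Long> mailbox = new ArrayDeque<>();
    final Queue<Long> channelQueue = new ArrayDeque<>();

    // Proposed order on the netty thread: notify first (schedule the
    // checkpoint into the mailbox), and only then enqueue the barrier.
    void onBarrierFromNetwork(long checkpointId) {
        mailbox.add(checkpointId);
        channelQueue.add(checkpointId);
    }

    // Task thread: mailbox actions run in arrival order; a barrier taken from
    // the data queue no longer triggers a checkpoint by itself.
    void runTaskThread() {
        Long checkpointId;
        while ((checkpointId = mailbox.poll()) != null) {
            coordinator.checkpointState(checkpointId);
        }
        channelQueue.clear();
    }
}
```

Calling `checkpointState(2)` and then `checkpointState(1)` on the same coordinator reproduces the IllegalArgumentException from the original race.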
[jira] [Created] (FLINK-17992) Exception from RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler
Zhijiang created FLINK-17992: Summary: Exception from RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler Key: FLINK-17992 URL: https://issues.apache.org/jira/browse/FLINK-17992 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.10.1, 1.10.0 Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0 RemoteInputChannel#onBuffer is invoked by CreditBasedPartitionRequestClientHandler while receiving and decoding network data. #onBuffer can throw exceptions, which mark the client handler as failed and fail all input channels registered with the handler. This can cause the following subtle issue. If the RemoteInputChannel is being canceled by the canceler thread, the task thread might exit before the canceler thread terminates. That means the PartitionRequestClient (which is closed by the canceler thread) might not be closed yet when the new task attempt is already deployed on the same TaskManager. The new task might therefore reuse the previous PartitionRequestClient when requesting partitions, even though the corresponding client handler was already marked as failed during the RemoteInputChannel#onBuffer call above. This causes an unnecessary failover in the next round. The issue is hard to spot in production because the job eventually recovers after one or more additional failovers. We found it via UnalignedCheckpointITCase, which expects a precise number of restarts for the configured failures. The solution is to fail only the respective task when its RemoteInputChannel#onBuffer throws an exception, instead of failing all channels of the client handler. The client then stays healthy and can be reused by other input channels as long as it has not been released.
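A minimal sketch of the proposed error isolation, assuming simplified stand-ins for `RemoteInputChannel` and the shared client handler (both hypothetical): an exception thrown while dispatching a buffer to one channel is reported to that channel only, so the handler and its other channels remain usable.

```java
import java.util.HashMap;
import java.util.Map;

final class PerChannelErrorSketch {

    // Hypothetical stand-in for RemoteInputChannel.
    static final class Channel {
        Throwable error;

        void onBuffer(byte[] data) {
            if (data.length == 0) {
                throw new IllegalStateException("cannot decode empty buffer");
            }
        }

        void onError(Throwable cause) {
            error = cause;
        }
    }

    // Hypothetical stand-in for the client handler shared by many channels.
    static final class ClientHandler {
        final Map<Integer, Channel> channels = new HashMap<>();
        // Never set by decodeAndDispatch: a single channel's failure does not
        // mark the shared handler as failed.
        Throwable handlerError;

        void decodeAndDispatch(int channelId, byte[] data) {
            Channel channel = channels.get(channelId);
            if (channel == null) {
                return; // e.g. the channel was already released
            }
            try {
                channel.onBuffer(data);
            } catch (Exception e) {
                // Fail only the affected channel (and thus its task) instead of
                // tagging the handler and every registered channel.
                channel.onError(e);
            }
        }
    }
}
```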
[jira] [Created] (FLINK-17869) Fix the race condition of aborting unaligned checkpoint
Zhijiang created FLINK-17869: Summary: Fix the race condition of aborting unaligned checkpoint Key: FLINK-17869 URL: https://issues.apache.org/jira/browse/FLINK-17869 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0 On the ChannelStateWriter side, the lifecycle of a checkpoint should be: start -> in progress/abort -> stop. We must guarantee that #abort is queued after #start; otherwise, in case of a race condition, an aborted checkpoint might be started again later. There are two cases that can trigger aborting a checkpoint:
* CheckpointBarrierUnaligner#processEndOfPartition, which should abort all current and future checkpoints; there is no need to check `isCheckpointPending()` as the current code does.
* CheckpointBarrierUnaligner#processCancellationBarrier, which should only abort the respective checkpoint id if it was already triggered.

The unaligned checkpoint might be triggered either by the task thread or by the netty thread inside ThreadSafeUnaligner. In either case, we need to know the currently triggered checkpoint id to handle both cases above properly.

Another bug is that ChannelStateWriterImpl#abort should not remove the respective ChannelStateWriteResult. Otherwise, ChannelStateWriterImpl#getWriteResult throws an IllegalArgumentException later in the checkpoint process. The ChannelStateWriteResult should be created in #start and removed only in #stop.
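The intended lifecycle of a write result can be sketched as a small map-based model, where the hypothetical `WriteResult` stands in for `ChannelStateWriteResult`: it is created in `start`, only marked in `abort`, and removed in `stop`, so `getWriteResult` keeps working after an abort. This is an illustration of the rule stated above, not `ChannelStateWriterImpl` itself.

```java
import java.util.HashMap;
import java.util.Map;

final class WriterLifecycleSketch {

    // Hypothetical placeholder for ChannelStateWriteResult.
    static final class WriteResult {
        boolean aborted;
    }

    private final Map<Long, WriteResult> results = new HashMap<>();

    // The result is created when the checkpoint starts...
    void start(long checkpointId) {
        results.put(checkpointId, new WriteResult());
    }

    // ...abort only marks it, so a later getWriteResult still finds it.
    // An abort that arrives before start finds nothing to mark, and the
    // following start would create a fresh, non-aborted result; this is
    // why #abort must be queued after #start.
    void abort(long checkpointId) {
        WriteResult result = results.get(checkpointId);
        if (result != null) {
            result.aborted = true;
        }
    }

    WriteResult getWriteResult(long checkpointId) {
        WriteResult result = results.get(checkpointId);
        if (result == null) {
            throw new IllegalArgumentException("no write result for checkpoint " + checkpointId);
        }
        return result;
    }

    // ...and it is removed only when the checkpoint stops.
    void stop(long checkpointId) {
        results.remove(checkpointId);
    }
}
```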
[jira] [Created] (FLINK-17823) Resolve the race condition while releasing RemoteInputChannel
Zhijiang created FLINK-17823: Summary: Resolve the race condition while releasing RemoteInputChannel Key: FLINK-17823 URL: https://issues.apache.org/jira/browse/FLINK-17823 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.11.0 Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0 RemoteInputChannel#releaseAllResources might be called by the canceler thread while the task thread concurrently calls RemoteInputChannel#getNextBuffer. This can cause two problems:
* The task thread might get a null buffer after the canceler thread has already released all buffers, which leads to a misleading NPE in getNextBuffer.
* The task thread and the canceler thread might pull the same buffer concurrently, which causes an unexpected exception when that buffer is recycled twice.

The solution is to properly synchronize on the buffer queue in the release method, so that the same buffer cannot be pulled by both the canceler thread and the task thread. In addition, getNextBuffer gets explicit checks that replace the misleading NPE with meaningful exceptions.
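A sketch of the synchronization described above, with buffers modelled as strings and all names hypothetical: both the task thread and the canceler thread take buffers out of the queue only while holding its lock, so each buffer is handed out exactly once, and `getNextBuffer` reports a released or empty channel with an explicit `IllegalStateException` instead of an NPE.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ReleaseRaceSketch {

    // Buffers are modelled as strings for illustration.
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private boolean released; // guarded by receivedBuffers
    final List<String> recycled = Collections.synchronizedList(new ArrayList<>());

    void onBuffer(String buffer) {
        synchronized (receivedBuffers) {
            if (!released) {
                receivedBuffers.add(buffer);
                return;
            }
        }
        recycled.add(buffer); // channel already released: recycle right away
    }

    // Called by the task thread.
    String getNextBuffer() {
        synchronized (receivedBuffers) {
            // Explicit checks instead of a misleading NullPointerException.
            if (released) {
                throw new IllegalStateException("channel has already been released");
            }
            String next = receivedBuffers.poll();
            if (next == null) {
                throw new IllegalStateException("no buffer queued in channel");
            }
            return next;
        }
    }

    // Called by the canceler thread.
    void releaseAllResources() {
        List<String> toRecycle;
        synchronized (receivedBuffers) {
            released = true;
            toRecycle = new ArrayList<>(receivedBuffers);
            receivedBuffers.clear();
        }
        // Each buffer left the queue under the lock exactly once, so it cannot
        // also be handed to the task thread and recycled twice.
        recycled.addAll(toRecycle);
    }
}
```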
[jira] [Created] (FLINK-17719) Provide ChannelStateReader#hasStates for hints of reading channel states
Zhijiang created FLINK-17719: Summary: Provide ChannelStateReader#hasStates for hints of reading channel states Key: FLINK-17719 URL: https://issues.apache.org/jira/browse/FLINK-17719 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing, Runtime / Task Reporter: Zhijiang Assignee: Zhijiang Currently, whether recovered channel states are read during task startup depends on whether unaligned checkpoints are enabled. This prevents recovery from previous unaligned checkpoint states when the current mode is aligned. Instead, `ChannelStateReader` can provide a hint whether there are any channel states to read during startup, so that we never miss a chance to recover from them.
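The difference between the current rule and the proposed hint can be sketched as follows. The interface is a hypothetical, trimmed-down model that only contains `hasStates()` as named in the ticket; it is not Flink's actual `ChannelStateReader` interface.

```java
final class HasStatesSketch {

    // Hypothetical, trimmed-down reader; only the hint from the ticket is modelled.
    interface ChannelStateReader {
        boolean hasStates();
    }

    // Current rule: decided only by the current checkpointing mode.
    static boolean shouldReadChannelStatesOld(boolean unalignedEnabled) {
        return unalignedEnabled;
    }

    // Proposed rule: ask the reader whether the restored snapshot contains any
    // channel state, regardless of the current mode.
    static boolean shouldReadChannelStates(ChannelStateReader reader) {
        return reader.hasStates();
    }
}
```

With a snapshot taken in unaligned mode and a job restarted in aligned mode, the old rule returns `false` and the channel state is silently skipped, while the hint-based rule still reads it.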
[jira] [Created] (FLINK-17413) Remove redundant states from ThreadSafeUnaligner
Zhijiang created FLINK-17413: Summary: Remove redundant states from ThreadSafeUnaligner Key: FLINK-17413 URL: https://issues.apache.org/jira/browse/FLINK-17413 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Zhijiang Assignee: Zhijiang RemoteInputChannel already has the states `lastRequestedCheckpointId` and `receivedCheckpointId`, which control whether a received buffer should be notified to the unaligner component. In the current ThreadSafeUnaligner, the variable `storeNewBuffers` serves a similar purpose: it decides whether a notified buffer should be written into the persister. However, whenever `RemoteInputChannel` decides to notify a received buffer, that buffer always needs to be spilled. So we can remove the variable `storeNewBuffers` from ThreadSafeUnaligner completely.
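A sketch of the simplified responsibility split, assuming the notify condition is "a checkpoint has been requested but its barrier has not yet arrived on this channel" (this exact rule is an assumption): the channel alone decides whether to notify, and the hypothetical unaligner persists every notified buffer, so no `storeNewBuffers` flag is needed.

```java
import java.util.ArrayList;
import java.util.List;

final class NotifyDecisionSketch {

    // Hypothetical unaligner: persists every buffer it is notified about,
    // without a separate storeNewBuffers flag.
    static final class Unaligner {
        final List<String> persisted = new ArrayList<>();

        void notifyBufferReceived(String buffer) {
            persisted.add(buffer);
        }
    }

    // Hypothetical stand-in for RemoteInputChannel's decision logic.
    static final class Channel {
        long lastRequestedCheckpointId = -1;
        long receivedCheckpointId = -1;
        private final Unaligner unaligner;

        Channel(Unaligner unaligner) {
            this.unaligner = unaligner;
        }

        void onBuffer(String buffer) {
            // Assumed rule: a checkpoint has been requested whose barrier has
            // not yet arrived on this channel, so the buffer is in-flight data.
            if (lastRequestedCheckpointId > receivedCheckpointId) {
                unaligner.notifyBufferReceived(buffer);
            }
        }

        void onBarrier(long checkpointId) {
            receivedCheckpointId = checkpointId;
        }
    }
}
```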
[jira] [Created] (FLINK-17389) LocalExecutorITCase.testBatchQueryCancel asserts error
Zhijiang created FLINK-17389: Summary: LocalExecutorITCase.testBatchQueryCancel asserts error Key: FLINK-17389 URL: https://issues.apache.org/jira/browse/FLINK-17389 Project: Flink Issue Type: Bug Components: Table SQL / Client Reporter: Zhijiang Fix For: 1.11.0 CI [https://api.travis-ci.org/v3/job/679144612/log.txt]
{code:java}
19:28:13.121 [INFO] ---
19:28:13.121 [INFO] T E S T S
19:28:13.121 [INFO] ---
19:28:17.231 [INFO] Running org.apache.flink.table.client.gateway.local.LocalExecutorITCase
19:32:06.049 [ERROR] Tests run: 70, Failures: 1, Errors: 0, Skipped: 5, Time elapsed: 228.813 s <<< FAILURE! - in org.apache.flink.table.client.gateway.local.LocalExecutorITCase
19:32:06.051 [ERROR] testBatchQueryCancel[Planner: old](org.apache.flink.table.client.gateway.local.LocalExecutorITCase) Time elapsed: 32.767 s <<< FAILURE!
java.lang.AssertionError: expected: but was:
	at org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testBatchQueryCancel(LocalExecutorITCase.java:738)
19:32:06.440 [INFO]
19:32:06.440 [INFO] Results:
19:32:06.440 [INFO]
19:32:06.440 [ERROR] Failures:
19:32:06.440 [ERROR] LocalExecutorITCase.testBatchQueryCancel:738 expected: but was:
19:32:06.440 [INFO]
19:32:06.440 [ERROR] Tests run: 70, Failures: 1, Errors: 0, Skipped: 5
{code}
[jira] [Created] (FLINK-17315) UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel failed in timeout
Zhijiang created FLINK-17315: Summary: UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel failed in timeout Key: FLINK-17315 URL: https://issues.apache.org/jira/browse/FLINK-17315 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Tests Reporter: Zhijiang Fix For: 1.11.0 Build: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=45cc9205-bdb7-5b54-63cd-89fdc0983323] logs
{code:java}
2020-04-21T20:25:23.1139147Z [ERROR] Errors:
2020-04-21T20:25:23.1140908Z [ERROR] UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel:80->execute:87 » TestTimedOut
2020-04-21T20:25:23.1141383Z [INFO]
2020-04-21T20:25:23.1141675Z [ERROR] Tests run: 1525, Failures: 0, Errors: 1, Skipped: 36
{code}
When I run it on my local machine, it takes about 40 seconds to finish, so the configured 90-second timeout may sometimes not be enough in a heavily loaded environment. Maybe we can remove the timeout from the test, since Azure is already configured to monitor timeouts.
[jira] [Created] (FLINK-17095) KafkaProducerExactlyOnceITCase fails with "address already in use"
Zhijiang created FLINK-17095: Summary: KafkaProducerExactlyOnceITCase fails with "address already in use" Key: FLINK-17095 URL: https://issues.apache.org/jira/browse/FLINK-17095 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Tests Reporter: Zhijiang Fix For: 1.11.0 Logs: [https://travis-ci.org/github/apache/flink/jobs/673786814]
{code:java}
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.256 s <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase
[ERROR] org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase Time elapsed: 7.256 s <<< ERROR!
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:42733: Address already in use.
	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:573)
	at kafka.network.Acceptor.<init>(SocketServer.scala:451)
	at kafka.network.SocketServer.createAcceptor(SocketServer.scala:245)
	at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:215)
	at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:214)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:214)
	at kafka.network.SocketServer.startup(SocketServer.scala:114)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:253)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:404)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:131)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:142)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:131)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:100)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:92)
	at org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase.prepare(KafkaProducerExactlyOnceITCase.java:31)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Caused by: java.net.BindException: Address already in use
	at sun.nio.ch.Net.bind0(Native Method)
	at sun.nio.ch.Net.bind(Net.java:433)
	at sun.nio.ch.Net.bind(Net.java:425)
	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:220)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
	at
[jira] [Created] (FLINK-17094) OverWindowITCase#testRowTimeBoundedPartitionedRowsOver failed by FileNotFoundException
Zhijiang created FLINK-17094: Summary: OverWindowITCase#testRowTimeBoundedPartitionedRowsOver failed by FileNotFoundException Key: FLINK-17094 URL: https://issues.apache.org/jira/browse/FLINK-17094 Project: Flink Issue Type: Bug Components: Runtime / State Backends, Tests Reporter: Zhijiang Fix For: 1.11.0 Build: [https://travis-ci.org/github/apache/flink/jobs/673786805] logs
{code:java}
[ERROR] testRowTimeBoundedPartitionedRowsOver[StateBackend=ROCKSDB](org.apache.flink.table.planner.runtime.stream.sql.OverWindowITCase) Time elapsed: 0.754 s <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1625)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:673)
	at org.apache.flink.table.planner.runtime.stream.sql.OverWindowITCase.testRowTimeBoundedPartitionedRowsOver(OverWindowITCase.scala:417)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
[jira] [Created] (FLINK-17092) Pyflink failure for BlinkStreamDependencyTests and StreamPandasUDFITTests
Zhijiang created FLINK-17092: Summary: Pyflink failure for BlinkStreamDependencyTests and StreamPandasUDFITTests Key: FLINK-17092 URL: https://issues.apache.org/jira/browse/FLINK-17092 Project: Flink Issue Type: Bug Components: API / Python, Tests Reporter: Zhijiang Fix For: 1.11.0 Build: [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7324=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9] logs
{code:java}
2020-04-10T13:05:25.7259119Z E : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2020-04-10T13:05:25.7259755Z E at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2020-04-10T13:05:25.7260301Z E at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2020-04-10T13:05:25.7260927Z E at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1663)
2020-04-10T13:05:25.7261772Z E at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
2020-04-10T13:05:25.7262405Z E at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:51)
2020-04-10T13:05:25.7263073Z E at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:719)
2020-04-10T13:05:25.7263588Z E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-04-10T13:05:25.7264090Z E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-04-10T13:05:25.7264668Z E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-04-10T13:05:25.7265175Z E at java.lang.reflect.Method.invoke(Method.java:498)
2020-04-10T13:05:25.7265807Z E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
2020-04-10T13:05:25.7266445Z E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
2020-04-10T13:05:25.7267288Z E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
2020-04-10T13:05:25.7267897Z E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
2020-04-10T13:05:25.7268518Z E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
2020-04-10T13:05:25.7269130Z E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
2020-04-10T13:05:25.7269623Z E at java.lang.Thread.run(Thread.java:748)
2020-04-10T13:05:25.7270112Z E Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2020-04-10T13:05:25.7270700Z E at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
2020-04-10T13:05:25.7271406Z E at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
2020-04-10T13:05:25.7272111Z E at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
2020-04-10T13:05:25.7272665Z E at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
2020-04-10T13:05:25.7273245Z E at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2020-04-10T13:05:25.7273909Z E at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2020-04-10T13:05:25.7274514Z E at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
2020-04-10T13:05:25.7275147Z E at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2020-04-10T13:05:25.7275800Z E at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2020-04-10T13:05:25.7276447Z E at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2020-04-10T13:05:25.7277239Z E at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
[jira] [Created] (FLINK-16821) Run Kubernetes test failed with invalid named "minikube"
Zhijiang created FLINK-16821: Summary: Run Kubernetes test failed with invalid named "minikube" Key: FLINK-16821 URL: https://issues.apache.org/jira/browse/FLINK-16821 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes, Tests Reporter: Zhijiang This is the test run: [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6702=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5] Log output:
{code:java}
2020-03-27T00:07:38.9666021Z Running 'Run Kubernetes test'
2020-03-27T00:07:38.956Z ==
2020-03-27T00:07:38.9677101Z TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-38967103614
2020-03-27T00:07:41.7529865Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-27T00:07:41.7721475Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-27T00:07:41.8208394Z Docker version 19.03.8, build afacb8b7f0
2020-03-27T00:07:42.4793914Z docker-compose version 1.25.4, build 8d51620a
2020-03-27T00:07:42.5359301Z Installing minikube ...
2020-03-27T00:07:42.5494076Z % Total % Received % Xferd Average Speed Time Time Time Current
2020-03-27T00:07:42.5494729Z Dload Upload Total Spent Left Speed
2020-03-27T00:07:42.5498136Z
2020-03-27T00:07:42.6214887Z 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
2020-03-27T00:07:43.3467750Z 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
2020-03-27T00:07:43.3469636Z 100 52.0M 100 52.0M 0 0 65.2M 0 --:--:-- --:--:-- --:--:-- 65.2M
2020-03-27T00:07:43.4262625Z * There is no local cluster named "minikube"
2020-03-27T00:07:43.4264438Z - To fix this, run: minikube start
2020-03-27T00:07:43.4282404Z Starting minikube ...
2020-03-27T00:07:43.7749694Z * minikube v1.9.0 on Ubuntu 16.04
2020-03-27T00:07:43.7761742Z * Using the none driver based on user configuration
2020-03-27T00:07:43.7762229Z X The none driver requires conntrack to be installed for kubernetes version 1.18.0
2020-03-27T00:07:43.8202161Z * There is no local cluster named "minikube"
2020-03-27T00:07:43.8203353Z - To fix this, run: minikube start
2020-03-27T00:07:43.8568899Z * There is no local cluster named "minikube"
2020-03-27T00:07:43.8570685Z - To fix this, run: minikube start
2020-03-27T00:07:43.8583793Z Command: start_kubernetes_if_not_running failed. Retrying...
2020-03-27T00:07:48.9017252Z * There is no local cluster named "minikube"
2020-03-27T00:07:48.9019347Z - To fix this, run: minikube start
2020-03-27T00:07:48.9031515Z Starting minikube ...
2020-03-27T00:07:49.0612601Z * minikube v1.9.0 on Ubuntu 16.04
2020-03-27T00:07:49.0616688Z * Using the none driver based on user configuration
2020-03-27T00:07:49.0620173Z X The none driver requires conntrack to be installed for kubernetes version 1.18.0
2020-03-27T00:07:49.1040676Z * There is no local cluster named "minikube"
2020-03-27T00:07:49.1042353Z - To fix this, run: minikube start
2020-03-27T00:07:49.1453522Z * There is no local cluster named "minikube"
2020-03-27T00:07:49.1454594Z - To fix this, run: minikube start
2020-03-27T00:07:49.1468436Z Command: start_kubernetes_if_not_running failed. Retrying...
2020-03-27T00:07:54.1907713Z * There is no local cluster named "minikube"
2020-03-27T00:07:54.1909876Z - To fix this, run: minikube start
2020-03-27T00:07:54.1921479Z Starting minikube ...
2020-03-27T00:07:54.3388738Z * minikube v1.9.0 on Ubuntu 16.04
2020-03-27T00:07:54.3395499Z * Using the none driver based on user configuration
2020-03-27T00:07:54.3396443Z X The none driver requires conntrack to be installed for kubernetes version 1.18.0
2020-03-27T00:07:54.3824399Z * There is no local cluster named "minikube"
2020-03-27T00:07:54.3837652Z - To fix this, run: minikube start
2020-03-27T00:07:54.4203902Z * There is no local cluster named "minikube"
2020-03-27T00:07:54.4204895Z - To fix this, run: minikube start
2020-03-27T00:07:54.4217866Z Command: start_kubernetes_if_not_running failed. Retrying...
2020-03-27T00:07:59.4235917Z Command: start_kubernetes_if_not_running failed 3 times.
2020-03-27T00:07:59.4236459Z Could not start minikube. Aborting...
2020-03-27T00:07:59.8439850Z The connection to the server localhost:8080 was refused - did you specify the right host or port?
2020-03-27T00:07:59.8939088Z The connection to the server localhost:8080 was refused - did you specify the right host or port?
2020-03-27T00:07:59.9515679Z The connection to the server localhost:8080 was refused - did you specify the right host or port?
2020-03-27T00:07:59.9528463Z Stopping minikube ...
2020-03-27T00:07:59.9921558Z
[jira] [Created] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
Zhijiang created FLINK-16770: Summary: Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file Key: FLINK-16770 URL: https://issues.apache.org/jira/browse/FLINK-16770 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Tests Reporter: Zhijiang Fix For: 1.11.0 Log: [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5] A similar problem was reported in https://issues.apache.org/jira/browse/FLINK-16561, but that case had no parallelism change, whereas this one scales up. It is not yet clear whether the root cause is the same. {code:java} 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test' 2020-03-25T06:50:31.3895308Z == 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304 2020-03-25T06:50:31.5500274Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT 2020-03-25T06:50:31.6354639Z Starting cluster. 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host fv-az655. 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655. 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come up... 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come up... 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come up... 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come up... 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up. 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ... 
2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is running. 2020-03-25T06:50:46.1758132Z Waiting for job (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints ... 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, current progress: 173 records ... 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0. 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0. 2020-03-25T06:50:50.5468230Z ls: cannot access '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata': No such file or directory 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . ... 2020-03-25T06:50:58.4728245Z 2020-03-25T06:50:58.4732663Z 2020-03-25T06:50:58.4735785Z The program finished with the following exception: 2020-03-25T06:50:58.4737759Z 2020-03-25T06:50:58.4742666Z org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. 
2020-03-25T06:50:58.4746274Z at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) 2020-03-25T06:50:58.4749954Z at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) 2020-03-25T06:50:58.4752753Z at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142) 2020-03-25T06:50:58.4755400Z at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659) 2020-03-25T06:50:58.4757862Z at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) 2020-03-25T06:50:58.4760282Z at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890) 2020-03-25T06:50:58.4763591Z at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:963) 2020-03-25T06:50:58.4764274Z at java.security.AccessController.doPrivileged(Native Method) 2020-03-25T06:50:58.4764809Z at javax.security.auth.Subject.doAs(Subject.java:422) 2020-03-25T06:50:58.4765434Z at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) 2020-03-25T06:50:58.4766180Z at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) 2020-03-25T06:50:58.4773549Z at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:963) 2020-03-25T06:50:58.4774502Z Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException:
[jira] [Created] (FLINK-16768) HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart runs without exit
Zhijiang created FLINK-16768: Summary: HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart runs without exit Key: FLINK-16768 URL: https://issues.apache.org/jira/browse/FLINK-16768 Project: Flink Issue Type: Task Components: FileSystems, Tests Reporter: Zhijiang Fix For: 1.11.0 Logs: [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6584=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=d26b3528-38b0-53d2-05f7-37557c2405e4] {code:java} 2020-03-24T15:52:18.9196862Z "main" #1 prio=5 os_prio=0 tid=0x7fd36c00b800 nid=0xc21 runnable [0x7fd3743ce000] 2020-03-24T15:52:18.9197235Z java.lang.Thread.State: RUNNABLE 2020-03-24T15:52:18.9197536Z at java.net.SocketInputStream.socketRead0(Native Method) 2020-03-24T15:52:18.9197931Z at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 2020-03-24T15:52:18.9198340Z at java.net.SocketInputStream.read(SocketInputStream.java:171) 2020-03-24T15:52:18.9198749Z at java.net.SocketInputStream.read(SocketInputStream.java:141) 2020-03-24T15:52:18.9199171Z at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 2020-03-24T15:52:18.9199840Z at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593) 2020-03-24T15:52:18.9200265Z at sun.security.ssl.InputRecord.read(InputRecord.java:532) 2020-03-24T15:52:18.9200663Z at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975) 2020-03-24T15:52:18.9201213Z - locked <0x927583d8> (a java.lang.Object) 2020-03-24T15:52:18.9201589Z at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933) 2020-03-24T15:52:18.9202026Z at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 2020-03-24T15:52:18.9202583Z - locked <0x92758c00> (a sun.security.ssl.AppInputStream) 2020-03-24T15:52:18.9203029Z at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) 2020-03-24T15:52:18.9203558Z at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) 2020-03-24T15:52:18.9204121Z at 
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) 2020-03-24T15:52:18.9204626Z at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) 2020-03-24T15:52:18.9205121Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9205679Z at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) 2020-03-24T15:52:18.9206164Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9206786Z at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125) 2020-03-24T15:52:18.9207361Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9207839Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9208327Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9208809Z at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) 2020-03-24T15:52:18.9209273Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9210003Z at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) 2020-03-24T15:52:18.9210658Z at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) 2020-03-24T15:52:18.9211154Z at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$read$3(S3AInputStream.java:445) 2020-03-24T15:52:18.9211631Z at org.apache.hadoop.fs.s3a.S3AInputStream$$Lambda$42/1936375962.execute(Unknown Source) 2020-03-24T15:52:18.9212044Z at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) 2020-03-24T15:52:18.9212553Z at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) 2020-03-24T15:52:18.9212972Z at org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/1457226878.execute(Unknown Source) 2020-03-24T15:52:18.9213408Z at 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) 2020-03-24T15:52:18.9213866Z at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) 2020-03-24T15:52:18.9214273Z at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) 2020-03-24T15:52:18.9214701Z at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:441) 2020-03-24T15:52:18.9215443Z - locked <0x926e88b0> (a org.apache.hadoop.fs.s3a.S3AInputStream) 2020-03-24T15:52:18.9215852Z at java.io.DataInputStream.read(DataInputStream.java:149) 2020-03-24T15:52:18.9216305Z at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
[jira] [Created] (FLINK-16750) Kerberized YARN on Docker test fails with staring Hadoop cluster
Zhijiang created FLINK-16750: Summary: Kerberized YARN on Docker test fails with staring Hadoop cluster Key: FLINK-16750 URL: https://issues.apache.org/jira/browse/FLINK-16750 Project: Flink Issue Type: Task Components: Deployment / Docker, Deployment / YARN, Tests Reporter: Zhijiang Build: [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6563=results] logs {code:java} 2020-03-24T08:48:53.3813297Z == 2020-03-24T08:48:53.3814016Z Running 'Running Kerberized YARN on Docker test (custom fs plugin)' 2020-03-24T08:48:53.3814511Z == 2020-03-24T08:48:53.3827028Z TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-53382133956 2020-03-24T08:48:56.1944456Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT 2020-03-24T08:48:56.2300265Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT 2020-03-24T08:48:56.2412349Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT 2020-03-24T08:48:56.2861072Z Docker version 19.03.8, build afacb8b7f0 2020-03-24T08:48:56.8025297Z docker-compose version 1.25.4, build 8d51620a 2020-03-24T08:48:56.8499071Z Flink Tarball directory /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-53382133956 2020-03-24T08:48:56.8501170Z Flink tarball filename flink.tar.gz 2020-03-24T08:48:56.8502612Z Flink distribution directory name flink-1.11-SNAPSHOT 2020-03-24T08:48:56.8504724Z End-to-end directory /home/vsts/work/1/s/flink-end-to-end-tests 2020-03-24T08:48:56.8620115Z Building Hadoop Docker container 2020-03-24T08:48:56.9117609Z Sending build context to Docker daemon 56.83kB 2020-03-24T08:48:56.9117926Z 2020-03-24T08:48:57.0076373Z Step 1/54 : FROM sequenceiq/pam:ubuntu-14.04 2020-03-24T08:48:57.0082811Z ---> df7bea4c5f64 2020-03-24T08:48:57.0084798Z Step 2/54 : RUN set -x && addgroup hadoop && useradd -d /home/hdfs -ms 
/bin/bash -G hadoop -p hdfs hdfs && useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn && useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred && useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user 2020-03-24T08:48:57.0092833Z ---> Using cache 2020-03-24T08:48:57.0093976Z ---> 3c12a7d3e20c 2020-03-24T08:48:57.0096889Z Step 3/54 : RUN set -x && apt-get update && apt-get install -y curl tar sudo openssh-server openssh-client rsync unzip krb5-user 2020-03-24T08:48:57.0106188Z ---> Using cache 2020-03-24T08:48:57.0107830Z ---> 9a59599596be 2020-03-24T08:48:57.0110793Z Step 4/54 : RUN set -x && mkdir -p /var/log/kerberos && touch /var/log/kerberos/kadmind.log 2020-03-24T08:48:57.0118896Z ---> Using cache 2020-03-24T08:48:57.0121035Z ---> c83551d4f695 2020-03-24T08:48:57.0125298Z Step 5/54 : RUN set -x && rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key /root/.ssh/id_rsa && ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key && ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key && ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa && cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys 2020-03-24T08:48:57.0133473Z ---> Using cache 2020-03-24T08:48:57.0134240Z ---> f69560c2bc0a 2020-03-24T08:48:57.0135683Z Step 6/54 : RUN set -x && mkdir -p /usr/java/default && curl -Ls 'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz' -H 'Cookie: oraclelicense=accept-securebackup-cookie' | tar --strip-components=1 -xz -C /usr/java/default/ 2020-03-24T08:48:57.0148145Z ---> Using cache 2020-03-24T08:48:57.0149008Z ---> f824256d72f1 2020-03-24T08:48:57.0152616Z Step 7/54 : ENV JAVA_HOME /usr/java/default 2020-03-24T08:48:57.0155992Z ---> Using cache 2020-03-24T08:48:57.0160104Z ---> 770e6bfd219a 2020-03-24T08:48:57.0160410Z Step 8/54 : ENV PATH $PATH:$JAVA_HOME/bin 2020-03-24T08:48:57.0168690Z ---> Using cache 2020-03-24T08:48:57.0169451Z ---> 2643e1a25898 
2020-03-24T08:48:57.0174785Z Step 9/54 : RUN set -x && curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip' && unzip jce_policy-8.zip && cp /UnlimitedJCEPolicyJDK8/local_policy.jar /UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security 2020-03-24T08:48:57.0187797Z ---> Using cache 2020-03-24T08:48:57.0188202Z ---> 51cf2085f95d 2020-03-24T08:48:57.0188467Z Step 10/54 : ARG HADOOP_VERSION=2.8.4 2020-03-24T08:48:57.0199344Z ---> Using cache 2020-03-24T08:48:57.0199846Z ---> d169c15c288c 2020-03-24T08:48:57.0200652Z
[jira] [Created] (FLINK-16739) PrestoS3FileSystemITCase#testSimpleFileWriteAndRead fails with no such key
Zhijiang created FLINK-16739: Summary: PrestoS3FileSystemITCase#testSimpleFileWriteAndRead fails with no such key Key: FLINK-16739 URL: https://issues.apache.org/jira/browse/FLINK-16739 Project: Flink Issue Type: Task Components: Connectors / FileSystem, Tests Reporter: Zhijiang Build: [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6546=logs=e9af9cde-9a65-5281-a58e-2c8511d36983=df5b2bf5-bcff-5dc9-7626-50bed0866a82] logs {code:java} 2020-03-24T01:51:19.6988685Z [INFO] Running org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase 2020-03-24T01:51:21.6250893Z [INFO] Running org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase 2020-03-24T01:51:25.1626385Z [WARNING] Tests run: 8, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 5.461 s - in org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase 2020-03-24T01:51:50.5503712Z [ERROR] Tests run: 7, Failures: 1, Errors: 1, Skipped: 0, Time elapsed: 28.922 s <<< FAILURE! - in org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase 2020-03-24T01:51:50.5506010Z [ERROR] testSimpleFileWriteAndRead[Scheme = s3p](org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase) Time elapsed: 0.7 s <<< ERROR! 2020-03-24T01:51:50.5513057Z com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: A07D70A474EABC13; S3 Extended Request ID: R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA=), S3 Extended Request ID: R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA= (Path: s3://***/temp/tests-c79a578b-13d9-41ba-b73b-4f53fc965b96/test.txt) 2020-03-24T01:51:50.5517642Z Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. 
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: A07D70A474EABC13; S3 Extended Request ID: R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA=) 2020-03-24T01:51:50.5519791Z 2020-03-24T01:51:50.5520679Z [ERROR] org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase Time elapsed: 17.431 s <<< FAILURE! 2020-03-24T01:51:50.5521841Z java.lang.AssertionError: expected: but was: 2020-03-24T01:51:50.5522437Z 2020-03-24T01:51:50.8966641Z [INFO] 2020-03-24T01:51:50.8967386Z [INFO] Results: 2020-03-24T01:51:50.8967849Z [INFO] 2020-03-24T01:51:50.8968357Z [ERROR] Failures: 2020-03-24T01:51:50.8970933Z [ERROR] PrestoS3FileSystemITCase>AbstractHadoopFileSystemITTest.teardown:155->AbstractHadoopFileSystemITTest.checkPathExistence:61 expected: but was: 2020-03-24T01:51:50.8972311Z [ERROR] Errors: 2020-03-24T01:51:50.8973807Z [ERROR] PrestoS3FileSystemITCase>AbstractHadoopFileSystemITTest.testSimpleFileWriteAndRead:87 » UnrecoverableS3Operation {code}
[jira] [Created] (FLINK-16712) Refactor StreamTask to construct final fields
Zhijiang created FLINK-16712: Summary: Refactor StreamTask to construct final fields Key: FLINK-16712 URL: https://issues.apache.org/jira/browse/FLINK-16712 Project: Flink Issue Type: Task Components: Runtime / Task Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0 At the moment, four fields are initialized in StreamTask#beforeInvoke: `stateBackend`, `checkpointStorage`, `timerService` and `asyncOperationsThreadPool`. Final fields are generally preferred for their well-known benefits, such as compiler-enforced single assignment and safe publication across threads, so we can refactor StreamTask to initialize these fields in the constructor instead.
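The proposed refactoring can be illustrated with a minimal Java sketch. It uses hypothetical placeholder types (`StateBackend`, `CheckpointStorage`, `TimerService`) and a hypothetical class `TaskSketch` rather than Flink's real `StreamTask` API. The four components are created once in the constructor and stored in `final` fields, so `beforeInvoke()` only uses them and never assigns them.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskSketch {

    // Hypothetical placeholder types standing in for the real components.
    static final class StateBackend {}
    static final class CheckpointStorage {}
    static final class TimerService {}

    // Assigned exactly once, in the constructor; the compiler rejects any later reassignment.
    private final StateBackend stateBackend;
    private final CheckpointStorage checkpointStorage;
    private final TimerService timerService;
    private final ExecutorService asyncOperationsThreadPool;

    public TaskSketch(StateBackend stateBackend, CheckpointStorage checkpointStorage, TimerService timerService) {
        this.stateBackend = Objects.requireNonNull(stateBackend);
        this.checkpointStorage = Objects.requireNonNull(checkpointStorage);
        this.timerService = Objects.requireNonNull(timerService);
        this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
    }

    // beforeInvoke() no longer initializes fields; it can rely on them already being set.
    public boolean beforeInvoke() {
        return stateBackend != null
                && checkpointStorage != null
                && timerService != null
                && !asyncOperationsThreadPool.isShutdown();
    }

    public void cleanup() {
        asyncOperationsThreadPool.shutdownNow();
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch(new StateBackend(), new CheckpointStorage(), new TimerService());
        System.out.println(task.beforeInvoke()); // true
        task.cleanup();
    }
}
```

Passing the components into the constructor, instead of creating them inside a lifecycle method, also makes it straightforward to substitute test doubles.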
[jira] [Created] (FLINK-16690) Refactor StreamTaskTest to reuse TestTaskBuilder and MockStreamTaskBuilder
Zhijiang created FLINK-16690: Summary: Refactor StreamTaskTest to reuse TestTaskBuilder and MockStreamTaskBuilder Key: FLINK-16690 URL: https://issues.apache.org/jira/browse/FLINK-16690 Project: Flink Issue Type: Task Components: Runtime / Task, Tests Reporter: Zhijiang Fix For: 1.11.0 We can reuse the existing TestTaskBuilder and MockStreamTaskBuilder to construct Task and StreamTask instances in tests more easily, which simplifies StreamTaskTest.
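As a rough illustration of why such builders simplify tests, here is a minimal Java sketch of the test-builder pattern. `TaskUnderTest` and `TaskUnderTestBuilder` are hypothetical stand-ins, not the actual TestTaskBuilder or MockStreamTaskBuilder APIs. Every setting has a default, so a test overrides only what it exercises.

```java
import java.util.ArrayList;
import java.util.List;

public class BuilderSketch {

    // Hypothetical object under test, standing in for Task/StreamTask.
    static final class TaskUnderTest {
        final String name;
        final int parallelism;
        final List<String> inputs;

        TaskUnderTest(String name, int parallelism, List<String> inputs) {
            this.name = name;
            this.parallelism = parallelism;
            this.inputs = inputs;
        }
    }

    // Hypothetical test builder: sensible defaults for every field.
    static final class TaskUnderTestBuilder {
        private String name = "test-task";
        private int parallelism = 1;
        private final List<String> inputs = new ArrayList<>();

        TaskUnderTestBuilder setName(String name) {
            this.name = name;
            return this;
        }

        TaskUnderTestBuilder setParallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        TaskUnderTestBuilder addInput(String input) {
            inputs.add(input);
            return this;
        }

        TaskUnderTest build() {
            // Copy the inputs so later builder calls cannot mutate a built task.
            return new TaskUnderTest(name, parallelism, new ArrayList<>(inputs));
        }
    }

    public static void main(String[] args) {
        // A test that only cares about parallelism does not repeat any other setup.
        TaskUnderTest task = new TaskUnderTestBuilder().setParallelism(4).build();
        System.out.println(task.name + " " + task.parallelism); // test-task 4
    }
}
```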
[jira] [Created] (FLINK-16653) Introduce ResultPartitionWriterTestBase for simplifying tests
Zhijiang created FLINK-16653: Summary: Introduce ResultPartitionWriterTestBase for simplifying tests Key: FLINK-16653 URL: https://issues.apache.org/jira/browse/FLINK-16653 Project: Flink Issue Type: Bug Components: Runtime / Network, Tests Reporter: Zhijiang Assignee: Zhijiang At the moment there are at least four implementations of the `ResultPartitionWriter` interface used in unit tests. The interface has about ten methods to implement, and most of them are dummies in tests. Whenever we extend the methods of `ResultPartitionWriter`, all four dummy implementations in tests have to be adjusted as well, which wastes some effort. Therefore we propose an abstract ResultPartitionWriterTestBase that provides dummy implementations of the basic `ResultPartitionWriter` methods. The four existing test implementations can all extend it and implement only the one or two methods that their specific tests require. Then we will probably only need to adjust ResultPartitionWriterTestBase when extending the `ResultPartitionWriter` interface.
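A minimal Java sketch of the proposed test-base pattern follows. The five-method `PartitionWriter` interface and all class names are hypothetical simplifications, not the real `ResultPartitionWriter` signatures. The abstract base supplies no-op defaults, and a concrete test writer overrides only the method it needs.

```java
import java.util.ArrayList;
import java.util.List;

public class TestBaseSketch {

    // Hypothetical, simplified interface with several methods to implement.
    interface PartitionWriter {
        void setup();
        int getNumberOfSubpartitions();
        void emitRecord(String record, int targetSubpartition);
        void flushAll();
        void close();
    }

    // Abstract test base: no-op defaults for every method. When the interface
    // gains a method, only this class needs a new default.
    abstract static class PartitionWriterTestBase implements PartitionWriter {
        @Override public void setup() {}
        @Override public int getNumberOfSubpartitions() { return 1; }
        @Override public void emitRecord(String record, int targetSubpartition) {}
        @Override public void flushAll() {}
        @Override public void close() {}
    }

    // A concrete test writer overrides only the one method its test cares about.
    static final class RecordCollectingWriter extends PartitionWriterTestBase {
        final List<String> records = new ArrayList<>();

        @Override
        public void emitRecord(String record, int targetSubpartition) {
            records.add(record);
        }
    }

    public static void main(String[] args) {
        RecordCollectingWriter writer = new RecordCollectingWriter();
        writer.setup(); // inherited no-op
        writer.emitRecord("a", 0);
        writer.emitRecord("b", 0);
        writer.close(); // inherited no-op
        System.out.println(writer.records); // [a, b]
    }
}
```

Java 8 default methods on the interface could give a similar effect, but they would put test-only behaviour into a production interface; a separate abstract base keeps the dummies in test code.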
[jira] [Created] (FLINK-16645) Limit the maximum backlogs in subpartitions for data skew case
Zhijiang created FLINK-16645: Summary: Limit the maximum backlogs in subpartitions for data skew case Key: FLINK-16645 URL: https://issues.apache.org/jira/browse/FLINK-16645 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Zhijiang Fix For: 1.11.0 In the case of data skew, most of the buffers in a partition's LocalBufferPool may be requested and accumulated in a single subpartition, which increases the in-flight data and slows down barrier alignment. We can introduce a config option that controls how much backlog is allowed per subpartition. Once a subpartition reaches this threshold, the buffer pool becomes unavailable, which blocks task processing until the backlog drops again. This reduces the in-flight data and speeds up checkpointing somewhat, without impacting performance.
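The threshold mechanism can be sketched as follows, assuming a hypothetical single-threaded model rather than Flink's actual LocalBufferPool. The pool counts the backlog per subpartition and reports itself unavailable as soon as any subpartition reaches the configured limit, even while free buffers remain. The class and parameter names (`BacklogLimitSketch`, `maxBacklogPerSubpartition`) are illustrative only.

```java
public class BacklogLimitSketch {

    private final int[] backlogs;                // queued buffers per subpartition
    private final int maxBacklogPerSubpartition; // hypothetical config value
    private int freeBuffers;                     // buffers left in the pool
    private int subpartitionsAtLimit;            // subpartitions whose backlog reached the limit

    BacklogLimitSketch(int numSubpartitions, int poolSize, int maxBacklogPerSubpartition) {
        this.backlogs = new int[numSubpartitions];
        this.freeBuffers = poolSize;
        this.maxBacklogPerSubpartition = maxBacklogPerSubpartition;
    }

    // Available only if there are free buffers AND no subpartition is at its limit.
    boolean isAvailable() {
        return freeBuffers > 0 && subpartitionsAtLimit == 0;
    }

    // Producer side: take a buffer from the pool and queue it in a subpartition.
    void addBuffer(int subpartition) {
        if (!isAvailable()) {
            throw new IllegalStateException("buffer pool unavailable; the task would block here");
        }
        freeBuffers--;
        backlogs[subpartition]++;
        if (backlogs[subpartition] == maxBacklogPerSubpartition) {
            subpartitionsAtLimit++;
        }
    }

    // Consumer side: a queued buffer is sent downstream and recycled to the pool.
    void consumeBuffer(int subpartition) {
        if (backlogs[subpartition] == 0) {
            throw new IllegalStateException("no backlog in subpartition " + subpartition);
        }
        if (backlogs[subpartition] == maxBacklogPerSubpartition) {
            subpartitionsAtLimit--;
        }
        backlogs[subpartition]--;
        freeBuffers++;
    }

    public static void main(String[] args) {
        BacklogLimitSketch pool = new BacklogLimitSketch(2, 10, 3);
        for (int i = 0; i < 3; i++) {
            pool.addBuffer(0); // skewed data: every record goes to subpartition 0
        }
        System.out.println(pool.isAvailable()); // false, although 7 buffers are still free
        pool.consumeBuffer(0);
        System.out.println(pool.isAvailable()); // true
    }
}
```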
[jira] [Created] (FLINK-16644) HadoopS3FileSystemBehaviorITCase failed with null uri host
Zhijiang created FLINK-16644: Summary: HadoopS3FileSystemBehaviorITCase failed with null uri host Key: FLINK-16644 URL: https://issues.apache.org/jira/browse/FLINK-16644 Project: Flink Issue Type: Bug Components: FileSystems, Tests Reporter: Zhijiang Fix For: 1.11.0 [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/6340/logs/72] {code:java} 2020-03-18T04:21:08.9614875Z [ERROR] Tests run: 16, Failures: 0, Errors: 16, Skipped: 0, Time elapsed: 0.641 s <<< FAILURE! - in org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase 2020-03-18T04:21:08.9616309Z [ERROR] testMkdirsReturnsTrueWhenCreatingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.104 s <<< ERROR! 2020-03-18T04:21:08.9617487Z java.io.IOException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9618318Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9619063Z Caused by: java.lang.NullPointerException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9619752Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9620120Z 2020-03-18T04:21:08.9620544Z [ERROR] testMkdirsReturnsTrueWhenCreatingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.104 s <<< ERROR! 2020-03-18T04:21:08.9621056Z java.lang.NullPointerException 2020-03-18T04:21:08.9621205Z 2020-03-18T04:21:08.9621606Z [ERROR] testMkdirsFailsWithExistingParentFile(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0 s <<< ERROR! 2020-03-18T04:21:08.9622187Z java.io.IOException: null uri host. 
This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9622737Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9623318Z Caused by: java.lang.NullPointerException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9623913Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9624325Z 2020-03-18T04:21:08.9624738Z [ERROR] testMkdirsFailsWithExistingParentFile(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.001 s <<< ERROR! 2020-03-18T04:21:08.9625215Z java.lang.NullPointerException 2020-03-18T04:21:08.9625364Z 2020-03-18T04:21:08.9625795Z [ERROR] testMkdirsReturnsTrueForExistingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.001 s <<< ERROR! 2020-03-18T04:21:08.9626376Z java.io.IOException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9626922Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9627583Z Caused by: java.lang.NullPointerException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9628270Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9628694Z 2020-03-18T04:21:08.9629127Z [ERROR] testMkdirsReturnsTrueForExistingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.001 s <<< ERROR! 2020-03-18T04:21:08.9629865Z java.lang.NullPointerException 2020-03-18T04:21:08.9630030Z 2020-03-18T04:21:08.9630404Z [ERROR] testPathAndScheme(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.001 s <<< ERROR! 
2020-03-18T04:21:08.9630954Z java.io.IOException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9631489Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9632082Z Caused by: java.lang.NullPointerException: null uri host. This can be caused by unencoded / in the password string 2020-03-18T04:21:08.9632663Z at org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60) 2020-03-18T04:21:08.9633026Z 2020-03-18T04:21:08.9633396Z [ERROR] testPathAndScheme(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase) Time elapsed: 0.001 s <<< ERROR! 2020-03-18T04:21:08.9633920Z java.lang.NullPointerException 2020-03-18T04:21:08.9634125Z 2020-03-18T04:21:08.9634514Z [ERROR] testHomeAndWorkDir(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
[jira] [Created] (FLINK-16641) Announce sender's backlog to solve the deadlock issue without exclusive buffers
Zhijiang created FLINK-16641: Summary: Announce sender's backlog to solve the deadlock issue without exclusive buffers Key: FLINK-16641 URL: https://issues.apache.org/jira/browse/FLINK-16641 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Zhijiang Fix For: 1.11.0 This is the second ingredient, besides FLINK-16404, for solving the deadlock problem without exclusive buffers. The scenario is as follows:
* Data in a subpartition with a positive backlog can always be sent, because the exclusive credits are eventually fed back.
* Without exclusive buffers, the receiver does not request floating buffers while the backlog is 0. When new backlog is later added to such a subpartition, the sender currently has no way to notify the receiver, because it has no positive credit.
* As a result, the receiver and the sender wait for each other and deadlock: the sender waits for credit to announce its backlog, and the receiver waits for backlog to request floating credits.
To solve this problem, the sender sometimes needs a separate message, besides the existing `BufferResponse`, to announce its backlog. The receiver can then use this information to request floating buffers and feed them back as credits. The side effect is increased network transport latency and some throughput regression, which we can measure with the existing micro-benchmarks. This cost may be acceptable in exchange for faster checkpoints without exclusive buffers. We can explain the trade-off in the respective configuration options so that users can make the final decision in practice.
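The deadlock and its resolution can be reproduced with a small round-based Java simulation. This is a simplified model, not Flink's credit-based protocol implementation, and it assumes: one subpartition and one input channel, zero exclusive buffers, a hypothetical floating pool of two buffers, and immediate recycling of each delivered buffer. The method name `simulate` and its parameters are illustrative only. Without the announcement message neither side makes progress; with it, all buffers are delivered.

```java
public class BacklogAnnouncementSketch {

    // Round-based simulation of one sender subpartition and one receiver channel
    // without exclusive buffers. Returns how many buffers were delivered.
    static int simulate(int buffersToSend, boolean announceBacklog, int floatingPoolSize, int maxRounds) {
        int senderBacklog = buffersToSend; // new data lands in a subpartition whose backlog was 0
        int credit = 0;                    // no exclusive buffers, so the receiver starts with zero credit
        int knownBacklog = 0;              // backlog as last reported to the receiver
        int freeFloating = floatingPoolSize;
        int delivered = 0;

        for (int round = 0; round < maxRounds && delivered < buffersToSend; round++) {
            // Sender side.
            if (credit > 0 && senderBacklog > 0) {
                // BufferResponse: carries one buffer and piggybacks the remaining backlog.
                credit--;
                senderBacklog--;
                delivered++;
                knownBacklog = senderBacklog;
                freeFloating++; // simplification: the buffer is consumed and recycled at once
            } else if (announceBacklog && senderBacklog > 0) {
                // Separate announcement message; unlike BufferResponse it needs no credit.
                knownBacklog = senderBacklog;
            }
            // Otherwise the sender keeps waiting for credit before it can report its backlog.

            // Receiver side: request floating buffers only for backlog it knows about.
            while (credit < knownBacklog && freeFloating > 0) {
                freeFloating--;
                credit++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, false, 2, 100)); // 0: sender and receiver wait on each other
        System.out.println(simulate(5, true, 2, 100));  // 5: the announcement breaks the cycle
    }
}
```

The extra message costs one round in this model before the first buffer flows, which loosely mirrors the latency concern raised above.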
[jira] [Created] (FLINK-16610) FlinkKafkaInternalProducerITCase fails with timeout exception
Zhijiang created FLINK-16610: Summary: FlinkKafkaInternalProducerITCase fails with timeout exception Key: FLINK-16610 URL: https://issues.apache.org/jira/browse/FLINK-16610 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Tests Reporter: Zhijiang Fix For: 1.11.0 FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator:184->KafkaTestBase.deleteTestTopic:201->Object.wait:->TestTimedOut Logs: [https://api.travis-ci.org/v3/job/662458976/log.txt] {noformat} 21:25:57,692 [ Time-limited test] INFO org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - Deleting topic flink-kafka-producer-txn-coordinator-changed java.lang.InterruptedException at java.lang.Object.wait(Native Method) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:110)21:26:25,273 [main] ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase - Test testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase) failed with: org.junit.runners.model.TestTimedOutException: test timed out after 3 milliseconds at java.lang.Object.wait(Native Method) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:110) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272) at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.tryDelete(KafkaTestEnvironmentImpl.java:173) at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:160) at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:201) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:184) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272) at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.tryDelete(KafkaTestEnvironmentImpl.java:173) at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:160) at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:201) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:184) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at
[jira] [Created] (FLINK-16586) Build ResultSubpartitionInfo and InputChannelInfo in respective constructors
Zhijiang created FLINK-16586: Summary: Build ResultSubpartitionInfo and InputChannelInfo in respective constructors Key: FLINK-16586 URL: https://issues.apache.org/jira/browse/FLINK-16586 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Zhijiang Assignee: Zhijiang In the constructors of ResultSubpartition and InputChannel, the respective ResultSubpartitionInfo and InputChannelInfo should be created as well. These info objects will be used for interacting with ChannelStateReader and ChannelStateWriter in the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
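A minimal sketch of the idea, using hypothetical simplified classes (`SubpartitionInfoSketch`, `SubpartitionSketch`) rather than Flink's real ResultSubpartitionInfo and ResultSubpartition. The two index fields are an assumption for illustration: the info object is derived once from the constructor arguments and then handed out unchanged.

```java
/** Hypothetical, simplified stand-in for ResultSubpartitionInfo. */
final class SubpartitionInfoSketch {
    final int partitionIdx;
    final int subpartitionIdx;

    SubpartitionInfoSketch(int partitionIdx, int subpartitionIdx) {
        this.partitionIdx = partitionIdx;
        this.subpartitionIdx = subpartitionIdx;
    }
}

/** Hypothetical subpartition that builds its info object once, in its own constructor. */
class SubpartitionSketch {
    private final SubpartitionInfoSketch info;

    SubpartitionSketch(int partitionIdx, int subpartitionIdx) {
        // Created eagerly, so later channel-state reads/writes can reuse the same instance.
        this.info = new SubpartitionInfoSketch(partitionIdx, subpartitionIdx);
    }

    SubpartitionInfoSketch getSubpartitionInfo() {
        return info;
    }
}
```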
[jira] [Created] (FLINK-16583) SQLClientKafkaITCase.testKafka failed in SqlClientException
Zhijiang created FLINK-16583: Summary: SQLClientKafkaITCase.testKafka failed in SqlClientException Key: FLINK-16583 URL: https://issues.apache.org/jira/browse/FLINK-16583 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Table SQL / Client, Tests Reporter: Zhijiang Fix For: 1.11.0 The end-to-end test {{SQLClientKafkaITCase.testKafka}} failed with {code:java} 18:13:02.425 [ERROR] testKafka[0: kafka-version:0.10 kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 32.246 s <<< ERROR! java.io.IOException: Process execution failed due error. Error output:Mar 12, 2020 6:11:46 PM org.jline.utils.Log logr WARNING: Unable to create a system terminal, creating a dumb terminal (enable debug logging for more information) Exception in thread "main" org.apache.flink.table.client.SqlClientException: Could not submit given SQL update statement to cluster. at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:131) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178) at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151) 18:13:02.425 [ERROR] testKafka[1: kafka-version:0.11 kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 34.539 s <<< ERROR! java.io.IOException: Process execution failed due error. Error output:Mar 12, 2020 6:12:21 PM org.jline.utils.Log logr WARNING: Unable to create a system terminal, creating a dumb terminal (enable debug logging for more information) Exception in thread "main" org.apache.flink.table.client.SqlClientException: Could not submit given SQL update statement to cluster. 
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:131) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178) at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151) {code} [https://api.travis-ci.org/v3/job/661535183/log.txt] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16564) HadoopS3RecoverableWriterITCase fails with NullPointerException
Zhijiang created FLINK-16564: Summary: HadoopS3RecoverableWriterITCase fails with NullPointerException Key: FLINK-16564 URL: https://issues.apache.org/jira/browse/FLINK-16564 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Tests Affects Versions: 1.11.0 Reporter: Zhijiang {code:java} 23:46:14.740 [INFO] Running org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase 23:46:16.674 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.817 s <<< FAILURE! - in org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase 23:46:16.674 [ERROR] testDirectoryListing(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase) Time elapsed: 2.166 s <<< ERROR! java.io.FileNotFoundException: No such file or directory: s3://[secure]/temp/tests-eb7ce54f-0e20-492a-8fef-ce6f742048d3/testdir 23:46:17.471 [INFO] Running org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase 23:46:19.548 [ERROR] Tests run: 13, Failures: 0, Errors: 13, Skipped: 0, Time elapsed: 4.805 s <<< FAILURE! - in org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase 23:46:19.564 [ERROR] testCloseWithNoData(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 2.448 s <<< ERROR! java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCloseWithNoData(HadoopS3RecoverableWriterITCase.java:186) 23:46:19.567 [ERROR] testCommitAfterPersist(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.111 s <<< ERROR! java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCommitAfterPersist(HadoopS3RecoverableWriterITCase.java:208) 23:46:19.567 [ERROR] testRecoverWithEmptyState(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.108 s <<< ERROR! 
java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithSmallData(HadoopS3RecoverableWriterITCase.java:352) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testRecoverWithEmptyState(HadoopS3RecoverableWriterITCase.java:302) 23:46:19.567 [ERROR] testRecoverFromIntermWithoutAdditionalState(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.103 s <<< ERROR! java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithSmallData(HadoopS3RecoverableWriterITCase.java:352) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testRecoverFromIntermWithoutAdditionalState(HadoopS3RecoverableWriterITCase.java:316) 23:46:19.568 [ERROR] testCallingDeleteObjectTwiceDoesNotThroughException(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.108 s <<< ERROR! java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCallingDeleteObjectTwiceDoesNotThroughException(HadoopS3RecoverableWriterITCase.java:245) 23:46:19.568 [ERROR] testCommitAfterNormalClose(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.104 s <<< ERROR! java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCommitAfterNormalClose(HadoopS3RecoverableWriterITCase.java:196) 23:46:19.568 [ERROR] testRecoverWithStateWithMultiPart(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.327 s <<< ERROR! 
java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithMultiPartUploads(HadoopS3RecoverableWriterITCase.java:364) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart(HadoopS3RecoverableWriterITCase.java:330) 23:46:19.568 [ERROR] testRecoverFromIntermWithoutAdditionalStateWithMultiPart(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase) Time elapsed: 0.248 s <<< ERROR! java.lang.NullPointerException at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384) at org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithMultiPartUploads(HadoopS3RecoverableWriterITCase.java:364) at
[jira] [Created] (FLINK-16537) Implement ResultPartition state recovery for unaligned checkpoint
Zhijiang created FLINK-16537: Summary: Implement ResultPartition state recovery for unaligned checkpoint Key: FLINK-16537 URL: https://issues.apache.org/jira/browse/FLINK-16537 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing, Runtime / Network Reporter: Zhijiang Fix For: 1.11.0 During the recovery process for unaligned checkpoints, the partition state should be recovered in addition to the existing operator states. The ResultPartition would request buffers from its local pool and then interact with ChannelStateReader to fill in the state data. The filled buffers would be inserted into the respective ResultSubpartition queues in the normal way. It must be guaranteed that the operator cannot process any inputs before all output recovery has finished, to avoid out-of-order data. For more details, refer to [https://docs.google.com/document/d/16_MOQymzxrKvUHXh6QFr2AAXIKt_2vPUf8vzKy4H_tU/edit] -- This message was sent by Atlassian Jira (v8.3.4#803005)
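The ordering guarantee can be sketched as below. `OutputStateReader` and `RecoveringPartitionSketch` are hypothetical stand-ins (not Flink's ChannelStateReader or ResultPartition), and strings replace network buffers to keep the example small: recovered data is queued first, and the regular emit path is rejected until recovery has finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for ChannelStateReader: returns the persisted output data of a subpartition. */
interface OutputStateReader {
    List<String> readOutputData(int subpartitionIdx);
}

/** Hypothetical partition that refuses new output until its persisted output state is restored. */
class RecoveringPartitionSketch {
    private final List<Queue<String>> subpartitionQueues = new ArrayList<>();
    private boolean outputRecovered = false;

    RecoveringPartitionSketch(int numSubpartitions) {
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitionQueues.add(new ArrayDeque<>());
        }
    }

    /** Restores the state of every subpartition into its queue, ahead of any new output. */
    void recoverOutputState(OutputStateReader reader) {
        for (int i = 0; i < subpartitionQueues.size(); i++) {
            // Flink would fill buffers from the local pool; strings keep the sketch small.
            subpartitionQueues.get(i).addAll(reader.readOutputData(i));
        }
        outputRecovered = true;
    }

    /** Regular emit path, used once the operator starts processing inputs. */
    void emit(int subpartitionIdx, String record) {
        if (!outputRecovered) {
            throw new IllegalStateException("inputs must not be processed before output recovery finishes");
        }
        subpartitionQueues.get(subpartitionIdx).add(record);
    }

    Queue<String> queue(int subpartitionIdx) {
        return subpartitionQueues.get(subpartitionIdx);
    }
}
```

Failing fast in `emit` is one way to enforce the rule; the real task would more likely not start processing inputs at all until recovery completes.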
[jira] [Created] (FLINK-16536) Implement InputChannel state recovery for unaligned checkpoint
Zhijiang created FLINK-16536: Summary: Implement InputChannel state recovery for unaligned checkpoint Key: FLINK-16536 URL: https://issues.apache.org/jira/browse/FLINK-16536 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing, Runtime / Network Reporter: Zhijiang Fix For: 1.11.0 During the recovery process for unaligned checkpoints, the input channel state should be recovered in addition to the existing operator states. The InputGate would request buffers from its local pool and then interact with ChannelStateReader to fill in the state data. The filled buffers would be inserted into the respective InputChannel queues for processing in the normal way. It must be guaranteed that new data from the upstream side does not overtake the recovered input state data, to avoid out-of-order data. For more details, refer to the [design doc|https://docs.google.com/document/d/16_MOQymzxrKvUHXh6QFr2AAXIKt_2vPUf8vzKy4H_tU/edit] -- This message was sent by Atlassian Jira (v8.3.4#803005)
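One way to express "new upstream data must not overtake the recovered state" is to park upstream data until recovery finishes. The class below is a hypothetical sketch with strings instead of buffers, not Flink's InputChannel; in practice the upstream may simply not be asked for data until recovery is done.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical input channel in which recovered state is always consumed before new upstream data. */
class RecoveringChannelSketch {
    private final Queue<String> toProcess = new ArrayDeque<>();
    // New upstream data that arrives while recovery is still running is parked here.
    private final List<String> heldBack = new ArrayList<>();
    private boolean stateRecovered = false;

    /** Enqueues the recovered in-flight data first, then releases any held-back upstream data. */
    void recoverState(List<String> recoveredData) {
        toProcess.addAll(recoveredData);
        stateRecovered = true;
        toProcess.addAll(heldBack);
        heldBack.clear();
    }

    /** Called for every new buffer received from the upstream task. */
    void onUpstreamData(String data) {
        if (stateRecovered) {
            toProcess.add(data);
        } else {
            heldBack.add(data);
        }
    }

    /** Returns the next element to process, or null if none is queued. */
    String pollNext() {
        return toProcess.poll();
    }
}
```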
[jira] [Created] (FLINK-16454) Update the copyright year in NOTICE files
Zhijiang created FLINK-16454: Summary: Update the copyright year in NOTICE files Key: FLINK-16454 URL: https://issues.apache.org/jira/browse/FLINK-16454 Project: Flink Issue Type: Task Components: Release System Environment: The current copyright year is 2014-2019 in NOTICE files. We should change it to 2014-2020. Reporter: Zhijiang Assignee: Zhijiang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16428) Fine-grained network buffer management for backpressure
Zhijiang created FLINK-16428: Summary: Fine-grained network buffer management for backpressure Key: FLINK-16428 URL: https://issues.apache.org/jira/browse/FLINK-16428 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Zhijiang Fix For: 1.11.0 This is an umbrella ticket for tracking the progress of this improvement. This is the second ingredient to solve the “checkpoints under backpressure” problem (together with unaligned checkpoints). It consists of two steps: * See if we can use less network memory in general for streaming jobs (with potentially different distribution of floating buffers on the input side) * Under backpressure, reduce network memory to have less in-flight data (needs specification of algorithm and experiments) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16404) Solve the potential deadlock problem when reducing exclusive buffers to zero
Zhijiang created FLINK-16404: Summary: Solve the potential deadlock problem when reducing exclusive buffers to zero Key: FLINK-16404 URL: https://issues.apache.org/jira/browse/FLINK-16404 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Zhijiang Fix For: 1.11.0 One motivation for this issue is to reduce the in-flight data under back pressure in order to speed up checkpoints. The current default number of exclusive buffers per channel is 2. If we reduce it to 0 and somewhat increase the number of floating buffers to compensate, a deadlock might occur, because all the floating buffers might be taken by some blocked input channels and never recycled until barrier alignment. To solve this deadlock concern, we can change the logic on both the sender and receiver sides. * Sender side: it should revoke the previously received credit after sending a checkpoint barrier, which means it would not send any further buffers until it receives new credit. * Receiver side: after processing the barrier from one channel and blocking that channel, it should release the floating buffers held for the blocked channel, and only resume requesting floating buffers after barrier alignment. That means the receiver would only announce new credit to the sender side after barrier alignment. Another possible benefit is that the floating buffers might be put to better use before barrier alignment. We can further verify the performance impact via the existing micro-benchmarks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
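The sender-side rule can be sketched as a per-channel view that drops its remaining credit right after emitting a barrier. `CreditRevokingSender` is hypothetical, and it assumes (for the sketch) that the barrier itself is sent without consuming credit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sender-side view of one channel under credit-based flow control. */
class CreditRevokingSender {
    static final String BARRIER = "BARRIER";

    private final Queue<String> backlog = new ArrayDeque<>();
    private int credit;

    void enqueue(String bufferOrBarrier) {
        backlog.add(bufferOrBarrier);
    }

    /** Credit announced by the receiver, e.g. after barrier alignment. */
    void addCredit(int numCredits) {
        credit += numCredits;
    }

    /** Sends everything the current credit allows and returns what was sent. */
    List<String> sendAvailable() {
        List<String> sent = new ArrayList<>();
        while (!backlog.isEmpty()) {
            if (BARRIER.equals(backlog.peek())) {
                // Assumption: the barrier itself needs no credit.
                sent.add(backlog.poll());
                // Revoke the remaining credit: no further buffers until the receiver grants new credit.
                credit = 0;
            } else if (credit > 0) {
                sent.add(backlog.poll());
                credit--;
            } else {
                break;
            }
        }
        return sent;
    }
}
```

With 5 credits and a backlog of `b1, BARRIER, b2`, the first call sends `b1` and the barrier, a second call sends nothing, and `b2` only goes out after the receiver calls `addCredit` again.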
[jira] [Created] (FLINK-16403) Solve the potential deadlock problem when reducing exclusive buffers to zero
Zhijiang created FLINK-16403: Summary: Solve the potential deadlock problem when reducing exclusive buffers to zero Key: FLINK-16403 URL: https://issues.apache.org/jira/browse/FLINK-16403 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Zhijiang One motivation for this issue is to reduce the in-flight data under back pressure in order to speed up checkpoints. The current default number of exclusive buffers per channel is 2. If we reduce it to 0 and somewhat increase the number of floating buffers to compensate, a deadlock might occur, because all the floating buffers might be taken by some blocked input channels and never recycled until barrier alignment. To solve this deadlock concern, we can change the logic on both the sender and receiver sides. * Sender side: it should revoke the previously received credit after sending a checkpoint barrier, which means it would not send any further buffers until it receives new credit. * Receiver side: after processing the barrier from one channel and blocking that channel, it should release the floating buffers held for the blocked channel, and only resume requesting floating buffers after barrier alignment. That means the receiver would only announce new credit to the sender side after barrier alignment. Another possible benefit is that the floating buffers might be put to better use before barrier alignment. We can further verify the performance impact via the existing micro-benchmarks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16285) Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument
Zhijiang created FLINK-16285: Summary: Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument Key: FLINK-16285 URL: https://issues.apache.org/jira/browse/FLINK-16285 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Zhijiang Assignee: Zhijiang The IntermediateResultPartitionID can be obtained directly from the respective InputChannel, so we can remove it from the arguments of SingleInputGate#setInputChannel to clean up the code. This also simplifies the unit tests and avoids passing an IntermediateResultPartitionID that is inconsistent with the ResultPartitionID maintained internally by the respective InputChannel. -- This message was sent by Atlassian Jira (v8.3.4#803005)
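A minimal sketch of the refactoring, with hypothetical classes and plain `String` IDs in place of IntermediateResultPartitionID: because the gate derives the key from the channel, a caller can no longer pass an ID that disagrees with the channel's own.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplified channel that knows which partition it consumes. */
class SketchInputChannel {
    private final String partitionId;

    SketchInputChannel(String partitionId) {
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

/** Hypothetical gate whose setter derives the key from the channel instead of taking it as an argument. */
class SketchInputGate {
    private final Map<String, SketchInputChannel> channels = new HashMap<>();

    // Before (roughly): setInputChannel(partitionId, channel), where the two could disagree.
    void setInputChannel(SketchInputChannel channel) {
        channels.put(channel.getPartitionId(), channel);
    }

    SketchInputChannel getChannel(String partitionId) {
        return channels.get(partitionId);
    }
}
```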
[jira] [Created] (FLINK-16284) Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument
Zhijiang created FLINK-16284: Summary: Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument Key: FLINK-16284 URL: https://issues.apache.org/jira/browse/FLINK-16284 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Zhijiang Assignee: Zhijiang The IntermediateResultPartitionID can be obtained directly from the respective InputChannel, so we can remove it from the arguments of SingleInputGate#setInputChannel to clean up the code. This also simplifies the unit tests and avoids passing an IntermediateResultPartitionID that is inconsistent with the ResultPartitionID maintained internally by the respective InputChannel. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16257) Remove useless ResultPartitionID from AddCredit message
Zhijiang created FLINK-16257: Summary: Remove useless ResultPartitionID from AddCredit message Key: FLINK-16257 URL: https://issues.apache.org/jira/browse/FLINK-16257 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Zhijiang Assignee: Zhijiang Fix For: 1.11.0 The ResultPartitionID in the AddCredit message is never used on the upstream side, so we can remove it to clean up the code. This brings two further benefits: # Reduce the total message size from 52 bytes to 20 bytes. # Decouple the dependency on `InputChannel#getPartitionId` -- This message was sent by Atlassian Jira (v8.3.4#803005)
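The 52 to 20 byte figure can be reproduced with a small worked example, assuming (for this sketch) that every ID is a 16-byte value made of two longs, that ResultPartitionID consists of an IntermediateResultPartitionID plus an ExecutionAttemptID, and that the frame header is excluded. The field order below is also an assumption.

```java
import java.nio.ByteBuffer;

/** Sketch of the AddCredit body size, excluding the frame header; field order and ID widths are assumptions. */
class AddCreditSizeSketch {
    /** Writes placeholder values for each field and returns the number of bytes written. */
    static int bodySize(boolean withResultPartitionId) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        if (withResultPartitionId) {
            buf.putLong(0L).putLong(0L); // IntermediateResultPartitionID: 16 bytes
            buf.putLong(0L).putLong(0L); // ExecutionAttemptID: 16 bytes
        }
        buf.putInt(1);                   // credit: 4 bytes
        buf.putLong(0L).putLong(0L);     // InputChannelID of the receiver: 16 bytes
        return buf.position();
    }
}
```

`bodySize(true)` is 32 + 4 + 16 = 52 bytes and `bodySize(false)` is 4 + 16 = 20 bytes.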
[jira] [Created] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side
zhijiang created FLINK-16012: Summary: Reduce the default number of exclusive buffers from 2 to 1 on receiver side Key: FLINK-16012 URL: https://issues.apache.org/jira/browse/FLINK-16012 Project: Flink Issue Type: Improvement Reporter: zhijiang In order to reduce the in-flight buffers for checkpoints under back pressure, we can reduce the number of exclusive buffers per remote input channel from the default of 2 to 1 as a first step. As a result, the total number of required buffers is also reduced. We can further verify the performance impact via various benchmarks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
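A back-of-the-envelope sketch of the effect, assuming a gate holds `exclusive buffers x remote channels` plus a fixed number of floating buffers (8 is the default of `taskmanager.network.memory.floating-buffers-per-gate`; the exclusive count corresponds to `taskmanager.network.memory.buffers-per-channel`). The class name is hypothetical.

```java
/** Back-of-the-envelope buffer count for one input gate; the formula is a simplifying assumption. */
class GateBufferMath {
    static int buffersPerGate(int numRemoteChannels, int exclusivePerChannel, int floatingPerGate) {
        return numRemoteChannels * exclusivePerChannel + floatingPerGate;
    }
}
```

For a gate with 100 remote channels and 8 floating buffers, `buffersPerGate(100, 2, 8)` gives 208 and `buffersPerGate(100, 1, 8)` gives 108, so the exclusive part, which dominates for wide shuffles, is halved.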
[jira] [Created] (FLINK-15963) Reuse the same ByteBuf while writing the BufferResponse header
zhijiang created FLINK-15963: Summary: Reuse the same ByteBuf while writing the BufferResponse header Key: FLINK-15963 URL: https://issues.apache.org/jira/browse/FLINK-15963 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang On the sender side, writing each BufferResponse message allocates a new direct ByteBuf from the netty allocator just for the header. Since only one message is written to a channel at a time, we can use a single fixed ByteBuf to write the header of all BufferResponse messages. We can verify how this affects performance in practice or in benchmarks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
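The reuse pattern can be sketched with `java.nio.ByteBuffer` standing in for Netty's `ByteBuf`. The 13-byte header layout (length, message id, sequence number, buffer size) is an assumption made for the sketch, not the actual BufferResponse format.

```java
import java.nio.ByteBuffer;

/** Hypothetical header writer that reuses one buffer per channel instead of allocating per message. */
class ReusableHeaderWriter {
    // Assumed layout: frame length (4) + message id (1) + sequence number (4) + buffer size (4).
    static final int HEADER_LENGTH = 13;

    // One direct buffer per channel; safe only because a channel writes one message at a time.
    private final ByteBuffer header = ByteBuffer.allocateDirect(HEADER_LENGTH);

    /** Overwrites the shared buffer with the header of the next message and returns it ready to read. */
    ByteBuffer writeHeader(int sequenceNumber, int bufferSize) {
        header.clear();
        header.putInt(HEADER_LENGTH + bufferSize);
        header.put((byte) 0);
        header.putInt(sequenceNumber);
        header.putInt(bufferSize);
        header.flip();
        return header;
    }
}
```

With Netty itself, reusing a `ByteBuf` across writes would also require care with its reference counting, since a written buffer is normally released after the write.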
[jira] [Created] (FLINK-15914) Miss the barrier alignment metric for the case of two inputs
zhijiang created FLINK-15914: Summary: Miss the barrier alignment metric for the case of two inputs Key: FLINK-15914 URL: https://issues.apache.org/jira/browse/FLINK-15914 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / Metrics Reporter: zhijiang Assignee: zhijiang When the StreamTwoInputSelectableProcessor was introduced, the barrier alignment metric was not registered in its constructor. This did not cause problems at the time, because only StreamTwoInputProcessor was in use. Now that StreamTwoInputProcessor has been replaced by StreamTwoInputSelectableProcessor, the bug is exposed and the barrier alignment metric is no longer reported for tasks with two inputs. The solution is to register this metric while constructing the current StreamTwoInputProcessor. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15444) Make the component AbstractInvokable in CheckpointBarrierHandler NonNull
zhijiang created FLINK-15444: Summary: Make the component AbstractInvokable in CheckpointBarrierHandler NonNull Key: FLINK-15444 URL: https://issues.apache.org/jira/browse/FLINK-15444 Project: Flink Issue Type: Task Components: Runtime / Checkpointing Reporter: zhijiang Assignee: zhijiang The current component {{AbstractInvokable}} in {{CheckpointBarrierHandler}} is annotated as {{@Nullable}}. In the real code path, however, it is passed via the constructor and is never null. The nullable annotation exists only for unit tests. This misrepresents the real usage and causes extra trouble, because you always have to check whether it is null before using it in the related processes. We can refactor the related unit tests to use a dummy {{AbstractInvokable}} implementation and remove the {{@Nullable}} annotation from the related class constructors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
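A minimal sketch of the proposed shape, with a hypothetical `InvokableSketch` interface in place of Flink's abstract class: the constructor rejects null up front, so no later null checks are needed, and tests pass a small dummy instead of null.

```java
import java.util.Objects;

/** Hypothetical minimal stand-in for AbstractInvokable. */
interface InvokableSketch {
    void triggerCheckpointOnBarrier(long checkpointId);
}

/** Hypothetical barrier handler whose invokable is required, not @Nullable. */
class BarrierHandlerSketch {
    private final InvokableSketch toNotifyOnCheckpoint;

    BarrierHandlerSketch(InvokableSketch toNotifyOnCheckpoint) {
        // Fail fast at construction instead of null-checking on every use.
        this.toNotifyOnCheckpoint = Objects.requireNonNull(toNotifyOnCheckpoint);
    }

    void onBarrier(long checkpointId) {
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId);
    }
}

/** Test-only dummy that records the last checkpoint it was asked to trigger. */
class DummyInvokable implements InvokableSketch {
    long lastCheckpointId = -1;

    @Override
    public void triggerCheckpointOnBarrier(long checkpointId) {
        lastCheckpointId = checkpointId;
    }
}
```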
[jira] [Created] (FLINK-15340) Remove the executor of pipelined compression benchmark
zhijiang created FLINK-15340: Summary: Remove the executor of pipelined compression benchmark Key: FLINK-15340 URL: https://issues.apache.org/jira/browse/FLINK-15340 Project: Flink Issue Type: Task Components: Benchmarks Reporter: zhijiang Assignee: zhijiang In [FLINK-15308|https://issues.apache.org/jira/browse/FLINK-15308], we removed compression support for the pipelined case. Accordingly, we also need to remove the respective benchmark executor. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15306) Adjust the default netty transport option from nio to auto
zhijiang created FLINK-15306: Summary: Adjust the default netty transport option from nio to auto Key: FLINK-15306 URL: https://issues.apache.org/jira/browse/FLINK-15306 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.11.0 The default value of `taskmanager.network.netty.transport` in NettyShuffleEnvironmentOptions is currently `nio`. The `epoll` mode, which is only available on Linux, offers better performance, less GC and more advanced features. Therefore it is better to change the default to `auto`, so that the framework automatically chooses the proper mode based on the platform. We will further verify the performance impact via micro-benchmarks if possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
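A hypothetical sketch of how an `auto` value could be resolved. It uses the OS name as a simplifying proxy for "epoll is available"; an actual implementation would more likely ask Netty whether its native epoll transport can be loaded.

```java
import java.util.Locale;

/** Hypothetical resolution of the `auto` transport value. */
class TransportResolver {
    static String resolve(String configured, String osName) {
        if (!"auto".equals(configured)) {
            return configured; // an explicit "nio" or "epoll" is kept as-is
        }
        // Simplifying assumption: treat Linux as "epoll is available".
        return osName.toLowerCase(Locale.ROOT).contains("linux") ? "epoll" : "nio";
    }
}
```

At runtime this would be called as `TransportResolver.resolve(configuredValue, System.getProperty("os.name"))`.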
[jira] [Created] (FLINK-15187) Reuse LocalBufferPool for FileBufferReader in blocking partition
zhijiang created FLINK-15187: Summary: Reuse LocalBufferPool for FileBufferReader in blocking partition Key: FLINK-15187 URL: https://issues.apache.org/jira/browse/FLINK-15187 Project: Flink Issue Type: Task Components: Runtime / Network Reporter: zhijiang If the file type is selected via `taskmanager.network.bounded-blocking-subpartition-type` for a batch job, creating the view for reading a subpartition's persisted data allocates two unpooled memory segments for every subpartition. This temporary memory is not managed or accounted for by the framework, so it might cause OOM errors. We could instead reuse the ResultPartition's `LocalBufferPool` to read subpartition data and avoid this memory overhead. However, there are two problems with reusing it directly. * The current core size of `LocalBufferPool` is `numberOfSubpartitions + 1`, but every subpartition currently needs two segments for pre-reading. We can remove the pre-reading so that the current core pool size fits the reading requirements, because pre-reading seems to bring no obvious benefit in practice and only makes a difference for the last piece of data. * When the task finishes, it destroys the `LocalBufferPool` even though the respective `ResultPartition` is still alive, so the subsequent subpartition view cannot reuse the pool directly. We should adjust the respective logic to either delay destroying the pool or create a new pool for the subpartition view. -- This message was sent by Atlassian Jira (v8.3.4#803005)
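A worked example of the memory involved, assuming every subpartition view is open at the same time and a segment size of 32 KiB (Flink's default memory segment size). The helper class is hypothetical and purely illustrative.

```java
/** Back-of-the-envelope memory figures for file-based reading of a blocking partition (illustrative only). */
class ReaderMemoryMath {
    /** Unpooled memory if every subpartition view allocates two segments of its own. */
    static long unmanagedBytes(int numSubpartitions, int segmentSizeBytes) {
        return 2L * numSubpartitions * segmentSizeBytes;
    }

    /** Core size of the partition's LocalBufferPool as stated in the ticket. */
    static int corePoolSize(int numSubpartitions) {
        return numSubpartitions + 1;
    }

    /** Segments needed when every view keeps two segments for pre-reading. */
    static int segmentsWithPreReading(int numSubpartitions) {
        return 2 * numSubpartitions;
    }
}
```

For 1000 subpartitions, `unmanagedBytes(1000, 32 * 1024)` is 65,536,000 bytes (62.5 MiB) outside the framework's accounting. The pool's core size of 1001 cannot cover the 2000 segments needed with pre-reading, but does cover one segment per view once pre-reading is removed.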
[jira] [Created] (FLINK-15070) Supplement the case of bounded blocking partition for benchmark
zhijiang created FLINK-15070: Summary: Supplement the case of bounded blocking partition for benchmark Key: FLINK-15070 URL: https://issues.apache.org/jira/browse/FLINK-15070 Project: Flink Issue Type: Task Components: Benchmarks Reporter: zhijiang At the moment the benchmark only covers the pipelined partition used in streaming jobs, so it would be better to also cover the blocking partition used in batch jobs. Then we can easily track the performance impact of future changes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15069) Supplement the compression case for benchmark
zhijiang created FLINK-15069: Summary: Supplement the compression case for benchmark Key: FLINK-15069 URL: https://issues.apache.org/jira/browse/FLINK-15069 Project: Flink Issue Type: Task Components: Benchmarks Reporter: zhijiang While reviewing the PR that introduces data compression for persistent storage and network shuffle, we thought it would be better to also cover this scenario in the benchmark, in order to track performance issues in the future. Refer to https://github.com/apache/flink/pull/10375#pullrequestreview-325193504 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15021) Refactor to remove channelWritabilityChanged from PartitionRequestQueue
zhijiang created FLINK-15021: Summary: Refactor to remove channelWritabilityChanged from PartitionRequestQueue Key: FLINK-15021 URL: https://issues.apache.org/jira/browse/FLINK-15021 Project: Flink Issue Type: Task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang After removing the non-credit-based flow control code, the related channel-writability-changed logic in PartitionRequestQueue is no longer valid and can be removed completely. Therefore we can refactor the process to simplify the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14553) Respect non-blocking output in StreamTask#processInput
zhijiang created FLINK-14553: Summary: Respect non-blocking output in StreamTask#processInput Key: FLINK-14553 URL: https://issues.apache.org/jira/browse/FLINK-14553 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang Fix For: 1.10.0 The non-blocking output was introduced in FLINK-14396 and FLINK-14498 to solve the problem of handling checkpoint barriers under backpressure. To make the whole process work end to end, {{StreamInputProcessor}} should only be allowed to process input elements if the output is also available. The default core size of the {{LocalBufferPool}} for {{ResultPartition}} should also be increased by 1 so as not to impact performance in the new mode; this tiny memory overhead can be ignored in practice. -- This message was sent by Atlassian Jira (v8.3.4#803005)
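A sketch of one processing step that respects output availability. `Availability` and `ProcessingStep` are hypothetical; they model availability as a `CompletableFuture` that is done while data can be read or written. If either side is unavailable, the step returns a combined future instead of blocking, so a mailbox-style loop could suspend until both sides are ready.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical availability provider: the future is done while the input/output is usable. */
interface Availability {
    CompletableFuture<?> getAvailableFuture();

    default boolean isAvailable() {
        return getAvailableFuture().isDone();
    }
}

class ProcessingStep {
    /**
     * Processes one input element if both input and output are available; otherwise returns a
     * future that completes once both are available, so the caller can suspend instead of blocking.
     */
    static CompletableFuture<?> processStep(Availability input, Availability output, Runnable processElement) {
        if (input.isAvailable() && output.isAvailable()) {
            processElement.run();
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.allOf(input.getAvailableFuture(), output.getAvailableFuture());
    }
}
```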
[jira] [Created] (FLINK-14551) Unaligned checkpoints
zhijiang created FLINK-14551: Summary: Unaligned checkpoints Key: FLINK-14551 URL: https://issues.apache.org/jira/browse/FLINK-14551 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing, Runtime / Network Reporter: zhijiang This is the umbrella issue for the feature of unaligned checkpoints. Refer to the [FLIP-76|https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints] for more details. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool
zhijiang created FLINK-14498: Summary: Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool Key: FLINK-14498 URL: https://issues.apache.org/jira/browse/FLINK-14498 Project: Flink Issue Type: Task Components: Runtime / Network Reporter: zhijiang If the LocalBufferPool cannot obtain an available buffer from the NetworkBufferPool, it waits for 2 seconds before trying again, in a loop. This introduces delays in practice. To improve this interaction, we could introduce NetworkBufferPool#isAvailable, which returns a future that the LocalBufferPool monitors. Once buffers become available in the NetworkBufferPool, it completes this future to notify the LocalBufferPool immediately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
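The future-based interaction can be sketched as follows. `GlobalPoolSketch` is hypothetical and uses `byte[]` in place of memory segments. Its future is completed whenever a segment is recycled and is replaced with a fresh one when the pool runs dry, so a waiter is notified at once instead of polling every 2 seconds.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical global pool that signals availability through a future instead of being polled. */
class GlobalPoolSketch {
    private final ArrayDeque<byte[]> segments = new ArrayDeque<>();
    // Completed while at least one segment is free; replaced by a fresh future once the pool runs dry.
    private CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    /** Returns a free segment, or null if none is left; never waits. */
    synchronized byte[] requestSegment() {
        byte[] segment = segments.poll();
        if (segments.isEmpty() && availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
        return segment;
    }

    /** Returns a segment and completes the current future, waking up waiters immediately. */
    synchronized void recycle(byte[] segment) {
        segments.add(segment);
        // Completed under the lock for simplicity; production code would likely complete it outside.
        availableFuture.complete(null);
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }
}
```

On the local pool side, the 2-second retry loop would become a callback along the lines of `globalPool.getAvailableFuture().thenRun(this::retryRequest)`, where `retryRequest` is a hypothetical method.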
[jira] [Created] (FLINK-14472) Implement back-pressure monitor with non-blocking outputs
zhijiang created FLINK-14472: Summary: Implement back-pressure monitor with non-blocking outputs Key: FLINK-14472 URL: https://issues.apache.org/jira/browse/FLINK-14472 Project: Flink Issue Type: Task Components: Runtime / Network Reporter: zhijiang Fix For: 1.10.0 Currently the back-pressure monitor relies on detecting task threads that are stuck in `requestBufferBuilderBlocking`. At the moment there are two cases that cause back-pressure: * There are no available buffers in the `LocalBufferPool` and the whole quota given by the global pool is also exhausted. Then we need to wait for buffers to be recycled to the `LocalBufferPool`. * There are no available buffers in the `LocalBufferPool`, but the quota has not been used up. Requesting a buffer from the global pool blocks because the global pool has no available buffers. Then we need to wait for buffers to be recycled to the global pool. We already implemented the non-blocking output for the first case in [FLINK-14396|https://issues.apache.org/jira/browse/FLINK-14396], and we expect to handle the second case together with adjusting the back-pressure monitor, which could then check `RecordWriter#isAvailable` instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
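A hypothetical sketch of a monitor based on output availability instead of stack-trace inspection: it samples an availability check (in practice something like `RecordWriter#isAvailable`) several times and reports the fraction of samples in which the output was unavailable. How the real monitor spaces samples and maps ratios to levels is not covered here.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sampler: back pressure as the fraction of samples in which the output was unavailable. */
class BackPressureSampler {
    static double sample(BooleanSupplier outputAvailable, int numSamples) {
        int unavailable = 0;
        for (int i = 0; i < numSamples; i++) {
            if (!outputAvailable.getAsBoolean()) {
                unavailable++;
            }
            // A real monitor would space the samples out in time; omitted in this sketch.
        }
        return (double) unavailable / numSamples;
    }
}
```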
[jira] [Created] (FLINK-14396) Implement rudimentary non-blocking network output
zhijiang created FLINK-14396: Summary: Implement rudimentary non-blocking network output Key: FLINK-14396 URL: https://issues.apache.org/jira/browse/FLINK-14396 Project: Flink Issue Type: Task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.10.0 Considering the mailbox model and the future requirements of unaligned checkpoints, the task's network output should be non-blocking. In other words, as long as the output is available, a subsequent single record write should never block. In the first version, we only implement non-blocking output for the most common case. The following cases are not solved and keep the previous behavior: * Big records which might span multiple buffers * Flatmap-like operators which might emit multiple records per invocation * Broadcast watermarks which might request multiple buffers at a time The solution is to provide the RecordWriter#isAvailable method and the corresponding LocalBufferPool#isAvailable for checking the output beforehand. As long as there is at least one available buffer in the LocalBufferPool, the RecordWriter is available for network output in most cases. This does not include the runtime handling of this non-blocking and availability behavior in StreamInputProcessor. Note: It requires adjusting the minimum number of buffers in the output LocalBufferPool to (numberOfSubpartitions + 1), and also adjusting how the back-pressure monitor uses the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
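A toy model of why the minimum could be (numberOfSubpartitions + 1). The assumption, which the issue does not state, is that each subpartition may hold one partially filled buffer that is not yet back in the pool. With only numberOfSubpartitions buffers, the pool can then report "unavailable" while every subpartition is merely holding a partial buffer. `OutputPoolModel` is hypothetical.

```java
// Hypothetical model: each subpartition may keep one partially filled buffer
// (like a per-channel BufferBuilder), and the writer counts as available only
// while at least one buffer is still free in the local pool.
final class OutputPoolModel {
    private final boolean[] holdsPartial;
    private int freeBuffers;

    OutputPoolModel(int numberOfSubpartitions, int numBuffers) {
        this.holdsPartial = new boolean[numberOfSubpartitions];
        this.freeBuffers = numBuffers;
    }

    boolean isAvailable() {
        return freeBuffers > 0;
    }

    // Writes a small record to subpartition i, taking a fresh buffer only if
    // the subpartition does not already hold a partially filled one.
    void writeSmallRecord(int i) {
        if (!holdsPartial[i]) {
            if (freeBuffers == 0) {
                throw new IllegalStateException("would block");
            }
            freeBuffers--;
            holdsPartial[i] = true;
        }
    }
}
```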
[jira] [Created] (FLINK-14394) Remove unnecessary notifySubpartitionConsumed method from view reader
zhijiang created FLINK-14394: Summary: Remove unnecessary notifySubpartitionConsumed method from view reader Key: FLINK-14394 URL: https://issues.apache.org/jira/browse/FLINK-14394 Project: Flink Issue Type: Task Reporter: zhijiang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14289) Remove Optional fields from RecordWriter relevant classes
zhijiang created FLINK-14289: Summary: Remove Optional fields from RecordWriter relevant classes Key: FLINK-14289 URL: https://issues.apache.org/jira/browse/FLINK-14289 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang Based on the code style guide for [Java Optional|https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional], Optional should not be used for class fields. So we remove the Optional usages from RecordWriter, BroadcastRecordWriter and ChannelSelectorRecordWriter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
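A small sketch of the pattern the style rule leads to. The field is a plain, possibly null reference, and `Optional` is produced only at the accessor. `FlushingWriter` and its flusher field are hypothetical and are not the actual RecordWriter code.

```java
import java.util.Optional;

// Hypothetical writer following the style rule: the field is a plain
// (possibly null) reference, and Optional only appears as a return type.
final class FlushingWriter {
    // Null when no periodic flushing is configured; deliberately not Optional.
    private final Thread outputFlusher;

    FlushingWriter(Thread outputFlusher) {
        this.outputFlusher = outputFlusher;
    }

    Optional<Thread> getOutputFlusher() {
        return Optional.ofNullable(outputFlusher);
    }

    // Internal code uses a plain null check instead of unwrapping an Optional.
    void close() {
        if (outputFlusher != null) {
            outputFlusher.interrupt();
        }
    }
}
```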
[jira] [Created] (FLINK-14004) Define SourceReader interface to verify the integration with StreamOneInputProcessor
zhijiang created FLINK-14004: Summary: Define SourceReader interface to verify the integration with StreamOneInputProcessor Key: FLINK-14004 URL: https://issues.apache.org/jira/browse/FLINK-14004 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang We already refactored the task input and output sides based on the characteristics of the new source in FLIP-27. To further verify that the new source reader works well with the unified StreamOneInputProcessor in the mailbox model, we would design a unit test that integrates the whole process. In detail: * Define SourceReader and SourceOutput related interfaces based on FLIP-27 * Implement an example stateless SourceReader (a bounded sequence of integers) * Define SourceReaderOperator to integrate the SourceReader with StreamOneInputProcessor * Define SourceReaderStreamTask to execute the source input, and implement a unit test for it. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source
zhijiang created FLINK-13798: Summary: Refactor the process of checking stream status while emitting watermark in source Key: FLINK-13798 URL: https://issues.apache.org/jira/browse/FLINK-13798 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang As we know, a watermark can be emitted downstream only when the stream status is active. For downstream tasks we already have the StatusWatermarkValve component in StreamInputProcessor to handle this logic, but for source tasks the current implementation of this logic is somewhat tricky. There are two scenarios for the source case: * Emitting watermarks via the source context: the specific WatermarkContext toggles the stream status to active before collecting/emitting records/watermarks. RecordWriterOutput then always checks that the status is active before actually emitting the watermark. * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer at a fixed interval. When the timer fires, the operator calls the output stack to emit the watermark, and RecordWriterOutput takes the role of checking the status before actually emitting it. So the status check in RecordWriterOutput is only needed for the second scenario above; it is redundant for the first scenario because WatermarkContext always toggles the status to active before emitting. Even worse, the logic in RecordWriterOutput introduces a cyclic dependency with StreamStatusMaintainer, which blocks the follow-up work of integrating source processing on the runtime side. The solution is to migrate the checking logic from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator. We could also remove the toggle-active logic in the existing WatermarkContext. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-13767) Migrate isFinished method from AvailabilityListener to AsyncDataInput
zhijiang created FLINK-13767: Summary: Migrate isFinished method from AvailabilityListener to AsyncDataInput Key: FLINK-13767 URL: https://issues.apache.org/jira/browse/FLINK-13767 Project: Flink Issue Type: Sub-task Components: Runtime / Network, Runtime / Task Reporter: zhijiang Assignee: zhijiang AvailabilityListener is used by both AsyncDataInput and StreamTaskInput. We already introduced InputStatus for StreamTaskInput#emitNext, and InputStatus#END_OF_INPUT has the same semantics as AvailabilityListener#isFinished. But for AsyncDataInput, which is mainly used by the InputGate layer, the isFinished() method is still needed at the moment. So we migrate this method from AvailabilityListener to AsyncDataInput, and refactor the StreamInputProcessor implementations to use InputStatus to determine the finished state. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13766) Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext
zhijiang created FLINK-13766: Summary: Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext Key: FLINK-13766 URL: https://issues.apache.org/jira/browse/FLINK-13766 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang The current processing in the task input processor is based on pollNext. To unify it with the processing of the new source operator, we introduce the new StreamTaskInput#emitNext(Output) to replace the current pollNext. Then we need to adjust the existing implementations of StreamOneInputProcessor/StreamTwoInputSelectableProcessor to the new emit approach. This way, all task inputs from the network and from sources could be integrated into unified processing on the runtime side. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
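A sketch of the push-based `emitNext(Output)` style compared with `pollNext`. The input pushes at most one record into the output and returns a status, and the caller treats `END_OF_INPUT` as "finished", which is the role `isFinished()` played before. Only the three `InputStatus` values mirror Flink. `Output`, `ListTaskInput` and `ProcessorLoop` are hypothetical simplifications.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// The three values mirror Flink's InputStatus; the rest is hypothetical.
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

// Push-based output that the input emits into.
interface Output<T> {
    void emitRecord(T record);
}

// Hypothetical bounded input: emitNext pushes at most one record into the
// output and reports a status, instead of returning the record (pollNext).
final class ListTaskInput<T> {
    private final Queue<T> remaining;

    ListTaskInput(List<T> records) {
        this.remaining = new ArrayDeque<>(records);
    }

    InputStatus emitNext(Output<T> output) {
        T next = remaining.poll();
        if (next == null) {
            return InputStatus.END_OF_INPUT;
        }
        output.emitRecord(next);
        return remaining.isEmpty() ? InputStatus.END_OF_INPUT : InputStatus.MORE_AVAILABLE;
    }
}

final class ProcessorLoop {
    // Keeps emitting while more data is available. END_OF_INPUT replaces an
    // isFinished() check; NOTHING_AVAILABLE would hand control to the mailbox.
    static <T> InputStatus processAvailable(ListTaskInput<T> input, Output<T> output) {
        InputStatus status;
        do {
            status = input.emitNext(output);
        } while (status == InputStatus.MORE_AVAILABLE);
        return status;
    }
}
```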
[jira] [Created] (FLINK-13765) Introduce the InputSelectionHandler for selecting next input in StreamTwoInputSelectableProcessor
zhijiang created FLINK-13765: Summary: Introduce the InputSelectionHandler for selecting next input in StreamTwoInputSelectableProcessor Key: FLINK-13765 URL: https://issues.apache.org/jira/browse/FLINK-13765 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang In StreamTwoInputSelectableProcessor, three fields {InputSelectable, InputSelection, availableInputsMask} are used together to select the next available input index. This causes two problems: * From a design perspective, these fields should be abstracted into a separate component and passed into StreamTwoInputSelectableProcessor. * inputSelector.nextSelection() is called while processing elements in StreamTwoInputSelectableProcessor, which blocks the later integration of task input/output for both StreamOneInputProcessor and StreamTwoInputSelectableProcessor. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
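A sketch of what bundling the selection state into one component could look like. It uses bit masks (bit 0 = first input, bit 1 = second input) and round-robin fairness between the two inputs. `InputSelectionHandlerSketch` and its methods are hypothetical, not the actual `InputSelectionHandler` API.

```java
// Hypothetical handler that owns the operator's selection mask and the
// availability mask, which were previously separate processor fields.
final class InputSelectionHandlerSketch {
    private long selectedMask = 0b11;  // inputs the operator wants to read
    private long availableMask = 0b11; // inputs that currently have data
    private int lastReadIndex = 1;     // start so that input 0 is tried first

    void setSelection(long mask) {
        selectedMask = mask;
    }

    void setAvailable(int index, boolean available) {
        if (available) {
            availableMask |= 1L << index;
        } else {
            availableMask &= ~(1L << index);
        }
    }

    // Returns the next input index that is both selected and available,
    // alternating between the two inputs, or -1 if there is none.
    int selectNextInputIndex() {
        long candidates = selectedMask & availableMask;
        for (int step = 1; step <= 2; step++) {
            int index = (lastReadIndex + step) % 2;
            if ((candidates & (1L << index)) != 0) {
                lastReadIndex = index;
                return index;
            }
        }
        return -1;
    }
}
```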
[jira] [Created] (FLINK-13764) Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor
zhijiang created FLINK-13764: Summary: Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor Key: FLINK-13764 URL: https://issues.apache.org/jira/browse/FLINK-13764 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang Currently the numRecordsIn counter is set up while the processor processes input. To integrate the processing logic based on StreamTaskInput#emitNext(Output) later, we will need to pass the counter into the output functions. So this refactoring is a precondition for the follow-up work, and it brings additional benefits: the counter can become a final field in StreamInputProcessor, and the counter setup logic can be reused by both StreamOne/TwoInputProcessors. Setting up the counter slightly earlier than before should have no side effects. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13762) Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor
zhijiang created FLINK-13762: Summary: Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor Key: FLINK-13762 URL: https://issues.apache.org/jira/browse/FLINK-13762 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have separate implementations of ForwardingValveOutputHandler. In particular, the implementation in StreamTwoInputSelectableProcessor is coupled with the internal input index logic, which would block the subsequent unification of StreamTaskInput/Output. We could implement a single ForwardingValveOutputHandler for both StreamOneInputProcessor and StreamTwoInputSelectableProcessor that always forwards the StreamStatus without distinguishing between inputs. Then we refactor the implementation of StreamStatusMaintainer so that it judges the status of the different inputs internally before actually emitting the StreamStatus. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
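A sketch of a maintainer that judges the statuses of several inputs internally. It assumes the combined status is IDLE only when every input is IDLE, and that a status goes downstream only when the combined value changes. `MultiInputStatusMaintainer` is hypothetical, and the recorded list stands in for writing to the outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

enum StreamStatus { ACTIVE, IDLE }

// Hypothetical maintainer: the forwarding handler passes every status through,
// and this class decides what the task as a whole should emit.
final class MultiInputStatusMaintainer {
    private final StreamStatus[] inputStatuses;
    private StreamStatus current = StreamStatus.ACTIVE;
    private final List<StreamStatus> emitted = new ArrayList<>();

    MultiInputStatusMaintainer(int numInputs) {
        inputStatuses = new StreamStatus[numInputs];
        Arrays.fill(inputStatuses, StreamStatus.ACTIVE);
    }

    void toggleStreamStatus(int inputIndex, StreamStatus status) {
        inputStatuses[inputIndex] = status;
        StreamStatus combined = StreamStatus.IDLE;
        for (StreamStatus s : inputStatuses) {
            if (s == StreamStatus.ACTIVE) {
                combined = StreamStatus.ACTIVE;
                break;
            }
        }
        if (combined != current) {
            current = combined;
            emitted.add(combined); // stands in for writing to the outputs
        }
    }

    List<StreamStatus> emittedStatuses() {
        return emitted;
    }
}
```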
[jira] [Created] (FLINK-13754) Decouple OperatorChain from StreamStatusMaintainer
zhijiang created FLINK-13754: Summary: Decouple OperatorChain from StreamStatusMaintainer Key: FLINK-13754 URL: https://issues.apache.org/jira/browse/FLINK-13754 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang There are two motivations for this refactoring: * It is a precondition for the follow-up work of decoupling the dependency between the statuses of the two inputs in ForwardingValveOutputHandler. * From a design perspective, the current OperatorChain takes on many unrelated roles, such as StreamStatusMaintainer, which makes it hard to maintain. The root cause is the cyclic dependency between RecordWriterOutput (created by OperatorChain) and StreamStatusMaintainer. The solution is to move the creation of StreamStatusMaintainer and RecordWriterOutput to the StreamTask level, and then break the cyclic dependency between their implementations. The array of RecordWriters, which is closely related to RecordWriterOutput, is already created in StreamTask, so it is reasonable to create them together. The StreamStatusMaintainer created in StreamTask can then be referenced directly by subclasses such as OneInputStreamTask/TwoInputStreamTask. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask
zhijiang created FLINK-13753: Summary: Integrate new Source Operator with Mailbox Model in StreamTask Key: FLINK-13753 URL: https://issues.apache.org/jira/browse/FLINK-13753 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: zhijiang Assignee: zhijiang This is the umbrella issue for integrating the new source operator with the mailbox model in StreamTask. The motivation comes from [FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], which proposes refactoring the whole source API, and from the integration of task-level actions (including checkpoints, timers and async operators) with the unified [mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] on the runtime side. * The benefit is simpler, unified processing logic, because a single thread handles all actions without concurrency issues. This further removes the lock dependency that causes unfair-locking concerns in the checkpoint process. * We still need to support the current legacy source for some releases, since it will probably be used for a while, especially in performance-sensitive scenarios. The design doc is [https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty
zhijiang created FLINK-13493: Summary: BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty Key: FLINK-13493 URL: https://issues.apache.org/jira/browse/FLINK-13493 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang In the previous implementation, the {{ResultPartition}} was always notified of a consumed subpartition whenever any reader view was released. With the reference-counter release strategy, this might cause problems if one blocking subpartition has multiple readers: the whole result partition might be released while some subpartitions still have live readers. The default release strategy for blocking partitions is currently based on JM/scheduler notification, but switching the option to consumption notification would expose the problem. In any case, the subpartition should behave correctly regardless of the specific release strategy in the upper layer. To fix this bug, {{BoundedBlockingSubpartition}} would only notify {{onConsumedSubpartition}} once its set of readers is empty. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
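A sketch of the fixed behavior. A blocking subpartition tracks its live readers and notifies the parent only when the last one is released. `BlockingSubpartitionSketch` is hypothetical, and the `Runnable` stands in for the parent's `onConsumedSubpartition` callback.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical blocking subpartition with several concurrent readers.
final class BlockingSubpartitionSketch {
    private final Set<Object> readers = new HashSet<>();
    private final Runnable onConsumedSubpartition; // parent notification

    BlockingSubpartitionSketch(Runnable onConsumedSubpartition) {
        this.onConsumedSubpartition = onConsumedSubpartition;
    }

    synchronized Object createReadView() {
        Object reader = new Object();
        readers.add(reader);
        return reader;
    }

    // Notifies the parent only when the last live reader goes away; releasing
    // an unknown or already released reader has no effect.
    void releaseReaderReference(Object reader) {
        boolean lastReader;
        synchronized (this) {
            lastReader = readers.remove(reader) && readers.isEmpty();
        }
        if (lastReader) {
            onConsumedSubpartition.run();
        }
    }
}
```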
[jira] [Created] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition
zhijiang created FLINK-13478: Summary: Decouple two different release strategies in BoundedBlockingSubpartition Key: FLINK-13478 URL: https://issues.apache.org/jira/browse/FLINK-13478 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Network Reporter: zhijiang Assignee: zhijiang We currently have two basic release strategies. One is based on consumption, via network notification from the consumer. The other is based on notification via RPC from the JM/scheduler. But in the current implementation of {{BoundedBlockingSubpartition}}, these two approaches are coupled. In detail, the network consumption notification can only close the data file after the release RPC has been triggered by the JM/scheduler. Likewise, the release RPC has to wait until all reader views are actually released before closing the data file, so the release RPC still relies on network notification to some extent. To make these two release strategies independent, if the release call comes from the JM/scheduler RPC, we could immediately release all the view readers and then close the data file as well. If the release is based on consumption notification, after all the view readers of one subpartition are released, the subpartition could notify the parent {{ResultPartition}}, which decides whether or not to release the whole partition. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader
zhijiang created FLINK-13442: Summary: Remove unnecessary notifySubpartitionConsumed method from view reader Key: FLINK-13442 URL: https://issues.apache.org/jira/browse/FLINK-13442 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `NetworkSequenceViewReader#notifySubpartitionConsumed` and `NetworkSequenceViewReader#releaseAllResources` are both called in the Netty stack when releasing resources. To keep this release logic simple and clean, we could remove the redundant `notifySubpartitionConsumed` from `NetworkSequenceViewReader`, and also from `ResultSubpartitionView`. The implementation of `ResultSubpartitionView#releaseAllResources` would then notify the parent subpartition of the consumed state via `ResultSubpartition#notifySubpartitionConsumed`, which in turn reports to the parent `ResultPartition` layer via `onConsumedSubpartition`. Finally, `ResultPartition` decides whether or not to release itself. For example, `ReleaseOnConsumptionResultPartition`, which is mainly used for pipelined partitions, releases the partition once its reference counter drops to 0. A plain `ResultPartition`, which is generated for blocking partitions by default, is never released upon consumption notification; instead, the JM/scheduler decides when to release the partition. In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as a result. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13235) Change the Netty default transport mode to auto
zhijiang created FLINK-13235: Summary: Change the Netty default transport mode to auto Key: FLINK-13235 URL: https://issues.apache.org/jira/browse/FLINK-13235 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current default value of "taskmanager.net.transport" in NettyShuffleEnvironmentOptions is "NIO". To use the "EPOLL" mode, which performs better and is recommended when available, we could change the default to "AUTO", with "NIO" used as the fallback. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
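A sketch of how an AUTO setting could be resolved. It prefers EPOLL when the native transport is available and otherwise falls back to NIO. How availability is detected is abstracted into a boolean. `TransportResolver` is hypothetical, and failing fast on an explicit but unavailable EPOLL is this sketch's own choice.

```java
// Hypothetical transport modes matching the values named in the issue.
enum TransportType { NIO, EPOLL, AUTO }

final class TransportResolver {
    static TransportType resolve(TransportType configured, boolean epollAvailable) {
        switch (configured) {
            case AUTO:
                // Prefer the native transport, fall back to NIO.
                return epollAvailable ? TransportType.EPOLL : TransportType.NIO;
            case EPOLL:
                // Assumption: an explicit EPOLL request fails fast if unsupported.
                if (!epollAvailable) {
                    throw new IllegalStateException("EPOLL requested but not available");
                }
                return TransportType.EPOLL;
            default:
                return TransportType.NIO;
        }
    }
}
```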
[jira] [Created] (FLINK-13141) Remove getBufferSize method from BufferPoolFactory
zhijiang created FLINK-13141: Summary: Remove getBufferSize method from BufferPoolFactory Key: FLINK-13141 URL: https://issues.apache.org/jira/browse/FLINK-13141 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang This is a pure refactoring to make the BufferPoolFactory interface simpler and cleaner. BufferPoolFactory#getBufferSize is only used for creating subpartitions in ResultPartitionFactory. We could pass the network buffer size from NettyShuffleEnvironmentConfiguration when constructing the ResultPartitionFactory, and then remove the getBufferSize method from the BufferPoolFactory interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13126) Construct special test/benchmark to verify the backlog effect
zhijiang created FLINK-13126: Summary: Construct special test/benchmark to verify the backlog effect Key: FLINK-13126 URL: https://issues.apache.org/jira/browse/FLINK-13126 Project: Flink Issue Type: Sub-task Components: Benchmarks, Runtime / Network, Tests Reporter: zhijiang Assignee: zhijiang Based on Piotr's suggestion while reviewing the [PR|https://github.com/apache/flink/pull/7911], it would be better to construct a relevant test or benchmark to further verify the backlog effect, as follow-up work for [FLINK-11082|https://issues.apache.org/jira/browse/FLINK-11082]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
zhijiang created FLINK-13100: Summary: Fix the unexpected IOException during FileBufferReader#nextBuffer Key: FLINK-13100 URL: https://issues.apache.org/jira/browse/FLINK-13100 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang In the implementation of FileBufferReader#nextBuffer, we expect the next memory segment to always be available, based on the assumption that nextBuffer can only be called after the previous buffer has been recycled. Otherwise the current implementation throws an IOException. In fact, this assumption does not hold given the credit-based flow control and zero-copy features of the network stack. The detailed process is as follows: * The Netty thread finishes calling channel.writeAndFlush() in PartitionRequestQueue and adds a listener to handle the ChannelFuture later. Until the future completes, the corresponding buffer is not recycled, because of the zero-copy improvement. * Before the previous future completes, the Netty thread could trigger the next writeAndFlush while processing an addCredit message, and FileBufferReader#nextBuffer would then throw an exception because the previous buffer has not been recycled. We considered several ways to solve this potential bug: * Do not trigger the next writeAndFlush before the previous future completes. This requires maintaining the future state and checking it in the relevant actions, which might cause a regression in network throughput and adds extra state management. * Adjust the implementation of the current FileBufferReader. We used to regard the blocking partition view as always available because the next buffer is read ahead, so the view is always added to the available queue in PartitionRequestQueue. Actually, reading the next buffer ahead only simplifies BoundedBlockingSubpartitionReader#notifyDataAvailable. The view's availability could instead be judged by the available buffers in FileBufferReader rather than by the read-ahead buffer.
When a buffer is recycled into FileBufferReader after writeAndFlush completes, the reader could call notifyDataAvailable to add this view to the available queue in PartitionRequestQueue. I prefer the second way because it has no negative impact. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
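A sketch of the preferred second option. The reader's availability is derived from free memory segments, `nextBuffer` returns null instead of throwing when none is free, and recycling a segment re-announces availability. `SegmentBoundReader` is hypothetical, file reading is omitted, and the `Runnable` stands in for `notifyDataAvailable`.

```java
import java.util.ArrayDeque;

// Hypothetical reader whose availability depends on free segments rather
// than on a buffer that was read ahead.
final class SegmentBoundReader {
    private final ArrayDeque<byte[]> freeSegments = new ArrayDeque<>();
    private final Runnable notifyDataAvailable;
    private int remainingBuffersInFile;

    SegmentBoundReader(int numSegments, int segmentSize, int buffersInFile,
                       Runnable notifyDataAvailable) {
        for (int i = 0; i < numSegments; i++) {
            freeSegments.add(new byte[segmentSize]);
        }
        this.remainingBuffersInFile = buffersInFile;
        this.notifyDataAvailable = notifyDataAvailable;
    }

    synchronized boolean isAvailable() {
        return !freeSegments.isEmpty() && remainingBuffersInFile > 0;
    }

    // Returns a segment (a real reader would fill it from the file), or null
    // when no segment is free, instead of throwing an IOException.
    synchronized byte[] nextBuffer() {
        if (!isAvailable()) {
            return null;
        }
        remainingBuffersInFile--;
        return freeSegments.poll();
    }

    // Called once the write future for this segment completes.
    void recycle(byte[] segment) {
        boolean becameAvailable;
        synchronized (this) {
            boolean wasAvailable = isAvailable();
            freeSegments.add(segment);
            becameAvailable = !wasAvailable && isAvailable();
        }
        if (becameAvailable) {
            notifyDataAvailable.run();
        }
    }
}
```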
[jira] [Created] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState
zhijiang created FLINK-13010: Summary: Refactor the process of SchedulerNG#requestPartitionState Key: FLINK-13010 URL: https://issues.apache.org/jira/browse/FLINK-13010 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `requestPartitionState` is mainly used to query the partition state when the consumer receives a `PartitionNotFoundException` while requesting a partition. However, we do not have a concept of partition state at the moment, and `requestPartitionState` actually returns the corresponding producer's state, so there is a contradiction here. My suggestion is to rename the method to `requestPartitionProducerState`; then we would not need to pass the `IntermediateDataSetID` and `ResultPartitionID` arguments to find the corresponding execution attempt. We could pass only the `ExecutionAttemptID`, and the corresponding execution attempt could easily be found from the mapping in `ExecutionGraph`. This way, we could further remove `IntermediateDataSetID` from `SingleInputGate`, and might replace `IntermediateDataSetID` with `InputGateID` in `InputGateDeploymentDescriptor`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters
zhijiang created FLINK-12882: Summary: Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters Key: FLINK-12882 URL: https://issues.apache.org/jira/browse/FLINK-12882 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The {{ExecutionAttemptID}} is only used for constructing the {{ResultPartitionID}} when creating {{ResultPartitionWriter}}s in {{ShuffleEnvironment}}. Actually, the {{ResultPartitionID}} can be obtained directly from {{ResultPartitionDeploymentDescriptor}} via {{ShuffleDescriptor#getResultPartitionID}}, so we could avoid passing this field through the interface and keep it simple. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12843) Refactor the pin logic in ResultPartition
zhijiang created FLINK-12843: Summary: Refactor the pin logic in ResultPartition Key: FLINK-12843 URL: https://issues.apache.org/jira/browse/FLINK-12843 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The pin logic increases the reference counter by the number of subpartitions in {{ResultPartition}}. It does not seem necessary to do this in a while loop as it is done now, because the atomic counter is not accessed by other threads during pinning. If the {{ResultPartition}} has not been created yet, {{ResultPartition#createSubpartitionView}} cannot be called, and {{ResultPartitionManager}} responds with a {{ResultPartitionNotFoundException}}. So we could simply set the reference counter directly in the {{ResultPartition}} constructor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12842) Fix invalid check released state during ResultPartition#createSubpartitionView
zhijiang created FLINK-12842: Summary: Fix invalid check released state during ResultPartition#createSubpartitionView Key: FLINK-12842 URL: https://issues.apache.org/jira/browse/FLINK-12842 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently {{ResultPartition#createSubpartitionView}} checks whether the partition is released before creating the view. But this check is based on {{refCnt != -1}}, which seems invalid because the reference counter does not always reflect the released state. In the case of {{ResultPartition#release/fail}}, the reference counter is not set to -1. Even in the case of {{ResultPartition#onConsumedSubpartition}}, the reference counter never seems to reach -1. So we could check the actual {{isReleased}} state instead of the reference counter when creating the view. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
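A sketch combining the proposals of FLINK-12843 and FLINK-12842 for a release-on-consumption partition. The reference counter is initialized once in the constructor, with no separate pin loop. View creation checks an explicit released flag, which also covers release paths that never touch the counter. `PartitionSketch` and its string "views" are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical partition that releases itself once every subpartition has
// been consumed.
final class PartitionSketch {
    private final AtomicInteger pendingReferences;
    private final AtomicBoolean released = new AtomicBoolean(false);

    PartitionSketch(int numSubpartitions) {
        // Set once here instead of incrementing in a CAS loop when pinning.
        this.pendingReferences = new AtomicInteger(numSubpartitions);
    }

    String createSubpartitionView(int index) {
        // Check the real released state rather than a sentinel counter value.
        if (released.get()) {
            throw new IllegalStateException("Partition released.");
        }
        return "view-" + index;
    }

    // Called once per consumed subpartition.
    void onConsumedSubpartition() {
        if (pendingReferences.decrementAndGet() == 0) {
            release();
        }
    }

    // Also reachable directly (e.g. on failure) without touching the counter.
    void release() {
        released.set(true);
    }

    boolean isReleased() {
        return released.get();
    }
}
```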
[jira] [Created] (FLINK-12738) Remove abstract getPageSize from InputGate
zhijiang created FLINK-12738: Summary: Remove abstract getPageSize from InputGate Key: FLINK-12738 URL: https://issues.apache.org/jira/browse/FLINK-12738 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently {{InputGate#getPageSize}} is only used for constructing {{BarrierBuffer}}. To keep the abstract InputGate simple and clean, we should remove unnecessary abstract methods from it. Since the page size can be parsed directly from the configuration, which is also visible when constructing {{BarrierBuffer}}, it is reasonable to do so. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12735) Make shuffle environment implementation independent with IOManager
zhijiang created FLINK-12735: Summary: Make shuffle environment implementation independent with IOManager Key: FLINK-12735 URL: https://issues.apache.org/jira/browse/FLINK-12735 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current creation of {{NetworkEnvironment}} relies on the {{IOManager}} from {{TaskManagerServices}}. A shuffle environment implementation should not rely on specific external components; instead, each implementation should create its internal components as required. The current abstract {{IOManager}} has two roles: one is file channel management based on the temp directories configuration, and the other is providing abstract methods for reading/writing files. We could extract the file channel management into a separate internal class that can be reused by all components that need it, such as the current {{BoundedBlockingSubpartition}}. This way, the shuffle environment could create its own internal channel manager and break the dependency on {{IOManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12630) Refactor abstract InputGate to general interface
zhijiang created FLINK-12630: Summary: Refactor abstract InputGate to general interface Key: FLINK-12630 URL: https://issues.apache.org/jira/browse/FLINK-12630 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang `InputGate` is currently defined as an abstract class that extracts the common code for checking data availability for the subclasses `SingleInputGate` and `UnionInputGate`, but this might limit further `InputGate` implementations in the shuffle service architecture. `SingleInputGate` is created by the shuffle service, so it belongs to the scope of the shuffle service, while `UnionInputGate` is a wrapper around several `SingleInputGate`s, so it belongs to the task/processor stack. So that an `InputGate` implementation from another new shuffle service can be plugged in directly, we should define a cleaner `InputGate` interface that is decoupled from the implementation of the data availability check. In detail, we could define the `isAvailable` method in the `InputGate` interface and extract the current implementation into a separate class `FutureBasedAvailability`, which could still be extended and reused by both `SingleInputGate` and `UnionInputGate`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
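A sketch of the split between an availability interface and a reusable future-based helper, with a union-style wrapper that is available when any wrapped input is. The names `AvailabilityProvider`, `FutureAvailabilityHelper` and `UnionAvailability` are hypothetical and only illustrate the idea behind the proposed `FutureBasedAvailability`. `CompletableFuture.anyOf` is standard JDK.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal interface: implementations only expose a future.
interface AvailabilityProvider {
    CompletableFuture<?> isAvailable();
}

// Reusable helper: completed when data arrives, replaced when it runs dry.
class FutureAvailabilityHelper implements AvailabilityProvider {
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    private CompletableFuture<?> availability = new CompletableFuture<>();

    @Override
    public synchronized CompletableFuture<?> isAvailable() {
        return availability;
    }

    // Called by the implementation when it has no more data for now.
    synchronized void resetUnavailable() {
        if (availability == AVAILABLE) {
            availability = new CompletableFuture<>();
        }
    }

    // Called when new data arrives; completes any pending future.
    void markAvailable() {
        CompletableFuture<?> toComplete;
        synchronized (this) {
            toComplete = availability;
            availability = AVAILABLE;
        }
        toComplete.complete(null);
    }
}

// Union-style wrapper: available as soon as any wrapped input is available.
final class UnionAvailability implements AvailabilityProvider {
    private final AvailabilityProvider[] inputs;

    UnionAvailability(AvailabilityProvider... inputs) {
        this.inputs = inputs;
    }

    @Override
    public CompletableFuture<?> isAvailable() {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            futures[i] = inputs[i].isAvailable();
        }
        return CompletableFuture.anyOf(futures);
    }
}
```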
[jira] [Created] (FLINK-12603) Remove getOwningTaskName method from InputGate
zhijiang created FLINK-12603: Summary: Remove getOwningTaskName method from InputGate Key: FLINK-12603 URL: https://issues.apache.org/jira/browse/FLINK-12603 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `InputGate#getOwningTaskName` is only used for logging in related components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put this name into the `TaskInfo` structure, so that the related components can get the task name directly from `RuntimeEnvironment#getTaskInfo`. This way, we could simplify the `InputGate` interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12571) Make NetworkEnvironment#start() return the binded data port
zhijiang created FLINK-12571: Summary: Make NetworkEnvironment#start() return the binded data port Key: FLINK-12571 URL: https://issues.apache.org/jira/browse/FLINK-12571 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `NetworkEnvironment#getConnectionManager()` is mainly used by `TaskManagerServices` to get the bound data port from `NettyConnectionManager`. Since `ConnectionManager` is an internal component of `NetworkEnvironment`, it should not be exposed to the outside, and other ShuffleService implementations might have no `ConnectionManager` at all. We could make `ShuffleService#start()` return the bound data port to replace `getConnectionManager`. For `LocalConnectionManager` or other shuffle service implementations that have no bound data port, it could return a simple default value, which does no harm. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
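A sketch of a `start()` that returns the bound port, so that callers never need access to an internal connection manager. The network variant binds a loopback `ServerSocket` to port 0 (any free port). The local variant returns a default. All type names and the `-1` default are hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical shuffle-environment interface: start() returns the data port.
interface ShuffleEnvironmentSketch extends AutoCloseable {
    int start() throws IOException;
}

// Network-backed variant: binds a server socket and reports its port.
final class SocketShuffleEnvironment implements ShuffleEnvironmentSketch {
    private ServerSocket server;

    @Override
    public int start() throws IOException {
        server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        return server.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        if (server != null) {
            server.close();
        }
    }
}

// Local-only variant without a data port: returns a harmless default.
final class LocalShuffleEnvironment implements ShuffleEnvironmentSketch {
    static final int NO_DATA_PORT = -1; // hypothetical default value

    @Override
    public int start() {
        return NO_DATA_PORT;
    }

    @Override
    public void close() {
    }
}
```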
[jira] [Created] (FLINK-12564) Remove getBufferProvider method from ResultPartitionWriter interface
zhijiang created FLINK-12564: Summary: Remove getBufferProvider method from ResultPartitionWriter interface Key: FLINK-12564 URL: https://issues.apache.org/jira/browse/FLINK-12564 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently, `ResultPartitionWriter#getBufferProvider` is used to request a `BufferBuilder` in `RecordWriter`, and the `BufferConsumer` created from that `BufferBuilder` is then added to the `ResultPartitionWriter` via the `addBufferConsumer` method. We could merge these two methods in `ResultPartitionWriter` so that `getBufferProvider` no longer needs to be exposed: `ResultPartitionWriter` would internally request a `BufferBuilder`, add the created `BufferConsumer` to one subpartition, and then return the `BufferBuilder` for `RecordWriter` to write serialized data into. Since `ResultPartitionWriter#addBufferConsumer` is also replaced by `ResultPartitionWriter#requestBufferBuilder`, a new method `ResultPartitionWriter#broadcastEvents` should be introduced to handle events. In the future, it might be worth abstracting `ResultPartitionWriter` further so that it is not tied only to `BufferBuilder`. We could provide `writeRecord(int targetIndex)` to replace `requestBufferBuilder`, so that the serialization could be done inside the specific `ResultPartitionWriter` instance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
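A simplified sketch of the merged API. Only `requestBufferBuilder` and `broadcastEvents` come from the ticket; `BufferBuilder`, `InMemoryPartitionWriter` and `emit` are hypothetical stand-ins, events are plain strings, and the builder itself plays the role of the `BufferConsumer` that Flink would register with the subpartition.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionWriterSketch {

    /** Simplified stand-in for BufferBuilder: a fixed-size byte region being filled. */
    static final class BufferBuilder {
        private final byte[] memory;
        private int writePosition;

        BufferBuilder(int capacity) {
            memory = new byte[capacity];
        }

        /** Copies as many bytes as fit, starting at offset; returns how many were copied. */
        int append(byte[] data, int offset) {
            int n = Math.min(data.length - offset, memory.length - writePosition);
            System.arraycopy(data, offset, memory, writePosition, n);
            writePosition += n;
            return n;
        }

        int size() {
            return writePosition;
        }
    }

    /** The merged interface: no getBufferProvider() and no addBufferConsumer(). */
    interface ResultPartitionWriter {
        BufferBuilder requestBufferBuilder(int targetChannel);

        void broadcastEvents(String... events);
    }

    /** In-memory writer; each subpartition is a list of buffers and events. */
    static final class InMemoryPartitionWriter implements ResultPartitionWriter {
        final List<List<Object>> subpartitions = new ArrayList<>();
        private final int bufferSize;

        InMemoryPartitionWriter(int numSubpartitions, int bufferSize) {
            if (bufferSize <= 0) {
                throw new IllegalArgumentException("bufferSize must be positive");
            }
            this.bufferSize = bufferSize;
            for (int i = 0; i < numSubpartitions; i++) {
                subpartitions.add(new ArrayList<>());
            }
        }

        @Override
        public BufferBuilder requestBufferBuilder(int targetChannel) {
            BufferBuilder builder = new BufferBuilder(bufferSize);
            // Flink would register a BufferConsumer here; the builder stands in for it.
            subpartitions.get(targetChannel).add(builder);
            return builder;
        }

        @Override
        public void broadcastEvents(String... events) {
            for (List<Object> subpartition : subpartitions) {
                for (String event : events) {
                    subpartition.add(event);
                }
            }
        }
    }

    /** RecordWriter side: keeps requesting buffers until the serialized record is written. */
    static void emit(ResultPartitionWriter writer, int targetChannel, byte[] serializedRecord) {
        int offset = 0;
        while (offset < serializedRecord.length) {
            offset += writer.requestBufferBuilder(targetChannel).append(serializedRecord, offset);
        }
    }

    public static void main(String[] args) {
        InMemoryPartitionWriter writer = new InMemoryPartitionWriter(2, 4);
        emit(writer, 1, new byte[10]); // 4 + 4 + 2 bytes, so three buffers
        writer.broadcastEvents("EndOfPartition");
        System.out.println(writer.subpartitions.get(1).size()); // 4
    }
}
```

Unlike a real `RecordWriter`, this `emit` requests a fresh buffer for every record instead of continuing to fill a partially used one; that keeps the sketch short at the cost of wasted space.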
[jira] [Created] (FLINK-12544) Deadlock during releasing memory in SpillableSubpartition
zhijiang created FLINK-12544: Summary: Deadlock during releasing memory in SpillableSubpartition Key: FLINK-12544 URL: https://issues.apache.org/jira/browse/FLINK-12544 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang This was reported by a Flink user; the original jstack is as follows:
{code:java}
"CoGroup (2/2)":
	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
	- waiting to lock <0x00062bf859b8> (a java.lang.Object)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
	at java.lang.Thread.run(Thread.java:745)

"CoGroup (1/2)":
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
	- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
	at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
	- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
	- locked <0x00063c785350> (a java.lang.Object)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
	at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
	- locked <0x00062bf859b8> (a java.lang.Object)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
	at java.lang.Thread.run(Thread.java:745)

"DataSource (1/1)":
	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
	- waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
	- locked <0x00063fdf4888> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
{code}
Based on the analysis, the deadlock happens when task A is trying to release subpartition memory: it holds the lock in `LocalBufferPool` while trying to acquire the lock in `SpillableSubpartition`. Meanwhile, another task B is submitted to the TM and triggers the previous task to release memory, then it would already
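The lock-order inversion visible in the jstack above ("DataSource (1/1)" holds `<0x00063fdf4888>` and waits for `<0x00063fdf4ac8>`, while "CoGroup (1/2)" holds them the other way round) can be reproduced in isolation with plain Java monitors. This is a hypothetical, self-contained demo, not Flink code: two `Object` monitors stand in for the `ArrayDeque` locks of `LocalBufferPool` and `SpillableSubpartition`, a latch forces the bad interleaving, and `ThreadMXBean#findMonitorDeadlockedThreads` confirms the cycle.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderInversionDemo {

    /** Starts two threads that take two monitors in opposite orders; true if the JVM sees the deadlock. */
    static boolean reproduceDeadlock() {
        // Stand-ins for the two ArrayDeque monitors in the jstack (not Flink objects).
        Object bufferPoolLock = new Object();   // role of LocalBufferPool's lock
        Object subpartitionLock = new Object(); // role of SpillableSubpartition's lock
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Like "DataSource (1/1)": buffer pool lock first, then subpartition lock.
        Thread taskA = new Thread(() -> {
            synchronized (bufferPoolLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (subpartitionLock) {
                    // never reached: task-B already holds subpartitionLock
                }
            }
        }, "task-A");

        // Like "CoGroup (1/2)": subpartition lock first, then buffer pool lock.
        Thread taskB = new Thread(() -> {
            synchronized (subpartitionLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (bufferPoolLock) {
                    // never reached: task-A already holds bufferPoolLock
                }
            }
        }, "task-B");

        // Daemon threads, so the deadlocked pair does not keep the JVM alive.
        taskA.setDaemon(true);
        taskB.setDaemon(true);
        taskA.start();
        taskB.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null if no deadlock
            if (ids != null && contains(ids, taskA.getId()) && contains(ids, taskB.getId())) {
                return true;
            }
            sleepQuietly(50);
        }
        return false;
    }

    private static boolean contains(long[] ids, long id) {
        for (long candidate : ids) {
            if (candidate == id) {
                return true;
            }
        }
        return false;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock reproduced: " + reproduceDeadlock());
    }
}
```

Because the message is cut off before any fix is described, the demo only illustrates the failure mode; the generic remedy for this pattern is to make every code path acquire the two locks in the same order, or to release one lock before taking the other.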
[jira] [Created] (FLINK-12497) Refactor the start method of ConnectionManager
zhijiang created FLINK-12497: Summary: Refactor the start method of ConnectionManager Key: FLINK-12497 URL: https://issues.apache.org/jira/browse/FLINK-12497 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang In the current `ConnectionManager#start(ResultPartitionProvider, TaskEventDispatcher)`, the parameters are only meaningful for the `NettyConnectionManager` implementation and redundant for `LocalConnectionManager`. We could move these parameters into the constructor of `NettyConnectionManager`, so that `ConnectionManager#start()` becomes cleaner for both implementations. This also simplifies calling `start` in `NetworkEnvironment`, which then no longer needs to maintain a private `TaskEventDispatcher`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
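A sketch of the refactoring with hypothetical stand-in types; only the class names and the idea of moving the `start` parameters into the constructor come from the ticket. `isStarted()` and `startAll` are added purely so the example can be exercised.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ConnectionManagerStartSketch {

    // Simplified stand-ins for the dependencies named in the ticket.
    interface ResultPartitionProvider {}

    static final class TaskEventDispatcher {}

    /** start() no longer takes implementation-specific parameters. */
    interface ConnectionManager {
        void start();

        boolean isStarted();
    }

    /** The Netty-based manager receives its dependencies at construction time. */
    static final class NettyConnectionManager implements ConnectionManager {
        private final ResultPartitionProvider partitionProvider;
        private final TaskEventDispatcher taskEventDispatcher;
        private boolean started;

        NettyConnectionManager(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
            this.partitionProvider = Objects.requireNonNull(partitionProvider);
            this.taskEventDispatcher = Objects.requireNonNull(taskEventDispatcher);
        }

        @Override
        public void start() {
            // A real implementation would start the Netty client/server here,
            // wiring in partitionProvider and taskEventDispatcher.
            started = true;
        }

        @Override
        public boolean isStarted() {
            return started;
        }
    }

    /** The local manager never needed those parameters in the first place. */
    static final class LocalConnectionManager implements ConnectionManager {
        private boolean started;

        @Override
        public void start() {
            started = true;
        }

        @Override
        public boolean isStarted() {
            return started;
        }
    }

    /** The caller (for example NetworkEnvironment) can start any implementation uniformly. */
    static void startAll(List<ConnectionManager> managers) {
        for (ConnectionManager manager : managers) {
            manager.start();
        }
    }

    public static void main(String[] args) {
        ConnectionManager netty = new NettyConnectionManager(new ResultPartitionProvider() {}, new TaskEventDispatcher());
        ConnectionManager local = new LocalConnectionManager();
        startAll(Arrays.asList(netty, local));
        System.out.println(netty.isStarted() + " " + local.isStarted()); // true true
    }
}
```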
[jira] [Created] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread
zhijiang created FLINK-12474: Summary: UnionInputGate should be notified when closing SingleInputGate by canceler thread Key: FLINK-12474 URL: https://issues.apache.org/jira/browse/FLINK-12474 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: zhijiang If a task is being canceled, its `SingleInputGate` is closed by the canceler thread. If the task thread is waiting for a buffer in the `SingleInputGate`, `inputChannelWithData` is notified to wake the task thread so that it exits early. But if the task thread is waiting for a buffer in a `UnionInputGate`, it stays stuck in the wait after the `SingleInputGate` is closed, until the cancellation times out. To make the task exit early in this case, we could make `SingleInputGate` additionally notify the `UnionInputGate` after it is closed, so that the task thread can also be woken up during cancellation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
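A minimal wait/notify sketch of the proposed notification, assuming the union gate blocks on its own monitor. `GateCloseListener`, `notifyDataAvailable`, `getNextGate` and `taskThreadExitsOnClose` are hypothetical names, and the `SingleInputGate`'s own wakeup of `inputChannelWithData` is omitted.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class UnionGateCancelSketch {

    /** Hypothetical callback a closed SingleInputGate uses to notify its wrapper. */
    interface GateCloseListener {
        void onGateClosed(SingleInputGate gate);
    }

    static final class SingleInputGate {
        private final List<GateCloseListener> closeListeners = new CopyOnWriteArrayList<>();

        void registerCloseListener(GateCloseListener listener) {
            closeListeners.add(listener);
        }

        /** Called by the canceler thread. */
        void close() {
            // Waking waiters on this gate's own inputChannelWithData is omitted here.
            for (GateCloseListener listener : closeListeners) {
                listener.onGateClosed(this);
            }
        }
    }

    static final class UnionInputGate implements GateCloseListener {
        private final Object lock = new Object();
        private final ArrayDeque<SingleInputGate> gatesWithData = new ArrayDeque<>();
        private boolean closed; // guarded by lock

        UnionInputGate(SingleInputGate... gates) {
            for (SingleInputGate gate : gates) {
                gate.registerCloseListener(this);
            }
        }

        void notifyDataAvailable(SingleInputGate gate) {
            synchronized (lock) {
                gatesWithData.add(gate);
                lock.notifyAll();
            }
        }

        @Override
        public void onGateClosed(SingleInputGate gate) {
            synchronized (lock) {
                closed = true;
                lock.notifyAll(); // wakes a task thread blocked in getNextGate()
            }
        }

        /** Blocks until a gate has data; returns empty if woken because a gate was closed. */
        Optional<SingleInputGate> getNextGate() throws InterruptedException {
            synchronized (lock) {
                while (gatesWithData.isEmpty() && !closed) {
                    lock.wait();
                }
                return Optional.ofNullable(gatesWithData.poll());
            }
        }
    }

    /** Simulates cancellation and reports whether the blocked task thread exits promptly. */
    static boolean taskThreadExitsOnClose() {
        SingleInputGate gate = new SingleInputGate();
        UnionInputGate union = new UnionInputGate(gate, new SingleInputGate());
        AtomicReference<Optional<SingleInputGate>> result = new AtomicReference<>();

        Thread taskThread = new Thread(() -> {
            try {
                result.set(union.getNextGate());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task");
        taskThread.setDaemon(true);
        taskThread.start();

        gate.close(); // what the canceler thread would do
        try {
            taskThread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !taskThread.isAlive() && result.get() != null && !result.get().isPresent();
    }

    public static void main(String[] args) {
        System.out.println("task thread exited: " + taskThreadExitsOnClose());
    }
}
```

Because the closed flag is checked inside the `while` loop under the same lock, a close that happens before the task thread starts waiting is not lost.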
[jira] [Created] (FLINK-12458) Throw PartitionNotFoundException if consumer can not establish a connection to remote TM
zhijiang created FLINK-12458: Summary: Throw PartitionNotFoundException if consumer can not establish a connection to remote TM Key: FLINK-12458 URL: https://issues.apache.org/jira/browse/FLINK-12458 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang If the consumer cannot establish a connection to the remote task executor, this might indicate that the remote task executor is unreachable. We could wrap this connection exception into the existing `PartitionNotFoundException`, so that the job master can decide whether to restart the upstream region to re-produce the partition data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
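A sketch of the wrapping, assuming a connection failure surfaces as `java.net.ConnectException`. The exception class below is a simplified stand-in (the real `PartitionNotFoundException` is keyed by a `ResultPartitionID`, not a string), and `ConnectionFactory`, `requestPartition` and the host/port values are hypothetical.

```java
import java.io.IOException;
import java.net.ConnectException;

public class PartitionRequestFailureSketch {

    /** Simplified stand-in for PartitionNotFoundException. */
    static final class PartitionNotFoundException extends IOException {
        private final String partitionId;

        PartitionNotFoundException(String partitionId, Throwable cause) {
            super("Partition " + partitionId + " not found.", cause);
            this.partitionId = partitionId;
        }

        String getPartitionId() {
            return partitionId;
        }
    }

    /** Hypothetical hook that opens the connection to the producer's task executor. */
    interface ConnectionFactory {
        void connect(String host, int port) throws IOException;
    }

    static void requestPartition(ConnectionFactory factory, String host, int port, String partitionId)
            throws IOException {
        try {
            factory.connect(host, port);
        } catch (ConnectException e) {
            // The producer's task executor is unreachable: report the partition as
            // missing so the job master can decide whether to restart the producer.
            throw new PartitionNotFoundException(partitionId, e);
        }
        // Any other IOException propagates unchanged.
    }

    public static void main(String[] args) {
        try {
            requestPartition((host, port) -> { throw new ConnectException("Connection refused"); },
                    "tm-2", 6121, "partition-1");
        } catch (PartitionNotFoundException e) {
            System.out.println(e.getMessage() + " cause: " + e.getCause());
        } catch (IOException e) {
            System.out.println("other failure: " + e);
        }
    }
}
```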
[jira] [Created] (FLINK-12331) Introduce partition/gate setup to decouple task registration with NetworkEnvironment
zhijiang created FLINK-12331: Summary: Introduce partition/gate setup to decouple task registration with NetworkEnvironment Key: FLINK-12331 URL: https://issues.apache.org/jira/browse/FLINK-12331 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.9.0 In order to decouple the task from {{NetworkEnvironment}} completely, we introduce the interface methods {{InputGate#setup}} and {{ResultPartitionWriter#setup}}. The task can then call {{setup}} directly on the created partitions/gates instead of calling the current {{registerTask}} via {{NetworkEnvironment}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
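A sketch of what the task would do instead of calling `NetworkEnvironment#registerTask`, using single-method stand-in interfaces. The helper name `setupPartitionsAndGates` and the choice to set up partitions before gates are assumptions of this sketch, not taken from the ticket.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionGateSetupSketch {

    /** Each shuffle component now knows how to set itself up. */
    interface ResultPartitionWriter {
        void setup();
    }

    interface InputGate {
        void setup();
    }

    /** Called by the task itself; no NetworkEnvironment involved. */
    static void setupPartitionsAndGates(ResultPartitionWriter[] partitions, InputGate[] gates) {
        // Partitions before gates: an ordering chosen for this sketch.
        for (ResultPartitionWriter partition : partitions) {
            partition.setup(); // e.g. create and assign the partition's buffer pool
        }
        for (InputGate gate : gates) {
            gate.setup(); // e.g. create and assign the gate's buffer pool
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        setupPartitionsAndGates(
                new ResultPartitionWriter[] {() -> log.add("partition-0")},
                new InputGate[] {() -> log.add("gate-0"), () -> log.add("gate-1")});
        System.out.println(log); // [partition-0, gate-0, gate-1]
    }
}
```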
[jira] [Created] (FLINK-12213) Pass TaskManagerMetricGroup into constructor of NetworkEnvironment
zhijiang created FLINK-12213: Summary: Pass TaskManagerMetricGroup into constructor of NetworkEnvironment Key: FLINK-12213 URL: https://issues.apache.org/jira/browse/FLINK-12213 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.9.0 At the moment, {{NetworkEnvironment#getNetworkBufferPool}} is called to add the network-related {{MetricGroup}}. In order to simplify the public API of {{NetworkEnvironment}}, which is regarded as the default {{ShuffleService}} implementation, we could pass the {{TaskManagerMetricGroup}} into the constructor of {{NetworkEnvironment}}, so that the related network {{MetricGroup}} can be added internally. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
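A sketch of constructor injection for the metric group, using a hypothetical minimal `MetricGroup` (a map of subgroups and gauges, not Flink's metrics API); the group and gauge names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class NetworkMetricsSketch {

    /** Hypothetical minimal metric group: nested groups plus named gauges. */
    static final class MetricGroup {
        final Map<String, MetricGroup> subgroups = new LinkedHashMap<>();
        final Map<String, Supplier<Number>> gauges = new LinkedHashMap<>();

        MetricGroup addGroup(String name) {
            return subgroups.computeIfAbsent(name, ignored -> new MetricGroup());
        }

        void gauge(String name, Supplier<Number> gauge) {
            gauges.put(name, gauge);
        }
    }

    /** Simplified buffer pool with just the counters the gauges read. */
    static final class NetworkBufferPool {
        final int totalSegments;
        volatile int availableSegments;

        NetworkBufferPool(int numSegments) {
            this.totalSegments = numSegments;
            this.availableSegments = numSegments;
        }
    }

    /** The metric group is injected, so callers no longer need getNetworkBufferPool(). */
    static final class NetworkEnvironment {
        private final NetworkBufferPool networkBufferPool;

        NetworkEnvironment(int numBuffers, MetricGroup taskManagerMetricGroup) {
            NetworkBufferPool pool = new NetworkBufferPool(numBuffers);
            this.networkBufferPool = pool;
            // Network metrics are registered internally.
            MetricGroup network = taskManagerMetricGroup.addGroup("Network");
            network.gauge("TotalMemorySegments", () -> pool.totalSegments);
            network.gauge("AvailableMemorySegments", () -> pool.availableSegments);
        }
    }

    public static void main(String[] args) {
        MetricGroup taskManagerGroup = new MetricGroup();
        new NetworkEnvironment(8, taskManagerGroup);
        Supplier<Number> total = taskManagerGroup.subgroups.get("Network").gauges.get("TotalMemorySegments");
        System.out.println(total.get()); // 8
    }
}
```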
[jira] [Created] (FLINK-12154) Remove legacy fields for SingleInputGate
zhijiang created FLINK-12154: Summary: Remove legacy fields for SingleInputGate Key: FLINK-12154 URL: https://issues.apache.org/jira/browse/FLINK-12154 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang In {{SingleInputGate#create}}, we could remove the unused parameter {{ExecutionAttemptID}}, and from the constructor of {{SingleInputGate}}, we could remove the unused parameter {{TaskIOMetricGroup}}. We then introduce {{createSingleInputGate}} so that related tests can reuse the logic for creating a {{SingleInputGate}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12146) Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService
zhijiang created FLINK-12146: Summary: Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService Key: FLINK-12146 URL: https://issues.apache.org/jira/browse/FLINK-12146 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current {{NetworkEnvironment}} will be the default {{ShuffleService}} implementation in the task manager. In order to keep the interface simple, we try to avoid further interaction with {{NetworkEnvironment}}. {{NetworkEnvironment#unregisterTask}} is used for closing partitions/gates and releasing partitions from {{ResultPartitionManager}}. Closing partitions/gates could be done in the task, which already maintains arrays of them. Further, we could release a partition from {{ResultPartitionManager}} inside {{ResultPartition}} by introducing {{ResultPartition#close(Throwable)}}. By doing so, {{NetworkEnvironment#unregisterTask}} could be removed entirely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
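A sketch of moving the release into the partition itself. The stand-in types key partitions by string, and the rule that only a failed partition (non-null cause) is released from the manager is an assumption of this sketch, on the reasoning that a successfully produced partition must stay available to its consumers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PartitionCloseSketch {

    /** Simplified stand-in for ResultPartitionManager: a registry of live partitions. */
    static final class ResultPartitionManager {
        private final Map<String, ResultPartition> partitions = new ConcurrentHashMap<>();

        void registerResultPartition(ResultPartition partition) {
            partitions.put(partition.getPartitionId(), partition);
        }

        void releasePartition(String partitionId, Throwable cause) {
            partitions.remove(partitionId);
        }

        boolean isRegistered(String partitionId) {
            return partitions.containsKey(partitionId);
        }
    }

    static final class ResultPartition {
        private final String partitionId;
        private final ResultPartitionManager partitionManager;
        private volatile boolean closed;

        ResultPartition(String partitionId, ResultPartitionManager partitionManager) {
            this.partitionId = partitionId;
            this.partitionManager = partitionManager;
        }

        String getPartitionId() {
            return partitionId;
        }

        boolean isClosed() {
            return closed;
        }

        /** Closes this partition; on failure it also releases itself from the manager. */
        void close(Throwable cause) {
            closed = true;
            if (cause != null) {
                partitionManager.releasePartition(partitionId, cause);
            }
        }
    }

    public static void main(String[] args) {
        ResultPartitionManager manager = new ResultPartitionManager();
        ResultPartition finished = new ResultPartition("p-finished", manager);
        ResultPartition failed = new ResultPartition("p-failed", manager);
        manager.registerResultPartition(finished);
        manager.registerResultPartition(failed);

        finished.close(null);                             // task finished normally
        failed.close(new RuntimeException("task failed")); // task failed
        System.out.println(manager.isRegistered("p-finished") + " " + manager.isRegistered("p-failed")); // true false
    }
}
```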
[jira] [Created] (FLINK-12127) Move network related options to NetworkEnvironmentOptions
zhijiang created FLINK-12127: Summary: Move network related options to NetworkEnvironmentOptions Key: FLINK-12127 URL: https://issues.apache.org/jira/browse/FLINK-12127 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Some network-related options in TaskManagerOptions could be moved into the newly introduced `NetworkEnvironmentOptions`, which would be used by different shuffle services. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12078) Abstract TaskEventPublisher interface for simplify NetworkEnvironment
zhijiang created FLINK-12078: Summary: Abstract TaskEventPublisher interface for simplify NetworkEnvironment Key: FLINK-12078 URL: https://issues.apache.org/jira/browse/FLINK-12078 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently, {{TaskEventDispatcher}} is maintained in {{NetworkEnvironment}} for registering/unregistering partitions and is used by {{NettyConnectionManager}}. In order to further decouple {{Task}} from {{NetworkEnvironment}}, we introduce a {{TaskEventPublisher}} interface that provides only the {{publish}} method. {{NetworkEnvironment}} could then maintain a {{TaskEventPublisher}}, and {{register/unregister}} would be moved out of it to be handled by {{Task}} directly. By doing so, {{NetworkEnvironment#unregisterTask}} could eventually be removed and replaced by {{partition/gate#close}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
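A sketch of the interface split, using strings for partition ids and events instead of Flink's `ResultPartitionID` and `TaskEvent`; `subscribe` and the listener map are hypothetical details of this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TaskEventPublisherSketch {

    /** The narrow view handed to NetworkEnvironment and the network stack. */
    interface TaskEventPublisher {
        /** Returns false if no partition with this id is registered. */
        boolean publish(String partitionId, String event);
    }

    /** The full dispatcher; its registration methods are called by the Task directly. */
    static final class TaskEventDispatcher implements TaskEventPublisher {
        private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

        synchronized void registerPartition(String partitionId) {
            listeners.putIfAbsent(partitionId, new ArrayList<>());
        }

        synchronized void unregisterPartition(String partitionId) {
            listeners.remove(partitionId);
        }

        synchronized void subscribe(String partitionId, Consumer<String> listener) {
            List<Consumer<String>> registered = listeners.get(partitionId);
            if (registered == null) {
                throw new IllegalStateException("Partition " + partitionId + " is not registered");
            }
            registered.add(listener);
        }

        @Override
        public synchronized boolean publish(String partitionId, String event) {
            List<Consumer<String>> registered = listeners.get(partitionId);
            if (registered == null) {
                return false;
            }
            registered.forEach(listener -> listener.accept(event));
            return true;
        }
    }

    public static void main(String[] args) {
        TaskEventDispatcher dispatcher = new TaskEventDispatcher();
        dispatcher.registerPartition("partition-1"); // done by the Task
        dispatcher.subscribe("partition-1", event -> System.out.println("got " + event));
        TaskEventPublisher publisher = dispatcher;    // all the network stack sees
        System.out.println(publisher.publish("partition-1", "superstep-done"));
    }
}
```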
[jira] [Created] (FLINK-12067) Refactor the constructor of NetworkEnvironment
zhijiang created FLINK-12067: Summary: Refactor the constructor of NetworkEnvironment Key: FLINK-12067 URL: https://issues.apache.org/jira/browse/FLINK-12067 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The constructor of {{NetworkEnvironment}} could be refactored to take only a {{NetworkEnvironmentConfiguration}}; the other related components such as {{TaskEventDispatcher}}, {{ResultPartitionManager}} and {{NetworkBufferPool}} could be created internally. We also refactor the process of generating {{NetworkEnvironmentConfiguration}} in {{TaskManagerServiceConfiguration}} to add {{numNetworkBuffers}} instead of the previous {{networkBufFraction}}, {{networkBufMin}} and {{networkBufMax}}. Further, we introduce {{NetworkEnvironmentConfigurationBuilder}} to create {{NetworkEnvironmentConfiguration}} easily, especially in tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
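A sketch of the single-argument constructor and the builder. The default values are illustrative, and `calculateNumNetworkBuffers` is a hypothetical helper that assumes network memory is the configured fraction of total memory clamped to `[min, max]`; Flink's actual computation may differ in detail.

```java
public class NetworkEnvironmentConfigSketch {

    /** Simplified configuration holding an already computed buffer count. */
    static final class NetworkEnvironmentConfiguration {
        final int numNetworkBuffers;
        final int networkBufferSize;

        NetworkEnvironmentConfiguration(int numNetworkBuffers, int networkBufferSize) {
            this.numNetworkBuffers = numNetworkBuffers;
            this.networkBufferSize = networkBufferSize;
        }
    }

    /** Builder with illustrative defaults, convenient for tests. */
    static final class NetworkEnvironmentConfigurationBuilder {
        private int numNetworkBuffers = 1024;
        private int networkBufferSize = 32 * 1024;

        NetworkEnvironmentConfigurationBuilder setNumNetworkBuffers(int numNetworkBuffers) {
            this.numNetworkBuffers = numNetworkBuffers;
            return this;
        }

        NetworkEnvironmentConfigurationBuilder setNetworkBufferSize(int networkBufferSize) {
            this.networkBufferSize = networkBufferSize;
            return this;
        }

        NetworkEnvironmentConfiguration build() {
            return new NetworkEnvironmentConfiguration(numNetworkBuffers, networkBufferSize);
        }
    }

    /** The constructor takes only the configuration; internal components are built here. */
    static final class NetworkEnvironment {
        final int totalMemorySegments; // stands in for an internally created NetworkBufferPool

        NetworkEnvironment(NetworkEnvironmentConfiguration config) {
            this.totalMemorySegments = config.numNetworkBuffers;
            // TaskEventDispatcher and ResultPartitionManager would also be created here.
        }
    }

    /** Hypothetical: fraction of total memory, clamped to [minBytes, maxBytes], split into buffers. */
    static int calculateNumNetworkBuffers(
            long totalMemoryBytes, double fraction, long minBytes, long maxBytes, int bufferSize) {
        if (bufferSize <= 0 || minBytes > maxBytes) {
            throw new IllegalArgumentException("invalid buffer size or memory bounds");
        }
        long networkBytes = (long) (totalMemoryBytes * fraction);
        networkBytes = Math.max(minBytes, Math.min(maxBytes, networkBytes));
        return (int) (networkBytes / bufferSize);
    }

    public static void main(String[] args) {
        int numBuffers = calculateNumNetworkBuffers(1L << 30, 0.1, 64L << 20, 1L << 30, 32 * 1024);
        NetworkEnvironment env = new NetworkEnvironment(
                new NetworkEnvironmentConfigurationBuilder().setNumNetworkBuffers(numBuffers).build());
        System.out.println(env.totalMemorySegments); // 3276
    }
}
```

With 1 GiB of memory and a 0.1 fraction, about 102.4 MiB falls inside the `[64 MiB, 1 GiB]` bounds, which yields 3276 whole 32 KiB buffers; with only 256 MiB the 64 MiB minimum applies instead, giving 2048 buffers.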
[jira] [Created] (FLINK-11988) Remove legacy MockNetworkEnvironment
zhijiang created FLINK-11988: Summary: Remove legacy MockNetworkEnvironment Key: FLINK-11988 URL: https://issues.apache.org/jira/browse/FLINK-11988 Project: Flink Issue Type: Task Components: Runtime / Network, Tests Reporter: zhijiang Assignee: zhijiang Remove legacy {{MockNetworkEnvironment}} class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11885) Introduce RecordWriterBuilder for creating RecordWriter instance
zhijiang created FLINK-11885: Summary: Introduce RecordWriterBuilder for creating RecordWriter instance Key: FLINK-11885 URL: https://issues.apache.org/jira/browse/FLINK-11885 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang The current {{RecordWriter}} will be refactored into an abstract class to improve the broadcast mode mentioned in FLINK-10995. So it is better to migrate the logic of building a {{RecordWriter}} instance into a separate {{RecordWriterBuilder}} utility class, which should be the only entry point for other usages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
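A sketch of a builder as the single entry point, under the assumption that the abstract `RecordWriter` gets one subclass for broadcast and one for channel-selector-based writing. `RoundRobinSelector`, `BroadcastSelector`, `ChannelSelectorRecordWriter`, `BroadcastRecordWriter` and `targetChannels` are hypothetical names for this illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class RecordWriterBuilderSketch {

    /** Simplified channel selector; broadcast selectors send to every channel. */
    interface ChannelSelector<T> {
        int selectChannel(T record, int numberOfChannels);

        boolean isBroadcast();
    }

    static final class RoundRobinSelector<T> implements ChannelSelector<T> {
        private int next = -1;

        @Override
        public int selectChannel(T record, int numberOfChannels) {
            next = (next + 1) % numberOfChannels;
            return next;
        }

        @Override
        public boolean isBroadcast() {
            return false;
        }
    }

    static final class BroadcastSelector<T> implements ChannelSelector<T> {
        @Override
        public int selectChannel(T record, int numberOfChannels) {
            throw new UnsupportedOperationException("broadcast writers do not select a channel");
        }

        @Override
        public boolean isBroadcast() {
            return true;
        }
    }

    /** The abstract base that RecordWriter is being turned into. */
    abstract static class RecordWriter<T> {
        final int numberOfChannels;

        RecordWriter(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        /** Channels the record would be written to. */
        abstract int[] targetChannels(T record);
    }

    static final class ChannelSelectorRecordWriter<T> extends RecordWriter<T> {
        private final ChannelSelector<T> selector;

        ChannelSelectorRecordWriter(int numberOfChannels, ChannelSelector<T> selector) {
            super(numberOfChannels);
            this.selector = selector;
        }

        @Override
        int[] targetChannels(T record) {
            return new int[] {selector.selectChannel(record, numberOfChannels)};
        }
    }

    static final class BroadcastRecordWriter<T> extends RecordWriter<T> {
        BroadcastRecordWriter(int numberOfChannels) {
            super(numberOfChannels);
        }

        @Override
        int[] targetChannels(T record) {
            return IntStream.range(0, numberOfChannels).toArray();
        }
    }

    /** Single entry point: callers never pick the concrete subclass themselves. */
    static final class RecordWriterBuilder<T> {
        private ChannelSelector<T> selector = new RoundRobinSelector<>();

        RecordWriterBuilder<T> setChannelSelector(ChannelSelector<T> selector) {
            this.selector = selector;
            return this;
        }

        RecordWriter<T> build(int numberOfChannels) {
            if (selector.isBroadcast()) {
                return new BroadcastRecordWriter<>(numberOfChannels);
            }
            return new ChannelSelectorRecordWriter<>(numberOfChannels, selector);
        }
    }

    public static void main(String[] args) {
        RecordWriter<String> writer = new RecordWriterBuilder<String>().build(3);
        System.out.println(writer.getClass().getSimpleName() + " " + Arrays.toString(writer.targetChannels("a")));
    }
}
```

Because the subclass choice lives in `build`, adding a specialized broadcast writer later only touches the builder, not every call site that creates a writer.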