[jira] [Created] (FLINK-19745) Supplement micro-benchmark for bounded blocking partition in remote channel case

2020-10-20 Thread Zhijiang (Jira)
Zhijiang created FLINK-19745:


 Summary: Supplement micro-benchmark for bounded blocking partition 
in remote channel case
 Key: FLINK-19745
 URL: https://issues.apache.org/jira/browse/FLINK-19745
 Project: Flink
  Issue Type: Task
  Components: Benchmarks, Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang


The current benchmark `BlockingPartitionBenchmark` for batch jobs only measures the scenario where the producer and consumer are deployed in the same process, which corresponds to the local input channel on the consumer side.

We want to supplement another common scenario that measures the effect of reading data via network shuffle, which corresponds to the remote input channel on the consumer side.
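
As a rough illustration only, the new case could be added as a JMH parameter next to the existing local one, so both modes share the same job and differ only in how the consumer is deployed. The environment setup and workload below are placeholders, not the actual flink-benchmarks code.
{code:java}
import java.util.concurrent.ThreadLocalRandom;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class BlockingPartitionRemoteChannelBenchmark {

    // LOCAL: producer and consumer in the same process (the existing case).
    // REMOTE: the consumer reads the bounded blocking partition via network shuffle.
    @Param({"LOCAL", "REMOTE"})
    public String channelMode;

    private boolean remote;

    @Setup
    public void setUp() {
        remote = "REMOTE".equals(channelMode);
        // The real benchmark would configure the cluster here, e.g. spread the
        // producer and consumer over separate TaskManagers for the REMOTE case.
    }

    @Benchmark
    public long readBlockingPartition() {
        // Placeholder workload; the real benchmark writes a bounded blocking
        // partition and reads it back completely in the mode selected above.
        return ThreadLocalRandom.current().nextLong();
    }
}
{code}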





[jira] [Created] (FLINK-19551) Follow up improvements for shuffle service

2020-10-09 Thread Zhijiang (Jira)
Zhijiang created FLINK-19551:


 Summary: Follow up improvements for shuffle service 
 Key: FLINK-19551
 URL: https://issues.apache.org/jira/browse/FLINK-19551
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Network
Reporter: Zhijiang


After the core architecture and functions of the pluggable shuffle service proposed by FLINK-10653 were resolved, there are still some pending follow-up issues to be tracked in this umbrella ticket with low priority.





[jira] [Created] (FLINK-19003) Add micro-benchmark for unaligned checkpoints

2020-08-19 Thread Zhijiang (Jira)
Zhijiang created FLINK-19003:


 Summary: Add micro-benchmark for unaligned checkpoints
 Key: FLINK-19003
 URL: https://issues.apache.org/jira/browse/FLINK-19003
 Project: Flink
  Issue Type: Task
  Components: Benchmarks, Runtime / Checkpointing
Reporter: Zhijiang
Assignee: Zhijiang


It is necessary to supplement an unaligned checkpoint benchmark to verify our future improvements and catch any regressions.

The benchmark should cover remote and local channels separately, since they take different code paths, and it also needs to guarantee that there are some in-flight buffers during the checkpoint so that the channel state snapshot is actually measured.
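
One way to guarantee in-flight buffers during the checkpoint is to throttle the consumer side so that data piles up in the channels while the barrier is processed. A minimal, purely illustrative sketch of such a throttling function (not the actual benchmark code):
{code:java}
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Slows down the consumer side so that buffers accumulate in the input channels;
 * with unaligned checkpoints enabled, those in-flight buffers then have to be
 * written into the channel state snapshot, which is what we want to measure.
 */
public class ThrottlingMapper<T> implements MapFunction<T, T> {

    private final long pauseMillis;

    public ThrottlingMapper(long pauseMillis) {
        this.pauseMillis = pauseMillis;
    }

    @Override
    public T map(T value) throws Exception {
        Thread.sleep(pauseMillis);
        return value;
    }
}
{code}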





[jira] [Created] (FLINK-18612) WordCount example failure when using relative output path

2020-07-16 Thread Zhijiang (Jira)
Zhijiang created FLINK-18612:


 Summary: WordCount example failure when using relative output path
 Key: FLINK-18612
 URL: https://issues.apache.org/jira/browse/FLINK-18612
 Project: Flink
  Issue Type: Bug
  Components: fs
Affects Versions: 1.11.0, 1.11.1
Reporter: Zhijiang
 Fix For: 1.12.0, 1.11.2


The failure log can be found here 
[log|https://pipelines.actions.githubusercontent.com/revSbsLpzrFApLL6BmCvScWt72tRe3wYUv7fCdCtThtI5bydk7/_apis/pipelines/1/runs/27244/signedlogcontent/21?urlExpires=2020-07-16T06%3A35%3A49.4559813Z=HMACV1=%2FfAsJgIlIf%2BDitViRJYh0DAGJZjJwhsCGS219ZyniAA%3D].

Executing the following commands reproduces the problem locally:
* bin/start-cluster.sh
* bin/flink run -p 1 examples/streaming/WordCount.jar --input input --output 
result

It is caused by the [commit|https://github.com/apache/flink/commit/a2deff2967b7de423b10f7f01a41c06565c37e62#diff-2010e422f5e43a971cd7134a9e0b9a5f].





[jira] [Created] (FLINK-18591) Fix the format issue for metrics web page

2020-07-13 Thread Zhijiang (Jira)
Zhijiang created FLINK-18591:


 Summary: Fix the format issue for metrics web page
 Key: FLINK-18591
 URL: https://issues.apache.org/jira/browse/FLINK-18591
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Runtime / Metrics
Affects Versions: 1.11.0
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.12.0, 1.11.0


The formatting issue can be seen at 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing





[jira] [Created] (FLINK-18552) Update migration tests in master to cover migration from release-1.11

2020-07-10 Thread Zhijiang (Jira)
Zhijiang created FLINK-18552:


 Summary: Update migration tests in master to cover migration from 
release-1.11
 Key: FLINK-18552
 URL: https://issues.apache.org/jira/browse/FLINK-18552
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Zhijiang
 Fix For: 1.12.0


We should update the following tests to cover migration from release-1.11:
 * {{CEPMigrationTest}}
 * {{BucketingSinkMigrationTest}}
 * {{FlinkKafkaConsumerBaseMigrationTest}}
 * {{ContinuousFileProcessingMigrationTest}}
 * {{WindowOperatorMigrationTest}}
 * {{StatefulJobSavepointMigrationITCase}}
 * {{StatefulJobWBroadcastStateMigrationITCase}}
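
The typical update is to regenerate and check in the 1.11 snapshot/savepoint test resources and to register the new version in each test's parameterized version list. A hedged sketch of what that parameter method usually looks like; the real tests use Flink's own migration-version type rather than plain strings:
{code:java}
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ExampleMigrationTest {

    // Every past release the test restores from; after cutting release-1.11,
    // "1.11" is appended here and the matching snapshot/savepoint files are
    // regenerated and checked in as test resources.
    @Parameterized.Parameters(name = "Migrate from {0}")
    public static Collection<String> migrationVersions() {
        return Arrays.asList("1.8", "1.9", "1.10", "1.11");
    }

    private final String migrateFromVersion;

    public ExampleMigrationTest(String migrateFromVersion) {
        this.migrateFromVersion = migrateFromVersion;
    }

    @Test
    public void testRestoreFromOldVersion() {
        // The real tests restore an operator/job from the stored snapshot of
        // 'migrateFromVersion' and verify that state and output are intact.
    }
}
{code}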





[jira] [Created] (FLINK-18088) Umbrella for features testing in release-1.11.0

2020-06-03 Thread Zhijiang (Jira)
Zhijiang created FLINK-18088:


 Summary: Umbrella for features testing in release-1.11.0 
 Key: FLINK-18088
 URL: https://issues.apache.org/jira/browse/FLINK-18088
 Project: Flink
  Issue Type: Test
Affects Versions: 1.11.0
Reporter: Zhijiang
 Fix For: 1.11.0


This is the umbrella issue for tracking the testing progress of all the related features in release-1.11.0, either via e2e tests or via manual testing in a cluster, to confirm that the features work in practice with good quality.





[jira] [Created] (FLINK-18063) Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner#processEndOfPartition

2020-06-02 Thread Zhijiang (Jira)
Zhijiang created FLINK-18063:


 Summary: Fix the race condition for aborting current checkpoint in 
CheckpointBarrierUnaligner#processEndOfPartition
 Key: FLINK-18063
 URL: https://issues.apache.org/jira/browse/FLINK-18063
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0, 1.12.0


CheckpointBarrierUnaligner#processEndOfPartition only aborts the current checkpoint by checking whether a checkpoint is pending from the task thread's point of view, so it misses the scenario where the checkpoint was triggered by notifyBarrierReceived from the netty thread.

The proper fix should also check for a pending checkpoint inside ThreadSafeUnaligner in order to abort it and reset the internal variables in that case.
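
A simplified sketch of the intended check, using hypothetical field and method names rather than the real CheckpointBarrierUnaligner internals:
{code:java}
// Simplified model of the two places a pending checkpoint can be tracked.
public class EndOfPartitionHandling {

    /** Checkpoint id the task thread knows about, -1 if none. */
    private long pendingCheckpointId = -1;

    /** Thread-safe part that may have been triggered by the netty thread. */
    private final ThreadSafePart threadSafePart = new ThreadSafePart();

    public void processEndOfPartition() {
        // Previously only the task-thread view was consulted:
        if (pendingCheckpointId >= 0) {
            abort(pendingCheckpointId);
        }
        // The fix: also abort a checkpoint that was triggered via
        // notifyBarrierReceived on the netty thread and reset its state.
        long nettyTriggeredId = threadSafePart.getAndResetPendingCheckpoint();
        if (nettyTriggeredId >= 0) {
            abort(nettyTriggeredId);
        }
    }

    private void abort(long checkpointId) {
        // notify the coordinator / channel state writer that the checkpoint is aborted
    }

    static final class ThreadSafePart {
        private long pendingCheckpointId = -1;

        synchronized long getAndResetPendingCheckpoint() {
            long id = pendingCheckpointId;
            pendingCheckpointId = -1;
            return id;
        }
    }
}
{code}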





[jira] [Created] (FLINK-18050) Fix the bug of recycling buffer twice once exception in ChannelStateWriteRequestDispatcher#dispatch

2020-06-01 Thread Zhijiang (Jira)
Zhijiang created FLINK-18050:


 Summary: Fix the bug of recycling buffer twice once exception in 
ChannelStateWriteRequestDispatcher#dispatch
 Key: FLINK-18050
 URL: https://issues.apache.org/jira/browse/FLINK-18050
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0, 1.12.0


During ChannelStateWriteRequestDispatcherImpl#dispatch, `request.cancel(e)` is called to recycle the request's internal buffer once an exception happens.

But for the write-output request, the buffers are also recycled inside ChannelStateCheckpointWriter#write regardless of whether an exception occurred. So the buffers in the request are recycled twice in the exception case, which causes further exceptions in the network shuffle process because the same buffer is referenced.
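
A minimal illustration of the double-recycle hazard, with a toy buffer type instead of Flink's network buffer; the point is that exactly one owner may recycle:
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Toy buffer with the same "recycle exactly once" contract as a network buffer.
final class RefCountedBuffer {
    private final AtomicBoolean recycled = new AtomicBoolean(false);

    void recycle() {
        if (!recycled.compareAndSet(false, true)) {
            throw new IllegalStateException("buffer recycled twice");
        }
    }
}

public class DispatchSketch {

    /** The writer always recycles the buffer, whether it fails or not. */
    static void write(RefCountedBuffer buffer) {
        try {
            // ... persist buffer contents, may throw ...
            throw new RuntimeException("simulated write failure");
        } finally {
            buffer.recycle();
        }
    }

    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer();
        try {
            write(buffer);
        } catch (RuntimeException e) {
            // Buggy dispatcher behaviour: cancelling the request recycles the
            // buffer again, even though write() already recycled it above.
            buffer.recycle(); // throws IllegalStateException: buffer recycled twice
        }
    }
}
{code}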

This bug can be reproduced easily via running 
UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointOnParallelRemoteChannel.





[jira] [Created] (FLINK-17994) Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

2020-05-27 Thread Zhijiang (Jira)
Zhijiang created FLINK-17994:


 Summary: Fix the race condition between 
CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived
 Key: FLINK-17994
 URL: https://issues.apache.org/jira/browse/FLINK-17994
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0


The race condition happens as follows:
 * The barrier of checkpoint 1 (ch1) is received from the network by the netty thread, which schedules ch1 into the mailbox via #notifyBarrierReceived.
 * The barrier of checkpoint 2 (ch2) is received from the network by the netty thread, but it is inserted into the channel's data queue before #notifyBarrierReceived is called. The task thread can therefore process ch2 earlier than the netty thread calls #notifyBarrierReceived.
 * The task thread executes the checkpoint for ch2 directly because ch2 > ch1.
 * After that, the previously scheduled ch1 is executed from the mailbox by the task thread, which causes an IllegalArgumentException inside SubtaskCheckpointCoordinatorImpl#checkpointState because it breaks the assumption that checkpoints are executed with increasing ids.

One possible solution for this race condition is to insert the received barrier into the channel's data queue only after calling #notifyBarrierReceived. Then we can assume that the checkpoint is always triggered by the netty thread, which simplifies the current situation where a checkpoint might be triggered either by the task thread or by the netty thread.

This also avoids the task thread calling #notifyBarrierReceived while processing the barrier, simplifying the logic inside CheckpointBarrierUnaligner.
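
A sketch of the ordering change on the netty-thread receive path, with simplified stand-in interfaces instead of the real handler and channel code:
{code:java}
// Simplified receive path of a remote input channel on the netty thread.
public class BarrierReceivePath {

    interface Unaligner {
        // Triggers (or registers) the checkpoint for this barrier.
        void notifyBarrierReceived(long checkpointId);
    }

    interface DataQueue {
        // Makes the barrier visible to the task thread.
        void add(long checkpointId);
    }

    private final Unaligner unaligner;
    private final DataQueue queue;

    BarrierReceivePath(Unaligner unaligner, DataQueue queue) {
        this.unaligner = unaligner;
        this.queue = queue;
    }

    /** Buggy order: the task thread may see the barrier before the unaligner does. */
    void onBarrierBefore(long checkpointId) {
        queue.add(checkpointId);
        unaligner.notifyBarrierReceived(checkpointId);
    }

    /** Fixed order: notify first, then publish, so checkpoints are always
     *  triggered from the netty thread in barrier order. */
    void onBarrierAfter(long checkpointId) {
        unaligner.notifyBarrierReceived(checkpointId);
        queue.add(checkpointId);
    }
}
{code}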





[jira] [Created] (FLINK-17992) Exception from RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler

2020-05-27 Thread Zhijiang (Jira)
Zhijiang created FLINK-17992:


 Summary: Exception from RemoteInputChannel#onBuffer should not 
fail the whole NetworkClientHandler
 Key: FLINK-17992
 URL: https://issues.apache.org/jira/browse/FLINK-17992
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.10.1, 1.10.0
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0


RemoteInputChannel#onBuffer is invoked by CreditBasedPartitionRequestClientHandler while receiving and decoding network data. #onBuffer can throw exceptions, which tags the error in the client handler and fails all the input channels added to that handler. This can cause the following tricky issue.

If the RemoteInputChannel is being canceled by the canceler thread, the task thread might exit earlier than the canceler thread terminates. That means the PartitionRequestClient might not be closed yet (closing it is triggered by the canceler thread) while the new task attempt is already deployed onto this TaskManager. The new task might then reuse the previous PartitionRequestClient when requesting partitions, but the respective client handler was already tagged with an error during the above RemoteInputChannel#onBuffer. That causes another unnecessary failover.

This issue is hard to spot in production because the job eventually recovers after one or more additional failovers. We found it via UnalignedCheckpointITCase because that test defines a precise number of restarts within the configured failures.

The solution is to fail only the respective task when its RemoteInputChannel#onBuffer throws an exception, instead of failing all channels inside the client handler. The client then stays healthy and can still be reused by other input channels as long as it has not been released yet.
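
A sketch of the proposed handling on the handler's decode path; the types and method names here are simplified stand-ins, not the actual Flink classes:
{code:java}
// Simplified decode path of the credit-based client handler.
public class ClientHandlerSketch {

    interface InputChannel {
        void onBuffer(byte[] data) throws Exception;

        // Fails only this channel's task (which triggers a failover of that task).
        void onError(Throwable cause);
    }

    /** Old behaviour: any channel error poisons the shared handler/connection. */
    private Throwable channelError;

    void decodeBufferOld(InputChannel channel, byte[] data) {
        try {
            channel.onBuffer(data);
        } catch (Throwable t) {
            channelError = t; // every channel on this connection now fails
        }
    }

    /** Fixed behaviour: only the owning channel is failed; the shared
     *  connection and handler stay usable for the other channels. */
    void decodeBufferFixed(InputChannel channel, byte[] data) {
        try {
            channel.onBuffer(data);
        } catch (Throwable t) {
            channel.onError(t);
        }
    }
}
{code}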





[jira] [Created] (FLINK-17869) Fix the race condition of aborting unaligned checkpoint

2020-05-21 Thread Zhijiang (Jira)
Zhijiang created FLINK-17869:


 Summary: Fix the race condition of aborting unaligned checkpoint
 Key: FLINK-17869
 URL: https://issues.apache.org/jira/browse/FLINK-17869
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0


On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:

start -> in progress/abort -> stop.

We must guarantee that #abort is queued after #start, otherwise the aborted checkpoint might be started again later in the case of a race condition.

There are two cases that might trigger aborting a checkpoint:
 * One is CheckpointBarrierUnaligner#processEndOfPartition, which should abort all current and future checkpoints; there is no need to check the condition `isCheckpointPending()` as the current code does.
 * The other is CheckpointBarrierUnaligner#processCancellationBarrier, which should only abort the respective checkpoint id if it was already triggered before.

The unaligned checkpoint might be triggered either by the task thread or by the netty thread inside ThreadSafeUnaligner. In either case we need to know the currently triggered checkpoint id in order to handle both cases above properly.

Another bug is that ChannelStateWriterImpl#abort should not remove the respective ChannelStateWriteResult. Otherwise ChannelStateWriterImpl#getWriteResult would throw an IllegalArgumentException during the checkpoint process. The ChannelStateWriteResult should be created in the #start method and only removed in the #stop method.
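
A small sketch of the intended lifecycle guard, keeping the write result from #start until #stop; the class and method shapes are simplified assumptions, not the real ChannelStateWriterImpl:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified per-checkpoint lifecycle: start -> (abort) -> stop.
public class ChannelStateWriterSketch {

    static final class WriteResult {
        volatile Throwable failure;
    }

    private final Map<Long, WriteResult> results = new HashMap<>();

    /** #start creates the result; a later #abort must find it. */
    public synchronized void start(long checkpointId) {
        results.put(checkpointId, new WriteResult());
    }

    /** #abort only marks the result as failed; it must NOT remove it, so
     *  getWriteResult() still works while the checkpoint is being finalized. */
    public synchronized void abort(long checkpointId, Throwable cause) {
        WriteResult result = results.get(checkpointId);
        if (result != null) {
            result.failure = cause;
        }
    }

    public synchronized WriteResult getWriteResult(long checkpointId) {
        WriteResult result = results.get(checkpointId);
        if (result == null) {
            throw new IllegalArgumentException("unknown checkpoint " + checkpointId);
        }
        return result;
    }

    /** Only #stop removes the result, after the checkpoint is done or aborted. */
    public synchronized void stop(long checkpointId) {
        results.remove(checkpointId);
    }
}
{code}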





[jira] [Created] (FLINK-17823) Resolve the race condition while releasing RemoteInputChannel

2020-05-19 Thread Zhijiang (Jira)
Zhijiang created FLINK-17823:


 Summary: Resolve the race condition while releasing 
RemoteInputChannel
 Key: FLINK-17823
 URL: https://issues.apache.org/jira/browse/FLINK-17823
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.11.0
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0


RemoteInputChannel#releaseAllResources might be called by the canceler thread. Meanwhile, the task thread can also call RemoteInputChannel#getNextBuffer. This can cause two potential problems:
 * The task thread might get a null buffer after the canceler thread has already released all the buffers, which can cause a misleading NPE in getNextBuffer.
 * The task thread and the canceler thread might pull the same buffer concurrently, which causes an unexpected exception when the same buffer is recycled twice.

The solution is to properly synchronize on the buffer queue in the release method so that the same buffer cannot be pulled by both the canceler thread and the task thread. In addition, getNextBuffer gets explicit checks to avoid the misleading NPE and to throw meaningful exceptions instead.
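
A sketch of the synchronization idea around the buffer queue; a simplified queue and buffer type are used here instead of the real RemoteInputChannel internals:
{code:java}
import java.util.ArrayDeque;

// Simplified buffer queue shared between the task thread and the canceler thread.
public class ReleaseSynchronizationSketch {

    static final class Buffer {
        void recycle() { /* return the buffer to the pool */ }
    }

    private final ArrayDeque<Buffer> receivedBuffers = new ArrayDeque<>();
    private boolean released;

    /** Task thread: never races with release and fails with a clear message. */
    Buffer getNextBuffer() {
        synchronized (receivedBuffers) {
            if (released) {
                throw new IllegalStateException("channel already released");
            }
            return receivedBuffers.poll(); // may legitimately be null if empty
        }
    }

    /** Canceler thread: drains and recycles every buffer exactly once. */
    void releaseAllResources() {
        synchronized (receivedBuffers) {
            released = true;
            Buffer buffer;
            while ((buffer = receivedBuffers.poll()) != null) {
                buffer.recycle();
            }
        }
    }
}
{code}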





[jira] [Created] (FLINK-17719) Provide ChannelStateReader#hasStates for hints of reading channel states

2020-05-15 Thread Zhijiang (Jira)
Zhijiang created FLINK-17719:


 Summary: Provide ChannelStateReader#hasStates for hints of reading 
channel states
 Key: FLINK-17719
 URL: https://issues.apache.org/jira/browse/FLINK-17719
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Task
Reporter: Zhijiang
Assignee: Zhijiang


Currently we rely on whether unaligned checkpoints are enabled to decide whether to read recovered channel states during task startup. This prevents recovering from previously written unaligned states when the current mode is aligned.

We can make `ChannelStateReader` provide a hint on whether there are any channel states to be read during startup, so we never lose the chance to recover from them.
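
A sketch of what such a hint could look like on the reader interface and how task startup would use it; the method names are assumptions for illustration:
{code:java}
// Simplified reader interface with the proposed hint.
public interface ChannelStateReaderSketch {

    /** Proposed hint: true if the restored state contains any channel state. */
    boolean hasChannelStates();
}

class TaskStartupSketch {

    void readRecoveredState(ChannelStateReaderSketch reader, boolean unalignedEnabled) {
        // Before: only driven by the configuration flag.
        // if (unalignedEnabled) { readChannelStates(reader); }

        // After: driven by what is actually present in the restored state,
        // so an aligned job can still recover previously written channel state.
        if (reader.hasChannelStates()) {
            readChannelStates(reader);
        }
    }

    private void readChannelStates(ChannelStateReaderSketch reader) {
        // read input/output channel states and re-insert the buffers
    }
}
{code}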





[jira] [Created] (FLINK-17413) Remove redundant states from ThreadSafeUnaligner

2020-04-27 Thread Zhijiang (Jira)
Zhijiang created FLINK-17413:


 Summary: Remove redundant states from ThreadSafeUnaligner
 Key: FLINK-17413
 URL: https://issues.apache.org/jira/browse/FLINK-17413
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Zhijiang
Assignee: Zhijiang


In RemoteInputChannel, we already have the state `lastRequestedCheckpointId` and `receivedCheckpointId` to control whether a received buffer should be notified to the unaligner component.

In the current ThreadSafeUnaligner, the variable `storeNewBuffers` is used for a similar purpose: deciding whether the notified buffer should be written to the persister. In other words, as long as the `RemoteInputChannel` decides to notify about a received buffer, that buffer always needs to be spilled. So we can remove the variable `storeNewBuffers` from ThreadSafeUnaligner completely.
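
A sketch of the gating that remains on the channel side once the redundant flag is gone; field and method names are simplified, not the exact Flink code:
{code:java}
// Simplified channel-side decision: hand a buffer to the unaligner only for
// in-flight data of a checkpoint that was requested but whose barrier has not
// arrived on this channel yet. The unaligner then persists it unconditionally.
public class ChannelSpillDecisionSketch {

    private long lastRequestedCheckpointId = -1; // set when the checkpoint starts
    private long receivedCheckpointId = -1;      // set when the barrier arrives

    void onCheckpointStarted(long checkpointId) {
        lastRequestedCheckpointId = checkpointId;
    }

    void onBarrierReceived(long checkpointId) {
        receivedCheckpointId = checkpointId;
    }

    /** If true, the buffer is notified to the unaligner and always spilled;
     *  no extra storeNewBuffers flag is needed on the unaligner side. */
    boolean shouldNotifyAndSpill() {
        return lastRequestedCheckpointId > receivedCheckpointId;
    }
}
{code}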





[jira] [Created] (FLINK-17389) LocalExecutorITCase.testBatchQueryCancel asserts error

2020-04-26 Thread Zhijiang (Jira)
Zhijiang created FLINK-17389:


 Summary: LocalExecutorITCase.testBatchQueryCancel asserts error
 Key: FLINK-17389
 URL: https://issues.apache.org/jira/browse/FLINK-17389
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Reporter: Zhijiang
 Fix For: 1.11.0


CI [https://api.travis-ci.org/v3/job/679144612/log.txt]
{code:java}
19:28:13.121 [INFO] ---
19:28:13.121 [INFO]  T E S T S
19:28:13.121 [INFO] ---
19:28:17.231 [INFO] Running 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase
19:32:06.049 [ERROR] Tests run: 70, Failures: 1, Errors: 0, Skipped: 5, Time 
elapsed: 228.813 s <<< FAILURE! - in 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase
19:32:06.051 [ERROR] testBatchQueryCancel[Planner: 
old](org.apache.flink.table.client.gateway.local.LocalExecutorITCase)  Time 
elapsed: 32.767 s  <<< FAILURE!
java.lang.AssertionError: expected: but was:
at 
org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testBatchQueryCancel(LocalExecutorITCase.java:738)

19:32:06.440 [INFO] 
19:32:06.440 [INFO] Results:
19:32:06.440 [INFO] 
19:32:06.440 [ERROR] Failures: 
19:32:06.440 [ERROR]   LocalExecutorITCase.testBatchQueryCancel:738 
expected: but was:
19:32:06.440 [INFO] 
19:32:06.440 [ERROR] Tests run: 70, Failures: 1, Errors: 0, Skipped: 5{code}





[jira] [Created] (FLINK-17315) UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel failed in timeout

2020-04-21 Thread Zhijiang (Jira)
Zhijiang created FLINK-17315:


 Summary: 
UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel 
failed in timeout
 Key: FLINK-17315
 URL: https://issues.apache.org/jira/browse/FLINK-17315
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


Build: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=1=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=45cc9205-bdb7-5b54-63cd-89fdc0983323]

logs
{code:java}
2020-04-21T20:25:23.1139147Z [ERROR] Errors: 
2020-04-21T20:25:23.1140908Z [ERROR]   
UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel:80->execute:87
 » TestTimedOut
2020-04-21T20:25:23.1141383Z [INFO] 
2020-04-21T20:25:23.1141675Z [ERROR] Tests run: 1525, Failures: 0, Errors: 1, 
Skipped: 36
{code}
 
I ran it on my local machine and it takes about 40 seconds to finish, so the configured 90-second timeout sometimes seems not enough in a heavily loaded environment. Maybe we can remove the timeout from the test, since Azure is already configured to monitor timeouts.
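
The proposed change would simply drop the per-test timeout and rely on the CI-level watchdog; a sketch of what that looks like in JUnit 4:
{code:java}
import org.junit.Test;

public class UnalignedCheckpointTimeoutSketch {

    // Before: a fixed per-test timeout that flakes on loaded CI machines.
    // @Test(timeout = 90_000)
    // public void shouldPerformUnalignedCheckpointMassivelyParallel() { ... }

    // After: no JUnit timeout; the build infrastructure enforces an overall
    // job timeout, so a genuine hang is still detected.
    @Test
    public void shouldPerformUnalignedCheckpointMassivelyParallel() {
        // run the unaligned checkpoint job ...
    }
}
{code}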
 





[jira] [Created] (FLINK-17095) KafkaProducerExactlyOnceITCase fails with "address already in use"

2020-04-12 Thread Zhijiang (Jira)
Zhijiang created FLINK-17095:


 Summary: KafkaProducerExactlyOnceITCase fails with "address 
already in use"
 Key: FLINK-17095
 URL: https://issues.apache.org/jira/browse/FLINK-17095
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


Logs: [https://travis-ci.org/github/apache/flink/jobs/673786814]
{code:java}
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.256 s 
<<< FAILURE! - in 
org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase
[ERROR] 
org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase  
Time elapsed: 7.256 s  <<< ERROR!
org.apache.kafka.common.KafkaException: Socket server failed to bind to 
0.0.0.0:42733: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:573)
at kafka.network.Acceptor.<init>(SocketServer.scala:451)
at kafka.network.SocketServer.createAcceptor(SocketServer.scala:245)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:215)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:214)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:214)
at kafka.network.SocketServer.startup(SocketServer.scala:114)
at kafka.server.KafkaServer.startup(KafkaServer.scala:253)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:404)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:131)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:142)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:131)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:100)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:92)
at 
org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase.prepare(KafkaProducerExactlyOnceITCase.java:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:220)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:78)
at 

[jira] [Created] (FLINK-17094) OverWindowITCase#testRowTimeBoundedPartitionedRowsOver failed by FileNotFoundException

2020-04-12 Thread Zhijiang (Jira)
Zhijiang created FLINK-17094:


 Summary: OverWindowITCase#testRowTimeBoundedPartitionedRowsOver 
failed by FileNotFoundException
 Key: FLINK-17094
 URL: https://issues.apache.org/jira/browse/FLINK-17094
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


Build: [https://travis-ci.org/github/apache/flink/jobs/673786805]

logs
{code:java}
[ERROR] 
testRowTimeBoundedPartitionedRowsOver[StateBackend=ROCKSDB](org.apache.flink.table.planner.runtime.stream.sql.OverWindowITCase)
  Time elapsed: 0.754 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
at 
org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1625)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:673)
at 
org.apache.flink.table.planner.runtime.stream.sql.OverWindowITCase.testRowTimeBoundedPartitionedRowsOver(OverWindowITCase.scala:417)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
   

[jira] [Created] (FLINK-17092) Pyflink failure for BlinkStreamDependencyTests and StreamPandasUDFITTests

2020-04-11 Thread Zhijiang (Jira)
Zhijiang created FLINK-17092:


 Summary: Pyflink failure for BlinkStreamDependencyTests and 
StreamPandasUDFITTests
 Key: FLINK-17092
 URL: https://issues.apache.org/jira/browse/FLINK-17092
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


Build: 
[https://dev.azure.com/rmetzger/Flink/_build/results?buildId=7324=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=14487301-07d2-5d56-5690-6dfab9ffd4d9]

logs
{code:java}
2020-04-10T13:05:25.7259119Z E   : 
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2020-04-10T13:05:25.7259755Z E  at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
2020-04-10T13:05:25.7260301Z E  at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2020-04-10T13:05:25.7260927Z E  at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1663)
2020-04-10T13:05:25.7261772Z E  at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
2020-04-10T13:05:25.7262405Z E  at 
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:51)
2020-04-10T13:05:25.7263073Z E  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:719)
2020-04-10T13:05:25.7263588Z E  at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-04-10T13:05:25.7264090Z E  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-04-10T13:05:25.7264668Z E  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-04-10T13:05:25.7265175Z E  at 
java.lang.reflect.Method.invoke(Method.java:498)
2020-04-10T13:05:25.7265807Z E  at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
2020-04-10T13:05:25.7266445Z E  at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
2020-04-10T13:05:25.7267288Z E  at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
2020-04-10T13:05:25.7267897Z E  at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
2020-04-10T13:05:25.7268518Z E  at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
2020-04-10T13:05:25.7269130Z E  at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
2020-04-10T13:05:25.7269623Z E  at 
java.lang.Thread.run(Thread.java:748)
2020-04-10T13:05:25.7270112Z E   Caused by: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2020-04-10T13:05:25.7270700Z E  at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
2020-04-10T13:05:25.7271406Z E  at 
org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
2020-04-10T13:05:25.7272111Z E  at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
2020-04-10T13:05:25.7272665Z E  at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
2020-04-10T13:05:25.7273245Z E  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2020-04-10T13:05:25.7273909Z E  at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2020-04-10T13:05:25.7274514Z E  at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
2020-04-10T13:05:25.7275147Z E  at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2020-04-10T13:05:25.7275800Z E  at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2020-04-10T13:05:25.7276447Z E  at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2020-04-10T13:05:25.7277239Z E  at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

[jira] [Created] (FLINK-16821) Run Kubernetes test failed with invalid named "minikube"

2020-03-26 Thread Zhijiang (Jira)
Zhijiang created FLINK-16821:


 Summary: Run Kubernetes test failed with invalid named "minikube"
 Key: FLINK-16821
 URL: https://issues.apache.org/jira/browse/FLINK-16821
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes, Tests
Reporter: Zhijiang


This is the test run 
[https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6702=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]

Log output
{code:java}
2020-03-27T00:07:38.9666021Z Running 'Run Kubernetes test'
2020-03-27T00:07:38.956Z 
==
2020-03-27T00:07:38.9677101Z TEST_DATA_DIR: 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-38967103614
2020-03-27T00:07:41.7529865Z Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-27T00:07:41.7721475Z Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-27T00:07:41.8208394Z Docker version 19.03.8, build afacb8b7f0
2020-03-27T00:07:42.4793914Z docker-compose version 1.25.4, build 8d51620a
2020-03-27T00:07:42.5359301Z Installing minikube ...
2020-03-27T00:07:42.5494076Z   % Total% Received % Xferd  Average Speed   
TimeTime Time  Current
2020-03-27T00:07:42.5494729Z  Dload  Upload   
Total   SpentLeft  Speed
2020-03-27T00:07:42.5498136Z 
2020-03-27T00:07:42.6214887Z   0 00 00 0  0  0 
--:--:-- --:--:-- --:--:-- 0
2020-03-27T00:07:43.3467750Z   0 00 00 0  0  0 
--:--:-- --:--:-- --:--:-- 0
2020-03-27T00:07:43.3469636Z 100 52.0M  100 52.0M0 0  65.2M  0 
--:--:-- --:--:-- --:--:-- 65.2M
2020-03-27T00:07:43.4262625Z * There is no local cluster named "minikube"
2020-03-27T00:07:43.4264438Z   - To fix this, run: minikube start
2020-03-27T00:07:43.4282404Z Starting minikube ...
2020-03-27T00:07:43.7749694Z * minikube v1.9.0 on Ubuntu 16.04
2020-03-27T00:07:43.7761742Z * Using the none driver based on user configuration
2020-03-27T00:07:43.7762229Z X The none driver requires conntrack to be 
installed for kubernetes version 1.18.0
2020-03-27T00:07:43.8202161Z * There is no local cluster named "minikube"
2020-03-27T00:07:43.8203353Z   - To fix this, run: minikube start
2020-03-27T00:07:43.8568899Z * There is no local cluster named "minikube"
2020-03-27T00:07:43.8570685Z   - To fix this, run: minikube start
2020-03-27T00:07:43.8583793Z Command: start_kubernetes_if_not_running failed. 
Retrying...
2020-03-27T00:07:48.9017252Z * There is no local cluster named "minikube"
2020-03-27T00:07:48.9019347Z   - To fix this, run: minikube start
2020-03-27T00:07:48.9031515Z Starting minikube ...
2020-03-27T00:07:49.0612601Z * minikube v1.9.0 on Ubuntu 16.04
2020-03-27T00:07:49.0616688Z * Using the none driver based on user configuration
2020-03-27T00:07:49.0620173Z X The none driver requires conntrack to be 
installed for kubernetes version 1.18.0
2020-03-27T00:07:49.1040676Z * There is no local cluster named "minikube"
2020-03-27T00:07:49.1042353Z   - To fix this, run: minikube start
2020-03-27T00:07:49.1453522Z * There is no local cluster named "minikube"
2020-03-27T00:07:49.1454594Z   - To fix this, run: minikube start
2020-03-27T00:07:49.1468436Z Command: start_kubernetes_if_not_running failed. 
Retrying...
2020-03-27T00:07:54.1907713Z * There is no local cluster named "minikube"
2020-03-27T00:07:54.1909876Z   - To fix this, run: minikube start
2020-03-27T00:07:54.1921479Z Starting minikube ...
2020-03-27T00:07:54.3388738Z * minikube v1.9.0 on Ubuntu 16.04
2020-03-27T00:07:54.3395499Z * Using the none driver based on user configuration
2020-03-27T00:07:54.3396443Z X The none driver requires conntrack to be 
installed for kubernetes version 1.18.0
2020-03-27T00:07:54.3824399Z * There is no local cluster named "minikube"
2020-03-27T00:07:54.3837652Z   - To fix this, run: minikube start
2020-03-27T00:07:54.4203902Z * There is no local cluster named "minikube"
2020-03-27T00:07:54.4204895Z   - To fix this, run: minikube start
2020-03-27T00:07:54.4217866Z Command: start_kubernetes_if_not_running failed. 
Retrying...
2020-03-27T00:07:59.4235917Z Command: start_kubernetes_if_not_running failed 3 
times.
2020-03-27T00:07:59.4236459Z Could not start minikube. Aborting...
2020-03-27T00:07:59.8439850Z The connection to the server localhost:8080 was 
refused - did you specify the right host or port?
2020-03-27T00:07:59.8939088Z The connection to the server localhost:8080 was 
refused - did you specify the right host or port?
2020-03-27T00:07:59.9515679Z The connection to the server localhost:8080 was 
refused - did you specify the right host or port?
2020-03-27T00:07:59.9528463Z Stopping minikube ...
2020-03-27T00:07:59.9921558Z 

[jira] [Created] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-03-25 Thread Zhijiang (Jira)
Zhijiang created FLINK-16770:


 Summary: Resuming Externalized Checkpoint (rocks, incremental, 
scale up) end-to-end test fails with no such file
 Key: FLINK-16770
 URL: https://issues.apache.org/jira/browse/FLINK-16770
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


The log: 
[https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]

 

There was a similar problem in 
https://issues.apache.org/jira/browse/FLINK-16561, but for the case without a 
parallelism change, whereas this case is for scaling up. It is not clear yet 
whether the root cause is the same.
{code:java}
2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint (rocks, 
incremental, scale up) end-to-end test'
2020-03-25T06:50:31.3895308Z 
==
2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
2020-03-25T06:50:31.5500274Z Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-25T06:50:31.6354639Z Starting cluster.
2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host fv-az655.
2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come up...
2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come up...
2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come up...
2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come up...
2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
SIMULATE_FAILURE=false ...
2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is running.
2020-03-25T06:50:46.1758132Z Waiting for job (b8cb04e4b1e730585bc616aa352866d0) 
to have at least 1 completed checkpoints ...
2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
current progress: 173 records ...
2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
2020-03-25T06:50:50.5468230Z ls: cannot access 
'/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
 No such file or directory
2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . ...
2020-03-25T06:50:58.4728245Z 
2020-03-25T06:50:58.4732663Z 

2020-03-25T06:50:58.4735785Z  The program finished with the following exception:
2020-03-25T06:50:58.4737759Z 
2020-03-25T06:50:58.4742666Z 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
2020-03-25T06:50:58.4746274Zat 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
2020-03-25T06:50:58.4749954Zat 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
2020-03-25T06:50:58.4752753Zat 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
2020-03-25T06:50:58.4755400Zat 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659)
2020-03-25T06:50:58.4757862Zat 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
2020-03-25T06:50:58.4760282Zat 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:890)
2020-03-25T06:50:58.4763591Zat 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:963)
2020-03-25T06:50:58.4764274Zat 
java.security.AccessController.doPrivileged(Native Method)
2020-03-25T06:50:58.4764809Zat 
javax.security.auth.Subject.doAs(Subject.java:422)
2020-03-25T06:50:58.4765434Zat 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
2020-03-25T06:50:58.4766180Zat 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
2020-03-25T06:50:58.4773549Zat 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:963)
2020-03-25T06:50:58.4774502Z Caused by: java.lang.RuntimeException: 
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: 

[jira] [Created] (FLINK-16768) HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart runs without exit

2020-03-25 Thread Zhijiang (Jira)
Zhijiang created FLINK-16768:


 Summary: 
HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart runs without 
exit
 Key: FLINK-16768
 URL: https://issues.apache.org/jira/browse/FLINK-16768
 Project: Flink
  Issue Type: Task
  Components: FileSystems, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


Logs: 
[https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6584=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=d26b3528-38b0-53d2-05f7-37557c2405e4]
{code:java}
2020-03-24T15:52:18.9196862Z "main" #1 prio=5 os_prio=0 tid=0x7fd36c00b800 
nid=0xc21 runnable [0x7fd3743ce000]
2020-03-24T15:52:18.9197235Zjava.lang.Thread.State: RUNNABLE
2020-03-24T15:52:18.9197536Zat 
java.net.SocketInputStream.socketRead0(Native Method)
2020-03-24T15:52:18.9197931Zat 
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
2020-03-24T15:52:18.9198340Zat 
java.net.SocketInputStream.read(SocketInputStream.java:171)
2020-03-24T15:52:18.9198749Zat 
java.net.SocketInputStream.read(SocketInputStream.java:141)
2020-03-24T15:52:18.9199171Zat 
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
2020-03-24T15:52:18.9199840Zat 
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
2020-03-24T15:52:18.9200265Zat 
sun.security.ssl.InputRecord.read(InputRecord.java:532)
2020-03-24T15:52:18.9200663Zat 
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
2020-03-24T15:52:18.9201213Z- locked <0x927583d8> (a 
java.lang.Object)
2020-03-24T15:52:18.9201589Zat 
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
2020-03-24T15:52:18.9202026Zat 
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
2020-03-24T15:52:18.9202583Z- locked <0x92758c00> (a 
sun.security.ssl.AppInputStream)
2020-03-24T15:52:18.9203029Zat 
org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
2020-03-24T15:52:18.9203558Zat 
org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198)
2020-03-24T15:52:18.9204121Zat 
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
2020-03-24T15:52:18.9204626Zat 
org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
2020-03-24T15:52:18.9205121Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9205679Zat 
com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
2020-03-24T15:52:18.9206164Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9206786Zat 
com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
2020-03-24T15:52:18.9207361Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9207839Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9208327Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9208809Zat 
com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
2020-03-24T15:52:18.9209273Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9210003Zat 
com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
2020-03-24T15:52:18.9210658Zat 
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
2020-03-24T15:52:18.9211154Zat 
org.apache.hadoop.fs.s3a.S3AInputStream.lambda$read$3(S3AInputStream.java:445)
2020-03-24T15:52:18.9211631Zat 
org.apache.hadoop.fs.s3a.S3AInputStream$$Lambda$42/1936375962.execute(Unknown 
Source)
2020-03-24T15:52:18.9212044Zat 
org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
2020-03-24T15:52:18.9212553Zat 
org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
2020-03-24T15:52:18.9212972Zat 
org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/1457226878.execute(Unknown Source)
2020-03-24T15:52:18.9213408Zat 
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
2020-03-24T15:52:18.9213866Zat 
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
2020-03-24T15:52:18.9214273Zat 
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
2020-03-24T15:52:18.9214701Zat 
org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:441)
2020-03-24T15:52:18.9215443Z- locked <0x926e88b0> (a 
org.apache.hadoop.fs.s3a.S3AInputStream)
2020-03-24T15:52:18.9215852Zat 
java.io.DataInputStream.read(DataInputStream.java:149)
2020-03-24T15:52:18.9216305Zat 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)

[jira] [Created] (FLINK-16750) Kerberized YARN on Docker test fails with starting Hadoop cluster

2020-03-24 Thread Zhijiang (Jira)
Zhijiang created FLINK-16750:


 Summary: Kerberized YARN on Docker test fails with starting Hadoop 
cluster
 Key: FLINK-16750
 URL: https://issues.apache.org/jira/browse/FLINK-16750
 Project: Flink
  Issue Type: Task
  Components: Deployment / Docker, Deployment / YARN, Tests
Reporter: Zhijiang


Build: 
[https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6563=results]

logs
{code:java}
2020-03-24T08:48:53.3813297Z 
==
2020-03-24T08:48:53.3814016Z Running 'Running Kerberized YARN on Docker test 
(custom fs plugin)'
2020-03-24T08:48:53.3814511Z 
==
2020-03-24T08:48:53.3827028Z TEST_DATA_DIR: 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-53382133956
2020-03-24T08:48:56.1944456Z Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-24T08:48:56.2300265Z Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-24T08:48:56.2412349Z Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
2020-03-24T08:48:56.2861072Z Docker version 19.03.8, build afacb8b7f0
2020-03-24T08:48:56.8025297Z docker-compose version 1.25.4, build 8d51620a
2020-03-24T08:48:56.8499071Z Flink Tarball directory 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-53382133956
2020-03-24T08:48:56.8501170Z Flink tarball filename flink.tar.gz
2020-03-24T08:48:56.8502612Z Flink distribution directory name 
flink-1.11-SNAPSHOT
2020-03-24T08:48:56.8504724Z End-to-end directory 
/home/vsts/work/1/s/flink-end-to-end-tests
2020-03-24T08:48:56.8620115Z Building Hadoop Docker container
2020-03-24T08:48:56.9117609Z Sending build context to Docker daemon  56.83kB
2020-03-24T08:48:56.9117926Z 
2020-03-24T08:48:57.0076373Z Step 1/54 : FROM sequenceiq/pam:ubuntu-14.04
2020-03-24T08:48:57.0082811Z  ---> df7bea4c5f64
2020-03-24T08:48:57.0084798Z Step 2/54 : RUN set -x && addgroup hadoop 
&& useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs && useradd -d 
/home/yarn -ms /bin/bash -G hadoop -p yarn yarn && useradd -d /home/mapred 
-ms /bin/bash -G hadoop -p mapred mapred && useradd -d /home/hadoop-user 
-ms /bin/bash -p hadoop-user hadoop-user
2020-03-24T08:48:57.0092833Z  ---> Using cache
2020-03-24T08:48:57.0093976Z  ---> 3c12a7d3e20c
2020-03-24T08:48:57.0096889Z Step 3/54 : RUN set -x && apt-get update && 
apt-get install -y curl tar sudo openssh-server openssh-client rsync unzip 
krb5-user
2020-03-24T08:48:57.0106188Z  ---> Using cache
2020-03-24T08:48:57.0107830Z  ---> 9a59599596be
2020-03-24T08:48:57.0110793Z Step 4/54 : RUN set -x && mkdir -p 
/var/log/kerberos && touch /var/log/kerberos/kadmind.log
2020-03-24T08:48:57.0118896Z  ---> Using cache
2020-03-24T08:48:57.0121035Z  ---> c83551d4f695
2020-03-24T08:48:57.0125298Z Step 5/54 : RUN set -x && rm -f 
/etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key /root/.ssh/id_rsa && 
ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key && ssh-keygen -q -N 
"" -t rsa -f /etc/ssh/ssh_host_rsa_key && ssh-keygen -q -N "" -t rsa -f 
/root/.ssh/id_rsa && cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
2020-03-24T08:48:57.0133473Z  ---> Using cache
2020-03-24T08:48:57.0134240Z  ---> f69560c2bc0a
2020-03-24T08:48:57.0135683Z Step 6/54 : RUN set -x && mkdir -p 
/usr/java/default && curl -Ls 
'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz'
 -H 'Cookie: oraclelicense=accept-securebackup-cookie' | tar 
--strip-components=1 -xz -C /usr/java/default/
2020-03-24T08:48:57.0148145Z  ---> Using cache
2020-03-24T08:48:57.0149008Z  ---> f824256d72f1
2020-03-24T08:48:57.0152616Z Step 7/54 : ENV JAVA_HOME /usr/java/default
2020-03-24T08:48:57.0155992Z  ---> Using cache
2020-03-24T08:48:57.0160104Z  ---> 770e6bfd219a
2020-03-24T08:48:57.0160410Z Step 8/54 : ENV PATH $PATH:$JAVA_HOME/bin
2020-03-24T08:48:57.0168690Z  ---> Using cache
2020-03-24T08:48:57.0169451Z  ---> 2643e1a25898
2020-03-24T08:48:57.0174785Z Step 9/54 : RUN set -x && curl -LOH 'Cookie: 
oraclelicense=accept-securebackup-cookie' 
'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip' && unzip 
jce_policy-8.zip && cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security
2020-03-24T08:48:57.0187797Z  ---> Using cache
2020-03-24T08:48:57.0188202Z  ---> 51cf2085f95d
2020-03-24T08:48:57.0188467Z Step 10/54 : ARG HADOOP_VERSION=2.8.4
2020-03-24T08:48:57.0199344Z  ---> Using cache
2020-03-24T08:48:57.0199846Z  ---> d169c15c288c
2020-03-24T08:48:57.0200652Z 

[jira] [Created] (FLINK-16739) PrestoS3FileSystemITCase#testSimpleFileWriteAndRead fails with no such key

2020-03-23 Thread Zhijiang (Jira)
Zhijiang created FLINK-16739:


 Summary: PrestoS3FileSystemITCase#testSimpleFileWriteAndRead fails 
with no such key
 Key: FLINK-16739
 URL: https://issues.apache.org/jira/browse/FLINK-16739
 Project: Flink
  Issue Type: Task
  Components: Connectors / FileSystem, Tests
Reporter: Zhijiang


Build: 
[https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6546=logs=e9af9cde-9a65-5281-a58e-2c8511d36983=df5b2bf5-bcff-5dc9-7626-50bed0866a82]

logs
{code:java}
2020-03-24T01:51:19.6988685Z [INFO] Running 
org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase
2020-03-24T01:51:21.6250893Z [INFO] Running 
org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase
2020-03-24T01:51:25.1626385Z [WARNING] Tests run: 8, Failures: 0, Errors: 0, 
Skipped: 2, Time elapsed: 5.461 s - in 
org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase
2020-03-24T01:51:50.5503712Z [ERROR] Tests run: 7, Failures: 1, Errors: 1, 
Skipped: 0, Time elapsed: 28.922 s <<< FAILURE! - in 
org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase
2020-03-24T01:51:50.5506010Z [ERROR] testSimpleFileWriteAndRead[Scheme = 
s3p](org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase)  Time elapsed: 0.7 
s  <<< ERROR!
2020-03-24T01:51:50.5513057Z 
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
 com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not 
exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request 
ID: A07D70A474EABC13; S3 Extended Request ID: 
R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA=), 
S3 Extended Request ID: 
R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA= 
(Path: s3://***/temp/tests-c79a578b-13d9-41ba-b73b-4f53fc965b96/test.txt)
2020-03-24T01:51:50.5517642Z Caused by: 
com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not 
exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request 
ID: A07D70A474EABC13; S3 Extended Request ID: 
R2ReW39oZ9ncoc82xb+V5h/EJV5/Mnsee+7uZ7cFMkliTQ/nKhvHPCDfr5zddbfUdR/S49VdbrA=)
2020-03-24T01:51:50.5519791Z 
2020-03-24T01:51:50.5520679Z [ERROR] 
org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase  Time elapsed: 17.431 s  
<<< FAILURE!
2020-03-24T01:51:50.5521841Z java.lang.AssertionError: expected: but 
was:
2020-03-24T01:51:50.5522437Z 
2020-03-24T01:51:50.8966641Z [INFO] 
2020-03-24T01:51:50.8967386Z [INFO] Results:
2020-03-24T01:51:50.8967849Z [INFO] 
2020-03-24T01:51:50.8968357Z [ERROR] Failures: 
2020-03-24T01:51:50.8970933Z [ERROR]   
PrestoS3FileSystemITCase>AbstractHadoopFileSystemITTest.teardown:155->AbstractHadoopFileSystemITTest.checkPathExistence:61
 expected: but was:
2020-03-24T01:51:50.8972311Z [ERROR] Errors: 
2020-03-24T01:51:50.8973807Z [ERROR]   
PrestoS3FileSystemITCase>AbstractHadoopFileSystemITTest.testSimpleFileWriteAndRead:87
 » UnrecoverableS3Operation
{code}





[jira] [Created] (FLINK-16712) Refactor StreamTask to construct final fields

2020-03-22 Thread Zhijiang (Jira)
Zhijiang created FLINK-16712:


 Summary: Refactor StreamTask to construct final fields
 Key: FLINK-16712
 URL: https://issues.apache.org/jira/browse/FLINK-16712
 Project: Flink
  Issue Type: Task
  Components: Runtime / Task
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0


At the moment four fields are initialized in StreamTask#beforeInvoke: `stateBackend`, `checkpointStorage`, `timerService` and `asyncOperationsThreadPool`.

In general it is preferable to use final fields for their well-known benefits (immutability, safe publication, simpler reasoning). So we can refactor StreamTask to initialize these fields in the constructor instead.
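
A generic sketch of the refactoring direction (assign once in the constructor and mark the fields final); this is only the shape of the change, not the actual StreamTask code:
{code:java}
// Before: fields assigned lazily in beforeInvoke(), so they cannot be final.
// After: everything the task needs is resolved once, in the constructor.
public class StreamTaskSketch {

    private final Object stateBackend;
    private final Object checkpointStorage;
    private final Object timerService;
    private final Object asyncOperationsThreadPool;

    public StreamTaskSketch(
            Object stateBackend,
            Object checkpointStorage,
            Object timerService,
            Object asyncOperationsThreadPool) {
        this.stateBackend = stateBackend;
        this.checkpointStorage = checkpointStorage;
        this.timerService = timerService;
        this.asyncOperationsThreadPool = asyncOperationsThreadPool;
    }

    public void beforeInvoke() {
        // no field initialization left here, only per-invocation setup
    }
}
{code}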





[jira] [Created] (FLINK-16690) Refactor StreamTaskTest to reuse TestTaskBuilder and MockStreamTaskBuilder

2020-03-20 Thread Zhijiang (Jira)
Zhijiang created FLINK-16690:


 Summary: Refactor StreamTaskTest to reuse TestTaskBuilder and 
MockStreamTaskBuilder
 Key: FLINK-16690
 URL: https://issues.apache.org/jira/browse/FLINK-16690
 Project: Flink
  Issue Type: Task
  Components: Runtime / Task, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


We can reuse the existing TestTaskBuilder and MockStreamTaskBuilder to construct Task and StreamTask instances easily in tests, which simplifies the StreamTaskTest case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16653) Introduce ResultPartitionWriterTestBase for simplifying tests

2020-03-18 Thread Zhijiang (Jira)
Zhijiang created FLINK-16653:


 Summary: Introduce ResultPartitionWriterTestBase for simplifying 
tests 
 Key: FLINK-16653
 URL: https://issues.apache.org/jira/browse/FLINK-16653
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network, Tests
Reporter: Zhijiang
Assignee: Zhijiang


At the moment there are at least four implementations of the `ResultPartitionWriter` interface used in unit tests. The interface has about ten methods, and most of them are implemented as dummies in tests.

Whenever we want to extend the `ResultPartitionWriter` interface, all four dummy implementations in tests have to be adjusted as well, which wastes effort.

Therefore an abstract ResultPartitionWriterTestBase is proposed to implement the basic dummy methods of `ResultPartitionWriter`. The existing four instances can then extend it and override only the one or two methods their tests actually need, and only the ResultPartitionWriterTestBase itself will probably need adjusting when the `ResultPartitionWriter` interface is extended.
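
A hedged sketch of the proposed pattern; the interface below is a trimmed stand-in for `ResultPartitionWriter`, not the real ten-method contract:
{code:java}
import java.nio.ByteBuffer;

// The abstract base provides dummy implementations, so each test double only
// overrides the one or two methods its test actually cares about.
interface WriterSketch {
    void setup();
    void emitRecord(ByteBuffer record, int targetSubpartition);
    void flushAll();
    void close();
}

abstract class ResultPartitionWriterTestBaseSketch implements WriterSketch {
    @Override public void setup() {}
    @Override public void emitRecord(ByteBuffer record, int targetSubpartition) {}
    @Override public void flushAll() {}
    @Override public void close() {}
}

// Example test double: only counts flushes and inherits all other dummy methods.
class FlushCountingWriter extends ResultPartitionWriterTestBaseSketch {
    int flushCount;

    @Override
    public void flushAll() {
        flushCount++;
    }
}
{code}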



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16645) Limit the maximum backlogs in subpartitions for data skew case

2020-03-18 Thread Zhijiang (Jira)
Zhijiang created FLINK-16645:


 Summary: Limit the maximum backlogs in subpartitions for data skew 
case
 Key: FLINK-16645
 URL: https://issues.apache.org/jira/browse/FLINK-16645
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Zhijiang
 Fix For: 1.11.0


In the case of data skew, most of the buffers in the partition's LocalBufferPool are likely requested away and accumulated in a single subpartition, which increases the in-flight data and slows down barrier alignment.

We can introduce a config option to control how many backlog buffers are allowed per subpartition. Once a subpartition reaches this threshold, the buffer pool becomes unavailable, which pauses task processing until the backlog drains. This reduces the in-flight data and speeds up the checkpoint process a bit without impacting performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16644) HadoopS3FileSystemBehaviorITCase failed with null uri host

2020-03-18 Thread Zhijiang (Jira)
Zhijiang created FLINK-16644:


 Summary: HadoopS3FileSystemBehaviorITCase failed with null uri host
 Key: FLINK-16644
 URL: https://issues.apache.org/jira/browse/FLINK-16644
 Project: Flink
  Issue Type: Bug
  Components: FileSystems, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


[https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/6340/logs/72]

 
{code:java}
2020-03-18T04:21:08.9614875Z [ERROR] Tests run: 16, Failures: 0, Errors: 16, 
Skipped: 0, Time elapsed: 0.641 s <<< FAILURE! - in 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase
2020-03-18T04:21:08.9616309Z [ERROR] 
testMkdirsReturnsTrueWhenCreatingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.104 s  <<< ERROR!
2020-03-18T04:21:08.9617487Z java.io.IOException: null uri host. This can be 
caused by unencoded / in the password string
2020-03-18T04:21:08.9618318Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9619063Z Caused by: java.lang.NullPointerException: null 
uri host. This can be caused by unencoded / in the password string
2020-03-18T04:21:08.9619752Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9620120Z 
2020-03-18T04:21:08.9620544Z [ERROR] 
testMkdirsReturnsTrueWhenCreatingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.104 s  <<< ERROR!
2020-03-18T04:21:08.9621056Z java.lang.NullPointerException
2020-03-18T04:21:08.9621205Z 
2020-03-18T04:21:08.9621606Z [ERROR] 
testMkdirsFailsWithExistingParentFile(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0 s  <<< ERROR!
2020-03-18T04:21:08.9622187Z java.io.IOException: null uri host. This can be 
caused by unencoded / in the password string
2020-03-18T04:21:08.9622737Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9623318Z Caused by: java.lang.NullPointerException: null 
uri host. This can be caused by unencoded / in the password string
2020-03-18T04:21:08.9623913Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9624325Z 
2020-03-18T04:21:08.9624738Z [ERROR] 
testMkdirsFailsWithExistingParentFile(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.001 s  <<< ERROR!
2020-03-18T04:21:08.9625215Z java.lang.NullPointerException
2020-03-18T04:21:08.9625364Z 
2020-03-18T04:21:08.9625795Z [ERROR] 
testMkdirsReturnsTrueForExistingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.001 s  <<< ERROR!
2020-03-18T04:21:08.9626376Z java.io.IOException: null uri host. This can be 
caused by unencoded / in the password string
2020-03-18T04:21:08.9626922Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9627583Z Caused by: java.lang.NullPointerException: null 
uri host. This can be caused by unencoded / in the password string
2020-03-18T04:21:08.9628270Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9628694Z 
2020-03-18T04:21:08.9629127Z [ERROR] 
testMkdirsReturnsTrueForExistingDirectory(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.001 s  <<< ERROR!
2020-03-18T04:21:08.9629865Z java.lang.NullPointerException
2020-03-18T04:21:08.9630030Z 
2020-03-18T04:21:08.9630404Z [ERROR] 
testPathAndScheme(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.001 s  <<< ERROR!
2020-03-18T04:21:08.9630954Z java.io.IOException: null uri host. This can be 
caused by unencoded / in the password string
2020-03-18T04:21:08.9631489Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9632082Z Caused by: java.lang.NullPointerException: null 
uri host. This can be caused by unencoded / in the password string
2020-03-18T04:21:08.9632663Zat 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase.getFileSystem(HadoopS3FileSystemBehaviorITCase.java:60)
2020-03-18T04:21:08.9633026Z 
2020-03-18T04:21:08.9633396Z [ERROR] 
testPathAndScheme(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  Time elapsed: 0.001 s  <<< ERROR!
2020-03-18T04:21:08.9633920Z java.lang.NullPointerException
2020-03-18T04:21:08.9634125Z 
2020-03-18T04:21:08.9634514Z [ERROR] 
testHomeAndWorkDir(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase)
  

[jira] [Created] (FLINK-16641) Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-03-17 Thread Zhijiang (Jira)
Zhijiang created FLINK-16641:


 Summary: Announce sender's backlog to solve the deadlock issue 
without exclusive buffers
 Key: FLINK-16641
 URL: https://issues.apache.org/jira/browse/FLINK-16641
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Zhijiang
 Fix For: 1.11.0


This is the second ingredient, besides FLINK-16404, for solving the deadlock problem without exclusive buffers.

The scenario is as follows:
 * Data in a subpartition with a positive backlog can always be sent, because the exclusive credits would eventually be fed back.
 * Without exclusive buffers, the receiver does not request floating buffers for a backlog of 0. But when new backlog is added to such a subpartition, the sender currently has no way to notify the receiver side without positive credits.
 * As a result the receiver and sender wait for each other and deadlock: the sender waits for credit to announce the backlog, and the receiver waits for the backlog to request floating buffers.

To solve this, the sender needs a separate message to announce the backlog besides the existing `BufferResponse`. The receiver can then use this information to request floating buffers and feed credits back.

The side effect is additional network transport delay and a possible throughput regression. We can measure the effect with the existing micro-benchmark. It is probably acceptable to bear this cost for the benefit of fast checkpoints without exclusive buffers, and we can explain the trade-off in the respective configuration options so users can make the final decision in practice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16610) FlinkKafkaInternalProducerITCase fails with timeout exception

2020-03-16 Thread Zhijiang (Jira)
Zhijiang created FLINK-16610:


 Summary: FlinkKafkaInternalProducerITCase fails with timeout 
exception
 Key: FLINK-16610
 URL: https://issues.apache.org/jira/browse/FLINK-16610
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator:184->KafkaTestBase.deleteTestTopic:201->Object.wait:->TestTimedOut
 

Logs: [https://api.travis-ci.org/v3/job/662458976/log.txt]
{noformat}
21:25:57,692 [   Time-limited test] INFO  
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
Deleting topic flink-kafka-producer-txn-coordinator-changed
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:110)21:26:25,273
 [main] ERROR 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase  - 

Test 
testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase)
 failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 3 
milliseconds
at java.lang.Object.wait(Native Method)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:110)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.tryDelete(KafkaTestEnvironmentImpl.java:173)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:160)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:201)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:184)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)



at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.tryDelete(KafkaTestEnvironmentImpl.java:173)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:160)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:201)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:184)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at 

[jira] [Created] (FLINK-16586) Build ResultSubpartitionInfo and InputChannelInfo in respective constructors

2020-03-13 Thread Zhijiang (Jira)
Zhijiang created FLINK-16586:


 Summary: Build ResultSubpartitionInfo and InputChannelInfo in 
respective constructors
 Key: FLINK-16586
 URL: https://issues.apache.org/jira/browse/FLINK-16586
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang


In the constructors of ResultSubpartition and InputChannel, the respective ResultSubpartitionInfo and InputChannelInfo should be created as well. These infos will be used for interacting with ChannelStateReader and ChannelStateWriter in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16583) SQLClientKafkaITCase.testKafka failed in SqlClientException

2020-03-13 Thread Zhijiang (Jira)
Zhijiang created FLINK-16583:


 Summary: SQLClientKafkaITCase.testKafka failed in 
SqlClientException
 Key: FLINK-16583
 URL: https://issues.apache.org/jira/browse/FLINK-16583
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Table SQL / Client, Tests
Reporter: Zhijiang
 Fix For: 1.11.0


The end-to-end test {{SQLClientKafkaITCase.testKafka}} failed with
{code:java}
18:13:02.425 [ERROR] testKafka[0: kafka-version:0.10 
kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
  Time elapsed: 32.246 s  <<< ERROR!
java.io.IOException: 
Process execution failed due error. Error output:Mar 12, 2020 6:11:46 PM 
org.jline.utils.Log logr
WARNING: Unable to create a system terminal, creating a dumb terminal (enable 
debug logging for more information)
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not submit given SQL update statement to cluster.
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:131)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)

at 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
at 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)

18:13:02.425 [ERROR] testKafka[1: kafka-version:0.11 
kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
  Time elapsed: 34.539 s  <<< ERROR!
java.io.IOException: 
Process execution failed due error. Error output:Mar 12, 2020 6:12:21 PM 
org.jline.utils.Log logr
WARNING: Unable to create a system terminal, creating a dumb terminal (enable 
debug logging for more information)
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not submit given SQL update statement to cluster.
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:131)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)

at 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
at 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)

{code}
[https://api.travis-ci.org/v3/job/661535183/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16564) HadoopS3RecoverableWriterITCase fails with NullPointerException

2020-03-12 Thread Zhijiang (Jira)
Zhijiang created FLINK-16564:


 Summary: HadoopS3RecoverableWriterITCase fails with 
NullPointerException
 Key: FLINK-16564
 URL: https://issues.apache.org/jira/browse/FLINK-16564
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Tests
Affects Versions: 1.11.0
Reporter: Zhijiang


{code:java}
23:46:14.740 [INFO] Running 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase 23:46:16.674 
[ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.817 s 
<<< FAILURE! - in org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase 
23:46:16.674 [ERROR] 
testDirectoryListing(org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase) 
Time elapsed: 2.166 s <<< ERROR! java.io.FileNotFoundException: No such file or 
directory: 
s3://[secure]/temp/tests-eb7ce54f-0e20-492a-8fef-ce6f742048d3/testdir 
23:46:17.471 [INFO] Running 
org.apache.flink.fs.s3hadoop.HadoopS3FileSystemBehaviorITCase 23:46:19.548 
[ERROR] Tests run: 13, Failures: 0, Errors: 13, Skipped: 0, Time elapsed: 4.805 
s <<< FAILURE! - in 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase 23:46:19.564 
[ERROR] 
testCloseWithNoData(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 2.448 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCloseWithNoData(HadoopS3RecoverableWriterITCase.java:186)
 23:46:19.567 [ERROR] 
testCommitAfterPersist(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.111 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCommitAfterPersist(HadoopS3RecoverableWriterITCase.java:208)
 23:46:19.567 [ERROR] 
testRecoverWithEmptyState(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.108 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithSmallData(HadoopS3RecoverableWriterITCase.java:352)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testRecoverWithEmptyState(HadoopS3RecoverableWriterITCase.java:302)
 23:46:19.567 [ERROR] 
testRecoverFromIntermWithoutAdditionalState(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.103 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithSmallData(HadoopS3RecoverableWriterITCase.java:352)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testRecoverFromIntermWithoutAdditionalState(HadoopS3RecoverableWriterITCase.java:316)
 23:46:19.568 [ERROR] 
testCallingDeleteObjectTwiceDoesNotThroughException(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.108 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCallingDeleteObjectTwiceDoesNotThroughException(HadoopS3RecoverableWriterITCase.java:245)
 23:46:19.568 [ERROR] 
testCommitAfterNormalClose(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.104 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testCommitAfterNormalClose(HadoopS3RecoverableWriterITCase.java:196)
 23:46:19.568 [ERROR] 
testRecoverWithStateWithMultiPart(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.327 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithMultiPartUploads(HadoopS3RecoverableWriterITCase.java:364)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart(HadoopS3RecoverableWriterITCase.java:330)
 23:46:19.568 [ERROR] 
testRecoverFromIntermWithoutAdditionalStateWithMultiPart(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase)
 Time elapsed: 0.248 s <<< ERROR! java.lang.NullPointerException at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersist(HadoopS3RecoverableWriterITCase.java:384)
 at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterITCase.testResumeAfterMultiplePersistWithMultiPartUploads(HadoopS3RecoverableWriterITCase.java:364)
 at 

[jira] [Created] (FLINK-16537) Implement ResultPartition state recovery for unaligned checkpoint

2020-03-10 Thread Zhijiang (Jira)
Zhijiang created FLINK-16537:


 Summary: Implement ResultPartition state recovery for unaligned 
checkpoint
 Key: FLINK-16537
 URL: https://issues.apache.org/jira/browse/FLINK-16537
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Network
Reporter: Zhijiang
 Fix For: 1.11.0


During the recovery process for unaligned checkpoints, the partition state should be recovered in addition to the existing operator states.

The ResultPartition would request buffers from its local pool and interact with the ChannelStateReader to fill them with state data. The filled buffers are then inserted into the respective ResultSubpartition queues in the normal way (see the sketch below).

It must be guaranteed that the operator cannot process any inputs before all output recovery has finished, to avoid re-ordering issues.

Refer to [https://docs.google.com/document/d/16_MOQymzxrKvUHXh6QFr2AAXIKt_2vPUf8vzKy4H_tU/edit] for more details.
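
A minimal, self-contained sketch of the recovery flow described above, using trimmed stand-ins for the reader, the buffer pool and the subpartition queues (none of these are Flink's actual interfaces):
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

public class PartitionStateRecoverySketch {

    /** Stand-in for the channel state reader: fills the buffer, returns false when done. */
    interface StateReader {
        boolean readOutputData(int subpartitionIndex, ByteBuffer buffer);
    }

    private final StateReader reader;
    private final Queue<ByteBuffer>[] subpartitionQueues;

    @SuppressWarnings("unchecked")
    PartitionStateRecoverySketch(StateReader reader, int numSubpartitions) {
        this.reader = reader;
        this.subpartitionQueues = new Queue[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitionQueues[i] = new ArrayDeque<>();
        }
    }

    /** Must complete for every subpartition before the task may process any inputs. */
    void recover(int subpartitionIndex) {
        boolean hasMoreData = true;
        while (hasMoreData) {
            ByteBuffer buffer = ByteBuffer.allocate(32 * 1024); // stands in for a pooled buffer
            hasMoreData = reader.readOutputData(subpartitionIndex, buffer);
            buffer.flip();
            if (buffer.hasRemaining()) {
                subpartitionQueues[subpartitionIndex].add(buffer); // enqueue in the normal way
            }
        }
    }
}
{code}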



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16536) Implement InputChannel state recovery for unaligned checkpoint

2020-03-10 Thread Zhijiang (Jira)
Zhijiang created FLINK-16536:


 Summary: Implement InputChannel state recovery for unaligned 
checkpoint
 Key: FLINK-16536
 URL: https://issues.apache.org/jira/browse/FLINK-16536
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Network
Reporter: Zhijiang
 Fix For: 1.11.0


During the recovery process for unaligned checkpoints, the input channel state should be recovered in addition to the existing operator states.

The InputGate would request buffers from its local pool and interact with the ChannelStateReader to fill them with state data. The filled buffers are then inserted into the respective InputChannel queues and processed in the normal way.

It must be guaranteed that new data from the upstream side does not overtake the recovered input state data, to avoid re-ordering issues.

Refer to the [design doc|https://docs.google.com/document/d/16_MOQymzxrKvUHXh6QFr2AAXIKt_2vPUf8vzKy4H_tU/edit] for more details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16454) Update the copyright year in NOTICE files

2020-03-05 Thread Zhijiang (Jira)
Zhijiang created FLINK-16454:


 Summary: Update the copyright year in NOTICE files
 Key: FLINK-16454
 URL: https://issues.apache.org/jira/browse/FLINK-16454
 Project: Flink
  Issue Type: Task
  Components: Release System
 Environment: The current copyright year is 2014-2019 in NOTICE files. 
We should change it to 2014-2020.
Reporter: Zhijiang
Assignee: Zhijiang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16428) Fine-grained network buffer management for backpressure

2020-03-04 Thread Zhijiang (Jira)
Zhijiang created FLINK-16428:


 Summary: Fine-grained network buffer management for backpressure
 Key: FLINK-16428
 URL: https://issues.apache.org/jira/browse/FLINK-16428
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Zhijiang
 Fix For: 1.11.0


It is an umbrella ticket for tracking the progress of this improvement.

This is the second ingredient to solve the “checkpoints under backpressure” problem (together with unaligned checkpoints). It consists of two steps:
 * See if we can use less network memory in general for streaming jobs (with a potentially different distribution of floating buffers on the input side)
 * Under backpressure, reduce the network memory to have less in-flight data (needs specification of the algorithm and experiments)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16404) Solve the potential deadlock problem when reducing exclusive buffers to zero

2020-03-03 Thread Zhijiang (Jira)
Zhijiang created FLINK-16404:


 Summary: Solve the potential deadlock problem when reducing 
exclusive buffers to zero
 Key: FLINK-16404
 URL: https://issues.apache.org/jira/browse/FLINK-16404
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Zhijiang
 Fix For: 1.11.0


One motivation of this issue is to reduce the in-flight data in the case of back pressure in order to speed up checkpoints. The current default number of exclusive buffers per channel is 2. If we reduce it to 0 and add some floating buffers as compensation, a deadlock may occur because all the floating buffers might be requested by some blocked input channels and never recycled until barrier alignment.

To solve this deadlock concern, we can make some logic changes on both the sender and receiver sides.
 * Sender side: it should revoke the previously received credit after sending the checkpoint barrier, which means it would not send any following buffers until it receives new credits.
 * Receiver side: after processing the barrier from one channel and marking it blocked, it should release the available floating buffers of this blocked channel and only resume requesting floating buffers after barrier alignment. That means the receiver would only announce new credits to the sender side after barrier alignment.

Another possible benefit is that the floating buffers might be put to better use before barrier alignment. We can further verify the performance concern via the existing micro-benchmark.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16403) Solve the potential deadlock problem when reducing exclusive buffers to zero

2020-03-03 Thread Zhijiang (Jira)
Zhijiang created FLINK-16403:


 Summary: Solve the potential deadlock problem when reducing 
exclusive buffers to zero
 Key: FLINK-16403
 URL: https://issues.apache.org/jira/browse/FLINK-16403
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Zhijiang


One motivation of this issue is to reduce the in-flight data in the case of back pressure in order to speed up checkpoints. The current default number of exclusive buffers per channel is 2. If we reduce it to 0 and add some floating buffers as compensation, a deadlock may occur because all the floating buffers might be requested by some blocked input channels and never recycled until barrier alignment.

To solve this deadlock concern, we can make some logic changes on both the sender and receiver sides.
 * Sender side: it should revoke the previously received credit after sending the checkpoint barrier, which means it would not send any following buffers until it receives new credits.
 * Receiver side: after processing the barrier from one channel and marking it blocked, it should release the available floating buffers of this blocked channel and only resume requesting floating buffers after barrier alignment. That means the receiver would only announce new credits to the sender side after barrier alignment.

Another possible benefit is that the floating buffers might be put to better use before barrier alignment. We can further verify the performance concern via the existing micro-benchmark.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16285) Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument

2020-02-25 Thread Zhijiang (Jira)
Zhijiang created FLINK-16285:


 Summary: Refactor SingleInputGate#setInputChannel to remove 
IntermediateResultPartitionID argument
 Key: FLINK-16285
 URL: https://issues.apache.org/jira/browse/FLINK-16285
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang


The IntermediateResultPartitionID info can be obtained directly from the respective InputChannel, so we can remove it from the arguments of SingleInputGate#setInputChannel to clean up the code.

This also helps simplify the unit tests and avoids passing an IntermediateResultPartitionID that is inconsistent with the internal ResultPartitionID maintained by the respective InputChannel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16284) Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument

2020-02-25 Thread Zhijiang (Jira)
Zhijiang created FLINK-16284:


 Summary: Refactor SingleInputGate#setInputChannel to remove 
IntermediateResultPartitionID argument
 Key: FLINK-16284
 URL: https://issues.apache.org/jira/browse/FLINK-16284
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang


The IntermediateResultPartitionID info can be obtained directly from the respective InputChannel, so we can remove it from the arguments of SingleInputGate#setInputChannel to clean up the code.

This also helps simplify the unit tests and avoids passing an IntermediateResultPartitionID that is inconsistent with the internal ResultPartitionID maintained by the respective InputChannel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16257) Remove useless ResultPartitionID from AddCredit message

2020-02-24 Thread Zhijiang (Jira)
Zhijiang created FLINK-16257:


 Summary: Remove useless ResultPartitionID from AddCredit message
 Key: FLINK-16257
 URL: https://issues.apache.org/jira/browse/FLINK-16257
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Zhijiang
Assignee: Zhijiang
 Fix For: 1.11.0


The ResultPartitionID in the AddCredit message is never used on the upstream side, so we can remove it to clean up the code. There are two additional benefits:
 # Reduce the total message size from 52 bytes to 20 bytes.
 # Decouple the dependency on `InputChannel#getPartitionId`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16012) Reduce the default number of exclusive buffers from 2 to 1 on receiver side

2020-02-11 Thread zhijiang (Jira)
zhijiang created FLINK-16012:


 Summary: Reduce the default number of exclusive buffers from 2 to 
1 on receiver side
 Key: FLINK-16012
 URL: https://issues.apache.org/jira/browse/FLINK-16012
 Project: Flink
  Issue Type: Improvement
Reporter: zhijiang


In order to reduce the in-flight buffers for checkpoints in the case of back pressure, we can reduce the number of exclusive buffers per remote input channel from the default of 2 to 1 as a first step. As a side effect, the total number of required buffers is also reduced. We can further verify the performance impact via the various benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15963) Reuse the same ByteBuf while writing the BufferResponse header

2020-02-09 Thread zhijiang (Jira)
zhijiang created FLINK-15963:


 Summary: Reuse the same ByteBuf while writing the BufferResponse 
header
 Key: FLINK-15963
 URL: https://issues.apache.org/jira/browse/FLINK-15963
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang


On the sender side, while writing a BufferResponse message, a new direct ByteBuf is always allocated from the netty allocator to write the header part of every message.

Since only one message is written on a channel at a time, we can instead use a single fixed ByteBuf to write the header part of all BufferResponse messages. We can verify how this affects performance in practice via the benchmark.
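
A hedged sketch of the idea; the field layout, header length and class name are illustrative and not Flink's actual wire format or encoder:
{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// One fixed direct ByteBuf is allocated once and reused for every header,
// assuming at most one message is being written on the channel at a time.
public class ReusableHeaderEncoder {

    private static final int HEADER_LENGTH = 4 + 4 + 4; // three illustrative int fields

    private final ByteBuf headerBuf = Unpooled.directBuffer(HEADER_LENGTH);

    public ByteBuf encodeHeader(int sequenceNumber, int backlog, int bufferSize) {
        headerBuf.clear();                // reset indices, keep the underlying memory
        headerBuf.writeInt(sequenceNumber);
        headerBuf.writeInt(backlog);
        headerBuf.writeInt(bufferSize);
        return headerBuf.retain();        // the writer releases the reference after flushing
    }

    public void close() {
        headerBuf.release();
    }
}
{code}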



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15914) Miss the barrier alignment metric for the case of two inputs

2020-02-05 Thread zhijiang (Jira)
zhijiang created FLINK-15914:


 Summary: Miss the barrier alignment metric for the case of two 
inputs
 Key: FLINK-15914
 URL: https://issues.apache.org/jira/browse/FLINK-15914
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Metrics
Reporter: zhijiang
Assignee: zhijiang


When the StreamTwoInputSelectableProcessor was introduced, the barrier alignment metric was not added in its constructor. This did not cause problems at the time, because only the StreamTwoInputProcessor was in use.

Now that the StreamTwoInputProcessor has been replaced by the StreamTwoInputSelectableProcessor, this bug is exposed and the barrier alignment metric is missing for the case of two inputs.

The solution is to register this metric while constructing the current StreamTwoInputProcessor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15444) Make the component AbstractInvokable in CheckpointBarrierHandler NonNull

2019-12-30 Thread zhijiang (Jira)
zhijiang created FLINK-15444:


 Summary: Make the component AbstractInvokable in 
CheckpointBarrierHandler NonNull 
 Key: FLINK-15444
 URL: https://issues.apache.org/jira/browse/FLINK-15444
 Project: Flink
  Issue Type: Task
  Components: Runtime / Checkpointing
Reporter: zhijiang
Assignee: zhijiang


The component {{AbstractInvokable}} in {{CheckpointBarrierHandler}} is currently annotated as {{@Nullable}}. In the real code path, however, it is passed via the constructor and is never null; the nullable annotation exists only for unit test purposes. This is misleading with respect to the real usage and brings extra trouble, because the field always has to be null-checked before use in the related processes.

We can refactor the related unit tests to implement a dummy {{AbstractInvokable}} for testing and remove the {{@Nullable}} annotation from the related class constructors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15340) Remove the executor of pipelined compression benchmark

2019-12-19 Thread zhijiang (Jira)
zhijiang created FLINK-15340:


 Summary: Remove the executor of pipelined compression benchmark
 Key: FLINK-15340
 URL: https://issues.apache.org/jira/browse/FLINK-15340
 Project: Flink
  Issue Type: Task
  Components: Benchmarks
Reporter: zhijiang
Assignee: zhijiang


In [FLINK-15308|https://issues.apache.org/jira/browse/FLINK-15308] we removed the compression function for the pipelined case. Accordingly, we also need to remove the respective benchmark executor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15306) Adjust the default netty transport option from nio to auto

2019-12-17 Thread zhijiang (Jira)
zhijiang created FLINK-15306:


 Summary: Adjust the default netty transport option from nio to auto
 Key: FLINK-15306
 URL: https://issues.apache.org/jira/browse/FLINK-15306
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.11.0


The default value of `taskmanager.network.netty.transport` in NettyShuffleEnvironmentOptions is currently `nio`. As we know, the `epoll` mode provides better performance, less GC pressure and more advanced features, but it is only available on linux.

Therefore it is better to change the default to `auto`, so the framework automatically chooses the proper mode based on the platform.

We would further verify the performance effect via the micro-benchmark if possible.
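
For illustration, the corresponding flink-conf.yaml entry would then simply be (the option name and values are the ones mentioned above):
{code}
# picks epoll automatically on linux and falls back to nio on other platforms
taskmanager.network.netty.transport: auto
{code}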



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15187) Reuse LocalBufferPool for FileBufferReader in blocking partition

2019-12-10 Thread zhijiang (Jira)
zhijiang created FLINK-15187:


 Summary: Reuse LocalBufferPool for FileBufferReader in blocking 
partition
 Key: FLINK-15187
 URL: https://issues.apache.org/jira/browse/FLINK-15187
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Reporter: zhijiang


If the file type is chosen via `taskmanager.network.bounded-blocking-subpartition-type` for a batch job, then creating the respective view for reading the persistent subpartition data allocates two unpooled memory segments for every subpartition. This temporary memory is neither managed nor accounted for by the framework, so it raises an OOM concern.

We could instead reuse the ResultPartition's `LocalBufferPool` for reading subpartition data to avoid this memory overhead. But there are two additional problems with reusing it directly:
 * The current core size of the `LocalBufferPool` is `numberOfSubpartitions + 1`, but every subpartition currently needs two segments for pre-reading. We can remove the pre-reading to make the current core pool size fit the reading requirements, because the pre-reading function has no obvious benefit in practice and only affects the last pieces of data.
 * When the task finishes, it destroys the `LocalBufferPool` even though the respective `ResultPartition` is still alive, so a subpartition view created afterwards cannot reuse the pool directly. We should adjust the respective logic to either delay destroying the pool or create a new pool for the subpartition view.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15070) Supplement the case of bounded blocking partition for benchmark

2019-12-05 Thread zhijiang (Jira)
zhijiang created FLINK-15070:


 Summary: Supplement the case of bounded blocking partition for 
benchmark
 Key: FLINK-15070
 URL: https://issues.apache.org/jira/browse/FLINK-15070
 Project: Flink
  Issue Type: Task
  Components: Benchmarks
Reporter: zhijiang


ATM the benchmark only covers the case of the pipelined partition used in streaming jobs, so it is better to also cover the case of the blocking partition for batch jobs. Then we can easily track performance regressions for any future changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15069) Supplement the compression case for benchmark

2019-12-05 Thread zhijiang (Jira)
zhijiang created FLINK-15069:


 Summary: Supplement the compression case for benchmark
 Key: FLINK-15069
 URL: https://issues.apache.org/jira/browse/FLINK-15069
 Project: Flink
  Issue Type: Task
  Components: Benchmarks
Reporter: zhijiang


While reviewing the PR that introduces data compression for persistent storage and network shuffle, we concluded it is better to also cover this scenario in the benchmark for tracking performance issues in the future.

Refer to https://github.com/apache/flink/pull/10375#pullrequestreview-325193504



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15021) Refactor to remove channelWritabilityChanged from PartitionRequestQueue

2019-12-02 Thread zhijiang (Jira)
zhijiang created FLINK-15021:


 Summary: Refactor to remove channelWritabilityChanged from 
PartitionRequestQueue
 Key: FLINK-15021
 URL: https://issues.apache.org/jira/browse/FLINK-15021
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


After removing the non-credit-based flow control code, the related channel-writability-changed logic in PartitionRequestQueue is obsolete and can be removed completely. Therefore we can refactor the process to simplify the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14553) Respect non-blocking output in StreamTask#processInput

2019-10-29 Thread zhijiang (Jira)
zhijiang created FLINK-14553:


 Summary: Respect non-blocking output in StreamTask#processInput
 Key: FLINK-14553
 URL: https://issues.apache.org/jira/browse/FLINK-14553
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.10.0


The non-blocking output was introduced in FLINK-14396 and FLINK-14498 to solve the problem of handling the checkpoint barrier in the case of backpressure.

In order to make this work end to end, the {{StreamInputProcessor}} should only process input elements if the output is also available.

The default core size of the {{LocalBufferPool}} for the {{ResultPartition}} should also be increased by 1 so the new behavior does not impact performance; this tiny memory overhead can be ignored in practice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14551) Unaligned checkpoints

2019-10-29 Thread zhijiang (Jira)
zhijiang created FLINK-14551:


 Summary: Unaligned checkpoints
 Key: FLINK-14551
 URL: https://issues.apache.org/jira/browse/FLINK-14551
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing, Runtime / Network
Reporter: zhijiang


This is the umbrella issue for the feature of unaligned checkpoints. Refer to 
the 
[FLIP-76|https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints]
 for more details.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14498) Introduce NetworkBufferPool#isAvailable() for interacting with LocalBufferPool

2019-10-22 Thread zhijiang (Jira)
zhijiang created FLINK-14498:


 Summary: Introduce NetworkBufferPool#isAvailable() for interacting 
with LocalBufferPool
 Key: FLINK-14498
 URL: https://issues.apache.org/jira/browse/FLINK-14498
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Reporter: zhijiang


If the LocalBufferPool cannot request an available buffer from the NetworkBufferPool, it currently waits for 2 seconds before trying again in a loop, which introduces delays in practice.

To improve this interaction, we could introduce NetworkBufferPool#isAvailable, returning a future that the LocalBufferPool monitors. Once buffers become available in the NetworkBufferPool, it completes this future to notify the LocalBufferPool immediately.
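
A minimal sketch of this availability-future pattern, with a simplified stand-in for the global pool (not the actual NetworkBufferPool):
{code:java}
import java.util.concurrent.CompletableFuture;

// Instead of polling with a fixed timeout, the local pool registers a callback on a
// future that the global pool completes as soon as a buffer is recycled.
public class GlobalPoolSketch {

    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    public CompletableFuture<Void> isAvailable() {
        return availableFuture;
    }

    public void recycle(byte[] segment) {
        // ... return the segment to the free list ...
        availableFuture.complete(null);              // wake up all waiting local pools
    }

    public void onExhausted() {
        availableFuture = new CompletableFuture<>(); // reset once the pool runs dry again
    }
}

// Usage on the local-pool side:
//   globalPool.isAvailable().thenRun(localPool::tryRequestBuffers);
{code}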



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14472) Implement back-pressure monitor with non-blocking outputs

2019-10-21 Thread zhijiang (Jira)
zhijiang created FLINK-14472:


 Summary: Implement back-pressure monitor with non-blocking outputs
 Key: FLINK-14472
 URL: https://issues.apache.org/jira/browse/FLINK-14472
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Reporter: zhijiang
 Fix For: 1.10.0


Currently the back-pressure monitor relies on detecting task threads that are stuck in `requestBufferBuilderBlocking`. There are two cases that cause back-pressure at the moment:
 * There are no available buffers in the `LocalBufferPool` and all the quotas given from the global pool are exhausted. Then we need to wait for buffers to be recycled to the `LocalBufferPool`.
 * There are no available buffers in the `LocalBufferPool`, but the quota has not been used up. Requesting a buffer from the global pool then blocks because the global pool has no available buffers either. Then we need to wait for buffers to be recycled to the global pool.

We already implemented the non-blocking output for the first case in [FLINK-14396|https://issues.apache.org/jira/browse/FLINK-14396], and we expect to handle the second case together with adjusting the back-pressure monitor to check `RecordWriter#isAvailable` instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14396) Implement rudimentary non-blocking network output

2019-10-15 Thread zhijiang (Jira)
zhijiang created FLINK-14396:


 Summary: Implement rudimentary non-blocking network output
 Key: FLINK-14396
 URL: https://issues.apache.org/jira/browse/FLINK-14396
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.10.0


Considering the mailbox model and the future unaligned checkpoint requirements, the task network output should be non-blocking. In other words, as long as the output is available, it should never block for a subsequent single record write.

In the first version, we only implement the non-blocking output for the most regular case; the following cases keep the previous behavior and are not solved yet:
 * Big records that might span multiple buffers
 * Flatmap-like operators that might emit multiple records per invocation
 * Broadcast watermarks that might request multiple buffers at a time

The solution is to provide a RecordWriter#isAvailable method and a corresponding LocalBufferPool#isAvailable for judging the output beforehand (see the sketch below). As long as there is at least one available buffer in the LocalBufferPool, the RecordWriter is available for network output in most cases. This ticket does not include the runtime handling of this availability behavior in StreamInputProcessor.

Note: it requires adjusting the minimum number of buffers in the output LocalBufferPool to (numberOfSubpartitions + 1) and also adjusting the back-pressure monitor in the future.
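
A hedged sketch of that availability check; `WriterSketch` stands in for the RecordWriter and a `Supplier` stands in for the task input, so this is not Flink's actual code:
{code:java}
import java.util.function.Supplier;

public class NonBlockingOutputGate<T> {

    /** Trimmed stand-in for RecordWriter. */
    interface WriterSketch<T> {
        boolean isAvailable();   // true iff at least one buffer is free in the local pool
        void emit(T record);
    }

    private final WriterSketch<T> writer;
    private final Supplier<T> input;

    public NonBlockingOutputGate(WriterSketch<T> writer, Supplier<T> input) {
        this.writer = writer;
        this.input = input;
    }

    /** Processes one element only when the output side can accept it without blocking. */
    public boolean processNextIfOutputAvailable() {
        if (!writer.isAvailable()) {
            return false;          // caller suspends and resumes once a buffer is recycled
        }
        T record = input.get();
        if (record == null) {
            return false;          // nothing to process right now
        }
        writer.emit(record);       // single record write, non-blocking in the regular case
        return true;
    }
}
{code}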



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14394) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-10-15 Thread zhijiang (Jira)
zhijiang created FLINK-14394:


 Summary: Remove unnecessary notifySubpartitionConsumed method from 
view reader 
 Key: FLINK-14394
 URL: https://issues.apache.org/jira/browse/FLINK-14394
 Project: Flink
  Issue Type: Task
Reporter: zhijiang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14289) Remove Optional fields from RecordWriter relevant classes

2019-09-29 Thread zhijiang (Jira)
zhijiang created FLINK-14289:


 Summary: Remove Optional fields from RecordWriter relevant classes
 Key: FLINK-14289
 URL: https://issues.apache.org/jira/browse/FLINK-14289
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


Based on the code style guide for [Java Optional|https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional], Optional should not be used for class fields.

So we remove the Optional usages from RecordWriter, BroadcastRecordWriter and ChannelSelectorRecordWriter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14004) Define SourceReader interface to verify the integration with StreamOneInputProcessor

2019-09-08 Thread zhijiang (Jira)
zhijiang created FLINK-14004:


 Summary: Define SourceReader interface to verify the integration 
with StreamOneInputProcessor
 Key: FLINK-14004
 URL: https://issues.apache.org/jira/browse/FLINK-14004
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


We already refactored the task input and output sides based on the new source characteristics in FLIP-27. In order to further verify that the new source reader works well with the unified StreamOneInputProcessor in the mailbox model, we would design a unit test that integrates the whole process. In detail:
 * Define the SourceReader and SourceOutput related interfaces based on FLIP-27
 * Implement an example of a stateless SourceReader (a bounded sequence of integers)
 * Define a SourceReaderOperator to integrate the SourceReader with the StreamOneInputProcessor
 * Define a SourceReaderStreamTask to execute the source input, and implement a unit test for it



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13798) Refactor the process of checking stream status while emitting watermark in source

2019-08-20 Thread zhijiang (Jira)
zhijiang created FLINK-13798:


 Summary: Refactor the process of checking stream status while 
emitting watermark in source
 Key: FLINK-13798
 URL: https://issues.apache.org/jira/browse/FLINK-13798
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


As we know, a watermark may only be emitted downstream when the stream status is active. For downstream tasks we already have the StatusWatermarkValve component in the StreamInputProcessor to handle this logic, but for the source task the current implementation is a bit tricky. There are two scenarios for the source case:
 * Emitting a watermark via the source context: the specific WatermarkContext toggles the stream status to active before collecting/emitting records or watermarks. Then the RecordWriterOutput implementation checks that the status is active before actually emitting the watermark.
 * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a timer at a fixed interval. When it fires, the output stack is called to emit the watermark, and the RecordWriterOutput takes the role of checking the status before actually emitting it.

So the status-checking logic in RecordWriterOutput is only needed for the second scenario; it is redundant for the first scenario because the WatermarkContext always toggles the status to active before emitting. Even worse, this logic in RecordWriterOutput creates a cyclic dependency with the StreamStatusMaintainer, which is a blocker for the following work of integrating source processing on the runtime side.

The solution is to migrate the checking logic from RecordWriterOutput to TimestampsAndPeriodicWatermarksOperator, and to remove the toggle-active logic from the existing WatermarkContext.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13767) Migrate isFinished method from AvailabilityListener to AsyncDataInput

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13767:


 Summary: Migrate isFinished method from AvailabilityListener to 
AsyncDataInput
 Key: FLINK-13767
 URL: https://issues.apache.org/jira/browse/FLINK-13767
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network, Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


AvailabilityListener is used in both AsyncDataInput and StreamTaskInput. We already introduced InputStatus for StreamTaskInput#emitNext, and InputStatus#END_OF_INPUT has the same semantics as AvailabilityListener#isFinished.

But for AsyncDataInput, which is mainly used by the InputGate layer, the isFinished() method is still needed at the moment. So we migrate this method from AvailabilityListener to AsyncDataInput, and refactor the StreamInputProcessor implementations to use InputStatus for judging the finished state.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13766) Refactor the implementation of StreamInputProcessor based on StreamTaskInput#emitNext

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13766:


 Summary: Refactor the implementation of StreamInputProcessor based 
on StreamTaskInput#emitNext
 Key: FLINK-13766
 URL: https://issues.apache.org/jira/browse/FLINK-13766
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


The current processing in the task input processor is based on pollNext. In order to unify the processing with the new source operator, we introduce StreamTaskInput#emitNext(Output) to replace the current pollNext, and then adjust the existing StreamOneInputProcessor/StreamTwoInputSelectableProcessor implementations to the new emit style.

This lets us integrate all task inputs from network and source into unified processing on the runtime side.
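
An illustrative sketch of the push-style contract; the names mirror the ticket but the signatures are simplified, not the actual Flink interfaces:
{code:java}
// Instead of returning the next element (pollNext), the input pushes at most one
// element to the given output and reports whether more data is available.
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

interface Output<T> {
    void emitRecord(T record);
}

interface StreamTaskInputSketch<T> {
    InputStatus emitNext(Output<T> output) throws Exception;
}
{code}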



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13765) Introduce the InputSelectionHandler for selecting next input in StreamTwoInputSelectableProcessor

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13765:


 Summary: Introduce the InputSelectionHandler for selecting next 
input in StreamTwoInputSelectableProcessor
 Key: FLINK-13765
 URL: https://issues.apache.org/jira/browse/FLINK-13765
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


In StreamTwoInputSelectableProcessor there are three fields \{InputSelectable, InputSelection, availableInputsMask} that are used together to select the next available input index. This brings two problems:
 * From a design perspective, these fields should be abstracted into a separate component that is passed into StreamTwoInputSelectableProcessor.
 * inputSelector.nextSelection() is called while processing elements in StreamTwoInputSelectableProcessor, which is a blocker for later integrating the task input/output of both StreamOneInputProcessor and StreamTwoInputSelectableProcessor.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13764) Pass the counter of numRecordsIn into the constructors of StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13764:


 Summary: Pass the counter of numRecordsIn into the constructors of 
StreamOne/TwoInputProcessor
 Key: FLINK-13764
 URL: https://issues.apache.org/jira/browse/FLINK-13764
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


Currently the numRecordsIn counter is set up while processing input in the processor. In order to later integrate the processing logic based on StreamTaskInput#emitNext(Output), we need to pass the counter into the output functions.

This refactoring is therefore a precondition of the following work, and it brings additional benefits: the counter can become a final field in StreamInputProcessor, and the counter setup logic can be reused for both StreamOne/TwoInputProcessors.

There should be no side effects from setting up the counter a bit earlier than before.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13762) Integrate the implementation of ForwardingValveOutputHandler for StreamOne/TwoInputProcessor

2019-08-18 Thread zhijiang (JIRA)
zhijiang created FLINK-13762:


 Summary: Integrate the implementation of 
ForwardingValveOutputHandler for StreamOne/TwoInputProcessor
 Key: FLINK-13762
 URL: https://issues.apache.org/jira/browse/FLINK-13762
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


Currently StreamOneInputProcessor and StreamTwoInputSelectableProcessor have separate implementations of ForwardingValveOutputHandler. The implementation in StreamTwoInputSelectableProcessor in particular couples the internal input index logic, which is a blocker for the following unification of StreamTaskInput/Output.

We could realize a unified ForwardingValveOutputHandler for both StreamOneInputProcessor and StreamTwoInputSelectableProcessor that always consumes the StreamStatus without distinguishing between inputs. We then refactor the StreamStatusMaintainer implementation to judge the status of the different inputs internally before actually emitting the StreamStatus.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13754) Decouple OperatorChain from StreamStatusMaintainer

2019-08-16 Thread zhijiang (JIRA)
zhijiang created FLINK-13754:


 Summary: Decouple OperatorChain from StreamStatusMaintainer
 Key: FLINK-13754
 URL: https://issues.apache.org/jira/browse/FLINK-13754
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


There are two motivations for this refactoring:
 * It is the precondition for the following work of decoupling the dependency 
between the two inputs' status in ForwardingValveOutputHandler.
 * From a design standpoint, the current OperatorChain takes on many unrelated 
roles, such as StreamStatusMaintainer, which makes it hard to maintain. The root 
cause is the cyclic dependency between RecordWriterOutput (created by OperatorChain) 
and StreamStatusMaintainer.

The solution is to move the creation of StreamStatusMaintainer and 
RecordWriterOutput to the StreamTask level and thereby break the cyclic dependency 
between their implementations. The array of RecordWriters, which is closely related 
to RecordWriterOutput, is already created in StreamTask, so it is reasonable to 
create them together. The StreamStatusMaintainer created in StreamTask can then be 
directly referenced by subclasses like OneInputStreamTask/TwoInputStreamTask.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13753) Integrate new Source Operator with Mailbox Model in StreamTask

2019-08-16 Thread zhijiang (JIRA)
zhijiang created FLINK-13753:


 Summary: Integrate new Source Operator with Mailbox Model in 
StreamTask
 Key: FLINK-13753
 URL: https://issues.apache.org/jira/browse/FLINK-13753
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: zhijiang
Assignee: zhijiang


This is the umbrella issue for integrating the new source operator with the mailbox 
model in StreamTask.

The motivation is based on 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface], 
which proposes to refactor the whole source API, and on the integration of 
task-level actions (including checkpoint, timer, async operator) with the unified 
[mailbox model|https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g] 
on the runtime side.
 * The benefit is a simple, unified processing logic: a single thread handles all 
the actions without concurrency issues, which also gets rid of the lock dependency 
that causes the unfair-lock concern in the checkpoint process.
 * We still need to support the current legacy source for some releases, as it will 
probably be used for a while, especially where performance is a concern.

The design doc is 
[https://docs.google.com/document/d/13x9M7k1SRqkOFXP0bETcJemIRyJzoqGgkdy11pz5qHM/edit]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13493) BoundedBlockingSubpartition only notifies onConsumedSubpartition when all the readers are empty

2019-07-30 Thread zhijiang (JIRA)
zhijiang created FLINK-13493:


 Summary: BoundedBlockingSubpartition only notifies 
onConsumedSubpartition when all the readers are empty
 Key: FLINK-13493
 URL: https://issues.apache.org/jira/browse/FLINK-13493
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


In the previous implementation, the {{ResultPartition}} would always be notified of 
a consumed subpartition whenever any reader view was released. With the 
reference-counter release strategy this can cause problems if one blocking 
subpartition has multiple readers: the whole result partition might be released 
while there are still alive readers on some subpartitions.

Although the default release strategy for blocking partitions is currently based on 
JM/scheduler notification, switching the option to consumption-based release would 
cause problems. And from the subpartition side, the behavior should be correct no 
matter which release strategy the upper layer uses.

In order to fix this bug, the {{BoundedBlockingSubpartition}} should only notify 
{{onConsumedSubpartition}} when all the readers are empty.
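
A minimal sketch of the intended behavior, assuming hypothetical and heavily 
simplified types (not the actual Flink classes):

{code:java}
// Illustrative sketch: track the live reader views and only notify the parent
// partition once the last one has been released.
public class BoundedSubpartitionSketch {

    private final java.util.Set<Object> activeReaders = new java.util.HashSet<>();
    private final Runnable onConsumedCallback; // e.g. parent.onConsumedSubpartition(index)

    public BoundedSubpartitionSketch(Runnable onConsumedCallback) {
        this.onConsumedCallback = onConsumedCallback;
    }

    public synchronized void registerReader(Object reader) {
        activeReaders.add(reader);
    }

    public synchronized void releaseReader(Object reader) {
        activeReaders.remove(reader);
        // Only when ALL readers are gone do we report the subpartition as consumed.
        if (activeReaders.isEmpty()) {
            onConsumedCallback.run();
        }
    }
}
{code}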



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13478) Decouple two different release strategies in BoundedBlockingSubpartition

2019-07-29 Thread zhijiang (JIRA)
zhijiang created FLINK-13478:


 Summary: Decouple two different release strategies in 
BoundedBlockingSubpartition
 Key: FLINK-13478
 URL: https://issues.apache.org/jira/browse/FLINK-13478
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


We have two basic release strategies at the moment. One is based on consumption, via 
network notification from the consumer. The other is based on a notification via RPC 
from the JM/scheduler.

But in the current implementation of {{BoundedBlockingSubpartition}}, these two 
ways are coupled with each other. In detail, the network consumption 
notification can only close the data file after the release RPC was triggered 
from the JM/scheduler. Likewise, the release RPC has to wait until all the reader 
views are really released before closing the data file. So the release RPC still 
relies on the network notification to some extent.

In order to make these two release strategies independent: if the release call 
comes from the JM/scheduler RPC, we could immediately release all the view readers 
and then close the data file as well. If the release is based on the consumption 
notification, then after all the view readers of one subpartition are released, the 
subpartition could further notify the parent {{ResultPartition}}, which decides 
whether to release the whole partition or not.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13442) Remove unnecessary notifySubpartitionConsumed method from view reader

2019-07-26 Thread zhijiang (JIRA)
zhijiang created FLINK-13442:


 Summary: Remove unnecessary notifySubpartitionConsumed method from 
view reader 
 Key: FLINK-13442
 URL: https://issues.apache.org/jira/browse/FLINK-13442
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently the methods `NetworkSequenceViewReader#notifySubpartitionConsumed` 
and `NetworkSequenceViewReader#releaseAllResources` are both called in the netty 
stack while releasing resources.

To make this release logic simple and clean, we could remove the redundant 
`notifySubpartitionConsumed` from the `NetworkSequenceViewReader` side, and also 
remove it from the `ResultSubpartitionView` side. The implementation of 
`ResultSubpartitionView#releaseAllResources` would then notify the parent 
subpartition of the consumed state via 
`ResultSubpartition#notifySubpartitionConsumed`, which in turn feeds back to the 
parent `ResultPartition` layer via `onConsumedSubpartition`. Finally, the 
`ResultPartition` can decide whether to release itself or not.

E.g. in the case of `ReleaseOnConsumptionResultPartition`, which is mainly used 
for pipelined partitions, the partition is released once the reference counter 
has decreased to 0. In the case of `ResultPartition`, which is generated for 
blocking partitions by default, the partition is never released after the consumed 
notification; the JM/scheduler decides when to release the partition properly.

In addition, `InputChannel#notifySubpartitionConsumed` could also be removed as 
a result of the above.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13235) Change the Netty default transport mode to auto

2019-07-11 Thread zhijiang (JIRA)
zhijiang created FLINK-13235:


 Summary: Change the Netty default transport mode to auto
 Key: FLINK-13235
 URL: https://issues.apache.org/jira/browse/FLINK-13235
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current default for "taskmanager.net.transport" in 
NettyShuffleEnvironmentOptions is "NIO". In order to use the "EPOLL" mode, which has 
better performance and is recommended when available, we could change the 
default to "AUTO". The "NIO" mode is then used as a fallback.
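
A minimal sketch of such a default change, assuming the usual ConfigOptions builder 
pattern; the exact option key and owning class may differ from what is shown here:

{code:java}
// Illustrative sketch: declare the transport option with "AUTO" as the new default.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TransportOptionSketch {

    public static final ConfigOption<String> TRANSPORT_TYPE =
        ConfigOptions.key("taskmanager.net.transport")
            // was "NIO"; AUTO picks EPOLL when available and falls back to NIO otherwise
            .defaultValue("AUTO")
            .withDescription("Netty transport type: NIO, EPOLL or AUTO.");
}
{code}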



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13141) Remove getBufferSize method from BufferPoolFactory

2019-07-08 Thread zhijiang (JIRA)
zhijiang created FLINK-13141:


 Summary: Remove getBufferSize method from BufferPoolFactory
 Key: FLINK-13141
 URL: https://issues.apache.org/jira/browse/FLINK-13141
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


This is just a refactoring to make the interface of BufferPoolFactory simpler and 
cleaner.

BufferPoolFactory#getBufferSize is only used when creating subpartitions in 
ResultPartitionFactory. We could pass the network buffer size from 
NettyShuffleEnvironmentConfiguration while constructing the 
ResultPartitionFactory; the interface method getBufferSize could then be 
removed from BufferPoolFactory.
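
A minimal sketch of the intended wiring, with a hypothetical constructor (names are 
illustrative only):

{code:java}
// Illustrative sketch: the factory receives the network buffer size up front,
// so BufferPoolFactory no longer needs a getBufferSize() accessor.
public class ResultPartitionFactorySketch {

    private final int networkBufferSize;

    public ResultPartitionFactorySketch(int networkBufferSize) {
        this.networkBufferSize = networkBufferSize;
    }

    public int subpartitionBufferSize() {
        // previously obtained via bufferPoolFactory.getBufferSize()
        return networkBufferSize;
    }
}
{code}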



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13126) Construct special test/benchmark to verify the backlog effect

2019-07-05 Thread zhijiang (JIRA)
zhijiang created FLINK-13126:


 Summary: Construct special test/benchmark to verify the backlog 
effect
 Key: FLINK-13126
 URL: https://issues.apache.org/jira/browse/FLINK-13126
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks, Runtime / Network, Tests
Reporter: zhijiang
Assignee: zhijiang


Based on Piotr's suggestion while reviewing the 
[PR|https://github.com/apache/flink/pull/7911], it is better to construct a 
relevant test or benchmark to further verify the backlog effect, as a follow-up 
of [FLINK-11082|https://issues.apache.org/jira/browse/FLINK-11082].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-04 Thread zhijiang (JIRA)
zhijiang created FLINK-13100:


 Summary: Fix the unexpected IOException during 
FileBufferReader#nextBuffer
 Key: FLINK-13100
 URL: https://issues.apache.org/jira/browse/FLINK-13100
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang


In the implementation of FileBufferReader#nextBuffer, we expect the next memory 
segment to always be available, based on the assumption that nextBuffer can only be 
called after the previous buffer has been recycled. Otherwise the current 
implementation throws an IOException.

In fact, this assumption does not hold given the credit-based flow control and 
zero-copy features in the network stack. The detailed process is as follows:
 * The netty thread finishes calling channel.writeAndFlush() in 
PartitionRequestQueue and adds a listener to handle the ChannelFuture later. 
Before the future is done, the corresponding buffer is not recycled because of 
the zero-copy improvement.

 * Before the previous future is done, the netty thread can trigger the next 
writeAndFlush while processing an addCredit message; FileBufferReader#nextBuffer 
then throws an exception because the previous buffer has not been recycled.

We thought of several ways to solve this potential bug:
 * Do not trigger the next writeAndFlush before the previous future is done. 
To do so we would have to maintain the future state and check it in the relevant 
actions. I suspect this might bring a performance regression in network throughput 
and extra state management.

 * Adjust the implementation of the current FileBufferReader. We used to regard the 
blocking partition view as always available based on the next-buffer read-ahead, so 
it was always added to the available queue in PartitionRequestQueue. Actually, this 
read-ahead only simplifies the process of 
BoundedBlockingSubpartitionReader#notifyDataAvailable. The view availability could 
instead be judged based on the available buffers in FileBufferReader. When a buffer 
is recycled into FileBufferReader after writeAndFlush is done, it can call 
notifyDataAvailable to add this view back into the available queue in 
PartitionRequestQueue (see the sketch below).

I prefer the second way because it does not bring any negative impact.
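
A minimal sketch of the second option, with hypothetical and heavily simplified 
types; it only illustrates the availability accounting, not the real reader:

{code:java}
// Illustrative sketch: availability is derived from how many read buffers are
// currently free, and recycling a buffer after writeAndFlush re-announces
// availability instead of relying on the next-buffer read-ahead.
public class FileBufferReaderSketch {

    private final java.util.concurrent.atomic.AtomicInteger freeBuffers =
        new java.util.concurrent.atomic.AtomicInteger(2); // e.g. two pooled read buffers
    private final Runnable notifyDataAvailable; // adds this view back to the available queue

    public FileBufferReaderSketch(Runnable notifyDataAvailable) {
        this.notifyDataAvailable = notifyDataAvailable;
    }

    /** The view is only handed out as available when a free buffer exists. */
    public boolean isAvailable() {
        return freeBuffers.get() > 0;
    }

    /** Called when nextBuffer hands a buffer to the netty stack. */
    public void onBufferRequested() {
        freeBuffers.decrementAndGet();
    }

    /** Called from the netty listener once writeAndFlush is done and the buffer is recycled. */
    public void recycle() {
        if (freeBuffers.incrementAndGet() == 1) {
            notifyDataAvailable.run();
        }
    }
}
{code}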



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState

2019-06-26 Thread zhijiang (JIRA)
zhijiang created FLINK-13010:


 Summary: Refactor the process of SchedulerNG#requestPartitionState
 Key: FLINK-13010
 URL: https://issues.apache.org/jira/browse/FLINK-13010
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `requestPartitionState` is mainly used to query the partition state 
when the consumer receives a `PartitionNotFoundException` while requesting the 
partition. Actually we do not have a concept of partition state at the moment, and 
`requestPartitionState` returns the corresponding producer's state as the 
result, so there is a contradiction here.

My suggestion is to rename the method to `requestPartitionProducerState`; we then 
do not need to pass the `IntermediateDataSetID` and `ResultPartitionID` 
arguments for finding the corresponding execution attempt. We could pass only 
the `ExecutionAttemptID`, and the corresponding execution attempt can easily be 
found from the mapping in `ExecutionGraph`.

To do so, we could further remove `IntermediateDataSetID` from 
`SingleInputGate` and might replace `IntermediateDataSetID` with `InputGateID` in 
`InputGateDeploymentDescriptor`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters

2019-06-18 Thread zhijiang (JIRA)
zhijiang created FLINK-12882:


 Summary: Remove ExecutionAttemptID field from 
ShuffleEnvironment#createResultPartitionWriters
 Key: FLINK-12882
 URL: https://issues.apache.org/jira/browse/FLINK-12882
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The {{ExecutionAttemptID}} is only used for constructing the {{ResultPartitionID}} 
while creating the {{ResultPartitionWriter}}s in the {{ShuffleEnvironment}}.

Actually the {{ResultPartitionID}} can be obtained directly from the 
{{ResultPartitionDeploymentDescriptor}} via 
{{ShuffleDescriptor#getResultPartitionID}}, so we can avoid passing this 
field in the interface and keep it simple.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12843) Refactor the pin logic in ResultPartition

2019-06-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12843:


 Summary: Refactor the pin logic in ResultPartition
 Key: FLINK-12843
 URL: https://issues.apache.org/jira/browse/FLINK-12843
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The pin logic adds to the reference counter based on the number of 
subpartitions in the {{ResultPartition}}. It seems unnecessary to do this in a while 
loop as it is now, because the atomic counter is not accessed by other threads 
during pin. If the {{ResultPartition}} has not been created yet, 
{{ResultPartition#createSubpartitionView}} cannot be called and the 
{{ResultPartitionManager}} responds with a {{ResultPartitionNotFoundException}}.

So we could simply increase the reference counter directly in the 
{{ResultPartition}} constructor.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12842) Fix invalid check released state during ResultPartition#createSubpartitionView

2019-06-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12842:


 Summary: Fix invalid check released state during 
ResultPartition#createSubpartitionView
 Key: FLINK-12842
 URL: https://issues.apache.org/jira/browse/FLINK-12842
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently {{ResultPartition#createSubpartitionView}} checks whether 
the partition has been released before creating the view. But this check is based on 
{{refCnt != -1}}, which seems invalid, because the reference counter does not 
always reflect the released state.

In the case of {{ResultPartition#release/fail}}, the reference counter is not 
set to -1. Even in the case of {{ResultPartition#onConsumedSubpartition}}, 
the reference counter has no chance to become -1.

So we could check the real {{isReleased}} state when creating the view instead of 
the reference counter.
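
A minimal sketch of the intended guard, with hypothetical and heavily simplified 
types (not the actual Flink classes):

{code:java}
// Illustrative sketch: guard view creation with the real released flag instead of
// inferring the released state from the reference counter.
public class ResultPartitionSketch {

    private volatile boolean isReleased;

    public Object createSubpartitionView(int index) {
        if (isReleased) {
            throw new IllegalStateException("Partition already released.");
        }
        // ... create and return the reader view for the given subpartition
        return new Object();
    }

    public void release() {
        isReleased = true;
    }
}
{code}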



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12738) Remove abstract getPageSize from InputGate

2019-06-05 Thread zhijiang (JIRA)
zhijiang created FLINK-12738:


 Summary: Remove abstract getPageSize from InputGate
 Key: FLINK-12738
 URL: https://issues.apache.org/jira/browse/FLINK-12738
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently {{InputGate#getPageSize}} is only used for constructing the 
{{BarrierBuffer}}. In order to keep the abstract InputGate simple and clean, we 
should remove unnecessary abstract methods from it.

Considering that the page size can be parsed directly from the configuration, which 
is also visible while constructing the {{BarrierBuffer}}, it is reasonable to 
do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12735) Make shuffle environment implementation independent with IOManager

2019-06-05 Thread zhijiang (JIRA)
zhijiang created FLINK-12735:


 Summary: Make shuffle environment implementation independent with 
IOManager
 Key: FLINK-12735
 URL: https://issues.apache.org/jira/browse/FLINK-12735
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current creation of the {{NetworkEnvironment}} relies on the {{IOManager}} from 
{{TaskManagerServices}}. The shuffle environment implementation should not rely on 
specific external components; instead, the specific implementation should create 
its internal components itself if required.

The current abstract {{IOManager}} has two roles: one is file channel 
management based on the configured temp directories, and the other is providing 
abstract methods for reading/writing files. We could extract the file 
channel management into a separate internal class that can be reused by all 
the components that need it, like the current {{BoundedBlockingSubpartition}}. To do 
so, the shuffle environment should also create its own internal channel manager to 
break the dependency on {{IOManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12630) Refactor abstract InputGate to general interface

2019-05-27 Thread zhijiang (JIRA)
zhijiang created FLINK-12630:


 Summary: Refactor abstract InputGate to general interface
 Key: FLINK-12630
 URL: https://issues.apache.org/jira/browse/FLINK-12630
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


`InputGate` is currently defined as an abstract class which extracts the common 
code for checking data availability for the subclasses `SingleInputGate` and 
`UnionInputGate`, but this might limit further `InputGate` implementations in the 
shuffle service architecture.

`SingleInputGate` is created by the shuffle service, so it belongs to the scope of 
the shuffle service, while `UnionInputGate` is a wrapper around some 
`SingleInputGate`s, so it should live in the task/processor stack.

In order to allow a new `InputGate` implementation from another shuffle 
service to be plugged in directly, we should define a cleaner `InputGate` 
interface that decouples the data-availability checking logic from the 
implementation. In detail, we could define the `isAvailable` method in the 
`InputGate` interface and extract the current implementation into a separate class 
`FutureBasedAvailability`, which can still be extended and reused by both 
`SingleInputGate` and `UnionInputGate`.
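
A minimal sketch of the proposed shape, with hypothetical names and simplified 
signatures:

{code:java}
// Illustrative sketch: a slim InputGate interface that exposes only availability,
// plus a reusable future-based availability helper.
import java.util.concurrent.CompletableFuture;

interface InputGateSketch {
    CompletableFuture<?> isAvailable();
}

class FutureBasedAvailability {

    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    public CompletableFuture<Void> isAvailable() {
        return availableFuture;
    }

    /** Switch to "not available" until data arrives again. */
    public void resetAvailable() {
        availableFuture = new CompletableFuture<>();
    }

    /** Complete the pending future so that waiting consumers wake up. */
    public void notifyAvailable() {
        availableFuture.complete(null);
    }
}
{code}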



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12603) Remove getOwningTaskName method from InputGate

2019-05-23 Thread zhijiang (JIRA)
zhijiang created FLINK-12603:


 Summary: Remove getOwningTaskName method from InputGate
 Key: FLINK-12603
 URL: https://issues.apache.org/jira/browse/FLINK-12603
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `InputGate#getOwningTaskName` is only used for logging in related 
components such as `BarrierBuffer`, `StreamInputProcessor`, etc. We could put 
this name in the `TaskInfo` structure, so that the related components get the 
task name directly from `RuntimeEnvironment#getTaskInfo`.

By doing so, we simplify the interface of `InputGate`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12571) Make NetworkEnvironment#start() return the binded data port

2019-05-21 Thread zhijiang (JIRA)
zhijiang created FLINK-12571:


 Summary: Make NetworkEnvironment#start() return the binded data 
port
 Key: FLINK-12571
 URL: https://issues.apache.org/jira/browse/FLINK-12571
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `NetworkEnvironment#getConnectionManager()` is mainly used by 
`TaskManagerServices` to get the bound data port from the 
`NettyConnectionManager`. Considering that the `ConnectionManager` is an internal 
component of the `NetworkEnvironment`, it should not be exposed to the outside. 
Other ShuffleService implementations might have no `ConnectionManager` at all.

We could make `ShuffleService#start()` return the bound data port to replace 
`getConnectionManager`. For the `LocalConnectionManager` or other shuffle 
service implementations which have no bound data port, returning a simple default 
value would do no harm.
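
A minimal sketch of the proposed contract, with hypothetical interface and class 
names:

{code:java}
// Illustrative sketch: start() returns the bound data port; implementations
// without a network server just return a default value.
interface ShuffleServiceSketch {

    /** Starts the service and returns the bound data port, or -1 if there is none. */
    int start() throws java.io.IOException;
}

class LocalShuffleServiceSketch implements ShuffleServiceSketch {

    @Override
    public int start() {
        return -1; // no server socket, so no real data port
    }
}
{code}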



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12564) Remove getBufferProvider method from ResultPartitionWriter interface

2019-05-20 Thread zhijiang (JIRA)
zhijiang created FLINK-12564:


 Summary: Remove getBufferProvider method from 
ResultPartitionWriter interface
 Key: FLINK-12564
 URL: https://issues.apache.org/jira/browse/FLINK-12564
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `ResultPartitionWriter#getBufferProvider` is used to request a 
`BufferBuilder` in the `RecordWriter`; the `BufferConsumer` created from that 
`BufferBuilder` is then added to the `ResultPartitionWriter` via the 
`addBufferConsumer` method.

We could merge these two methods in `ResultPartitionWriter` so that 
`getBufferProvider` is no longer exposed. The `ResultPartitionWriter` could 
internally request the `BufferBuilder`, add the created `BufferConsumer` to one 
subpartition, and then return the `BufferBuilder` for the `RecordWriter` to write 
serialized data.

Since we also change `ResultPartitionWriter#addBufferConsumer` to 
`ResultPartitionWriter#requestBufferBuilder`, another new method 
`ResultPartitionWriter#broadcastEvents` should be introduced to handle the 
case of events.

In the future it might be worth further abstracting the `ResultPartitionWriter` so 
it is not tied to `BufferBuilder` at all. We could provide `writeRecord(int 
targetIndex)` to replace `requestBufferBuilder`, so the serialization process 
can be done inside the specific `ResultPartitionWriter` instance.
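
A minimal sketch of the proposed interface shape; the signatures and the 
BufferBuilder stand-in are hypothetical simplifications, not the actual Flink API:

{code:java}
// Illustrative sketch: the writer hands out a BufferBuilder for a target
// subpartition instead of exposing its buffer provider, and gets a dedicated
// path for broadcasting events.
interface ResultPartitionWriterSketch {

    /** Requests a BufferBuilder and internally registers its BufferConsumer with the target subpartition. */
    BufferBuilderStandIn requestBufferBuilder(int targetSubpartition) throws java.io.IOException;

    /** Broadcasts an event (e.g. a checkpoint barrier) to all subpartitions. */
    void broadcastEvents(Object event) throws java.io.IOException;

    /** Stand-in for the real BufferBuilder type. */
    interface BufferBuilderStandIn {
        int append(java.nio.ByteBuffer source);
    }
}
{code}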



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12544) Deadlock during releasing memory in SpillableSubpartition

2019-05-17 Thread zhijiang (JIRA)
zhijiang created FLINK-12544:


 Summary: Deadlock during releasing memory in SpillableSubpartition
 Key: FLINK-12544
 URL: https://issues.apache.org/jira/browse/FLINK-12544
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


It was reported by a Flink user, and the original jstack is as follows:

 
{code:java}
// "CoGroup (2/2)":
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
    - waiting to lock <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
    - waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
    at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
    at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
    - locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
    - locked <0x00063c785350> (a java.lang.Object)
    at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
    at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
    - locked <0x00062bf859b8> (a java.lang.Object)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
    at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
    - waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
    - locked <0x00063fdf4888> (a java.util.ArrayDeque)
    at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
    at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at 
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{code}
Based on the analysis, it happens when one task A is trying to release 
subpartition memory: it holds the lock in `LocalBufferPool` while trying to 
get the lock in `SpillableSubpartition`. Meanwhile, another task B is submitted to 
the TM, which triggers the previous task to release memory, then it would already

[jira] [Created] (FLINK-12497) Refactor the start method of ConnectionManager

2019-05-13 Thread zhijiang (JIRA)
zhijiang created FLINK-12497:


 Summary: Refactor the start method of ConnectionManager
 Key: FLINK-12497
 URL: https://issues.apache.org/jira/browse/FLINK-12497
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


In the current `ConnectionManager#start(ResultPartitionProvider, 
TaskEventDispatcher)`, the parameters of start are only meaningful for the 
`NettyConnectionManager` implementation and redundant for the 
`LocalConnectionManager`.

We could put these parameters in the constructor of `NettyConnectionManager`, 
so `ConnectionManager#start()` becomes cleaner for both 
implementations. It also brings benefits for calling start in 
`NetworkEnvironment`, which then does not need to maintain a private 
`TaskEventDispatcher`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12474) UnionInputGate should be notified when closing SingleInputGate by canceler thread

2019-05-10 Thread zhijiang (JIRA)
zhijiang created FLINK-12474:


 Summary: UnionInputGate should be notified when closing 
SingleInputGate by canceler thread
 Key: FLINK-12474
 URL: https://issues.apache.org/jira/browse/FLINK-12474
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang


If a task is being canceled, the `SingleInputGate` is closed by the canceler 
thread. If the `SingleInputGate` is waiting for a buffer, `inputChannelWithData` 
is notified to wake up the task thread so it exits early. But if the 
`UnionInputGate` is waiting for a buffer, the task thread is still stuck waiting 
when the `SingleInputGate` is closed, until the cancellation timeout.

To make the task exit early in this case, we could make the `SingleInputGate` 
further notify the `UnionInputGate` after it is closed, so the task thread is also 
woken up to exit during canceling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12458) Throw PartitionNotFoundException if consumer can not establish a connection to remote TM

2019-05-09 Thread zhijiang (JIRA)
zhijiang created FLINK-12458:


 Summary: Throw PartitionNotFoundException if consumer can not 
establish a connection to remote TM
 Key: FLINK-12458
 URL: https://issues.apache.org/jira/browse/FLINK-12458
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


If the consumer cannot establish a connection to the remote task executor, this 
might indicate that the remote task executor is not reachable. We could wrap this 
connection exception into the existing `PartitionNotFoundException`, so the job 
master can decide whether to restart the upstream region to re-produce the 
partition data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12331) Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-04-25 Thread zhijiang (JIRA)
zhijiang created FLINK-12331:


 Summary: Introduce partition/gate setup to decouple task 
registration with NetworkEnvironment
 Key: FLINK-12331
 URL: https://issues.apache.org/jira/browse/FLINK-12331
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.9.0


In order to decouple the task from the {{NetworkEnvironment}} completely, we 
introduce the interface methods {{InputGate#setup}} and 
{{ResultPartitionWriter#setup}}. The task can then call the {{setup}} method on the 
created partitions/gates directly instead of calling the current {{registerTask}} 
on the {{NetworkEnvironment}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12213) Pass TaskManagerMetricGroup into constructor of NetworkEnvironment

2019-04-16 Thread zhijiang (JIRA)
zhijiang created FLINK-12213:


 Summary: Pass TaskManagerMetricGroup into constructor of 
NetworkEnvironment
 Key: FLINK-12213
 URL: https://issues.apache.org/jira/browse/FLINK-12213
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.9.0


At the moment {{NetworkEnvironment#getNetworkBufferPool}} is called to add the 
network-related {{MetricGroup}}. In order to simplify the public API of 
{{NetworkEnvironment}}, which is regarded as the default {{ShuffleService}} 
implementation, we could pass the {{TaskManagerMetricGroup}} into the constructor 
of {{NetworkEnvironment}}; the related network {{MetricGroup}} could then be 
added internally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12154) Remove legacy fields for SingleInputGate

2019-04-10 Thread zhijiang (JIRA)
zhijiang created FLINK-12154:


 Summary: Remove legacy fields for SingleInputGate
 Key: FLINK-12154
 URL: https://issues.apache.org/jira/browse/FLINK-12154
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


In {{SingleInputGate#create}}, we could remove unused parameter 
{{ExecutionAttemptID}}.

And for the constructor of {{SingleInputGate}}, we could remove unused 
parameter {{TaskIOMetricGroup}}.

Then we introduce {{createSingleInputGate}} for reusing the process of creating 
{{SingleInputGate}} in related tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12146) Remove unregister task from NetworkEnvironment to simplify the interface of ShuffleService

2019-04-09 Thread zhijiang (JIRA)
zhijiang created FLINK-12146:


 Summary: Remove unregister task from NetworkEnvironment to 
simplify the interface of ShuffleService
 Key: FLINK-12146
 URL: https://issues.apache.org/jira/browse/FLINK-12146
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current {{NetworkEnvironment}} is the default {{ShuffleService}} 
implementation in the task manager. In order to keep the interface simple, we try 
to avoid additional interactions with the {{NetworkEnvironment}}.

{{NetworkEnvironment#unregisterTask}} is used for closing partitions/gates and 
releasing partitions from the {{ResultPartitionManager}}. The partition/gate close 
can be done in the task, which already maintains arrays of them. Further, we could 
release the partition from the {{ResultPartitionManager}} inside the 
{{ResultPartition}} by introducing {{ResultPartition#close(Throwable)}}. With that, 
{{NetworkEnvironment#unregisterTask}} can be removed entirely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12127) Move network related options to NetworkEnvironmentOptions

2019-04-08 Thread zhijiang (JIRA)
zhijiang created FLINK-12127:


 Summary: Move network related options to NetworkEnvironmentOptions
 Key: FLINK-12127
 URL: https://issues.apache.org/jira/browse/FLINK-12127
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Some network-related options in TaskManagerOptions could be moved into the newly 
introduced `NetworkEnvironmentOptions`, which would be used by different 
shuffle services.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12078) Abstract TaskEventPublisher interface for simplify NetworkEnvironment

2019-04-01 Thread zhijiang (JIRA)
zhijiang created FLINK-12078:


 Summary: Abstract TaskEventPublisher interface for simplify 
NetworkEnvironment
 Key: FLINK-12078
 URL: https://issues.apache.org/jira/browse/FLINK-12078
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently the {{TaskEventDispatcher}} is maintained in the {{NetworkEnvironment}} 
for registering/unregistering partitions and is used by the 
{{NettyConnectionManager}}. In order to further decouple the {{Task}} from the 
{{NetworkEnvironment}}, we introduce a {{TaskEventPublisher}} interface providing 
the {{publish}} method only. The {{NetworkEnvironment}} can then maintain a 
{{TaskEventPublisher}}, and the {{register/unregister}} calls are moved outside to 
be handled by the {{Task}} directly.

To do so, {{NetworkEnvironment#unregisterTask}} is finally removed, and 
{{partition/gate#close}} is used instead.
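
A minimal sketch of the publish-only contract, with hypothetical and heavily 
simplified parameter types:

{code:java}
// Illustrative sketch: only the publish side is exposed to the network stack;
// register/unregister of listeners stays with the Task.
interface TaskEventPublisherSketch {

    /** Publishes the event to the listeners of the given partition; returns whether it was delivered. */
    boolean publish(String resultPartitionId, Object taskEvent);
}
{code}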

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12067) Refactor the constructor of NetworkEnvironment

2019-03-29 Thread zhijiang (JIRA)
zhijiang created FLINK-12067:


 Summary: Refactor the constructor of NetworkEnvironment
 Key: FLINK-12067
 URL: https://issues.apache.org/jira/browse/FLINK-12067
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The constructor of {{NetworkEnvironment}} could be refactored to take only the 
{{NetworkEnvironmentConfiguration}}; the other related components such as 
{{TaskEventDispatcher}}, {{ResultPartitionManager}} and {{NetworkBufferPool}} 
could be created internally.

We also refactor the process of generating the {{NetworkEnvironmentConfiguration}} 
in {{TaskManagerServiceConfiguration}} to add {{numNetworkBuffers}} instead of 
the previous {{networkBufFraction}}, {{networkBufMin}} and {{networkBufMax}}.

Further, we introduce the {{NetworkEnvironmentConfigurationBuilder}} for 
creating the {{NetworkEnvironmentConfiguration}} easily, especially in tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11988) Remove legacy MockNetworkEnvironment

2019-03-21 Thread zhijiang (JIRA)
zhijiang created FLINK-11988:


 Summary: Remove legacy MockNetworkEnvironment
 Key: FLINK-11988
 URL: https://issues.apache.org/jira/browse/FLINK-11988
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network, Tests
Reporter: zhijiang
Assignee: zhijiang


Remove legacy {{MockNetworkEnvironment}} class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11885) Introduce RecordWriterBuilder for creating RecordWriter instance

2019-03-12 Thread zhijiang (JIRA)
zhijiang created FLINK-11885:


 Summary: Introduce RecordWriterBuilder for creating RecordWriter 
instance
 Key: FLINK-11885
 URL: https://issues.apache.org/jira/browse/FLINK-11885
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current {{RecordWriter}} will be refactored into an abstract class to 
improve the broadcast mode mentioned in FLINK-10995. So it is better to migrate 
the logic of building {{RecordWriter}} instances into a separate 
{{RecordWriterBuilder}} utility class, which should be the only entry point for 
other usages.
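
A minimal sketch of such a builder, with hypothetical fields and a placeholder 
build result (not the actual Flink RecordWriter API):

{code:java}
// Illustrative sketch: the construction logic of the different RecordWriter
// flavours moves behind a single builder entry point.
class RecordWriterBuilderSketch {

    private long timeout = -1;     // flushing interval, -1 means no output flusher
    private String taskName = "";  // used for the output flusher thread name

    public RecordWriterBuilderSketch setTimeout(long timeout) {
        this.timeout = timeout;
        return this;
    }

    public RecordWriterBuilderSketch setTaskName(String taskName) {
        this.taskName = taskName;
        return this;
    }

    public String build() {
        // in the real code this would pick a broadcast or non-broadcast RecordWriter
        return "RecordWriter(timeout=" + timeout + ", taskName=" + taskName + ")";
    }
}
{code}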



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

