[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015364#comment-16015364
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3525
  
Great to hear @WangTaoTheTonic. Thanks a lot for testing.


> Blob Server cannot handle multiple job submits (with same content) parallelly
> -
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Tao Wang
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.3.0, 1.4.0
>
>
> In yarn-cluster mode, if we submit the same job multiple times in parallel, 
> the tasks encounter class loading problems and lease occupation.
> This is because the blob server stores user jars under a name generated from 
> their sha1sum: it first writes a temp file and then moves it to its final 
> location. For recovery it also puts them on HDFS under the same file name.
> Meanwhile, when multiple clients submit the same job with the same jar, the 
> local jar files in the blob server and the files on HDFS are handled by 
> multiple threads (BlobServerConnection), which interfere with each other.
> It would be better to have a way to handle this. Two ideas come to mind:
> 1. lock the write operation, or
> 2. use some unique identifier as the file name instead of (or in addition to) 
> the sha1sum of the file contents.
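
To make the collision concrete, here is a minimal Java sketch of content-addressed naming. It is an illustration only, not Flink's actual code: the class name, the method name and the "blob_" prefix are hypothetical. It only shows that identical jar contents always map to the same file name, which is why parallel submits of the same jar end up touching the same file.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustrative only: derives a file name from the SHA-1 of the content. */
class ContentAddressedNameSketch {

    /** Returns a hypothetical "blob_" prefix followed by the hex SHA-1 of the bytes. */
    static String fileNameFor(byte[] jarBytes) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            StringBuilder name = new StringBuilder("blob_"); // prefix is an assumption
            for (byte b : sha1.digest(jarBytes)) {
                name.append(String.format("%02x", b));
            }
            return name.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a mandatory algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] jar = {'a', 'b', 'c'};
        // Two "clients" uploading the same content compute the same target name.
        System.out.println(fileNameFor(jar));
        System.out.println(fileNameFor(jar.clone()));
    }
}
```

Because both uploads resolve to one name, idea 1 (locking) serializes the writers, while idea 2 (a unique identifier) would give each upload its own file.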



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015328#comment-16015328
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic closed the pull request at:

https://github.com/apache/flink/pull/3525




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015327#comment-16015327
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
@tillrohrmann I've tried with your commit and the issue is resolved, 
thanks. Closing this PR.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013597#comment-16013597
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3888




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013566#comment-16013566
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
Thanks for your fix, I'll check in a day or two.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010300#comment-16010300
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3888
  
Thanks for the quick review @StefanRRichter. I will address your comment 
and also add a test case for the delete path.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010293#comment-16010293
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3888#discussion_r116457501
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -235,8 +235,54 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
                 return;
             }
 
-            // from here on, we started sending data, so all we can do is close the connection when something happens
+            readLock.lock();
+
             try {
+                try {
+                    if (!blobFile.exists()) {
+                        // first we have to release the read lock in order to acquire the write lock
+                        readLock.unlock();
+                        writeLock.lock();
--- End diff --

True, I'm also not sure which version would be more efficient. Given that 
the file transfer dominates the execution time here, I assume that the 
additional check won't hurt.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010227#comment-16010227
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3888#discussion_r116446914
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---
@@ -235,8 +235,54 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
                 return;
             }
 
-            // from here on, we started sending data, so all we can do is close the connection when something happens
+            readLock.lock();
+
             try {
+                try {
+                    if (!blobFile.exists()) {
+                        // first we have to release the read lock in order to acquire the write lock
+                        readLock.unlock();
+                        writeLock.lock();
--- End diff --

Between releasing the read lock and acquiring the write lock, multiple 
threads can reach this point and, as far as I can see, a file can then be 
written more often than required. I would assume the code still produces the 
correct result, but it could do duplicate work. An obvious fix would be to 
re-check `blobFile.exists()` under the write lock, but I'm not sure whether 
the cost of another metadata query per write would outweigh the occasional, 
but unlikely, duplicate work.
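
A minimal Java sketch of the re-check described above, under stated assumptions: an in-memory map stands in for the local blob files, and the class and method names are hypothetical. This is not the code from the pull request; it only illustrates that `ReentrantReadWriteLock` cannot upgrade a read lock, so the existence check has to be repeated once the write lock is held.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Illustrative only: re-checks existence after switching from read to write lock. */
class RecheckUnderWriteLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Assumption: a map stands in for the blob files on local disk.
    private final Map<String, byte[]> files = new HashMap<>();
    final AtomicInteger writes = new AtomicInteger();

    byte[] getOrFetch(String key, byte[] remoteContent) {
        lock.readLock().lock();
        try {
            byte[] local = files.get(key);
            if (local != null) {
                return local;
            }
        } finally {
            // The read lock must be released before the write lock can be acquired.
            lock.readLock().unlock();
        }

        lock.writeLock().lock();
        try {
            // Re-check: another thread may have written the file in the meantime.
            byte[] local = files.get(key);
            if (local == null) {
                files.put(key, remoteContent);
                writes.incrementAndGet();
                local = remoteContent;
            }
            return local;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Runs several concurrent fetches of one key and returns the number of writes. */
    static int demo() {
        RecheckUnderWriteLockSketch sketch = new RecheckUnderWriteLockSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> sketch.getOrFetch("blob_key", new byte[] {1, 2, 3}));
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.writes.get();
    }

    public static void main(String[] args) {
        System.out.println("writes: " + demo());
    }
}
```

Without the second lookup, every thread that saw a missing file under the read lock would write it again. With the re-check, the sketch performs a single write at the price of one extra lookup per writer.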




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010059#comment-16010059
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3525
  
@WangTaoTheTonic I've opened a PR which addresses the problem: #3888. If 
you like, then you can try it out and see if it solves your problem.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16008382#comment-16008382
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3888

[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file 
creation and deletion

This PR is based on #3873 and #3864.

This commit introduces a BlobServer#readWriteLock in order to synchronize 
file creation and deletion operations in BlobServerConnection and BlobServer. 
This prevents multiple put, delete and get operations from interfering with 
each other.

The get operations are synchronized using the read lock so that they can 
still run in parallel with each other.

What this PR does not address is the handling of concurrent writes and 
reads to the `BlobStore`. This could be solved via SUCCESS files in order to 
indicate the completion of a file. However, the first read operation should now 
happen strictly after the write operation due to the locking.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink concurrentBlobUploads

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3888


commit 28bc0acd5c2d3858e41fd29113fff1c7a40471f5
Author: Till Rohrmann 
Date:   2017-05-11T15:36:17Z

[FLINK-6555] [futures] Generalize ConjunctFuture to return results

The ConjunctFuture now returns the set of future values once it is 
completed.

commit f221353584c9089552572387eee9e162695311cd
Author: Till Rohrmann 
Date:   2017-05-12T09:05:13Z

Introduce WaitingConjunctFuture; Fix thread safety issue with 
ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The 
future values
are discarded making it more efficient than the ResultConjunctFuture which 
returns
the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection).

commit 79eafa2671f4a4dfdc9ab135443c339ef2e8001a
Author: Till Rohrmann 
Date:   2017-05-09T08:26:37Z

[FLINK-6519] Integrate BlobStore in lifecycle management of 
HighAvailabilityServices

The HighAvailabilityService creates a single BlobStoreService instance 
which is
shared by all BlobServer and BlobCache instances. The BlobStoreService's 
lifecycle
is exclusively managed by the HighAvailabilityServices. This means that the
BlobStore's content is only cleaned up if the HighAvailabilityService's HA 
data
is cleaned up. Having this single point of control, makes it easier to 
decide when
to discard HA data (e.g. in case of a successful job execution) and when to 
retain
the data (e.g. for recovery).

Close and cleanup all data of BlobStore in HighAvailabilityServices

Use HighAvailabilityServices to create BlobStore

Introduce BlobStoreService interface to hide close and 
closeAndCleanupAllData methods

commit c6ff2ced58e60f63d0236e53c83192f64479c44a
Author: Till Rohrmann 
Date:   2017-05-10T15:38:49Z

[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file 
creation and deletion

This commit introduces a BlobServer#readWriteLock in order to synchronize 
file creation
and deletion operations in BlobServerConnection and BlobServer. This will 
prevent
that multiple put and get operations interfere with each other and with get 
operations.

The get operations are synchronized using the read lock in order to 
guarantee some kind of
parallelism.





[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16001897#comment-16001897
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
That looks good to me. Looking forward to the fix from @tillrohrmann. Thank 
you very much :)


> Blob Server cannot handle multiple job submits (with same content) parallelly
> -
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Tao Wang
>Assignee: Tao Wang
>Priority: Critical
>


[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16001004#comment-16001004
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
@WangTaoTheTonic I think we can solve that the following way:

  - The local upload uses `ATOMIC_MOVE` to rename the file
  - Only the thread that succeeds will store the blob in HDFS or S3

What do you think?
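
A minimal Java sketch of the two bullet points above, under stated assumptions. The class and method names are hypothetical. Because the `Files.move` documentation leaves it implementation specific whether `ATOMIC_MOVE` replaces an existing target or fails, this sketch does not rely on the move itself to pick a winner: it uses an in-JVM set of claimed keys for that, which is an assumption of the sketch and not something proposed in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: publish an uploaded temp file under its content-derived key. */
class AtomicPublishSketch {
    // Assumption: all uploads go through one blob server process (one JVM).
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    /** Returns true only for the caller that published the blob and should upload it to HA storage. */
    boolean publish(Path tempFile, Path storageDir, String key) throws IOException {
        if (!claimed.add(key)) {
            // Someone else already claimed this content; discard our copy.
            Files.deleteIfExists(tempFile);
            return false;
        }
        // Readers never observe a partially written file under the final name.
        Files.move(tempFile, storageDir.resolve(key), StandardCopyOption.ATOMIC_MOVE);
        return true;
    }

    /** Publishes the same content twice and reports who "won". */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("blob-sketch");
            byte[] content = "jar-bytes".getBytes(StandardCharsets.UTF_8);
            // Temp files live in the target directory so the move stays on one file system.
            Path upload1 = Files.write(Files.createTempFile(dir, "upload", ".tmp"), content);
            Path upload2 = Files.write(Files.createTempFile(dir, "upload", ".tmp"), content);

            AtomicPublishSketch server = new AtomicPublishSketch();
            boolean first = server.publish(upload1, dir, "blob_key");
            boolean second = server.publish(upload2, dir, "blob_key");
            String stored = new String(Files.readAllBytes(dir.resolve("blob_key")), StandardCharsets.UTF_8);
            return first + "," + second + "," + stored;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One limitation of this sketch: a caller that loses the claim may return before the winner has finished its move, so a real implementation would still need to coordinate readers with the writer.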




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000669#comment-16000669
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
For the HA case, the blob server will upload jars to HDFS for recovery, and 
there are concurrent operations there too. I'm not sure if the solutions you 
proposed can cover that.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999405#comment-15999405
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
@WangTaoTheTonic I have debugged a bit further in this issue, and it seems 
there is a bit more to do.
For non-HA blob servers, the atomic rename fix would do it.

For HA cases, we need to do a bit more. A recent change was that the blob 
cache will try to fetch blobs directly from the blob store, which may cause 
premature reads before the blob has been fully written. Because the storage 
systems we target for HA do not all support atomic renames (S3 does not), we 
need to use the `_SUCCESS` file trick to mark completed blobs.

I chatted with @tillrohrmann about that, he agreed to take a look at fixing 
these and will make an effort to get this into the 1.3 release. Hope that this 
will work for you.
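
A minimal Java sketch of the `_SUCCESS` file trick mentioned above, under stated assumptions: a local directory stands in for the HA blob store (HDFS or S3), and the class name, method names and marker naming scheme are hypothetical. The writer creates the marker only after the blob is fully written, and a reader treats a blob without a marker as not yet available.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Illustrative only: completion markers for stores without atomic rename. */
class SuccessMarkerSketch {
    static final String MARKER_SUFFIX = "_SUCCESS"; // naming scheme is an assumption

    /** Writes the blob first and the marker last. */
    static void write(Path store, String key, byte[] content) throws IOException {
        Files.write(store.resolve(key), content);
        // An empty marker file signals that the blob above is complete.
        Files.write(store.resolve(key + MARKER_SUFFIX), new byte[0]);
    }

    /** Returns the blob only if its marker exists, otherwise empty. */
    static Optional<byte[]> read(Path store, String key) throws IOException {
        if (!Files.exists(store.resolve(key + MARKER_SUFFIX))) {
            return Optional.empty();
        }
        return Optional.of(Files.readAllBytes(store.resolve(key)));
    }

    /** Simulates a reader arriving before and after the upload has completed. */
    static String demo() {
        try {
            Path store = Files.createTempDirectory("ha-store-sketch");
            // A half-written blob without a marker, as seen during an upload.
            Files.write(store.resolve("blob_key"), "par".getBytes(StandardCharsets.UTF_8));
            boolean visibleEarly = read(store, "blob_key").isPresent();

            write(store, "blob_key", "full".getBytes(StandardCharsets.UTF_8));
            String afterUpload = new String(read(store, "blob_key").get(), StandardCharsets.UTF_8);
            return visibleEarly + "," + afterUpload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design choice here is that correctness depends only on write ordering (blob first, marker last), not on the store offering an atomic rename.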




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999404#comment-15999404
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
Yes, @netguy204 - that is definitely one possible way for class loaders to 
leak over...




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998814#comment-15998814
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user netguy204 commented on the issue:

https://github.com/apache/flink/pull/3525
  
@StephanEwen Yes, I do have at least objects and classes being stored in a 
static context. An easy example (that has also bitten me a few times) is the 
class cache that Avro maintains: 


https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L146

The Avro APIs, unless told otherwise, will use a singleton instance of 
SpecificData and will access that shared cache.

Would something like that be enough to cause the classloader to pass 
between jobs?


> Blob Server cannot handle multiple job submits (with same content) parallelly
> -
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Tao Wang
>Assignee: Tao Wang
>Priority: Critical
>
> In yarn-cluster mode, if we submit the same job multiple times in parallel, 
> the tasks encounter class loading problems and lease occupation.
> The blob server stores user jars under names generated from their sha1sum: 
> it first writes a temp file and then moves it to the final name. For recovery it also 
> puts them on HDFS under the same file name.
> At the same time, when multiple clients submit the same job with the same jar, the local 
> jar files in the blob server and the files on HDFS are handled by multiple 
> threads (BlobServerConnection) and interfere with each other.
> It would be better to have a way to handle this. Two ideas come to mind:
> 1. lock the write operation, or
> 2. use some unique identifier as the file name instead of (or in addition to) the 
> sha1sum of the file contents.





[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996568#comment-15996568
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
@netguy204 I think you are affected by a different issue. In your case, 
there are no damaged jar files, but it looks like the classloader has been 
closed.

Flink creates classloaders per job and caches them across different tasks 
of that job. It closes the dynamically created classloaders when all tasks from 
the job are done.

Is it possible that a classloader passes between jobs, meaning that one 
job uses a classloader that was created for another job? Do you store some 
objects / classes / classloaders somewhere in a static context or a cache or 
interner, so that one job could create them and another job could re-use 
them?
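
The per-job classloader lifecycle described above can be sketched roughly as follows. This is a minimal illustration, not Flink's actual code: the class `JobClassLoaderRegistry` and its methods are hypothetical names, and the job ID is assumed to be a plain string. The point is that the loader is closed once the last task of its job is released, so anything that still holds classes from it (for example via a static cache) may later fail with errors such as "zip file closed".

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a per-job classloader cache; not Flink's implementation. */
final class JobClassLoaderRegistry {

    /** One classloader per job plus the number of tasks currently using it. */
    private static final class Entry {
        final URLClassLoader loader;
        int tasks;

        Entry(URLClassLoader loader) {
            this.loader = loader;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Returns the job's classloader, creating it for the first task of that job. */
    synchronized URLClassLoader acquire(String jobId, URL[] jarUrls) {
        Entry entry = entries.get(jobId);
        if (entry == null) {
            ClassLoader parent = JobClassLoaderRegistry.class.getClassLoader();
            entry = new Entry(new URLClassLoader(jarUrls, parent));
            entries.put(jobId, entry);
        }
        entry.tasks++;
        return entry.loader;
    }

    /** Releases one task; the loader is closed when the job's last task is released. */
    synchronized void release(String jobId) throws IOException {
        Entry entry = entries.get(jobId);
        if (entry == null) {
            return;
        }
        entry.tasks--;
        if (entry.tasks == 0) {
            entries.remove(jobId);
            // After close(), the jar files behind this loader are no longer readable.
            entry.loader.close();
        }
    }

    /** Number of jobs that currently have a live classloader. */
    synchronized int activeJobs() {
        return entries.size();
    }
}
```

In this sketch a second job never receives the first job's loader from the registry itself; a loader can only "pass between jobs" if user code or a library keeps a reference to it, or to classes it loaded, outside the registry.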




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15992954#comment-15992954
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user netguy204 commented on the issue:

https://github.com/apache/flink/pull/3525
  
+1 I'm looking forward to this fix as I think I'm encountering this bug in 
production.

I bundle my jobs into a single JAR file with multiple mains. I submit the 
jobs to the cluster sequentially (once the cluster accepts one I submit the 
next). My job also has two dependency JARs that I provide via HTTP using the -C 
switch to flink.

When a job fails it automatically restarts but it seems to cause other jobs 
from the same JAR to fail and restart as well. The error is always some 
variation of:

```
java.lang.IllegalStateException: zip file closed
    at java.util.zip.ZipFile.ensureOpen(ZipFile.java:669)
    at java.util.zip.ZipFile.getEntry(ZipFile.java:309)
    at java.util.jar.JarFile.getEntry(JarFile.java:240)
    at sun.net.www.protocol.jar.URLJarFile.getEntry(URLJarFile.java:128)
    at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
    at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
    at sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:983)
    at sun.misc.URLClassPath.findResource(URLClassPath.java:188)
    at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
    at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
    at java.lang.ClassLoader.getResource(ClassLoader.java:1093)
    at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
 backtrace from some arbitrary point in my code that never is 
doing anything with reflection ...
```

The class load that triggers the fault is arbitrary. The same job may fail 
and restart multiple times in the same day with a different failing class load.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978555#comment-15978555
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
@WangTaoTheTonic I will take this issue next week. I think we can fix this 
using `Files.move(src, dest, ATOMIC_MOVE)` so that multiple jobs do not get in 
each other's way. That way we preserve the cross-job caching behavior and should 
fix the issue you described.
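
A minimal sketch of the write-then-atomic-move idea, assuming a content-addressed file name. The class `AtomicBlobStore`, the `blob_` prefix, and the method signature are hypothetical and only illustrate the pattern; they are not taken from the Flink code base.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper illustrating temp-file-plus-atomic-move; not Flink's BlobServer. */
final class AtomicBlobStore {

    /**
     * Writes the content to a private temp file and then publishes it under a
     * name derived from the content hash using an atomic move.
     */
    static Path put(Path storageDir, String contentHash, byte[] content) throws IOException {
        Path target = storageDir.resolve("blob_" + contentHash);
        // Each writer gets its own temp file, so concurrent uploads never share
        // a partially written file. It lives in the storage directory so that
        // the move stays within one file system.
        Path temp = Files.createTempFile(storageDir, "temp-", ".part");
        try {
            Files.write(temp, content);
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // Whether an atomic move replaces an existing target is platform
            // specific. If the target exists, another writer already published
            // the same content, so the failure is treated as harmless here.
            if (!Files.exists(target)) {
                throw e;
            }
        } finally {
            Files.deleteIfExists(temp);
        }
        return target;
    }
}
```

Because the final name depends only on the content, two clients uploading the same jar end up with one shared file, which keeps the cross-job caching behavior. Readers see either no file or a complete file, never a half-written one.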




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958155#comment-15958155
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
I am currently travelling and then attending Flink Forward. Will come back 
to this after that.

Quick feedback:
  - I still think that the random suffix breaks the original idea of 
the cached blobs.
  - The blob manager counts references to files and does not delete them as 
long as someone has a reference. That prevents deletion if multiple parties 
work with the same jar.
  - Properly handling rename and add-reference in one lock, as well as 
de-reference and delete in the same lock, should fix it, I think.
  - The blob manager needs to make sure it has an exclusive directory, so 
that no other process accesses the files. But I think that is the case already.
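
The locking scheme from the list above could look roughly like this. It is a sketch under assumptions: `RefCountedBlobCache`, its method names, and the `blob_` file prefix are hypothetical, and the uploaded temp file is assumed to sit on the same file system as the storage directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reference-counted blob cache; not the actual Flink blob manager. */
final class RefCountedBlobCache {

    private final Object lock = new Object();
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Path storageDir;

    RefCountedBlobCache(Path storageDir) {
        this.storageDir = storageDir;
    }

    /** Publishes an uploaded temp file and adds a reference, both under one lock. */
    Path register(String contentHash, Path uploadedTemp) throws IOException {
        Path target = storageDir.resolve("blob_" + contentHash);
        synchronized (lock) {
            if (Files.exists(target)) {
                // Same content is already cached; drop the redundant upload.
                Files.deleteIfExists(uploadedTemp);
            } else {
                Files.move(uploadedTemp, target, StandardCopyOption.ATOMIC_MOVE);
            }
            refCounts.merge(contentHash, 1, Integer::sum);
        }
        return target;
    }

    /** Drops a reference and deletes the file when it was the last one, under the same lock. */
    void release(String contentHash) throws IOException {
        synchronized (lock) {
            Integer count = refCounts.get(contentHash);
            if (count == null) {
                return;
            }
            if (count > 1) {
                refCounts.put(contentHash, count - 1);
            } else {
                refCounts.remove(contentHash);
                Files.deleteIfExists(storageDir.resolve("blob_" + contentHash));
            }
        }
    }
}
```

Since rename and add-reference happen in one critical section, and de-reference and delete in another on the same lock, a delete cannot slip in between a concurrent upload's rename and its reference increment.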



[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly

2017-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15956452#comment-15956452
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
Hi Stephan, could you help review? @StephanEwen 

