[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2020-01-18 Thread luoguohao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018798#comment-17018798
 ] 

luoguohao commented on FLINK-13758:
---

I'm fine with transferring this ticket to you if you have more time to handle
it. [~fly_in_gis]

> failed to submit JobGraph when registered hdfs file in DistributedCache 
> 
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1, 1.9.0, 1.9.1, 1.10.0
>Reporter: luoguohao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> When using HDFS files in the DistributedCache, submitting the JobGraph fails;
> exception stack traces show up in the log file after a while. However, if the
> DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2020-01-18 Thread luoguohao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018534#comment-17018534
 ] 

luoguohao commented on FLINK-13758:
---

Yes, it has really taken a long time to fix this issue. I already made a pull
request for it, but it has not seen much progress. Anyway, thanks, @Yang
Wang.



--
Sent from my NetEase Mail mobile client



On 2020-01-16 16:50:00, "Yang Wang (Jira)" wrote:


Currently, the public API {{StreamExecutionEnvironment#registerCachedFile}} is
broken: we cannot register remote cached files for any of the session
deployments (standalone, YARN, Kubernetes, Mesos), since they all use
{{RestClusterClient}} to submit the job.

[~gjy], [~liyu] Do you think this ticket should be a blocker for 1.10?


> failed to submit JobGraph when registered hdfs file in DistributedCache 
> 
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1, 1.9.0, 1.9.1, 1.10.0
>Reporter: luoguohao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> When using HDFS files in the DistributedCache, submitting the JobGraph fails;
> exception stack traces show up in the log file after a while. However, if the
> DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2019-09-06 Thread luoguohao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924706#comment-16924706
 ] 

luoguohao commented on FLINK-13758:
---

[~Zentol] OK, the key problem is that when I try to register an HDFS file in
the DistributedCache:
{code:java}
env.registerCachedFile("hdfs://myhdfs/path/of/file", "test_data", false)
{code}
the job fails to execute in cluster mode, while local mode works fine. By the way,
the HDFS path is accessible from the cluster.
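
For context, here is a minimal, self-contained sketch of that scenario (the surrounding job, class name and data are made up for illustration; the path and the "test_data" name are taken from the snippet above):

{code:java}
import java.io.File;
import java.nio.file.Files;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same registration as above; submitting this against a session cluster is what fails.
        env.registerCachedFile("hdfs://myhdfs/path/of/file", "test_data", false);

        env.fromElements("a", "b", "c")
            .map(new RichMapFunction<String, String>() {
                private transient List<String> cachedLines;

                @Override
                public void open(Configuration parameters) throws Exception {
                    // On the TaskManager the cached file is exposed as a local java.io.File.
                    File cached = getRuntimeContext().getDistributedCache().getFile("test_data");
                    cachedLines = Files.readAllLines(cached.toPath());
                }

                @Override
                public String map(String value) {
                    return value + "/" + cachedLines.size();
                }
            })
            .print();

        env.execute("distributed-cache-example");
    }
}
{code}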

 

> failed to submit JobGraph when registered hdfs file in DistributedCache 
> 
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
>Reporter: luoguohao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When using HDFS files in the DistributedCache, submitting the JobGraph fails;
> exception stack traces show up in the log file after a while. However, if the
> DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2019-08-28 Thread luoguohao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16917639#comment-16917639
 ] 

luoguohao commented on FLINK-13758:
---

[~Zentol] Thanks for the reply. In the current implementation, HDFS files are not
stored in the BlobServer; they are retrieved directly from HDFS when the Task is
initialized in the TaskManager. So if we want to store all files in the
BlobServer, the scope of the code change becomes much wider. In my opinion, as
long as the user chooses an HDFS file as a DistributedCache file, it is the user,
not the cluster itself, who should make sure the file is accessible from the
cluster.
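
As a side note, one way to sanity-check that accessibility requirement before submitting is a quick probe through Flink's {{FileSystem}} abstraction. This is only an illustrative sketch run on the client side (with whatever Hadoop configuration the submitting process sees), not part of any fix:

{code:java}
import java.net.URI;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class CachedFilePrecheck {

    // Sketch only: verify that a remote DistributedCache path resolves and exists
    // from the submitting process. Accessibility from the TaskManagers still depends
    // on the cluster's own Hadoop/HDFS configuration.
    public static void checkAccessible(String cachedFilePath) throws Exception {
        URI uri = URI.create(cachedFilePath);
        FileSystem fs = FileSystem.get(uri);
        if (!fs.exists(new Path(uri))) {
            throw new IllegalArgumentException(
                "DistributedCache file does not exist or is not accessible: " + cachedFilePath);
        }
    }
}
{code}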

> failed to submit JobGraph when registered hdfs file in DistributedCache 
> 
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
>Reporter: luoguohao
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When using HDFS files in the DistributedCache, submitting the JobGraph fails;
> exception stack traces show up in the log file after a while. However, if the
> DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2019-08-19 Thread luoguohao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910185#comment-16910185
 ] 

luoguohao commented on FLINK-13758:
---

[~fly_in_gis] Yes, this works in local mode because it takes a different code
path, but if you deploy the application on the cluster it fails after a while
(by default about 100 minutes).

> failed to submit JobGraph when registered hdfs file in DistributedCache 
> 
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
>Reporter: luoguohao
>Priority: Major
>
> When using HDFS files in the DistributedCache, submitting the JobGraph fails;
> exception stack traces show up in the log file after a while. However, if the
> DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2019-08-17 Thread luoguohao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909628#comment-16909628
 ] 

luoguohao commented on FLINK-13758:
---

After reading the code, I found that every file registered in the
DistributedCache is uploaded to the JobManager. In fact, only local files need to
be uploaded so they can be added to the BlobServer; other file types, such as
HDFS, do not need this, and only the file path should be stored in the
JobManager. When a Task initializes in the TaskManager, an HDFS file in the
DistributedCache is retrieved directly from HDFS, not from the BlobServer.
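
To illustrate the distinction described above, here is a rough sketch of deciding which DistributedCache entries actually need a BlobServer upload. This is not the real Flink client code; the helper name is made up, and the check is deliberately simplified to the URI scheme:

{code:java}
import java.net.URI;

public class CacheEntryHandling {

    // Only files with a local scheme need to be uploaded to the BlobServer.
    // Remote files (e.g. hdfs://) should only have their path recorded in the JobGraph
    // and be read directly by the TaskManager when the task is initialized.
    static boolean needsBlobServerUpload(String filePath) {
        String scheme = URI.create(filePath).getScheme();
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        System.out.println(needsBlobServerUpload("file:///tmp/dict.txt"));       // true  -> upload to BlobServer
        System.out.println(needsBlobServerUpload("/tmp/dict.txt"));              // true  -> upload to BlobServer
        System.out.println(needsBlobServerUpload("hdfs://myhdfs/path/of/file")); // false -> keep only the path
    }
}
{code}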

> failed to submit JobGraph when registered hdfs file in DistributedCache 
> 
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
>Reporter: luoguohao
>Priority: Major
>
> When using HDFS files in the DistributedCache, submitting the JobGraph fails;
> exception stack traces show up in the log file after a while. However, if the
> DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache

2019-08-17 Thread luoguohao (JIRA)
luoguohao created FLINK-13758:
-

 Summary: failed to submit JobGraph when registered hdfs file in 
DistributedCache 
 Key: FLINK-13758
 URL: https://issues.apache.org/jira/browse/FLINK-13758
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.8.1, 1.8.0, 1.7.2, 1.6.4, 1.6.3
Reporter: luoguohao


When using HDFS files in the DistributedCache, submitting the JobGraph fails;
exception stack traces show up in the log file after a while. However, if the
DistributedCache file is a local file, everything works fine.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2019-01-14 Thread luoguohao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742665#comment-16742665
 ] 

luoguohao commented on FLINK-11046:
---

[~tzulitai] Yes, that's the point. Thanks for the suggestion. I may not have
enough time to work on it right now, maybe next time; currently I just work
around the problem by re-indexing manually. [~xueyu] I am also looking forward to
your commit and will apply the patch when it is available. Thanks.

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Assignee: xueyu
>Priority: Major
>
> When I use the ES6 sink to index into ES and the bulk process catches an
> exception, I try to re-index the document by calling `indexer.add(action)` in
> the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
> calling thread gets stuck there, and in a thread dump I saw that the
> `bulkprocessor` object was locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the implementation behind `indexer.add(action)`, I found that
> each add operation is `synchronized`.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the
> bulk-processing thread as well.
> The bulk-processing operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the line I marked above shows, I think that is why the retry-operation
> thread is blocked: the bulk-processing thread never releases the lock on
> `bulkprocessor`. I also tried to figure out why the field
> `concurrentRequests` was set to zero, and I found the bulk processor
> initialization in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
> This field is explicitly set to zero. So everything seems to make sense, but
> I still wondered why the retry operation does not run in the same thread
> as the bulk-process execution; after reading the code, the `bulkAsync` method
> might be the last piece of the puzzle.
> 

[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-04 Thread luoguohao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708469#comment-16708469
 ] 

luoguohao commented on FLINK-11046:
---

Hi [~tzulitai],

Thank you for your reply. Because our Elasticsearch schema is configured in
strict mode, if an unexpected `strict_dynamic_mapping_exception` is thrown, I
check whether the field is invalid; otherwise I remap the schema and re-add the
index operation to the `RequestIndexer` instance.

The code is roughly as simple as the following:
{code:java}
class LoadTraitIntoEsFailureHandler
extends ActionRequestFailureHandler {

 override def onFailure(action: ActionRequest,
   failure: Throwable,
   restStatusCode: Int,
   indexer: RequestIndexer): Unit = {
   failure match {
 case exception: ElasticsearchException
if isStrictDynamicMappingException(exception) ⇒ 
val updateAction = action.asInstanceOf[UpdateRequest]
// remapping the es schema if needed
...
// finally, re-adding the action if needed
indexer.add(updateAction)
 case others ⇒
   // do nothing
   }
 }
}{code}
That's all there is to the retry operation.

 

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When I use the ES6 sink to index into ES and the bulk process catches an
> exception, I try to re-index the document by calling `indexer.add(action)` in
> the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
> calling thread gets stuck there, and in a thread dump I saw that the
> `bulkprocessor` object was locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the implementation behind `indexer.add(action)`, I found that
> each add operation is `synchronized`.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the
> bulk-processing thread as well.
> The bulk-processing operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the line I marked above shows, I think that is why the retry-operation
> thread is blocked: the bulk-processing thread never releases the lock on
> `bulkprocessor`. I also tried to figure out why 

[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread luoguohao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706611#comment-16706611
 ] 

luoguohao commented on FLINK-11046:
---

Sorry for missing that. Here are all my settings for the ES sink (a sketch of how
they map onto the ES6 sink builder follows the list):
 * bulk.flush.max.actions: 1000
 * bulk.flush.interval.ms: 10s
 * bulk.flush.max.size.mb: 10M
 * bulk.flush.backoff.enable: true
 * bulk.flush.backoff.retries: 3
 * bulk.flush.backoff.type: EXPONENTIAL
 * bulk.flush.backoff.delay: 1minute
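
For reference, a sketch of how those settings map onto the ES6 connector's {{ElasticsearchSink.Builder}}, assuming the builder setters of the flink-connector-elasticsearch6 module; the hosts and the sink function are placeholders to be supplied by the caller:

{code:java}
import java.util.List;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

public class EsSinkConfigSketch {

    public static ElasticsearchSink<String> build(List<HttpHost> hosts,
                                                  ElasticsearchSinkFunction<String> sinkFunction) {
        ElasticsearchSink.Builder<String> builder =
            new ElasticsearchSink.Builder<>(hosts, sinkFunction);

        builder.setBulkFlushMaxActions(1000);       // bulk.flush.max.actions: 1000
        builder.setBulkFlushInterval(10_000L);      // bulk.flush.interval.ms: 10s
        builder.setBulkFlushMaxSizeMb(10);          // bulk.flush.max.size.mb: 10M
        builder.setBulkFlushBackoff(true);          // bulk.flush.backoff.enable: true
        builder.setBulkFlushBackoffRetries(3);      // bulk.flush.backoff.retries: 3
        builder.setBulkFlushBackoffType(            // bulk.flush.backoff.type: EXPONENTIAL
            ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffDelay(60_000L);  // bulk.flush.backoff.delay: 1 minute

        return builder.build();
    }
}
{code}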

 

 

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When I use the ES6 sink to index into ES and the bulk process catches an
> exception, I try to re-index the document by calling `indexer.add(action)` in
> the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
> calling thread gets stuck there, and in a thread dump I saw that the
> `bulkprocessor` object was locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the implementation behind `indexer.add(action)`, I found that
> each add operation is `synchronized`.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the
> bulk-processing thread as well.
> The bulk-processing operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the line I marked above shows, I think that is why the retry-operation
> thread is blocked: the bulk-processing thread never releases the lock on
> `bulkprocessor`. I also tried to figure out why the field
> `concurrentRequests` was set to zero, and I found the bulk processor
> initialization in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
> This field is explicitly set to zero. So everything seems to make sense, but
> I still wondered why the retry operation does not run in the same thread
> as the bulk-process execution; after reading the code, the `bulkAsync` method
> might be the last piece of the puzzle.
> {code:java}
> 

[jira] [Updated] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread luoguohao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoguohao updated FLINK-11046:
--
Description: 
When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After reading the implementation behind `indexer.add(action)`, I found that
each add operation is `synchronized`.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object 
payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is locked in the bulk-processing
thread as well.

The bulk-processing operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new 
ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
   if (concurrentRequests == 0) {
   latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) {  // if we fail on 
client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
As the line I marked above shows, I think that is why the retry-operation
thread is blocked: the bulk-processing thread never releases the lock on
`bulkprocessor`. I also tried to figure out why the field
`concurrentRequests` was set to zero, and I found the bulk processor
initialization in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder =  
callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
This field is explicitly set to zero. So everything seems to make sense, but I
still wondered why the retry operation does not run in the same thread as the
bulk-process execution; after reading the code, the `bulkAsync` method might be
the last piece of the puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So I hope someone can help fix this problem or give some suggestions; I can
also take a try at it myself.
Thanks a lot!

  was:
When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After i read the code implemented in the `indexer.add(action)`, i find 

[jira] [Updated] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread luoguohao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoguohao updated FLINK-11046:
--
Description: 
When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After reading the implementation behind `indexer.add(action)`, I found that
each add operation is `synchronized`.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object 
payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is locked in the bulk-processing
thread as well.

The bulk-processing operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new 
ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
   {color:red} if (concurrentRequests == 0) {
latch.await();
}{color}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) {  // if we fail on 
client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
As the line I marked above (in red) shows, I think that is why the retry-operation
thread is blocked: the bulk-processing thread never releases the lock on
`bulkprocessor`. I also tried to figure out why the field
`concurrentRequests` was set to zero, and I found the bulk processor
initialization in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder =  
callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
This field is explicitly set to zero. So everything seems to make sense, but I
still wondered why the retry operation does not run in the same thread as the
bulk-process execution; after reading the code, the `bulkAsync` method might be
the last piece of the puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So I hope someone can help fix this problem or give some suggestions; I can
also take a try at it myself.
Thanks a lot!

  was:
When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After i read the code implemented in the 

[jira] [Updated] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-01 Thread luoguohao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoguohao updated FLINK-11046:
--
Description: 
When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After reading the implementation behind `indexer.add(action)`, I found that
each add operation is `synchronized`.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object 
payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is locked in the bulk-processing
thread as well.

The bulk-processing operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new 
ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) {  // if we fail on 
client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
As the line I marked above shows, I think that is why the retry-operation
thread is blocked: the bulk-processing thread never releases the lock on
`bulkprocessor`. I also tried to figure out why the field
`concurrentRequests` was set to zero, and I found the bulk processor
initialization in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder =  
callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
This field is explicitly set to zero. So everything seems to make sense, but I
still wondered why the retry operation does not run in the same thread as the
bulk-process execution; after reading the code, the `bulkAsync` method might be
the last piece of the puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So I hope someone can help fix this problem or give some suggestions; I can
also take a try at it myself.
Thanks a lot!

  was:
When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After i read the code implemented in the `indexer.add(action)`, i find 

[jira] [Created] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-01 Thread luoguohao (JIRA)
luoguohao created FLINK-11046:
-

 Summary: ElasticSearch6Connector cause thread blocked when index 
failed with retry
 Key: FLINK-11046
 URL: https://issues.apache.org/jira/browse/FLINK-11046
 Project: Flink
  Issue Type: Bug
  Components: ElasticSearch Connector
Affects Versions: 1.6.2
Reporter: luoguohao


When I use the ES6 sink to index into ES and the bulk process catches an
exception, I try to re-index the document by calling `indexer.add(action)` in
the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the
calling thread gets stuck there, and in a thread dump I saw that the
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After reading the implementation behind `indexer.add(action)`, I found that
each add operation is `synchronized`.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object 
payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
 

 

I also noticed that the `bulkprocessor` object is locked in the bulk-processing
thread as well.

The bulk-processing operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new 
ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) {  // if we fail on 
client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
As the line I marked above shows, I think that is why the retry-operation
thread is blocked: the bulk-processing thread never releases the lock on
`bulkprocessor`. I also tried to figure out why the field
`concurrentRequests` was set to zero, and I found the bulk processor
initialization in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder =  
callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
This field is explicitly set to zero. So everything seems to make sense, but I
still wondered why the retry operation does not run in the same thread as the
bulk-process execution; after reading the code, the `bulkAsync` method might be
the last piece of the puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So I hope someone can help fix this problem or give some suggestions; I can
also take a try at it myself.
Thanks a lot!
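
To make the blocking scenario described above easier to follow, here is a stripped-down, self-contained sketch of the same locking pattern in plain Java. No Elasticsearch classes are involved and all names are made up: the flushing thread holds the processor's monitor while waiting for the bulk callback to finish, and the callback thread that would finish the bulk first has to enter a synchronized add on the same object.

{code:java}
import java.util.concurrent.CountDownLatch;

public class BulkDeadlockSketch {

    static class FakeBulkProcessor {

        // Mirrors internalAdd(): every add is synchronized on the processor instance.
        synchronized void add(String request) {
            System.out.println("added " + request);
        }

        // Mirrors the flush path with concurrentRequests == 0: the processor monitor is
        // held while waiting for the asynchronous "bulk response" callback to complete.
        synchronized void flushAndWait() throws InterruptedException {
            CountDownLatch bulkFinished = new CountDownLatch(1);

            // Plays the role of afterBulk() -> onFailure() -> indexer.add(action).
            Thread callback = new Thread(() -> {
                add("retried-request");   // blocks: the monitor is held by the flushing thread below
                bulkFinished.countDown(); // never reached
            }, "bulk-response-callback");
            callback.start();

            System.out.println("flush: waiting for the bulk callback while holding the lock...");
            bulkFinished.await();         // waits forever -> the sink appears stuck
        }
    }

    public static void main(String[] args) throws Exception {
        new FakeBulkProcessor().flushAndWait();
    }
}
{code}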



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)