[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018798#comment-17018798 ]

luoguohao commented on FLINK-13758:
-----------------------------------

I'm fine with transferring this ticket to you if you have more time to handle it. [~fly_in_gis]

> failed to submit JobGraph when registered hdfs file in DistributedCache
> ------------------------------------------------------------------------
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1, 1.9.0, 1.9.1, 1.10.0
> Reporter: luoguohao
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.10.0
> Time Spent: 40m
> Remaining Estimate: 0h
>
> When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018534#comment-17018534 ]

luoguohao commented on FLINK-13758:
-----------------------------------

Yes, it has really taken a long time to fix this issue. I already opened a pull request for it, but it has not made much progress. Anyway, thanks, @Yang Wang

--
Sent from my NetEase Mail mobile client

On 2020-01-16 16:50:00, "Yang Wang (Jira)" wrote:
Currently, the public API {{StreamExecutionEnvironment#registerCachedFile}} is broken. We cannot register remote cached files for any of the session deployments (standalone, Yarn, Kubernetes, Mesos), since they all use {{RestClusterClient}} to submit the job. [~gjy], [~liyu] Do you think this ticket should be a blocker for 1.10?

> failed to submit JobGraph when registered hdfs file in DistributedCache
> ------------------------------------------------------------------------
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1, 1.9.0, 1.9.1, 1.10.0
> Reporter: luoguohao
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.10.0
> Time Spent: 40m
> Remaining Estimate: 0h
>
> When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924706#comment-16924706 ]

luoguohao commented on FLINK-13758:
-----------------------------------

[~Zentol] OK, the key problem is that when I try to register an HDFS file in the DistributedCache:

{code:java}
env.registerCachedFile("hdfs://myhdfs/path/of/file", "test_data", false)
{code}

the job fails to execute in cluster mode, while local mode works fine. By the way, the HDFS path is accessible from the cluster.

> failed to submit JobGraph when registered hdfs file in DistributedCache
> ------------------------------------------------------------------------
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
> Reporter: luoguohao
> Priority: Major
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
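For context, a minimal sketch of how a file registered this way is normally consumed on the task side, assuming the standard DistributedCache API of these Flink versions; the class and field names here are made up for illustration and are not part of the ticket:

{code:java}
// Minimal sketch (assumed API usage, not code from this ticket): a rich function
// reads the file registered above under the name "test_data" from the
// DistributedCache once the task has been initialized on the TaskManager.
import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ReadCachedFileMapper extends RichMapFunction<String, String> {

    private File cachedFile;  // hypothetical field holding the resolved local copy

    @Override
    public void open(Configuration parameters) throws Exception {
        // The TaskManager resolves the registered entry to a local file;
        // for an HDFS path this is where the remote file is fetched.
        cachedFile = getRuntimeContext().getDistributedCache().getFile("test_data");
    }

    @Override
    public String map(String value) {
        return value + " (cache at " + cachedFile.getAbsolutePath() + ")";
    }
}
{code}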
[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16917639#comment-16917639 ]

luoguohao commented on FLINK-13758:
-----------------------------------

[~Zentol] Thanks for the reply. In the current implementation, HDFS files are not stored in the BlobServer; they are retrieved directly from HDFS when the Task is initialized on the TaskManager. So if we wanted to store all files in the BlobServer, the scope of the code changes would be much wider. In my opinion, as long as the user chooses an HDFS file as a DistributedCache file, it is the user's responsibility, not the cluster's, to make sure the file is accessible from the cluster.

> failed to submit JobGraph when registered hdfs file in DistributedCache
> ------------------------------------------------------------------------
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
> Reporter: luoguohao
> Priority: Major
> Labels: pull-request-available
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
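To make the distinction described in this comment concrete, here is a small hypothetical helper, not Flink's actual submission code, that applies the rule being argued for: only client-local files need to be shipped through the BlobServer, while entries with a remote scheme such as hdfs:// can simply be recorded by path and fetched by the TaskManagers directly.

{code:java}
// Hypothetical helper illustrating the scheme-based rule described above;
// this is not the actual Flink client/JobGraph code.
import java.net.URI;

final class CachedFileUploadRule {

    private CachedFileUploadRule() {}

    /**
     * Returns true if the DistributedCache entry points at a client-local file
     * and therefore has to be uploaded to the BlobServer; false if it carries a
     * remote scheme (hdfs://, s3://, ...) and only the path needs to be kept.
     */
    static boolean requiresBlobUpload(String registeredPath) {
        String scheme = URI.create(registeredPath).getScheme();
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        System.out.println(requiresBlobUpload("/tmp/local/data.txt"));         // true
        System.out.println(requiresBlobUpload("file:///tmp/local/data.txt"));  // true
        System.out.println(requiresBlobUpload("hdfs://myhdfs/path/of/file"));  // false
    }
}
{code}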
[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910185#comment-16910185 ]

luoguohao commented on FLINK-13758:
-----------------------------------

[~fly_in_gis] Yes. This works in local mode because of the different code path, but if you deploy the application on the cluster, it fails after a while (with the default settings, about 100 minutes).

> failed to submit JobGraph when registered hdfs file in DistributedCache
> ------------------------------------------------------------------------
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
> Reporter: luoguohao
> Priority: Major
>
> When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16909628#comment-16909628 ]

luoguohao commented on FLINK-13758:
-----------------------------------

After reading the code, I found that every file registered in the DistributedCache is uploaded to the JobManager. In fact, only local files need to be uploaded so that they can be added to the BlobServer; other file types, such as HDFS files, do not need this, and only the file path has to be stored on the JobManager. When the Task is initialized on the TaskManager, an HDFS file in the DistributedCache is retrieved directly from HDFS, not from the BlobServer.

> failed to submit JobGraph when registered hdfs file in DistributedCache
> ------------------------------------------------------------------------
>
> Key: FLINK-13758
> URL: https://issues.apache.org/jira/browse/FLINK-13758
> Project: Flink
> Issue Type: Bug
> Components: Command Line Client
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1
> Reporter: luoguohao
> Priority: Major
>
> When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Created] (FLINK-13758) failed to submit JobGraph when registered hdfs file in DistributedCache
luoguohao created FLINK-13758:
----------------------------------

Summary: failed to submit JobGraph when registered hdfs file in DistributedCache
Key: FLINK-13758
URL: https://issues.apache.org/jira/browse/FLINK-13758
Project: Flink
Issue Type: Bug
Components: Command Line Client
Affects Versions: 1.8.1, 1.8.0, 1.7.2, 1.6.4, 1.6.3
Reporter: luoguohao


When using an HDFS file in the DistributedCache, submitting the JobGraph fails; exception stack traces appear in the log file after a while. If the DistributedCache file is a local file, everything works fine.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742665#comment-16742665 ]

luoguohao commented on FLINK-11046:
-----------------------------------

[~tzulitai] Yes, that's the point. Thanks for the suggestion. I may not have enough time to work on it right now, maybe next time. Currently I just work around the problem by re-indexing manually. [~xueyu] I am also looking forward to your commit and will apply the patch when it is available. Thanks.

> ElasticSearch6Connector cause thread blocked when index failed with retry
> --------------------------------------------------------------------------
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.6.2
> Reporter: luoguohao
> Assignee: xueyu
> Priority: Major
>
> When using the ES6 sink to index into Elasticsearch, the bulk process can fail with an exception, and re-indexing the document by calling `indexer.add(action)` inside `ActionRequestFailureHandler.onFailure()` goes wrong: the calling thread gets stuck, and a thread dump shows that the `bulkprocessor` object is locked by another thread.
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708469#comment-16708469 ]

luoguohao commented on FLINK-11046:
-----------------------------------

Hi [~tzulitai], thank you for your reply. Because our Elasticsearch schema is configured in strict mode, if an unexpected `strict_dynamic_mapping_exception` is thrown I check whether the field is invalid; otherwise I remap the schema and re-add the index operation to the `RequestIndexer` instance. The snippet of code is roughly as simple as the following:

{code:java}
class LoadTraitIntoEsFailureHandler extends ActionRequestFailureHandler {
  override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
    failure match {
      case exception: ElasticsearchException if isStrictDynamicMappingException(exception) ⇒
        val updateAction = action.asInstanceOf[UpdateRequest]
        // remap the ES schema if needed
        ...
        // finally, re-add the action if needed
        indexer.add(updateAction)
      case others ⇒
        // do nothing
    }
  }
}
{code}

That is all there is to the retry operation.

> ElasticSearch6Connector cause thread blocked when index failed with retry
> --------------------------------------------------------------------------
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.6.2
> Reporter: luoguohao
> Priority: Major
>
> When using the ES6 sink to index into Elasticsearch, the bulk process can fail with an exception, and re-indexing the document by calling `indexer.add(action)` inside `ActionRequestFailureHandler.onFailure()` goes wrong: the calling thread gets stuck, and a thread dump shows that the `bulkprocessor` object is locked by another thread.
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706611#comment-16706611 ]

luoguohao commented on FLINK-11046:
-----------------------------------

Sorry for missing that. Here are all my settings for the ES sink (see the configuration sketch after this message):
 * bulk.flush.max.actions: 1000
 * bulk.flush.interval.ms: 10s
 * bulk.flush.max.size.mb: 10M
 * bulk.flush.backoff.enable: true
 * bulk.flush.backoff.retries: 3
 * bulk.flush.backoff.type: EXPONENTIAL
 * bulk.flush.backoff.delay: 1 minute

> ElasticSearch6Connector cause thread blocked when index failed with retry
> --------------------------------------------------------------------------
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.6.2
> Reporter: luoguohao
> Priority: Major
>
> When using the ES6 sink to index into Elasticsearch, the bulk process can fail with an exception, and re-indexing the document by calling `indexer.add(action)` inside `ActionRequestFailureHandler.onFailure()` goes wrong: the calling thread gets stuck, and a thread dump shows that the `bulkprocessor` object is locked by another thread.
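For reference, a sketch of how the settings listed above would typically be applied when constructing the ES6 sink, assuming the `ElasticsearchSink.Builder` setters of flink-connector-elasticsearch6; the host and the `sinkFunction` parameter are placeholders, not values from this issue:

{code:java}
// Sketch under the assumption that the standard ElasticsearchSink.Builder
// setters of flink-connector-elasticsearch6 are available; host/sinkFunction
// are placeholders, and the values mirror the settings listed in the comment.
import java.util.Collections;
import org.apache.http.HttpHost;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

public class EsSinkConfigSketch {

    public static ElasticsearchSink<String> build(ElasticsearchSinkFunction<String> sinkFunction) {
        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("es-host", 9200, "http")),  // placeholder host
                sinkFunction);

        builder.setBulkFlushMaxActions(1000);      // bulk.flush.max.actions: 1000
        builder.setBulkFlushInterval(10_000L);     // bulk.flush.interval.ms: 10s
        builder.setBulkFlushMaxSizeMb(10);         // bulk.flush.max.size.mb: 10M
        builder.setBulkFlushBackoff(true);         // bulk.flush.backoff.enable: true
        builder.setBulkFlushBackoffRetries(3);     // bulk.flush.backoff.retries: 3
        builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffDelay(60_000L); // bulk.flush.backoff.delay: 1 minute

        return builder.build();
    }
}
{code}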
[jira] [Created] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
luoguohao created FLINK-11046:
----------------------------------

Summary: ElasticSearch6Connector cause thread blocked when index failed with retry
Key: FLINK-11046
URL: https://issues.apache.org/jira/browse/FLINK-11046
Project: Flink
Issue Type: Bug
Components: ElasticSearch Connector
Affects Versions: 1.6.2
Reporter: luoguohao


When I use the ES6 sink to index into Elasticsearch and the bulk process fails with an exception, I try to re-index the document by calling `indexer.add(action)` inside `ActionRequestFailureHandler.onFailure()`, but things go wrong: the calling thread gets stuck there, and a thread dump shows that the `bulkprocessor` object is locked by another thread.

{code:java}
public interface ActionRequestFailureHandler extends Serializable {
  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
}
{code}

After reading the code behind `indexer.add(action)`, I found that `synchronized` is required on each add operation:

{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}

I also noticed that the `bulkprocessor` object is locked by the bulk-processing thread as well. The bulk process operation is the following code:

{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
    Runnable toRelease = () -> {};
    boolean bulkRequestSetupSuccessful = false;
    try {
        listener.beforeBulk(executionId, bulkRequest);
        semaphore.acquire();
        toRelease = semaphore::release;
        CountDownLatch latch = new CountDownLatch(1);
        retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse response) {
                try {
                    listener.afterBulk(executionId, bulkRequest, response);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    listener.afterBulk(executionId, bulkRequest, e);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }
        }, Settings.EMPTY);
        bulkRequestSetupSuccessful = true;
        if (concurrentRequests == 0) {
            latch.await();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
        listener.afterBulk(executionId, bulkRequest, e);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
        listener.afterBulk(executionId, bulkRequest, e);
    } finally {
        if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
            toRelease.run();
        }
    }
}
{code}

Because of the lines I marked above (the `if (concurrentRequests == 0) { latch.await(); }` block), I think that is the reason the retry-operation thread is blocked: the bulk-processing thread never releases the lock on `bulkprocessor`. I also tried to figure out why the field `concurrentRequests` is set to zero, and I found the initialization of the bulk processor in class `ElasticsearchSinkBase`:

{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
  ...
  BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
  // This makes flush() blocking
  bulkProcessorBuilder.setConcurrentRequests(0);
  ...
  return bulkProcessorBuilder.build();
}
{code}

This field is set to zero explicitly. So everything seems to make sense, but I still wondered why the retry operation does not run in the same thread as the bulk-process execution; after reading the code, the `bulkAsync` method turned out to be the last piece of the puzzle:

{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
  return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}

So, I hope someone can help to fix this problem or give some suggestions, and I can also try to take it on. Thanks a lot!

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
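To make the blocking scenario described in this report easier to see, here is a small self-contained model, illustrative only and not Elasticsearch or Flink code: a flushing thread holds the processor's monitor while waiting on a latch, the latch is only counted down after the failure handler returns, and the failure handler tries to re-add a request through a synchronized method on the same object. Running it hangs, which mirrors the reported symptom.

{code:java}
// Self-contained illustration of the deadlock pattern described above
// (not Elasticsearch/Flink code): flushBlocking() holds the monitor of 'this'
// while awaiting the latch; add() needs the same monitor; the latch only counts
// down after the failure handler returns, so neither thread can make progress.
import java.util.concurrent.CountDownLatch;

public class BulkProcessorDeadlockModel {

    private final CountDownLatch responseLatch = new CountDownLatch(1);

    // Models flush()/execute() with concurrentRequests == 0: the monitor is
    // held while we wait for the "bulk response" to be handled.
    synchronized void flushBlocking(Runnable onFailureHandler) throws InterruptedException {
        Thread responseThread = new Thread(() -> {
            onFailureHandler.run();       // afterBulk -> ActionRequestFailureHandler.onFailure(...)
            responseLatch.countDown();    // only reached once the handler has returned
        });
        responseThread.start();
        responseLatch.await();            // blocks while still holding the monitor
    }

    // Models BulkProcessor.internalAdd(): requires the same monitor.
    synchronized void add(String request) {
        System.out.println("re-added " + request);
    }

    public static void main(String[] args) throws InterruptedException {
        BulkProcessorDeadlockModel processor = new BulkProcessorDeadlockModel();
        // Deadlock: add() can never acquire the monitor held by flushBlocking(),
        // so the latch never counts down and flushBlocking() never returns.
        processor.flushBlocking(() -> processor.add("failed index request"));
    }
}
{code}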