[jira] [Created] (FLINK-16574) StreamingFileSink should rename files or fail if destination file already exists
static-max created FLINK-16574: -- Summary: StreamingFileSink should rename files or fail if destination file already exists Key: FLINK-16574 URL: https://issues.apache.org/jira/browse/FLINK-16574 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.9.1 Environment: We're using Flink 1.9.1 on YARN with Horton HDP 2.7.3. Reporter: static-max I switched from BucketingSink to StreamingFileSink, so my state could not be restored after starting from a savepoint. Upon start of the job there were already part-0-0 and part-0-1 files in the HDFS destination folder. The StreamingFileSink then creates a file like .part-0-0.inprogress.d1849354-39d4-4634-8fb3-dfb8e2083857. When the file is rolled, Flink tries to rename it to part-0-0, but that file already exists. The NameNode logs "WARN hdfs.StateChange (FSDirRenameOp.java:unprotectedRenameTo(174)) - DIR* FSDirectory.unprotectedRenameTo: failed to rename to because destination exists". Flink does not care and creates a new file like .part-0-1.inprogress.d for the next bucket, and the game continues until the part index counter is high enough for the file to be renamed. But now I'm left with a lot of .part-xxx.inprogress.xxx files that I need to rename by hand if I don't want to lose the data. I would expect Flink either to fail if the file cannot be renamed, or to auto-rename it to a filename that does not exist yet. The same happens when not starting from a savepoint. IIRC the BucketingSink did not have this problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
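The auto-rename fallback requested above can be sketched in isolation. The snippet below is a hypothetical illustration using java.nio as a stand-in for the HDFS FileSystem API — it is not Flink's StreamingFileSink code — showing one way to probe for a non-colliding final name before committing an in-progress file; the class and method names are invented for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative sketch, NOT StreamingFileSink code: if "part-0-0" is taken,
// probe "part-0-0.1", "part-0-0.2", ... until a free name is found.
public class SafeRename {

    static Path nonCollidingTarget(Path dir, String name) {
        Path candidate = dir.resolve(name);
        int suffix = 1;
        while (Files.exists(candidate)) {
            candidate = dir.resolve(name + "." + suffix++);
        }
        return candidate;
    }

    // Commits an in-progress file under the first non-colliding final name.
    static Path commit(Path inProgress, Path dir, String finalName) throws IOException {
        return Files.move(inProgress, nonCollidingTarget(dir, finalName),
                StandardCopyOption.ATOMIC_MOVE);
    }

    public static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("sink-demo");
            Files.createFile(dir.resolve("part-0-0")); // leftover file from before the savepoint
            Path inProgress = Files.createFile(dir.resolve(".part-0-0.inprogress.d1849354"));
            Path committed = commit(inProgress, dir, "part-0-0");
            return committed.getFileName().toString().equals("part-0-0.1");
        } catch (IOException e) {
            return false;
        }
    }
}
```

The alternative behaviour the reporter mentions — failing hard instead of silently leaving .inprogress files behind — would simply throw in `commit` when the target exists instead of probing for a new name.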
[jira] [Commented] (FLINK-15832) Artifact flink-metrics-core-tests will be uploaded twice
[ https://issues.apache.org/jira/browse/FLINK-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027461#comment-17027461 ] static-max commented on FLINK-15832: Setting {{shadeTestJar}} to {{false}} resolves the problem. > Artifact flink-metrics-core-tests will be uploaded twice
[jira] [Created] (FLINK-15832) Artifact flink-metrics-core-tests will be uploaded twice
static-max created FLINK-15832: -- Summary: Artifact flink-metrics-core-tests will be uploaded twice Key: FLINK-15832 URL: https://issues.apache.org/jira/browse/FLINK-15832 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.9.1 Reporter: static-max I built Flink 1.9.1 myself and merged the changes from [https://github.com/apache/flink/pull/10936]. When I uploaded the artifacts to our repository (using {{mvn deploy -DaltDeploymentRepository}}) the build fails as {{flink-metrics-core-tests}} will be uploaded twice and we have redeployments disabled. I'm not sure if other artifacts are affected as well, as I enabled redeployment as a quick workaround.
[jira] [Commented] (FLINK-13689) Rest High Level Client for Elasticsearch6.x connector leaks threads if no connection could be established
[ https://issues.apache.org/jira/browse/FLINK-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17021282#comment-17021282 ] static-max commented on FLINK-13689: Hi there, I would like to contribute the changes proposed by [~aljoscha] at [https://github.com/apache/flink/pull/9468]. I have already started working on a fix and am testing it at the moment. > Rest High Level Client for Elasticsearch6.x connector leaks threads if no > connection could be established > > > Key: FLINK-13689 > URL: https://issues.apache.org/jira/browse/FLINK-13689 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch > Affects Versions: 1.8.1 > Reporter: Rishindra Kumar > Assignee: Rishindra Kumar > Priority: Major > Labels: pull-request-available > Fix For: 1.8.4 > > Time Spent: 20m > Remaining Estimate: 0h > > If the created Elasticsearch Rest High Level Client (rhlClient) is > unreachable, the current code throws a RuntimeException. But it doesn't close the > client, which causes a thread leak. > > *Current Code* > *if (!rhlClient.ping()) {* > *throw new RuntimeException("There are no reachable Elasticsearch > nodes!");* > *}* > > *Change Needed* > rhlClient needs to be closed. > > *Steps to Reproduce* > 1. Add the ElasticSearch Sink to the stream. Start the Flink program without > starting ElasticSearch. > 2. The program will give the error "*Too many open files*" and it doesn't write even > though you start ElasticSearch later.
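The fix being discussed — close the client before surfacing the ping failure — boils down to a small pattern. Below is a hedged sketch in which a plain java.io.Closeable stands in for the Elasticsearch RestHighLevelClient; the class name and the boolean ping flag are illustrative only, not the connector's API.

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of the fix pattern only: close the client before throwing,
// so its internal I/O threads are not leaked when the ping fails.
public class PingOrClose {

    static void pingOrClose(Closeable client, boolean pingSucceeded) throws IOException {
        if (!pingSucceeded) {
            client.close(); // this close() call is what the leaking code path is missing
            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
        }
    }

    public static boolean demo() {
        final boolean[] closed = {false};
        try {
            pingOrClose(() -> closed[0] = true, false);
        } catch (RuntimeException | IOException expected) {
            // the failure still propagates, but only after the client was closed
        }
        return closed[0];
    }
}
```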
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max closed the pull request at: https://github.com/apache/flink/pull/2861 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3246: [FLINK-5353] [elasticsearch] User-provided failure handle...
Github user static-max commented on the issue: https://github.com/apache/flink/pull/3246 Looks great! One note: In your example ExampleActionRequestFailureHandler you have to unwrap the Exception, as it typically looks like: `RemoteTransportException[[Richard Rider][127.0.0.1:9301][indices:data/write/bulk[s]]]; nested: RemoteTransportException[[Richard Rider][127.0.0.1:9301][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@e5c47a1 on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7e2d5cc5[Running, pool size = 8, active threads = 8, queued tasks = 1, completed tasks = 119]]];` In my implementation I use Apache Commons: ` ExceptionUtils.indexOfThrowable(throwable, EsRejectedExecutionException.class) >= 0 `
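The unwrapping described in this comment can also be done without Apache Commons by walking the getCause() chain. The sketch below is illustrative: it matches exception types by simple name, and plain JDK exceptions stand in for the Elasticsearch exception classes.

```java
// Illustrative sketch of unwrapping a nested cause chain: walk getCause()
// looking for a target type. Stand-in exceptions replace the real
// RemoteTransportException / EsRejectedExecutionException classes.
public class CauseWalker {

    static boolean hasCauseNamed(Throwable t, String simpleName) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur.getClass().getSimpleName().equals(simpleName)) {
                return true;
            }
        }
        return false;
    }

    public static boolean demo() {
        // Mirrors the doubly-nested shape quoted above:
        // RemoteTransportException -> RemoteTransportException -> EsRejectedExecutionException
        Throwable rejection = new IllegalStateException("EsRejectedExecutionException stand-in");
        Throwable outer = new RuntimeException("RemoteTransportException stand-in",
                new RuntimeException("RemoteTransportException stand-in", rejection));
        return hasCauseNamed(outer, "IllegalStateException")
                && !hasCauseNamed(outer, "ArithmeticException");
    }
}
```

This does the same job as `ExceptionUtils.indexOfThrowable(throwable, EsRejectedExecutionException.class) >= 0`, just with a class-name comparison instead of a class reference.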
[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...
Github user static-max commented on the issue: https://github.com/apache/flink/pull/2861 @tzulitai OK, if you need any help feel free to ask. Are there plans to switch from the TransportClient to a pure HTTP client? That would reduce the Elasticsearch dependencies and would decouple the cluster's version from the TransportClient version used by Flink. In that case we won't get a Throwable anymore.
[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...
Github user static-max commented on the issue: https://github.com/apache/flink/pull/2861 In my tests BulkItemResponse.getFailure().getCause() returns a RemoteTransportException like this: `RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s]]]; nested: RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@3a0f3a6e on EsThreadPoolExecutor[bulk, queue capacity = 2, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5ac266bd[Running, pool size = 8, active threads = 8, queued tasks = 2, completed tasks = 206]]];` So the nested Exception needs to be checked. That's possible, I will implement that change. The last two Exceptions are common when a new index gets created (if you have a new index per day, for example), or when a node leaves the cluster and no master can be elected (no quorum).
[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...
Github user static-max commented on the issue: https://github.com/apache/flink/pull/2861 Hi @tzulitai, sure, go ahead :)
[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...
Github user static-max commented on the issue: https://github.com/apache/flink/pull/2861 Ok, let's wait for the restructure and rebase this PR to support at least once. BTW, we're using my PR in production and haven't lost a single document since then.
[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...
Github user static-max commented on the issue: https://github.com/apache/flink/pull/2861 The path of the connector has changed to "flink/flink-connectors/flink-connector-elasticsearch2/". How should I handle the conflict? Open a new PR?
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r94382504 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + boolean allRequestsRepeatable = true; if (response.hasFailures()) { for (BulkItemResponse itemResp : response.getItems()) { if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + // Check if index request can be retried + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); + if (checkErrorAndRetryBulk && ( + failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full + ) + ) { + LOG.debug("Retry bulk: {}", itemResp.getFailureMessage()); + reAddBulkRequest(request); --- End diff -- You're right, it gets added multiple times, I'll fix that.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r91991599 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) requestIndexer = new BulkProcessorIndexer(bulkProcessor); } + /** +* Adds all requests of the bulk to the BulkProcessor. Used when trying again. +* @param bulkRequest +*/ + public void reAddBulkRequest(BulkRequest bulkRequest) { + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs. --- End diff -- Currently I'm not aware of a way to filter these requests.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r91964180 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + boolean allRequestsRepeatable = true; if (response.hasFailures()) { for (BulkItemResponse itemResp : response.getItems()) { if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + // Check if index request can be retried + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full + ) { + LOG.debug("Retry batch: " + itemResp.getFailureMessage()); --- End diff -- What level do you suggest? Personally I don't care for retried batches as long as the data gets into my ES cluster. When logging this as INFO or WARN, the logfile will get pretty messy on a cluster with high traffic.
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Github user static-max commented on a diff in the pull request: https://github.com/apache/flink/pull/2861#discussion_r91963737 --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java --- @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + boolean allRequestsRepeatable = true; if (response.hasFailures()) { for (BulkItemResponse itemResp : response.getItems()) { if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); + // Check if index request can be retried + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors --- End diff -- I didn't find an alternative to checking strings. I will add a flag and disable it by default.
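For reference, the message-matching logic discussed in this review thread can be isolated as a plain method. This is a self-contained restatement of the checks shown in the diff, not the connector's actual code; as the review notes, matching on failure-message text is brittle, which is why an opt-in flag is being added.

```java
// Self-contained restatement of the retry classification from the PR diff.
public class RetryCheck {

    static boolean isTemporary(String failureMessage) {
        String m = failureMessage.toLowerCase();
        return m.contains("timeout") || m.contains("timed out")        // generic timeout errors
            || m.contains("unavailableshardsexception")                // shard moving or node down
            || (m.contains("data/write/bulk") && m.contains("bulk"));  // bulk queue on node full
    }

    public static boolean demo() {
        return isTemporary("UnavailableShardsException[[index-name][3] primary shard is not active]")
            && isTemporary("rejected execution ... indices:data/write/bulk[s][p]")
            && !isTemporary("MapperParsingException[failed to parse field]"); // permanent, no retry
    }
}
```

A parse error deliberately falls through: retrying it would loop forever, which is exactly the distinction the string patterns try to draw.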
[jira] [Commented] (FLINK-3404) Extend Kafka consumers with interface StoppableFunction
[ https://issues.apache.org/jira/browse/FLINK-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690734#comment-15690734 ] static-max commented on FLINK-3404: --- I'm in need of a _stop()_ method too; I will propose a PR for this issue if there is interest. > Extend Kafka consumers with interface StoppableFunction > --- > > Key: FLINK-3404 > URL: https://issues.apache.org/jira/browse/FLINK-3404 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Reporter: Matthias J. Sax > > Kafka consumers are not stoppable right now. To make them stoppable, they > must implement {{StoppableFunction}}. Implementing method {{stop()}} must > ensure that the consumer stops pulling new messages from Kafka and issues a > final checkpoint with the last offset. Afterwards, {{run()}} must return. > When implementing this, keep in mind that the gathered checkpoint might > later be used as a savepoint. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
GitHub user static-max opened a pull request: https://github.com/apache/flink/pull/2861 [FLINK-5122] Index requests will be retried if the error is only temp… This PR will re-add index requests to the BulkProcessor if the error is temporary, like: * General timeout errors * No master * UnavailableShardsException (rebalancing, node down) * Bulk queue on node full You can merge this pull request into a Git repository by running: $ git pull https://github.com/static-max/flink flink-connector-elasticsearch2-robust Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2861.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2861 commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d Author: Max Kuklinski <max.kuklin...@live.de> Date: 2016-11-23T16:54:11Z [FLINK-5122] Index requests will be retried if the error is only temporary on Elasticsearch side. Covered are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
[jira] [Assigned] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] static-max reassigned FLINK-5122: - Assignee: static-max > Elasticsearch Sink loses documents when cluster has high load
[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686338#comment-15686338 ] static-max commented on FLINK-5122: --- Your PR does not handle single failed bulk requests, as far as I can see. But your PR makes me think about the whole error-handling concept in the ES sink. hasFailure will only be checked on close(), which does not make sense to me. Would it be smarter to throw the Exception immediately when a document cannot be indexed? This way Flink will restart the job from a checkpoint and try again. > Elasticsearch Sink loses documents when cluster has high load
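The fail-fast alternative suggested in this comment — surface the failure on the next record rather than only in close() — can be sketched as follows. This is an illustrative model, not the connector's code; FailFastSink and its method names are invented for the example, with only the failureThrowable field name taken from the thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of fail-fast error propagation: an async callback
// records the first failure, and the next invoke() rethrows it so the
// job restarts from a checkpoint instead of silently dropping data.
public class FailFastSink {

    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    // Called from the (asynchronous) bulk response handler.
    void recordFailure(Throwable t) {
        failureThrowable.compareAndSet(null, t);
    }

    // Called for every incoming record.
    void invoke(String element) {
        Throwable t = failureThrowable.get();
        if (t != null) { // surface async failures here, not only in close()
            throw new RuntimeException("Asynchronous indexing failure", t);
        }
        // ... hand the element to the bulk processor ...
    }

    public static boolean demo() {
        FailFastSink sink = new FailFastSink();
        sink.invoke("ok"); // no failure recorded yet, passes through
        sink.recordFailure(new RuntimeException("bulk item failed"));
        try {
            sink.invoke("next");
            return false; // should not be reached
        } catch (RuntimeException expected) {
            return true;
        }
    }
}
```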
[jira] [Created] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load
static-max created FLINK-5122: - Summary: Elasticsearch Sink loses documents when cluster has high load Key: FLINK-5122 URL: https://issues.apache.org/jira/browse/FLINK-5122 Project: Flink Issue Type: Bug Components: Streaming Connectors Affects Versions: 1.2.0 Reporter: static-max My cluster had high load and documents did not get indexed. This violates the "at least once" semantics in the ES connector. I put pressure on my cluster to test Flink, causing new indices to be created and balanced. On those errors the bulk should be tried again instead of being discarded. Primary shard not active because ES decided to rebalance the index: 2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] requests]] Bulk queue on node full (I set queue to a low value to reproduce error): 22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]]; I can try to propose a PR for this.
[jira] [Created] (FLINK-4779) Restoring from savepoint fails when BucketingSink has not yet created folder
static-max created FLINK-4779: - Summary: Restoring from savepoint fails when BucketingSink has not yet created folder Key: FLINK-4779 URL: https://issues.apache.org/jira/browse/FLINK-4779 Project: Flink Issue Type: Bug Components: filesystem-connector Affects Versions: 1.2.0 Reporter: static-max When I restore from a savepoint, starting the job fails when the root folder used by the BucketingSink does not yet exist. This may happen in my case when the source for my sink has not yet emitted any messages and I did not create the folder by hand. The complete folder structure is not required by the BucketingSink, as it will create intermediate folders by itself when creating the bucket. I suggest that this should not prevent the job from being restarted. {code} 10/07/2016 22:50:53 Source: Kafka Consumer for X -> (Sink: HDFS for X, Sink: X)(1/1) switched to FAILED java.lang.Exception: Failed to restore state to function: Error while deleting old pending files. at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:184) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:550) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:255) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error while deleting old pending files. at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:805) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:139) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:182) ... 4 more Caused by: java.io.FileNotFoundException: File hdfs://server:8020/1/2/3/4/flink does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:948) at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:927) at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:872) at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:868) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:886) at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1696) at org.apache.hadoop.fs.FileSystem$6.<init>(FileSystem.java:1791) at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1787) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:784) ... 6 more {code}
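The suggested fix — tolerate a missing base folder during restore — amounts to a guard in front of the pending-file cleanup. The sketch below uses java.nio as a stand-in for the Hadoop FileSystem API that BucketingSink really uses; GuardedCleanup, its method, and the *.pending glob are hypothetical names for illustration.

```java
import java.io.IOException;
import java.nio.file.*;

// Illustrative guard, not BucketingSink code: skip the pending-file
// cleanup when the base directory does not exist yet, instead of
// failing the whole restore with FileNotFoundException.
public class GuardedCleanup {

    static int deletePendingFiles(Path basePath) throws IOException {
        if (Files.notExists(basePath)) {
            return 0; // nothing written yet: nothing to clean, restore can proceed
        }
        int deleted = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath, "*.pending")) {
            for (Path p : files) {
                Files.delete(p);
                deleted++;
            }
        }
        return deleted;
    }

    public static boolean demo() {
        try {
            // A missing base folder is tolerated rather than fatal.
            if (deletePendingFiles(Paths.get("no-such-bucketing-base-dir")) != 0) {
                return false;
            }
            // An existing folder still gets its pending files cleaned.
            Path base = Files.createTempDirectory("bucketing-base");
            Files.createFile(base.resolve("part-0-0.pending"));
            return deletePendingFiles(base) == 1;
        } catch (IOException e) {
            return false;
        }
    }
}
```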
[GitHub] flink pull request #2579: [FLINK-4618] FlinkKafkaConsumer09 should start fro...
GitHub user static-max opened a pull request: https://github.com/apache/flink/pull/2579 [FLINK-4618] FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka This PR addresses https://issues.apache.org/jira/browse/FLINK-4618, which causes the last message to be read again from Kafka after a fresh start of the job. You can merge this pull request into a Git repository by running: $ git pull https://github.com/static-max/flink flink-connector-kafka-0.9-fix-duplicate-messages Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2579.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2579 commit 0b564203cdae3b21b00bb499b85feb799136e29b Author: static-max <max.kuklin...@live.de> Date: 2016-09-30T19:45:38Z Merge pull request #1 from apache/master Pull from origin commit 3618f5053e0ffb0ec1f789c56d878ed400e27056 Author: Max Kuklinski <max.kuklin...@live.de> Date: 2016-09-30T21:03:30Z FLINK-4618 Incremented the commited offset by one to avoid duplicate read message.
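The off-by-one this PR fixes follows from Kafka's offset semantics: the committed offset is defined as the next offset to read, not the last one processed, so committing the last processed offset makes a fresh start re-read that record. A minimal sketch of that invariant (class and method names are illustrative):

```java
// Illustrative model of the fix: commit lastProcessed + 1 so a restarted
// consumer, which resumes AT the committed offset, does not re-read the
// last record that was already processed.
public class OffsetCommit {

    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static boolean demo() {
        long lastProcessed = 41L;
        long committed = offsetToCommit(lastProcessed);
        // Kafka semantics: consumption resumes at the committed offset.
        long firstOffsetAfterRestart = committed;
        return firstOffsetAfterRestart == 42L; // record 41 is not read twice
    }
}
```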