[jira] [Created] (FLINK-16574) StreamingFileSink should rename files or fail if destination file already exists

2020-03-12 Thread static-max (Jira)
static-max created FLINK-16574:
--

 Summary: StreamingFileSink should rename files or fail if 
destination file already exists
 Key: FLINK-16574
 URL: https://issues.apache.org/jira/browse/FLINK-16574
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.9.1
 Environment: We're using Flink 1.9.1 on YARN with Horton HDP 2.7.3.
Reporter: static-max


I switched from BucketingSink to StreamingFileSink, so my state could not be 
restored after starting from a savepoint.

Upon start of the job there were already part-0-0 and part-0-1 files in the 
HDFS destination folder. The StreamingFileSink then creates a file like 
.part-0-0.inprogress.d1849354-39d4-4634-8fb3-dfb8e2083857. When the file is 
rolled, Flink tries to rename it to part-0-0, but that file already exists. 
The NameNode logs "WARN hdfs.StateChange 
(FSDirRenameOp.java:unprotectedRenameTo(174)) - DIR* 
FSDirectory.unprotectedRenameTo: failed to rename  to  because destination 
exists".

Flink does not care and creates a new file like .part-0-1.inprogress.d 
for the next bucket, and the game continues until the part index counter is 
so high that the file can be renamed. But now I'm left with a lot of 
.part-xxx.inprogress.xxx files that I need to rename by hand if I don't want 
to lose the data.

 

I would expect Flink to either fail if the file cannot be renamed, or 
auto-rename it to a filename that does not exist yet.

The same happens when not starting from a savepoint. IIRC the BucketingSink 
did not have this problem.
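The auto-rename behavior suggested above could work roughly like this. `nextFreeName` is a hypothetical helper, not Flink code; against HDFS the membership test would be `FileSystem.exists()` and the final step `FileSystem.rename()`:

```java
import java.util.Set;

public class SafeRename {
    // Hypothetical helper (not Flink code): pick the next part-file name
    // that does not collide with a file already present in the bucket,
    // instead of silently failing the rename as described above.
    public static String nextFreeName(String prefix, int startIndex, Set<String> existing) {
        int i = startIndex;
        while (existing.contains(prefix + i)) {
            i++; // skip indices whose destination file already exists
        }
        return prefix + i;
    }
}
```

With part-0-0 and part-0-1 already present, the rename target would become part-0-2 rather than leaving an .inprogress file behind.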



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15832) Artifact flink-metrics-core-tests will be uploaded twice

2020-01-31 Thread static-max (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027461#comment-17027461
 ] 

static-max commented on FLINK-15832:


Setting {{shadeTestJar}} to {{false}} resolves the problem.
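A hedged sketch of that workaround: assuming the module inherits Flink's maven-shade-plugin configuration, the plugin's `shadeTestJar` parameter can be set to `false` so the shaded test jar is not attached and deployed a second time:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <!-- do not attach the shaded test jar; avoids the duplicate
         flink-metrics-core-tests upload described in this issue -->
    <shadeTestJar>false</shadeTestJar>
  </configuration>
</plugin>
```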

> Artifact flink-metrics-core-tests will be uploaded twice
> 
>
> Key: FLINK-15832
> URL: https://issues.apache.org/jira/browse/FLINK-15832
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.9.1
>    Reporter: static-max
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: build
>
> I built Flink 1.9.1 myself and merged the changes from 
> [https://github.com/apache/flink/pull/10936].
> When I uploaded the artifacts to our repository (using {{mvn deploy }}
> {{-DaltDeploymentRepository}}) the build fails as 
> {{flink-metrics-core-tests}} will be uploaded twice and we have redeployments 
> disabled.
>  
> I'm not sure if other artifacts are affected as well, as I enabled 
> redeployment as a quick workaround.





[jira] [Created] (FLINK-15832) Artifact flink-metrics-core-tests will be uploaded twice

2020-01-31 Thread static-max (Jira)
static-max created FLINK-15832:
--

 Summary: Artifact flink-metrics-core-tests will be uploaded twice
 Key: FLINK-15832
 URL: https://issues.apache.org/jira/browse/FLINK-15832
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.9.1
Reporter: static-max


I built Flink 1.9.1 myself and merged the changes from 
[https://github.com/apache/flink/pull/10936].

When I uploaded the artifacts to our repository (using {{mvn deploy 
-DaltDeploymentRepository}}), the build failed because 
{{flink-metrics-core-tests}} is uploaded twice and we have redeployments 
disabled.
 
I'm not sure if other artifacts are affected as well, as I enabled redeployment 
as a quick workaround.





[jira] [Commented] (FLINK-13689) Rest High Level Client for Elasticsearch6.x connector leaks threads if no connection could be established

2020-01-22 Thread static-max (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17021282#comment-17021282
 ] 

static-max commented on FLINK-13689:


Hi there, I would like to contribute the changes proposed by [~aljoscha] at 
[https://github.com/apache/flink/pull/9468].

I have already started working on a fix and am testing it at the moment.

> Rest High Level Client for Elasticsearch6.x connector leaks threads if no 
> connection could be established
> -
>
> Key: FLINK-13689
> URL: https://issues.apache.org/jira/browse/FLINK-13689
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.8.1
>Reporter: Rishindra Kumar
>Assignee: Rishindra Kumar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.4
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> If the created Elasticsearch Rest High Level Client (rhlClient) is 
> unreachable, the current code throws a RuntimeException, but it doesn't 
> close the client, which causes a thread leak.
>  
> *Current Code*
> if (!rhlClient.ping()) {
>     throw new RuntimeException("There are no reachable Elasticsearch nodes!");
> }
>  
> *Change Needed*
> rhlClient needs to be closed.
>  
> *Steps to Reproduce*
> 1. Add the Elasticsearch sink to the stream. Start the Flink program without 
> starting Elasticsearch.
> 2. The program will give the error "*Too many open files*" and doesn't write 
> even though you start Elasticsearch later.
> 
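The fix described in the quoted issue can be sketched as follows. `PingableClient` is a stand-in interface for Elasticsearch's `RestHighLevelClient`, used here only to illustrate closing the client before throwing:

```java
import java.io.Closeable;
import java.io.IOException;

public class ClientGuard {
    // Stand-in for Elasticsearch's RestHighLevelClient: something that can
    // be pinged and must be closed to release its I/O threads.
    public interface PingableClient extends Closeable {
        boolean ping() throws IOException;
    }

    public static void verifyReachable(PingableClient client) throws IOException {
        if (!client.ping()) {
            // Close first so the client's threads are released; throwing
            // without closing is what leaked threads in the original code.
            client.close();
            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
        }
    }
}
```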





[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2017-03-03 Thread static-max
Github user static-max closed the pull request at:

https://github.com/apache/flink/pull/2861


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3246: [FLINK-5353] [elasticsearch] User-provided failure handle...

2017-02-08 Thread static-max
Github user static-max commented on the issue:

https://github.com/apache/flink/pull/3246
  
Looks great!
One note: in your ExampleActionRequestFailureHandler example you have to 
unwrap the Exception, as it typically looks like:

`RemoteTransportException[[Richard 
Rider][127.0.0.1:9301][indices:data/write/bulk[s]]]; nested: 
RemoteTransportException[[Richard 
Rider][127.0.0.1:9301][indices:data/write/bulk[s][p]]]; nested: 
EsRejectedExecutionException[rejected execution of 
org.elasticsearch.transport.TransportService$4@e5c47a1 on 
EsThreadPoolExecutor[bulk, queue capacity = 1, 
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7e2d5cc5[Running, 
pool size = 8, active threads = 8, queued tasks = 1, completed tasks = 119]]];`

In my implementation I use Apache commons:
`
ExceptionUtils.indexOfThrowable(throwable, 
EsRejectedExecutionException.class) >= 0
`
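The unwrapping described here can also be done with plain JDK calls; `hasCause` is a hypothetical helper mirroring what `ExceptionUtils.indexOfThrowable(...) >= 0` checks:

```java
public class CauseCheck {
    // JDK-only equivalent of Apache Commons'
    // ExceptionUtils.indexOfThrowable(throwable, type) >= 0:
    // walk the cause chain looking for the given exception type, since
    // the interesting exception (e.g. EsRejectedExecutionException) is
    // nested inside RemoteTransportException wrappers.
    public static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }
}
```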




[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...

2017-01-27 Thread static-max
Github user static-max commented on the issue:

https://github.com/apache/flink/pull/2861
  
@tzulitai OK, if you need any help, feel free to ask.
Are there plans to switch from the TransportClient to a pure HTTP client? 
That would reduce the Elasticsearch dependencies and decouple the 
cluster's version from the TransportClient version used by Flink. In that case 
we won't get a Throwable anymore.




[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...

2017-01-27 Thread static-max
Github user static-max commented on the issue:

https://github.com/apache/flink/pull/2861
  
In my tests BulkItemResponse.getFailure().getCause() returns a 
RemoteTransportException like this:

`RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s]]];
 nested: 
RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s][p]]];
 nested: EsRejectedExecutionException[rejected execution of 
org.elasticsearch.transport.TransportService$4@3a0f3a6e on 
EsThreadPoolExecutor[bulk, queue capacity = 2, 
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5ac266bd[Running, 
pool size = 8, active threads = 8, queued tasks = 2, completed tasks = 206]]];`

So the nested Exception needs to be checked. That's possible; I will 
implement that change.

The last two Exceptions are common when a new index gets created (if you 
have a new index per day, for example), or when a node leaves the cluster and 
no master can be elected (no quorum).




[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...

2017-01-27 Thread static-max
Github user static-max commented on the issue:

https://github.com/apache/flink/pull/2861
  
Hi @tzulitai, sure, go ahead :)




[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...

2017-01-17 Thread static-max
Github user static-max commented on the issue:

https://github.com/apache/flink/pull/2861
  
OK, let's wait for the restructure and then rebase this PR to support 
at-least-once.
BTW, we're using my PR in production and haven't lost a single document 
since then.




[GitHub] flink issue #2861: [FLINK-5122] Index requests will be retried if the error ...

2017-01-03 Thread static-max
Github user static-max commented on the issue:

https://github.com/apache/flink/pull/2861
  
The path of the connector has changed to 
"flink/flink-connectors/flink-connector-elasticsearch2/", how should I handle 
the conflict? Open a new PR?




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2017-01-03 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r94382504
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {

	@Override
	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+		boolean allRequestsRepeatable = true;
		if (response.hasFailures()) {
			for (BulkItemResponse itemResp : response.getItems()) {
				if (itemResp.isFailed()) {
-					LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-					failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+					// Check if index request can be retried
+					String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+					if (checkErrorAndRetryBulk && (
+							failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+							|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+							|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+							)
+					) {
+						LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+						reAddBulkRequest(request);
--- End diff --

You're right, it gets added multiple times; I'll fix that.




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-12 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91991599
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
	}

+	/**
+	 * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
+	 * @param bulkRequest
+	 */
+	public void reAddBulkRequest(BulkRequest bulkRequest) {
+		// TODO Check what happens when the bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document has already been deleted. This may not happen in typical Flink jobs.
--- End diff --

Currently I'm not aware of a way to filter these requests.




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-12 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91964180
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

	@Override
	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+		boolean allRequestsRepeatable = true;
		if (response.hasFailures()) {
			for (BulkItemResponse itemResp : response.getItems()) {
				if (itemResp.isFailed()) {
-					LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-					failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+					// Check if index request can be retried
+					String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+					if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+							|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+							|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+					) {
+						LOG.debug("Retry batch: " + itemResp.getFailureMessage());
--- End diff --

What level do you suggest? Personally I don't care about retried batches as 
long as the data gets into my ES cluster. When logging this at INFO or WARN, 
the logfile will get pretty messy on a cluster with high traffic.




[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-12-12 Thread static-max
Github user static-max commented on a diff in the pull request:

https://github.com/apache/flink/pull/2861#discussion_r91963737
  
--- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

	@Override
	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+		boolean allRequestsRepeatable = true;
		if (response.hasFailures()) {
			for (BulkItemResponse itemResp : response.getItems()) {
				if (itemResp.isFailed()) {
-					LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-					failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+					// Check if index request can be retried
+					String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+					if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
--- End diff --

I didn't find an alternative to checking strings. I will add a flag and 
disable it by default.




[jira] [Commented] (FLINK-3404) Extend Kafka consumers with interface StoppableFunction

2016-11-23 Thread static-max (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690734#comment-15690734
 ] 

static-max commented on FLINK-3404:
---

I'm in need of a _stop()_ method too; I will propose a PR for this issue if 
there is interest.

> Extend Kafka consumers with interface StoppableFunction
> ---
>
> Key: FLINK-3404
> URL: https://issues.apache.org/jira/browse/FLINK-3404
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Matthias J. Sax
>
> Kafka consumers are not stoppable right now. To make them stoppable, they 
> must implement {{StoppableFunction}}. Implementing method {{stop()}} must 
> ensure, that the consumer stops pulling new messages from Kafka and issues a 
> final checkpoint with the last offset. Afterwards, {{run()}} must return.
> When implementing this, keep in mind, that the gathered checkpoint might 
> later be used as a savepoint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

2016-11-23 Thread static-max
GitHub user static-max opened a pull request:

https://github.com/apache/flink/pull/2861

[FLINK-5122] Index requests will be retried if the error is only temp…

This PR re-adds index requests to the BulkProcessor if the error is 
temporary, like:
* General timeout errors
* No master
* UnavailableShardsException (rebalancing, node down)
* Bulk queue on node full

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/static-max/flink 
flink-connector-elasticsearch2-robust

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2861


commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
Author: Max Kuklinski <max.kuklin...@live.de>
Date:   2016-11-23T16:54:11Z

[FLINK-5122] Index requests will be retried if the error is only temporary 
on Elasticsearch side. Covered are: Timeouts, No Master, 
UnavailableShardsException, bulk queue on node full






[jira] [Assigned] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-11-23 Thread static-max (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

static-max reassigned FLINK-5122:
-

Assignee: static-max

> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>    Reporter: static-max
>Assignee: static-max
>
> My cluster had high load and documents were not indexed. This violates the 
> "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be retried instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.





[jira] [Commented] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-11-22 Thread static-max (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686338#comment-15686338
 ] 

static-max commented on FLINK-5122:
---

Your PR does not handle single failed bulk requests, as far as I can see.
But your PR makes me think about the whole error-handling concept in the ES 
sink: hasFailure is only checked on close(), which does not make sense to 
me. Would it be smarter to throw the Exception immediately when a document 
cannot be indexed? That way Flink would restart the job from a checkpoint and 
try again.
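The fail-fast idea sketched in this comment could look like the following (hypothetical names, not the actual Flink sink): the first asynchronous failure is recorded and rethrown on the next write instead of only being checked at close():

```java
import java.util.concurrent.atomic.AtomicReference;

public class FailFastSink {
    // Record only the first asynchronous failure, mirroring the
    // failureThrowable.compareAndSet(null, ...) pattern in the ES sink.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Called from the bulk-response callback when a document fails.
    public void onAsyncFailure(Throwable t) {
        failure.compareAndSet(null, t);
    }

    // Called on every incoming record; surfacing the error here (rather
    // than at close()) makes Flink restart from the last checkpoint and
    // retry, preserving at-least-once semantics.
    public void write(String doc) {
        Throwable t = failure.get();
        if (t != null) {
            throw new RuntimeException("An indexing request failed earlier", t);
        }
        // ... hand doc to the bulk processor ...
    }
}
```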

> Elasticsearch Sink loses documents when cluster has high load
> -
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>    Reporter: static-max
>
> My cluster had high load and documents were not indexed. This violates the 
> "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be retried instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.





[jira] [Created] (FLINK-5122) Elasticsearch Sink loses documents when cluster has high load

2016-11-21 Thread static-max (JIRA)
static-max created FLINK-5122:
-

 Summary: Elasticsearch Sink loses documents when cluster has high 
load
 Key: FLINK-5122
 URL: https://issues.apache.org/jira/browse/FLINK-5122
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.2.0
Reporter: static-max


My cluster had high load and documents were not indexed. This violates the "at 
least once" semantics of the ES connector.

I put pressure on my cluster to test Flink, causing new indices to be created 
and balanced. On those errors the bulk should be retried instead of being 
discarded.

Primary shard not active because ES decided to rebalance the index:
2016-11-15 15:35:16,123 ERROR 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
Failed to index document in Elasticsearch: 
UnavailableShardsException[[index-name][3] primary shard is not active Timeout: 
[1m], request: [BulkShardRequest to [index-name] containing [20] requests]]

Bulk queue on node full (I set queue to a low value to reproduce error):
22:37:57,702 ERROR 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
Failed to index document in Elasticsearch: 
RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
 nested: EsRejectedExecutionException[rejected execution of 
org.elasticsearch.transport.TransportService$4@727e677c on 
EsThreadPoolExecutor[bulk, queue capacity = 1, 
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, 
pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];

I can try to propose a PR for this.





[jira] [Created] (FLINK-4779) Restoring from savepoint fails when BucketingSink has not yet created folder

2016-10-07 Thread static-max (JIRA)
static-max created FLINK-4779:
-

 Summary: Restoring from savepoint fails when BucketingSink has not 
yet created folder
 Key: FLINK-4779
 URL: https://issues.apache.org/jira/browse/FLINK-4779
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.2.0
Reporter: static-max


When I restore from a savepoint, starting the job fails when the root folder 
used by the BucketingSink does not yet exist. This can happen when the source 
for my sink has not yet emitted any messages and I did not create the folder 
by hand.

The complete folder structure is not required by the BucketingSink, as it will 
create intermediate folders itself when creating the bucket.

I suggest that this should not prevent the job from being restarted.

{code}
10/07/2016 22:50:53 Source: Kafka Consumer for X -> (Sink: HDFS for X, 
Sink: X)(1/1) switched to FAILED
java.lang.Exception: Failed to restore state to function: Error while deleting 
old pending files.
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:184)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:550)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:255)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error while deleting old pending files.
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:805)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:139)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:182)
... 4 more
Caused by: java.io.FileNotFoundException: File hdfs://server:8020/1/2/3/4/flink 
does not exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:948)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:927)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:872)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:868)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:886)
at 
org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1696)
at org.apache.hadoop.fs.FileSystem$6.<init>(FileSystem.java:1791)
at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1787)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.restoreState(BucketingSink.java:784)
... 6 more
{code}





[GitHub] flink pull request #2579: [FLINK-4618] FlinkKafkaConsumer09 should start fro...

2016-09-30 Thread static-max
GitHub user static-max opened a pull request:

https://github.com/apache/flink/pull/2579

[FLINK-4618] FlinkKafkaConsumer09 should start from the next record on 
startup from offsets in Kafka

This PR addresses https://issues.apache.org/jira/browse/FLINK-4618, which 
causes the last message to be read again from Kafka after a fresh start of the 
job.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/static-max/flink 
flink-connector-kafka-0.9-fix-duplicate-messages

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2579


commit 0b564203cdae3b21b00bb499b85feb799136e29b
Author: static-max <max.kuklin...@live.de>
Date:   2016-09-30T19:45:38Z

Merge pull request #1 from apache/master

Pull from origin

commit 3618f5053e0ffb0ec1f789c56d878ed400e27056
Author: Max Kuklinski <max.kuklin...@live.de>
Date:   2016-09-30T21:03:30Z

FLINK-4618 Incremented the committed offset by one to avoid re-reading the 
last message.
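For context, Kafka defines the committed offset as the position of the *next* record to consume, so the fix amounts to a one-line adjustment when committing (a sketch of the idea, not the actual connector code):

```java
public class OffsetFix {
    // Kafka's committed offset denotes the *next* record to consume.
    // Committing the offset of the last processed record as-is makes a
    // freshly started consumer read that record again; committing
    // lastProcessedOffset + 1 avoids the duplicate.
    public static long toCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }
}
```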



