[jira] [Commented] (FLINK-10775) Quarantined address [akka.tcp://flink@flink-jobmanager:6123] is still unreachable or has not been restarted. Keeping it quarantined.

2018-12-02 Thread miki haiat (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706252#comment-16706252
 ] 

miki haiat commented on FLINK-10775:


I had this issue as well on 1.4.x.

I can confirm that on 1.5.5 and 1.6.x this issue no longer exists.

> Quarantined address [akka.tcp://flink@flink-jobmanager:6123] is still 
> unreachable or has not been restarted. Keeping it quarantined.
> 
>
> Key: FLINK-10775
> URL: https://issues.apache.org/jira/browse/FLINK-10775
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.4.2
> Environment: k8s+docker 
> standalone (1jobmanager + 5taskmanager)
> taskmanager.slotnum=4
>Reporter: ChuanHaiTan
>Priority: Blocker
>  Labels: k8s+docker, usability
> Attachments: 
> logs-from-flink-jobmanager-in-flink-jobmanager-65c8d85f4f-5fm2d.txt, 
> logs-from-flink-taskmanager-in-flink-taskmanager-758575577d-7lw82.txt, 
> logs-from-flink-taskmanager-in-flink-taskmanager-758575577d-qbj9g.txt, 
> 微信图片_20181031171312.png, 微信图片_20181031171316.png
>
>
> In a k8s + docker environment, 1 jobmanager container and 5 taskmanager 
> containers run as a standalone cluster.
> For some reason the jobmanager was rebooted, and two of the five taskmanagers 
> were rebooted as well. Afterwards, two of the remaining three taskmanagers did 
> not reconnect to the jobmanager, which led to errors about insufficient slot 
> resources.
> The attachments are the jobmanager log, the logs of the two disconnected 
> taskmanagers, and screenshots showing the available and unavailable 
> taskmanagers in Flink at the time.
> Strangely, the two rebooted taskmanagers can connect to the jobmanager, but 
> only one of the three taskmanagers that were not rebooted can connect.
> Why? Can the cause of the restarts be determined from the logs? Thank you.





[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread miki haiat (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706243#comment-16706243
 ] 

miki haiat commented on FLINK-11046:


Hi,
Can you please share the Elasticsearch connection properties?

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When I use the ES6 sink to index into Elasticsearch and the bulk process 
> catches an exception, I try to re-index the document by calling 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things go wrong: the calling thread gets stuck there, and a 
> thread dump shows that the `bulkprocessor` object is locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>     void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
> }
> {code}
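> For reference, a handler along the lines described above might look like the 
> following minimal sketch (modeled on the failure-handler pattern in the Flink 
> documentation; the class name is only an example):
> {code:java}
> import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
> import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
> import org.apache.flink.util.ExceptionUtils;
> import org.elasticsearch.ElasticsearchParseException;
> import org.elasticsearch.action.ActionRequest;
> import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
> 
> public class RetryOnFailureHandler implements ActionRequestFailureHandler {
>     @Override
>     public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
>         if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
>             // Queue was full: re-add the request -- this re-add is the kind of call that gets stuck here.
>             indexer.add(action);
>         } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
>             // Malformed document: drop it without failing the sink.
>         } else {
>             // Any other failure: rethrow to fail the sink.
>             throw failure;
>         }
>     }
> }
> {code}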
> After reading the code behind `indexer.add(action)`, I found that every add 
> operation is `synchronized`:
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
>     ensureOpen();
>     bulkRequest.add(request, payload);
>     executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked in the bulk process 
> thread as well. The bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
>     Runnable toRelease = () -> {};
>     boolean bulkRequestSetupSuccessful = false;
>     try {
>         listener.beforeBulk(executionId, bulkRequest);
>         semaphore.acquire();
>         toRelease = semaphore::release;
>         CountDownLatch latch = new CountDownLatch(1);
>         retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
>             @Override
>             public void onResponse(BulkResponse response) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, response);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
> 
>             @Override
>             public void onFailure(Exception e) {
>                 try {
>                     listener.afterBulk(executionId, bulkRequest, e);
>                 } finally {
>                     semaphore.release();
>                     latch.countDown();
>                 }
>             }
>         }, Settings.EMPTY);
>         bulkRequestSetupSuccessful = true;
>         if (concurrentRequests == 0) {
>             latch.await();
>         }
>     } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>         logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } catch (Exception e) {
>         logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
>         listener.afterBulk(executionId, bulkRequest, e);
>     } finally {
>         if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
>             toRelease.run();
>         }
>     }
> }
> {code}
> Given the code above, I think that is why the retry operation's thread is 
> blocked: the bulk process thread never releases the lock on `bulkprocessor`. 
> I also tried to figure out why the field `concurrentRequests` was set to 
> zero, and found the initialization of the bulk processor in class 
> `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>     ...
>     BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);
> 
>     // This makes flush() blocking
>     bulkProcessorBuilder.setConcurrentRequests(0);
> 
>     ...
>     return bulkProcessorBuilder.build();
> }
> {code}
> This field is explicitly set to zero. So everything seems to make sense, but 
> I still wondered why the retry operation does not run in the same thread as 
> the bulk process execution. After reading the code, the `bulkAsync` method 
> might be the last piece of the puzzle:
> {code:java}
> @Override
> public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
>     return BulkProcessor.builder(client::bulkAsync, listener);
> }
> {code}
> So, I hope someone can help.
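> To make the suspected cycle concrete, here is a minimal, self-contained 
> sketch (plain JDK only, not the connector or Elasticsearch code; all names 
> are made up for illustration): the flushing thread holds the processor's 
> monitor while awaiting a latch that is only counted down after the listener 
> callback returns, and the callback tries to re-enter the same monitor through 
> the re-add.
> {code:java}
> import java.util.concurrent.CountDownLatch;
> 
> public class BulkDeadlockSketch {
>     private final Object monitor = new Object();            // stands in for the bulkprocessor lock
>     private final CountDownLatch latch = new CountDownLatch(1);
> 
>     // Runs on the "bulk process" thread, like synchronized internalAdd() -> execute().
>     void flush() throws InterruptedException {
>         synchronized (monitor) {
>             new Thread(this::onBulkResponse).start();        // simulate the async bulk response thread
>             latch.await();                                   // concurrentRequests == 0: wait for completion
>         }
>     }
> 
>     // Runs on the response/listener thread, like afterBulk() -> onFailure() -> indexer.add().
>     void onBulkResponse() {
>         add();                                               // re-adding needs the monitor held above
>         latch.countDown();                                   // never reached, so flush() waits forever
>     }
> 
>     // Like BulkProcessor.add(): every add synchronizes on the same object.
>     void add() {
>         synchronized (monitor) {
>             // re-queue the failed request
>         }
>     }
> 
>     public static void main(String[] args) throws InterruptedException {
>         new BulkDeadlockSketch().flush();                    // hangs, demonstrating the deadlock
>     }
> }
> {code}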

[jira] [Commented] (FLINK-9120) Task Manager Fault Tolerance issue

2018-04-02 Thread miki haiat (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16422061#comment-16422061
 ] 

miki haiat commented on FLINK-9120:
---

Can you upload the logs, please?

> Task Manager Fault Tolerance issue
> --
>
> Key: FLINK-9120
> URL: https://issues.apache.org/jira/browse/FLINK-9120
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Critical
>
> Hi,
> I have set up a Flink 1.4 cluster with 1 job manager and two task managers. 
> The configs taskmanager.numberOfTaskSlots and parallelism.default were set 
> to 2 on each node. I submitted a job to this cluster and it ran fine. To 
> test fault tolerance, I killed one task manager. I was expecting the job to 
> keep running because one of the 2 task managers was still up and running. 
> However, the job failed. Am I missing something?
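> (The thread does not establish the root cause, but one common reason a job 
> fails outright after losing a task manager, instead of being rescheduled on 
> the remaining slots, is that no restart strategy is configured. Purely as an 
> illustrative assumption, a fixed-delay restart strategy can be set like this:)
> {code:java}
> import java.util.concurrent.TimeUnit;
> 
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class RestartStrategyExample {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // Allow the job to be restarted up to 3 times, 10 seconds apart,
>         // so it can be rescheduled on the surviving task manager's slots.
>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
> 
>         // ... build the actual pipeline here and call env.execute(...) ...
>     }
> }
> {code}
> The same behaviour can also be configured cluster-wide via 
> `restart-strategy: fixed-delay` (plus the corresponding attempts/delay keys) 
> in flink-conf.yaml.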


