[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589897#comment-16589897
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

TisonKun commented on issue #6251: [FLINK-9693] Set Execution#taskRestore to 
null after deployment
URL: https://github.com/apache/flink/pull/6251#issuecomment-415333090
 
 
   Thanks Till, this saves my weekend :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.3, 1.6.1, 1.7.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for the Kafka source operators. Checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only the jobmanager writes the 
> checkpoint to S3 (see the config sketch after this description)
>  * internal checkpoints, with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded one ExecutionVertex; it seems to be storing the Kafka offsets for the 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556325#comment-16556325
 ] 

Steven Zhen Wu commented on FLINK-9693:
---

One more observation: we are seeing this issue right after the jobmanager node 
got killed and replaced. However, it is not reproducible when I try to kill 
the jobmanager while the job is healthy.



[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-12 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541535#comment-16541535
 ] 

Till Rohrmann commented on FLINK-9693:
--

Yes, this should not be a problem [~stevenz3wu]. I will create the backport for 
1.4.



[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-09 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537514#comment-16537514
 ] 

Steven Zhen Wu commented on FLINK-9693:
---

[~till.rohrmann] is it possible to generate a patch for 1.4? It doesn't seem 
straightforward to backport PR 6251 to the 1.4 branch.



[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532777#comment-16532777
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6251




[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532573#comment-16532573
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6251
  
Thanks for the review @StefanRRichter and @zentol. I will merge this PR 
once Travis gives the green light. I will address your comments while merging this 
PR.




[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532571#comment-16532571
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6251#discussion_r200075750
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
@@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
 		restartFuture.get();
 	}
 
+	/**
+	 * Tests that the task restore state is nulled after the {@link Execution} has been
+	 * deployed. See FLINK-9693.
+	 */
+	@Test
+	public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
+
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+		final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+			1,
+			Collections.singleton(jobVertexId),
+			slotOwner);
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+		final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+		final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+		execution.setInitialState(taskRestoreState);
+
+		assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+		// schedule the execution vertex and wait for its deployment
+		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+		assertThat(execution.getTaskRestore(), is(nullValue()));
+	}
+
+	@Nonnull
+	private JobVertex createNoOpJobVertex() {
+		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		return jobVertex;
+	}
+
+	@Nonnull
+	private ProgrammedSlotProvider createProgrammedSlotProvider(
+			int parallelism,
+			Collection<JobVertexID> jobVertexIds,
+			SlotOwner slotOwner) {
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+		for (JobVertexID jobVertexId : jobVertexIds) {
+			for (int i = 0; i < parallelism; i++) {
+				final SimpleSlot slot = new SimpleSlot(
+					slotOwner,
+					new LocalTaskManagerLocation(),
+					0,
+					new SimpleAckingTaskManagerGateway(),
+					null,
+					null);
--- End diff --

True, will remove it.



[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532516#comment-16532516
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6251
  
LGTM  




[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532514#comment-16532514
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6251#discussion_r200062854
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
@@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
 		restartFuture.get();
 	}
 
+	/**
+	 * Tests that the task restore state is nulled after the {@link Execution} has been
+	 * deployed. See FLINK-9693.
+	 */
+	@Test
+	public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
+
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+		final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+			1,
+			Collections.singleton(jobVertexId),
+			slotOwner);
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+		final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+		final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+		execution.setInitialState(taskRestoreState);
+
+		assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+		// schedule the execution vertex and wait for its deployment
+		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+		assertThat(execution.getTaskRestore(), is(nullValue()));
+	}
+
+	@Nonnull
+	private JobVertex createNoOpJobVertex() {
+		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		return jobVertex;
+	}
+
+	@Nonnull
+	private ProgrammedSlotProvider createProgrammedSlotProvider(
+			int parallelism,
+			Collection<JobVertexID> jobVertexIds,
+			SlotOwner slotOwner) {
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+		for (JobVertexID jobVertexId : jobVertexIds) {
+			for (int i = 0; i < parallelism; i++) {
+				final SimpleSlot slot = new SimpleSlot(
+					slotOwner,
+					new LocalTaskManagerLocation(),
+					0,
+					new SimpleAckingTaskManagerGateway(),
+					null,
+					null);
--- End diff --

whitespace after `null`



[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532501#comment-16532501
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6251

[FLINK-9693] Set Execution#taskRestore to null after deployment

## What is the purpose of the change

Setting the assigned Execution#taskRestore to null after the deployment allows the
JobManagerTaskRestore instance to be garbage collected. Furthermore, it won't be
archived along with the Execution in the ExecutionVertex in case of a restart. This
is especially important when setting state.backend.fs.memory-threshold to larger
values, because all state below this threshold is stored in the meta state files
and, thus, also inside the JobManagerTaskRestore instances.
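
To make the mechanism concrete, here is a minimal, self-contained sketch of the idea.
It is not the actual Flink code: only the names Execution, taskRestore,
setInitialState and JobManagerTaskRestore come from the PR description above; the
class shapes and method bodies are illustrative assumptions.

```java
// Sketch only: simplified stand-ins for the Flink classes named above.
class JobManagerTaskRestore {
    // In Flink this carries the checkpoint id and the task state snapshot to
    // restore from; with a large fs.memory-threshold it also holds the inlined
    // state bytes, which is what made it expensive to retain.
}

class Execution {

    private volatile JobManagerTaskRestore taskRestore;

    void setInitialState(JobManagerTaskRestore taskRestore) {
        this.taskRestore = taskRestore;
    }

    void deploy() {
        // ... build the task deployment descriptor (including taskRestore) and
        // submit it to the task manager; details omitted in this sketch ...

        // Once the restore state has been handed off to the task, the jobmanager
        // no longer needs it. Dropping the reference lets the JobManagerTaskRestore
        // be garbage collected and keeps it out of the archived execution history.
        this.taskRestore = null;
    }
}
```

The design point is simply that the reference is cleared as soon as it has served
its purpose, so archived Executions in the ExecutionVertex no longer pin the restore
state (and any inlined checkpoint data) in jobmanager memory.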

## Verifying this change

- Added `ExecutionTest#testTaskRestoreStateIsNulledAfterDeployment`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixMemoryLeakInJobManager

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6251









--
This message was sent by Atlassian JIRA
(v7.6.3#76005)