[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-07-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5448


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197754848
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
@@ -148,6 +148,27 @@ public static MemorySize parse(String text) throws 
IllegalArgumentException {
return new MemorySize(parseBytes(text));
}
 
+   /**
+* Parses the given string as as MemorySize.
+* The supported expressions are listed under {@link MemorySize}.
+*
+* 
+* Note : this method is compatible with the old memory config key, 
like {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
+* 
+*
+* @param text The string to parse.
+* @return The parsed MemorySize.
+*
+* @throws IllegalArgumentException Thrown, if the expression cannot be 
parsed.
+*/
+   public static MemorySize parseAsMebiBytesIfNoUnit(String text) throws 
IllegalArgumentException {
--- End diff --

@dawidwys there is not a `MemoryUnit` and `parseAsMebiBytesIfNoUnit` just 
for MebiBytes(`taskmanager.memory.size`) so I think we should not make it more 
complex.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197736638
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration 
configuration) {
int slots = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
-   int jobManagerMemoryMb = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-   int taskManagerMemoryMb = 
configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+   int jobManagerMemoryMb = (int) 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --

hi @dawidwys this belongs suggestion 2, "change the return value of 
getMebiBytes() to int or have a getMebiBytesAsInt() method that uses a 
MathUtils.checkedDownCast() to avoid unnoticed overflow errors, as Stephan 
commented"

"have a getMebiBytesAsInt()" is another choice, if I picked the first 
choice and change the `(int) to  MathUtils.checkedDownCast()` . Is there 
necessary to provide the `getMebiBytesAsInt()` method?


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197707880
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
@@ -148,6 +148,27 @@ public static MemorySize parse(String text) throws 
IllegalArgumentException {
return new MemorySize(parseBytes(text));
}
 
+   /**
+* Parses the given string as as MemorySize.
+* The supported expressions are listed under {@link MemorySize}.
+*
+* 
+* Note : this method is compatible with the old memory config key, 
like {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
+* 
+*
+* @param text The string to parse.
+* @return The parsed MemorySize.
+*
+* @throws IllegalArgumentException Thrown, if the expression cannot be 
parsed.
+*/
+   public static MemorySize parseAsMebiBytesIfNoUnit(String text) throws 
IllegalArgumentException {
--- End diff --

I think a better idea would be to have `parse(String text, MemoryUnit 
defaultUnit)`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197707246
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration 
configuration) {
int slots = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
-   int jobManagerMemoryMb = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-   int taskManagerMemoryMb = 
configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+   int jobManagerMemoryMb = (int) 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+   int taskManagerMemoryMb = (int) 
MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --

use `getMebiBytesAsInt`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197711430
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ---
@@ -66,7 +67,7 @@
  */
 public class StreamNetworkBenchmarkEnvironment {
 
-   private static final int BUFFER_SIZE = 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+   private static final int BUFFER_SIZE = (int) 
MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes();
--- End diff --

use `checkedDownCast`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197711900
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -385,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+   final int jobManagerMemoryMB = (int) 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
 
// Task Managers memory
-   final int taskManagerMemoryMB = 
configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+   final int taskManagerMemoryMB = (int) 
MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --

unnecessary cast


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197711813
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -161,7 +162,7 @@ public YarnResourceManager(
numPendingContainerRequests = 0;
 
this.webInterfaceUrl = webInterfaceUrl;
-   this.defaultTaskManagerMemoryMB = 
flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+   this.defaultTaskManagerMemoryMB = (int) 
MemorySize.parse(flinkConfig.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --

unnecessary cast


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197708797
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 ---
@@ -86,7 +87,7 @@ private static TaskManagerServicesConfiguration 
getTmConfig(
networkBufFraction,
networkBufMin,
networkBufMax,
-   TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
+   (int) 
MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
--- End diff --

use `checkedDownCast`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197707411
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java ---
@@ -95,8 +95,8 @@ public long getKibiBytes() {
/**
 * Gets the memory size in Mebibytes (= 1024 Kibibytes).
 */
-   public long getMebiBytes() {
-   return bytes >> 20;
+   public int getMebiBytes() {
+   return (int) (bytes >> 20);
--- End diff --

use `checkedDownCast` here


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197711881
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -385,10 +386,10 @@ private ClusterSpecification 
createClusterSpecification(Configuration configurat
}
 
// JobManager Memory
-   final int jobManagerMemoryMB = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+   final int jobManagerMemoryMB = (int) 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --

unnecessary cast


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197708483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -482,14 +484,15 @@ private static NetworkEnvironment 
createNetworkEnvironment(
public static long calculateNetworkBufferMemory(long 
totalJavaMemorySize, Configuration config) {
Preconditions.checkArgument(totalJavaMemorySize > 0);
 
-   int segmentSize = 
config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+   int segmentSize = (int) 
MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes();
--- End diff --

Use `checkedDownCast`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r197707216
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
 ---
@@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration 
configuration) {
int slots = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
-   int jobManagerMemoryMb = 
configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-   int taskManagerMemoryMb = 
configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+   int jobManagerMemoryMb = (int) 
MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
--- End diff --

use `getMebiBytesAsInt`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-05 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r193277880
  
--- Diff: docs/_includes/generated/task_manager_configuration.html ---
@@ -84,13 +84,13 @@
 
 
 taskmanager.memory.segment-size
-32768
-Size of memory buffers used by the network stack and the 
memory manager (in bytes).
+"32768"
+Size of memory buffers used by the network stack and the 
memory manager.
 
 
 taskmanager.memory.size
--1
-Amount of memory to be allocated by the task manager's 
memory manager (in megabytes). If not set, a relative fraction will be 
allocated.
+"-1"
--- End diff --

currently, every place we use the value of config item 
`taskmanager.memory.size`, will compare with the default value "-1", if yes, 
will not use the parser, so there is no problem now. But I think the suggestion 
from @zentol  is good. What's your opinion? @StephanEwen 


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r193221496
  
--- Diff: docs/_includes/generated/task_manager_configuration.html ---
@@ -84,13 +84,13 @@
 
 
 taskmanager.memory.segment-size
-32768
-Size of memory buffers used by the network stack and the 
memory manager (in bytes).
+"32768"
+Size of memory buffers used by the network stack and the 
memory manager.
 
 
 taskmanager.memory.size
--1
-Amount of memory to be allocated by the task manager's 
memory manager (in megabytes). If not set, a relative fraction will be 
allocated.
+"-1"
--- End diff --

looking at the 
[parser](https://github.com/yanghua/flink/blob/39fd2efb66bd6fcac3f86e953729831a49bc7709/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java#L191)
 it appears that `MemorySize` does not support negative values (rightfully so). 
We may have to change the default to 0.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191191358
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to 
elect and discover the JobManager" +
" leader from potentially multiple standby 
JobManagers.");
 
+   /**
+* JVM heap size for the JobManager with memory size.
+*/
+   public static final ConfigOption JOB_MANAGER_HEAP_MEMORY =
+   key("jobmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the JobManager.");
+
/**
 * JVM heap size (in megabytes) for the JobManager.
+* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption JOB_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Also think we should add some note to release notes, that the migration of 
parameters is advised.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191197602
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
 
-   long offHeapSize = 
config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   long offHeapSize = 0;
+   try {
+   offHeapSize = 
Long.valueOf(managedMemorySizeDefaultVal);
+   if 
(!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
+   offHeapSize = 
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   }
+   } catch (IllegalArgumentException e) {
+
--- End diff --

Rethrow with IllegalConfigurationException with a pointer that 
`MANAGED_MEMORY_SIZE` was badly configured.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191190916
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
 * set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption MANAGED_MEMORY_SIZE =
+   public static final ConfigOption MANAGED_MEMORY_SIZE =
--- End diff --

Also I think we should add tests that explicitly test using old style 
configuration.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191199370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ---
@@ -223,9 +224,17 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
parseQueryableStateConfiguration(configuration);
 
// extract memory settings
-   long configuredMemory = 
configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   long configuredMemory = 
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
+   if 
(!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()))
 {
+   try {
+   configuredMemory = 
MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+
+   }
+   }
--- End diff --

Same comments as before.  How about code like this:

long configuredMemory;
String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if 
(!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
try {
configuredMemory = 
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
.getMebiBytes();
} catch (IllegalArgumentException e) {
throw new IllegalConfigurationException(
"Could not read " + 
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
}
} else {
configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
}


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191190775
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
 * set, a relative fraction will be allocated, as defined by {@link 
#MANAGED_MEMORY_FRACTION}.
 */
-   public static final ConfigOption MANAGED_MEMORY_SIZE =
+   public static final ConfigOption MANAGED_MEMORY_SIZE =
--- End diff --

This option requires special handling (maybe similar to 
JOB_MANAGER_HEAP_MEMORY/TASK_MANAGER_HEAP_MEMORY). Right now if an old config 
file will be used with new version the megabytes will be treated as bytes. 
Therefore the value will 1000 smaller.


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191147488
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -32,10 +32,22 @@
//  General TaskManager Options
// 

 
+   /**
+* JVM heap size for the TaskManagers with memory size.
+*/
+   public static final ConfigOption TASK_MANAGER_HEAP_MEMORY =
+   key("taskmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the TaskManagers, 
which are the parallel workers of" +
+   " the system. On YARN setups, this 
value is automatically configured to the size of the TaskManager's" +
+   " YARN container, minus a certain 
tolerance value.");
+
/**
 * JVM heap size (in megabytes) for the TaskManagers.
+*
+* @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption TASK_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption TASK_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Add `@Deprecated`


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191147297
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -72,10 +72,19 @@
" leader-election service (like ZooKeeper) is used to 
elect and discover the JobManager" +
" leader from potentially multiple standby 
JobManagers.");
 
+   /**
+* JVM heap size for the JobManager with memory size.
+*/
+   public static final ConfigOption JOB_MANAGER_HEAP_MEMORY =
+   key("jobmanager.heap.size")
+   .defaultValue("1024m")
+   .withDescription("JVM heap size for the JobManager.");
+
/**
 * JVM heap size (in megabytes) for the JobManager.
+* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
 */
-   public static final ConfigOption JOB_MANAGER_HEAP_MEMORY =
+   public static final ConfigOption JOB_MANAGER_HEAP_MEMORY_MB =
--- End diff --

Missing annotation `@Deprecated`. That way we can remove that option from 
docs (which I think we should do).


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191186380
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -176,19 +188,19 @@
/**
 * Size of memory buffers used by the network stack and the memory 
manager (in bytes).
 */
-   public static final ConfigOption MEMORY_SEGMENT_SIZE =
+   public static final ConfigOption MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
-   .defaultValue(32768)
-   .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
+   .defaultValue("32768")
+   .withDescription("Size of memory buffers used by the 
network stack and the memory manager.");
 
/**
 * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
--- End diff --

remove (in megabytes)


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5448#discussion_r191198882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -647,7 +649,16 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
final long heapSizeMB;
if (useOffHeap) {
 
-   long offHeapSize = 
config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+   String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   long offHeapSize = 0;
+   try {
+   offHeapSize = 
Long.valueOf(managedMemorySizeDefaultVal);
+   if 
(!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
+   offHeapSize = 
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+   }
+   } catch (IllegalArgumentException e) {
+
--- End diff --

I found the flow with default value somewhat counterintuitive. How about we 
structure this code like this:

long offHeapSize;
String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
if 
(!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
try {
offHeapSize = 
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE))
.getMebiBytes();
} catch (IllegalArgumentException e) {
throw new IllegalConfigurationException(
"Could not read " + 
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
}
} else {
offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
}


---


[GitHub] flink pull request #5448: [FLINK-6469] Configure Memory Sizes with units

2018-02-09 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5448

[FLINK-6469] Configure Memory Sizes with units



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-6469

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5448.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5448


commit 1e109fd21783eada731df4552e154302d1bd9255
Author: vinoyang 
Date:   2018-02-10T06:52:13Z

[FLINK-6469] Configure Memory Sizes with units




---