[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368107532
 
 

 ##
 File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
 ##
 @@ -198,4 +203,31 @@ public void testRunWithClassLoader() throws Exception {
     // make sure runClusterBasedJobCoordinator only got called once
     verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator",
         new Object[]{aryEq(args)});
   }
+
+  @Test(expected = SamzaException.class)
+  public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() {
+    ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig());
+  }
+
+  @Test
+  public void testCreateFromConfigLoader() throws Exception {
+    // partially mock ClusterBasedJobCoordinator (mock prepareJob method only)
+    PowerMockito.spy(ClusterBasedJobCoordinator.class);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName());
+    config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
+    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+        getClass().getResource("/test.properties").getPath());
+
+    PowerMockito.doAnswer(invocation -> invocation.getArgumentAt(0, Config.class))
+        .when(ClusterBasedJobCoordinator.class, "prepareJob", any());
+    PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class));
+    PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mock(CoordinatorStreamStore.class));
+
+    ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig(config));
+
+    verifyPrivate(ClusterBasedJobCoordinator.class).invoke("prepareJob", any());
+    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(), any(), any());
 
 Review comment:
   Updated the unit test to explicitly validate the arguments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368084308
 
 

 ##
 File path: samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java
 ##
 @@ -161,6 +162,31 @@ public void testApplyRewriterClassDoesNotExist() {
     assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME));
   }
 
+  @Test
+  public void testLoadConfigWithoutLoader() {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.JOB_NAME, "new-test-job");
+
+    Config actual = ConfigUtil.loadConfig(new MapConfig(config));
+
+    assertEquals(config.size(), actual.size());
+    assertEquals("new-test-job", actual.get(JobConfig.JOB_NAME));
+  }
+
+  @Test
+  public void testLoadConfigWithLoader() {
 
 Review comment:
   Updated to include both overrides and rewrites.




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368079520
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
 ##
 @@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = ConfigUtil.rewriteConfig(override(loader.getConfig(), original));
+    }
+
+    return fullConfig;
+  }
+
+  /**
+   * Overrides original config with overridden values.
+   *
+   * @param original config to be overridden.
+   * @param overrides overridden values.
+   * @return the overridden config.
+   */
+  @SafeVarargs
+  public static Config override(Config original, Map<String, String>... overrides) {
 
 Review comment:
   Updated to private
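
The following is a minimal sketch of what a varargs `override` helper like the one above might do. It works over plain `Map<String, String>` so the example is self-contained; the real helper takes Samza's `Config`. The class name `OverrideSketch` is hypothetical. The method is package-private only so it can be called directly, whereas the comment says the real one was made private. Later override maps win on key conflicts.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConfigUtil.override, using plain maps instead of Samza's Config.
final class OverrideSketch {
  private OverrideSketch() { }

  // Copies 'original', then applies each override map in order; later maps win on key conflicts.
  @SafeVarargs
  static Map<String, String> override(Map<String, String> original, Map<String, String>... overrides) {
    Map<String, String> result = new HashMap<>(original);
    for (Map<String, String> overrideMap : overrides) {
      result.putAll(overrideMap);
    }
    // Read-only view so callers cannot mutate the merged result.
    return Collections.unmodifiableMap(result);
  }
}
```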




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368079318
 
 

 ##
 File path: samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java
 ##
 @@ -161,6 +162,31 @@ public void testApplyRewriterClassDoesNotExist() {
     assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME));
   }
 
+  @Test
+  public void testLoadConfigWithoutLoader() {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.JOB_NAME, "new-test-job");
+
+    Config actual = ConfigUtil.loadConfig(new MapConfig(config));
+
+    assertEquals(config.size(), actual.size());
+    assertEquals("new-test-job", actual.get(JobConfig.JOB_NAME));
 
 Review comment:
   Updated the unit test to check for a ConfigException instead, since config without a loader is now disallowed.




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368078024
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -480,20 +485,101 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+    coordinatorStreamStore.init();
+    Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+
+    return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with submission config, full job config will be fetched using
+   * specified {@link org.apache.samza.config.ConfigLoaderFactory}
+   *
+   * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  public static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {
 
 Review comment:
   Updated to package-private and marked as `@VisibleForTesting`.




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368077678
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -480,20 +485,101 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
 
 Review comment:
   Updated to private.




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368077451
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -480,20 +485,101 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+    coordinatorStreamStore.init();
 
 Review comment:
   Updated to call it outside the constructor, since the store needs to be initialized before we can read the full job config from it in the legacy flow.
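
The ordering constraint can be sketched with in-memory stand-ins. `FakeMetadataStore` and `SketchCoordinator` are hypothetical classes, not Samza APIs. The factory method calls `init()`, reads the full job config from the store, and only then invokes the constructor, so the constructor never has to handle an uninitialized store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for CoordinatorStreamStore: config can only be read after init().
final class FakeMetadataStore {
  private final Map<String, String> persistedConfig;
  private boolean initialized;

  FakeMetadataStore(Map<String, String> persistedConfig) {
    this.persistedConfig = new HashMap<>(persistedConfig);
  }

  void init() {
    initialized = true;
  }

  Map<String, String> readConfig() {
    if (!initialized) {
      throw new IllegalStateException("store must be initialized before reading config");
    }
    return new HashMap<>(persistedConfig);
  }
}

// Hypothetical coordinator whose constructor receives an initialized store plus the full config.
final class SketchCoordinator {
  final FakeMetadataStore store;
  final Map<String, String> config;

  SketchCoordinator(FakeMetadataStore store, Map<String, String> config) {
    this.store = store;
    this.config = config;
  }

  // Legacy-flow factory: init() happens here, outside the constructor, because the full
  // job config must be read from the store before the coordinator can be constructed.
  static SketchCoordinator createFromMetadataStore(FakeMetadataStore store) {
    store.init();
    return new SketchCoordinator(store, store.readConfig());
  }
}
```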




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368076765
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
 ##
 @@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original);
+    }
 
 Review comment:
   Updated to disallow the case when ConfigLoaderFactory is not present.
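
The following is a minimal sketch of the fail-fast check. It assumes the loader factory is configured under the key `job.config.loader.factory`; the PR refers to it only as `JobConfig.CONFIG_LOADER_FACTORY`, so treat the literal as an assumption. `MissingConfigLoaderException` is a hypothetical stand-in for Samza's `ConfigException`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical exception standing in for Samza's ConfigException.
final class MissingConfigLoaderException extends RuntimeException {
  MissingConfigLoaderException(String message) {
    super(message);
  }
}

final class LoaderFactoryCheckSketch {
  // Assumed key literal; the PR references it through JobConfig.CONFIG_LOADER_FACTORY.
  static final String CONFIG_LOADER_FACTORY = "job.config.loader.factory";

  private LoaderFactoryCheckSketch() { }

  // Returns the configured loader factory class name, or fails fast when none is set,
  // instead of silently falling back to the submitted config.
  static String requireConfigLoaderFactory(Map<String, String> submissionConfig) {
    return Optional.ofNullable(submissionConfig.get(CONFIG_LOADER_FACTORY))
        .orElseThrow(() -> new MissingConfigLoaderException(
            "Missing " + CONFIG_LOADER_FACTORY + "; a config loader is required to load the full job config."));
  }
}
```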




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-17 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368074498
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -480,20 +510,41 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
 
 Review comment:
   My bad, the changes were lost during an update. Updated again.




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-15 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r367146958
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
 ##
 @@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original);
 
 Review comment:
   Good point: we should first override the original config with the provided overrides and then invoke the config rewriters.
   
   Updated.
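
A toy example over plain maps shows why the order matters. The rewriter `PATH_REWRITER` and the key `derived.path` are hypothetical. If rewriters run before the overrides are applied, any value they derive from an overridden key is computed from the stale, loaded value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class OverrideRewriteOrderSketch {
  private OverrideRewriteOrderSketch() { }

  // Hypothetical rewriter that derives a path from job.name (stands in for a real config rewriter).
  static final UnaryOperator<Map<String, String>> PATH_REWRITER = cfg -> {
    Map<String, String> out = new HashMap<>(cfg);
    out.put("derived.path", "/data/" + cfg.get("job.name"));
    return out;
  };

  // Copies 'base' and applies 'overrides' on top of it.
  static Map<String, String> merge(Map<String, String> base, Map<String, String> overrides) {
    Map<String, String> out = new HashMap<>(base);
    out.putAll(overrides);
    return out;
  }

  // Order adopted after this review: apply overrides first, then rewrite.
  static Map<String, String> overrideThenRewrite(Map<String, String> loaded, Map<String, String> overrides,
      UnaryOperator<Map<String, String>> rewriter) {
    return rewriter.apply(merge(loaded, overrides));
  }

  // Earlier order: rewrite first, then override; the rewriter never sees the override values.
  static Map<String, String> rewriteThenOverride(Map<String, String> loaded, Map<String, String> overrides,
      UnaryOperator<Map<String, String>> rewriter) {
    return merge(rewriter.apply(loaded), overrides);
  }
}
```

Suppose the loader supplies `job.name=loaded-job` and the override is `job.name=override-job`. Then `overrideThenRewrite` yields `derived.path=/data/override-job`. `rewriteThenOverride` instead leaves `derived.path=/data/loaded-job` next to `job.name=override-job`.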




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-15 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r367138308
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -172,15 +179,39 @@
    * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
    * run the jobcoordinator.
    *
-   * @param coordinatorSystemConfig the coordinator stream config that can be used to read the
-   *                                {@link org.apache.samza.job.model.JobModel} from.
+   * @param jobCoordinatorConfig job coordinator config that either contains coordinator stream properties
+   *                             or config loader properties to load full job config.
    */
-  public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
+  public ClusterBasedJobCoordinator(Config jobCoordinatorConfig) {
     metrics = new MetricsRegistryMap();
 
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+    JobConfig jobConfig = new JobConfig(jobCoordinatorConfig);
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      // load full job config with ConfigLoader
+      Config originalConfig = ConfigUtil.loadConfig(jobCoordinatorConfig);
+
+      // Execute planning
+      ApplicationDescriptorImpl
+          appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+      List<JobConfig> jobConfigs = planner.prepareJobs();
+
+      if (jobConfigs.size() != 1) {
+        throw new SamzaException("Only support single remote job is supported.");
+      }
+
+      config = jobConfigs.get(0);
+      coordinatorStreamStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
+      coordinatorStreamStore.init();
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
+      DiagnosticsUtil.createDiagnosticsStream(config);
 
 Review comment:
   Added a comment




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-14 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r366624883
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -178,9 +185,33 @@
   public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
 
 Review comment:
   Renamed to jobCoordinatorConfig and updated the javadoc as well.




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-14 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r366610778
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -178,9 +185,33 @@
   public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
 
 Review comment:
   I was planning to rename this variable when cleaning up the legacy flow, but we could also choose a generic name now, in the interim. What do you think?




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-13 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365993189
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java
 ##
 @@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original);
+    }
 
 Review comment:
   Eventually, yes. But during the migration there will be jobs that still follow the legacy submission flow yet still go through this new method in code.
   
   For example, this util will also be called by LocalApplicationRunner, which will initially support both the legacy and the new flow.
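
The following is a rough sketch, over plain maps, of how a single `loadConfig` entry point can serve both flows during the migration. The `loadWithFactory` function is a hypothetical replacement for instantiating the `ConfigLoaderFactory` via reflection, and the key literal is assumed. Config rewriting is omitted for brevity. Legacy callers pass a complete config with no loader factory and get it back unchanged. New-flow callers get the loaded config with their submitted values applied on top.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class MigrationLoadSketch {
  // Assumed key literal, standing in for JobConfig.CONFIG_LOADER_FACTORY.
  static final String CONFIG_LOADER_FACTORY = "job.config.loader.factory";

  private MigrationLoadSketch() { }

  // Legacy flow: no loader factory configured, so the config passed in is already complete.
  // New flow: load the full config via the named factory, then apply the submitted values on top.
  static Map<String, String> loadConfig(Map<String, String> original,
      Function<String, Map<String, String>> loadWithFactory) {
    String factoryClass = original.get(CONFIG_LOADER_FACTORY);
    if (factoryClass == null) {
      return original;
    }
    Map<String, String> full = new HashMap<>(loadWithFactory.apply(factoryClass));
    full.putAll(original);
    return full;
  }
}
```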




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-10 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365487071
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -178,9 +185,35 @@
   public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
     metrics = new MetricsRegistryMap();
 
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+    JobConfig jobConfig = new JobConfig(coordinatorSystemConfig);
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      // load full job config with ConfigLoader
+      Config originalConfig = ConfigUtil.loadConfig(coordinatorSystemConfig);
+
+      // Execute planning
+      ApplicationDescriptorImpl
+          appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+      List jobConfigs = planner.prepareJobs();
+
+      if (jobConfigs.size() != 1) {
+        throw new SamzaException("Only support single remote job is supported.");
+      }
+
+      // Merge with default coordinator stream config
+      config = ConfigUtil.override(jobConfigs.get(0), CoordinatorStreamUtil.buildCoordinatorStreamConfig(jobConfigs.get(0)));
 
 Review comment:
   Good question. You are right: we should not need to merge it with the full job config. We only need to build the coordinator stream config from the full job config and use it to initialize CoordinatorStreamStore.
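A rough sketch of "build, don't merge": the coordinator stream config is derived from the full job config and would only be handed to the store, while the full job config stays as it is. This is a simplified model under stated assumptions: configs are plain maps, and the selection rule (the coordinator system key plus that system's `systems.<name>.` keys) is an illustrative guess, not the actual behavior of CoordinatorStreamUtil.buildCoordinatorStreamConfig.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model: the real logic is CoordinatorStreamUtil.buildCoordinatorStreamConfig,
// and the key layout below is an assumption for illustration.
final class CoordinatorConfigSketch {
  static final String COORDINATOR_SYSTEM_KEY = "job.coordinator.system";

  // Build a standalone coordinator stream config from the full job config.
  // The full job config itself is left untouched (no merge).
  static Map<String, String> buildCoordinatorStreamConfig(Map<String, String> fullJobConfig) {
    String system = fullJobConfig.get(COORDINATOR_SYSTEM_KEY);
    if (system == null) {
      throw new IllegalArgumentException("Missing " + COORDINATOR_SYSTEM_KEY);
    }
    String systemPrefix = "systems." + system + ".";
    Map<String, String> coordinatorConfig = new HashMap<>();
    coordinatorConfig.put(COORDINATOR_SYSTEM_KEY, system);
    for (Map.Entry<String, String> entry : fullJobConfig.entrySet()) {
      if (entry.getKey().startsWith(systemPrefix)) {
        coordinatorConfig.put(entry.getKey(), entry.getValue());
      }
    }
    return coordinatorConfig;
  }

  public static void main(String[] args) {
    Map<String, String> fullJobConfig = new HashMap<>();
    fullJobConfig.put(COORDINATOR_SYSTEM_KEY, "kafka");
    fullJobConfig.put("systems.kafka.producer.bootstrap.servers", "localhost:9092");
    fullJobConfig.put("task.inputs", "kafka.input");
    // Only the coordinator config would be handed to the store; the job keeps fullJobConfig.
    System.out.println(buildCoordinatorStreamConfig(fullJobConfig));
  }
}
```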




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-10 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365486468
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -480,20 +512,41 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionEnv);
 
 Review comment:
   Fixed.
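For readers following the hunk above, a minimal sketch of the environment-variable precedence it introduces: the submission config is used when present, and otherwise the coordinator system config is assumed to be used, since the hunk is cut off before the else-branch. The variable names here are hypothetical placeholders for ShellCommandConfig.ENV_SUBMISSION_CONFIG() and ENV_COORDINATOR_SYSTEM_CONFIG(), and the environment is passed in as a map so the logic can be exercised without real env vars.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical env-var names; the real ones come from
// ShellCommandConfig.ENV_SUBMISSION_CONFIG() and ENV_COORDINATOR_SYSTEM_CONFIG().
final class EnvSelectionSketch {
  static final String SUBMISSION_ENV = "SUBMISSION_CONFIG";
  static final String COORDINATOR_SYSTEM_ENV = "COORDINATOR_SYSTEM_CONFIG";

  enum Flow { SUBMISSION, LEGACY_COORDINATOR_SYSTEM }

  // Prefer the submission config when it is set; otherwise fall back to the
  // legacy coordinator system config (assumed behavior of the elided else-branch).
  static Flow selectFlow(Map<String, String> env) {
    return env.get(SUBMISSION_ENV) != null ? Flow.SUBMISSION : Flow.LEGACY_COORDINATOR_SYSTEM;
  }

  public static void main(String[] args) {
    Map<String, String> env = new HashMap<>();
    env.put(COORDINATOR_SYSTEM_ENV, "{}");
    System.out.println(selectFlow(env));
  }
}
```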




[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.

2020-01-10 Thread GitBox
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update 
ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365486270
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 ##
 @@ -178,9 +185,35 @@
   public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
     metrics = new MetricsRegistryMap();
 
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+    JobConfig jobConfig = new JobConfig(coordinatorSystemConfig);
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      // load full job config with ConfigLoader
+      Config originalConfig = ConfigUtil.loadConfig(coordinatorSystemConfig);
+
+      // Execute planning
+      ApplicationDescriptorImpl
+          appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+      List jobConfigs = planner.prepareJobs();
 
 Review comment:
   Good question. Both RemoteApplicationRunner and LocalJobPlanner themselves only work for single-job applications anyway. We may update them, but I think it is better to do that separately; it also depends on our plan for supporting multi-job applications.
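The single-job restriction discussed here boils down to a guard on the planner's output. A minimal sketch, written generically over the element type so it compiles without Samza (in the PR, the elements would be the job configs returned by planner.prepareJobs()); the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: generic over the element type so it compiles without Samza.
final class SingleJobGuard {
  // Return the only planned job config, rejecting empty or multi-job plans.
  static <T> T requireSingleJob(List<T> jobConfigs) {
    if (jobConfigs.size() != 1) {
      throw new IllegalStateException(
          "Only a single remote job is supported, but planning produced " + jobConfigs.size());
    }
    return jobConfigs.get(0);
  }

  public static void main(String[] args) {
    System.out.println(requireSingleJob(Arrays.asList("job-config-0")));
  }
}
```

Including the actual count in the message makes a misconfigured multi-job plan easier to diagnose than a fixed string.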

