[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368107532

## File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java ##
@@ -198,4 +203,31 @@ public void testRunWithClassLoader() throws Exception {
     // make sure runClusterBasedJobCoordinator only got called once
     verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
   }
+
+  @Test(expected = SamzaException.class)
+  public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() {
+    ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig());
+  }
+
+  @Test
+  public void testCreateFromConfigLoader() throws Exception {
+    // partially mock ClusterBasedJobCoordinator (mock prepareJob method only)
+    PowerMockito.spy(ClusterBasedJobCoordinator.class);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName());
+    config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
+    config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
+        getClass().getResource("/test.properties").getPath());
+
+    PowerMockito.doAnswer(invocation -> invocation.getArgumentAt(0, Config.class))
+        .when(ClusterBasedJobCoordinator.class, "prepareJob", any());
+    PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class));
+    PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mock(CoordinatorStreamStore.class));
+
+    ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig(config));
+
+    verifyPrivate(ClusterBasedJobCoordinator.class).invoke("prepareJob", any());
+    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(), any(), any());

Review comment: Updated the unit test to have explicit validation on arguments.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368084308

## File path: samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java ##
@@ -161,6 +162,31 @@ public void testApplyRewriterClassDoesNotExist() {
     assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME));
   }
+  @Test
+  public void testLoadConfigWithoutLoader() {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.JOB_NAME, "new-test-job");
+
+    Config actual = ConfigUtil.loadConfig(new MapConfig(config));
+
+    assertEquals(config.size(), actual.size());
+    assertEquals("new-test-job", actual.get(JobConfig.JOB_NAME));
+  }
+
+  @Test
+  public void testLoadConfigWithLoader() {

Review comment: Updated to include both overrides and rewrites.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368079520

## File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java ##
@@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = ConfigUtil.rewriteConfig(override(loader.getConfig(), original));
+    }
+
+    return fullConfig;
+  }
+
+  /**
+   * Overrides original config with overridden values.
+   *
+   * @param original config to be overridden.
+   * @param overrides overridden values.
+   * @return the overridden config.
+   */
+  @SafeVarargs
+  public static Config override(Config original, Map<String, String>... overrides) {

Review comment: Updated to private.
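The `override` helper quoted above merges maps so that later values win. A minimal sketch of that behavior on plain `Map<String, String>` values, assuming (as its name and javadoc suggest) that each overrides map is applied in order; `ConfigOverrideSketch` is a hypothetical class, not Samza's `ConfigUtil`, and the method stays package-private here only so the demo and tests can call it:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConfigUtil.override, using plain maps instead of Samza's Config.
final class ConfigOverrideSketch {
  private ConfigOverrideSketch() {
  }

  /**
   * Returns a new read-only map with original's entries, then each overrides map applied in order.
   * When a key appears in several maps, the last one wins.
   */
  @SafeVarargs
  static Map<String, String> override(Map<String, String> original, Map<String, String>... overrides) {
    Map<String, String> merged = new HashMap<>(original);
    for (Map<String, String> override : overrides) {
      merged.putAll(override);
    }
    return Collections.unmodifiableMap(merged);
  }

  public static void main(String[] args) {
    Map<String, String> loaded = new HashMap<>();
    loaded.put("job.name", "from-file");
    loaded.put("task.window.ms", "1000");

    Map<String, String> submitted = new HashMap<>();
    submitted.put("job.name", "from-submission");

    Map<String, String> full = override(loaded, submitted);
    System.out.println(full.get("job.name"));       // from-submission
    System.out.println(full.get("task.window.ms")); // 1000
  }
}
```

Returning a new map rather than mutating `original` keeps the loaded config reusable, which matches the helper returning "the overridden config".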
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368079318

## File path: samza-core/src/test/java/org/apache/samza/util/TestConfigUtil.java ##
@@ -161,6 +162,31 @@ public void testApplyRewriterClassDoesNotExist() {
     assertEquals(expectedConfig, ConfigUtil.applyRewriter(new MapConfig(fullConfig), REWRITER_NAME));
   }
+  @Test
+  public void testLoadConfigWithoutLoader() {
+    Map<String, String> config = new HashMap<>();
+    config.put(JobConfig.JOB_NAME, "new-test-job");
+
+    Config actual = ConfigUtil.loadConfig(new MapConfig(config));
+
+    assertEquals(config.size(), actual.size());
+    assertEquals("new-test-job", actual.get(JobConfig.JOB_NAME));

Review comment: Updated the unit test to check for ConfigException instead, as we now disallow config without a loader.
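After this change, `loadConfig` no longer falls back to the original config when no loader factory is configured. A minimal sketch of that contract, assuming a hypothetical `ConfigException` stand-in, the key name `job.config.loader.factory`, and a name-to-loader registry in place of `ReflectionUtil.getObj`; the real method also runs the config rewriters, which this sketch omits:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class LoadConfigSketch {
  // Assumed key name; the PR reads it through JobConfig.getConfigLoaderFactory().
  static final String CONFIG_LOADER_FACTORY = "job.config.loader.factory";

  /** Hypothetical stand-in for Samza's ConfigException. */
  static final class ConfigException extends RuntimeException {
    ConfigException(String message) {
      super(message);
    }
  }

  /**
   * Loads the full config through the named loader, then applies the submitted values on top.
   * The registry replaces reflective factory instantiation so the sketch stays self-contained.
   */
  static Map<String, String> loadConfig(Map<String, String> original,
      Map<String, Supplier<Map<String, String>>> loaderRegistry) {
    String factoryName = original.get(CONFIG_LOADER_FACTORY);
    if (factoryName == null) {
      // No silent fallback to the original config any more.
      throw new ConfigException(CONFIG_LOADER_FACTORY + " is required to load the full job config");
    }
    Supplier<Map<String, String>> loader = loaderRegistry.get(factoryName);
    if (loader == null) {
      throw new ConfigException("Unknown config loader factory: " + factoryName);
    }
    Map<String, String> full = new HashMap<>(loader.get());
    full.putAll(original); // submitted values override loaded ones
    return full;
  }

  public static void main(String[] args) {
    Map<String, Supplier<Map<String, String>>> registry = new HashMap<>();
    registry.put("props", () -> {
      Map<String, String> fromFile = new HashMap<>();
      fromFile.put("job.name", "from-file");
      return fromFile;
    });

    try {
      loadConfig(new HashMap<>(), registry);
    } catch (ConfigException e) {
      System.out.println("rejected: " + e.getMessage());
    }

    Map<String, String> submitted = new HashMap<>();
    submitted.put(CONFIG_LOADER_FACTORY, "props");
    System.out.println(loadConfig(submitted, registry).get("job.name")); // from-file
  }
}
```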
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368078024

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -480,20 +485,101 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+    coordinatorStreamStore.init();
+    Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+
+    return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with submission config, full job config will be fetched using
+   * specified {@link org.apache.samza.config.ConfigLoaderFactory}
+   *
+   * @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  public static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {

Review comment: Updated to package-private and marked as `@VisibleForTesting`.
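The entry point picks one of two factories depending on which environment variable is set. A minimal sketch of that branch, with the environment passed in as a map so it can be exercised without a real process environment; the variable names and the `Flow` enum are hypothetical, and JSON parsing of the value is left out:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the dispatch in runClusterBasedJobCoordinator.
final class CoordinatorEntrySketch {
  // Assumed variable names; the PR reads them via ShellCommandConfig.ENV_SUBMISSION_CONFIG()
  // and ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG().
  static final String SUBMISSION_ENV = "SUBMISSION_CONFIG";
  static final String COORDINATOR_SYSTEM_ENV = "COORDINATOR_SYSTEM_CONFIG";

  enum Flow {
    CONFIG_LOADER,  // new flow: createFromConfigLoader(submissionConfig)
    METADATA_STORE  // legacy flow: createFromMetadataStore(coordinatorSystemConfig)
  }

  /** Mirrors the PR's branch: the submission config wins whenever it is set. */
  static Flow chooseFlow(Map<String, String> env) {
    return env.get(SUBMISSION_ENV) != null ? Flow.CONFIG_LOADER : Flow.METADATA_STORE;
  }

  public static void main(String[] args) {
    Map<String, String> legacyEnv = new HashMap<>();
    legacyEnv.put(COORDINATOR_SYSTEM_ENV, "{\"job.name\":\"demo\"}");
    System.out.println(chooseFlow(legacyEnv)); // METADATA_STORE

    Map<String, String> newEnv = new HashMap<>(legacyEnv);
    newEnv.put(SUBMISSION_ENV, "{}");
    System.out.println(chooseFlow(newEnv)); // CONFIG_LOADER
  }
}
```

Taking the environment as a parameter instead of calling `System.getenv` inside the branch is one way to keep such logic unit-testable without process-level mocking.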
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368077678

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -480,20 +485,101 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {

Review comment: Updated to private.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368077451

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -480,20 +485,101 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading submission config", e);
+        throw new SamzaException(e);
+      }
+
+      ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    } else {
+      // TODO: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+      Config coordinatorSystemConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+        coordinatorSystemConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+        LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
+      } catch (IOException e) {
+        LOG.error("Exception while reading coordinator stream config", e);
+        throw new SamzaException(e);
+      }
+      ClusterBasedJobCoordinator jc = createFromMetadataStore(coordinatorSystemConfig);
+      jc.run();
+      LOG.info("Finished running ClusterBasedJobCoordinator");
+    }
+  }
+
+  /**
+   * Initialize {@link ClusterBasedJobCoordinator} with coordinator stream config, full job config will be fetched from
+   * coordinator stream.
+   *
+   * @param metadataStoreConfig to initialize {@link MetadataStore}
+   * @return {@link ClusterBasedJobCoordinator}
+   */
+  // TODO SAMZA-2432: Clean this up once SAMZA-2405 is completed when legacy flow is removed.
+  public static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreConfig) {
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+
+    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(metadataStoreConfig, metrics);
+    coordinatorStreamStore.init();

Review comment: Updated to call it outside the constructor, as the store needs to be initialized before we can read the full job config from it in the legacy flow.
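In the legacy flow the full job config is read back from the coordinator stream, so the store has to be initialized before that read. A minimal sketch of the ordering constraint, using a hypothetical in-memory store in place of `CoordinatorStreamStore` and a hypothetical `readJobConfigLegacy` helper in place of the factory method:

```java
import java.util.HashMap;
import java.util.Map;

final class InitBeforeReadSketch {
  /** Hypothetical in-memory stand-in for CoordinatorStreamStore. */
  static final class InMemoryMetadataStore {
    private final Map<String, String> backingStream;
    private Map<String, String> snapshot; // stays null until init() bootstraps it

    InMemoryMetadataStore(Map<String, String> backingStream) {
      this.backingStream = backingStream;
    }

    void init() {
      snapshot = new HashMap<>(backingStream);
    }

    Map<String, String> readAll() {
      if (snapshot == null) {
        throw new IllegalStateException("Metadata store must be initialized before it is read");
      }
      return new HashMap<>(snapshot);
    }
  }

  /** Legacy-flow shape: init the store first, then read the full job config from it. */
  static Map<String, String> readJobConfigLegacy(InMemoryMetadataStore store) {
    store.init(); // done by the caller, not inside the coordinator's constructor
    return store.readAll();
  }

  public static void main(String[] args) {
    Map<String, String> stream = new HashMap<>();
    stream.put("job.name", "demo");
    InMemoryMetadataStore store = new InMemoryMetadataStore(stream);
    try {
      store.readAll();
    } catch (IllegalStateException e) {
      System.out.println("read before init: " + e.getMessage());
    }
    System.out.println(readJobConfigLegacy(store).get("job.name")); // demo
  }
}
```

Once the factory owns `init()`, the constructor can receive an already-initialized store together with the config read from it, which is the three-argument shape the PR's `createFromMetadataStore` passes to `new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config)`.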
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368076765

## File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java ##
@@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original);
+    }

Review comment: Updated to disallow the case when ConfigLoaderFactory is not present.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r368074498

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -480,20 +510,41 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {

Review comment: My bad, the changes were lost during updates. Updated again.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r367146958

## File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java ##
@@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original);

Review comment: Good point: we should first override the original config with the provided overrides and then invoke the config rewriters. Updated.
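Why the order matters: if the rewriters run before the overrides are applied, any value a rewriter derives from an overridden key is computed from the stale loaded value. A minimal sketch comparing both orders, assuming a hypothetical rewriter that derives `task.inputs` from a made-up `app.input.topic` key (neither the rewriter nor the key comes from the PR):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class RewriteOrderSketch {
  // Hypothetical rewriter: derives task.inputs from app.input.topic.
  static final UnaryOperator<Map<String, String>> INPUT_REWRITER = config -> {
    Map<String, String> rewritten = new HashMap<>(config);
    String topic = config.get("app.input.topic");
    if (topic != null) {
      rewritten.put("task.inputs", "kafka." + topic);
    }
    return rewritten;
  };

  static Map<String, String> override(Map<String, String> base, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(base);
    merged.putAll(overrides);
    return merged;
  }

  // Order adopted after this review: apply overrides first, then rewrite the merged config.
  static Map<String, String> overrideThenRewrite(Map<String, String> loaded, Map<String, String> overrides,
      UnaryOperator<Map<String, String>> rewriter) {
    return rewriter.apply(override(loaded, overrides));
  }

  // Order in the quoted diff: rewriters only see the loaded values, so derived keys can go stale.
  static Map<String, String> rewriteThenOverride(Map<String, String> loaded, Map<String, String> overrides,
      UnaryOperator<Map<String, String>> rewriter) {
    return override(rewriter.apply(loaded), overrides);
  }

  public static void main(String[] args) {
    Map<String, String> loaded = new HashMap<>();
    loaded.put("app.input.topic", "page-views");
    Map<String, String> overrides = new HashMap<>();
    overrides.put("app.input.topic", "page-views-test");

    // kafka.page-views: the derived value ignores the override
    System.out.println(rewriteThenOverride(loaded, overrides, INPUT_REWRITER).get("task.inputs"));
    // kafka.page-views-test
    System.out.println(overrideThenRewrite(loaded, overrides, INPUT_REWRITER).get("task.inputs"));
  }
}
```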
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r367138308

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -172,15 +179,39 @@
    * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
    * run the jobcoordinator.
    *
-   * @param coordinatorSystemConfig the coordinator stream config that can be used to read the
-   *                                {@link org.apache.samza.job.model.JobModel} from.
+   * @param jobCoordinatorConfig job coordinator config that either contains coordinator stream properties
+   *                             or config loader properties to load full job config.
    */
-  public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
+  public ClusterBasedJobCoordinator(Config jobCoordinatorConfig) {
     metrics = new MetricsRegistryMap();
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+    JobConfig jobConfig = new JobConfig(jobCoordinatorConfig);
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      // load full job config with ConfigLoader
+      Config originalConfig = ConfigUtil.loadConfig(jobCoordinatorConfig);
+
+      // Execute planning
+      ApplicationDescriptorImpl
+          appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+      List<JobConfig> jobConfigs = planner.prepareJobs();
+
+      if (jobConfigs.size() != 1) {
+        throw new SamzaException("Only support single remote job is supported.");
+      }
+
+      config = jobConfigs.get(0);
+      coordinatorStreamStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
+      coordinatorStreamStore.init();
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
+      DiagnosticsUtil.createDiagnosticsStream(config);

Review comment: Added a comment.
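The planning step in the constructor accepts exactly one planned job config. A minimal sketch of that guard on a generic list; strings stand in for `JobConfig` objects, and the exception type and message are illustrative rather than Samza's:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SingleJobSketch {
  /** Returns the only planned job config, or fails if planning produced zero or several jobs. */
  static <T> T requireSingleJob(List<T> plannedJobs) {
    if (plannedJobs.size() != 1) {
      throw new IllegalStateException(
          "Only a single remote job is supported, but planning produced " + plannedJobs.size());
    }
    return plannedJobs.get(0);
  }

  public static void main(String[] args) {
    System.out.println(requireSingleJob(Collections.singletonList("job-config-0"))); // job-config-0
    try {
      requireSingleJob(Arrays.asList("job-a", "job-b"));
    } catch (IllegalStateException e) {
      // Only a single remote job is supported, but planning produced 2
      System.out.println(e.getMessage());
    }
  }
}
```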
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r366624883

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -178,9 +185,33 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {

Review comment: Renamed to jobCoordinatorConfig and updated the javadoc as well.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r366610778

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -178,9 +185,33 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {

Review comment: I was planning to rename this variable when cleaning up the legacy flow, but we could also choose a generic name in the meantime. What do you think?
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365993189

## File path: samza-core/src/main/java/org/apache/samza/util/ConfigUtil.java ##
@@ -67,4 +72,42 @@ public static Config applyRewriter(Config config, String rewriterName) {
     LOG.info("Re-writing config with {}", rewriter);
     return rewriter.rewrite(rewriterName, config);
   }
+
+  /**
+   * Load full job config with {@link ConfigLoaderFactory} when present.
+   *
+   * @param original config
+   * @return full job config
+   */
+  public static Config loadConfig(Config original) {
+    JobConfig jobConfig = new JobConfig(original);
+    Config fullConfig = original;
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      ConfigLoaderFactory factory = ReflectionUtil.getObj(jobConfig.getConfigLoaderFactory().get(), ConfigLoaderFactory.class);
+      ConfigLoader loader = factory.getLoader(original.subset(ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX));
+      // overrides config loaded with original config, which may contain overridden values.
+      fullConfig = override(ConfigUtil.rewriteConfig(loader.getConfig()), original);
+    }

Review comment: Eventually, yes. But during the migration, some jobs will still follow the legacy submission flow while going through this new method in code. For example, this util will also be called by LocalApplicationRunner, which will initially support both the legacy and the new flow.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365487071

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -178,9 +185,35 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
     metrics = new MetricsRegistryMap();
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+    JobConfig jobConfig = new JobConfig(coordinatorSystemConfig);
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      // load full job config with ConfigLoader
+      Config originalConfig = ConfigUtil.loadConfig(coordinatorSystemConfig);
+
+      // Execute planning
+      ApplicationDescriptorImpl
+          appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+      List<JobConfig> jobConfigs = planner.prepareJobs();
+
+      if (jobConfigs.size() != 1) {
+        throw new SamzaException("Only support single remote job is supported.");
+      }
+
+      // Merge with default coordinator stream config
+      config = ConfigUtil.override(jobConfigs.get(0), CoordinatorStreamUtil.buildCoordinatorStreamConfig(jobConfigs.get(0)));

Review comment: Good question. You are right: we should not need to merge it with the full job config. We only need to build it from the full job config and use it to initialize CoordinatorStreamStore.
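Following this comment, the coordinator stream config is only derived from the full job config to initialize `CoordinatorStreamStore`; the job config itself is used as-is rather than merged with it. A minimal sketch of such a derivation on plain maps; which keys `CoordinatorStreamUtil.buildCoordinatorStreamConfig` actually copies is an assumption here (job identity, the coordinator system name, and that system's `systems.<name>.` properties):

```java
import java.util.HashMap;
import java.util.Map;

final class CoordinatorStreamConfigSketch {
  /** Builds a small, separate config for the coordinator stream; the input map is never modified. */
  static Map<String, String> buildCoordinatorStreamConfig(Map<String, String> fullJobConfig) {
    String system = fullJobConfig.get("job.coordinator.system");
    if (system == null) {
      throw new IllegalArgumentException("job.coordinator.system must be set");
    }
    Map<String, String> coordinatorConfig = new HashMap<>();
    coordinatorConfig.put("job.coordinator.system", system);
    // Assumed identity keys copied over when present.
    for (String key : new String[] {"job.name", "job.id"}) {
      String value = fullJobConfig.get(key);
      if (value != null) {
        coordinatorConfig.put(key, value);
      }
    }
    // Assumed: only the coordinator system's own properties are needed to reach the stream.
    String systemPrefix = "systems." + system + ".";
    for (Map.Entry<String, String> entry : fullJobConfig.entrySet()) {
      if (entry.getKey().startsWith(systemPrefix)) {
        coordinatorConfig.put(entry.getKey(), entry.getValue());
      }
    }
    return coordinatorConfig;
  }

  public static void main(String[] args) {
    Map<String, String> full = new HashMap<>();
    full.put("job.name", "demo");
    full.put("job.id", "1");
    full.put("job.coordinator.system", "kafka");
    full.put("systems.kafka.producer.bootstrap.servers", "localhost:9092");
    full.put("task.window.ms", "1000");

    Map<String, String> coordinatorConfig = buildCoordinatorStreamConfig(full);
    System.out.println(coordinatorConfig.containsKey("task.window.ms")); // false
    System.out.println(coordinatorConfig.size()); // 4
  }
}
```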
[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365486468

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -480,20 +512,41 @@ private static void executeRunClusterBasedJobCoordinatorForClass(Class cluste
    * {@link #main(String[])} so that it can be executed directly or from a separate classloader.
    */
   private static void runClusterBasedJobCoordinator(String[] args) {
-    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
-    try {
-      //Read and parse the coordinator system config.
-      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig =
-          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
-      LOG.info("Using the coordinator system config: {}.", coordinatorSystemConfig);
-    } catch (IOException e) {
-      LOG.error("Exception while reading coordinator stream config", e);
-      throw new SamzaException(e);
+    final String submissionEnv = System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+
+    if (submissionEnv != null) {
+      Config submissionConfig;
+      try {
+        //Read and parse the coordinator system config.
+        LOG.info("Parsing submission config {}", submissionEnv);
+        submissionConfig =
+            new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(submissionEnv, Config.class));
+        LOG.info("Using the submission config: {}.", submissionEnv);

Review comment:
Fixed.
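The new branching in `runClusterBasedJobCoordinator` reads the submission config when `ENV_SUBMISSION_CONFIG` is set; the quoted hunk stops before the `else` branch, so the fallback to the coordinator system config below is an assumption. This is a hypothetical sketch of that control flow only: the environment is passed in as a map, and the JSON parsing done with `SamzaObjectMapper` is replaced by an injected parser (`parseKeyValues`, a hypothetical `k=v,k=v` reader) so it runs without Samza or Jackson. The variable names `SAMZA_SUBMISSION_CONFIG` and `SAMZA_COORDINATOR_SYSTEM_CONFIG` are also assumptions; the real ones come from `ShellCommandConfig`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: choose which environment variable supplies the startup config. */
class SubmissionConfigSketch {

  // Assumed variable names for illustration; Samza reads the real ones via ShellCommandConfig.
  static final String SUBMISSION_ENV = "SAMZA_SUBMISSION_CONFIG";
  static final String COORDINATOR_SYSTEM_ENV = "SAMZA_COORDINATOR_SYSTEM_CONFIG";

  /** The variable the config came from, plus the parsed config. */
  static final class StartupConfig {
    final String sourceEnv;
    final Map<String, String> config;

    StartupConfig(String sourceEnv, Map<String, String> config) {
      this.sourceEnv = sourceEnv;
      this.config = config;
    }
  }

  /** Prefers the submission config when present, otherwise falls back (assumed) to the coordinator system config. */
  static StartupConfig select(Map<String, String> env, Function<String, Map<String, String>> parser) {
    String chosenEnv = env.get(SUBMISSION_ENV) != null ? SUBMISSION_ENV : COORDINATOR_SYSTEM_ENV;
    String raw = env.get(chosenEnv);
    if (raw == null) {
      throw new IllegalStateException("Neither " + SUBMISSION_ENV + " nor " + COORDINATOR_SYSTEM_ENV + " is set");
    }
    Map<String, String> parsed = parser.apply(raw);
    // Log the parsed config object, as the pre-change code did with coordinatorSystemConfig.
    System.out.println("Using config from " + chosenEnv + ": " + parsed);
    return new StartupConfig(chosenEnv, parsed);
  }

  /** Hypothetical stand-in for JSON parsing: reads "k1=v1,k2=v2". */
  static Map<String, String> parseKeyValues(String raw) {
    Map<String, String> result = new HashMap<>();
    for (String pair : raw.split(",")) {
      if (pair.isEmpty()) {
        continue;
      }
      int eq = pair.indexOf('=');
      if (eq < 0) {
        throw new IllegalArgumentException("Malformed entry: " + pair);
      }
      result.put(pair.substring(0, eq), pair.substring(eq + 1));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> env = new HashMap<>();
    env.put(COORDINATOR_SYSTEM_ENV, "job.coordinator.system=kafka");
    env.put(SUBMISSION_ENV, "example.key=example-value");
    StartupConfig chosen = select(env, SubmissionConfigSketch::parseKeyValues);
    System.out.println("Selected: " + chosen.sourceEnv);
  }
}
```

Injecting the environment and the parser keeps the branching testable without touching real environment variables.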
[GitHub] [samza] kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
kw2542 commented on a change in pull request #1248: SAMZA-2410: Update ClusterBasedJobCoordinator config retrieval logic from loader.
URL: https://github.com/apache/samza/pull/1248#discussion_r365486270

## File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ##
@@ -178,9 +185,35 @@ public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
     metrics = new MetricsRegistryMap();
-    coordinatorStreamStore = new CoordinatorStreamStore(coordinatorSystemConfig, metrics);
-    coordinatorStreamStore.init();
-    config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+    JobConfig jobConfig = new JobConfig(coordinatorSystemConfig);
+
+    if (jobConfig.getConfigLoaderFactory().isPresent()) {
+      // load full job config with ConfigLoader
+      Config originalConfig = ConfigUtil.loadConfig(coordinatorSystemConfig);
+
+      // Execute planning
+      ApplicationDescriptorImpl
+          appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
+      RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
+      List jobConfigs = planner.prepareJobs();

Review comment:
Good question. Both RemoteApplicationRunner and LocalJobPlanner only work for single-job applications anyway. We may update them, but I think it is better to do that in a separate change, since it also depends on our plan to support multi-job applications.
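`planner.prepareJobs()` returns a list of job configs, and the same hunk goes on to reject any result other than exactly one job (`if (jobConfigs.size() != 1)`), which is what limits this path to single-job applications for now. A minimal generic sketch of that guard follows. `requireSingleJob` is a hypothetical helper, the type parameter stands in for Samza's `JobConfig`, and `IllegalStateException` stands in for `SamzaException`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the single-job guard applied to the planner's output. */
class SingleJobGuardSketch {

  /** Returns the only planned job config, or fails if the planner produced zero or several jobs. */
  static <T> T requireSingleJob(List<T> jobConfigs) {
    if (jobConfigs.size() != 1) {
      throw new IllegalStateException(
          "Only single-job applications are supported, but the planner produced "
              + jobConfigs.size() + " jobs.");
    }
    return jobConfigs.get(0);
  }

  public static void main(String[] args) {
    System.out.println(requireSingleJob(Collections.singletonList("job-1"))); // prints job-1
    try {
      requireSingleJob(Arrays.asList("job-1", "job-2"));
    } catch (IllegalStateException e) {
      // Multi-job plans are rejected until multi-job support lands.
      System.out.println(e.getMessage());
    }
  }
}
```

Keeping the check in one small helper would make it easy to relax later, once multi-job applications are supported.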