Re: ApacheCon North America 2019: Proposal accepted!
Hi @Abhishek Tiwari , If Gobblin has twitter handle, pls post the news there with #ApacheCon and/or #ACNA19 tags. Thanks On Thu, Jun 6, 2019 at 10:09 AM Tamas Nemeth wrote: > Wow, awesome, congrats! > > On 2019. Jun 6., Thu at 19:08, Abhishek Tiwari wrote: > > > Congrats! Thats fabulous news. > > > > Abhishek > > > > On Thu, Jun 6, 2019 at 10:06 AM Jay Sen wrote: > > > > > Hi Gobblin Team, > > > > > > we are accepted for the ApacheCon presentation. > > > Lets talk about this in next Gobblin Meetup. > > > > > > Thanks > > > Jay > > > > > > -- Forwarded message - > > > From: ApacheCon North America 2019 via JamHosted < > > > no-re...@asf.jamhosted.net > > > > > > > Date: Thu, Jun 6, 2019 at 8:07 AM > > > Subject: ApacheCon North America 2019: Proposal accepted! > > > To: Jay Sen > > > > > > > > > Dear Jay Sen, > > > > > > Congratulations! We are pleased to tell you that your talk, "Data > > Movement > > > & Integration at PayPal & LinkedIn using Apache Gobblin” > > > has been accepted for ApacheCon North America 2019. (If you submitted > > > additional proposals, you > > > will receive separate notifications regarding each proposal.) > > > > > > IMPORTANT: Please confirm that you will be attending by visiting the > > > following URL: > > > This will ensure that we register you > for > > > the event. If you > > > do not confirm that you'll be attending, we'll have to replace your > talk > > > with > > > another one. > > > > > > Once the schedule is published - > > > https://www.apachecon.com/acna19/schedule.html - > > > please check it to see where your talk has been scheduled. Note that > this > > > is > > > subject to change. If you CANNOT speak where you have been scheduled, > > > please let > > > us know. We cannot promise to schedule you in your preferred spot, but > we > > > do wish > > > to avoid no-shows. 
> > > > > > Once you have confirmed that you'll be joining us, and once the > schedule > > is > > > finalized, we'll be following up with more details about the event, so > > > please > > > hold your questions for a moment while we compile that speaker packet. > > But, > > > since you all always ask this, we wanted to let you know that the > > > projectors are 16:10, 1024x768. > > > > > > If you need help, such as an invitation letter, for acquiring a travel > > > visa, > > > please contact acna...@apache.org > > > > > > If you need financial assistance for travel and lodging, please > consider > > > applying for our TAC (Travel Assistance Committee) support, at > > > https://www.apache.org/travel/ > > > > > > Please help us promote your talk, and the conference in general, by > > posting > > > about it on your project mailing lists, social media, and your blog. > > Please > > > use #ApacheCon and/or #ACNA19 to tag the posts, so that we can find and > > > amplify them. > > > > > > With regards, > > > The team behind ApacheCon North America 2019 > > > > > >
[jira] [Work logged] (GOBBLIN-790) Mysql Implementation of DagStateStore
[ https://issues.apache.org/jira/browse/GOBBLIN-790?focusedWorklogId=257387=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257387 ]

ASF GitHub Bot logged work on GOBBLIN-790:
--
Author: ASF GitHub Bot
Created on: 11/Jun/19 04:02
Start Date: 11/Jun/19 04:02
Worklog Time Spent: 10m
Work Description: sv2000 commented on pull request #2656: [GOBBLIN-790]DagStateStore MySQL
URL: https://github.com/apache/incubator-gobblin/pull/2656#discussion_r292270078

## File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java ##

@@ -582,11 +581,18 @@ private void cleanUp() {
     }
   }
-  private void cleanUpDag(String dagId) {
-    Dag dag = this.dags.get(dagId);
-    this.dagToJobs.remove(dagId);
-    this.dagStateStore.cleanUp(dag);
+  /**
+   * Cleaning of all relevant states need to be
+   * @param dagId
+   */
+  private synchronized void cleanUpDag(String dagId) {
+    try {
+      this.dagStateStore.cleanUp(dags.get(dagId));
+    } catch (IOException ioe) {
+      log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);

Review comment:
Not necessarily better. I have seen parametrized logging being used more frequently in Gobblin code.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257387)
Time Spent: 1h 40m (was: 1.5h)

> Mysql Implementation of DagStateStore
> -
>
> Key: GOBBLIN-790
> URL: https://issues.apache.org/jira/browse/GOBBLIN-790
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Lei Sun
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
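For readers comparing the two logging styles mentioned in the review comment, here is a minimal sketch. It is not code from the pull request. To stay runnable with only the JDK, it uses java.util.logging with MessageFormat-style `{0}` placeholders as a stand-in; the class name, the `CapturingHandler` helper, and the `dag-42` id are all hypothetical. Gobblin code would more likely use SLF4J-style `{}` placeholders, which is an assumption not confirmed by this thread.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

// Hypothetical demo class, not part of Gobblin.
class ParameterizedLoggingSketch {

  /** Collects the formatted message of every record so both styles can be compared. */
  static final class CapturingHandler extends Handler {
    final List<String> messages = new ArrayList<>();
    private final SimpleFormatter formatter = new SimpleFormatter();

    @Override
    public void publish(LogRecord record) {
      // formatMessage substitutes {0}, {1}, ... with the record's parameters.
      messages.add(formatter.formatMessage(record));
    }

    @Override
    public void flush() {}

    @Override
    public void close() {}
  }

  static List<String> logBothStyles(String dagId) {
    Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false); // keep the demo output out of the console handler
    CapturingHandler handler = new CapturingHandler();
    log.addHandler(handler);
    IOException ioe = new IOException("store unavailable");

    // Style used in the diff: the message is built eagerly with String.format.
    log.log(Level.SEVERE, String.format("Failed to clean %s from backStore due to:", dagId), ioe);

    // Parameterized style: the template and its arguments travel separately,
    // and substitution happens only when a handler formats the record.
    LogRecord record = new LogRecord(Level.SEVERE, "Failed to clean {0} from backStore due to:");
    record.setParameters(new Object[] {dagId});
    record.setThrown(ioe);
    log.log(record);

    return handler.messages;
  }

  public static void main(String[] args) {
    logBothStyles("dag-42").forEach(System.out::println);
  }
}
```

Both calls produce the same text here; the difference is only when the string is assembled, which matters mostly for log levels that are disabled at runtime.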
[jira] [Work logged] (GOBBLIN-796) Add support partial updates for flowConfig
[ https://issues.apache.org/jira/browse/GOBBLIN-796?focusedWorklogId=257306=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257306 ]

ASF GitHub Bot logged work on GOBBLIN-796:
--
Author: ASF GitHub Bot
Created on: 11/Jun/19 00:48
Start Date: 11/Jun/19 00:48
Worklog Time Spent: 10m
Work Description: sv2000 commented on pull request #2663: [GOBBLIN-796] Add support partial updates for flowConfig
URL: https://github.com/apache/incubator-gobblin/pull/2663#discussion_r292242390

## File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java ##

@@ -121,6 +123,25 @@ public void updateFlowConfig(FlowConfig flowConfig)
     response.getResponse();
   }
+  /**
+   * Partially update a flow configuration
+   * @param flowId flow ID to update
+   * @param flowConfigPatch {@link PatchRequest} containing changes to the flowConfig
+   * @throws RemoteInvocationException
+   */
+  public void partialUpdateFlowConfig(FlowId flowId, PatchRequest flowConfigPatch) throws RemoteInvocationException {

Review comment:
Why is the partialUpdateFlowConfig() method duplicated in 3 different classes? Maybe move this to a Utils class?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257306)
Time Spent: 1h 20m (was: 1h 10m)

> Add support partial updates for flowConfig
> --
>
> Key: GOBBLIN-796
> URL: https://issues.apache.org/jira/browse/GOBBLIN-796
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Jack Moseley
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (GOBBLIN-800) Remove the metric context cache from GobblinMetricsRegistry
[ https://issues.apache.org/jira/browse/GOBBLIN-800?focusedWorklogId=257296=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257296 ]

ASF GitHub Bot logged work on GOBBLIN-800:
--
Author: ASF GitHub Bot
Created on: 10/Jun/19 23:50
Start Date: 10/Jun/19 23:50
Worklog Time Spent: 10m
Work Description: yukuai518 commented on pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
URL: https://github.com/apache/incubator-gobblin/pull/2667#discussion_r292232789

## File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java ##

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime.util;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.fork.Fork;
+
+/**
+ * An extension to {@link GobblinMetrics} specifically for {@link Fork}.
+ */
+public class ForkMetrics extends GobblinMetrics {
+  private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
+
+  protected ForkMetrics(TaskState taskState, int index) {
+    super(name(taskState, index), parentContextForFork(taskState), getForkMetricsTags(taskState, index));
+  }
+
+  private static MetricContext parentContextForFork(TaskState taskState) {
+    return TaskMetrics.get(METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId()).getMetricContext();
+  }
+
+  public static ForkMetrics get(final TaskState taskState, int index) {
+    return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState, index), new Callable<GobblinMetrics>() {
+      @Override
+      public GobblinMetrics call() throws Exception {
+        return new ForkMetrics(taskState, index);
+      }
+    });
+  }
+
+  /**
+   * Creates a unique {@link String} representing this branch.
+   */
+  private static String getForkMetricsId(State state, int index) {
+    return state.getProp(ConfigurationKeys.FORK_BRANCH_NAME_KEY + "." + index,
+        ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + index);
+  }
+
+  /**
+   * Creates a {@link List} of {@link Tag}s for a {@link Fork} instance. The {@link Tag}s are purely based on the
+   * index and the branch name.
+   */
+  private static List<Tag<?>> getForkMetricsTags(State state, int index) {
+    return ImmutableList.<Tag<?>>of(new Tag<>(FORK_METRICS_BRANCH_NAME_KEY, getForkMetricsId(state, index)));

Review comment:
The getForkMetricsId never returns null, because it has a default value.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257296)
Time Spent: 1h 10m (was: 1h)

> Remove the metric context cache from GobblinMetricsRegistry
> ---
>
> Key: GOBBLIN-800
> URL: https://issues.apache.org/jira/browse/GOBBLIN-800
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Kuai Yu
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Remove the metric context cache from GobblinMetricsRegistry

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
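The point in the review comment (a lookup with a non-null default cannot return null) can be illustrated with a small sketch. This is not Gobblin code: java.util.Properties stands in for Gobblin's State on the assumption that `getProp(key, default)` behaves like `Properties.getProperty(key, default)`, and the two constant values below are illustrative placeholders rather than the real ConfigurationKeys values.

```java
import java.util.Properties;

// Hypothetical demo class; Properties is a stand-in for Gobblin's State.
class ForkBranchNameSketch {
  // Placeholder values, assumed for illustration only.
  static final String FORK_BRANCH_NAME_KEY = "fork.branch.name";
  static final String DEFAULT_FORK_BRANCH_NAME = "fork_";

  /** Returns the configured branch name, or the default plus the index when none is set. */
  static String forkMetricsId(Properties state, int index) {
    // The second argument is always a non-null string, so the result is never null.
    return state.getProperty(FORK_BRANCH_NAME_KEY + "." + index, DEFAULT_FORK_BRANCH_NAME + index);
  }

  public static void main(String[] args) {
    Properties state = new Properties();
    state.setProperty(FORK_BRANCH_NAME_KEY + ".1", "archive");
    System.out.println(forkMetricsId(state, 0)); // branch 0 has no explicit name
    System.out.println(forkMetricsId(state, 1)); // branch 1 is named explicitly
  }
}
```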
[jira] [Work logged] (GOBBLIN-800) Remove the metric context cache from GobblinMetricsRegistry
[ https://issues.apache.org/jira/browse/GOBBLIN-800?focusedWorklogId=257297=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257297 ]

ASF GitHub Bot logged work on GOBBLIN-800:
--
Author: ASF GitHub Bot
Created on: 10/Jun/19 23:50
Start Date: 10/Jun/19 23:50
Worklog Time Spent: 10m
Work Description: yukuai518 commented on pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
URL: https://github.com/apache/incubator-gobblin/pull/2667#discussion_r292213096

## File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ##

@@ -451,13 +452,26 @@ private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch coun
   public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy)
       throws IOException, InterruptedException {
-    run();
-    if (multiTaskAttemptCommitPolicy.equals(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE)) {
-      this.log.info("Will commit tasks directly.");
-      commit();
-    } else if (!isSpeculativeExecutionSafe()) {
-      throw new RuntimeException(
-          "Speculative execution is enabled. However, the task context is not safe for speculative execution.");
+    try {
+      run();
+      if (multiTaskAttemptCommitPolicy.equals(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE)) {
+        this.log.info("Will commit tasks directly.");
+        commit();
+      } else if (!isSpeculativeExecutionSafe()) {
+        throw new RuntimeException(
+            "Speculative execution is enabled. However, the task context is not safe for speculative execution.");
+      }
+    } finally {
+      // During the task execution, the fork/task instances will create metric contexts (fork, task, job, container)
+      // along the hierarchy up to the root metric context. Although root metric context has a weak reference to
+      // those metric contexts, they are meanwhile cached by GobblinMetricsRegistry. Here we will remove all those
+      // strong reference from the cache to make sure it can be reclaimed by Java GC when JVM has run out of memory.
+
+      this.tasks.forEach(task-> {
+        TaskMetrics.remove(task);
+      });
+
+      JobMetrics.remove(GobblinMetrics.METRICS_ID_PREFIX + jobState.getJobId());

Review comment:
Most of our Gobblin code runs in distributed mode, like MRJobLauncher, HelixJobLauncher, etc. I will move this logic outside of GobblinMultiTaskAttempt to make sure it doesn't break LocalJobLauncher.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257297)
Time Spent: 1h 20m (was: 1h 10m)

> Remove the metric context cache from GobblinMetricsRegistry
> ---
>
> Key: GOBBLIN-800
> URL: https://issues.apache.org/jira/browse/GOBBLIN-800
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Kuai Yu
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Remove the metric context cache from GobblinMetricsRegistry

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
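The cleanup pattern described in the diff comment above (drop the registry's strong reference in a finally block so the entry becomes collectable whether or not the work succeeds) can be sketched in isolation. This is a simplified illustration, not the GobblinMetricsRegistry implementation: the map-based registry, the `runAttempt` method, and the key prefix are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; a plain map stands in for the metrics registry.
class MetricsRegistryCleanupSketch {
  // Strong references held here keep the cached objects alive until they are removed.
  static final ConcurrentMap<String, Object> REGISTRY = new ConcurrentHashMap<>();

  // Assumed prefix, for illustration only.
  static final String METRICS_ID_PREFIX = "gobblin.metrics.";

  /** Registers a metrics object for the job, runs the work, and always unregisters it. */
  static void runAttempt(String jobId, Runnable work) {
    String key = METRICS_ID_PREFIX + jobId;
    REGISTRY.computeIfAbsent(key, k -> new Object());
    try {
      work.run();
    } finally {
      // Runs on success and on failure, so no entry outlives its attempt.
      REGISTRY.remove(key);
    }
  }

  public static void main(String[] args) {
    runAttempt("job_1", () -> System.out.println("entries during run: " + REGISTRY.size()));
    System.out.println("entries after run: " + REGISTRY.size());
  }
}
```

As the review discussion notes, removing entries this way is only safe when nothing else in the same process still expects to find them, which is the concern raised for LocalJobLauncher.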
[jira] [Work logged] (GOBBLIN-802) register Gauge in innerMetricsContext
[ https://issues.apache.org/jira/browse/GOBBLIN-802?focusedWorklogId=257269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257269 ]

ASF GitHub Bot logged work on GOBBLIN-802:
--
Author: ASF GitHub Bot
Created on: 10/Jun/19 22:47
Start Date: 10/Jun/19 22:47
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on pull request #2668: [GOBBLIN-802] change gauge metrics context to RootMetricsContext
URL: https://github.com/apache/incubator-gobblin/pull/2668

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

@sv2000 please review

### JIRA
- [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
    - https://issues.apache.org/jira/browse/GOBBLIN-XXX

### Description
- [x] Here are some details about my PR, including screenshots (if applicable):
1 - this PR adds context aware gauge to get registered in innerMetricsContext
2 - make one of the gauge creation more readable

### Tests
- [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
changes are trivial

### Commits
- [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257269)
Time Spent: 10m
Remaining Estimate: 0h

> register Gauge in innerMetricsContext
> -
>
> Key: GOBBLIN-802
> URL: https://issues.apache.org/jira/browse/GOBBLIN-802
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (GOBBLIN-802) register Gauge in innerMetricsContext
Arjun Singh Bora created GOBBLIN-802:

Summary: register Gauge in innerMetricsContext
Key: GOBBLIN-802
URL: https://issues.apache.org/jira/browse/GOBBLIN-802
Project: Apache Gobblin
Issue Type: Bug
Reporter: Arjun Singh Bora

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (GOBBLIN-800) Remove the metric context cache from GobblinMetricsRegistry
[ https://issues.apache.org/jira/browse/GOBBLIN-800?focusedWorklogId=257245=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257245 ]

ASF GitHub Bot logged work on GOBBLIN-800:
--
Author: ASF GitHub Bot
Created on: 10/Jun/19 22:16
Start Date: 10/Jun/19 22:16
Worklog Time Spent: 10m
Work Description: yukuai518 commented on pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
URL: https://github.com/apache/incubator-gobblin/pull/2667#discussion_r292213096

## File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ##

@@ -451,13 +452,26 @@ private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch coun
   public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy)
       throws IOException, InterruptedException {
-    run();
-    if (multiTaskAttemptCommitPolicy.equals(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE)) {
-      this.log.info("Will commit tasks directly.");
-      commit();
-    } else if (!isSpeculativeExecutionSafe()) {
-      throw new RuntimeException(
-          "Speculative execution is enabled. However, the task context is not safe for speculative execution.");
+    try {
+      run();
+      if (multiTaskAttemptCommitPolicy.equals(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE)) {
+        this.log.info("Will commit tasks directly.");
+        commit();
+      } else if (!isSpeculativeExecutionSafe()) {
+        throw new RuntimeException(
+            "Speculative execution is enabled. However, the task context is not safe for speculative execution.");
+      }
+    } finally {
+      // During the task execution, the fork/task instances will create metric contexts (fork, task, job, container)
+      // along the hierarchy up to the root metric context. Although root metric context has a weak reference to
+      // those metric contexts, they are meanwhile cached by GobblinMetricsRegistry. Here we will remove all those
+      // strong reference from the cache to make sure it can be reclaimed by Java GC when JVM has run out of memory.
+
+      this.tasks.forEach(task-> {
+        TaskMetrics.remove(task);
+      });
+
+      JobMetrics.remove(GobblinMetrics.METRICS_ID_PREFIX + jobState.getJobId());

Review comment:
Most of our Gobblin code runs in distributed mode, like MRJobLauncher, HelixJobLauncher, etc. I will add the defensive check to make sure that this GobblinMultiTaskAttempt was actually in the separate process. But even for LocalJobLauncher, GobblinMetricsRegistry is actually a cache, there is no guarantee that JobMetrics cannot be evicted if TaskMetrics is still alive.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257245)
Time Spent: 1h (was: 50m)

> Remove the metric context cache from GobblinMetricsRegistry
> ---
>
> Key: GOBBLIN-800
> URL: https://issues.apache.org/jira/browse/GOBBLIN-800
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Kuai Yu
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Remove the metric context cache from GobblinMetricsRegistry

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (GOBBLIN-785) remove wrapper isPartition function, use table.isPartitioned instead
[ https://issues.apache.org/jira/browse/GOBBLIN-785?focusedWorklogId=257244=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257244 ]

ASF GitHub Bot logged work on GOBBLIN-785:
--
Author: ASF GitHub Bot
Created on: 10/Jun/19 22:14
Start Date: 10/Jun/19 22:14
Worklog Time Spent: 10m
Work Description: ibuenros commented on pull request #2650: [GOBBLIN-785] remove wrapper isPartition function, use table.isPartitioned instead
URL: https://github.com/apache/incubator-gobblin/pull/2650#discussion_r292212680

## File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java ##

@@ -163,10 +163,4 @@ private static Configuration getHadoopConfiguration() {
     return conf;
   }
-  /**
-   * @return true if {@link Table} is partitioned.
-   */
-  public static boolean isPartitioned(Table table) {

Review comment:
I don't see the method added back, did you push the change?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257244)
Time Spent: 50m (was: 40m)

> remove wrapper isPartition function, use table.isPartitioned instead
>
> Key: GOBBLIN-785
> URL: https://issues.apache.org/jira/browse/GOBBLIN-785
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Jay Sen
> Priority: Minor
> Time Spent: 50m
> Remaining Estimate: 0h
>
> some places use {{HiveUtils.isPartitioned()}} which is un-necessary wrapper
> function, and some places uses direct {{HiveTable.isPartitioned()}} the later
> should be used consistently.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (GOBBLIN-800) Remove the metric context cache from GobblinMetricsRegistry
[ https://issues.apache.org/jira/browse/GOBBLIN-800?focusedWorklogId=257239=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257239 ]

ASF GitHub Bot logged work on GOBBLIN-800:
--
Author: ASF GitHub Bot
Created on: 10/Jun/19 22:10
Start Date: 10/Jun/19 22:10
Worklog Time Spent: 10m
Work Description: yukuai518 commented on pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
URL: https://github.com/apache/incubator-gobblin/pull/2667#discussion_r292211586

## File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java ##

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime.util;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.fork.Fork;
+
+/**
+ * An extension to {@link GobblinMetrics} specifically for {@link Fork}.
+ */
+public class ForkMetrics extends GobblinMetrics {

Review comment:
This is just refactoring

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 257239)
Time Spent: 50m (was: 40m)

> Remove the metric context cache from GobblinMetricsRegistry
> ---
>
> Key: GOBBLIN-800
> URL: https://issues.apache.org/jira/browse/GOBBLIN-800
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Kuai Yu
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Remove the metric context cache from GobblinMetricsRegistry

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (GOBBLIN-800) Remove the metric context cache from GobblinMetricsRegistry
[ https://issues.apache.org/jira/browse/GOBBLIN-800?focusedWorklogId=257236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257236 ]

ASF GitHub Bot logged work on GOBBLIN-800:

Author: ASF GitHub Bot
Created on: 10/Jun/19 21:55
Start Date: 10/Jun/19 21:55
Worklog Time Spent: 10m
Work Description: ibuenros commented on pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
URL: https://github.com/apache/incubator-gobblin/pull/2667#discussion_r292207566

## File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java ##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime.util;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.fork.Fork;
+
+/**
+ * An extension to {@link GobblinMetrics} specifically for {@link Fork}.
+ */
+public class ForkMetrics extends GobblinMetrics {

Review comment:
Is this just refactoring or is different logic being added?

Issue Time Tracking

Worklog Id: (was: 257236)
Time Spent: 40m (was: 0.5h)

> Remove the metric context cache from GobblinMetricsRegistry
>
> Key: GOBBLIN-800
> URL: https://issues.apache.org/jira/browse/GOBBLIN-800
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Kuai Yu
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Remove the metric context cache from GobblinMetricsRegistry
[GitHub] [incubator-gobblin] ibuenros commented on a change in pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
ibuenros commented on a change in pull request #2667: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
URL: https://github.com/apache/incubator-gobblin/pull/2667#discussion_r292207228

## File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ##
@@ -451,13 +452,26 @@ private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch coun
   public void runAndOptionallyCommitTaskAttempt(CommitPolicy multiTaskAttemptCommitPolicy)
       throws IOException, InterruptedException {
-    run();
-    if (multiTaskAttemptCommitPolicy.equals(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE)) {
-      this.log.info("Will commit tasks directly.");
-      commit();
-    } else if (!isSpeculativeExecutionSafe()) {
-      throw new RuntimeException(
-          "Speculative execution is enabled. However, the task context is not safe for speculative execution.");
+    try {
+      run();
+      if (multiTaskAttemptCommitPolicy.equals(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE)) {
+        this.log.info("Will commit tasks directly.");
+        commit();
+      } else if (!isSpeculativeExecutionSafe()) {
+        throw new RuntimeException(
+            "Speculative execution is enabled. However, the task context is not safe for speculative execution.");
+      }
+    } finally {
+      // During the task execution, the fork/task instances will create metric contexts (fork, task, job, container)
+      // along the hierarchy up to the root metric context. Although root metric context has a weak reference to
+      // those metric contexts, they are meanwhile cached by GobblinMetricsRegistry. Here we will remove all those
+      // strong reference from the cache to make sure it can be reclaimed by Java GC when JVM has run out of memory.
+
+      this.tasks.forEach(task-> {
+        TaskMetrics.remove(task);
+      });
+
+      JobMetrics.remove(GobblinMetrics.METRICS_ID_PREFIX + jobState.getJobId());

Review comment:
I don't think this should be done here; the job will still be running after the tasks are done. If this is needed for distributed mode, then let's check that we are running on a worker before removing the job metrics.
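The concern raised in the review comment above can be illustrated with a small sketch. This is not Gobblin code: `MetricsCacheSketch`, `TaskAttemptSketch`, and the `runningOnWorker` flag are hypothetical stand-ins, and the assumption (taken from the comment) is that job-level entries should only be dropped when the attempt runs on a worker.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a cache that holds strong references to metrics; not a Gobblin class. */
class MetricsCacheSketch {
  private final Map<String, Object> cache = new ConcurrentHashMap<>();

  void put(String id) { cache.put(id, new Object()); }
  void remove(String id) { cache.remove(id); }
  boolean contains(String id) { return cache.containsKey(id); }
}

/** Hypothetical task attempt that cleans up cached metrics in a finally block. */
class TaskAttemptSketch {
  private final MetricsCacheSketch cache;
  private final List<String> taskIds;
  private final String jobId;
  private final boolean runningOnWorker; // assumption: the caller knows its execution mode

  TaskAttemptSketch(MetricsCacheSketch cache, List<String> taskIds, String jobId, boolean runningOnWorker) {
    this.cache = cache;
    this.taskIds = taskIds;
    this.jobId = jobId;
    this.runningOnWorker = runningOnWorker;
  }

  void runAndCleanUp(Runnable work) {
    try {
      work.run();
    } finally {
      // Task-level entries are dropped as soon as the attempt ends.
      taskIds.forEach(cache::remove);
      // Job-level entry is dropped only on a worker; elsewhere the job may still be running.
      if (runningOnWorker) {
        cache.remove(jobId);
      }
    }
  }
}
```

With `runningOnWorker` set to false, the job entry survives the attempt while the task entries are released, which is the behaviour the reviewer appears to be asking for in non-distributed mode.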
[GitHub] [incubator-gobblin] ZihanLi58 commented on issue #2666: [Gobblin-799]Fix bug in AvroSchemaCheckDefaultStrategy
ZihanLi58 commented on issue #2666: [Gobblin-799]Fix bug in AvroSchemaCheckDefaultStrategy
URL: https://github.com/apache/incubator-gobblin/pull/2666#issuecomment-500595161

@ibuenros Can you take a look at this?
[jira] [Work logged] (GOBBLIN-798) Clean up workflows from Helix when the Gobblin application master starts
[ https://issues.apache.org/jira/browse/GOBBLIN-798?focusedWorklogId=257000&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257000 ]

ASF GitHub Bot logged work on GOBBLIN-798:

Author: ASF GitHub Bot
Created on: 10/Jun/19 17:05
Start Date: 10/Jun/19 17:05
Worklog Time Spent: 10m
Work Description: asfgit commented on pull request #2665: [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
URL: https://github.com/apache/incubator-gobblin/pull/2665

Issue Time Tracking

Worklog Id: (was: 257000)
Time Spent: 2h (was: 1h 50m)

> Clean up workflows from Helix when the Gobblin application master starts
>
> Key: GOBBLIN-798
> URL: https://issues.apache.org/jira/browse/GOBBLIN-798
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Hung Tran
> Assignee: Hung Tran
> Priority: Major
> Fix For: 0.15.0
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> If the application master aborts, a new one may be spawned by YARN. The second
> application master will resubmit the jobs. This results in duplicate jobs in
> Helix and multiple instances of the job may run, resulting in duplicate data.
> The Gobblin application master should clean up all workflows on startup to
> avoid executing multiple instances of a job.
[GitHub] [incubator-gobblin] asfgit closed pull request #2665: [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
asfgit closed pull request #2665: [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
URL: https://github.com/apache/incubator-gobblin/pull/2665
[jira] [Resolved] (GOBBLIN-798) Clean up workflows from Helix when the Gobblin application master starts
[ https://issues.apache.org/jira/browse/GOBBLIN-798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hung Tran resolved GOBBLIN-798.

Resolution: Fixed
Fix Version/s: 0.15.0

Issue resolved by pull request #2665 [https://github.com/apache/incubator-gobblin/pull/2665]

> Clean up workflows from Helix when the Gobblin application master starts
>
> Key: GOBBLIN-798
> URL: https://issues.apache.org/jira/browse/GOBBLIN-798
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Hung Tran
> Assignee: Hung Tran
> Priority: Major
> Fix For: 0.15.0
>
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> If the application master aborts, a new one may be spawned by YARN. The second
> application master will resubmit the jobs. This results in duplicate jobs in
> Helix and multiple instances of the job may run, resulting in duplicate data.
> The Gobblin application master should clean up all workflows on startup to
> avoid executing multiple instances of a job.
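The failure mode described in the issue (a restarted application master resubmitting jobs that are still registered) can be sketched with a toy model. Nothing here uses the Helix or YARN APIs: `WorkflowStoreSketch`, `AppMasterSketch`, and the `cleanOnStartup` flag are hypothetical names introduced only to show why clearing workflows on startup avoids duplicate job instances.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the workflow store (Helix in the issue); not the Helix API. */
class WorkflowStoreSketch {
  private final Map<String, Integer> instances = new HashMap<>();

  void submit(String jobName) { instances.merge(jobName, 1, Integer::sum); }
  int instancesOf(String jobName) { return instances.getOrDefault(jobName, 0); }
  void deleteAll() { instances.clear(); }
}

/** Hypothetical application master that submits its jobs when it starts. */
class AppMasterSketch {
  private final WorkflowStoreSketch store;
  private final List<String> jobs;
  private final boolean cleanOnStartup;

  AppMasterSketch(WorkflowStoreSketch store, List<String> jobs, boolean cleanOnStartup) {
    this.store = store;
    this.jobs = jobs;
    this.cleanOnStartup = cleanOnStartup;
  }

  void start() {
    if (cleanOnStartup) {
      // Drop whatever a previous, aborted application master left behind.
      store.deleteAll();
    }
    // Resubmit every job; without the cleanup above this duplicates existing entries.
    jobs.forEach(store::submit);
  }
}
```

Starting two `AppMasterSketch` instances against the same store leaves two registered instances of each job when `cleanOnStartup` is false, and one when it is true.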
[jira] [Work logged] (GOBBLIN-790) Mysql Implementation of DagStateStore
[ https://issues.apache.org/jira/browse/GOBBLIN-790?focusedWorklogId=256952&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-256952 ]

ASF GitHub Bot logged work on GOBBLIN-790:

Author: ASF GitHub Bot
Created on: 10/Jun/19 16:33
Start Date: 10/Jun/19 16:33
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2656: [GOBBLIN-790]DagStateStore MySQL
URL: https://github.com/apache/incubator-gobblin/pull/2656#discussion_r292087455

## File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java ##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.GsonSerDe;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
+import static org.apache.gobblin.service.modules.orchestration.DagManagerUtils.generateFlowIdInString;
+import static org.apache.gobblin.service.modules.orchestration.DagManagerUtils.getFlowExecId;
+
+
+/**
+ * A implementation of {@link DagStateStore} using MySQL as a backup, leverage {@link MysqlStateStore}.
+ * It implements interfaces of {@link DagStateStore} but delegating responsibilities to methods provided
+ * in {@link MysqlStateStore}.
+ * It also implements conversion between {@link Dag} to {@link State}.
+ *
+ * The schema of this will simply be:
+ * | storeName | tableName | State |
+ * where storeName represents FlowId, a combination of FlowGroup and FlowName, and tableName represents FlowExecutionId.
+ * State is a pocket for serialized {@link Dag} object.
+ *
+ * TODO: In the DagManagerTest: change the hardcoded type of DagStateStore.
+ *
+ */
+public class MysqlDagStateStore implements DagStateStore {
+
+  public static final String CONFIG_PREFIX = GOBBLIN_SERVICE_PREFIX + "mysqlDagStateStore";
+  public static final String DAG_KEY_IN_STATE = "dag";
+
+  /**
+   * The schema of {@link MysqlStateStore} is fixed but the columns are semantically projected into Dag's context:
+   * - The 'storeName' is FlowId.
+   * - The 'tableName' is FlowExecutionId.
+   */
+  private MysqlStateStore mysqlStateStore;
+  private final GsonSerDe> serDe;
+  private JobExecutionPlanDagFactory jobExecPlanDagFactory;
+
+  public MysqlDagStateStore(Config config, Map topologySpecMap) {

Review comment:
Agreed. Since this PR is already large, I would like to separate this out: https://issues.apache.org/jira/browse/GOBBLIN-801

Issue Time Tracking

Worklog Id: (was: 256952)
Time Spent: 1.5h (was: 1h 20m)

> Mysql Implementation of DagStateStore
>
> Key: GOBBLIN-790
> URL:
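The Javadoc quoted in the diff describes a fixed three-column layout (`storeName`, `tableName`, `State`) that is reinterpreted as flow id, flow execution id, and serialized DAG. A minimal in-memory sketch of that projection follows. It is not the Gobblin implementation: the `_` separator in `storeName` is a guess (the real `generateFlowIdInString` format is not shown in the diff), the class name is made up, and a plain `String` stands in for the serialized `Dag`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the | storeName | tableName | State | layout; not Gobblin code. */
class DagStoreKeySketch {
  // Same key name as the DAG_KEY_IN_STATE constant in the quoted diff.
  static final String DAG_KEY_IN_STATE = "dag";

  // storeName -> tableName -> state properties
  private final Map<String, Map<String, Map<String, String>>> rows = new HashMap<>();

  /** Assumed format: the separator is illustrative only. */
  static String storeName(String flowGroup, String flowName) {
    return flowGroup + "_" + flowName;
  }

  static String tableName(long flowExecutionId) {
    return Long.toString(flowExecutionId);
  }

  void write(String flowGroup, String flowName, long flowExecutionId, String serializedDag) {
    Map<String, String> state = new HashMap<>();
    state.put(DAG_KEY_IN_STATE, serializedDag);
    rows.computeIfAbsent(storeName(flowGroup, flowName), k -> new HashMap<>())
        .put(tableName(flowExecutionId), state);
  }

  /** Returns the serialized DAG, or null when no row exists for the given flow execution. */
  String read(String flowGroup, String flowName, long flowExecutionId) {
    Map<String, Map<String, String>> tables = rows.get(storeName(flowGroup, flowName));
    if (tables == null) {
      return null;
    }
    Map<String, String> state = tables.get(tableName(flowExecutionId));
    return state == null ? null : state.get(DAG_KEY_IN_STATE);
  }
}
```

Keying by flow id first and execution id second means every execution of one flow shares a `storeName`, which matches the column semantics the Javadoc describes.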
[jira] [Created] (GOBBLIN-801) Reimplement FsDagStateStore to use MysqlStateStore
Lei Sun created GOBBLIN-801:

Summary: Reimplement FsDagStateStore to use MysqlStateStore
Key: GOBBLIN-801
URL: https://issues.apache.org/jira/browse/GOBBLIN-801
Project: Apache Gobblin
Issue Type: Improvement
Reporter: Lei Sun
[jira] [Work logged] (GOBBLIN-790) Mysql Implementation of DagStateStore
[ https://issues.apache.org/jira/browse/GOBBLIN-790?focusedWorklogId=256920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-256920 ]

ASF GitHub Bot logged work on GOBBLIN-790:

Author: ASF GitHub Bot
Created on: 10/Jun/19 16:04
Start Date: 10/Jun/19 16:04
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2656: [GOBBLIN-790]DagStateStore MySQL
URL: https://github.com/apache/incubator-gobblin/pull/2656#discussion_r292076607

## File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java ##
@@ -582,11 +581,18 @@ private void cleanUp() {
       }
     }
 
-    private void cleanUpDag(String dagId) {
-      Dag dag = this.dags.get(dagId);
-      this.dagToJobs.remove(dagId);
-      this.dagStateStore.cleanUp(dag);
+    /**
+     * Cleaning of all relevant states need to be
+     * @param dagId
+     */
+    private synchronized void cleanUpDag(String dagId) {
+      try {
+        this.dagStateStore.cleanUp(dags.get(dagId));
+      } catch (IOException ioe) {
+        log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
+      }
       this.dags.remove(dagId);
+      this.dagToJobs.remove(dagId);

Review comment:
The reason is that I got rid of the local `Dag dag = this.dags.get(dagId);`, so there is no longer a held reference to the `dag` associated with this `dagId`. The state-store cleanup therefore has to happen before `remove`; otherwise the `dag` object can no longer be fetched. I will add a comment to make this clear.

Issue Time Tracking

Worklog Id: (was: 256920)
Time Spent: 1h 20m (was: 1h 10m)

> Mysql Implementation of DagStateStore
>
> Key: GOBBLIN-790
> URL: https://issues.apache.org/jira/browse/GOBBLIN-790
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Lei Sun
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
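The ordering constraint explained in the comment above (clean the backing store while the DAG is still reachable from the map, then remove the map entry) can be shown in isolation. This is a simplified sketch, not the `DagManager` code: `StateStoreSketch` and `DagRegistrySketch` are hypothetical names, and the real `DagStateStore.cleanUp` signature may differ.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store interface; the real DagStateStore signature may differ. */
interface StateStoreSketch<D> {
  void cleanUp(D dag) throws IOException;
}

/** Hypothetical registry that owns the in-memory dagId -> dag map. */
class DagRegistrySketch<D> {
  private final Map<String, D> dags = new HashMap<>();
  private final StateStoreSketch<D> store;

  DagRegistrySketch(StateStoreSketch<D> store) {
    this.store = store;
  }

  synchronized void add(String dagId, D dag) {
    dags.put(dagId, dag);
  }

  synchronized boolean contains(String dagId) {
    return dags.containsKey(dagId);
  }

  synchronized void cleanUpDag(String dagId) {
    try {
      // Look the dag up while it is still in the map, then clean the backing store.
      store.cleanUp(dags.get(dagId));
    } catch (IOException ioe) {
      // A failed store cleanup is reported but does not block the in-memory removal.
      System.err.println("Failed to clean " + dagId + " from backing store: " + ioe);
    }
    // Only now drop the in-memory entry; doing this first would hand null to the store.
    dags.remove(dagId);
  }
}
```

If the two steps were swapped, `dags.get(dagId)` would return `null` and the store would have nothing to clean, which is the situation the author describes.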
[jira] [Work logged] (GOBBLIN-790) Mysql Implementation of DagStateStore
[ https://issues.apache.org/jira/browse/GOBBLIN-790?focusedWorklogId=256914&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-256914 ]

ASF GitHub Bot logged work on GOBBLIN-790:

Author: ASF GitHub Bot
Created on: 10/Jun/19 15:58
Start Date: 10/Jun/19 15:58
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2656: [GOBBLIN-790]DagStateStore MySQL
URL: https://github.com/apache/incubator-gobblin/pull/2656#discussion_r292074478

## File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java ##
@@ -582,11 +581,18 @@ private void cleanUp() {
       }
     }
 
-    private void cleanUpDag(String dagId) {
-      Dag dag = this.dags.get(dagId);
-      this.dagToJobs.remove(dagId);
-      this.dagStateStore.cleanUp(dag);
+    /**
+     * Cleaning of all relevant states need to be
+     * @param dagId
+     */
+    private synchronized void cleanUpDag(String dagId) {
+      try {
+        this.dagStateStore.cleanUp(dags.get(dagId));
+      } catch (IOException ioe) {
+        log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);

Review comment:
I believe `log.error(String, Exception)` will format the line better. What is the reason to have them concatenated into a single string?

Issue Time Tracking

Worklog Id: (was: 256914)
Time Spent: 1h 10m (was: 1h)

> Mysql Implementation of DagStateStore
>
> Key: GOBBLIN-790
> URL: https://issues.apache.org/jira/browse/GOBBLIN-790
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Lei Sun
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
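The point about passing the exception as its own argument, rather than folding it into the message string, can be demonstrated with the JDK's built-in logging. Note the substitution: the diff uses a `log.error(String, Throwable)` style call (most likely SLF4J), while this sketch uses `java.util.logging` so that it runs without extra dependencies. `LoggingSketch` and `CapturingHandler` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

class LoggingSketch {
  /** Collects records in memory so the example can inspect what was logged. */
  static final class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();

    @Override public void publish(LogRecord record) { records.add(record); }
    @Override public void flush() { }
    @Override public void close() { }
  }

  /** Logs a cleanup failure and returns the captured record. */
  static LogRecord logCleanupFailure(String dagId, Exception cause) {
    Logger logger = Logger.getLogger("LoggingSketch");
    logger.setUseParentHandlers(false); // keep the example quiet on the console
    CapturingHandler handler = new CapturingHandler();
    logger.addHandler(handler);
    try {
      // The message carries only the context; the throwable travels separately,
      // so the logging backend can render its stack trace itself.
      logger.log(Level.SEVERE, String.format("Failed to clean %s from backing store", dagId), cause);
    } finally {
      logger.removeHandler(handler);
    }
    return handler.records.get(0);
  }
}
```

Because the throwable is attached to the record instead of being concatenated into the text, handlers and formatters keep access to the full stack trace and cause chain.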
[jira] [Work logged] (GOBBLIN-672) Add EventHub source
[ https://issues.apache.org/jira/browse/GOBBLIN-672?focusedWorklogId=256707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-256707 ]

ASF GitHub Bot logged work on GOBBLIN-672:

Author: ASF GitHub Bot
Created on: 10/Jun/19 08:14
Start Date: 10/Jun/19 08:14
Worklog Time Spent: 10m
Work Description: fabi54 commented on issue #2542: [GOBBLIN-672] add EventHub source
URL: https://github.com/apache/incubator-gobblin/pull/2542#issuecomment-500331613

@jhsenjaliya Yes, I've seen the comments and I'm working on improvements.

Issue Time Tracking

Worklog Id: (was: 256707)
Time Spent: 2h 40m (was: 2.5h)

> Add EventHub source
>
> Key: GOBBLIN-672
> URL: https://issues.apache.org/jira/browse/GOBBLIN-672
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Jan Fabian
> Priority: Minor
> Time Spent: 2h 40m
> Remaining Estimate: 0h