Steven Jacobs has posted comments on this change.

Change subject: [ASTERIXDB-1911][HYR,RT,CLUS] Fixes and Improvements for PreDistributed Jobs
......................................................................

Patch Set 14:

(30 comments)

Addressed comments. Uploading a new patchset.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/PredistributedJobService.java
File asterixdb/asterix-active/src/main/java/org/apache/asterix/active/PredistributedJobService.java:

Line 1: /*
> This class is not used by anyone. Don't you need to start this in CCApplica
Right now it doesn't have any state, since it's just a set of helper functions. Does it make sense to "start" it in this case? Maybe the issue is that it shouldn't be called a service?

Line 43: IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> jobParameters, EntityId entityId) {
> What is this duration for? Execution interval?
Yes. The job is executed periodically, and duration is the interval between executions, in seconds.

Line 44: ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
> Change it to private static int and give it a propername.
Done

Line 78: Date checkStartTime = new Date();
> see end time
Done

Line 82: byte[] jobIdParameter = JOB_ID_PARAMETER_NAME.getBytes();
> Instead of getBytes everytime, maybe use JOB_ID_... = "fooname".getbytes()
Done

Line 86: hcc.waitForCompletion(jobId);
> Are we certain that all predistributed job calls are synchronized? maybe ad
Done

Line 87: Date checkEndTime = new Date();
> you don't really need to create two date objects here. Use Instant.now() an
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
File asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java:

Line 743: public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
> Good point. How is this called in BAD?
The issue here is that extension Statements (which might perform DDL operations) are not instances of the QueryTranslator, so they don't have access to these handler methods.
There are other methods in QueryTranslator that have been made public in the past for this reason. I think Couchbase uses them as well (e.g. handleCreateDatasetStatement), but I may be wrong there. In any case, I'm fine with another solution if we have one, but I'm not sure what it would be. @Abdullah?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
File asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java:

Line 25: * an interface for JobEventListenerFactories to add Asterix transaction id API
> to add Asterix transaction id API?
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractUnaryStringStringEval.java:

Line 51: protected final DataOutput dataOutput = resultStorage.getDataOutput();
> reason?
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetJobParameterDescriptor.java:

Line 36: public class GetJobParameterDescriptor extends AbstractScalarFunctionDynamicDescriptor {
> get_job_parameter by parameter name?
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java:

Line 53: return new JobEventListenerFactory(jobId, transactionalWrite);
> Do you need to create a new jobId here? It's passing reference to the new j
Each execution of the job needs its own factory instance; otherwise, when the job ID is changed, it changes for all currently running executions.

Line 58: byte[] jobIdParameter = jobIdParameterName.getBytes();
> jobIdParameter or jobParameterName?
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
File asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java:

Line 52: return new MultiTransactionJobletEventListenerFactory(jobIds, transactionalWrite);
> same comment from jobEventListener
Same response :)

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
File asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java:

Line 52: fact instanceof IJobEventListenerFactory ? ((IJobEventListenerFactory) fact).getJobId() : jobId;
> It certainly seems "smelly" that
I had a discussion with Xikui, and we came up with what seems like a better solution. Please take a look at the next patchset.
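
The JobEventListenerFactory (Line 53) and MultiTransactionJobletEventListenerFactory (Line 52) replies rely on every execution getting its own factory instance. A minimal sketch of why, assuming a factory that holds a mutable job ID: SketchListenerFactory, copyFactory() and updateJobId() are hypothetical names used only for illustration, not the actual AsterixDB/Hyracks interfaces.

```java
// Hypothetical, simplified model of a job event listener factory.
// The names are illustrative only and do not match the AsterixDB/Hyracks API.
class SketchListenerFactory {
    private long jobId;                      // assumed to be updated for each execution
    private final boolean transactionalWrite;

    SketchListenerFactory(long jobId, boolean transactionalWrite) {
        this.jobId = jobId;
        this.transactionalWrite = transactionalWrite;
    }

    // Returns an independent instance, so changing the copy's job ID
    // cannot affect executions that are already using another instance.
    SketchListenerFactory copyFactory() {
        return new SketchListenerFactory(jobId, transactionalWrite);
    }

    // Hypothetical per-execution hook, called on the copy before the execution starts.
    void updateJobId(long newJobId) {
        this.jobId = newJobId;
    }

    long getJobId() {
        return jobId;
    }

    boolean isTransactionalWrite() {
        return transactionalWrite;
    }
}
```

For example, copying the stored factory for two executions and calling updateJobId(1) and updateJobId(2) on the copies leaves each copy with its own ID, while calling updateJobId on one shared instance would change the ID seen by every execution holding that instance.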

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java:

Line 38: IJobletEventListenerFactory getEventListenerFactory();
> getJobLetEventListenenrFactory
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/ActivityClusterId.java:

Line 64: return "ACID:" + ":" + id;
> remove this extra colon
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobletEventListenerFactory.java:

PS14, Line 31: updateListenerJobParameters
> Why do we need to update? Can't this be done at construction?
Job parameters can be set separately for each execution of a predistributed job, so they aren't known when the factory is constructed. You can see this used in StartTasksWork.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobParameterByteStore.java:

PS14, Line 31: nonpureSingletonValues
> What is a nonpure singleton value?
Done

Line 65: public synchronized byte[] getNonpureSingletonValue(String functionName) {
> What's this method used for? It's not used by others...
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/PreDistributedId.java
File hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/PreDistributedId.java:

PS14, Line 30: PreDistributedId
> I'm still confused by this name and I'm wondering if another name could hel
Those steps are correct, but I still think "distributed" is better than "deployed." I also feel that this terminology is already used frequently and has been adopted by people associated with the BAD project (e.g. Mike, Xikui), so if we are going to change it, we should have a good reason for the change. My argument for "distributed" is that we are distributing the specification for the job; we aren't actually deploying anything (to me, "deploy" suggests running or starting something).

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java:

Line 367: public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
> Minor suggestion, can we merge this into PreDistriubtedJobStore too. This c
As discussed today, parameters may be used outside of predistributed jobs in the future, and they can be different for each execution of a predistributed job.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java:

Line 34: private final Map<Long, PreDistributedJobDescriptor> preDistributedJobDescriptorMap;
> PredistributedId?
Done

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java:

Line 517: byte[] jagBytes = changed ? acgBytes : null;
> this line can be combined with line 506. if it's changed or it's not pre~ j
They are separated because we only want to deserialize once. The acgBytes are the same for every iteration of the loop, but "changed" can have a different value in each iteration.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java:

Line 55: ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(preDistributedId);
> change the method to return boolean and throw the exception here.
I intentionally wrote checkForExistingDistributedJobDescriptor() to block by throwing an exception. Is there a particular reason to change it?

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java:

Line 136: private final Map<JobId, JobParameterByteStore> jobParameterByteStoreMap = new HashMap<>();
> should this be a map from predistributedJobId to JobParameterByteStore too?
This is specific to an individual job execution (predistributed or not).

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java:

Line 54: ncs.removeJobParameterByteStore(jobId);
> if we map predistributedJobId to jobParameter, this remove wont be necessar
See previous comments and discussion.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DestroyJobWork.java:

Line 44: ncs.removeActivityClusterGraph(preDistributedId);
> So we will remove the parameter store here
See previous.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
File hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java:

Line 48: ncs.checkForDuplicateDistributedJob(preDistributedId);
> throw the exception here.
Again, checkForDuplicateDistributedJob() was written specifically to be an exception-throwing blocker, i.e. you call it as a sanity check.

https://asterix-gerrit.ics.uci.edu/#/c/2045/14/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
File hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java:

Line 196: 
> add one more test. it would be better to have an insert jobspec. distribute
Done

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2045
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8f493c1fa977d07dfe8a875f9ebe9515d01d1473
Gerrit-PatchSet: 14
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>
Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
Gerrit-Reviewer: Ildar Absalyamov <ildar.absalya...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sjaco...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>
Gerrit-HasComments: Yes