Steven Jacobs has submitted this change and it was merged.

Change subject: Enable Feed Changes to work with BAD project
......................................................................
Enable Feed Changes to work with BAD project Extracts the ActiveListener Enables listeners to survive after job executions Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1524 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java 3 files changed, 68 insertions(+), 26 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java index e4491bd..d7998f8 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java @@ -55,14 +55,15 @@ if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind()); - LOGGER.log(Level.FINER, "Notifying the listener"); - listener.notify(event); if (event.getEventKind() == Kind.JOB_FINISHED) { LOGGER.log(Level.FINER, "Removing the job"); jobId2ActiveJobInfos.remove(event.getJobId()); - LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); - entityEventListeners.remove(listener.getEntityId()); } + if (listener != null) { + 
LOGGER.log(Level.FINER, "Notifying the listener"); + listener.notify(event); + } + } else { LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId()); } @@ -75,6 +76,11 @@ LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName()); } + public synchronized void removeListener(IActiveEntityEventsListener listener) throws HyracksDataException { + LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); + unregisterListener(listener); + } + public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) { if (DEBUG) { LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java new file mode 100644 index 0000000..365c3ce --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.management; + +import java.util.List; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.common.metadata.IDataset; +import org.apache.hyracks.api.job.JobId; + +public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener { + + // members + protected EntityId entityId; + protected List<IDataset> datasets; + protected volatile ActivityState state; + protected JobId jobId; + + @Override + public EntityId getEntityId() { + return entityId; + } + + @Override + public ActivityState getState() { + return state; + } + + @Override + public boolean isEntityUsingDataset(IDataset dataset) { + return datasets.contains(dataset); + } + + public JobId getJobId() { + return jobId; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java index 2a87cab..f49da3c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java @@ -25,9 +25,9 @@ import java.util.logging.Logger; import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.ActiveJobNotificationHandler; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.metadata.IDataset; @@ 
-36,20 +36,15 @@ import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; -public class FeedEventsListener implements IActiveEntityEventsListener { +public class FeedEventsListener extends ActiveEntityEventsListener { // constants private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName()); // members - private final EntityId entityId; - private final List<IDataset> datasets; private final String[] sources; private final List<IActiveEventSubscriber> subscribers; - private volatile ActivityState state; private int numRegistered; - private JobId jobId; public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) { this.entityId = entityId; @@ -111,21 +106,12 @@ IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc(); JobStatus status = hcc.getJobStatus(jobId); state = status.equals(JobStatus.FAILURE) ? 
ActivityState.FAILED : ActivityState.STOPPED; + ActiveJobNotificationHandler.INSTANCE.removeListener(this); } private void start(ActiveEvent event) { this.jobId = event.getJobId(); state = ActivityState.STARTING; - } - - @Override - public EntityId getEntityId() { - return entityId; - } - - @Override - public ActivityState getState() { - return state; } @Override @@ -148,11 +134,6 @@ FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state); subscribers.add(subscriber); return subscriber; - } - - @Override - public boolean isEntityUsingDataset(IDataset dataset) { - return datasets.contains(dataset); } public String[] getSources() { -- To view, visit https://asterix-gerrit.ics.uci.edu/1524 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Xikui Wang <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
