package com.intellica.evam.engine.cache;

import com.evam.utils.logging.ELogger;
import com.evam.utils.logging.ELoggerFactory;
import com.intellica.evam.engine.dynamic.DeploymentWorkerManager;
import com.intellica.model.EvamScenario;
import com.intellica.model.deployment.DeploymentEvent;
import com.intellica.model.deployment.DeploymentEventDefinitions;
import com.thoughtworks.xstream.InitializationException;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;

import java.io.Serializable;
import java.util.List;

import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;

/**
 * Created by alpert on 02/02/2017.
 */
public class DeploymentCacheListener implements Serializable {
    
    private static final ELogger logger = ELoggerFactory.getLogger(DeploymentCacheListener.class);
    private ContinuousQuery<String, List<DeploymentEvent>> qry = new ContinuousQuery<>();
    private QueryCursor cursor = null;

    public void listen(Cache cache) {
        qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<String, List<DeploymentEvent>>() {
            @Override public boolean apply(String key, List<DeploymentEvent> val) {
                return true;
            }
        }));

        // Callback that is called locally when update notifications are received.
        qry.setLocalListener(new UpdateListener());

        // This filter will be evaluated remotely on all nodes.
        // Entry that pass this filter will be sent to the caller.
        qry.setRemoteFilterFactory(new FilterFac());

        try {
            // run query to start continuous query
            cursor = ((IgniteCache)cache).query(qry);
        } catch (Exception e) {
            logger.error("Could not create deployment cache listener!", e);
            throw new InitializationException("Could not create deployment cache listener!");

        }
    }

    private DeploymentEvent cloneDeploymentEventWithNewStatus(DeploymentEvent deploymentEvent, int newStatus) {
        return new DeploymentEvent(deploymentEvent.getScenarioName(),
                deploymentEvent.getEvamScenario(),deploymentEvent.getUser(), newStatus,
                deploymentEvent.getErrorCode(), deploymentEvent.getMessage(), deploymentEvent.getJarContent());
    }

    public void close() {
        if (cursor != null) {
            cursor.close();
        }
    }

    private class FilterFac implements Factory<CacheEntryEventFilter<String, List<DeploymentEvent>>>, Serializable {

        @Override
        public CacheEntryEventFilter<String, List<DeploymentEvent>> create() {
            return new CacheEntryEventFilter<String, List<DeploymentEvent>>() {
                @Override public boolean evaluate(CacheEntryEvent<? extends String, ? extends List<DeploymentEvent>> e) {
                    return true;
                }
            };
        }
    }

    private class UpdateListener implements CacheEntryUpdatedListener<String, List<DeploymentEvent>>, Serializable {

        @Override
        public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends List<DeploymentEvent>>> cacheEntryEvents) throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends String, ? extends List<DeploymentEvent>> e : cacheEntryEvents) {
                if (e.getEventType() == EventType.CREATED || e.getEventType() == EventType.UPDATED) {

                    List<DeploymentEvent> events = e.getValue();
                    if (events == null || events.size() == 0) {
                        return;
                    }

                    DeploymentEvent deploymentEvent = events.get(events.size() - 1);

                    // create insert request
                    if (deploymentEvent.getStatus() ==
                            DeploymentEventDefinitions.CENTRILIZED_DEPLOY_OP_REQUESTED) {
                        logger.warn("Unexpected status in deployment cache [CENTRILIZED_DEPLOY_OP_REQUESTED].");
                    }

                    if (deploymentEvent.getStatus() == DeploymentEventDefinitions.RESUME_REQUESTED
                            || deploymentEvent.getStatus() == DeploymentEventDefinitions.SUSPEND_REQUESTED
                            || deploymentEvent.getStatus() == DeploymentEventDefinitions.FREEZE_REQUESTED
                            || deploymentEvent.getStatus() == DeploymentEventDefinitions.IDLE_REQUESTED
                            ) {
                        Cache<String, EvamScenario> evamScenarioCache =
                                CacheManager.getInstance().getCache(CacheDefinitions.SCENARIO_DEF_CACHE_NAME);
                        if (evamScenarioCache.get(deploymentEvent.getScenarioName()) == null) {
                            logger.info("Scenario [" + deploymentEvent.getScenarioName() + "] could not be found. Creating order for DISTRIBUTED_DEPLOY_OP_REQUESTED.");
                            DeploymentEvent deployOrder = cloneDeploymentEventWithNewStatus(deploymentEvent, DeploymentEventDefinitions.DISTRIBUTED_DEPLOY_OP_REQUESTED);
                            DeploymentWorkerManager.getInstance().orderDeployment(deployOrder);
                        }
                    }
                    DeploymentWorkerManager.getInstance().orderDeployment(deploymentEvent);
                } else if (e.getEventType() == EventType.REMOVED) {
                    // TODO: what to do if a scenario is removed
                }
            }
        }
    }
}
