// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.util.Comparator;
import java.util.Objects;

import org.apache.eagle.alert.engine.model.PartitionedEvent;

/**
 * Orders {@link PartitionedEvent}s by ascending timestamp; a {@code null} event sorts before any
 * non-null event.
 *
 * <p>The result is {@code 0} only when both arguments are equal according to
 * {@link Objects#equals(Object, Object)} (which includes the "both null" case). Two distinct
 * events that carry the same timestamp compare as {@code -1} in <em>both</em> directions, so this
 * comparator is intentionally not antisymmetric for timestamp ties.
 *
 * <p>NOTE(review): presumably this is deliberate so that sorted multisets keep distinct
 * same-timestamp events as separate entries instead of collapsing them -- confirm before turning
 * this into a strict {@code Long.compare}.
 *
 * <p>TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?
 */
public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();

    /**
     * Compares two events by timestamp.
     *
     * @param o1 first event, may be {@code null}
     * @param o2 second event, may be {@code null}
     * @return {@code 0} if the events are equal, {@code -1} if {@code o1} is {@code null} or its
     *         timestamp is less than or equal to that of {@code o2}, {@code 1} otherwise
     */
    @Override
    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
        // Covers identical references, equal events and the "both null" case in one check.
        if (Objects.equals(o1, o2)) {
            return 0;
        }
        // From here on at most one side can be null.
        if (o1 == null) {
            return -1;
        }
        if (o2 == null) {
            return 1;
        }
        // Unstable sorting: ties on timestamp are never reported as 0 for distinct events.
        return o1.getTimestamp() <= o2.getTimestamp() ? -1 : 1;
    }
}
// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.io.IOException;

import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.router.StreamSortHandler;
import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.engine.sorter.StreamWindow;
import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
import org.apache.eagle.alert.utils.DateTimeUtil;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamSortHandler} that routes every incoming event into the sort windows owned by a
 * {@link StreamWindowManager}, creating a new window on demand and dropping events that no
 * window accepts.
 */
public class StreamSortWindowHandlerImpl implements StreamSortHandler {
    private final static Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
    private StreamWindowManager windowManager;
    private StreamSortSpec streamSortSpecSpec;
    private PartitionedEventCollector outputCollector;
    private String streamId;

    /**
     * Builds the window manager from the sort spec and remembers the collaborators.
     *
     * @param streamId           id of the stream this handler sorts
     * @param streamSortSpecSpec sort spec supplying the window period and window margin
     * @param outputCollector    collector handed to the window manager and used for dropped events
     */
    public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
        final Period period = Period.parse(streamSortSpecSpec.getWindowPeriod());
        this.windowManager = new StreamWindowManagerImpl(
            period,
            streamSortSpecSpec.getWindowMargin(),
            PartitionedEventTimeOrderingComparator.INSTANCE,
            outputCollector);
        this.streamSortSpecSpec = streamSortSpecSpec;
        this.streamId = streamId;
        this.outputCollector = outputCollector;
    }

    /**
     * Entry point to manage window lifecycle.
     *
     * <p>The event is offered to every alive window; if none takes it and the window manager does
     * not reject its timestamp, a new window is created for it. An event that ends up unhandled is
     * dropped through the output collector.
     *
     * @param event StreamEvent
     */
    public void nextEvent(PartitionedEvent event) {
        final long eventTime = event.getEvent().getTimestamp();
        boolean accepted;

        synchronized (this.windowManager) {
            accepted = offerToAliveWindows(event);

            // No window found for the event but not too late being rejected
            if (!accepted && !windowManager.reject(eventTime)) {
                // later than all events, create later window
                StreamWindow created = windowManager.addNewWindow(eventTime);
                if (created.add(event)) {
                    LOG.info("Created {} of {} at {}", created, this.streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(eventTime));
                    accepted = true;
                }
            }
        }

        if (accepted) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Drop expired event {}", event);
        }
        outputCollector.drop(event);
    }

    /**
     * Offers the event to every window currently known to the manager (no early exit).
     *
     * @return {@code true} if at least one alive window accepted the event
     */
    private boolean offerToAliveWindows(PartitionedEvent event) {
        boolean taken = false;
        for (StreamWindow candidate : this.windowManager.getWindows()) {
            if (candidate.alive() && candidate.add(event)) {
                taken = true;
            }
        }
        return taken;
    }

    /** Forwards the clock tick to the window manager. */
    @Override
    public void onTick(StreamTimeClock clock, long globalSystemTime) {
        windowManager.onTick(clock, globalSystemTime);
    }

    /** Closes the window manager; an {@link IOException} is logged, not propagated. */
    @Override
    public void close() {
        try {
            windowManager.close();
        } catch (IOException e) {
            LOG.error("Got exception while closing window manager",e);
        }
    }

    @Override
    public String toString() {
        return super.toString();
    }

    /**
     * Hash code of the sort spec passed to {@code prepare}.
     *
     * @throws NullPointerException if no sort spec has been set yet
     */
    @Override
    public int hashCode() {
        if (streamSortSpecSpec != null) {
            return streamSortSpecSpec.hashCode();
        }
        throw new NullPointerException("streamSortSpec is null");
    }
}
// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * StreamSortedWindow based on MapDB to support off-heap or disk storage.
 *
 * <p>Events are grouped per timestamp in a BTree map keyed by that timestamp, so iteration over
 * the map values yields events in timestamp order (stable sorting algorithm).
 *
 * <br/><br/>
 *
 * See <a href="http://www.mapdb.org">http://www.mapdb.org</a>
 */
public class StreamSortedWindowInMapDB extends BaseStreamWindow {
    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
    private final static PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();

    private final String mapId;
    private final AtomicInteger size;
    private BTreeMap<Long, PartitionedEvent[]> bTreeMap;
    private long replaceOpCount = 0;

    /**
     * @param start  window start time
     * @param end    window end time
     * @param margin window margin
     * @param db     MapDB database in which the tree map is created or opened
     * @param mapId  physical map id, used to decide whether to reuse or not
     */
    @SuppressWarnings("unused")
    public StreamSortedWindowInMapDB(long start, long end, long margin, DB db, String mapId) {
        super(start, end, margin);
        this.mapId = mapId;
        try {
            bTreeMap = db.<Long, StreamEvent>treeMap(mapId).
                keySerializer(Serializer.LONG).
                valueSerializer(STREAM_EVENT_GROUP_SERIALIZER).
                createOrOpen();
            LOG.debug("Created BTree map {}",mapId);
        } catch (Error error) {
            // NOTE(review): the failure is only logged; bTreeMap stays null afterwards and any
            // later add/flush/close will fail -- confirm this best-effort behavior is intended.
            LOG.info("Failed create BTree {}",mapId,error);
        }
        size = new AtomicInteger(0);
    }

    /**
     * Assumed: most of adding operation will do putting only and few require replacing.
     *
     * <ol>
     * <li>
     * First of all, always try to put with created event directly
     * </li>
     * <li>
     * If not absent (key already exists), then append and replace,
     * replace operation will cause more consumption
     * </li>
     * </ol>
     *
     * @param event coming-in event
     * @return {@code true} if the event was stored, {@code false} if its timestamp is not accepted
     * @throws IllegalStateException if the existing entry for the timestamp could not be replaced
     */
    @Override
    public synchronized boolean add(PartitionedEvent event) {
        long timestamp = event.getEvent().getTimestamp();
        if (!accept(timestamp)) {
            return false;
        }

        // NOTE(review): the branch polarity below is kept exactly as in the original code; it
        // treats a false result as "inserted". Verify against the putIfAbsentBoolean contract of
        // the MapDB version in use before changing it.
        boolean absent = bTreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[]{event});
        if (!absent) {
            size.incrementAndGet();
            return true;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Duplicated timestamp {}, will reduce performance as replacing",timestamp);
        }
        // Append the new event to the group already stored under this timestamp.
        PartitionedEvent[] oldValue = bTreeMap.get(timestamp);
        PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1] : Arrays.copyOf(oldValue, oldValue.length + 1);
        newValue[newValue.length - 1] = event;
        PartitionedEvent[] removedValue = bTreeMap.replace(timestamp, newValue);
        replaceOpCount++;
        if (replaceOpCount % 1000 == 0) {
            LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance",replaceOpCount);
        }
        if (removedValue == null) {
            // oldValue may legitimately be null here; do not let an NPE hide the real failure.
            int oldLength = oldValue == null ? 0 : oldValue.length;
            throw new IllegalStateException("Failed to replace key " + timestamp + " with " + newValue.length + " entities array to replace old " + oldLength + " entities array");
        }
        size.incrementAndGet();
        return true;
    }

    /**
     * Emits every stored event to the collector in map iteration order, then empties the map and
     * resets the counters.
     */
    @Override
    protected synchronized void flush(PartitionedEventCollector collector) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        bTreeMap.valueIterator().forEachRemaining((events) -> {
            for (PartitionedEvent event : events) {
                collector.emit(event);
            }
        });
        bTreeMap.clear();
        replaceOpCount = 0;
        stopWatch.stop();
        LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
        size.set(0);
    }

    /** Closes the base window first, then the underlying BTree map. */
    @Override
    public synchronized void close() {
        super.close();
        bTreeMap.close();
        LOG.info("Closed {}",this.mapId);
    }

    /** @return number of events counted since the last flush */
    @Override
    public synchronized int size() {
        return size.get();
    }
}
// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.util.Comparator;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.TreeMultiset;

/**
 * Sort window that buffers events on the heap in a {@link TreeMultiset} ordered by the supplied
 * comparator. All access to the multiset is guarded by its own monitor.
 */
public class StreamSortedWindowOnHeap extends BaseStreamWindow {
    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
    private final TreeMultiset<PartitionedEvent> treeMultisetCache;

    /**
     * @param start      start time
     * @param end        end time
     * @param margin     margin time
     * @param comparator ordering used by the backing multiset
     */
    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator) {
        super(start, end, margin);
        this.treeMultisetCache = TreeMultiset.create(comparator);
    }

    /**
     * Same as the four-argument constructor, ordering events with a new
     * {@link PartitionedEventTimeOrderingComparator}.
     */
    public StreamSortedWindowOnHeap(long start, long end, long margin) {
        this(start, end, margin, new PartitionedEventTimeOrderingComparator());
    }

    /**
     * Buffers the event if the window accepts its timestamp.
     *
     * @return {@code true} if the event was buffered, {@code false} if it was ignored
     */
    @Override
    public boolean add(PartitionedEvent partitionedEvent) {
        synchronized (treeMultisetCache) {
            final long eventTime = partitionedEvent.getEvent().getTimestamp();
            if (!accept(eventTime)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} is not acceptable, ignored", partitionedEvent);
                }
                return false;
            }
            treeMultisetCache.add(partitionedEvent);
            return true;
        }
    }

    /** Emits all buffered events in multiset order, then clears the buffer. */
    @Override
    protected void flush(PartitionedEventCollector collector) {
        synchronized (treeMultisetCache) {
            final StopWatch timer = new StopWatch();
            timer.start();
            for (PartitionedEvent buffered : treeMultisetCache) {
                collector.emit(buffered);
            }
            final int flushed = treeMultisetCache.size();
            treeMultisetCache.clear();
            timer.stop();
            LOG.info("Flushed {} events in {} ms from {}", flushed, timer.getTime(),this.toString());
        }
    }

    /** @return number of events currently buffered */
    @Override
    public int size() {
        synchronized (treeMultisetCache) {
            return treeMultisetCache.size();
        }
    }
}
// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.utils.DateTimeUtil;


/**
 * In memory thread-safe time clock service.
 *
 * <p>The current time is held in an {@link AtomicLong} and can only move forward.
 *
 * TODO: maybe need to synchronize time clock globally, how to?
 */
public class StreamTimeClockInLocalMemory implements StreamTimeClock {
    private final AtomicLong currentTime;
    private final String streamId;

    /**
     * @param streamId    id of the stream this clock belongs to
     * @param initialTime initial clock value in milliseconds
     */
    public StreamTimeClockInLocalMemory(String streamId, long initialTime) {
        this.streamId = streamId;
        this.currentTime = new AtomicLong(initialTime);
    }

    /** Creates a clock starting at {@code 0}. */
    public StreamTimeClockInLocalMemory(String streamId) {
        this(streamId, 0L);
    }

    /**
     * Moves the clock to {@code timestamp}.
     *
     * <p>The "not earlier than the current time" check and the update happen as one atomic step,
     * so two concurrent callers can never leave the clock at the smaller of their timestamps
     * after the larger one was already visible.
     *
     * @param timestamp new clock value in milliseconds
     * @throws IllegalArgumentException if {@code timestamp} is earlier than the current time
     */
    @Override
    public void moveForward(long timestamp) {
        currentTime.updateAndGet(current -> {
            if (timestamp < current) {
                throw new IllegalArgumentException(timestamp + " < " + current + ", should not move time back");
            }
            return timestamp;
        });
    }

    @Override
    public String getStreamId() {
        return streamId;
    }

    @Override
    public long getTime() {
        return currentTime.get();
    }

    @Override
    public String toString() {
        return String.format("StreamClock[streamId=%s, now=%s]",streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
    }
}
// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
import org.apache.eagle.alert.utils.DateTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps one {@link StreamTimeClock} per stream id and notifies registered
 * {@link StreamTimeClockListener}s, both when a stream's time moves forward and once per second
 * from an internal {@link Timer}.
 *
 * <p>Locking: {@code streamIdTimeClockMap} guards the clocks, {@code listenerStreamIdMap} guards
 * the listeners and also serializes ticks. When both are needed the listener lock is taken first.
 */
public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
    private static final long serialVersionUID = -2770823821511195343L;
    private final static Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
    private final static AtomicInteger num = new AtomicInteger();

    private final Map<String, StreamTimeClock> streamIdTimeClockMap;
    private final Map<StreamTimeClockListener, String> listenerStreamIdMap;
    private Timer timer;

    /** Creates the manager and starts a timer that ticks all listeners every 1000 ms. */
    public StreamTimeClockManagerImpl() {
        listenerStreamIdMap = new HashMap<>();
        streamIdTimeClockMap = new HashMap<>();
        timer = new Timer("StreamScheduler-" + num.getAndIncrement());
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Make sure the timer tick happens one by one
                triggerTickOnAll();
            }
        }, 1000, 1000);
    }

    /**
     * By default, we could keep the current time clock in memory,
     * Eventually we may need to consider the global time synchronization across all nodes
     *
     * 1) When to initialize window according to start time
     * 2) When to close expired window according to current time
     *
     * @param streamId stream to create the clock for
     * @return StreamTimeClock instance (the existing one if the stream already has a clock)
     */
    @Override
    public StreamTimeClock createStreamTimeClock(String streamId) {
        synchronized (streamIdTimeClockMap) {
            if (!streamIdTimeClockMap.containsKey(streamId)) {
                StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
                LOG.info("Created {}", instance);
                streamIdTimeClockMap.put(streamId, instance);
            } else {
                LOG.warn("TimeClock for stream already existss: "+streamIdTimeClockMap.get(streamId));
            }
            return streamIdTimeClockMap.get(streamId);
        }
    }

    /**
     * Returns the clock of a stream, creating it (with a warning) if it does not exist yet.
     *
     * @param streamId stream id
     * @return the stream's clock, never {@code null}
     */
    @Override
    public StreamTimeClock getStreamTimeClock(String streamId) {
        synchronized (streamIdTimeClockMap) {
            if (!streamIdTimeClockMap.containsKey(streamId)) {
                LOG.warn("TimeClock for stream {} is not initialized before being called, create now", streamId);
                return createStreamTimeClock(streamId);
            }
            return streamIdTimeClockMap.get(streamId);
        }
    }

    /**
     * Removes the clock of a stream, if any.
     *
     * @param streamId stream id
     */
    @Override
    public void removeStreamTimeClock(String streamId) {
        synchronized (streamIdTimeClockMap) {
            // Keep the removed clock so the log line shows it instead of an always-null lookup.
            StreamTimeClock removed = streamIdTimeClockMap.remove(streamId);
            if (removed != null) {
                LOG.info("Removed TimeClock for stream {}: {}", streamId, removed);
            } else {
                LOG.warn("No TimeClock found for stream {}, nothing to remove", streamId);
            }
        }
    }

    /**
     * Registers a listener for ticks on the given stream.
     *
     * @throws IllegalArgumentException if the listener is already registered
     */
    @Override
    public void registerListener(String streamId, StreamTimeClockListener listener) {
        synchronized (listenerStreamIdMap) {
            if (listenerStreamIdMap.containsKey(listener)) {
                throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
            }
            LOG.info("Register {} on {}", listener, streamId);
            listenerStreamIdMap.put(listener, streamId);
        }
    }

    @Override
    public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
        registerListener(streamClock.getStreamId(), listener);
    }

    /** Unregisters a listener; a listener that is not registered is ignored. */
    @Override
    public void removeListener(StreamTimeClockListener listener) {
        // Same lock as registerListener and the tick loops: the map is a plain HashMap.
        synchronized (listenerStreamIdMap) {
            listenerStreamIdMap.remove(listener);
        }
    }

    /**
     * Calls {@code onTick} on every listener registered for {@code streamId}, passing the
     * stream's clock (or {@code null} if the stream has none) and the current system time.
     */
    @Override
    public void triggerTickOn(String streamId) {
        int count = 0;
        // The listener lock both protects the iteration and keeps ticks strictly one by one.
        synchronized (listenerStreamIdMap) {
            StreamTimeClock clock;
            synchronized (streamIdTimeClockMap) {
                clock = streamIdTimeClockMap.get(streamId);
            }
            for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
                if (entry.getValue().equals(streamId)) {
                    entry.getKey().onTick(clock, getCurrentSystemTime());
                    count++;
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
        }
    }

    private static long getCurrentSystemTime() {
        return System.currentTimeMillis();
    }

    /**
     * Moves the stream's clock to {@code timestamp} and ticks its listeners; timestamps earlier
     * than the clock's current time are ignored.
     */
    @Override
    public void onTimeUpdate(String streamId, long timestamp) {
        StreamTimeClock timeClock = getStreamTimeClock(streamId);
        if (timeClock == null) {
            return;
        }
        // Trigger time clock only when time moves forward
        if (timestamp >= timeClock.getTime()) {
            timeClock.moveForward(timestamp);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Tick on stream {} with latest time {}",streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
            }
            triggerTickOn(streamId);
        }
    }

    /** Ticks every stream that has at least one listener, once per stream. */
    private void triggerTickOnAll() {
        synchronized (listenerStreamIdMap) {
            // Distinct stream ids: several listeners on one stream must not multiply the ticks.
            Set<String> streamIds = new HashSet<>(listenerStreamIdMap.values());
            for (String streamId : streamIds) {
                triggerTickOn(streamId);
            }
        }
    }

    /** Stops the timer and delivers one final tick to all listeners. */
    @Override
    public void close() {
        timer.cancel();
        triggerTickOnAll();
        LOG.info("Closed StreamTimeClockManager {}",this);
    }
}
// File: eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.sorter.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.engine.sorter.StreamWindow;
import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
import org.apache.eagle.alert.utils.DateTimeUtil;
import org.apache.eagle.alert.utils.TimePeriodUtils;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the sort windows of one stream, keyed by window start time.
 *
 * <p>Windows are created on demand, ticked on every clock tick, and closed and removed once they
 * expire or end at or before the current reject time. All access to the window map is guarded by
 * the map's own monitor.
 */
public class StreamWindowManagerImpl implements StreamWindowManager {
    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
    private final TreeMap<Long, StreamWindow> windowBuckets;
    private final PartitionedEventCollector collector;
    private final Period windowPeriod;
    private final long windowMargin;
    @SuppressWarnings("unused")
    private final Comparator<PartitionedEvent> comparator;
    private long rejectTime;

    /**
     * @param windowPeriod length of each window
     * @param windowMargin margin passed to every created window
     * @param comparator   event ordering (stored but not used by this class)
     * @param collector    collector registered on every created window
     */
    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector) {
        this.windowBuckets = new TreeMap<>();
        this.windowPeriod = windowPeriod;
        this.windowMargin = windowMargin;
        this.collector = collector;
        this.comparator = comparator;
    }

    /**
     * Creates, registers and stores the window whose period contains {@code initialTime}.
     *
     * @throws IllegalStateException    if {@code initialTime} is earlier than the reject time
     * @throws IllegalArgumentException if a window with the same start time already exists
     */
    @Override
    public StreamWindow addNewWindow(long initialTime) {
        synchronized (windowBuckets) {
            if (reject(initialTime)) {
                throw new IllegalStateException("Failed to create new window, as " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
            }
            Long windowStartTime = TimePeriodUtils.formatMillisecondsByPeriod(initialTime, windowPeriod);
            Long windowEndTime = windowStartTime + TimePeriodUtils.getMillisecondsOfPeriod(windowPeriod);
            StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(windowStartTime, windowEndTime, windowMargin);
            window.register(collector);
            addWindow(window);
            return window;
        }
    }

    /** Stores the window under its start time; callers hold the {@code windowBuckets} lock. */
    private void addWindow(StreamWindow window) {
        if (windowBuckets.containsKey(window.startTime())) {
            throw new IllegalArgumentException("Duplicated " + window.toString());
        }
        windowBuckets.put(window.startTime(), window);
    }

    @Override
    public void removeWindow(StreamWindow window) {
        synchronized (windowBuckets) {
            windowBuckets.remove(window.startTime());
        }
    }

    @Override
    public boolean hasWindow(StreamWindow window) {
        synchronized (windowBuckets) {
            return windowBuckets.containsKey(window.startTime());
        }
    }

    @Override
    public boolean hasWindowFor(long timestamp) {
        return getWindowFor(timestamp) != null;
    }

    /**
     * Returns a snapshot of the current windows in start-time order.
     *
     * <p>A copy is returned (not the live map view) so that callers can iterate it without
     * holding this manager's internal lock while {@link #onTick} removes expired windows.
     */
    @Override
    public Collection<StreamWindow> getWindows() {
        synchronized (windowBuckets) {
            return new ArrayList<>(windowBuckets.values());
        }
    }

    /** @return the window with {@code startTime <= timestamp < endTime}, or {@code null} */
    @Override
    public StreamWindow getWindowFor(long timestamp) {
        synchronized (windowBuckets) {
            for (StreamWindow windowBucket : windowBuckets.values()) {
                if (timestamp >= windowBucket.startTime() && timestamp < windowBucket.endTime()) {
                    return windowBucket;
                }
            }
            return null;
        }
    }

    /** @return {@code true} if {@code timestamp} is earlier than the current reject time */
    @Override
    public boolean reject(long timestamp) {
        return timestamp < rejectTime;
    }

    /**
     * Ticks every window, advances the reject time to the largest window reject time seen so
     * far, then closes and removes windows that expired or end at or before the reject time.
     */
    @Override
    public void onTick(StreamTimeClock clock, long globalSystemTime) {
        synchronized (windowBuckets) {
            List<StreamWindow> toRemoved = new ArrayList<>();
            List<StreamWindow> aliveWindow = new ArrayList<>();

            for (StreamWindow windowBucket : windowBuckets.values()) {
                windowBucket.onTick(clock, globalSystemTime);
                if (windowBucket.rejectTime() > rejectTime) {
                    rejectTime = windowBucket.rejectTime();
                }
            }
            for (StreamWindow windowBucket : windowBuckets.values()) {
                if (windowBucket.expired() || windowBucket.endTime() <= rejectTime) {
                    toRemoved.add(windowBucket);
                } else {
                    aliveWindow.add(windowBucket);
                }
            }
            // Removal happens after the iteration above, never while iterating the map.
            toRemoved.forEach(this::closeAndRemoveWindow);
            if (toRemoved.size() > 0) {
                LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
            }
        }
    }

    /** Closes the window, removes it from the map and logs how long that took. */
    private void closeAndRemoveWindow(StreamWindow windowBucket) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        closeWindow(windowBucket);
        removeWindow(windowBucket);
        stopWatch.stop();
        LOG.info("Removed {} in {} ms",windowBucket,stopWatch.getTime());
    }

    private void closeWindow(StreamWindow windowBucket) {
        windowBucket.close();
    }

    /** Closes every remaining window and empties the map. */
    public void close() {
        synchronized (windowBuckets) {
            LOG.debug("Closing");
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            int count = 0;
            for (StreamWindow windowBucket : windowBuckets.values()) {
                count++;
                closeWindow(windowBucket);
            }
            windowBuckets.clear();
            stopWatch.stop();
            LOG.info("Closed {} windows in {} ms", count, stopWatch.getTime());
        }
    }
}
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ -package org.apache.eagle.alert.engine.spout; - -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.coordinator.MetadataType; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.router.SpoutSpecListener; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.engine.serialization.Serializers; -import org.apache.eagle.alert.utils.AlertConstants; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaSpoutMetric; -import storm.kafka.KafkaSpoutWrapper; -import storm.kafka.SpoutConfig; -import storm.kafka.ZkHosts; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; - -import com.typesafe.config.Config; - -/** - * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics - * - * 1. onNewConfig() is interface for outside to update new metadata. 
Upon new metadata, this class will calculate if there is any new topic, removed topic or - * updated topic - * - */ -public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener,SerializationMetadataProvider { - private static final long serialVersionUID = -5280723341236671580L; - private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class); - - public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers"; - public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer"; - - // topic to KafkaSpoutWrapper - private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new HashMap<>(); - private int numOfRouterBolts; - - private SpoutSpec cachedSpoutSpec; - - private transient KafkaSpoutMetric kafkaSpoutMetric; - - @SuppressWarnings("rawtypes") - private Map conf; - private TopologyContext context; - private SpoutOutputCollector collector; - private final Config config; - private String topologyId; - private String spoutName; - private String routeBoltName; - @SuppressWarnings("unused") - private int taskIndex; - private IMetadataChangeNotifyService changeNotifyService; - private PartitionedEventSerializer serializer; - private volatile Map<String, StreamDefinition> sds; - - /** - * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service - * @param config - * @param topologyId - * @param changeNotifyService - * @param numOfRouterBolts - */ - public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts){ - this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME); - } - /** - * - * @param config - * @param topologyId used for distinguishing kafka offset for different topologies - * @param numOfRouterBolts used for generating streamId and routing - * 
@param spoutName used for generating streamId between spout and router bolt - * @param routerBoltName used for generating streamId between spout and router bolt - */ - public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName){ - this.config = config; - this.topologyId = topologyId; - this.changeNotifyService = changeNotifyService; - this.numOfRouterBolts = numOfRouterBolts; - this.spoutName = spoutName; - this.routeBoltName = routerBoltName; - } - - public String getSpoutName(){ - return spoutName; - } - - public String getRouteBoltName(){ - return routeBoltName; - } - - /** - * the only output field is for StreamEvent - * @param declarer - */ - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (int i = 0; i < numOfRouterBolts; i++) { - String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i); - declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0)); - LOG.info("declare stream between spout and streamRouterBolt " + streamId); - } - } - - @SuppressWarnings("rawtypes") - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - if(LOG.isDebugEnabled()) { - LOG.debug("open method invoked"); - } - this.conf = conf; - this.context = context; - this.collector = collector; - this.taskIndex = context.getThisTaskIndex(); - - // initialize an empty SpoutSpec - cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>()); - - changeNotifyService.registerListener(this); - changeNotifyService.init(config, MetadataType.SPOUT); - - // register KafkaSpout metric - kafkaSpoutMetric = new KafkaSpoutMetric(); - context.registerMetric("kafkaSpout", kafkaSpoutMetric, 60); - - this.serializer = Serializers.newPartitionedEventSerializer(this); - } - - @Override - public void onSpoutSpecChange(SpoutSpec spec, Map<String, 
StreamDefinition> sds) { - LOG.info("new metadata is updated " + spec); - try{ - onReload(spec, sds); - }catch(Exception ex){ - LOG.error("error applying new SpoutSpec", ex); - } - } - - @Override - public void nextTuple() { - for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) { - wrapper.nextTuple(); - } - } - - /** - * find the correct wrapper to do ack that means msgId should be mapped to - * wrapper - * - * @param msgId - */ - @Override - public void ack(Object msgId) { - // decode and get topic - KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId; - KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic); - spout.ack(id.id); - } - - @Override - public void fail(Object msgId) { - // decode and get topic - KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId; - LOG.error("Failing message {}, with topic {}", msgId, id.topic); - KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic); - spout.fail(id.id); - } - - @Override - public void deactivate() { - System.out.println("deactivate"); - for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) { - wrapper.deactivate(); - } - } - - @Override - public void close() { - System.out.println("close"); - for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) { - wrapper.close(); - } - } - - private List<String> getTopics(SpoutSpec spoutSpec) { - List<String> meta = new ArrayList<String>(); - for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) { - meta.add(entry.getTopic()); - } - return meta; - } - - @SuppressWarnings("unchecked") - public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception { - // calculate topic create/remove/update - List<String> topics = getTopics(newMeta); - List<String> cachedTopcies = getTopics(cachedSpoutSpec); - Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopcies); - Collection<String> removeTopics = CollectionUtils.subtract(cachedTopcies, topics); - Collection<String> updateTopics = 
CollectionUtils.intersection(topics, cachedTopcies); - - LOG.info("Topics were added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics); - - // build lookup table for scheme - Map<String, String> newSchemaName = new HashMap<String, String>(); - for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) { - newSchemaName.put(ds.getTopic(), ds.getSchemeCls()); - } - - // copy and swap - Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList); - // iterate new topics and then create KafkaSpout - for(String topic : newTopics){ - KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic); - if (wrapper != null) { - LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic)); - continue; - } - KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); - newKafkaSpoutList.put(topic, newWrapper); - } - // iterate remove topics and then close KafkaSpout - for(String topic : removeTopics){ - KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic); - if (wrapper == null) { - LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic)); - continue; - } - removeKafkaSpout(wrapper); - newKafkaSpoutList.remove(topic); - } - - // iterate update topic and then update metadata - for(String topic : updateTopics){ - KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic); - if (spoutWrapper == null) { - LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic)); - continue; - } - spoutWrapper.update(newMeta, sds); - } - - // swap - this.cachedSpoutSpec = newMeta; - this.kafkaSpoutList = newKafkaSpoutList; - this.sds = sds; - } - - /** - * make this method protected to make sure unit test can work well - * Q: 
Where to persist consumer state, i.e. what offset has been consumed for each topic and partition - * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId - * Note1: PartitionManager.committedPath for composing zkState path, _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId(); - * consumerId by default is EagleConsumer unless it is specified by "stormKafkaEagleConsumer" - * Note2: put topologyId as part of zkState because one topic by design can be consumed by multiple topologies so one topology needs to know - * processed offset for itself - * - * TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible - * - * @param conf - * @param context - * @param collector - * @param topic - * @param spoutSpec - * @return - */ - @SuppressWarnings("rawtypes") - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, - String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception{ - String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum"); - BrokerHosts hosts = new ZkHosts(kafkaBrokerZkQuorum); - String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT; - if(config.hasPath("spout.stormKafkaTransactionZkPath")) { - transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath"); - } - // write partition offset etc. 
into zkRoot+id, see PartitionManager.committedPath - String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH; - if(config.hasPath("spout.stormKafkaEagleConsumer")){ - zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer"); - } - SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId); - // transaction zkServers - boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker"); - if(stormKafkaUseSameZkQuorumWithKafkaBroker){ - ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum); - spoutConfig.zkServers = utils.getZkHosts(); - spoutConfig.zkPort = utils.getZkPort(); - }else{ - ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum")); - spoutConfig.zkServers = utils.getZkHosts(); - spoutConfig.zkPort = utils.getZkPort(); - } - // transaction update interval - spoutConfig.stateUpdateIntervalMs = config.getLong("spout.stormKafkaStateUpdateIntervalMs"); - // Kafka fetch size - spoutConfig.fetchSizeBytes = config.getInt("spout.stormKafkaFetchSizeBytes"); - // "startOffsetTime" is for test usage, prod should not use this - if (config.hasPath("spout.stormKafkaStartOffsetTime")) { - spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime"); - } - - spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic)); - KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric); - SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds,this.serializer); - wrapper.open(conf, context, collectorWrapper); - return wrapper; - } - - @Override - public StreamDefinition getStreamDefinition(String streamId) { - return sds.get(streamId); - } - - /** - * utility to get list of zkServers and 
zkPort.(It is assumed that zkPort is same for all zkServers as storm-kafka library requires this though it is not efficient) - */ - private static class ZkServerPortUtils{ - private List<String> zkHosts = new ArrayList<>(); - private Integer zkPort; - public ZkServerPortUtils(String zkQuorum){ - String[] zkConnections = zkQuorum.split(","); - for (String zkConnection : zkConnections) { - zkHosts.add(zkConnection.split(":")[0]); - } - zkPort = Integer.valueOf(zkConnections[0].split(":")[1]); - } - - public List<String> getZkHosts(){ - return zkHosts; - } - - public Integer getZkPort(){ - return zkPort; - } - } - - protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){ - try { - wrapper.close(); - } catch (Exception e) { - LOG.error("Close wrapper failed. Ignore and continue!", e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java deleted file mode 100644 index a2c9219..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. 
- * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.spout; - -import java.util.Properties; - -import kafka.admin.AdminUtils; -import kafka.utils.ZKStringSerializer$; - -import org.I0Itec.zkclient.ZkClient; -import org.slf4j.Logger; - -/** - * normally this is used in unit test for convenience - */ -public class CreateTopicUtils { - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class); - private static final int partitions = 2; - private static final int replicationFactor = 1; - public static void ensureTopicReady(String zkQuorum, String topic){ - ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$); - if(!AdminUtils.topicExists(zkClient, topic)) { - LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + replicationFactor); - AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java deleted file mode 100644 index 23e94c3..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.spout; - -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** - * topic to stream metadata lifecycle method - * one topic may spawn multiple streams, the metadata change includes - * 1. add/remove stream - * 2. for a specific stream, groupingstrategy is changed - * ex1, this stream has more alert bolts than before, then this spout would take more traffic - * ex2, this stream has less alert bolts than before, then this spout would take less traffic - */ -public interface ISpoutSpecLCM { - /** - * stream metadata is used for SPOUT to filter traffic and route traffic to following groupby bolts. 
- * @param metadata - */ - void update(SpoutSpec metadata, Map<String, StreamDefinition> sds); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java deleted file mode 100644 index c786c01..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.spout; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * Created on 2/18/16. 
- */ -public class KafkaMessageIdWrapper { - public Object id; - public KafkaMessageIdWrapper(Object o){ - this.id = o; - } - public String topic; - private final static ObjectMapper objectMapper = new ObjectMapper(); - - public String toString(){ - try { - return String.format("KafkaMessageIdWrapper[topic=%s, id=%s]", topic, objectMapper.writeValueAsString(id)); - } catch (JsonProcessingException e) { - throw new IllegalStateException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java deleted file mode 100644 index 223f1b5..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.spout; - -import backtype.storm.spout.Scheme; - - -/** - * All Scheme implementations should have the following conditions - * 1) implement Scheme interface - * 2) has one constructor with topic name as parameter - */ -public class SchemeBuilder { - public static Scheme buildFromClsName(String clsName, String topic) throws Exception{ - Object o = Class.forName(clsName).getConstructor(String.class).newInstance(topic); - return (Scheme)o; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java deleted file mode 100644 index b37f7b3..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. 
You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.spout; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata; -import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy; -import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.spout.ISpoutOutputCollector; -import backtype.storm.spout.SpoutOutputCollector; - -/** - * intercept the message sent from within KafkaSpout and select downstream bolts based on meta-data - * This is topic based. 
each topic will have one SpoutOutputCollectorWrapper - */ -public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements ISpoutSpecLCM,SerializationMetadataProvider { - private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorWrapper.class); - - private final ISpoutOutputCollector delegate; - private final String topic; - private final PartitionedEventSerializer serializer; - private int numOfRouterBolts; - - private volatile List<StreamRepartitionMetadata> streamRepartitionMetadataList; - private volatile Tuple2StreamConverter converter; - private CorrelationSpout spout; - private volatile Map<String, StreamDefinition> sds; - - /** - * @param delegate actual SpoutOutputCollector to send data to following bolts - * @param topic topic for this KafkaSpout to handle - * @param numGroupbyBolts bolts following this spout - * @param serializer - */ - public SpoutOutputCollectorWrapper(CorrelationSpout spout, - ISpoutOutputCollector delegate, - String topic, - SpoutSpec spoutSpec, - int numGroupbyBolts, - Map<String, StreamDefinition> sds, PartitionedEventSerializer serializer) { - super(delegate); - this.spout = spout; - this.delegate = delegate; - this.topic = topic; - this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic); - this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic)); - this.numOfRouterBolts = numGroupbyBolts; - this.sds = sds; - this.serializer = serializer; - } - - /** - * How to assert that numTotalGroupbyBolts >= numOfRouterBolts, otherwise - * there is runtime issue by default, tuple includes 2 fields field 1: topic - * name field 2: map of key/value - */ - @SuppressWarnings("rawtypes") - @Override - public List<Integer> emit(List<Object> tuple, Object messageId) { - if (!sanityCheck()) { - LOG.error( - "spout collector for topic {} see monitored metadata invalid, is this data source removed! 
Trigger message id {} ", - topic, messageId); - return null; - } - - KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId); - newMessageId.topic = topic; - /** - phase 1: tuple to stream converter - if this topic multiplexes multiple streams, then retrieve the individual streams - */ - List<Object> convertedTuple = converter.convert(tuple); - if(convertedTuple == null) { - LOG.warn("source data {} can't be converted to a stream, ignore this message", tuple); - spout.ack(newMessageId); - return null; - } - Map m = (Map)convertedTuple.get(3); - Object streamId = convertedTuple.get(1); - - StreamDefinition sd = sds.get(streamId); - if(sd == null){ - LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds); - spout.ack(newMessageId); - return null; - } - - StreamEvent event = convertToStreamEventByStreamDefinition((Long)convertedTuple.get(2), m, sds.get(streamId)); - /* - phase 2: stream repartition - */ - for(StreamRepartitionMetadata md : streamRepartitionMetadataList) { - // one stream may have multiple group-by strategies, each strategy is for a specific group-by - for(StreamRepartitionStrategy groupingStrategy : md.groupingStrategies){ - int hash = 0; - if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.GROUPBY)) { - hash = getRoutingHashByGroupingStrategy(m, groupingStrategy); - }else if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.SHUFFLE)){ - hash = Math.abs((int)System.currentTimeMillis()); - } - int mod = hash % groupingStrategy.numTotalParticipatingRouterBolts; - // filter out message - if (mod >= groupingStrategy.startSequence && mod < groupingStrategy.startSequence + numOfRouterBolts) { - // framework takes care of field grouping instead of using storm internal field grouping - String sid = StreamIdConversion.generateStreamIdBetween(spout.getSpoutName(), spout.getRouteBoltName()+ (hash % numOfRouterBolts)); - if (LOG.isDebugEnabled()) { - LOG.debug("Emitted 
tuple: {} with message Id: {}, with topic {}, to streamId {}", convertedTuple, messageId, topic, sid); - } - // send message to StreamRouterBolt - PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash); - if(this.serializer == null){ - delegate.emit(sid, Collections.singletonList(pEvent), newMessageId); - }else { - try { - delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId); - } catch (IOException e) { - LOG.error("Failed to serialize {}", pEvent, e); - throw new RuntimeException(e); - } - } - }else{ - // ******* short-cut ack ******** - // we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty - // before moving to next offsets. - if(LOG.isDebugEnabled()){ - LOG.debug("Message filtered with mod {} not within range {} and {} for message {}", mod, groupingStrategy.startSequence, - groupingStrategy.startSequence+ numOfRouterBolts, tuple); - } - spout.ack(newMessageId); - } - } - } - - return null; - } - - @SuppressWarnings("rawtypes") - private int getRoutingHashByGroupingStrategy(Map data, StreamRepartitionStrategy gs){ - // calculate hash value for values from group-by fields - HashCodeBuilder hashCodeBuilder = new HashCodeBuilder(); - for(String groupingField : gs.partition.getColumns()) { - if(data.get(groupingField) != null){ - hashCodeBuilder.append(data.get(groupingField)); - } else { - LOG.warn("Required GroupBy fields {} not found: {}", gs.partition.getColumns(), data); - } - } - int hash = hashCodeBuilder.toHashCode(); - hash = Math.abs(hash); - return hash; - } - - private boolean sanityCheck() { - boolean isOk = true; - if (streamRepartitionMetadataList == null) { - LOG.error("streamRepartitionMetadataList is null!"); - isOk = false; - } - if (converter == null) { - LOG.error("tuple2StreamMetadata is null!"); - isOk = false; - } - return isOk; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - 
private StreamEvent convertToStreamEventByStreamDefinition(long timestamp, Map m, StreamDefinition sd){ - return StreamEvent.Builder().timestamep(timestamp).attributes(m,sd).build(); - } - - /** - * SpoutSpec may be changed, this class will respond to changes on tuple2StreamMetadataMap and streamRepartitionMetadataMap - * @param spoutSpec - * @param sds - */ - @Override - public void update(SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) { - this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic); - this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic)); - this.sds = sds; - } - - @Override - public StreamDefinition getStreamDefinition(String streamId) { - return this.sds.get(streamId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java deleted file mode 100644 index f526cad..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.alert.engine.utils;

import com.google.common.io.ByteStreams;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;


/**
 * In-memory GZIP compression helpers for byte arrays.
 */
public class CompressionUtils {
    /**
     * Upper bound for the initial output-buffer size hint. Doubling the length of a
     * very large input would otherwise overflow {@code int} and make the
     * {@link ByteArrayOutputStream} constructor reject a negative size.
     */
    private static final int MAX_BUFFER_HINT = Integer.MAX_VALUE - 8;

    /**
     * Compresses the given bytes with GZIP.
     *
     * @param source bytes to compress; may be {@code null} or empty
     * @return the GZIP-compressed bytes, or {@code source} itself when it is
     *         {@code null} or empty
     * @throws IOException if the GZIP stream fails while writing
     */
    public static byte[] compress(byte[] source) throws IOException {
        if (source == null || source.length == 0) {
            return source;
        }
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(source.length / 2);
        // The GZIP trailer is only written on close, so the compressor must be closed
        // (by try-with-resources) before the buffer is read.
        try (OutputStream compressor = new GZIPOutputStream(outputStream)) {
            compressor.write(source);
        }
        return outputStream.toByteArray();
    }

    /**
     * Decompresses GZIP-compressed bytes.
     *
     * @param compressed GZIP data; may be {@code null} or empty
     * @return the decompressed bytes, or {@code compressed} itself when it is
     *         {@code null} or empty
     * @throws IOException if the input is not valid GZIP data or reading fails
     */
    public static byte[] decompress(byte[] compressed) throws IOException {
        if (compressed == null || compressed.length == 0) {
            return compressed;
        }
        // Compute the hint in long arithmetic so inputs above 1 GiB do not overflow.
        int sizeHint = (int) Math.min(2L * compressed.length, (long) MAX_BUFFER_HINT);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(sizeHint);
        try (InputStream decompressor = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteStreams.copy(decompressor, outputStream);
        }
        return outputStream.toByteArray();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java deleted file mode 100644 index a576404..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.utils; - -import java.io.InputStream; - -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Since 5/6/16. 
- */ -public class MetadataSerDeser { - private static final Logger LOG = LoggerFactory.getLogger(MetadataSerDeser.class); - - @SuppressWarnings("rawtypes") - public static <K> K deserialize(InputStream is, TypeReference typeRef){ - ObjectMapper mapper = new ObjectMapper(); - try { - K spec = mapper.readValue(is, typeRef); - return spec; - }catch(Exception ex){ - LOG.error("error in deserializing metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex); - } - return null; - } - - public static <K> K deserialize(InputStream is, Class<K> cls){ - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true); - try { - K spec = mapper.readValue(is, cls); - return spec; - }catch(Exception ex){ - LOG.error("Got error to deserialize metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex); - } - return null; - } - - @SuppressWarnings("rawtypes") - public static <K> K deserialize(String json, TypeReference typeRef){ - ObjectMapper mapper = new ObjectMapper(); - try { - K spec = mapper.readValue(json, typeRef); - return spec; - }catch(Exception ex){ - LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex); - } - return null; - } - - public static <K> K deserialize(String json, Class<K> cls){ - ObjectMapper mapper = new ObjectMapper(); - try { - K spec = mapper.readValue(json, cls); - return spec; - }catch(Exception ex){ - LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex); - } - return null; - } - - public static <K> String serialize(K spec){ - ObjectMapper mapper = new ObjectMapper(); - try{ - String json = mapper.writeValueAsString(spec); - return json; - }catch(Exception ex){ - LOG.error("error in serializing object {} with type {}", spec, new TypeReference<K>(){}.getType().getTypeName(), ex); - } - return null; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java deleted file mode 100644 index f4652a3..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.utils; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.xerial.snappy.SnappyInputStream; -import org.xerial.snappy.SnappyOutputStream; - -/** - * Utilities for working with Serializables. 
- * - * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils": - * https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java - */ -public class SerializableUtils { - /** - * Serializes the argument into an array of bytes, and returns it. - * - * @throws IllegalArgumentException if there are errors when serializing - */ - public static byte[] serializeToCompressedByteArray(Object value) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) { - oos.writeObject(value); - } - return buffer.toByteArray(); - } catch (IOException exn) { - throw new IllegalArgumentException( - "unable to serialize " + value, - exn); - } - } - - /** - * Serializes the argument into an array of bytes, and returns it. - * - * @throws IllegalArgumentException if there are errors when serializing - */ - public static byte[] serializeToByteArray(Object value) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) { - oos.writeObject(value); - } - return buffer.toByteArray(); - } catch (IOException exn) { - throw new IllegalArgumentException("unable to serialize " + value, exn); - } - } - - /** - * Deserializes an object from the given array of bytes, e.g., as - * serialized using {@link #serializeToCompressedByteArray}, and returns it. 
- * - * @throws IllegalArgumentException if there are errors when - * deserializing, using the provided description to identify what - * was being deserialized - */ - public static Object deserializeFromByteArray(byte[] encodedValue, - String description) { - try { - try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) { - return ois.readObject(); - } - } catch (IOException | ClassNotFoundException exn) { - throw new IllegalArgumentException( - "unable to deserialize " + description, - exn); - } - } - - /** - * Deserializes an object from the given array of bytes, e.g., as - * serialized using {@link #serializeToCompressedByteArray}, and returns it. - * - * @throws IllegalArgumentException if there are errors when - * deserializing, using the provided description to identify what - * was being deserialized - */ - public static Object deserializeFromCompressedByteArray(byte[] encodedValue, - String description) { - try { - try (ObjectInputStream ois = new ObjectInputStream( - new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) { - return ois.readObject(); - } - } catch (IOException | ClassNotFoundException exn) { - throw new IllegalArgumentException( - "unable to deserialize " + description, - exn); - } - } - - public static <T extends Serializable> T ensureSerializable(T value) { - @SuppressWarnings("unchecked") - T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value), - value.toString()); - return copy; - } - - public static <T extends Serializable> T clone(T value) { - @SuppressWarnings("unchecked") - T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value), - value.toString()); - return copy; - } -} \ No newline at end of file