Hi everybody,
I have now implemented the bound handling for the --merge task that we
talked about a few days ago; you will find it in the attached patch.
The bounds are processed as follows:
1. If neither source 0 nor source 1 has a bound, the merged stream
doesn't have a bound either.
2. If both sources have a bound, then a bound corresponding to the
union of bounds 0 and 1 is emitted to the merged stream.
3. If source 0 has a bound but source 1 doesn't (or vice versa):
3a. If the source without a bound is empty (no entities whatsoever),
the original bound of the other source is passed through to the
merged stream.
3b. If the source without a bound is not empty, no bound is passed
through.
In case 3b, where a bound is "removed", one of three actions is taken,
selected by the new boundRemovedAction keyword argument:
* ignore: don't do anything and continue
* warn: emit a warning to the log but continue (default)
* fail: stop processing altogether
The patch applies to current trunk and additionally includes a handful
of test cases (MergeBoundTest.java) which test all those cases from 1
through 3b :)
Let me know if there's something wrong with the patch, I'd be happy to
fix it.
Best regards
Igor
Index:
set/test/org/openstreetmap/osmosis/test/task/v0_6/SinkEntityInspector.java
===================================================================
--- set/test/org/openstreetmap/osmosis/test/task/v0_6/SinkEntityInspector.java
(revision 0)
+++ set/test/org/openstreetmap/osmosis/test/task/v0_6/SinkEntityInspector.java
(revision 0)
@@ -0,0 +1,81 @@
+// This software is released into the Public Domain. See copying.txt for
details.
+package org.openstreetmap.osmosis.test.task.v0_6;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
+import org.openstreetmap.osmosis.core.task.v0_6.Sink;
+
+/**
+ * Mock object for inspecting the resulting entities after passing through a
pipeline task.
+ *
+ * @author Karl Newman
+ */
+public class SinkEntityInspector implements Sink {
+
+ private List<EntityContainer> processedEntities;
+
+
+ /**
+ * Creates a new instance.
+ */
+ public SinkEntityInspector() {
+ processedEntities = new LinkedList<EntityContainer>();
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void complete() {
+ // Nothing to do here
+ }
+
+
+ /**
+ * Catch all passed entities and save them for later inspection.
+ *
+ * @param entityContainer
+ * The entity to be processed.
+ */
+ @Override
+ public void process(EntityContainer entityContainer) {
+ processedEntities.add(entityContainer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void release() {
+ // Nothing to do here
+ }
+
+
+ /**
+ * Shortcut method if you only care about the most recent
EntityContainer.
+ *
+ * @return the lastEntityContainer
+ */
+ public EntityContainer getLastEntityContainer() {
+ if (processedEntities.isEmpty()) {
+ return null;
+ } else {
+ return processedEntities.get(processedEntities.size() -
1);
+ }
+ }
+
+
+ /**
+ * Retrieve an Iterable of all the processed EntityContainers.
+ *
+ * @return the processedEntities
+ */
+ public Iterable<EntityContainer> getProcessedEntities() {
+ return Collections.unmodifiableList(processedEntities);
+ }
+
+}
Index: set/test/org/openstreetmap/osmosis/set/v0_6/MergeBoundTest.java
===================================================================
--- set/test/org/openstreetmap/osmosis/set/v0_6/MergeBoundTest.java
(revision 0)
+++ set/test/org/openstreetmap/osmosis/set/v0_6/MergeBoundTest.java
(revision 0)
@@ -0,0 +1,269 @@
+// This software is released into the Public Domain. See copying.txt for
details.
+package org.openstreetmap.osmosis.set.v0_6;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
+import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
+import org.openstreetmap.osmosis.core.container.v0_6.NodeContainer;
+import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
+import org.openstreetmap.osmosis.core.domain.v0_6.EntityType;
+import org.openstreetmap.osmosis.core.domain.v0_6.Node;
+import org.openstreetmap.osmosis.core.domain.v0_6.OsmUser;
+import org.openstreetmap.osmosis.core.merge.common.ConflictResolutionMethod;
+import org.openstreetmap.osmosis.core.task.v0_6.RunnableSource;
+import org.openstreetmap.osmosis.core.task.v0_6.Sink;
+import org.openstreetmap.osmosis.test.task.v0_6.SinkEntityInspector;
+
+
+/**
+ * Tests bounding box processing in merge tasks.
+ *
+ * Each test feeds two small in-memory sources into an {@link EntityMerger}
+ * and inspects what arrives at the downstream sink.
+ *
+ * @author Igor Podolskiy
+ */
+public class MergeBoundTest {
+
+    /**
+     * A simple dummy ID generator for the helper source class.
+     */
+    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(1000);
+
+    /**
+     * Tests the proper working of the merge task if neither
+     * source has a declared bound.
+     *
+     * @throws Exception if something goes wrong
+     */
+    @Test
+    public void testNeitherHasBound() throws Exception {
+        RunnableSource source0 = new BoundSource(new Bound(1, 2, 4, 3, "source0"), false);
+        RunnableSource source1 = new BoundSource(new Bound(5, 6, 8, 7, "source1"), false);
+
+        EntityMerger merger = new EntityMerger(ConflictResolutionMethod.LatestSource, 1,
+                BoundRemovedAction.Ignore);
+
+        SinkEntityInspector merged = merge(merger, source0, source1);
+        List<EntityContainer> mergedList = createList(merged.getProcessedEntities());
+
+        // One node per source and nothing else.
+        Assert.assertEquals(2, mergedList.size());
+        for (EntityContainer entityContainer : mergedList) {
+            Assert.assertEquals(EntityType.Node, entityContainer.getEntity().getType());
+        }
+    }
+
+    /**
+     * Tests whether merge will delete the declared bound if only source 0
+     * has a declared bound.
+     *
+     * @throws Exception if something goes wrong
+     */
+    @Test
+    public void testSource0HasBound() throws Exception {
+        RunnableSource source0 = new BoundSource(new Bound(1, 2, 4, 3, "source0"), true);
+        RunnableSource source1 = new BoundSource(new Bound(5, 6, 8, 7, "source1"), false);
+
+        EntityMerger merger = new EntityMerger(ConflictResolutionMethod.LatestSource, 1,
+                BoundRemovedAction.Ignore);
+
+        SinkEntityInspector merged = merge(merger, source0, source1);
+        List<EntityContainer> mergedList = createList(merged.getProcessedEntities());
+
+        // The bound of source 0 must be gone; only the two nodes remain.
+        Assert.assertEquals(2, mergedList.size());
+        for (EntityContainer entityContainer : mergedList) {
+            Assert.assertEquals(EntityType.Node, entityContainer.getEntity().getType());
+        }
+    }
+
+    /**
+     * Tests whether merge will delete the declared bound if only source 1
+     * has a declared bound.
+     *
+     * @throws Exception if something goes wrong
+     */
+    @Test
+    public void testSource1HasBound() throws Exception {
+        RunnableSource source0 = new BoundSource(new Bound(1, 2, 4, 3, "source0"), false);
+        RunnableSource source1 = new BoundSource(new Bound(5, 6, 8, 7, "source1"), true);
+
+        EntityMerger merger = new EntityMerger(ConflictResolutionMethod.LatestSource, 1,
+                BoundRemovedAction.Ignore);
+
+        SinkEntityInspector merged = merge(merger, source0, source1);
+        List<EntityContainer> mergedList = createList(merged.getProcessedEntities());
+
+        // The bound of source 1 must be gone; only the two nodes remain.
+        Assert.assertEquals(2, mergedList.size());
+        for (EntityContainer entityContainer : mergedList) {
+            Assert.assertEquals(EntityType.Node, entityContainer.getEntity().getType());
+        }
+    }
+
+    /**
+     * Test the proper computation of the union bound iff both sources
+     * have bounds.
+     *
+     * @throws Exception if something goes wrong
+     */
+    @Test
+    public void testBothHaveBounds() throws Exception {
+        Bound bound0 = new Bound(1, 2, 4, 3, "source1");
+        RunnableSource source0 = new BoundSource(bound0, true);
+
+        Bound bound1 = new Bound(5, 6, 8, 7, "source2");
+        RunnableSource source1 = new BoundSource(bound1, true);
+
+        EntityMerger merger = new EntityMerger(ConflictResolutionMethod.LatestSource, 1,
+                BoundRemovedAction.Ignore);
+
+        SinkEntityInspector merged = merge(merger, source0, source1);
+        List<EntityContainer> mergedList = createList(merged.getProcessedEntities());
+
+        // One merged bound followed by the two nodes.
+        Assert.assertEquals(3, mergedList.size());
+        Assert.assertEquals(EntityType.Bound, mergedList.get(0).getEntity().getType());
+
+        // Check the bound
+        Bound bound01 = (Bound) mergedList.get(0).getEntity();
+        Assert.assertEquals(bound0.union(bound1), bound01);
+
+        for (int i = 1; i < mergedList.size(); i++) {
+            Assert.assertEquals(EntityType.Node, mergedList.get(i).getEntity().getType());
+        }
+    }
+
+    /**
+     * Tests the proper working of the merge task if both sources are
+     * empty.
+     *
+     * @throws Exception if something goes wrong
+     */
+    @Test
+    public void testBothEmpty() throws Exception {
+        RunnableSource source0 = new EmptySource();
+        RunnableSource source1 = new EmptySource();
+
+        EntityMerger merger = new EntityMerger(ConflictResolutionMethod.LatestSource, 1,
+                BoundRemovedAction.Ignore);
+
+        SinkEntityInspector merged = merge(merger, source0, source1);
+        Assert.assertNull("Expected empty result set but got some data", merged.getLastEntityContainer());
+    }
+
+    /**
+     * Tests the proper working of the merge task if exactly one source is
+     * empty with respect to the declared bound.
+     *
+     * @throws Exception if something goes wrong
+     */
+    @Test
+    public void testOneSourceEmpty() throws Exception {
+        RunnableSource source0 = new EmptySource();
+
+        Bound bound1 = new Bound(5, 6, 8, 7, "source2");
+        RunnableSource source1 = new BoundSource(bound1, true);
+
+        EntityMerger merger = new EntityMerger(ConflictResolutionMethod.LatestSource, 1,
+                BoundRemovedAction.Ignore);
+
+        SinkEntityInspector merged = merge(merger, source0, source1);
+        List<EntityContainer> mergedList = createList(merged.getProcessedEntities());
+
+        // The bound of the only non-empty source is expected to pass through unchanged.
+        Assert.assertEquals(2, mergedList.size());
+        Assert.assertEquals(bound1, mergedList.get(0).getEntity());
+        Assert.assertEquals(EntityType.Node, mergedList.get(1).getEntity().getType());
+    }
+
+    /**
+     * Copies the elements of an Iterable into a new list so they can be
+     * counted and accessed by index.
+     *
+     * @param <T> the element type
+     * @param t the elements to copy
+     * @return a new list holding the elements in iteration order
+     */
+    private static <T> List<T> createList(Iterable<T> t) {
+        List<T> list = new ArrayList<T>();
+        for (T elem : t) {
+            list.add(elem);
+        }
+        return list;
+    }
+
+
+    /**
+     * Helper method to execute a simple merge of two sources.
+     *
+     * The merger and both sources each run on their own thread; the method
+     * returns once all three threads have finished.
+     *
+     * @param merger the merge task under test
+     * @param source0 the source connected to sink 0 of the merger
+     * @param source1 the source connected to sink 1 of the merger
+     * @return the inspector that received the merged stream
+     * @throws Exception if something goes wrong.
+     */
+    private SinkEntityInspector merge(EntityMerger merger,
+            RunnableSource source0, RunnableSource source1) throws Exception {
+        SinkEntityInspector inspector = new SinkEntityInspector();
+        source0.setSink(merger.getSink(0));
+        source1.setSink(merger.getSink(1));
+        merger.setSink(inspector);
+
+        Thread sourceThread0 = new Thread(source0);
+        Thread sourceThread1 = new Thread(source1);
+        Thread mergerThread = new Thread(merger);
+
+        mergerThread.start();
+        sourceThread0.start();
+        sourceThread1.start();
+
+        mergerThread.join();
+        sourceThread0.join();
+        sourceThread1.join();
+        return inspector;
+    }
+
+    /**
+     * A trivial empty source: it signals completion without sending any entity.
+     */
+    private static class EmptySource implements RunnableSource {
+
+        private Sink sink;
+
+        @Override
+        public void setSink(Sink sink) {
+            this.sink = sink;
+        }
+
+        @Override
+        public void run() {
+            sink.complete();
+        }
+    }
+
+    /**
+     * A simple source which provides a single node in the center of a given
+     * bounding box, and optionally, a declared bounding box.
+     */
+    private static class BoundSource implements RunnableSource {
+
+        private Sink sink;
+        private final Bound bound;
+        private final boolean publishBound;
+
+        /**
+         * Creates a new instance.
+         *
+         * @param bound the box used to place the node; must not be null
+         * @param publishBound whether the box is also sent downstream as a bound entity
+         */
+        public BoundSource(Bound bound, boolean publishBound) {
+            if (bound == null) {
+                throw new IllegalArgumentException("bound must not be null");
+            }
+            this.publishBound = publishBound;
+            this.bound = bound;
+        }
+
+        @Override
+        public void setSink(Sink sink) {
+            this.sink = sink;
+        }
+
+
+        @Override
+        public void run() {
+            // A bound, if published, is sent before the node.
+            if (publishBound) {
+                sink.process(new BoundContainer(bound));
+            }
+            sink.process(new NodeContainer(createNode()));
+            sink.complete();
+        }
+
+        /**
+         * Creates a node at the arithmetic midpoint of the box edges.
+         *
+         * @return a new node with a freshly generated id
+         */
+        private Node createNode() {
+            // Midpoint is the mean of the opposite edges; halving their difference
+            // yields half the extent, which is not a position inside the box.
+            // NOTE(review): assumes left <= right and bottom <= top - confirm the
+            // Bound constructor argument order used by the tests above.
+            double lon = (bound.getLeft() + bound.getRight()) / 2;
+            double lat = (bound.getBottom() + bound.getTop()) / 2;
+            return new Node(ID_GENERATOR.incrementAndGet(), 1, new Date(), OsmUser.NONE, 1, lat, lon);
+        }
+    }
+}
Index: set/src/org/openstreetmap/osmosis/set/v0_6/EntityMergerFactory.java
===================================================================
--- set/src/org/openstreetmap/osmosis/set/v0_6/EntityMergerFactory.java
(revision 25940)
+++ set/src/org/openstreetmap/osmosis/set/v0_6/EntityMergerFactory.java
(working copy)
@@ -24,6 +24,9 @@
private static final String ALTERNATIVE_CONFLICT_RESOLUTION_METHOD_2 =
"lastSource";
private static final Map<String, ConflictResolutionMethod>
CONFLICT_RESOLUTION_METHOD_MAP =
new HashMap<String, ConflictResolutionMethod>();
+
+ private static final String ARG_BOUND_REMOVED_ACTION =
"boundRemovedAction";
+ private static final String DEFAULT_BOUND_REMOVED_ACTION = "warn";
static {
CONFLICT_RESOLUTION_METHOD_MAP.put(
@@ -45,6 +48,11 @@
conflictResolutionMethod = getStringArgument(
taskConfig, ARG_CONFLICT_RESOLUTION_METHOD,
DEFAULT_CONFLICT_RESOLUTION_METHOD);
+ BoundRemovedAction boundRemovedAction;
+
+ boundRemovedAction = BoundRemovedAction.parse(
+ getStringArgument(taskConfig,
ARG_BOUND_REMOVED_ACTION, DEFAULT_BOUND_REMOVED_ACTION));
+
if
(!CONFLICT_RESOLUTION_METHOD_MAP.containsKey(conflictResolutionMethod)) {
throw new OsmosisRuntimeException(
"Argument " +
ARG_CONFLICT_RESOLUTION_METHOD + " for task " + taskConfig.getId()
@@ -53,7 +61,8 @@
return new MultiSinkRunnableSourceManager(
taskConfig.getId(),
- new
EntityMerger(CONFLICT_RESOLUTION_METHOD_MAP.get(conflictResolutionMethod), 10),
+ new
EntityMerger(CONFLICT_RESOLUTION_METHOD_MAP.get(conflictResolutionMethod), 10,
+ boundRemovedAction),
taskConfig.getPipeArgs()
);
}
Index: set/src/org/openstreetmap/osmosis/set/v0_6/EntityMerger.java
===================================================================
--- set/src/org/openstreetmap/osmosis/set/v0_6/EntityMerger.java
(revision 25940)
+++ set/src/org/openstreetmap/osmosis/set/v0_6/EntityMerger.java
(working copy)
@@ -1,8 +1,13 @@
// This software is released into the Public Domain. See copying.txt for
details.
package org.openstreetmap.osmosis.set.v0_6;
+import java.util.logging.Logger;
+
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
+import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
+import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
+import org.openstreetmap.osmosis.core.domain.v0_6.EntityType;
import org.openstreetmap.osmosis.core.merge.common.ConflictResolutionMethod;
import org.openstreetmap.osmosis.core.sort.v0_6.EntityByTypeThenIdComparator;
import org.openstreetmap.osmosis.core.sort.v0_6.EntityContainerComparator;
@@ -22,14 +27,16 @@
*/
public class EntityMerger implements MultiSinkRunnableSource {
+ private static final Logger LOG =
Logger.getLogger(EntityMerger.class.getName());
+
private Sink sink;
private DataPostbox<EntityContainer> postbox0;
private SortedEntityPipeValidator sortedEntityValidator0;
private DataPostbox<EntityContainer> postbox1;
private SortedEntityPipeValidator sortedEntityValidator1;
private ConflictResolutionMethod conflictResolutionMethod;
-
-
+ private BoundRemovedAction boundRemovedAction;
+
/**
* Creates a new instance.
*
@@ -38,8 +45,13 @@
* contain the same entity.
* @param inputBufferCapacity
* The size of the buffers to use for input sources.
+ * @param boundRemovedAction
+ * The action to take if the merge operation removes
+ * a bound entity.
*/
- public EntityMerger(ConflictResolutionMethod conflictResolutionMethod,
int inputBufferCapacity) {
+ public EntityMerger(ConflictResolutionMethod conflictResolutionMethod,
int inputBufferCapacity,
+ BoundRemovedAction boundRemovedAction) {
+
this.conflictResolutionMethod = conflictResolutionMethod;
postbox0 = new
DataPostbox<EntityContainer>(inputBufferCapacity);
@@ -101,6 +113,42 @@
// Create a comparator for comparing two entities by
type and identifier.
comparator = new EntityContainerComparator(new
EntityByTypeThenIdComparator());
+ // BEGIN bound special handling
+
+ // If there is a bound, it's going to be the first
object
+ // in a properly sorted stream
+ entityContainer0 = nextOrNull(postbox0);
+ entityContainer1 = nextOrNull(postbox1);
+
+ // There's only need for special processing if there
actually is some data
+ // on both streams - no data implies no bound
+ if (entityContainer0 != null && entityContainer1 !=
null) {
+ Bound bound0 = null;
+ Bound bound1 = null;
+
+ // If there are any bounds upstream, eat them up
+ if (entityContainer0.getEntity().getType() ==
EntityType.Bound) {
+ bound0 = (Bound)
entityContainer0.getEntity();
+ entityContainer0 = nextOrNull(postbox0);
+ }
+ if (entityContainer1.getEntity().getType() ==
EntityType.Bound) {
+ bound1 = (Bound)
entityContainer1.getEntity();
+ entityContainer1 = nextOrNull(postbox1);
+ }
+
+ // Only post a bound downstream if both
upstream sources had a bound.
+ // (Otherwise there's either nothing to post or
the posted bound is going
+ // to be smaller than the actual data, which is
bad)
+ if (bound0 != null && bound1 != null) {
+ sink.process(new
BoundContainer(bound0.union(bound1)));
+ } else if ((bound0 != null && bound1 == null)
+ || (bound0 == null && bound1 !=
null)) {
+ handleBoundRemoved(bound0 == null);
+ }
+ }
+
+ // END bound special handling
+
// We continue in the comparison loop while both
sources still have data.
while (
(entityContainer0 != null ||
postbox0.hasNext())
@@ -199,4 +247,45 @@
sink.release();
}
}
+
+    /**
+     * Reacts to a bound being dropped from the merged stream because only one
+     * of the two sources declared one.
+     *
+     * Depending on the configured action this does nothing, logs a warning,
+     * or aborts processing.
+     *
+     * @param source0BoundMissing
+     *            true if source 0 is the source without a bound, false if
+     *            source 1 is.
+     * @throws OsmosisRuntimeException
+     *             if the configured action is {@link BoundRemovedAction#Fail}.
+     */
+    private void handleBoundRemoved(boolean source0BoundMissing) {
+
+        // NOTE(review): confirm that the constructor assigns the
+        // boundRemovedAction field. If it is left null, none of the comparisons
+        // below match and the removal is silently ignored whatever the
+        // configuration says.
+        if (boundRemovedAction == BoundRemovedAction.Ignore) {
+            // Nothing to do
+            return;
+        }
+
+        String missingSourceId = source0BoundMissing ? "0" : "1";
+        String boundedSourceId = source0BoundMissing ? "1" : "0";
+
+        // Message for log or exception. The first placeholder names the source
+        // that HAS the bound, the second the one that lacks it.
+        String message = String.format(
+                "Source %s of the merge task has an explicit bound set, but source %s has not. "
+                + "Therefore the explicit bound has been removed from the merged stream.",
+                boundedSourceId, missingSourceId);
+
+        // Now actually log or fail.
+        if (boundRemovedAction == BoundRemovedAction.Warn) {
+            LOG.warning(message);
+        } else if (boundRemovedAction == BoundRemovedAction.Fail) {
+            throw new OsmosisRuntimeException(message);
+        }
+    }
+
+
+    /**
+     * Fetches the next item from a postbox, yielding null instead when the
+     * postbox reports that it has nothing further.
+     *
+     * @param postbox
+     *            the postbox to read from.
+     * @return the next item, or null if hasNext() returned false.
+     */
+    private static EntityContainer nextOrNull(DataPostbox<EntityContainer> postbox) {
+        return postbox.hasNext() ? postbox.getNext() : null;
+    }
}
Index: set/src/org/openstreetmap/osmosis/set/v0_6/BoundRemovedAction.java
===================================================================
--- set/src/org/openstreetmap/osmosis/set/v0_6/BoundRemovedAction.java
(revision 0)
+++ set/src/org/openstreetmap/osmosis/set/v0_6/BoundRemovedAction.java
(revision 0)
@@ -0,0 +1,51 @@
+// This software is released into the Public Domain. See copying.txt for
details.
+package org.openstreetmap.osmosis.set.v0_6;
+
+import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
+
+/**
+ * Defines possible actions to take when a task removes
+ * a bound entity from the output stream.
+ *
+ * Each constant carries the lower-case keyword by which it is selected
+ * through {@link #parse(String)}.
+ */
+public enum BoundRemovedAction {
+    /**
+     * Continue processing quietly.
+     */
+    Ignore("ignore"),
+
+    /**
+     * Continue processing but emit a warning to the log.
+     */
+    Warn("warn"),
+
+    /**
+     * Stop processing and emit an error message to the log.
+     */
+    Fail("fail");
+
+    /** Error text used for every value that cannot be parsed. */
+    private static final String UNRECOGNIZED_MESSAGE =
+            "Unrecognized bound removed action value: must be one of ignore, warn, fail.";
+
+    /** The keyword that selects this action. */
+    private final String keyword;
+
+    private BoundRemovedAction(String keyword) {
+        this.keyword = keyword;
+    }
+
+    /**
+     * Returns an action for a given string, if possible. Matching ignores case.
+     *
+     * @param s the string to parse
+     * @return the action whose keyword matches the string.
+     * @throws OsmosisRuntimeException
+     *             if the string is null or matches no keyword.
+     */
+    public static BoundRemovedAction parse(String s) {
+        if (s != null) {
+            for (BoundRemovedAction action : values()) {
+                // equalsIgnoreCase compares character by character and does not
+                // consult the default locale. The no-argument toLowerCase() does,
+                // and would turn "IGNORE" into a non-matching string under a
+                // Turkish locale (dotless i).
+                if (action.keyword.equalsIgnoreCase(s)) {
+                    return action;
+                }
+            }
+        }
+        throw new OsmosisRuntimeException(UNRECOGNIZED_MESSAGE);
+    }
+}
_______________________________________________
osmosis-dev mailing list
[email protected]
http://lists.openstreetmap.org/listinfo/osmosis-dev