Aklakan commented on code in PR #3027: URL: https://github.com/apache/jena/pull/3027#discussion_r2081313619
########## jena-fuseki2/jena-fuseki-mod-geosparql/src/main/java/org/apache/jena/fuseki/mod/geosparql/SpatialIndexComputeService.java: ########## @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.fuseki.mod.geosparql; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentHashMap.KeySetView; +import java.util.stream.Stream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.jena.atlas.iterator.Iter; +import org.apache.jena.atlas.lib.DateTimeUtils; +import org.apache.jena.fuseki.FusekiException; +import org.apache.jena.fuseki.servlets.BaseActionREST; +import org.apache.jena.fuseki.servlets.HttpAction; +import org.apache.jena.geosparql.spatial.SpatialIndex; +import org.apache.jena.geosparql.spatial.SpatialIndexException; +import org.apache.jena.geosparql.spatial.index.v2.STRtreePerGraph; +import org.apache.jena.geosparql.spatial.index.v2.SpatialIndexIoKryo; +import org.apache.jena.geosparql.spatial.index.v2.SpatialIndexPerGraph; +import org.apache.jena.geosparql.spatial.index.v2.SpatialIndexUtils; +import org.apache.jena.geosparql.spatial.index.v2.SpatialIndexerComputation; +import org.apache.jena.geosparql.spatial.task.AbortableThread; +import org.apache.jena.geosparql.spatial.task.TaskControl; +import org.apache.jena.geosparql.spatial.task.TaskControlOverAbortableThread; +import org.apache.jena.graph.Node; +import org.apache.jena.graph.NodeFactory; +import org.apache.jena.query.ReadWrite; +import org.apache.jena.riot.WebContent; +import org.apache.jena.riot.web.HttpNames; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.core.Quad; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.system.AutoTxn; +import org.apache.jena.system.Txn; +import org.apache.jena.web.HttpSC; +import org.locationtech.jts.index.strtree.STRtree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.reflect.TypeToken; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +/** + * Spatial index (re)computation service. + * + * Supports two types of tasks that are executed concurrently. For a given data set, only a single + * task can be active at a given time. The status of the most recent task can be queried via the + * rest API. Task execution also broadcasts start/abort/termination events via server side events + * (SSE). + * + * <ul> + * <li>Updating/replacing the graphs of a spatial index.</li> + * <li>Removal of graphs from a spatial index that are absent in the corresponding data set.</li> + * </ul> + */ +public class SpatialIndexComputeService extends BaseActionREST { + private static final Logger logger = LoggerFactory.getLogger(SpatialIndexComputeService.class); + + private static Gson gson = new Gson(); + private KeySetView<AsyncContext, ?> eventListeners = ConcurrentHashMap.newKeySet(); + + public SpatialIndexComputeService() {} + + private static List<Node> extractGraphs(DatasetGraph dsg, HttpAction action) { + String uris = action.getRequest().getParameter(HttpNames.paramGraph); + Collection<String> strs; + if (uris == null || uris.isBlank()) { + strs = List.of(Quad.defaultGraphIRI.toString(), Quad.unionGraph.toString()); + } else { + TypeToken<List<String>> typeToken = new TypeToken<List<String>>(){}; + strs = gson.fromJson(uris, typeToken); + } + List<Node> rawGraphNodes = strs.stream().map(NodeFactory::createURI).distinct().toList(); + // If the set of specified graphs is empty then index all. + if (rawGraphNodes.isEmpty()) { + rawGraphNodes = List.of(Quad.defaultGraphIRI, Quad.unionGraph); + } + List<Node> expandedGraphNodes = rawGraphNodes.stream() + .flatMap(node -> expandUnionGraphNode(dsg, node).stream()).distinct().toList(); + return expandedGraphNodes; + } + + private static boolean isReplaceMode(HttpAction action) { + String str = action.getRequest().getParameter("replaceMode"); + boolean result = (str == null || str.isBlank()) ? false : Boolean.parseBoolean(str); + return result; + } + + private static int getThreadCount(HttpAction action) { + String str = action.getRequest().getParameter("maxThreadCount"); + int result = (str == null || str.isBlank()) ? 1 : Integer.parseInt(str); + + if (result == 0) { + result = Runtime.getRuntime().availableProcessors(); + } + + return result; + } + + private static List<Node> expandUnionGraphNode(DatasetGraph dsg, Node node) { + return Quad.isUnionGraph(node) + ? listGraphNodes(dsg) + : List.of(node); + } + + private static List<Node> listGraphNodes(DatasetGraph dsg) { + try (Stream<Node> s = Iter.asStream(dsg.listGraphNodes())) { + return s.toList(); + } + } + + /** + * The GET command can serve: the website, the notification stream from task execution + * and the latest task execution status. + */ + @Override + protected void doGet(HttpAction action) { + String rawCommand = action.getRequestParameter("command"); + String command = Optional.ofNullable(rawCommand).orElse("website"); + switch (command) { + case "website": serveWebSite(action); break; + case "events": serveEvents(action); break; + case "status": serveStatus(action); break; + default: + throw new UnsupportedOperationException("Unsupported operation: " + command); + } + } + + public void serveWebSite(HttpAction action) { + // Serves the minimal graphql ui + String resourceName = "spatial-indexer/index.html"; + String str = null; + try (InputStream in = SpatialIndexComputeService.class.getClassLoader().getResourceAsStream(resourceName)) { + str = IOUtils.toString(in, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new FusekiException(e); + } + + if (str == null) { + action.setResponseStatus(HttpSC.INTERNAL_SERVER_ERROR_500); + action.setResponseContentType(WebContent.contentTypeTextPlain); + str = "Failed to load classpath resource " + resourceName; + } else { + action.setResponseStatus(HttpSC.OK_200); + action.setResponseContentType(WebContent.contentTypeHTML); + } + try (OutputStream out = action.getResponseOutputStream()) { + IOUtils.write(str, out, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new FusekiException(e); + } + } + + protected TaskControl<?> getActiveTask(HttpAction action) { + DatasetGraph dsg = action.getDataset(); + Context cxt = dsg.getContext(); + TaskControl<?> activeTask = cxt.get(SpatialIndexUtils.SPATIAL_INDEX_TASK_SYMBOL); + return activeTask; + } + + /** + * Post request: Handle API call. + * Request is rejected if there is an already running task. + */ + @Override + protected void doPost(HttpAction action) { + try { + String rawCommand = action.getRequestParameter("command"); + String command = Optional.ofNullable(rawCommand).orElse("none"); + switch (command) { + case "index": doIndex(action); break; + case "clean": doClean(action); break; + case "status": serveStatus(action); break; + case "cancel": doStop(action); break; + default: throw new UnsupportedOperationException("Unsupported operation: " + command); + } + } catch (Throwable t) { + String str = ExceptionUtils.getStackTrace(t); + action.log.error("An unexpected error occurred.", t); + // FusekiException + setResponse(action, HttpSC.INTERNAL_SERVER_ERROR_500, WebContent.contentTypeTextPlain, str); + } + } + + protected void serveEvents(HttpAction action) { + HttpServletRequest request = action.getRequest(); + HttpServletResponse response = action.getResponse(); + + response.setContentType("text/event-stream"); + response.setCharacterEncoding("UTF-8"); + response.setHeader("Cache-Control", "no-cache"); + + final AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + eventListeners.add(asyncContext); + } + + /** + * Remove all graphs from the index for which there is no corresponding graph in the dataset. + */ + protected void doClean(HttpAction action) throws Exception { + DatasetGraph dsg = action.getDataset(); + Context cxt = dsg.getContext(); + + TaskControlOverAbortableThread<Thread> taskCtl = new TaskControlOverAbortableThread<>("Spatial Indexer Task"); + + cxt.compute(SpatialIndexUtils.SPATIAL_INDEX_TASK_SYMBOL, (key, priorTaskObj) -> { + TaskControl<?> priorTask = (TaskControl<?>)priorTaskObj; + if (priorTask != null && !priorTask.isComplete()) { + throw new RuntimeException("A spatial indexing task is already active for this dataset. Wait for completion or abort it."); + } + + AbortableThread<?> thread = new AbortableThread<>() { + @Override + public void runActual() throws Exception { + broadcastTaskStart(getStartTime(), null); + + SpatialIndex spatialIndexRaw = SpatialIndexUtils.getSpatialIndex(cxt); + if (spatialIndexRaw == null) { + throw new SpatialIndexException("No spatial index available on current dataset."); + } else if (spatialIndexRaw instanceof SpatialIndexPerGraph spatialIndex) { + STRtreePerGraph perGraphIndex = spatialIndex.getIndex(); + Map<Node, STRtree> treeMap = perGraphIndex.getTreeMap(); + Set<Node> graphNodes = new LinkedHashSet<>(Txn.calculateRead(dsg, () -> listGraphNodes(dsg))); + // int initialGraphCount = treeMap.keySet().size(); + List<Node> indexGraphNodes = new ArrayList<>(treeMap.keySet()); + + int cleanCount = 0; + for (Node node : indexGraphNodes) { + if (!(node == null || Quad.isDefaultGraph(node)) && !graphNodes.contains(node)) { + perGraphIndex.removeTree(node); + ++cleanCount; + // System.out.println("Removed: [" + node + "] " + (node == null)); + } + } + + int finalGraphCount = treeMap.keySet().size(); + Path targetFile = spatialIndex.getLocation(); + // SpatialIndexUtils.setSpatialIndex(cxt, newIndex); + if (cleanCount > 0 && targetFile != null) { + // newIndex.setLocation(targetFile); + action.log.info("Writing spatial index of {} graphs (cleaned: {}) to disk at path {}", finalGraphCount, cleanCount, targetFile.toAbsolutePath()); + SpatialIndexIoKryo.save(targetFile, spatialIndex); + } + String statusMsg = String.format("Updated spatial index of %d graphs (cleaned: %d)", finalGraphCount, cleanCount); + setStatusMessage(statusMsg); + if (logger.isInfoEnabled()) { + logger.info("Indexing of {} graphs completed successfully.", finalGraphCount); + } + } else { + throw new SpatialIndexException("Unsupported spatial index type for cleaning."); + } + } + + @Override + protected void doAfterRun() { + broadcastTaskEnd(getEndTime(), getThrowable(), getStatusMessage()); + } + }; + + taskCtl.setThread(thread); + taskCtl.setSource(thread); + thread.start(); + return taskCtl; + }); + } + + public static <T> T calculateRead(HttpAction action, Callable<T> supplier) throws Exception { + action.begin(); + DatasetGraph dsg = action.getDataset(); + try (AutoTxn txn = Txn.begin(dsg, ReadWrite.READ)) { + return supplier.call(); + } finally { + action.end(); + } + } + + protected void doStop(HttpAction action) { + TaskControl<?> task = getActiveTask(action); + String state; + if (task != null) { + state = "true"; + // Add a completion listener to return when done. + task.abort(); + Thread thread = (Thread)task.getSource(); + try { + thread.join(); + } catch (InterruptedException e) { + // TODO Raise some HTTP error + throw new RuntimeException(e); + } + } else { + state = "false"; + } + + action.setResponseStatus(HttpSC.OK_200); + action.setResponseContentType(WebContent.contentTypeJSON); + try { + action.getResponseOutputStream().print(String.format("{ \"stopped\": %s }", state)); + } catch (IOException e) { + throw new FusekiException(e); + } + } + + public void serveStatus(HttpAction action) { + TaskControl<?> task = getActiveTask(action); + + JsonObject status = new JsonObject(); + Instant time; + if (task == null) { + status.addProperty("isIndexing", false); + time = Instant.ofEpochMilli(0); + } else { + status.addProperty("isIndexing", !task.isComplete()); + if (!task.isComplete()) { + status.addProperty("isAborting", task.isAborting()); + time = !task.isAborting() ? task.getStartTime() : task.getCancelTime(); + } else { + time = task.getEndTime(); + } + Throwable throwable = task.getThrowable(); + if (throwable != null) { + String msg = ExceptionUtils.getStackTrace(throwable); + status.addProperty("error", msg); + } + + String msg = task.getStatusMessage(); + if (msg != null) { + status.addProperty("message", msg); + } + } + status.addProperty("time", time.toEpochMilli()); + + action.setResponseStatus(HttpSC.OK_200); + action.setResponseContentType(WebContent.contentTypeJSON); + try { + String str = gson.toJson(status); + action.getResponseOutputStream().println(str); + } catch (IOException e) { + throw new FusekiException(e); + } + } + + public TaskControl<?> scheduleTask(HttpAction action, SpatialIndexerComputation indexComputation, Path targetFile, boolean isReplaceTask) { + DatasetGraph dsg = action.getDataset(); + Context cxt = dsg.getContext(); + + TaskControlOverAbortableThread<Thread> taskCtl = new TaskControlOverAbortableThread<>("Spatial Indexer Task"); + + cxt.compute(SpatialIndexUtils.SPATIAL_INDEX_TASK_SYMBOL, (key, priorTaskObj) -> { + TaskControl<?> priorTask = (TaskControl<?>)priorTaskObj; + if (priorTask != null && !priorTask.isComplete()) { + throw new RuntimeException("A spatial indexing task is already active for this dataset. Wait for completion or abort it."); + } + + long graphCount = indexComputation.getGraphNodes().size(); + + AbortableThread<?> thread = new AbortableThread<Object>() { + @Override + public void runActual() throws Exception { + broadcastTaskStart(getStartTime(), null); + if (logger.isInfoEnabled()) { + String replaceMsg = isReplaceTask ? "The resulting index will REPLACE a prior index." : "A prior index will be UPDATED with the newly indexed graphs."; + logger.info("Indexing of {} graphs started. " + replaceMsg, graphCount); + } + + // Uncomment to test artificial delays. + // Thread.sleep(5000); + + SpatialIndexPerGraph newIndex = indexComputation.call(); + + // If NOT in replace mode, add all graph-indexes from the previous index + if (!isReplaceTask) { + SpatialIndexPerGraph oldIndex = (SpatialIndexPerGraph)SpatialIndexUtils.getSpatialIndex(cxt); + Map<Node, STRtree> treeMap = oldIndex.getIndex().getTreeMap(); + treeMap.forEach((name, tree) -> { + if (!newIndex.getIndex().contains(name)) { + newIndex.getIndex().setTree(name, tree); + } + }); + } + + SpatialIndexUtils.setSpatialIndex(cxt, newIndex); + if (targetFile != null) { + newIndex.setLocation(targetFile); + action.log.info("Writing spatial index of {} graphs to disk at path {}", graphCount, targetFile.toAbsolutePath()); + SpatialIndexIoKryo.save(targetFile, newIndex); + } + String statusMsg = String.format("Updated spatial index with %d graphs.", graphCount); + setStatusMessage(statusMsg); + if (logger.isInfoEnabled()) { + logger.info("Indexing of {} graphs completed successfully.", graphCount); + } + } + + public void requestCancel() { + broadcastTaskAbort(getCancelTime(), null); + indexComputation.abort(); + super.requestCancel(); // Interrupt + } + + @Override + protected void doAfterRun() { + try { + indexComputation.close(); + } finally { + try { + Throwable throwable = getThrowable(); + Instant endTime = getEndTime(); + broadcastTaskEnd(endTime, throwable, getStatusMessage()); + } finally { + if (logger.isInfoEnabled()) { + logger.info("Indexing task of {} graphs terminated.", graphCount); + } + } + } + } + }; + + taskCtl.setThread(thread); + taskCtl.setSource(thread); + thread.start(); + return taskCtl; + }); + + return taskCtl; + } + + protected void doIndex(HttpAction action) throws Exception { + DatasetGraph dsg = action.getDataset(); + SpatialIndex indexTmp = SpatialIndexUtils.getSpatialIndex(dsg.getContext()); + SpatialIndexPerGraph index = (SpatialIndexPerGraph)indexTmp; + + if (index == null) { // error: no spatial index has been configured + String msg = String.format("[%d] no spatial index has been configured for the dataset", action.id); + action.log.error(msg); + action.setResponseStatus(HttpSC.SERVICE_UNAVAILABLE_503); + action.setResponseContentType(WebContent.contentTypeTextPlain); + try { + action.getResponseWriter().println(msg); + } catch (IOException e) { + throw new FusekiException(e); + } + return; + } else { + boolean isReplaceMode = isReplaceMode(action); + int threadCount = getThreadCount(action); + + // Only SpatialIndexPerGraph can be updated. + // Check if the index can be updated. If not then raise an exception + // that informs that only replace mode can be used in this situation. + if (!isReplaceMode && !(index instanceof SpatialIndexPerGraph)) { Review Comment: My intention is that the servlet can upgrade any legacy index (such as SpatialIndexV1) to the latest implementation version. I need to revise the code for that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@jena.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@jena.apache.org For additional commands, e-mail: pr-h...@jena.apache.org