[ https://issues.apache.org/jira/browse/RYA-241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15856699#comment-15856699 ]
ASF GitHub Bot commented on RYA-241: ------------------------------------ Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/139#discussion_r99916370 --- Diff: extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoWaveGeoIndexer.java --- @@ -0,0 +1,661 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.accumulo.geo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.Md5Hash; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.StatementSerializer; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.geotools.data.DataStore; +import org.geotools.data.DataUtilities; +import org.geotools.data.FeatureSource; +import org.geotools.data.FeatureStore; +import org.geotools.factory.CommonFactoryFinder; +import org.geotools.factory.Hints; +import org.geotools.feature.DefaultFeatureCollection; +import org.geotools.feature.SchemaException; +import org.geotools.feature.simple.SimpleFeatureBuilder; +import org.geotools.filter.text.cql2.CQLException; +import org.geotools.filter.text.ecql.ECQL; +import org.opengis.feature.simple.SimpleFeature; +import org.opengis.feature.simple.SimpleFeatureType; +import org.opengis.filter.Filter; +import org.opengis.filter.FilterFactory; +import org.opengis.filter.identity.Identifier; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; + +import info.aduna.iteration.CloseableIteration; +import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStore; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWaveGTDataStoreFactory; +import mil.nga.giat.geowave.adapter.vector.plugin.GeoWavePluginException; +import mil.nga.giat.geowave.adapter.vector.query.cql.CQLQuery; +import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider; +import mil.nga.giat.geowave.core.store.CloseableIterator; +import mil.nga.giat.geowave.core.store.StoreFactoryFamilySpi; +import mil.nga.giat.geowave.core.store.index.PrimaryIndex; +import mil.nga.giat.geowave.core.store.memory.MemoryStoreFactoryFamily; +import mil.nga.giat.geowave.core.store.query.EverythingQuery; +import mil.nga.giat.geowave.core.store.query.QueryOptions; +import mil.nga.giat.geowave.datastore.accumulo.AccumuloDataStore; +import mil.nga.giat.geowave.datastore.accumulo.AccumuloStoreFactoryFamily; + +/** + * A {@link GeoIndexer} wrapper around a GeoWave {@link AccumuloDataStore}. This class configures and connects to the Datastore, creates the + * RDF Feature Type, and interacts with the Datastore. + * <p> + * Specifically, this class creates a RDF Feature type and stores each RDF Statement as a RDF Feature in the datastore. Each feature + * contains the standard set of GeoWave attributes (Geometry, Start Date, and End Date). The GeoWaveGeoIndexer populates the Geometry + * attribute by parsing the Well-Known Text contained in the RDF Statement’s object literal value. + * <p> + * The RDF Feature contains four additional attributes for each component of the RDF Statement. These attributes are: + * <p> + * <table border="1"> + * <tr> + * <th>Name</th> + * <th>Symbol</th> + * <th>Type</th> + * </tr> + * <tr> + * <td>Subject Attribute</td> + * <td>S</td> + * <td>String</td> + * </tr> + * </tr> + * <tr> + * <td>Predicate Attribute</td> + * <td>P</td> + * <td>String</td> + * </tr> + * </tr> + * <tr> + * <td>Object Attribute</td> + * <td>O</td> + * <td>String</td> + * </tr> + * </tr> + * <tr> + * <td>Context Attribute</td> + * <td>C</td> + * <td>String</td> + * </tr> + * </table> + */ +public class GeoWaveGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { + + private static final String TABLE_SUFFIX = "geo"; + + private static final Logger logger = Logger.getLogger(GeoWaveGeoIndexer.class); + + private static final String FEATURE_NAME = "RDF"; + + private static final String SUBJECT_ATTRIBUTE = "S"; + private static final String PREDICATE_ATTRIBUTE = "P"; + private static final String OBJECT_ATTRIBUTE = "O"; + private static final String CONTEXT_ATTRIBUTE = "C"; + private static final String GEO_ID_ATTRIBUTE = "geo_id"; + private static final String GEOMETRY_ATTRIBUTE = "geowave_index_geometry"; + + private Set<URI> validPredicates; + private Configuration conf; + private FeatureStore<SimpleFeatureType, SimpleFeature> featureStore; + private FeatureSource<SimpleFeatureType, SimpleFeature> featureSource; + private SimpleFeatureType featureType; + private FeatureDataAdapter featureDataAdapter; + private DataStore geoToolsDataStore; + private mil.nga.giat.geowave.core.store.DataStore geoWaveDataStore; + private final PrimaryIndex index = new SpatialDimensionalityTypeProvider().createPrimaryIndex(); + private boolean isInit = false; + + //initialization occurs in setConf because index is created using reflection + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + if (!isInit) { + try { + initInternal(); + isInit = true; + } catch (final IOException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * @return the internal GeoTools{@link DataStore} used by the {@link GeoWaveGeoIndexer}. + */ + public DataStore getGeoToolsDataStore() { + return geoToolsDataStore; + } + + /** + * @return the internal GeoWave {@link DataStore} used by the {@link GeoWaveGeoIndexer}. + */ + public mil.nga.giat.geowave.core.store.DataStore getGeoWaveDataStore() { + return geoWaveDataStore; + } + + private void initInternal() throws IOException { + validPredicates = ConfigUtils.getGeoPredicates(conf); + + try { + geoToolsDataStore = createDataStore(conf); + geoWaveDataStore = ((GeoWaveGTDataStore) geoToolsDataStore).getDataStore(); + } catch (final GeoWavePluginException e) { + logger.error("Failed to create GeoWave data store", e); + } + + try { + featureType = getStatementFeatureType(geoToolsDataStore); + } catch (final IOException | SchemaException e) { + throw new IOException(e); + } + + featureDataAdapter = new FeatureDataAdapter(featureType); + + featureSource = geoToolsDataStore.getFeatureSource(featureType.getName()); + if (!(featureSource instanceof FeatureStore)) { + throw new IllegalStateException("Could not retrieve feature store"); + } + featureStore = (FeatureStore<SimpleFeatureType, SimpleFeature>) featureSource; + } + + public Map<String, Serializable> getParams(final Configuration conf) { + // get the configuration parameters + final Instance instance = ConfigUtils.getInstance(conf); + final String instanceId = instance.getInstanceName(); + final String zookeepers = instance.getZooKeepers(); + final String user = ConfigUtils.getUsername(conf); + final String password = ConfigUtils.getPassword(conf); + final String auths = ConfigUtils.getAuthorizations(conf).toString(); + final String tableName = getTableName(conf); + final String tablePrefix = ConfigUtils.getTablePrefix(conf); + + final Map<String, Serializable> params = new HashMap<>(); + params.put("zookeeper", zookeepers); + params.put("instance", instanceId); + params.put("user", user); + params.put("password", password); + params.put("namespace", tableName); + params.put("gwNamespace", tablePrefix + getClass().getSimpleName()); + + params.put("Lock Management", LockManagementType.MEMORY.toString()); --- End diff -- It should handle multiple clients. > Add Geowave support > ------------------- > > Key: RYA-241 > URL: https://issues.apache.org/jira/browse/RYA-241 > Project: Rya > Issue Type: New Feature > Reporter: Eric White > Assignee: Eric White > > Add in Geowave as an alternative to Geomesa -- This message was sent by Atlassian JIRA (v6.3.15#6346)