This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new fd0d3a69f ATLAS-4789: added couchbase bridge fd0d3a69f is described below commit fd0d3a69fe97867b3565d0a2f497da6362ce5ef4 Author: Dmitrii Chechetkin <dmitrii.chechet...@couchbase.com> AuthorDate: Mon Oct 30 12:31:32 2023 -0400 ATLAS-4789: added couchbase bridge Signed-off-by: Madhan Neethiraj <mad...@apache.org> --- addons/couchbase-bridge/pom.xml | 201 ++++++++++++ .../com/couchbase/atlas/connector/AtlasConfig.java | 54 ++++ .../com/couchbase/atlas/connector/CBConfig.java | 145 +++++++++ .../couchbase/atlas/connector/CouchbaseHook.java | 341 +++++++++++++++++++++ .../connector/entities/CouchbaseAtlasEntity.java | 227 ++++++++++++++ .../atlas/connector/entities/CouchbaseBucket.java | 89 ++++++ .../atlas/connector/entities/CouchbaseCluster.java | 130 ++++++++ .../connector/entities/CouchbaseCollection.java | 90 ++++++ .../atlas/connector/entities/CouchbaseField.java | 129 ++++++++ .../connector/entities/CouchbaseFieldType.java | 71 +++++ .../atlas/connector/entities/CouchbaseScope.java | 93 ++++++ .../main/resources/atlas-application.properties | 17 + .../atlas/connector/CouchbaseHookTest.java | 136 ++++++++ .../entities/CouchbaseAtlasEntityTest.java | 108 +++++++ .../couchbase-bridge/src/test/resources/log4j.xml | 35 +++ .../5000-Couchbase/5020-couchbase_model.json | 159 ++++++++++ distro/pom.xml | 1 + .../assemblies/atlas-couchbase-hook-package.xml | 34 ++ distro/src/main/assemblies/standalone-package.xml | 6 + docs/src/documents/Hook/HookCouchbase.md | 45 +++ pom.xml | 7 + 21 files changed, 2118 insertions(+) diff --git a/addons/couchbase-bridge/pom.xml b/addons/couchbase-bridge/pom.xml new file mode 100644 index 000000000..8a3ccef17 --- /dev/null +++ b/addons/couchbase-bridge/pom.xml @@ -0,0 +1,201 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + <artifactId>couchbase-bridge</artifactId> + <description>Apache Atlas Couchbase Bridge Module</description> + <name>Apache Atlas Couchbase Bridge</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>30.1.1-jre</version> + </dependency> + <dependency> + <groupId>com.couchbase.client</groupId> + <artifactId>java-client</artifactId> + <version>3.4.4</version> + </dependency> + <dependency> + <groupId>com.couchbase.client</groupId> + <artifactId>dcp-client</artifactId> + <version>0.44.0</version> + </dependency> + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + 
<groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-notification</artifactId> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>dist</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-hook</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/dependency/hook/couchbase/atlas-couchbase-plugin-impl</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client-common</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client-v2</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-notification</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-common</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${kafka.scala.binary.version}</artifactId> + <version>${kafka.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + 
<version>${kafka.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + <version>${jersey.version}</version> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-twiki</artifactId> + <version>${doxia.version}</version> + </dependency> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-core</artifactId> + <version>${doxia.version}</version> + </dependency> + </dependencies> + <executions> + <execution> + <goals> + <goal>site</goal> + </goals> + <phase>prepare-package</phase> + </execution> + </executions> + <configuration> + <generateProjectInfo>false</generateProjectInfo> + <generateReports>false</generateReports> + </configuration> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <inherited>false</inherited> + <executions> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>post-integration-test</phase> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + </build> +</project> diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java new file mode 100644 index 000000000..3168d7bfa --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Couchbase, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector; + +import org.apache.atlas.AtlasClientV2; + +import java.util.Map; + +public class AtlasConfig { + private static final Map<String, String> ENV = System.getenv(); + private static AtlasClientV2 client = null; + + public static String[] urls() { + return new String[] { ENV.getOrDefault("ATLAS_URL", "http://localhost:21000") }; + } + + public static String username() { + return ENV.getOrDefault("ATLAS_USERNAME", "admin"); + } + + public static String password() { + return ENV.getOrDefault("ATLAS_PASSWORD", "admin"); + } + + public static String[] auth() { + return new String[] {username(), password()}; + } + + public static AtlasClientV2 client() { + if (client == null) { + client = new AtlasClientV2(urls(), auth()); + } + + return client; + } + + public static void client(AtlasClientV2 client) { + AtlasConfig.client = client; + } +} diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java new file mode 100644 index 000000000..70e9dc37c --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java @@ -0,0 +1,145 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector; + +import com.couchbase.client.dcp.Client; +import com.couchbase.client.dcp.SecurityConfig; +import com.couchbase.client.java.Cluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; + +public class CBConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(CBConfig.class); + + private static final Map<String, String> ENV = System.getenv(); + private static final Integer DCP_FIELD_MIN_OCCUR = Integer.valueOf(ENV.getOrDefault("DCP_FIELD_MIN_OCCUR", "0")); + private static final Float DCP_SAMPLE_RATIO = Float.valueOf(ENV.getOrDefault("DCP_SAMPLE_RATIO", "1")); + private static final Short DCP_FIELD_THRESHOLD = Short.valueOf(ENV.getOrDefault("DCP_FIELD_THRESHOLD", "0")); + private static Client mockDcpClient; + private static Cluster cluster; + + + + public static String address() { + return ENV.getOrDefault("CB_CLUSTER", "couchbase://localhost"); + } + + public static String username() { + return ENV.getOrDefault("CB_USERNAME", "Administrator"); + } + + public static String password() { + return ENV.getOrDefault("CB_PASSWORD", "password"); + } + + public static String bucket() { + return ENV.getOrDefault("CB_BUCKET", "default"); + } + + public static Collection<String> collections() { + String collections = ENV.get("CB_COLLECTIONS"); + + if (collections == null) { + return null; + } + + return Arrays.stream(collections.split(",")) + .map(String::trim) + 
.collect(Collectors.toList()); + } + + public static String dcpPort() { + return ENV.getOrDefault("DCP_PORT", "11210"); + } + + /** + * @return Percentage of DCP messages to be analyzed in form of a short between 0 and 1. + */ + public static Float dcpSampleRatio() { + return DCP_SAMPLE_RATIO; + } + + /** + * @return a threshold that indicates in what percentage of analyzed messages per collection a field must appear before it is sent to Atlas + */ + public static Short dcpFieldThreshold() { + return DCP_FIELD_THRESHOLD; + } + + public static Integer dcpFieldMinOccurences() { + return DCP_FIELD_MIN_OCCUR; + } + + public static Cluster cluster() { + if (cluster == null) { + cluster = Cluster.connect(address(), username(), password()); + } + + return cluster; + } + + public static Client dcpClient() { + if (mockDcpClient != null) { + LOGGER.debug("Using mock DCP client"); + + return mockDcpClient; + } + + Client.Builder builder = Client.builder() + .collectionsAware(true) + .seedNodes(String.format("%s:%s",address(),dcpPort())) + .connectionString(address()) + .credentials(username(), password()); + + String bucket = bucket(); + + if (!(bucket == null || bucket.equals("*") || bucket.isEmpty())) { + LOGGER.debug("Monitoring bucket `{}`", bucket); + + builder.bucket(bucket); + + Collection<String> collections = collections(); + + if (collections != null && !collections.isEmpty()) { + LOGGER.debug("Monitoring collections: {}", String.join(", ", collections)); + + builder.collectionNames(collections); + } + } + + if (enableTLS()) { + LOGGER.debug("Using native TLS"); + + builder.securityConfig(SecurityConfig.builder().enableNativeTls(true).build()); + } + + return builder.build(); + } + + protected static void dcpClient(Client mockDcpClient) { + CBConfig.mockDcpClient = mockDcpClient; + } + + private static boolean enableTLS() { + return Boolean.parseBoolean(ENV.getOrDefault("CB_ENABLE_TLS", "false")); + } +} \ No newline at end of file diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java new file mode 100644 index 000000000..0a73307c8 --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java @@ -0,0 +1,341 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector; + +import com.couchbase.atlas.connector.entities.CouchbaseBucket; +import com.couchbase.atlas.connector.entities.CouchbaseCluster; +import com.couchbase.atlas.connector.entities.CouchbaseCollection; +import com.couchbase.atlas.connector.entities.CouchbaseField; +import com.couchbase.atlas.connector.entities.CouchbaseFieldType; +import com.couchbase.atlas.connector.entities.CouchbaseScope; +import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.dcp.Client; +import com.couchbase.client.dcp.ControlEventHandler; +import com.couchbase.client.dcp.DataEventHandler; +import com.couchbase.client.dcp.StreamFrom; +import com.couchbase.client.dcp.StreamTo; +import com.couchbase.client.dcp.highlevel.internal.CollectionIdAndKey; +import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest.CollectionInfo; +import com.couchbase.client.dcp.message.DcpMutationMessage; +import com.couchbase.client.dcp.message.MessageUtil; +import 
com.couchbase.client.dcp.transport.netty.ChannelFlowController; +import com.couchbase.client.java.json.JsonObject; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; +import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class CouchbaseHook extends AtlasHook implements ControlEventHandler, DataEventHandler { + private static final Logger LOG = LoggerFactory.getLogger(CouchbaseHook.class); + + protected static CouchbaseHook INSTANCE; + protected static Client DCP; + protected static AtlasClientV2 ATLAS; + private static Consumer<List<AtlasEntity>> createInterceptor; + private static Consumer<List<AtlasEntity>> updateInterceptor; + private static boolean loop = true; + + + private CouchbaseCluster clusterEntity; + private CouchbaseBucket bucketEntity; + + /** + * START HERE + * + * @param args + */ + public static void main(String[] args) { + // create instances of DCP client, + DCP = CBConfig.dcpClient(); + + // Atlas client, + ATLAS = AtlasConfig.client(); + + // and DCP handler + INSTANCE = new CouchbaseHook(); + + // register DCP handler with DCP client + DCP.controlEventHandler(INSTANCE); + DCP.dataEventHandler(INSTANCE); + + // Connect to the cluster + DCP.connect().block(); + + LOG.info("DCP client connected."); + + // Ensure the existence of corresponding + // CouchbaseCluster, CouchbaseBucket, CouchbaseScope + // 
entities and store them in local cache + INSTANCE.initializeAtlasContext(); + + // Start listening to DCP + DCP.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block(); + + System.out.println("Starting the stream..."); + DCP.startStreaming().block(); + + System.out.println("Started the stream."); + // And then just loop the loop + try { + while (loop) { + Thread.sleep(1000); + } + } catch (InterruptedException e) { + + } finally { + DCP.disconnect().block(); + } + } + + /** + * Ensures the existence of CouchbaseCluster, + * CouchbaseBucket and Couchbase scope entities + * and stores them into local cache + */ + private void initializeAtlasContext() { + LOG.debug("Creating cluster/bucket/scope entities"); + + clusterEntity = new CouchbaseCluster() + .name(CBConfig.address()) + .url(CBConfig.address()) + .get(); + + bucketEntity = new CouchbaseBucket() + .name(CBConfig.bucket()) + .cluster(clusterEntity) + .get(); + + List<AtlasEntity> entitiesToCreate = new ArrayList<>(); + + if (!clusterEntity.exists(ATLAS)) { + entitiesToCreate.add(clusterEntity.atlasEntity(ATLAS)); + } + + if (!bucketEntity.exists(ATLAS)) { + entitiesToCreate.add(bucketEntity.atlasEntity(ATLAS)); + } + + if (!entitiesToCreate.isEmpty()) { + createEntities(entitiesToCreate); + } + } + + private void createEntities(List<AtlasEntity> entities) { + if (createInterceptor != null) { + createInterceptor.accept(entities); + return; + } + + AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(entities); + EntityCreateRequestV2 request = new EntityCreateRequestV2("couchbase", entity); + + notifyEntities(Arrays.asList(request), null); + } + + private void updateEntities(List<AtlasEntity> entities) { + if (updateInterceptor != null) { + updateInterceptor.accept(entities); + + return; + } + + AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(entities); + EntityUpdateRequestV2 request = new EntityUpdateRequestV2("couchbase", entity); + + notifyEntities(Arrays.asList(request), null); + 
} + + @Override + public void onEvent(ChannelFlowController flowController, ByteBuf event) { + // Probabilistic sampling + if (Math.random() > CBConfig.dcpSampleRatio()) { + LOG.debug("Skipping DCP message."); + return; + } + + if (DcpMutationMessage.is(event)) { + try { + // Borrowed from Couchbeans :) + // Gathering some information about the message. + CollectionIdAndKey ckey = MessageUtil.getCollectionIdAndKey(event, true); + CollectionInfo collectionInfo = collectionInfo(MessageUtil.getVbucket(event), ckey.collectionId()); + String collectionName = collectionInfo.name(); + String scopeName = collectionInfo.scope().name(); + + LOG.debug("Received DCP mutation message for scope '{}' and collection '{}'", scopeName, collectionName); + + CouchbaseScope scopeEntity = bucketEntity.scope(scopeName); + + // Because Atlas doesn't support upserts, + // we need to send new entities in a separate message + // from already existing ones + List<AtlasEntity> toCreate = new ArrayList<>(); + List<AtlasEntity> toUpdate = new ArrayList<>(); + + if (!scopeEntity.exists(ATLAS)) { + toCreate.add(scopeEntity.atlasEntity(ATLAS)); + + LOG.debug("Creating scope: {}", scopeEntity.qualifiedName()); + } else { + toUpdate.add(scopeEntity.atlasEntity(ATLAS)); + + LOG.debug("Updating scope: {}", scopeEntity.qualifiedName()); + } + + CouchbaseCollection collectionEntity = scopeEntity.collection(collectionName); + + // Let's record this attempt to analyze a collection document + // so that we can calculate field frequencies + // when filtering them via DCP_FIELD_THRESHOLD + collectionEntity.incrementAnalyzedDocuments(); + + // and then schedule it to be sent to Atlas + if (!collectionEntity.exists(ATLAS)) { + toCreate.add(collectionEntity.atlasEntity(ATLAS)); + } else { + toUpdate.add(collectionEntity.atlasEntity(ATLAS)); + } + + Map<String, Object> document = JsonObject.fromJson(DcpMutationMessage.contentBytes(event)).toMap(); + + System.out.println(String.format("Document keys: %s", 
document.keySet())); + + // for each field in the document... + document.entrySet().stream() + // transform the field into CouchbaseField either by loading corresponding entity or by creating it + .filter(e -> e.getValue() != null) + .flatMap(entry -> processField(collectionEntity, (Collection<String>) Collections.EMPTY_LIST, null, entry.getKey(), entry.getValue())) + // update document counter on the field entity + .peek(CouchbaseField::incrementDocumentCount) + // Only passes fields that either already in Atlas or pass DCP_FIELD_THRESHOLD setting + .filter(field -> field.exists(ATLAS) || field.documentCount() / (float) collectionEntity.documentsAnalyzed() > CBConfig.dcpFieldThreshold()) + // Schedule the entity either for creation or to be updated in Atlas + .forEach(field -> { + if (field.exists(ATLAS)) { + toUpdate.add(field.atlasEntity(ATLAS)); + } else { + toCreate.add(field.atlasEntity(ATLAS)); + } + }); + + createEntities(toCreate); + updateEntities(toUpdate); + + System.out.println("Notified Atlas"); + } catch (Exception e) { + LOG.error("Failed to process DCP message", e); + } + } + } + + /** + * Constructs a {@link CouchbaseField} from field information + * + * @param collectionEntity the {@link CouchbaseCollection} to which the field belongs + * @param path the path to the field inside the collection document excluding the field itself + * @param parent the parent field, if present or null + * @param name the name of the field + * @param value the value for the field from received document + * @return constructed or loaded from Atlas {@link CouchbaseField} + */ + private static Stream<CouchbaseField> processField(CouchbaseCollection collectionEntity, Collection<String> path, @Nullable CouchbaseField parent, String name, Object value) { + // Let's figure out what type does this field have + CouchbaseFieldType fieldType = CouchbaseFieldType.infer(value); + + // The full field path as it will be stored in Atlas + final Collection<String> fieldPath = new 
ArrayList<>(path); + + fieldPath.add(name); + + // constructing the field entity and loading it from cache or Atlas, if previously stored there + CouchbaseField rootField = new CouchbaseField() + .name(name) + .fieldPath(fieldPath.stream().collect(Collectors.joining("."))) + .fieldType(fieldType) + .collection(collectionEntity) + .parentField(parent) + .get(); + + // return value + Stream<CouchbaseField> result = Stream.of(rootField); + + // Recursive transformation of embedded object fields + if (fieldType == CouchbaseFieldType.OBJECT) { + // Normalizing the value + if (value instanceof JsonObject) { + value = ((JsonObject) value).toMap(); + } + + if (value instanceof Map) { + // Append embedded field entities to the resulting Stream + result = Stream.concat( + result, + ((Map<String, ?>) value).entrySet().stream() + // recursion + .flatMap(entity -> processField(collectionEntity, fieldPath, rootField, entity.getKey(), entity.getValue())) + ); + } else { + throw new IllegalArgumentException(String.format("Incorrect value type '%s' for field type 'object': a Map was expected instead.", value.getClass())); + } + } + + return result; + } + + @Override + public String getMessageSource() { + return "couchbase"; + } + + /** + * Looks up the collection name by its vbucket identifier + * + * @param vbucket + * @param collid + * @return the name of the collection + */ + private static CollectionInfo collectionInfo(int vbucket, long collid) { + return DCP.sessionState() + .get(vbucket) + .getCollectionsManifest() + .getCollection(collid); + } + + protected static void setEntityInterceptors(Consumer<List<AtlasEntity>> createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) { + CouchbaseHook.createInterceptor = createInterceptor; + CouchbaseHook.updateInterceptor = updateInterceptor; + } + + static void loop(boolean loop) { + CouchbaseHook.loop = loop; + } +} diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java new file mode 100644 index 000000000..8bc8e42e9 --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java @@ -0,0 +1,227 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasStructDef; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +/** + * Base class for all couchbase atlas models + * The class uses "Self-Builder" pattern: + * 1. First, create the "builder" instance of the class + * 2. Populate the identifying fields of the class (check the `qualifiedName` method of the entity for the list) + * (all setters return the instance just as a Builder would) + * 3. 
Call `get()` method to resolve the instance and replace it with previously loaded from Atlas data (if present) + * + * Example: + * ```java + * clusterEntity = new CouchbaseCluster() + * .name(CBConfig.address()) + * .url(CBConfig.address()) + * .get(); + * ``` + * + * @param <E> extending class + */ +public abstract class CouchbaseAtlasEntity<E extends CouchbaseAtlasEntity<?>> { + private static final Map<Class, Map<String, AtlasEntity>> ENTITY_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>()); + private static final Map<Class, Map<String, CouchbaseAtlasEntity>> MODEL_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>()); + private String name; + + public String name() { + return name; + } + + public E name(String name) { + this.name = name; + return (E) this; + } + + /** + * Loads or creates corresponding Atlas Entity and marks this model as existing in Atlas + * + * @param atlas + * @return + */ + public AtlasEntity atlasEntity(AtlasClientV2 atlas) { + AtlasEntity atlasEntity = atlasEntity() + .filter(entity -> entity.getGuid().charAt(0) != '-') + .orElseGet(() -> + cache(load(atlas) + .orElseGet(() -> + atlasEntity() + .orElseGet(() -> new AtlasEntity(atlasTypeName()))) + ) + ); + + atlasEntity.setAttribute("name", name); + atlasEntity.setAttribute("qualifiedName", qualifiedName()); + + updateAtlasEntity(atlasEntity); + + return atlasEntity; + } + + protected abstract String qualifiedName(); + + /** + * Looks up precreated atlas entity in the entity cache + * @return Optional of the cached entity + */ + public Optional<AtlasEntity> atlasEntity() { + return cachedEntity().map(atlasEntity -> { + updateAtlasEntity(atlasEntity); + return atlasEntity; + }); + } + + /** + * Checks whether the model has the Atlas Entity created for it + * by looking it up in the entity cache. 
+ * NOTE: this method does not check if the entity has been saved in Atlas so, + * it will return true when the entity is already created and cached but is yet to be sent to Atlas + * + * This method is _mostly_ used in related objects when setting relationship field to ensure that related + * model has an AtlasEntity that can be referenced when storing relationship information. + * + * @return true if the entity found + */ + protected boolean exists() { + return cachedEntity().isPresent(); + } + + public abstract String atlasTypeName(); + + public abstract UUID id(); + + /** + * Invoked when the entity needs to be updated with values from the model + * @param entity the entity to write the values into + */ + protected void updateAtlasEntity(AtlasEntity entity) { + // override me + } + + /** + * Invoked when the model needs to be updated with values from the entity + * @param entity the entity to read the values from + */ + protected void updateJavaModel(AtlasEntity entity) { + // override me + } + + /** + * Loads the entity for this model from Atlas and stores it in the entity cache + * @param client Atlas client to use + * @return loaded entity + */ + private Optional<AtlasEntity> load(AtlasClientV2 client) { + try { + Map<String, String> query = new HashMap<>(); + query.put("qualifiedName", qualifiedName()); + AtlasEntity atlasEntity = client.getEntityByAttribute(atlasTypeName(), query).getEntity(); + + if (atlasEntity != null) { + cache(atlasEntity); + if (atlasEntity.hasAttribute("name")) { + this.name = (String) atlasEntity.getAttribute("name"); + } + updateJavaModel(atlasEntity); + return Optional.of(atlasEntity); + } + } catch (AtlasServiceException e) { + if (e.getStatus().getStatusCode() != 404) { + throw new RuntimeException(e); + } + } + return Optional.empty(); + } + + /** + * Puts an entity into the entity cache + * @param atlasEntity the entity to cache + * @return the same entity + */ + private AtlasEntity cache(AtlasEntity atlasEntity) { + if 
(!ENTITY_BY_TYPE_AND_ID.containsKey(getClass())) { + ENTITY_BY_TYPE_AND_ID.put(getClass(), new HashMap<>()); + } + + ENTITY_BY_TYPE_AND_ID.get(getClass()).put(id().toString(), atlasEntity); + return atlasEntity; + } + + /** + * Looks up the entity in the cache + * @return Optional of cached entity + */ + private Optional<AtlasEntity> cachedEntity() { + return Optional.ofNullable(ENTITY_BY_TYPE_AND_ID.getOrDefault(getClass(), (Map<String, AtlasEntity>) Collections.EMPTY_MAP).getOrDefault(id().toString(), null)); + } + + /** + * First checks if the entity has been loaded and cached and, if not, then tries to load it from Atlas + * @param atlas Atlas client to use + * @return true if the entity found either in cache or in Atlas + */ + public boolean exists(AtlasClientV2 atlas) { + if (!exists()) { + return load(atlas).isPresent(); + } + return true; + } + + /** + * Returns pre-cached model with provided identifiers or caches this model and returns it + * + * @return the model + */ + public E get() { + Class<E> type = (Class<E>) getClass(); + String id = id().toString(); + + // ensure valid cache structure + if (!MODEL_BY_TYPE_AND_ID.containsKey(type)) { + MODEL_BY_TYPE_AND_ID.put(type, Collections.synchronizedMap(new HashMap<>())); + } + + // put the model into the cache, if not already present + Map<String, CouchbaseAtlasEntity> modelsById = MODEL_BY_TYPE_AND_ID.get(type); + if (!modelsById.containsKey(id)) { + try { + modelsById.put(id, this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return (E) modelsById.get(id); + } + + public static void dropCache() { + ENTITY_BY_TYPE_AND_ID.clear(); + MODEL_BY_TYPE_AND_ID.clear(); + } +} \ No newline at end of file diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java new file mode 100644 index 000000000..2983b2af2 --- /dev/null +++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.type.AtlasTypeUtil; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class CouchbaseBucket extends CouchbaseAtlasEntity<CouchbaseBucket> { + public static final String TYPE_NAME = "couchbase_bucket"; + private CouchbaseCluster cluster; + private transient Map<String, CouchbaseScope> scopes = Collections.synchronizedMap(new HashMap<>()); + + @Override + public AtlasEntity atlasEntity(AtlasClientV2 atlas) { + AtlasEntity entity = super.atlasEntity(atlas); + entity.setRelationshipAttribute("cluster", cluster.atlasEntity(atlas)); + return entity; + } + + @Override + protected String qualifiedName() { + return 
String.format("%s/%s", cluster.qualifiedName(), name()); + } + + public CouchbaseBucket() { + + } + + public CouchbaseCluster cluster() { + return cluster; + } + + public CouchbaseBucket cluster(CouchbaseCluster cluster) { + this.cluster = cluster; + return this; + } + + @Override + public String atlasTypeName() { + return TYPE_NAME; + } + + @Override + public UUID id() { + return UUID.nameUUIDFromBytes(String.format("%s:%s:%s", atlasTypeName(), cluster().id(), name()).getBytes(Charset.defaultCharset())); + } + + public CouchbaseScope scope(String name) { + if (!scopes.containsKey(name)) { + scopes.put(name, new CouchbaseScope() + .bucket(this) + .name(name) + .get() + ); + } + + return scopes.get(name); + } +} diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java new file mode 100644 index 000000000..2f4efe865 --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java @@ -0,0 +1,130 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.type.AtlasTypeUtil; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +public class CouchbaseCluster extends CouchbaseAtlasEntity<CouchbaseCluster> { + public static final String TYPE_NAME = "couchbase_cluster"; + private String url; + + public String url() { + return url; + } + + public CouchbaseCluster url(String url) { + this.url = url; + return this; + } + + public static AtlasEntityDef atlasEntityDef() { + AtlasEntityDef definition = AtlasTypeUtil.createClassTypeDef( + "couchbase_cluster", + new HashSet<>() + ); + + definition.getSuperTypes().add("Asset"); + definition.setServiceType("couchbase"); + definition.setTypeVersion("0.1"); + + List<AtlasStructDef.AtlasAttributeDef> attributes = definition.getAttributeDefs(); + + attributes.add(new AtlasStructDef.AtlasAttributeDef( + "url", + "string", + false, + AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE, + 1, + 1, + true, + true, + true, + Collections.EMPTY_LIST + )); + + return definition; + } + + public static Collection<? 
extends AtlasRelationshipDef> atlasRelationshipDefs() { + return Arrays.asList( + new AtlasRelationshipDef( + "couchbase_cluster_buckets", + "", + "0.1", + "couchbase", + AtlasRelationshipDef.RelationshipCategory.AGGREGATION, + AtlasRelationshipDef.PropagateTags.ONE_TO_TWO, + new AtlasRelationshipEndDef( + "couchbase_cluster", + "buckets", + AtlasStructDef.AtlasAttributeDef.Cardinality.SET, + true + ), + new AtlasRelationshipEndDef( + "couchbase_bucket", + "cluster", + AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE, + false + ) + ) + ); + } + + @Override + public String atlasTypeName() { + return TYPE_NAME; + } + + @Override + public UUID id() { + return UUID.nameUUIDFromBytes(String.format("%s:%s", atlasTypeName(), url()).getBytes(Charset.defaultCharset())); + } + + @Override + public AtlasEntity atlasEntity(AtlasClientV2 atlas) { + AtlasEntity entity = super.atlasEntity(atlas); + entity.setAttribute("url", url()); + return entity; + } + + @Override + protected String qualifiedName() { + return url(); + } + + @Override + protected void updateJavaModel(AtlasEntity entity) { + if (entity.hasAttribute("url")) { + this.url = (String) entity.getAttribute("url"); + } + } +} \ No newline at end of file diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java new file mode 100644 index 000000000..b7319581d --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java @@ -0,0 +1,90 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.type.AtlasTypeUtil; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; + +public class CouchbaseCollection extends CouchbaseAtlasEntity<CouchbaseCollection> { + + private CouchbaseScope scope; + + private long documentsAnalyzed; + + public CouchbaseCollection scope(CouchbaseScope scope) { + this.scope = scope; + return this; + } + + @Override + public AtlasEntity atlasEntity(AtlasClientV2 atlas) { + AtlasEntity entity = super.atlasEntity(atlas); + entity.setRelationshipAttribute("scope", scope.atlasEntity(atlas)); + return entity; + } + + @Override + protected void updateAtlasEntity(AtlasEntity entity) { + entity.setAttribute("documentsAnalyzed", documentsAnalyzed); + } + + @Override + protected void updateJavaModel(AtlasEntity entity) { + documentsAnalyzed = (Integer) entity.getAttribute("documentsAnalyzed"); + } + + public long documentsAnalyzed() { + return documentsAnalyzed; + } + + public CouchbaseCollection incrementAnalyzedDocuments() { + this.documentsAnalyzed++; + return this; + } + + 
@Override + protected String qualifiedName() { + return String.format("%s/%s", scope.qualifiedName(), name()); + } + + @Override + public String atlasTypeName() { + return "couchbase_collection"; + } + + @Override + public UUID id() { + return UUID.nameUUIDFromBytes(String.format("%s:%s:%s", atlasTypeName(), scope().id().toString(), name()).getBytes(Charset.defaultCharset())); + } + + public CouchbaseScope scope() { + return this.scope; + } +} diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java new file mode 100644 index 000000000..5d14f6e20 --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.type.AtlasTypeUtil; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; + +public class CouchbaseField extends CouchbaseAtlasEntity<CouchbaseField> { + public static final String TYPE_NAME = "couchbase_field"; + private CouchbaseFieldType fieldType; + private String fieldPath; + private long documentCount = 0; + + private CouchbaseField parentField; + + private CouchbaseCollection collection; + + public CouchbaseField() { + + } + + public CouchbaseFieldType fieldType() { + return fieldType; + } + + public CouchbaseField fieldType(CouchbaseFieldType fieldType) { + this.fieldType = fieldType; + return this; + } + + public String fieldPath() { + return fieldPath; + } + + public CouchbaseField fieldPath(String fieldPath) { + this.fieldPath = fieldPath; + return this; + } + + public long documentCount() { + return documentCount; + } + + public CouchbaseField documentCount(long documentCount) { + this.documentCount = documentCount; + return this; + } + + public void incrementDocumentCount() { + this.documentCount++; + } + + public CouchbaseCollection collection() { + return collection; + } + + public CouchbaseField collection(CouchbaseCollection collection) { + this.collection = collection; + return this; + } + + @Override + public AtlasEntity atlasEntity(AtlasClientV2 atlas) { + AtlasEntity entity = super.atlasEntity(atlas); + entity.setRelationshipAttribute("collection", collection.atlasEntity(atlas)); + if (parentField != 
null) { + entity.setRelationshipAttribute("parentField", parentField.atlasEntity(atlas)); + } + return entity; + } + + @Override + protected void updateAtlasEntity(AtlasEntity entity) { + entity.setAttribute("fieldType", fieldType.toString()); + entity.setAttribute("fieldPath", fieldPath); + entity.setAttribute("documentCount", documentCount); + } + + @Override + protected String qualifiedName() { + return String.format("%s/%s:%s", collection.qualifiedName(), fieldPath(), fieldType()); + } + + @Override + public String atlasTypeName() { + return TYPE_NAME; + } + + @Override + public UUID id() { + return UUID.nameUUIDFromBytes(qualifiedName().getBytes(Charset.defaultCharset())); + } + + public CouchbaseField parentField() { + return parentField; + } + + public CouchbaseField parentField(CouchbaseField parent) { + this.parentField = parent; + return this; + } +} \ No newline at end of file diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java new file mode 100644 index 000000000..8355f60cf --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java @@ -0,0 +1,71 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.couchbase.atlas.connector.entities; + +import com.couchbase.client.core.error.InvalidArgumentException; +import com.couchbase.client.java.json.JsonObject; +import org.apache.atlas.model.typedef.AtlasEnumDef; +import org.apache.atlas.type.AtlasTypeUtil; + +import javax.annotation.Nonnull; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public enum CouchbaseFieldType { + BOOLEAN, + NUMBER, + STRING, + ARRAY, + OBJECT, + BINARY; + + public static CouchbaseFieldType infer(@Nonnull Object value) { + if (value instanceof Map || value instanceof JsonObject) { + return OBJECT; + } else if (value instanceof Collection || value.getClass().isArray()) { + if (value.getClass().isArray() && Byte.class.isAssignableFrom(value.getClass().getComponentType())) { + return BINARY; + } + return ARRAY; + } else if (value instanceof Number) { + return NUMBER; + } else if (value instanceof Boolean) { + return BOOLEAN; + } else if (value instanceof String) { + String sValue = (String) value; + if ("true".equalsIgnoreCase(sValue) || "false".equalsIgnoreCase(sValue)) { + return BOOLEAN; + } + try { + Double.parseDouble(sValue); + return NUMBER; + } catch (NumberFormatException nfe) { + return STRING; + } + } + + throw new IllegalArgumentException("Failed to infer type"); + } + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.getDefault()); + } + +} \ No newline at end of file diff --git a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java new file mode 100644 index 000000000..13f4e4852 --- /dev/null +++ b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 Couchbase, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasRelationshipDef; +import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.type.AtlasTypeUtil; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CouchbaseScope extends CouchbaseAtlasEntity<CouchbaseScope> { + + public static final String TYPE_NAME = "couchbase_scope"; + private CouchbaseBucket bucket; + + private transient Map<String, CouchbaseCollection> collections = Collections.synchronizedMap(new HashMap<>()); + + public CouchbaseBucket bucket() { + return bucket; + } + + public CouchbaseScope bucket(CouchbaseBucket bucket) { + this.bucket = bucket; + return this; + } + + @Override + public UUID id() { + return UUID.nameUUIDFromBytes( + String.format( + "%s:%s:%s", + atlasTypeName(), + bucket().id().toString(), + name() + ).getBytes(Charset.defaultCharset()) + ); + } + + @Override + public AtlasEntity atlasEntity(AtlasClientV2 atlas) { + AtlasEntity entity = 
super.atlasEntity(atlas); + entity.setRelationshipAttribute("bucket", bucket.atlasEntity(atlas)); + return entity; + } + + @Override + public String qualifiedName() { + return String.format("%s/%s", bucket.qualifiedName(), name()); + } + + @Override + public String atlasTypeName() { + return TYPE_NAME; + } + + public CouchbaseCollection collection(String name) { + if (!collections.containsKey(name)) { + collections.put(name, new CouchbaseCollection() + .name(name) + .scope(this) + .get() + ); + } + + return collections.get(name); + } +} diff --git a/addons/couchbase-bridge/src/main/resources/atlas-application.properties b/addons/couchbase-bridge/src/main/resources/atlas-application.properties new file mode 100644 index 000000000..3df60b02f --- /dev/null +++ b/addons/couchbase-bridge/src/main/resources/atlas-application.properties @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# diff --git a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java new file mode 100644 index 000000000..36d81a3ac --- /dev/null +++ b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java @@ -0,0 +1,136 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.couchbase.atlas.connector; + +import com.couchbase.atlas.connector.entities.CouchbaseAtlasEntity; +import com.couchbase.atlas.connector.entities.CouchbaseBucket; +import com.couchbase.atlas.connector.entities.CouchbaseCluster; +import com.couchbase.atlas.connector.entities.CouchbaseScope; +import com.couchbase.client.dcp.Client; +import com.couchbase.client.dcp.StreamFrom; +import com.couchbase.client.dcp.StreamTo; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class CouchbaseHookTest { + + private Client mockDcpClient() { + Client mockDcpClient = Mockito.mock(Client.class); + Mockito.when(mockDcpClient.connect()).thenReturn(Mono.empty()); + 
Mockito.when(mockDcpClient.initializeState(StreamFrom.NOW, StreamTo.INFINITY)).thenReturn(Mono.empty()); + Mockito.when(mockDcpClient.startStreaming()).thenReturn(Mono.empty()); + Mockito.when(mockDcpClient.disconnect()).thenReturn(Mono.empty()); + return mockDcpClient; + } + + private AtlasClientV2 mockAtlasClient(boolean returnEntities) throws Exception { + AtlasClientV2 mockAtlasClient = Mockito.mock(AtlasClientV2.class); + final String clusterName = "couchbase://localhost"; + final String bucketName = String.format("%s/%s", clusterName, "default"); + final String scopeName = String.format("%s/%s", bucketName, "_default"); + + Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseCluster.TYPE_NAME), Mockito.anyMap())).thenAnswer(iom -> { + Map<String, String> query = iom.getArgument(1); + Assert.assertEquals(clusterName, query.get("qualifiedName")); + return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? Mockito.mock(AtlasEntity.class) : null); + }); + + Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseBucket.TYPE_NAME), Mockito.anyMap())).thenAnswer(iom -> { + Map<String, String> query = iom.getArgument(1); + Assert.assertEquals(bucketName, query.get("qualifiedName")); + return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? Mockito.mock(AtlasEntity.class) : null); + }); + + Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseScope.TYPE_NAME), Mockito.anyMap())).thenAnswer(iom -> { + Map<String, String> query = iom.getArgument(1); + Assert.assertEquals(scopeName, query.get("qualifiedName")); + return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? 
Mockito.mock(AtlasEntity.class) : null); + }); + + return mockAtlasClient; + } + + @Test + public void testMain() throws Exception { + Client mockDcpClient = mockDcpClient(); + CBConfig.dcpClient(mockDcpClient); + AtlasClientV2 mockAtlasClient = mockAtlasClient(false); + AtlasConfig.client(mockAtlasClient); + + AtomicInteger createCalled = new AtomicInteger(); + Consumer<List<AtlasEntity>> createEntitiesInterceptor = ents -> { + createCalled.getAndIncrement(); + Assert.assertEquals(ents.size(), 2); + }; + Consumer<List<AtlasEntity>> updateEntitiesInterceptor = ents -> { + Assert.assertTrue(false); + }; + + CouchbaseHook.setEntityInterceptors(createEntitiesInterceptor, updateEntitiesInterceptor); + CouchbaseHook.loop(false); + // AAAAAND, ACTION (missing entities) + CouchbaseHook.main(new String[0]); + + Mockito.verify(mockDcpClient, Mockito.times(1)).connect(); + Assert.assertEquals(1, createCalled.get()); + // 2 times: 1 time when we call exists(ATLAS) and second time when we request the entity + validateAtlasInvocations(mockAtlasClient, 3, 2, 0); + + // simulate existing entities situation + mockAtlasClient = mockAtlasClient(true); + AtlasConfig.client(mockAtlasClient); + CouchbaseAtlasEntity.dropCache(); + + // ACTION AGAIN, this time with mock entities in mock Atlas + CouchbaseHook.main(new String[0]); + + Mockito.verify(mockDcpClient, Mockito.times(2)).connect(); + Assert.assertEquals(1, createCalled.get()); + // 1 time and then it should be cached + validateAtlasInvocations(mockAtlasClient, 1, 1, 0); + + testEvents(CouchbaseHook.INSTANCE); + } + + public void testEvents(CouchbaseHook listener) { + + } + + private void validateAtlasInvocations(AtlasClientV2 mockAtlasClient, int cluster, int bucket, int scope) throws Exception { + Mockito.verify(mockAtlasClient, Mockito.times(cluster)).getEntityByAttribute( + Mockito.eq(CouchbaseCluster.TYPE_NAME), + Mockito.anyMap() + ); + Mockito.verify(mockAtlasClient, Mockito.times(bucket)).getEntityByAttribute( + 
Mockito.eq(CouchbaseBucket.TYPE_NAME), + Mockito.anyMap() + ); + Mockito.verify(mockAtlasClient, Mockito.times(scope)).getEntityByAttribute( + Mockito.eq(CouchbaseScope.TYPE_NAME), + Mockito.anyMap() + ); + } +} \ No newline at end of file diff --git a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java new file mode 100644 index 000000000..5449d6778 --- /dev/null +++ b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.couchbase.atlas.connector.entities; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.model.instance.AtlasEntity; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.nio.charset.Charset; +import java.util.Map; +import java.util.UUID; + +/** + * Tests atlas entity loading and caching + */ +public class CouchbaseAtlasEntityTest { + final static String QUALIFIED_NAME = "testEntityQualifiedName"; + final static String TYPE_NAME = "testEntityTypeName"; + final static UUID ID = UUID.randomUUID(); + + public class TestEntity extends CouchbaseAtlasEntity<TestEntity> { + + @Override + protected String qualifiedName() { + return QUALIFIED_NAME; + } + + @Override + public String atlasTypeName() { + return TYPE_NAME; + } + + @Override + public UUID id() { + return ID; + } + } + + @Test + public void testEntityLoading() throws Exception { + final AtlasClientV2 ac = Mockito.mock(AtlasClientV2.class); + final AtlasEntity ae = Mockito.mock(AtlasEntity.class); + + Mockito.when(ae.getAttribute(Mockito.eq("qualifiedName"))) + .thenReturn(QUALIFIED_NAME); + + Mockito.when( + ac.getEntityByAttribute( + Mockito.eq(TYPE_NAME), + Mockito.anyMap() + ) + ).thenAnswer(iom -> { + Map<String, String> query = iom.getArgument(1); + Assert.assertTrue(query.containsKey("qualifiedName")); + Assert.assertEquals(QUALIFIED_NAME, query.get("qualifiedName")); + return new AtlasEntity.AtlasEntityWithExtInfo(ae); + }); + + TestEntity subject = Mockito.spy(new TestEntity()); + // exists must return false at this point as we've just created the model but it doesn't have the corresponding AtlasEntity + // and the cache should be empty + Assert.assertFalse(subject.exists()); + Assert.assertSame(subject, subject.get()); + Assert.assertFalse(subject.exists()); + // ditto + Assert.assertTrue(!subject.atlasEntity().isPresent()); + // Because our client mock should return the mock entity, exists with Atlas check should 
find the entity, + // cache it, and return true + Assert.assertTrue(subject.exists(ac)); + // and call the method to update our model + Mockito.verify(subject, Mockito.times(1)).updateJavaModel(Mockito.eq(ae)); + // Let's validate that exists with Atlas check did, in fact, query our atlas mock for the entity + Mockito.verify(ac, Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap()); + // the entity should exist in cache + Assert.assertTrue(subject.exists()); + // and exists with Atlas check should use it + Assert.assertTrue(subject.exists(ac)); + // so, let's verify that the item was pulled not from atlas (from cache will be the only option left) + Mockito.verify(ac, Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap()); + + // This method should return filled Optional with our mocked entity pulled from cache + // And, no matter how many times we call, the result should be the same (but let's make sure that we call it at least twice) + int timesToLoadEntity = 2 + (int) (Math.random() * 98); + for (int i = 0; i < timesToLoadEntity; i++) { + Assert.assertSame(ae, subject.atlasEntity().get()); + } + // verify that atlas entity was updated every time we requested it + Mockito.verify(subject, Mockito.times(timesToLoadEntity)).updateAtlasEntity(Mockito.eq(ae)); + // verify that the model was not updated when we requested the entity second time + Mockito.verify(subject, Mockito.times(1)).updateJavaModel(Mockito.eq(ae)); + } +} \ No newline at end of file diff --git a/addons/couchbase-bridge/src/test/resources/log4j.xml b/addons/couchbase-bridge/src/test/resources/log4j.xml new file mode 100644 index 000000000..e53c232d0 --- /dev/null +++ b/addons/couchbase-bridge/src/test/resources/log4j.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + <root> + <level value="DEBUG"/> + <appender-ref ref="console"/> + </root> + +</log4j:configuration> diff --git a/addons/models/5000-Couchbase/5020-couchbase_model.json b/addons/models/5000-Couchbase/5020-couchbase_model.json new file mode 100644 index 000000000..acd5d798d --- /dev/null +++ b/addons/models/5000-Couchbase/5020-couchbase_model.json @@ -0,0 +1,159 @@ +{ + "enumDefs": [ + { + "name": "couchbase_field_type", + "typeVersion": "1.0", + "service_type": "couchbase", + "elementDefs": [ + { "ordinal": 0, "value": "boolean" }, + { "ordinal": 1, "value": "number" }, + { "ordinal": 2, "value": "string" }, + { "ordinal": 3, "value": "array" }, + { "ordinal": 4, "value": "object" }, + { "ordinal": 5, "value": "binary" } + ] + } + ], + "structDefs": [], + "classificationDefs": [], + "entityDefs": [ + { + "name": "couchbase_cluster", + "superTypes": [ "Asset" ], + "serviceType": 
"couchbase", + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "url", + "typeName": "string", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": false, + "isUnique": true, + "includeInNotification": true + } + ] + }, + { + "name": "couchbase_bucket", + "superTypes": [ "Asset" ], + "serviceType": "couchbase", + "typeVersion": "1.0", + "attributeDefs": [ ] + }, + { + "name": "couchbase_scope", + "superTypes": [ "Asset" ], + "serviceType": "couchbase", + "typeVersion": "1.0", + "attributeDefs": [] + }, + { + "name": "couchbase_collection", + "superTypes": [ "DataSet" ], + "serviceType": "couchbase", + "typeVersion": "1.0", + "options": { "schemaElementsAttribute": "fields" }, + "attributeDefs": [ + { + "name": "documentsAnalyzed", + "typeName": "long", + "isOptional": false, + "cardinality": "SINGLE", + "valuesMinCount": 1, + "valuesMaxCount": 1, + "isUnique": false, + "isIndexable": false, + "includeInNotification": false + } + ] + }, + { + "name": "couchbase_field", + "superTypes": [ "DataSet" ], + "serviceType": "couchbase", + "typeVersion": "1.0", + "attributeDefs": [ + { + "name": "fieldType", + "typeName": "couchbase_field_type", + "isOptional": false, + "cardinality": "SINGLE", + "valuesMinCount": 1, + "valuesMaxCount": 1, + "isUnique": false, + "isIndexable": true, + "includeInNotification": false + }, + { + "name": "fieldPath", + "typeName": "string", + "isOptional": false, + "cardinality": "SINGLE", + "valuesMinCount": 1, + "valuesMaxCount": 1, + "isUnique": false, + "isIndexable": true, + "includeInNotification": false + }, + { + "name": "documentCount", + "typeName": "long", + "isOptional": false, + "cardinality": "SINGLE", + "valuesMinCount": 1, + "valuesMaxCount": 1, + "isUnique": false, + "isIndexable": false + } + ] + } + ], + "relationshipDefs": [ + { + "name": "couchbase_cluster_buckets", + "typeVersion": "1.0", + "serviceType": "couchbase", + "relationshipCategory": "AGGREGATION", + "propagateTags": "ONE_TO_TWO", + "endDef1": 
{ "type": "couchbase_cluster", "name": "buckets", "cardinality": "SET", "isContainer": true }, + "endDef2": { "type": "couchbase_bucket", "name": "cluster", "cardinality": "SINGLE", "isContainer": false } + }, + { + "name": "couchbase_bucket_scopes", + "typeVersion": "1.0", + "serviceType": "couchbase", + "relationshipCategory": "AGGREGATION", + "propagateTags": "ONE_TO_TWO", + "endDef1": { "type": "couchbase_bucket", "name": "scopes", "cardinality": "SET", "isContainer": true }, + "endDef2": { "type": "couchbase_scope", "name": "bucket", "cardinality": "SINGLE", "isContainer": false } + }, + { + "name": "couchbase_scope_collections", + "typeVersion": "1.0", + "serviceType": "couchbase", + "relationshipCategory": "AGGREGATION", + "propagateTags": "ONE_TO_TWO", + "endDef1": { "type": "couchbase_scope", "name": "collections", "cardinality": "SET", "isContainer": true }, + "endDef2": { "type": "couchbase_collection", "name": "scope", "cardinality": "SINGLE", "isContainer": false } + }, + { + "name": "couchbase_collection_fields", + "typeVersion": "1.0", + "serviceType": "couchbase", + "relationshipCategory": "AGGREGATION", + "propagateTags": "ONE_TO_TWO", + "endDef1": { "type": "couchbase_collection", "name": "fields", "cardinality": "SET", "isContainer": true }, + "endDef2": { "type": "couchbase_field", "name": "collection", "cardinality": "SINGLE", "isContainer": false } + }, + { + "name": "couchbase_field_fields", + "typeVersion": "1.0", + "serviceType": "couchbase", + "relationshipCategory": "AGGREGATION", + "propagateTags": "ONE_TO_TWO", + "endDef1": { "type": "couchbase_field", "name": "objectFields", "cardinality": "SET", "isContainer": true }, + "endDef2": { "type": "couchbase_field", "name": "parentField", "cardinality": "SINGLE", "isContainer": false } + } + ] +} diff --git a/distro/pom.xml b/distro/pom.xml index ed477dfbb..9df238eb3 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -129,6 +129,7 @@ atlas.graph.storage.hbase.regions-per-server=1 
<descriptor>src/main/assemblies/atlas-sqoop-hook-package.xml</descriptor> <descriptor>src/main/assemblies/atlas-storm-hook-package.xml</descriptor> <descriptor>src/main/assemblies/atlas-kafka-hook-package.xml</descriptor> + <descriptor>src/main/assemblies/atlas-couchbase-hook-package.xml</descriptor> <descriptor>src/main/assemblies/atlas-server-package.xml</descriptor> <descriptor>src/main/assemblies/standalone-package.xml</descriptor> <descriptor>src/main/assemblies/src-package.xml</descriptor> diff --git a/distro/src/main/assemblies/atlas-couchbase-hook-package.xml b/distro/src/main/assemblies/atlas-couchbase-hook-package.xml new file mode 100644 index 000000000..5022bef51 --- /dev/null +++ b/distro/src/main/assemblies/atlas-couchbase-hook-package.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <formats> + <format>tar.gz</format> + </formats> + <id>couchbase-hook</id> + <baseDirectory>apache-atlas-couchbase-hook-${project.version}</baseDirectory> + <fileSets> + <fileSet> + <directory>../addons/couchbase-bridge/target/dependency/hook</directory> + <outputDirectory>hook</outputDirectory> + </fileSet> + + </fileSets> +</assembly> diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index 3e2ca1c39..888d3edf4 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -180,6 +180,12 @@ <outputDirectory>hook</outputDirectory> </fileSet> + <!-- addons/couchbase --> + <fileSet> + <directory>../addons/couchbase-bridge/target/dependency/hook</directory> + <outputDirectory>hook</outputDirectory> + </fileSet> + <!-- for kafka topic setup --> <fileSet> <directory>../addons/kafka-bridge/src/bin</directory> diff --git a/docs/src/documents/Hook/HookCouchbase.md b/docs/src/documents/Hook/HookCouchbase.md new file mode 100644 index 000000000..53b9e9cb3 --- /dev/null +++ b/docs/src/documents/Hook/HookCouchbase.md @@ -0,0 +1,45 @@ +--- +name: Couchbase +route: /HookCouchbase +menu: Documentation +submenu: Hooks +--- + +import themen from 'theme/styles/styled-colors'; +import * as theme from 'react-syntax-highlighter/dist/esm/styles/hljs'; +import SyntaxHighlighter from 'react-syntax-highlighter'; + +# Apache Atlas Couchbase bridge +This bridge connects to a Couchbase cluster using the DCP protocol +and performs real-time analysis and metadata extraction on the documents stored in the cluster.
+The extracted metadata is then sent to Atlas via its REST API. + +## Configuration +The bridge uses environment variables for configuration. + +### Atlas REST API +| Environment variable | Description | Default Value | +|----------------------|----------------------------------------------------------------------------------------------------------------------------------|--------------------------| +| ATLAS_URL | Atlas REST API URL | "http://localhost:21000" | +| ATLAS_USERNAME | Atlas REST API username | "admin" | +| ATLAS_PASSWORD | Atlas REST API password | "admin" | + +### Couchbase DCP connection +| Environment variable | Description | Default Value | +|----------------------|----------------------------------------------------------------------------------------------------------------------------------|--------------------------| +| CB_CLUSTER | Couchbase Cluster connection string | "couchbase://localhost" | +| CB_USERNAME | Couchbase Cluster username | "Administrator" | +| CB_PASSWORD | Couchbase Cluster password | "password" | +| CB_ENABLE_TLS | Whether to use TLS | false | +| CB_BUCKET | Couchbase bucket to monitor | "default" | +| CB_COLLECTIONS | Comma-separated list of collections to monitor, each written as `<scope>.<collection>` | | +| DCP_PORT | DCP port to use | 11210 | +| DCP_FIELD_THRESHOLD | Minimum percentage of analyzed messages (per collection) in which a field must appear before it is sent to Atlas | 0 | +| DCP_SAMPLE_RATIO | Fraction of DCP messages to analyze, expressed as a decimal number between 0 and 1 | 1 | + +## Running the bridge +In the following commands, replace `<VARNAME>` and `<version>` with appropriate values. +* Set the environment variables using `export <VARNAME>=...`, or prefix the next command with them. +* Run `java -cp couchbase-bridge-<version>.jar com.couchbase.atlas.connector.CouchbaseHook`. +* Verify that the corresponding `couchbase_cluster` and `couchbase_bucket` entities were created in Atlas.
+* After some documents have been updated on the Couchbase cluster, verify that the corresponding `couchbase_scope`, `couchbase_collection`, and `couchbase_field` entities were created in Atlas. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6a0e68485..c55bec763 100644 --- a/pom.xml +++ b/pom.xml @@ -849,6 +849,7 @@ <module>addons/impala-hook-api</module> <module>addons/impala-bridge-shim</module> <module>addons/impala-bridge</module> + <module>addons/couchbase-bridge</module> <module>distro</module> <module>atlas-examples</module> @@ -1615,6 +1616,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>couchbase-bridge</artifactId> + <version>${project.version}</version> + </dependency> + <!-- API documentation --> <dependency> <groupId>com.webcohesion.enunciate</groupId>