This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new fd0d3a69f ATLAS-4789: added couchbase bridge
fd0d3a69f is described below

commit fd0d3a69fe97867b3565d0a2f497da6362ce5ef4
Author: Dmitrii Chechetkin <dmitrii.chechet...@couchbase.com>
AuthorDate: Mon Oct 30 12:31:32 2023 -0400

    ATLAS-4789: added couchbase bridge
    
    Signed-off-by: Madhan Neethiraj <mad...@apache.org>
---
 addons/couchbase-bridge/pom.xml                    | 201 ++++++++++++
 .../com/couchbase/atlas/connector/AtlasConfig.java |  54 ++++
 .../com/couchbase/atlas/connector/CBConfig.java    | 145 +++++++++
 .../couchbase/atlas/connector/CouchbaseHook.java   | 341 +++++++++++++++++++++
 .../connector/entities/CouchbaseAtlasEntity.java   | 227 ++++++++++++++
 .../atlas/connector/entities/CouchbaseBucket.java  |  89 ++++++
 .../atlas/connector/entities/CouchbaseCluster.java | 130 ++++++++
 .../connector/entities/CouchbaseCollection.java    |  90 ++++++
 .../atlas/connector/entities/CouchbaseField.java   | 129 ++++++++
 .../connector/entities/CouchbaseFieldType.java     |  71 +++++
 .../atlas/connector/entities/CouchbaseScope.java   |  93 ++++++
 .../main/resources/atlas-application.properties    |  17 +
 .../atlas/connector/CouchbaseHookTest.java         | 136 ++++++++
 .../entities/CouchbaseAtlasEntityTest.java         | 108 +++++++
 .../couchbase-bridge/src/test/resources/log4j.xml  |  35 +++
 .../5000-Couchbase/5020-couchbase_model.json       | 159 ++++++++++
 distro/pom.xml                                     |   1 +
 .../assemblies/atlas-couchbase-hook-package.xml    |  34 ++
 distro/src/main/assemblies/standalone-package.xml  |   6 +
 docs/src/documents/Hook/HookCouchbase.md           |  45 +++
 pom.xml                                            |   7 +
 21 files changed, 2118 insertions(+)

diff --git a/addons/couchbase-bridge/pom.xml b/addons/couchbase-bridge/pom.xml
new file mode 100644
index 000000000..8a3ccef17
--- /dev/null
+++ b/addons/couchbase-bridge/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-atlas</artifactId>
+        <groupId>org.apache.atlas</groupId>
+        <version>3.0.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+    <artifactId>couchbase-bridge</artifactId>
+    <description>Apache Atlas Couchbase Bridge Module</description>
+    <name>Apache Atlas Couchbase Bridge</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>30.1.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>java-client</artifactId>
+            <version>3.4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>dcp-client</artifactId>
+            <version>0.44.0</version>
+        </dependency>
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-notification</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>dist</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-dependency-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-hook</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>copy</goal>
+                                </goals>
+                                <configuration>
+                                    
<outputDirectory>${project.build.directory}/dependency/hook/couchbase/atlas-couchbase-plugin-impl</outputDirectory>
+                                    
<overWriteReleases>false</overWriteReleases>
+                                    
<overWriteSnapshots>false</overWriteSnapshots>
+                                    <overWriteIfNewer>true</overWriteIfNewer>
+                                    <artifactItems>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>${project.artifactId}</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>atlas-client-common</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>atlas-client-v2</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>atlas-notification</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>atlas-common</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.apache.kafka</groupId>
+                                            
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
+                                            <version>${kafka.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.apache.kafka</groupId>
+                                            
<artifactId>kafka-clients</artifactId>
+                                            <version>${kafka.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>com.sun.jersey</groupId>
+                                            
<artifactId>jersey-json</artifactId>
+                                            
<version>${jersey.version}</version>
+                                        </artifactItem>
+                                    </artifactItems>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-site-plugin</artifactId>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.doxia</groupId>
+                        <artifactId>doxia-module-twiki</artifactId>
+                        <version>${doxia.version}</version>
+                    </dependency>
+                    <dependency>
+                        <groupId>org.apache.maven.doxia</groupId>
+                        <artifactId>doxia-core</artifactId>
+                        <version>${doxia.version}</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>site</goal>
+                        </goals>
+                        <phase>prepare-package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <generateProjectInfo>false</generateProjectInfo>
+                    <generateReports>false</generateReports>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.2.1</version>
+                <inherited>false</inherited>
+                <executions>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>post-integration-test</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
new file mode 100644
index 000000000..3168d7bfa
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/AtlasConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import org.apache.atlas.AtlasClientV2;
+
+import java.util.Map;
+
+public class AtlasConfig {
+    private static final Map<String, String> ENV    = System.getenv();
+    private static       AtlasClientV2       client = null;
+
+    public static String[] urls() {
+        return new String[] { ENV.getOrDefault("ATLAS_URL", 
"http://localhost:21000";) };
+    }
+
+    public static String username() {
+        return ENV.getOrDefault("ATLAS_USERNAME", "admin");
+    }
+
+    public static String password() {
+        return ENV.getOrDefault("ATLAS_PASSWORD", "admin");
+    }
+
+    public static String[] auth() {
+        return new String[] {username(), password()};
+    }
+
+    public static AtlasClientV2 client() {
+        if (client == null) {
+            client = new AtlasClientV2(urls(), auth());
+        }
+
+        return client;
+    }
+
+    public static void client(AtlasClientV2 client) {
+        AtlasConfig.client = client;
+    }
+}
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
new file mode 100644
index 000000000..70e9dc37c
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CBConfig.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import com.couchbase.client.dcp.Client;
+import com.couchbase.client.dcp.SecurityConfig;
+import com.couchbase.client.java.Cluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CBConfig {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CBConfig.class);
+
+    private static final Map<String, String> ENV                 = 
System.getenv();
+    private static final Integer             DCP_FIELD_MIN_OCCUR = 
Integer.valueOf(ENV.getOrDefault("DCP_FIELD_MIN_OCCUR", "0"));
+    private static final Float               DCP_SAMPLE_RATIO    = 
Float.valueOf(ENV.getOrDefault("DCP_SAMPLE_RATIO", "1"));
+    private static final Short               DCP_FIELD_THRESHOLD = 
Short.valueOf(ENV.getOrDefault("DCP_FIELD_THRESHOLD", "0"));
+    private static       Client              mockDcpClient;
+    private static       Cluster             cluster;
+
+
+
+    public static String address() {
+        return ENV.getOrDefault("CB_CLUSTER", "couchbase://localhost");
+    }
+
+    public static String username() {
+        return ENV.getOrDefault("CB_USERNAME", "Administrator");
+    }
+
+    public static String password() {
+        return ENV.getOrDefault("CB_PASSWORD", "password");
+    }
+
+    public static String bucket() {
+        return ENV.getOrDefault("CB_BUCKET", "default");
+    }
+
+    public static Collection<String> collections() {
+        String collections = ENV.get("CB_COLLECTIONS");
+
+        if (collections == null) {
+            return null;
+        }
+
+        return Arrays.stream(collections.split(","))
+                .map(String::trim)
+                .collect(Collectors.toList());
+    }
+
+    public static String dcpPort() {
+        return ENV.getOrDefault("DCP_PORT", "11210");
+    }
+
+    /**
+     * @return Percentage of DCP messages to be analyzed in form of a short 
between 0 and 1.
+     */
+    public static Float dcpSampleRatio() {
+        return DCP_SAMPLE_RATIO;
+    }
+
+    /**
+     * @return a threshold that indicates in what percentage of analyzed 
messages per collection a field must appear before it is sent to Atlas
+     */
+    public static Short dcpFieldThreshold() {
+        return DCP_FIELD_THRESHOLD;
+    }
+
+    public static Integer dcpFieldMinOccurences() {
+        return DCP_FIELD_MIN_OCCUR;
+    }
+
+    public static Cluster cluster() {
+        if (cluster == null) {
+            cluster = Cluster.connect(address(), username(), password());
+        }
+
+        return cluster;
+    }
+
+    public static Client dcpClient() {
+        if (mockDcpClient != null) {
+            LOGGER.debug("Using mock DCP client");
+
+            return mockDcpClient;
+        }
+
+        Client.Builder builder = Client.builder()
+                .collectionsAware(true)
+                .seedNodes(String.format("%s:%s",address(),dcpPort()))
+                .connectionString(address())
+                .credentials(username(), password());
+
+        String bucket = bucket();
+
+        if (!(bucket == null || bucket.equals("*") || bucket.isEmpty())) {
+            LOGGER.debug("Monitoring bucket `{}`", bucket);
+
+            builder.bucket(bucket);
+
+            Collection<String> collections = collections();
+
+            if (collections != null && !collections.isEmpty()) {
+                LOGGER.debug("Monitoring collections: {}", String.join(", ", 
collections));
+
+                builder.collectionNames(collections);
+            }
+        }
+
+        if (enableTLS()) {
+            LOGGER.debug("Using native TLS");
+
+            
builder.securityConfig(SecurityConfig.builder().enableNativeTls(true).build());
+        }
+
+        return builder.build();
+    }
+
+    protected static void dcpClient(Client mockDcpClient) {
+        CBConfig.mockDcpClient = mockDcpClient;
+    }
+
+    private static boolean enableTLS() {
+        return Boolean.parseBoolean(ENV.getOrDefault("CB_ENABLE_TLS", 
"false"));
+    }
+}
\ No newline at end of file
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
new file mode 100644
index 000000000..0a73307c8
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/CouchbaseHook.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import com.couchbase.atlas.connector.entities.CouchbaseBucket;
+import com.couchbase.atlas.connector.entities.CouchbaseCluster;
+import com.couchbase.atlas.connector.entities.CouchbaseCollection;
+import com.couchbase.atlas.connector.entities.CouchbaseField;
+import com.couchbase.atlas.connector.entities.CouchbaseFieldType;
+import com.couchbase.atlas.connector.entities.CouchbaseScope;
+import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.dcp.Client;
+import com.couchbase.client.dcp.ControlEventHandler;
+import com.couchbase.client.dcp.DataEventHandler;
+import com.couchbase.client.dcp.StreamFrom;
+import com.couchbase.client.dcp.StreamTo;
+import com.couchbase.client.dcp.highlevel.internal.CollectionIdAndKey;
+import 
com.couchbase.client.dcp.highlevel.internal.CollectionsManifest.CollectionInfo;
+import com.couchbase.client.dcp.message.DcpMutationMessage;
+import com.couchbase.client.dcp.message.MessageUtil;
+import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
+import com.couchbase.client.java.json.JsonObject;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import 
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CouchbaseHook extends AtlasHook implements ControlEventHandler, 
DataEventHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CouchbaseHook.class);
+
+    protected static CouchbaseHook               INSTANCE;
+    protected static Client                      DCP;
+    protected static AtlasClientV2               ATLAS;
+    private   static Consumer<List<AtlasEntity>> createInterceptor;
+    private   static Consumer<List<AtlasEntity>> updateInterceptor;
+    private   static boolean                     loop = true;
+
+
+    private CouchbaseCluster clusterEntity;
+    private CouchbaseBucket  bucketEntity;
+
+    /**
+     * START HERE
+     *
+     * @param args
+     */
+    public static void main(String[] args) {
+        // create instances of DCP client,
+        DCP = CBConfig.dcpClient();
+
+        // Atlas client,
+        ATLAS = AtlasConfig.client();
+
+        // and DCP handler
+        INSTANCE = new CouchbaseHook();
+
+        // register DCP handler with DCP client
+        DCP.controlEventHandler(INSTANCE);
+        DCP.dataEventHandler(INSTANCE);
+
+        // Connect to the cluster
+        DCP.connect().block();
+
+        LOG.info("DCP client connected.");
+
+        // Ensure the existence of corresponding
+        // CouchbaseCluster, CouchbaseBucket, CouchbaseScope
+        // entities and store them in local cache
+        INSTANCE.initializeAtlasContext();
+
+        // Start listening to DCP
+        DCP.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
+
+        System.out.println("Starting the stream...");
+        DCP.startStreaming().block();
+
+        System.out.println("Started the stream.");
+        // And then just loop the loop
+        try {
+            while (loop) {
+                Thread.sleep(1000);
+            }
+        } catch (InterruptedException e) {
+
+        } finally {
+            DCP.disconnect().block();
+        }
+    }
+
+    /**
+     * Ensures the existence of CouchbaseCluster,
+     * CouchbaseBucket and Couchbase scope entities
+     * and stores them into local cache
+     */
+    private void initializeAtlasContext() {
+        LOG.debug("Creating cluster/bucket/scope entities");
+
+        clusterEntity = new CouchbaseCluster()
+                .name(CBConfig.address())
+                .url(CBConfig.address())
+                .get();
+
+        bucketEntity = new CouchbaseBucket()
+                .name(CBConfig.bucket())
+                .cluster(clusterEntity)
+                .get();
+
+        List<AtlasEntity> entitiesToCreate = new ArrayList<>();
+
+        if (!clusterEntity.exists(ATLAS)) {
+            entitiesToCreate.add(clusterEntity.atlasEntity(ATLAS));
+        }
+
+        if (!bucketEntity.exists(ATLAS)) {
+            entitiesToCreate.add(bucketEntity.atlasEntity(ATLAS));
+        }
+
+        if (!entitiesToCreate.isEmpty()) {
+            createEntities(entitiesToCreate);
+        }
+    }
+
+    private void createEntities(List<AtlasEntity> entities) {
+        if (createInterceptor != null) {
+            createInterceptor.accept(entities);
+            return;
+        }
+
+        AtlasEntitiesWithExtInfo entity  = new 
AtlasEntitiesWithExtInfo(entities);
+        EntityCreateRequestV2    request = new 
EntityCreateRequestV2("couchbase", entity);
+
+        notifyEntities(Arrays.asList(request), null);
+    }
+
+    private void updateEntities(List<AtlasEntity> entities) {
+        if (updateInterceptor != null) {
+            updateInterceptor.accept(entities);
+
+            return;
+        }
+
+        AtlasEntitiesWithExtInfo entity  = new 
AtlasEntitiesWithExtInfo(entities);
+        EntityUpdateRequestV2    request = new 
EntityUpdateRequestV2("couchbase", entity);
+
+        notifyEntities(Arrays.asList(request), null);
+    }
+
+    @Override
+    public void onEvent(ChannelFlowController flowController, ByteBuf event) {
+        // Probabilistic sampling
+        if (Math.random() > CBConfig.dcpSampleRatio()) {
+            LOG.debug("Skipping DCP message.");
+            return;
+        }
+
+        if (DcpMutationMessage.is(event)) {
+            try {
+                // Borrowed from Couchbeans :)
+                // Gathering some information about the message.
+                CollectionIdAndKey ckey           = 
MessageUtil.getCollectionIdAndKey(event, true);
+                CollectionInfo     collectionInfo = 
collectionInfo(MessageUtil.getVbucket(event), ckey.collectionId());
+                String             collectionName = collectionInfo.name();
+                String             scopeName      = 
collectionInfo.scope().name();
+
+                LOG.debug("Received DCP mutation message for scope '{}' and 
collection '{}'", scopeName, collectionName);
+
+                CouchbaseScope scopeEntity = bucketEntity.scope(scopeName);
+
+                // Because Atlas doesn't support upserts,
+                // we need to send new entities in a separate message
+                // from already existing ones
+                List<AtlasEntity> toCreate = new ArrayList<>();
+                List<AtlasEntity> toUpdate = new ArrayList<>();
+
+                if (!scopeEntity.exists(ATLAS)) {
+                    toCreate.add(scopeEntity.atlasEntity(ATLAS));
+
+                    LOG.debug("Creating scope: {}", 
scopeEntity.qualifiedName());
+                } else {
+                    toUpdate.add(scopeEntity.atlasEntity(ATLAS));
+
+                    LOG.debug("Updating scope: {}", 
scopeEntity.qualifiedName());
+                }
+
+                CouchbaseCollection collectionEntity = 
scopeEntity.collection(collectionName);
+
+                // Let's record this attempt to analyze a collection document
+                // so that we can calculate field frequencies
+                // when filtering them via DCP_FIELD_THRESHOLD
+                collectionEntity.incrementAnalyzedDocuments();
+
+                // and then schedule it to be sent to Atlas
+                if (!collectionEntity.exists(ATLAS)) {
+                    toCreate.add(collectionEntity.atlasEntity(ATLAS));
+                } else {
+                    toUpdate.add(collectionEntity.atlasEntity(ATLAS));
+                }
+
+                Map<String, Object> document = 
JsonObject.fromJson(DcpMutationMessage.contentBytes(event)).toMap();
+
+                System.out.println(String.format("Document keys: %s", 
document.keySet()));
+
+                // for each field in the document...
+                document.entrySet().stream()
+                        // transform the field into CouchbaseField either by 
loading corresponding entity or by creating it
+                        .filter(e -> e.getValue() != null)
+                        .flatMap(entry -> processField(collectionEntity, 
(Collection<String>) Collections.EMPTY_LIST, null, entry.getKey(), 
entry.getValue()))
+                        // update document counter on the field entity
+                        .peek(CouchbaseField::incrementDocumentCount)
+                        // Only passes fields that either already in Atlas or 
pass DCP_FIELD_THRESHOLD setting
+                        .filter(field -> field.exists(ATLAS) || 
field.documentCount() / (float) collectionEntity.documentsAnalyzed() > 
CBConfig.dcpFieldThreshold())
+                        // Schedule the entity either for creation or to be 
updated in Atlas
+                        .forEach(field -> {
+                            if (field.exists(ATLAS)) {
+                                toUpdate.add(field.atlasEntity(ATLAS));
+                            } else {
+                                toCreate.add(field.atlasEntity(ATLAS));
+                            }
+                        });
+
+                createEntities(toCreate);
+                updateEntities(toUpdate);
+
+                System.out.println("Notified Atlas");
+            } catch (Exception e) {
+                LOG.error("Failed to process DCP message", e);
+            }
+        }
+    }
+
+    /**
+     * Constructs a {@link CouchbaseField} from field information
+     *
+     * @param collectionEntity the {@link CouchbaseCollection} to which the 
field belongs
+     * @param path             the path to the field inside the collection 
document excluding the field itself
+     * @param parent           the parent field, if present or null
+     * @param name             the name of the field
+     * @param value            the value for the field from received document
+     * @return constructed or loaded from Atlas {@link CouchbaseField}
+     */
+    private static Stream<CouchbaseField> processField(CouchbaseCollection 
collectionEntity, Collection<String> path, @Nullable CouchbaseField parent, 
String name, Object value) {
+        // Let's figure out what type does this field have
+        CouchbaseFieldType fieldType = CouchbaseFieldType.infer(value);
+
+        // The full field path as it will be stored in Atlas
+        final Collection<String> fieldPath = new ArrayList<>(path);
+
+        fieldPath.add(name);
+
+        // constructing the field entity and loading it from cache or Atlas, 
if previously stored there
+        CouchbaseField rootField = new CouchbaseField()
+                .name(name)
+                .fieldPath(fieldPath.stream().collect(Collectors.joining(".")))
+                .fieldType(fieldType)
+                .collection(collectionEntity)
+                .parentField(parent)
+                .get();
+
+        // return value
+        Stream<CouchbaseField> result = Stream.of(rootField);
+
+        // Recursive transformation of embedded object fields
+        if (fieldType == CouchbaseFieldType.OBJECT) {
+            // Normalizing the value
+            if (value instanceof JsonObject) {
+                value = ((JsonObject) value).toMap();
+            }
+
+            if (value instanceof Map) {
+                // Append embedded field entities to the resulting Stream
+                result = Stream.concat(
+                        result,
+                        ((Map<String, ?>) value).entrySet().stream()
+                                // recursion
+                                .flatMap(entity -> 
processField(collectionEntity, fieldPath, rootField, entity.getKey(), 
entity.getValue()))
+                );
+            } else {
+                throw new IllegalArgumentException(String.format("Incorrect 
value type '%s' for field type 'object': a Map was expected instead.", 
value.getClass()));
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public String getMessageSource() {
+        return "couchbase";
+    }
+
+    /**
+     * Looks up the collection name by its vbucket identifier
+     *
+     * @param vbucket
+     * @param collid
+     * @return the name of the collection
+     */
+    private static CollectionInfo collectionInfo(int vbucket, long collid) {
+        return DCP.sessionState()
+                .get(vbucket)
+                .getCollectionsManifest()
+                .getCollection(collid);
+    }
+
+    protected static void setEntityInterceptors(Consumer<List<AtlasEntity>> 
createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
+        CouchbaseHook.createInterceptor = createInterceptor;
+        CouchbaseHook.updateInterceptor = updateInterceptor;
+    }
+
+    static void loop(boolean loop) {
+        CouchbaseHook.loop = loop;
+    }
+}
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
new file mode 100644
index 000000000..8bc8e42e9
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntity.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Base class for all couchbase atlas models
+ * The class uses "Self-Builder" pattern:
+ * 1. First, create the "builder" instance of the class
+ * 2. Populate the identifying fields of the class (check the `qualifiedName` 
method of the entity for the list)
+ *          (all setters return the instance just as a Builder would)
+ * 3. Call `get()` method to resolve the instance and replace it with 
previously loaded from Atlas data (if present)
+ *
+ * Example:
+ * ```java
+ *  clusterEntity = new CouchbaseCluster()
+ *      .name(CBConfig.address())
+ *      .url(CBConfig.address())
+ *      .get();
+ * ```
+ *
+ * @param <E> extending class
+ */
+public abstract class CouchbaseAtlasEntity<E extends CouchbaseAtlasEntity<?>> {
+    private static final Map<Class, Map<String, AtlasEntity>> 
ENTITY_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
+    private static final Map<Class, Map<String, CouchbaseAtlasEntity>> 
MODEL_BY_TYPE_AND_ID = Collections.synchronizedMap(new HashMap<>());
+    private String name;
+
+    public String name() {
+        return name;
+    }
+
+    public E name(String name) {
+        this.name = name;
+        return (E) this;
+    }
+
+    /**
+     * Loads or creates corresponding Atlas Entity and marks this model as 
existing in Atlas
+     *
+     * @param atlas
+     * @return
+     */
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity atlasEntity = atlasEntity()
+                .filter(entity -> entity.getGuid().charAt(0) != '-')
+                .orElseGet(() ->
+                        cache(load(atlas)
+                                .orElseGet(() ->
+                                        atlasEntity()
+                                        .orElseGet(() -> new 
AtlasEntity(atlasTypeName())))
+                        )
+                );
+
+        atlasEntity.setAttribute("name", name);
+        atlasEntity.setAttribute("qualifiedName", qualifiedName());
+
+        updateAtlasEntity(atlasEntity);
+
+        return atlasEntity;
+    }
+
+    protected abstract String qualifiedName();
+
+    /**
+     * Looks up precreated atlas entity in the entity cache
+     * @return Optional of the cached entity
+     */
+    public Optional<AtlasEntity> atlasEntity() {
+        return cachedEntity().map(atlasEntity -> {
+            updateAtlasEntity(atlasEntity);
+            return atlasEntity;
+        });
+    }
+
+    /**
+     * Checks whether the model has the Atlas Entity created for it
+     * by looking it up in the entity cache.
+     * NOTE: this method does not check if the entity has been saved in Atlas 
so,
+     *      it will return true when the entity is already created and cached 
but is yet to be sent to Atlas
+     *
+     * This method is _mostly_ used in related objects when setting 
relationship field to ensure that related
+     *      model has an AtlasEntity that can be referenced when storing 
relationship information.
+     *
+     * @return true if the entity found
+     */
+    protected boolean exists() {
+        return cachedEntity().isPresent();
+    }
+
+    public abstract String atlasTypeName();
+
+    public abstract UUID id();
+
+    /**
+     * Invoked when the entity needs to be updated with values from the model
+     * @param entity the entity to write the values into
+     */
+    protected void updateAtlasEntity(AtlasEntity entity) {
+        // override me
+    }
+
+    /**
+     * Invoked when the model needs to be updated with values from the entity
+     * @param entity the entity to read the values from
+     */
+    protected void updateJavaModel(AtlasEntity entity) {
+        // override me
+    }
+
+    /**
+     * Loads the entity for this model from Atlas and stores it in the entity 
cache
+     * @param client Atlas client to use
+     * @return loaded entity
+     */
+    private Optional<AtlasEntity> load(AtlasClientV2 client) {
+        try {
+            Map<String, String> query = new HashMap<>();
+            query.put("qualifiedName", qualifiedName());
+            AtlasEntity atlasEntity = 
client.getEntityByAttribute(atlasTypeName(), query).getEntity();
+
+            if (atlasEntity != null) {
+                cache(atlasEntity);
+                if (atlasEntity.hasAttribute("name")) {
+                    this.name = (String) atlasEntity.getAttribute("name");
+                }
+                updateJavaModel(atlasEntity);
+                return Optional.of(atlasEntity);
+            }
+        } catch (AtlasServiceException e) {
+            if (e.getStatus().getStatusCode() != 404) {
+                throw new RuntimeException(e);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Puts an entity into the entity cache
+     * @param atlasEntity the entity to cache
+     * @return the same entity
+     */
+    private AtlasEntity cache(AtlasEntity atlasEntity) {
+        if (!ENTITY_BY_TYPE_AND_ID.containsKey(getClass())) {
+            ENTITY_BY_TYPE_AND_ID.put(getClass(), new HashMap<>());
+        }
+
+        ENTITY_BY_TYPE_AND_ID.get(getClass()).put(id().toString(), 
atlasEntity);
+        return atlasEntity;
+    }
+
+    /**
+     * Looks up the entity in the cache
+     * @return Optional of cached entity
+     */
+    private Optional<AtlasEntity> cachedEntity() {
+        return 
Optional.ofNullable(ENTITY_BY_TYPE_AND_ID.getOrDefault(getClass(), (Map<String, 
AtlasEntity>) Collections.EMPTY_MAP).getOrDefault(id().toString(), null));
+    }
+
+    /**
+     * First checks if the entity has been loaded and cached and, if not, then 
tries to load it from Atlas
+     * @param atlas Atlas client to use
+     * @return true if the entity found either in cache or in Atlas
+     */
+    public boolean exists(AtlasClientV2 atlas) {
+        if (!exists()) {
+            return load(atlas).isPresent();
+        }
+        return true;
+    }
+
+    /**
+     * Returns pre-cached model with provided identifiers or caches this model 
and returns it
+     *
+     * @return the model
+     */
+    public E get() {
+        Class<E> type = (Class<E>) getClass();
+        String id = id().toString();
+
+        // ensure valid cache structure
+        if (!MODEL_BY_TYPE_AND_ID.containsKey(type)) {
+            MODEL_BY_TYPE_AND_ID.put(type, Collections.synchronizedMap(new 
HashMap<>()));
+        }
+
+        // put the model into the cache, if not already present
+        Map<String, CouchbaseAtlasEntity> modelsById = 
MODEL_BY_TYPE_AND_ID.get(type);
+        if (!modelsById.containsKey(id)) {
+            try {
+                modelsById.put(id, this);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return (E) modelsById.get(id);
+    }
+
+    public static void dropCache() {
+        ENTITY_BY_TYPE_AND_ID.clear();
+        MODEL_BY_TYPE_AND_ID.clear();
+    }
+}
\ No newline at end of file
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
new file mode 100644
index 000000000..2983b2af2
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseBucket.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class CouchbaseBucket extends CouchbaseAtlasEntity<CouchbaseBucket> {
+    public static final String TYPE_NAME = "couchbase_bucket";
+    private CouchbaseCluster cluster;
+    private transient Map<String, CouchbaseScope> scopes = 
Collections.synchronizedMap(new HashMap<>());
+
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("cluster", cluster.atlasEntity(atlas));
+        return entity;
+    }
+
+    @Override
+    protected String qualifiedName() {
+        return String.format("%s/%s", cluster.qualifiedName(), name());
+    }
+
+    public CouchbaseBucket() {
+
+    }
+
+    public CouchbaseCluster cluster() {
+        return cluster;
+    }
+
+    public CouchbaseBucket cluster(CouchbaseCluster cluster) {
+        this.cluster = cluster;
+        return this;
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(String.format("%s:%s:%s", 
atlasTypeName(), cluster().id(), name()).getBytes(Charset.defaultCharset()));
+    }
+
+    public CouchbaseScope scope(String name) {
+        if (!scopes.containsKey(name)) {
+            scopes.put(name, new CouchbaseScope()
+                    .bucket(this)
+                    .name(name)
+                    .get()
+            );
+        }
+
+        return scopes.get(name);
+    }
+}
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
new file mode 100644
index 000000000..2f4efe865
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCluster.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class CouchbaseCluster extends CouchbaseAtlasEntity<CouchbaseCluster> {
+    public static final String TYPE_NAME = "couchbase_cluster";
+    private String url;
+
+    public String url() {
+        return url;
+    }
+
+    public CouchbaseCluster url(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public static AtlasEntityDef atlasEntityDef() {
+        AtlasEntityDef definition = AtlasTypeUtil.createClassTypeDef(
+                "couchbase_cluster",
+                new HashSet<>()
+        );
+
+        definition.getSuperTypes().add("Asset");
+        definition.setServiceType("couchbase");
+        definition.setTypeVersion("0.1");
+
+        List<AtlasStructDef.AtlasAttributeDef> attributes = 
definition.getAttributeDefs();
+
+        attributes.add(new AtlasStructDef.AtlasAttributeDef(
+                "url",
+                "string",
+                false,
+                AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE,
+                1,
+                1,
+                true,
+                true,
+                true,
+                Collections.EMPTY_LIST
+        ));
+
+        return definition;
+    }
+
+    public static Collection<? extends AtlasRelationshipDef> 
atlasRelationshipDefs() {
+        return Arrays.asList(
+                new AtlasRelationshipDef(
+                        "couchbase_cluster_buckets",
+                        "",
+                        "0.1",
+                        "couchbase",
+                        AtlasRelationshipDef.RelationshipCategory.AGGREGATION,
+                        AtlasRelationshipDef.PropagateTags.ONE_TO_TWO,
+                        new AtlasRelationshipEndDef(
+                                "couchbase_cluster",
+                                "buckets",
+                                
AtlasStructDef.AtlasAttributeDef.Cardinality.SET,
+                                true
+                        ),
+                        new AtlasRelationshipEndDef(
+                                "couchbase_bucket",
+                                "cluster",
+                                
AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE,
+                                false
+                        )
+                )
+        );
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(String.format("%s:%s", atlasTypeName(), 
url()).getBytes(Charset.defaultCharset()));
+    }
+
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setAttribute("url", url());
+        return entity;
+    }
+
+    @Override
+    protected String qualifiedName() {
+        return url();
+    }
+
+    @Override
+    protected void updateJavaModel(AtlasEntity entity) {
+        if (entity.hasAttribute("url")) {
+            this.url = (String) entity.getAttribute("url");
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
new file mode 100644
index 000000000..b7319581d
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseCollection.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+public class CouchbaseCollection extends 
CouchbaseAtlasEntity<CouchbaseCollection> {
+
+    private CouchbaseScope scope;
+
+    private long documentsAnalyzed;
+
+    public CouchbaseCollection scope(CouchbaseScope scope) {
+        this.scope = scope;
+        return this;
+    }
+
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("scope", scope.atlasEntity(atlas));
+        return entity;
+    }
+
+    @Override
+    protected void updateAtlasEntity(AtlasEntity entity) {
+        entity.setAttribute("documentsAnalyzed", documentsAnalyzed);
+    }
+
+    @Override
+    protected void updateJavaModel(AtlasEntity entity) {
+        documentsAnalyzed = (Integer) entity.getAttribute("documentsAnalyzed");
+    }
+
+    public long documentsAnalyzed() {
+        return documentsAnalyzed;
+    }
+
+    public CouchbaseCollection incrementAnalyzedDocuments() {
+        this.documentsAnalyzed++;
+        return this;
+    }
+
+    @Override
+    protected String qualifiedName() {
+        return String.format("%s/%s", scope.qualifiedName(), name());
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return "couchbase_collection";
+    }
+
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(String.format("%s:%s:%s", 
atlasTypeName(), scope().id().toString(), 
name()).getBytes(Charset.defaultCharset()));
+    }
+
+    public CouchbaseScope scope() {
+        return this.scope;
+    }
+}
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
new file mode 100644
index 000000000..5d14f6e20
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseField.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+public class CouchbaseField extends CouchbaseAtlasEntity<CouchbaseField> {
+    public static final String TYPE_NAME = "couchbase_field";
+    private CouchbaseFieldType fieldType;
+    private String fieldPath;
+    private long documentCount = 0;
+
+    private CouchbaseField parentField;
+
+    private CouchbaseCollection collection;
+
+    public CouchbaseField() {
+
+    }
+
+    public CouchbaseFieldType fieldType() {
+        return fieldType;
+    }
+
+    public CouchbaseField fieldType(CouchbaseFieldType fieldType) {
+        this.fieldType = fieldType;
+        return this;
+    }
+
+    public String fieldPath() {
+        return fieldPath;
+    }
+
+    public CouchbaseField fieldPath(String fieldPath) {
+        this.fieldPath = fieldPath;
+        return this;
+    }
+
+    public long documentCount() {
+        return documentCount;
+    }
+
+    public CouchbaseField documentCount(long documentCount) {
+        this.documentCount = documentCount;
+        return this;
+    }
+
+    public void incrementDocumentCount() {
+        this.documentCount++;
+    }
+
+    public CouchbaseCollection collection() {
+        return collection;
+    }
+
+    public CouchbaseField collection(CouchbaseCollection collection) {
+        this.collection = collection;
+        return this;
+    }
+
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("collection", 
collection.atlasEntity(atlas));
+        if (parentField != null) {
+            entity.setRelationshipAttribute("parentField", 
parentField.atlasEntity(atlas));
+        }
+        return entity;
+    }
+
+    @Override
+    protected void updateAtlasEntity(AtlasEntity entity) {
+        entity.setAttribute("fieldType", fieldType.toString());
+        entity.setAttribute("fieldPath", fieldPath);
+        entity.setAttribute("documentCount", documentCount);
+    }
+
+    @Override
+    protected String qualifiedName() {
+        return String.format("%s/%s:%s", collection.qualifiedName(), 
fieldPath(), fieldType());
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    @Override
+    public UUID id() {
+        return 
UUID.nameUUIDFromBytes(qualifiedName().getBytes(Charset.defaultCharset()));
+    }
+
+    public CouchbaseField parentField() {
+        return parentField;
+    }
+
+    public CouchbaseField parentField(CouchbaseField parent) {
+        this.parentField = parent;
+        return this;
+    }
+}
\ No newline at end of file
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
new file mode 100644
index 000000000..8355f60cf
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseFieldType.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import com.couchbase.client.core.error.InvalidArgumentException;
+import com.couchbase.client.java.json.JsonObject;
+import org.apache.atlas.model.typedef.AtlasEnumDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public enum CouchbaseFieldType {
+    BOOLEAN,
+    NUMBER,
+    STRING,
+    ARRAY,
+    OBJECT,
+    BINARY;
+
+    public static CouchbaseFieldType infer(@Nonnull Object value) {
+        if (value instanceof Map || value instanceof JsonObject) {
+            return OBJECT;
+        } else if (value instanceof Collection || value.getClass().isArray()) {
+            if (value.getClass().isArray() && 
Byte.class.isAssignableFrom(value.getClass().getComponentType())) {
+                return BINARY;
+            }
+            return ARRAY;
+        } else if (value instanceof Number) {
+            return NUMBER;
+        } else if (value instanceof Boolean) {
+            return BOOLEAN;
+        } else if (value instanceof String) {
+            String sValue = (String) value;
+            if ("true".equalsIgnoreCase(sValue) || 
"false".equalsIgnoreCase(sValue)) {
+                return BOOLEAN;
+            }
+            try {
+                Double.parseDouble(sValue);
+                return NUMBER;
+            } catch (NumberFormatException nfe) {
+                return STRING;
+            }
+        }
+
+        throw new IllegalArgumentException("Failed to infer type");
+    }
+
+    @Override
+    public String toString() {
+        return super.toString().toLowerCase(Locale.getDefault());
+    }
+
+}
\ No newline at end of file
diff --git 
a/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
new file mode 100644
index 000000000..13f4e4852
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/main/java/com/couchbase/atlas/connector/entities/CouchbaseScope.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.type.AtlasTypeUtil;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CouchbaseScope extends CouchbaseAtlasEntity<CouchbaseScope> {
+
+    public static final String TYPE_NAME = "couchbase_scope";
+    private CouchbaseBucket bucket;
+
+    private transient Map<String, CouchbaseCollection> collections = 
Collections.synchronizedMap(new HashMap<>());
+
+    public CouchbaseBucket bucket() {
+        return bucket;
+    }
+
+    public CouchbaseScope bucket(CouchbaseBucket bucket) {
+        this.bucket = bucket;
+        return this;
+    }
+
+    @Override
+    public UUID id() {
+        return UUID.nameUUIDFromBytes(
+                String.format(
+                        "%s:%s:%s",
+                        atlasTypeName(),
+                        bucket().id().toString(),
+                        name()
+                ).getBytes(Charset.defaultCharset())
+        );
+    }
+
+    @Override
+    public AtlasEntity atlasEntity(AtlasClientV2 atlas) {
+        AtlasEntity entity = super.atlasEntity(atlas);
+        entity.setRelationshipAttribute("bucket", bucket.atlasEntity(atlas));
+        return entity;
+    }
+
+    @Override
+    public String qualifiedName() {
+        return String.format("%s/%s", bucket.qualifiedName(), name());
+    }
+
+    @Override
+    public String atlasTypeName() {
+        return TYPE_NAME;
+    }
+
+    public CouchbaseCollection collection(String name) {
+        if (!collections.containsKey(name)) {
+            collections.put(name, new CouchbaseCollection()
+                    .name(name)
+                    .scope(this)
+                    .get()
+            );
+        }
+
+        return collections.get(name);
+    }
+}
diff --git 
a/addons/couchbase-bridge/src/main/resources/atlas-application.properties 
b/addons/couchbase-bridge/src/main/resources/atlas-application.properties
new file mode 100644
index 000000000..3df60b02f
--- /dev/null
+++ b/addons/couchbase-bridge/src/main/resources/atlas-application.properties
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git 
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
 
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
new file mode 100644
index 000000000..36d81a3ac
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/CouchbaseHookTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector;
+
+import com.couchbase.atlas.connector.entities.CouchbaseAtlasEntity;
+import com.couchbase.atlas.connector.entities.CouchbaseBucket;
+import com.couchbase.atlas.connector.entities.CouchbaseCluster;
+import com.couchbase.atlas.connector.entities.CouchbaseScope;
+import com.couchbase.client.dcp.Client;
+import com.couchbase.client.dcp.StreamFrom;
+import com.couchbase.client.dcp.StreamTo;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class CouchbaseHookTest {
+
+    private Client mockDcpClient() {
+        Client mockDcpClient = Mockito.mock(Client.class);
+        Mockito.when(mockDcpClient.connect()).thenReturn(Mono.empty());
+        Mockito.when(mockDcpClient.initializeState(StreamFrom.NOW, 
StreamTo.INFINITY)).thenReturn(Mono.empty());
+        Mockito.when(mockDcpClient.startStreaming()).thenReturn(Mono.empty());
+        Mockito.when(mockDcpClient.disconnect()).thenReturn(Mono.empty());
+        return mockDcpClient;
+    }
+
+    private AtlasClientV2 mockAtlasClient(boolean returnEntities) throws 
Exception {
+        AtlasClientV2 mockAtlasClient = Mockito.mock(AtlasClientV2.class);
+        final String clusterName = "couchbase://localhost";
+        final String bucketName = String.format("%s/%s", clusterName, 
"default");
+        final String scopeName = String.format("%s/%s", bucketName, 
"_default");
+
+        
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseCluster.TYPE_NAME),
 Mockito.anyMap())).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            Assert.assertEquals(clusterName, query.get("qualifiedName"));
+            return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? 
Mockito.mock(AtlasEntity.class) : null);
+        });
+
+        
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseBucket.TYPE_NAME),
 Mockito.anyMap())).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            Assert.assertEquals(bucketName, query.get("qualifiedName"));
+            return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? 
Mockito.mock(AtlasEntity.class) : null);
+        });
+
+        
Mockito.when(mockAtlasClient.getEntityByAttribute(Mockito.eq(CouchbaseScope.TYPE_NAME),
 Mockito.anyMap())).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            Assert.assertEquals(scopeName, query.get("qualifiedName"));
+            return new AtlasEntity.AtlasEntityWithExtInfo(returnEntities ? 
Mockito.mock(AtlasEntity.class) : null);
+        });
+
+        return mockAtlasClient;
+    }
+
+    @Test
+    public void testMain() throws Exception {
+        Client mockDcpClient = mockDcpClient();
+        CBConfig.dcpClient(mockDcpClient);
+        AtlasClientV2 mockAtlasClient = mockAtlasClient(false);
+        AtlasConfig.client(mockAtlasClient);
+
+        AtomicInteger createCalled = new AtomicInteger();
+        Consumer<List<AtlasEntity>> createEntitiesInterceptor = ents -> {
+            createCalled.getAndIncrement();
+            Assert.assertEquals(ents.size(), 2);
+        };
+        Consumer<List<AtlasEntity>> updateEntitiesInterceptor = ents -> {
+            Assert.assertTrue(false);
+        };
+
+        CouchbaseHook.setEntityInterceptors(createEntitiesInterceptor, 
updateEntitiesInterceptor);
+        CouchbaseHook.loop(false);
+        // AAAAAND, ACTION (missing entities)
+        CouchbaseHook.main(new String[0]);
+
+        Mockito.verify(mockDcpClient, Mockito.times(1)).connect();
+        Assert.assertEquals(1, createCalled.get());
+        // 2 times: 1 time when we call exists(ATLAS) and second time when we 
request the entity
+        validateAtlasInvocations(mockAtlasClient, 3, 2, 0);
+
+        // simulate existing entities situation
+        mockAtlasClient = mockAtlasClient(true);
+        AtlasConfig.client(mockAtlasClient);
+        CouchbaseAtlasEntity.dropCache();
+
+        // ACTION AGAIN, this time with mock entities in mock Atlas
+        CouchbaseHook.main(new String[0]);
+
+        Mockito.verify(mockDcpClient, Mockito.times(2)).connect();
+        Assert.assertEquals(1, createCalled.get());
+        // 1 time and then it should be cached
+        validateAtlasInvocations(mockAtlasClient, 1, 1, 0);
+
+        testEvents(CouchbaseHook.INSTANCE);
+    }
+
+    public void testEvents(CouchbaseHook listener) {
+
+    }
+
+    private void validateAtlasInvocations(AtlasClientV2 mockAtlasClient, int 
cluster, int bucket, int scope) throws Exception {
+        Mockito.verify(mockAtlasClient, 
Mockito.times(cluster)).getEntityByAttribute(
+                Mockito.eq(CouchbaseCluster.TYPE_NAME),
+                Mockito.anyMap()
+        );
+        Mockito.verify(mockAtlasClient, 
Mockito.times(bucket)).getEntityByAttribute(
+                Mockito.eq(CouchbaseBucket.TYPE_NAME),
+                Mockito.anyMap()
+        );
+        Mockito.verify(mockAtlasClient, 
Mockito.times(scope)).getEntityByAttribute(
+                Mockito.eq(CouchbaseScope.TYPE_NAME),
+                Mockito.anyMap()
+        );
+    }
+}
\ No newline at end of file
diff --git 
a/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
 
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
new file mode 100644
index 000000000..5449d6778
--- /dev/null
+++ 
b/addons/couchbase-bridge/src/test/java/com/couchbase/atlas/connector/entities/CouchbaseAtlasEntityTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.couchbase.atlas.connector.entities;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests atlas entity loading and caching
+ */
+public class CouchbaseAtlasEntityTest {
+    final static String QUALIFIED_NAME = "testEntityQualifiedName";
+    final static String TYPE_NAME = "testEntityTypeName";
+    final static UUID ID = UUID.randomUUID();
+
+    public class TestEntity extends CouchbaseAtlasEntity<TestEntity> {
+
+        @Override
+        protected String qualifiedName() {
+            return QUALIFIED_NAME;
+        }
+
+        @Override
+        public String atlasTypeName() {
+            return TYPE_NAME;
+        }
+
+        @Override
+        public UUID id() {
+            return ID;
+        }
+    }
+
+    @Test
+    public void testEntityLoading() throws Exception {
+        final AtlasClientV2 ac = Mockito.mock(AtlasClientV2.class);
+        final AtlasEntity ae = Mockito.mock(AtlasEntity.class);
+
+        Mockito.when(ae.getAttribute(Mockito.eq("qualifiedName")))
+                .thenReturn(QUALIFIED_NAME);
+
+        Mockito.when(
+                ac.getEntityByAttribute(
+                        Mockito.eq(TYPE_NAME),
+                        Mockito.anyMap()
+                )
+        ).thenAnswer(iom -> {
+            Map<String, String> query = iom.getArgument(1);
+            Assert.assertTrue(query.containsKey("qualifiedName"));
+            Assert.assertEquals(QUALIFIED_NAME, query.get("qualifiedName"));
+            return new AtlasEntity.AtlasEntityWithExtInfo(ae);
+        });
+
+        TestEntity subject = Mockito.spy(new TestEntity());
+        // exists must return false at this point as we've just created the 
model but it doesn't have the corresponding AtlasEntity
+        // and the cache should be empty
+        Assert.assertFalse(subject.exists());
+        Assert.assertSame(subject, subject.get());
+        Assert.assertFalse(subject.exists());
+        // ditto
+        Assert.assertTrue(!subject.atlasEntity().isPresent());
+        // Because our client mock should return the mock entity, exists with 
Atlas check should find the entity,
+        // cache it, and return true
+        Assert.assertTrue(subject.exists(ac));
+        // and call the method to update our model
+        Mockito.verify(subject, 
Mockito.times(1)).updateJavaModel(Mockito.eq(ae));
+        // Let's validate that exists with Atlas check did, in fact, query our 
atlas mock for the entity
+        Mockito.verify(ac, 
Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap());
+        // the entity should exist in cache
+        Assert.assertTrue(subject.exists());
+        // and exists with Atlas check should use it
+        Assert.assertTrue(subject.exists(ac));
+        // so, let's verify that the item was pulled not from atlas (from 
cache will be the only option left)
+        Mockito.verify(ac, 
Mockito.times(1)).getEntityByAttribute(Mockito.eq(TYPE_NAME), Mockito.anyMap());
+
+        // This method should return filled Optional with our mocked entity 
pulled from cache
+        // And, no matter how many times we call, the result should be the 
same (but let's make sure that we call it at least twice)
+        int timesToLoadEntity = 2 + (int) (Math.random() * 98);
+        for (int i = 0; i < timesToLoadEntity; i++) {
+            Assert.assertSame(ae, subject.atlasEntity().get());
+        }
+        // verify that atlas entity was updated every time we requested it
+        Mockito.verify(subject, 
Mockito.times(timesToLoadEntity)).updateAtlasEntity(Mockito.eq(ae));
+        // verify that the model was not updated when we requested the entity 
second time
+        Mockito.verify(subject, 
Mockito.times(1)).updateJavaModel(Mockito.eq(ae));
+    }
+}
\ No newline at end of file
diff --git a/addons/couchbase-bridge/src/test/resources/log4j.xml 
b/addons/couchbase-bridge/src/test/resources/log4j.xml
new file mode 100644
index 000000000..e53c232d0
--- /dev/null
+++ b/addons/couchbase-bridge/src/test/resources/log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m 
(%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <root>
+        <level value="DEBUG"/>
+        <appender-ref ref="console"/>
+    </root>
+
+</log4j:configuration>
diff --git a/addons/models/5000-Couchbase/5020-couchbase_model.json 
b/addons/models/5000-Couchbase/5020-couchbase_model.json
new file mode 100644
index 000000000..acd5d798d
--- /dev/null
+++ b/addons/models/5000-Couchbase/5020-couchbase_model.json
@@ -0,0 +1,159 @@
+{
+  "enumDefs": [
+    {
+      "name":         "couchbase_field_type",
+      "typeVersion":  "1.0",
+      "service_type": "couchbase",
+      "elementDefs": [
+        { "ordinal": 0, "value": "boolean" },
+        { "ordinal": 1, "value": "number" },
+        { "ordinal": 2, "value": "string" },
+        { "ordinal": 3, "value": "array" },
+        { "ordinal": 4, "value": "object" },
+        { "ordinal": 5, "value": "binary" }
+      ]
+    }
+  ],
+  "structDefs": [],
+  "classificationDefs": [],
+  "entityDefs": [
+    {
+      "name":          "couchbase_cluster",
+      "superTypes":    [ "Asset" ],
+      "serviceType":   "couchbase",
+      "typeVersion":   "1.0",
+      "attributeDefs": [
+        {
+          "name":                  "url",
+          "typeName":              "string",
+          "cardinality":           "SINGLE",
+          "isIndexable":           true,
+          "isOptional":            false,
+          "isUnique":              true,
+          "includeInNotification": true
+        }
+      ]
+    },
+    {
+      "name":         "couchbase_bucket",
+      "superTypes":   [ "Asset" ],
+      "serviceType":  "couchbase",
+      "typeVersion":  "1.0",
+      "attributeDefs": [ ]
+    },
+    {
+      "name":          "couchbase_scope",
+      "superTypes":    [ "Asset" ],
+      "serviceType":   "couchbase",
+      "typeVersion":   "1.0",
+      "attributeDefs": []
+    },
+    {
+      "name":          "couchbase_collection",
+      "superTypes":    [ "DataSet" ],
+      "serviceType":   "couchbase",
+      "typeVersion":   "1.0",
+      "options":       { "schemaElementsAttribute": "fields" },
+      "attributeDefs": [
+        {
+          "name":                  "documentsAnalyzed",
+          "typeName":              "long",
+          "isOptional":            false,
+          "cardinality":           "SINGLE",
+          "valuesMinCount":        1,
+          "valuesMaxCount":        1,
+          "isUnique":              false,
+          "isIndexable":           false,
+          "includeInNotification": false
+        }
+      ]
+    },
+    {
+      "name":          "couchbase_field",
+      "superTypes":    [ "DataSet" ],
+      "serviceType":   "couchbase",
+      "typeVersion":   "1.0",
+      "attributeDefs": [
+        {
+          "name":                  "fieldType",
+          "typeName":              "couchbase_field_type",
+          "isOptional":            false,
+          "cardinality":           "SINGLE",
+          "valuesMinCount":        1,
+          "valuesMaxCount":        1,
+          "isUnique":              false,
+          "isIndexable":           true,
+          "includeInNotification": false
+        },
+        {
+          "name":                  "fieldPath",
+          "typeName":              "string",
+          "isOptional":            false,
+          "cardinality":           "SINGLE",
+          "valuesMinCount":        1,
+          "valuesMaxCount":        1,
+          "isUnique":              false,
+          "isIndexable":           true,
+          "includeInNotification": false
+        },
+        {
+          "name":           "documentCount",
+          "typeName":       "long",
+          "isOptional":     false,
+          "cardinality":    "SINGLE",
+          "valuesMinCount": 1,
+          "valuesMaxCount": 1,
+          "isUnique":       false,
+          "isIndexable":    false
+        }
+      ]
+    }
+  ],
+  "relationshipDefs": [
+    {
+      "name":                 "couchbase_cluster_buckets",
+      "typeVersion":          "1.0",
+      "serviceType":          "couchbase",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "ONE_TO_TWO",
+      "endDef1": { "type": "couchbase_cluster", "name": "buckets", 
"cardinality": "SET",    "isContainer": true },
+      "endDef2": { "type": "couchbase_bucket",  "name": "cluster", 
"cardinality": "SINGLE", "isContainer": false }
+    },
+    {
+      "name":                 "couchbase_bucket_scopes",
+      "typeVersion":          "1.0",
+      "serviceType":          "couchbase",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "ONE_TO_TWO",
+      "endDef1": { "type": "couchbase_bucket", "name": "scopes", 
"cardinality": "SET",    "isContainer": true },
+      "endDef2": { "type": "couchbase_scope",  "name": "bucket", 
"cardinality": "SINGLE", "isContainer": false }
+    },
+    {
+      "name":                 "couchbase_scope_collections",
+      "typeVersion":          "1.0",
+      "serviceType":          "couchbase",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "ONE_TO_TWO",
+      "endDef1": { "type": "couchbase_scope",      "name": "collections", 
"cardinality": "SET",    "isContainer": true },
+      "endDef2": { "type": "couchbase_collection", "name": "scope",       
"cardinality": "SINGLE", "isContainer": false }
+    },
+    {
+      "name":                 "couchbase_collection_fields",
+      "typeVersion":          "1.0",
+      "serviceType":          "couchbase",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "ONE_TO_TWO",
+      "endDef1": { "type": "couchbase_collection", "name": "fields",     
"cardinality": "SET",    "isContainer": true },
+      "endDef2": { "type": "couchbase_field",      "name": "collection", 
"cardinality": "SINGLE", "isContainer": false }
+    },
+    {
+      "name":                 "couchbase_field_fields",
+      "typeVersion":          "1.0",
+      "serviceType":          "couchbase",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "ONE_TO_TWO",
+      "endDef1": { "type": "couchbase_field",  "name": "objectFields", 
"cardinality": "SET",    "isContainer": true },
+      "endDef2": { "type": "couchbase_field",  "name": "parentField",  
"cardinality": "SINGLE", "isContainer": false }
+    }
+  ]
+}
diff --git a/distro/pom.xml b/distro/pom.xml
index ed477dfbb..9df238eb3 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -129,6 +129,7 @@ atlas.graph.storage.hbase.regions-per-server=1
                                         
<descriptor>src/main/assemblies/atlas-sqoop-hook-package.xml</descriptor>
                                         
<descriptor>src/main/assemblies/atlas-storm-hook-package.xml</descriptor>
                                         
<descriptor>src/main/assemblies/atlas-kafka-hook-package.xml</descriptor>
+                                        
<descriptor>src/main/assemblies/atlas-couchbase-hook-package.xml</descriptor>
                                         
<descriptor>src/main/assemblies/atlas-server-package.xml</descriptor>
                                         
<descriptor>src/main/assemblies/standalone-package.xml</descriptor>
                                         
<descriptor>src/main/assemblies/src-package.xml</descriptor>
diff --git a/distro/src/main/assemblies/atlas-couchbase-hook-package.xml 
b/distro/src/main/assemblies/atlas-couchbase-hook-package.xml
new file mode 100644
index 000000000..5022bef51
--- /dev/null
+++ b/distro/src/main/assemblies/atlas-couchbase-hook-package.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
+          
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <id>couchbase-hook</id>
+    
<baseDirectory>apache-atlas-couchbase-hook-${project.version}</baseDirectory>
+    <fileSets>
+        <fileSet>
+            
<directory>../addons/couchbase-bridge/target/dependency/hook</directory>
+            <outputDirectory>hook</outputDirectory>
+        </fileSet>
+
+    </fileSets>
+</assembly>
diff --git a/distro/src/main/assemblies/standalone-package.xml 
b/distro/src/main/assemblies/standalone-package.xml
index 3e2ca1c39..888d3edf4 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -180,6 +180,12 @@
             <outputDirectory>hook</outputDirectory>
         </fileSet>
 
+        <!-- addons/couchbase -->
+        <fileSet>
+            
<directory>../addons/couchbase-bridge/target/dependency/hook</directory>
+            <outputDirectory>hook</outputDirectory>
+        </fileSet>
+
         <!-- for kafka topic setup -->
         <fileSet>
             <directory>../addons/kafka-bridge/src/bin</directory>
diff --git a/docs/src/documents/Hook/HookCouchbase.md 
b/docs/src/documents/Hook/HookCouchbase.md
new file mode 100644
index 000000000..53b9e9cb3
--- /dev/null
+++ b/docs/src/documents/Hook/HookCouchbase.md
@@ -0,0 +1,45 @@
+---
+name: Couchbase
+route: /HookCouchbase
+menu: Documentation
+submenu: Hooks
+---
+
+import  themen  from 'theme/styles/styled-colors';
+import  * as theme  from 'react-syntax-highlighter/dist/esm/styles/hljs';
+import SyntaxHighlighter from 'react-syntax-highlighter';
+
+# Apache Atlas Couchbase bridge
+This bridge connects to a Couchbase cluster using DCP protocol
+and performs real-time analysis and metadata extraction from stored on the 
cluster documents.
+The extracted metadata is then sent to Atlas via its REST API
+
+## Configuration
+The bridge uses environment variables for configuration.
+
+### Atlas REST API
+| Environment variable | Description                                           
                                                                            | 
Default Value            |
+|----------------------|-----------------------------------------------------------------------------------------------------------------------------------|--------------------------|
+| ATLAS_URL            | Atlas REST API url                                    
                                                                            | 
"http://localhost:21000"; |
+| ATLAS_USERNAME       | Atlas REST API username                               
                                                                            | 
"admin"                  |
+| ATLAS_PASSWORD       | Atlas REST API password                               
                                                                            | 
"admin"                  |
+
+### Couchbase DCP connection
+| Environment variable | Description                                           
                                                                            | 
Default Value            |
+|----------------------|-----------------------------------------------------------------------------------------------------------------------------------|--------------------------|
+| CB_CLUSTER           | Couchbase Cluster connection string                   
                                                                            | 
"couchbase://localhost"  |
+| CB_USERNAME          | Couchbase Cluster username                            
                                                                            | 
"Administrator"          |
+| CB_PASSWORD          | Couchbase Cluster password                            
                                                                            | 
"password"               |
+| CB_ENABLE_TLS        | Use TLS                                               
                                                                            | 
false                    |
+| CB_BUCKET            | Couchbase bucket to monitor                           
                                                                            | 
"default"                |
+| CB_COLLECTIONS       | Comma-separated list of collections to monitor with 
each collection listed as <scope>.<collection>                                | 
                         |
+| DCP_PORT             | DCP port to use                                       
                                                                            | 
11210                    |
+| DCP_FIELD_THRESHOLD  | A threshold that indicates in what percentage of 
analyzed messages per collection  a field must appear before it is sent to 
Atlas | 0                        |
+| DCP_SAMPLE_RATIO     | Percentage of DCP messages to be analyzed in form of 
a short between 0 and 1.                                                     | 
1                        |
+
+## Running the bridge
+In the following commands, replace `<VARNAME>` and `<version>` with 
appropriate values.
+* Set environment variables using `export <VARNAME>=...` or prefix them before 
the next command.
+* Run `java -cp couchbase-bridge-<version>.jar 
com.couchbase.atlas.connector.CouchbaseHook`.
+* Verify that appropriate `couchbase_cluster`, `couchbase_bucket` objects were 
created on your Atlas cluster.
+* After having some documents updated on the Couchbase cluster, verify that 
appropriate `couchbase_scope`, `couchbase_collection`, and `couchbase_field` 
objects were created on your Atlas cluster.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6a0e68485..c55bec763 100644
--- a/pom.xml
+++ b/pom.xml
@@ -849,6 +849,7 @@
         <module>addons/impala-hook-api</module>
         <module>addons/impala-bridge-shim</module>
         <module>addons/impala-bridge</module>
+        <module>addons/couchbase-bridge</module>
 
         <module>distro</module>
         <module>atlas-examples</module>
@@ -1615,6 +1616,12 @@
                 <version>${project.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.atlas</groupId>
+                <artifactId>couchbase-bridge</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
             <!-- API documentation  -->
             <dependency>
                 <groupId>com.webcohesion.enunciate</groupId>


Reply via email to