This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fa9000  Add Postgresql SqlFirehose (#6813)
0fa9000 is described below

commit 0fa90008496926c15426710b0dd4698bdc224bac
Author: scrawfor <scraw...@users.noreply.github.com>
AuthorDate: Fri Feb 15 01:52:03 2019 -0500

    Add Postgresql SqlFirehose (#6813)
    
    * Add Postgresql SqlFirehose
    
    * Fix Code Style.
    
    * Fix style.
    
    * Fix Import Order.
    
    * Add Line Break before package.
---
 docs/content/ingestion/firehose.md                 | 22 +++++---
 .../PostgresqlFirehoseDatabaseConnector.java       | 58 ++++++++++++++++++++++
 .../storage/postgresql/PostgreSQLConnector.java    | 20 +++++---
 .../PostgreSQLMetadataStorageModule.java           | 12 ++++-
 4 files changed, 96 insertions(+), 16 deletions(-)

diff --git a/docs/content/ingestion/firehose.md 
b/docs/content/ingestion/firehose.md
index ff10206..faa83f3 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -110,7 +110,10 @@ A sample ingest firehose spec is shown below -
 #### SqlFirehose
 
 SqlFirehoseFactory can be used to ingest events residing in RDBMS. The 
database connection information is provided as part of the ingestion spec. For 
each query, the results are fetched locally and indexed. If there are multiple 
queries from which data needs to be indexed, queries are prefetched in the 
background up to `maxFetchCapacityBytes` bytes.
-An example is shown below:
+
+Requires one of the following extensions:
+ * [MySQL Metadata Store](../development/extensions-core/mysql.html).
+ * [PostgreSQL Metadata Store](../development/extensions-core/postgresql.html).
 
 ```json
 {
@@ -118,20 +121,19 @@ An example is shown below:
     "database": {
         "type": "mysql",
         "connectorConfig" : {
-        "connectURI" : "jdbc:mysql://host:port/schema",
-        "user" : "user",
-        "password" : "password"
+            "connectURI" : "jdbc:mysql://host:port/schema",
+            "user" : "user",
+            "password" : "password"
         }
      },
     "sqls" : ["SELECT * FROM table1", "SELECT * FROM table2"]
 }
 ```
 
-
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be "sql".||Yes|
-|database|Specifies the database connection details.`type` should specify the 
database type and `connectorConfig` should specify the database connection 
properties via `connectURI`, `user` and `password`||Yes|
+|database|Specifies the database connection details.||Yes|
 |maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means 
disabling cache. Cached files are not removed until the ingestion task 
completes.|1073741824|No|
 |maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means 
disabling prefetch. Prefetched files are removed immediately once they are 
read.|1073741824|No|
 |prefetchTriggerBytes|Threshold to trigger prefetching SQL result 
objects.|maxFetchCapacityBytes / 2|No|
@@ -139,6 +141,14 @@ An example is shown below:
 |foldCase|Toggle case folding of database column names. This may be enabled in 
cases where the database returns case insensitive column names in query 
results.|false|No|
 |sqls|List of SQL queries where each SQL query would retrieve the data to be 
indexed.||Yes|
 
+#### Database
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|The type of database to query. Valid values are `mysql` and 
`postgresql`.||Yes|
+|connectorConfig|Specifies the database connection properties via `connectURI`, 
`user` and `password`.||Yes|
+
+
 ### CombiningFirehose
 
 This firehose can be used to combine and merge data from a list of different 
firehoses.
diff --git 
a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
 
b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
new file mode 100644
index 0000000..e40d444
--- /dev/null
+++ 
b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.firehose;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.skife.jdbi.v2.DBI;
+
+
+@JsonTypeName("postgresql")
+public class PostgresqlFirehoseDatabaseConnector extends 
SQLFirehoseDatabaseConnector
+{
+  private final DBI dbi;
+  private final MetadataStorageConnectorConfig connectorConfig;
+
+  public PostgresqlFirehoseDatabaseConnector(
+      @JsonProperty("connectorConfig") MetadataStorageConnectorConfig 
connectorConfig
+  )
+  {
+    this.connectorConfig = connectorConfig;
+    final BasicDataSource datasource = getDatasource(connectorConfig);
+    datasource.setDriverClassLoader(getClass().getClassLoader());
+    datasource.setDriverClassName("org.postgresql.Driver");
+    this.dbi = new DBI(datasource);
+  }
+
+  @JsonProperty
+  public MetadataStorageConnectorConfig getConnectorConfig()
+  {
+    return connectorConfig;
+  }
+
+  @Override
+  public DBI getDBI()
+  {
+    return dbi;
+  }
+}
diff --git 
a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
 
b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
index 52e11ad..e234a15 100644
--- 
a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
+++ 
b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
@@ -148,10 +148,10 @@ public class PostgreSQLConnector extends 
SQLMetadataConnector
     return !handle.createQuery(
         "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 
'public' AND tablename ILIKE :tableName"
     )
-                 .bind("tableName", tableName)
-                 .map(StringMapper.FIRST)
-                 .list()
-                 .isEmpty();
+                  .bind("tableName", tableName)
+                  .map(StringMapper.FIRST)
+                  .list()
+                  .isEmpty();
   }
 
   @Override
@@ -184,10 +184,14 @@ public class PostgreSQLConnector extends 
SQLMetadataConnector
             } else {
               handle.createStatement(
                   StringUtils.format(
-                      "BEGIN;\n" +
-                      "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
-                      "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE 
%2$s=:key RETURNING *)\n" +
-                      "    INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value 
WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
+                      "BEGIN;\n"
+                      +
+                      "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n"
+                      +
+                      "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE 
%2$s=:key RETURNING *)\n"
+                      +
+                      "    INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value 
WHERE NOT EXISTS (SELECT * FROM upsert)\n;"
+                      +
                       "COMMIT;",
                       tableName,
                       keyColumn,
diff --git 
a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
 
b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
index 6cbb09b..f10de65 100644
--- 
a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
+++ 
b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
@@ -20,9 +20,11 @@
 package org.apache.druid.metadata.storage.postgresql;
 
 import com.fasterxml.jackson.databind.Module;
-import com.google.common.collect.ImmutableList;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
 import com.google.inject.Key;
+import org.apache.druid.firehose.PostgresqlFirehoseDatabaseConnector;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.PolyBind;
@@ -35,6 +37,7 @@ import org.apache.druid.metadata.NoopMetadataStorageProvider;
 import org.apache.druid.metadata.PostgreSQLMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.SQLMetadataConnector;
 
+import java.util.Collections;
 import java.util.List;
 
 public class PostgreSQLMetadataStorageModule extends 
SQLMetadataStorageDruidModule implements DruidModule
@@ -50,7 +53,12 @@ public class PostgreSQLMetadataStorageModule extends 
SQLMetadataStorageDruidModu
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return ImmutableList.of();
+    return Collections.singletonList(
+        new SimpleModule()
+            .registerSubtypes(
+                new NamedType(PostgresqlFirehoseDatabaseConnector.class, 
"postgresql")
+            )
+    );
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to