This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 0fa9000 Add Postgresql SqlFirehose (#6813) 0fa9000 is described below commit 0fa90008496926c15426710b0dd4698bdc224bac Author: scrawfor <scraw...@users.noreply.github.com> AuthorDate: Fri Feb 15 01:52:03 2019 -0500 Add Postgresql SqlFirehose (#6813) * Add Postgresql SqlFirehose * Fix Code Style. * Fix style. * Fix Import Order. * Add Line Break before package. --- docs/content/ingestion/firehose.md | 22 +++++--- .../PostgresqlFirehoseDatabaseConnector.java | 58 ++++++++++++++++++++++ .../storage/postgresql/PostgreSQLConnector.java | 20 +++++--- .../PostgreSQLMetadataStorageModule.java | 12 ++++- 4 files changed, 96 insertions(+), 16 deletions(-) diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index ff10206..faa83f3 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -110,7 +110,10 @@ A sample ingest firehose spec is shown below - #### SqlFirehose SqlFirehoseFactory can be used to ingest events residing in RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background upto `maxFetchCapacityBytes` bytes. -An example is shown below: + +Requires one of the following extensions: + * [MySQL Metadata Store](../ingestion/mysql.html). + * [PostgreSQL Metadata Store](../ingestion/postgresql.html). 
```json { @@ -118,20 +121,19 @@ An example is shown below: "database": { "type": "mysql", "connectorConfig" : { - "connectURI" : "jdbc:mysql://host:port/schema", - "user" : "user", - "password" : "password" + "connectURI" : "jdbc:mysql://host:port/schema", + "user" : "user", + "password" : "password" } }, "sqls" : ["SELECT * FROM table1", "SELECT * FROM table2"] } ``` - |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be "sql".||Yes| -|database|Specifies the database connection details.`type` should specify the database type and `connectorConfig` should specify the database connection properties via `connectURI`, `user` and `password`||Yes| +|database|Specifies the database connection details.||Yes| |maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No| |maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No| |prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No| @@ -139,6 +141,14 @@ An example is shown below: |foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No| |sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes| +#### Database + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|The type of database to query. Valid values are `mysql` and `postgresql`.||Yes| +|connectorConfig|Specifies the database connection properties via `connectURI`, `user` and `password`.||Yes| + + ### CombiningFirehose This firehose can be used to combine and merge data from a list of different firehoses.
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java new file mode 100644 index 0000000..e40d444 --- /dev/null +++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.firehose; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.druid.metadata.MetadataStorageConnectorConfig; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.DBI; + + +@JsonTypeName("postgresql") +public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector +{ + private final DBI dbi; + private final MetadataStorageConnectorConfig connectorConfig; + + public PostgresqlFirehoseDatabaseConnector( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig + ) + { + this.connectorConfig = connectorConfig; + final BasicDataSource datasource = getDatasource(connectorConfig); + datasource.setDriverClassLoader(getClass().getClassLoader()); + datasource.setDriverClassName("org.postgresql.Driver"); + this.dbi = new DBI(datasource); + } + + @JsonProperty + public MetadataStorageConnectorConfig getConnectorConfig() + { + return connectorConfig; + } + + @Override + public DBI getDBI() + { + return dbi; + } +} diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java index 52e11ad..e234a15 100644 --- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java +++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java @@ -148,10 +148,10 @@ public class PostgreSQLConnector extends SQLMetadataConnector return !handle.createQuery( "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename ILIKE :tableName" ) - .bind("tableName", tableName) - 
.map(StringMapper.FIRST) - .list() - .isEmpty(); + .bind("tableName", tableName) + .map(StringMapper.FIRST) + .list() + .isEmpty(); } @Override @@ -184,10 +184,14 @@ public class PostgreSQLConnector extends SQLMetadataConnector } else { handle.createStatement( StringUtils.format( - "BEGIN;\n" + - "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + - "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + - " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "BEGIN;\n" + + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + + "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + + + " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;", tableName, keyColumn, diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java index 6cbb09b..f10de65 100644 --- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java +++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java @@ -20,9 +20,11 @@ package org.apache.druid.metadata.storage.postgresql; import com.fasterxml.jackson.databind.Module; -import com.google.common.collect.ImmutableList; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Key; +import org.apache.druid.firehose.PostgresqlFirehoseDatabaseConnector; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.PolyBind; @@ 
-35,6 +37,7 @@ import org.apache.druid.metadata.NoopMetadataStorageProvider; import org.apache.druid.metadata.PostgreSQLMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.SQLMetadataConnector; +import java.util.Collections; import java.util.List; public class PostgreSQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule @@ -50,7 +53,12 @@ public class PostgreSQLMetadataStorageModule extends SQLMetadataStorageDruidModu @Override public List<? extends Module> getJacksonModules() { - return ImmutableList.of(); + return Collections.singletonList( + new SimpleModule() + .registerSubtypes( + new NamedType(PostgresqlFirehoseDatabaseConnector.class, "postgresql") + ) + ); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org