[GitHub] incubator-hawq pull request #1390: HAWQ-1650. Fix compilation issue in Java ...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-hawq/pull/1390 ---
[GitHub] incubator-hawq pull request #1390: HAWQ-1650. Fix compilation issue in Java ...
GitHub user frankgh opened a pull request: https://github.com/apache/incubator-hawq/pull/1390 HAWQ-1650. Fix compilation issue in Java 7 Fixes compilation issue in Java 7 for SecureHDFSTest You can merge this pull request into a Git repository by running: $ git pull https://github.com/frankgh/incubator-hawq HAWQ-1650 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1390.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1390 commit 9f33d8dd0c97d06a5e2fcb555b73fd83281c1494 Author: Francisco Guerrero Date: 2018-08-13T21:43:06Z HAWQ-1650. Fix compilation issue in Java 7 ---
[GitHub] incubator-hawq pull request #1389: Run gradle build in parallel
Github user asfgit closed the pull request at: https://github.com/apache/incubator-hawq/pull/1389 ---
[GitHub] incubator-hawq pull request #1389: Run gradle build in parallel
GitHub user frankgh opened a pull request: https://github.com/apache/incubator-hawq/pull/1389 Run gradle build in parallel Improve PXF compilation time by running gradle in parallel and using gradle daemon You can merge this pull request into a Git repository by running: $ git pull https://github.com/frankgh/incubator-hawq HAWQ-1649 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1389.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1389 commit 2c4af56f407fb65eb7bdc7b690984907c0d2c36b Author: Francisco Guerrero Date: 2018-08-13T18:45:14Z Add parallel true and daemon to gradle properties ---
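[Editor's note] The commit above only says it adds "parallel true and daemon" to the gradle properties; assuming the standard Gradle property names, the relevant gradle.properties entries would look like the following sketch (not the verbatim commit contents):

```properties
# Enable parallel project execution and the long-lived build daemon.
# Standard Gradle properties; the exact commit contents are not shown above.
org.gradle.parallel=true
org.gradle.daemon=true
```

With these set, independent PXF subprojects build concurrently and the JVM stays warm between invocations, which is where the compile-time improvement comes from.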
[GitHub] incubator-hawq pull request #1388: Fix doc and makefile issue for hawq docke...
GitHub user ginobiliwang opened a pull request: https://github.com/apache/incubator-hawq/pull/1388 Fix doc and makefile issue for hawq docker The "make pull" command will fail without a remote docker registry, so this adds local docker registry operations. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ginobiliwang/incubator-hawq fix-doc-and-makefile-issue-for-hawq-docker Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1388.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1388 commit 6ed93ebf1abf877547376d895966cf9066c21d03 Author: Fenggang Date: 2018-08-13T07:00:50Z fix doc and makefile issue for hawq-docker commit fb6e78ae3f83fac6bd2ff8b81be25856d8e791ec Author: Fenggang Date: 2018-08-13T07:10:17Z fix doc and makefile issue for hawq-docker v0.2 ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user stanlyxiang commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r209506292 --- Diff: contrib/exthdfs/exthdfs.c --- @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "postgres.h" + +#include "common.h" +#include "access/extprotocol.h" +#include "cdb/cdbdatalocality.h" +#include "storage/fd.h" +#include "storage/filesystem.h" +#include "utils/uri.h" + + + + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation); +PG_FUNCTION_INFO_V1(hdfsprotocol_validate); + +Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS); +Datum hdfsprotocol_validate(PG_FUNCTION_ARGS); + +Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS) +{ + + // Build the result instance + int nsize = 0; + int numOfBlock = 0; + ExtProtocolBlockLocationData *bldata = + palloc0(sizeof(ExtProtocolBlockLocationData)); + if (bldata == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "cannot allocate due to no memory"); + } + bldata->type = T_ExtProtocolBlockLocationData; + fcinfo->resultinfo = bldata; + + ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *) + (fcinfo->context); + + + // Parse URI of the first location, we expect all locations uses the same + // name node server. This is checked in validation function. + + char *first_uri_str = (char *)strVal(lfirst(list_head(pvalidator_data->url_list))); + Uri *uri = ParseExternalTableUri(first_uri_str); + + elog(DEBUG3, "hdfsprotocol_blocklocation : " + "extracted HDFS name node address %s:%d", + uri->hostname, uri->port); + + // Create file system instance + hdfsFS fs = hdfsConnect(uri->hostname, uri->port); + if (fs == NULL) + { + elog(ERROR, "hdfsprotocol_blocklocation : " + "failed to create HDFS instance to connect to %s:%d", + uri->hostname, uri->port); + } + + // Clean up uri instance as we don't need it any longer + pfree(uri); --- End diff -- please use "FreeExternalTableUri" for all Uri instances ---
[GitHub] incubator-hawq pull request #1385: HAWQ-1645. Remove code generation from PX...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-hawq/pull/1385 ---
[GitHub] incubator-hawq pull request #1383: HAWQ-1644. Make delegation token optional...
Github user divyabhargov commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1383#discussion_r208903549 --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/SecuredHDFS.java --- @@ -45,28 +45,33 @@ * * All token properties will be deserialized from string to a Token object * - * @param protData input parameters + * @param userGroupInformation the UGI for token verification + * @param tokenString (optional) the delegation token * @param context servlet context which contains the NN address * * @throws SecurityException Thrown when authentication fails */ -public static void verifyToken(ProtocolData protData, ServletContext context) { +public static void verifyToken(UserGroupInformation userGroupInformation, + String tokenString, + ServletContext context) { try { if (UserGroupInformation.isSecurityEnabled()) { /* - * HAWQ-1215: The verify token method validates that the token sent from + * The verify token method validates that the token sent from --- End diff -- Was removing the HAWQ reference story from the comment accidental? ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208448574 --- Diff: src/backend/commands/analyze.c --- @@ -3266,3 +3298,380 @@ static void gp_statistics_estimate_reltuples_relpages_parquet(Relation rel, floa pfree(fstotal); return; } + +/** + * This method estimates the number of tuples and pages in an extern relation. We can not get accurate tuple counts + * and pages counts in the catalog. Therefore, we have to get reltuples and relpages manually. + * + * Input: + * rel - Relation. Must be an external table. + * + * Output: + * reltuples - exact number of tuples in relation. + * relpages - exact number of pages. + */ +static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages){ + Oid extRelationOid = RelationGetRelid(rel); + getExternalRelTuples(extRelationOid, relTuples); + getExternalRelPages(extRelationOid, relPages, rel); +} + +/** + * This method called by analyzeExternalEstimateReltuplesRelpages, + * to get External Relation reltuple counts, we run count(*) sql manually + * + * Input: + * extRelationOid - External Table Relation Oid + * Output: + * relTuples - exact number of tuples in relation. 
+ */ +static void getExternalRelTuples(Oid extRelationOid, float4 *relTuples){ + const char *schemaName = NULL; + const char *tableName = NULL; + schemaName = get_namespace_name(get_rel_namespace(extRelationOid)); /* must be pfreed */ + tableName = get_rel_name(extRelationOid); /* must be pfreed */ + + StringInfoData str; + initStringInfo(&str); + appendStringInfo(&str, "select count(*)::float4 from %s.%s as Ta", + quote_identifier(schemaName), + quote_identifier(tableName)); + + spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */, + spiCallback_getSingleResultRowColumnAsFloat4, relTuples); + pfree((void *) tableName); + pfree((void *) schemaName); + pfree(str.data); +} + +/** + * This method called by analyzeExternalEstimateReltuplesRelpages,to get External Relation relpages counts. + * We call GetExtTableEntry method to get get List of external Table Locations.And then we go through every + * location url to sum the count of relpages.External Relation now support some different protocals, therefore + * we need to process them in different way. + * + * Input: + * extRelationOid - External Table Relation Oid + * Output: + * relTuples - exact number of pages in relation. 
+ */ +static void getExternalRelPages(Oid extRelationOid, float4 *relPages , Relation rel){ + + ExtTableEntry* entry = GetExtTableEntry(extRelationOid); + List* extLocations = entry->locations; + int num_urls = list_length(extLocations); + ListCell *cell = list_head(extLocations); + ListCell *cellTmp = NULL; + while(cell != NULL) + { + char *url = pstrdup(((Value*)lfirst(cell))->val.str); + Assert(url != NULL); + Uri *uri = ParseExternalTableUri(url); + Assert(uri != NULL); + switch (uri->protocol){ + case URI_HDFS: + *relPages += getExtrelPagesHDFS(uri); + break; + + /* + * to be done + */ + case URI_GPFDIST: + *relPages = 1.0; + elog(NOTICE,"In external table ANALYZE command are not supported in GPFDIST location so far."); + break; + case URI_FILE: + *relPages = 1.0; + elog(NOTICE,"In external table ANALYZE command are not supported in FILE location so far."); + break; + case URI_FTP: + *relPages = 1.0; + elog(NOTICE,"In external table ANALYZE command are not supported in FTP location so far."); + break; + case URI_HTTP: + *relPages = 1.0; + elog(NOTICE,"In external table ANALYZE command are not supported in HTTP location so far."); + break; + case URI_CUSTOM: + *relPages = 1.0; + elog(NOTICE,"In external table ANALYZE command are not supported in CUSTOM location so far."); + break; + case URI_GPFDISTS: +
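[Editor's note] The getExternalRelTuples diff above splices schema and table names into its count(*) query through quote_identifier so the names are safe in SQL. Below is a self-contained sketch of double-quote identifier quoting; it is a simplification of PostgreSQL's quote_identifier, which also checks keywords and only quotes names that need it.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified identifier quoting: always wrap the name in double quotes
 * and double any embedded quote, e.g. my"tbl -> "my""tbl".
 * (PostgreSQL's quote_identifier additionally skips quoting for names
 * that are already safe; this sketch quotes unconditionally.) */
char *quote_ident(const char *name) {
    size_t n = strlen(name), extra = 0;
    for (size_t i = 0; i < n; i++)
        if (name[i] == '"')
            extra++;
    char *out = malloc(n + extra + 3);  /* 2 wrapping quotes + NUL */
    char *p = out;
    *p++ = '"';
    for (size_t i = 0; i < n; i++) {
        if (name[i] == '"')
            *p++ = '"';  /* escape by doubling */
        *p++ = name[i];
    }
    *p++ = '"';
    *p = '\0';
    return out;
}
```

Without this step, a table named with spaces, mixed case, or quote characters would produce a syntactically invalid (or, worse, injectable) query string.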
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208439468 --- Diff: src/backend/commands/analyze.c --- @@ -3266,3 +3298,380 @@ static void gp_statistics_estimate_reltuples_relpages_parquet(Relation rel, floa pfree(fstotal); return; } + +/** + * This method estimates the number of tuples and pages in an extern relation. We can not get accurate tuple counts + * and pages counts in the catalog. Therefore, we have to get reltuples and relpages manually. + * + * Input: + * rel - Relation. Must be an external table. + * + * Output: + * reltuples - exact number of tuples in relation. + * relpages - exact number of pages. + */ +static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages){ + Oid extRelationOid = RelationGetRelid(rel); + getExternalRelTuples(extRelationOid, relTuples); + getExternalRelPages(extRelationOid, relPages, rel); +} + +/** + * This method called by analyzeExternalEstimateReltuplesRelpages, + * to get External Relation reltuple counts, we run count(*) sql manually + * + * Input: + * extRelationOid - External Table Relation Oid + * Output: + * relTuples - exact number of tuples in relation. + */ +static void getExternalRelTuples(Oid extRelationOid, float4 *relTuples){ + const char *schemaName = NULL; + const char *tableName = NULL; + schemaName = get_namespace_name(get_rel_namespace(extRelationOid)); /* must be pfreed */ + tableName = get_rel_name(extRelationOid); /* must be pfreed */ + + StringInfoData str; + initStringInfo(&str); + appendStringInfo(&str, "select count(*)::float4 from %s.%s as Ta", + quote_identifier(schemaName), + quote_identifier(tableName)); + + spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */, + spiCallback_getSingleResultRowColumnAsFloat4, relTuples); + pfree((void *) tableName); --- End diff -- no need for the (void *) cast; also, does this need to be freed? ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208437399 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -193,6 +212,9 @@ typedef struct Relation_Data { List *files; Oid partition_parent_relid; int64 block_count; + int port; --- End diff -- port and hostname are bound to hivepath for the hive protocol; remove them all. ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208449241 --- Diff: src/backend/commands/tablecmds.c --- @@ -1042,7 +1277,7 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) * permissions in pg_auth for creating this table. */ - bool isnull; + bool isnull; Oid userid = GetUserId(); --- End diff -- incorrect indent ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208449610 --- Diff: src/backend/commands/tablecmds.c --- @@ -17561,29 +17887,67 @@ static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool i if (first_uri && uri->protocol == URI_CUSTOM) { Oid procOid = InvalidOid; - procOid = LookupExtProtocolFunction(uri->customprotocol, EXTPTC_FUNC_VALIDATOR, false); if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH) InvokeProtocolValidation(procOid, uri->customprotocol, - iswritable, + iswritable, forceCreateDir, locs, fmtopts); + first_customprotocal = uri->customprotocol; } + if (first_uri && uri->protocol == URI_HDFS) + { + Oid procOid = InvalidOid; + + procOid = LookupCustomProtocolValidatorFunc("hdfs"); + if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH) + { + InvokeProtocolValidation(procOid, + uri->customprotocol, + iswritable, forceCreateDir, + locs, fmtopts); + } + } + + if (first_uri && uri->protocol == URI_HIVE) + { + Oid procOid = InvalidOid; + procOid = LookupCustomProtocolValidatorFunc("hive"); + if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH) + { + InvokeProtocolValidation(procOid, + uri->customprotocol, + iswritable, forceCreateDir, + locs, fmtopts); + } + } + + if (first_uri && uri->protocol == URI_MAGMA) --- End diff -- remove magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436217 --- Diff: src/backend/access/external/fileam.c --- @@ -1381,9 +1603,11 @@ lookupCustomFormatter(char *formatter_name, bool iswritable) ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("formatter function %s is not declared STABLE.", -formatter_name), +new_formatter_name), errOmitLocation(true))); + pfree(new_formatter_name); --- End diff -- add if (new_formatter_name != NULL) ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435844 --- Diff: src/backend/access/external/fileam.c --- @@ -1290,6 +1397,97 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss) return NULL; } +static HeapTuple +externalgettup_custom_noextprot(FileScanDesc scan, + ExternalSelectDesc desc, + ScanState *ss) +{ + HeapTuple tuple; + CopyState pstate = scan->fs_pstate; + FormatterData* formatter = scan->fs_formatter; + bool no_more_data = false; + MemoryContext oldctxt = CurrentMemoryContext; + + Assert(formatter); + + /* while didn't finish processing the entire file */ + while (!no_more_data) + { + bool error_caught = false; + + /* + * Invoke the custom formatter function. + */ + PG_TRY(); + { + Datum d; + FunctionCallInfoData fcinfo; --- End diff -- incorrect indent ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208451302 --- Diff: src/backend/optimizer/plan/createplan.c --- @@ -1127,6 +1127,20 @@ bool is_pxf_protocol(Uri *uri) return false; } +bool is_hdfs_protocol(Uri *uri) +{ + return uri->protocol == URI_HDFS; +} + +bool is_magma_protocol(Uri *uri) +{ + return uri->protocol == URI_MAGMA; --- End diff -- remove magma and hive ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208451726 --- Diff: src/include/utils/uri.h --- @@ -32,7 +32,11 @@ typedef enum UriProtocol URI_HTTP, URI_GPFDIST, URI_CUSTOM, - URI_GPFDISTS + URI_GPFDISTS, + URI_HDFS, + URI_HBASE, + URI_MAGMA, --- End diff -- remove ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208449174 --- Diff: src/backend/commands/tablecmds.c --- @@ -939,38 +983,217 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) char* commandString = NULL; char rejectlimittype = '\0'; char formattype; + char* formattername = NULL; int rejectlimit = -1; int encoding = -1; int preferred_segment_num = -1; bool issreh = false; /* is single row error handling requested? */ + bool isexternal = createExtStmt->isexternal; bool iswritable = createExtStmt->iswritable; bool isweb = createExtStmt->isweb; + bool forceCreateDir = createExtStmt->forceCreateDir; + + bool isExternalHdfs = false; + bool isExternalMagma = false; + bool isExternalHive = false; + char* location = NULL; + int location_len = NULL; /* * now set the parameters for keys/inheritance etc. Most of these are * uninteresting for external relations... */ - createStmt->relation = createExtStmt->relation; - createStmt->tableElts = createExtStmt->tableElts; - createStmt->inhRelations = NIL; - createStmt->constraints = NIL; - createStmt->options = NIL; - createStmt->oncommit = ONCOMMIT_NOOP; - createStmt->tablespacename = NULL; + createStmt->base = createExtStmt->base; + // external table options is not compatible with internal table + // set NIL here + createStmt->base.options = NIL; createStmt->policy = createExtStmt->policy; /* policy was set in transform */ - + +/* +* Recognize formatter option if there are some tokens found in parser. +* This design is to give CREATE EXTERNAL TABLE DDL the flexiblity to +* support user defined external formatter options. +*/ + recognizeExternalRelationFormatterOptions(createExtStmt); + + /* +* Get tablespace, database, schema for the relation +*/ + RangeVar *rel = createExtStmt->base.relation; + // get tablespace name for the relation + Oid tablespace_id = (gp_upgrade_mode) ? 
DEFAULTTABLESPACE_OID : GetDefaultTablespace(); + if (!OidIsValid(tablespace_id)) + { + tablespace_id = get_database_dts(MyDatabaseId); + } + char *tablespace_name = get_tablespace_name(tablespace_id); + + // get database name for the relation + char *database_name = rel->catalogname ? rel->catalogname : get_database_name(MyDatabaseId); + + // get schema name for the relation + char *schema_name = get_namespace_name(RangeVarGetCreationNamespace(rel)); + + // get table name for the relation + char *table_name = rel->relname; + + /* +* Do some special logic when we use custom +*/ + if (exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION) + { + if (exttypeDesc->location_list == NIL) + { + if (dfs_url == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), +errmsg("Cannot create table on HDFS when the service is not available"), +errhint("Check HDFS service and hawq_dfs_url configuration"), +errOmitLocation(true))); + } + + location_len = strlen(PROTOCOL_HDFS) + /* hdfs:// */ + strlen(dfs_url) + /* hawq_dfs_url */ + // 1 + strlen(filespace_name) + /* '/' + filespace name */ + 1 + strlen(tablespace_name) + /* '/' + tablespace name */ + 1 + strlen(database_name) + /* '/' + database name */ + 1 + strlen(schema_name) + /* '/' + schema name */ + 1 + strlen(table_name) + 1; /* '/' + table name + '\0' */ + + char *path; + +
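[Editor's note] The location_len arithmetic above sizes a buffer as the protocol prefix plus each path component with a '/' separator and a trailing NUL, before formatting the full HDFS location into it. Below is a hedged, self-contained sketch of the same length-then-format pattern; the function name and the use of plain malloc/snprintf are ours, not HAWQ's.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Sketch of the buffer-sizing pattern in the diff: compute the exact
 * length of "hdfs://<dfs_url>/<tablespace>/<database>/<schema>/<table>"
 * up front, allocate once, then format into the buffer. */
char *build_hdfs_location(const char *dfs_url, const char *tablespace,
                          const char *database, const char *schema,
                          const char *table) {
    const char *prefix = "hdfs://";
    size_t len = strlen(prefix) + strlen(dfs_url)
               + 1 + strlen(tablespace)   /* '/' + tablespace name */
               + 1 + strlen(database)     /* '/' + database name   */
               + 1 + strlen(schema)       /* '/' + schema name     */
               + 1 + strlen(table) + 1;   /* '/' + table name + '\0' */
    char *path = malloc(len);
    snprintf(path, len, "%s%s/%s/%s/%s/%s",
             prefix, dfs_url, tablespace, database, schema, table);
    return path;
}
```

Computing the exact length first mirrors the diff's intent: one allocation sized to fit, rather than a guessed fixed-size buffer that could truncate long identifiers.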
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208451740 --- Diff: src/include/utils/uri.h --- @@ -41,6 +45,10 @@ typedef enum UriProtocol #define PROTOCOL_GPFDIST "gpfdist://" #define PROTOCOL_GPFDISTS "gpfdists://" #define PROTOCOL_PXF "pxf://" +#define PROTOCOL_HDFS "hdfs://" +#define PROTOCOL_HBASE "hbase://" +#define PROTOCOL_MAGMA "magma://" --- End diff -- remove ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208451756 --- Diff: src/include/utils/uri.h --- @@ -52,6 +60,10 @@ typedef enum UriProtocol #define IS_GPFDISTS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_GPFDISTS, strlen(PROTOCOL_GPFDISTS)) == 0) #define IS_FTP_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_FTP, strlen(PROTOCOL_FTP)) == 0) #define IS_PXF_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_PXF, strlen(PROTOCOL_PXF)) == 0) +#define IS_HDFS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_HDFS, strlen(PROTOCOL_HDFS)) == 0) +#define IS_HBASE_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_HBASE, strlen(PROTOCOL_HBASE)) == 0) +#define IS_MAGMA_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_MAGMA, strlen(PROTOCOL_MAGMA)) == 0) --- End diff -- remove ---
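[Editor's note] The IS_*_URI macros under review are case-insensitive prefix checks built on pg_strncasecmp. Below is a self-contained sketch of the same test, with POSIX strncasecmp standing in for pg_strncasecmp; the PROTOCOL_HDFS string comes from the diff, the helper name is ours.

```c
#include <assert.h>
#include <string.h>
#include <strings.h>  /* strncasecmp (POSIX) */

#define PROTOCOL_HDFS "hdfs://"

/* Mirrors the IS_HDFS_URI macro: does the URI start with "hdfs://",
 * ignoring case? Only the prefix is examined, not the rest of the URI. */
int is_hdfs_uri(const char *uri_str) {
    return strncasecmp(uri_str, PROTOCOL_HDFS, strlen(PROTOCOL_HDFS)) == 0;
}
```

Keeping the check to strlen(prefix) bytes means a bare scheme with no host still matches, which is why the validator functions elsewhere in the PR do the deeper structural checks.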
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208437762 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation, return; } +static void InvokeHDFSProtocolBlockLocation(Oid procOid, + List *locs, + List **blockLocations) +{ + ExtProtocolValidatorData *validator_data; + FmgrInfo *validator_udf; + FunctionCallInfoData fcinfo; + + validator_data = (ExtProtocolValidatorData *) + palloc0 (sizeof(ExtProtocolValidatorData)); + validator_udf = palloc(sizeof(FmgrInfo)); + fmgr_info(procOid, validator_udf); + + validator_data->type = T_ExtProtocolValidatorData; + validator_data->url_list = locs; + validator_data->format_opts = NULL; + validator_data->errmsg = NULL; + validator_data->direction = EXT_VALIDATE_READ; + validator_data->action = EXT_VALID_ACT_GETBLKLOC; + + InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, + /* FmgrInfo */ validator_udf, + /* nArgs */ 0, + /* Call Context */ (Node *) validator_data, + /* ResultSetInfo */ NULL); + + /* invoke validator. if this function returns - validation passed */ + FunctionCallInvoke(&fcinfo); + + ExtProtocolBlockLocationData *bls = + (ExtProtocolBlockLocationData *)(fcinfo.resultinfo); + /* debug output block location. 
*/ + if (bls != NULL) + { + ListCell *c; + foreach(c, bls->files) + { + blocklocation_file *blf = (blocklocation_file *)(lfirst(c)); + elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks", + blf->file_uri, blf->block_num); + for ( int i = 0 ; i < blf->block_num ; ++i ) + { + BlockLocation *pbl = &(blf->locations[i]); + elog(DEBUG3, "DEBUG LOCATION for block %d : %d, " + INT64_FORMAT ", " INT64_FORMAT ", %d", + i, + pbl->corrupt, pbl->length, pbl->offset, + pbl->numOfNodes); + for ( int j = 0 ; j < pbl->numOfNodes ; ++j ) + { + elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s", + i, + pbl->hosts[j], pbl->names[j], + pbl->topologyPaths[j]); + } + } + } + } + elog(DEBUG3, "after invoking get block location API"); + + /* get location data from fcinfo.resultinfo. */ + if (bls != NULL) + { + Assert(bls->type == T_ExtProtocolBlockLocationData); + while(list_length(bls->files) > 0) + { + void *v = lfirst(list_head(bls->files)); + bls->files = list_delete_first(bls->files); + *blockLocations = lappend(*blockLocations, v); + } + } + pfree(validator_data); + pfree(validator_udf); +} + +Oid +LookupCustomProtocolBlockLocationFunc(char *protoname) +{ + List* funcname= NIL; --- End diff -- incorrect indent ---
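[Editor's note] The second `if (bls != NULL)` block in the diff drains one list into another: repeatedly take the head of bls->files and append it to *blockLocations, preserving order. Below is a self-contained sketch of that drain loop over a minimal hand-rolled list; the Node type and helpers are hypothetical stand-ins for HAWQ's List API.

```c
#include <assert.h>
#include <stdlib.h>

/* Minimal stand-in for the List used in the diff. */
typedef struct Node { void *item; struct Node *next; } Node;

/* Append an item, returning the (possibly new) head. */
Node *push_back(Node *head, void *item) {
    Node *n = calloc(1, sizeof(Node));
    n->item = item;
    if (!head)
        return n;
    Node *t = head;
    while (t->next)
        t = t->next;
    t->next = n;
    return head;
}

/* Mirrors the drain loop: while the source is non-empty, pop its head
 * and append the element to the destination, preserving order. */
void drain(Node **src, Node **dst) {
    while (*src) {
        Node *head = *src;
        *src = head->next;
        *dst = push_back(*dst, head->item);
        free(head);
    }
}
```

Draining head-by-head (rather than walking the source with an iterator) matters in the original because list_delete_first also releases the source list's cells as it goes.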
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208449650 --- Diff: src/backend/commands/tablecmds.c --- @@ -17652,7 +18018,47 @@ static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool i result = (Datum) 0; return result; +} + +static Oid +LookupCustomProtocolValidatorFunc(char *protoname) +{ + List* funcname= NIL; --- End diff -- indent ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208437472 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -844,36 +918,17 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte /* * We only consider the data stored in HDFS. */ - if (RelationIsAoRows(rel) || RelationIsParquet(rel)) { - Relation_Data *rel_data = NULL; - /* -* Get pg_appendonly information for this table. -*/ - AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow); - - rel_data = (Relation_Data *) palloc(sizeof(Relation_Data)); + bool isDataStoredInHdfs = dataStoredInHdfs(rel); + if (isDataStoredInHdfs ) { + GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid); + Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data)); rel_data->relid = rel_oid; rel_data->files = NIL; rel_data->partition_parent_relid = 0; rel_data->block_count = 0; - - GpPolicy *targetPolicy = NULL; - targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid); - /* -* Based on the pg_appendonly information, calculate the data -* location information associated with this relation. -*/ - if (RelationIsAoRows(rel)) { - rel_data->type = DATALOCALITY_APPENDONLY; - AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context, - aoEntry->splitsize, rel_data, , - , targetPolicy); - } else { - rel_data->type = DATALOCALITY_PARQUET; - ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context, - context->split_size, rel_data, , - , targetPolicy); - } + rel_data->port = 0; --- End diff -- NO "port and hostname" ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208451046 --- Diff: src/backend/nodes/outfuncs.c --- @@ -2170,16 +2170,27 @@ _outCreateExternalStmt(StringInfo str, CreateExternalStmt *node) { WRITE_NODE_TYPE("CREATEEXTERNALSTMT"); - WRITE_NODE_FIELD(relation); - WRITE_NODE_FIELD(tableElts); + WRITE_CHAR_FIELD(base.relKind); + WRITE_NODE_FIELD(base.relation); + WRITE_NODE_FIELD(base.tableElts); + WRITE_NODE_FIELD(base.inhRelations); + WRITE_NODE_FIELD(base.constraints); + WRITE_NODE_FIELD(base.options); + WRITE_ENUM_FIELD(base.oncommit, OnCommitAction); + WRITE_STRING_FIELD(base.tablespacename); + WRITE_NODE_FIELD(base.distributedBy); + WRITE_BOOL_FIELD(base.is_part_child); + WRITE_BOOL_FIELD(base.is_add_part); + WRITE_NODE_FIELD(base.partitionBy); --- End diff -- check the struct with master branch ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436246 --- Diff: src/backend/access/external/fileam.c --- @@ -1664,7 +1888,9 @@ FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo, Relation rel, TupleDesc tupDesc, FmgrInfo *convFuncs, -Oid *typioparams) +Oid *typioparams, --- End diff -- incorrect indent ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438223 --- Diff: src/backend/commands/analyze.c --- @@ -989,17 +1013,20 @@ static List* analyzableRelations(bool rootonly, List **fullRelOids) while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx))) { Oid candidateOid = HeapTupleGetOid(tuple); - if (analyzePermitted(candidateOid) - && candidateOid != StatisticRelationId) + bool isExternalHDFSORMAGMA = isExternalHDFSORMAGMAProtocol(candidateOid); + if (analyzePermitted(candidateOid) && + candidateOid != StatisticRelationId && + isExternalHDFSORMAGMA) --- End diff -- remove magma ---
[GitHub] incubator-hawq pull request #1387: HAWQ-1647. Update HAWQ version from 2.3.0...
GitHub user radarwave opened a pull request: https://github.com/apache/incubator-hawq/pull/1387 HAWQ-1647. Update HAWQ version from 2.3.0.0 to 2.4.0.0 You can merge this pull request into a Git repository by running: $ git pull https://github.com/radarwave/incubator-hawq HAWQ-1647 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1387.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1387 commit 9800d3b8bd5806c759b7be62fc618d1ceaa99bec Author: rlei Date: 2018-08-07T09:43:23Z HAWQ-1647. Update HAWQ version from 2.3.0.0 to 2.4.0.0 ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438791 --- Diff: src/include/catalog/pg_exttable.h --- @@ -164,9 +164,12 @@ GetExtTableEntry(Oid relid); extern void RemoveExtTableEntry(Oid relid); -#define CustomFormatType 'b' -#define TextFormatType 't' -#define CsvFormatType 'c' +#define CustomFormatType 'b' +#define TextFormatType 't' --- End diff -- There should be no hard-coded format types in the pluggable storage framework ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438657 --- Diff: src/include/utils/uri.h --- @@ -52,6 +60,10 @@ typedef enum UriProtocol #define IS_GPFDISTS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_GPFDISTS, strlen(PROTOCOL_GPFDISTS)) == 0) #define IS_FTP_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_FTP, strlen(PROTOCOL_FTP)) == 0) #define IS_PXF_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_PXF, strlen(PROTOCOL_PXF)) == 0) +#define IS_HDFS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_HDFS, strlen(PROTOCOL_HDFS)) == 0) +#define IS_HBASE_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_HBASE, strlen(PROTOCOL_HBASE)) == 0) --- End diff -- No HBase, Hive, and Magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438576 --- Diff: src/include/utils/uri.h --- @@ -41,6 +45,10 @@ typedef enum UriProtocol #define PROTOCOL_GPFDIST "gpfdist://" #define PROTOCOL_GPFDISTS "gpfdists://" #define PROTOCOL_PXF "pxf://" +#define PROTOCOL_HDFS "hdfs://" +#define PROTOCOL_HBASE "hbase://" --- End diff -- No Hive and Magma ---
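The PROTOCOL_HDFS / IS_HDFS_URI pair being added in these two hunks follows the existing uri.h pattern: a case-insensitive prefix test of the URI string against the protocol literal. A self-contained sketch, using POSIX strncasecmp where the tree uses pg_strncasecmp:

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>
#include <strings.h>   /* strncasecmp */

/* Mirror of the uri.h pattern under review: a protocol prefix and a
 * case-insensitive prefix-match macro over it. */
#define PROTOCOL_HDFS "hdfs://"
#define IS_HDFS_URI(uri_str) \
    (strncasecmp((uri_str), PROTOCOL_HDFS, strlen(PROTOCOL_HDFS)) == 0)

static bool is_hdfs_uri(const char *uri)
{
    return IS_HDFS_URI(uri);
}
```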
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438342 --- Diff: src/include/utils/uri.h --- @@ -32,7 +32,11 @@ typedef enum UriProtocol URI_HTTP, URI_GPFDIST, URI_CUSTOM, - URI_GPFDISTS + URI_GPFDISTS, + URI_HDFS, --- End diff -- No Hive and Magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438291 --- Diff: src/include/nodes/parsenodes.h --- @@ -1459,8 +1463,10 @@ typedef struct SharedStorageOpStmt */ typedef enum ExtTableType { - EXTTBL_TYPE_LOCATION, /* table defined with LOCATION clause */ - EXTTBL_TYPE_EXECUTE /* table defined with EXECUTE clause */ + EXTTBL_TYPE_LOCATION, /* table defined with LOCATION clause */ + EXTTBL_TYPE_EXECUTE, /* table defined with EXECUTE clause */ + EXTTBL_TYPE_MAGMA, /* table defined with MAGMA */ --- End diff -- No magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438235 --- Diff: src/include/access/xact.h --- @@ -173,6 +173,33 @@ typedef struct XidBuffer extern XidBuffer subxbuf; extern File subxip_file; +/* --- End diff -- No transaction for pluggable storage framework for now ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208438165 --- Diff: src/include/access/formatter.h --- @@ -36,10 +36,19 @@ typedef enum FmtNotification { FMT_NONE, + FMT_DONE, FMT_NEED_MORE_DATA } FmtNotification; +typedef enum FmtActionMask --- End diff -- The pluggable storage framework does not use this structure; remove it ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208437581 --- Diff: src/backend/optimizer/plan/createplan.c --- @@ -1127,6 +1127,20 @@ bool is_pxf_protocol(Uri *uri) return false; } +bool is_hdfs_protocol(Uri *uri) +{ + return uri->protocol == URI_HDFS; +} + +bool is_magma_protocol(Uri *uri) --- End diff -- No magma and hive protocol ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208437403 --- Diff: src/backend/commands/tablecmds.c --- @@ -18041,11 +18646,11 @@ static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, 1 + /* space*/ 4 + 1 + 1 + strlen(null_print) + 1 + /* "null 'str'" */ 1 + /* space*/ - 6 + 1 + 1 + strlen(escape) + 1; /* "escape 'c' or 'off' */ + 6 + 1 + 1 + strlen(escape) + 1; /* "escape 'c' or 'off' */ --- End diff -- Do not reformat code if you don't change its logic ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208437352 --- Diff: src/backend/commands/tablecmds.c --- @@ -17845,14 +18448,16 @@ static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, Assert(fmttype_is_custom(formattype) || fmttype_is_text(formattype) || - fmttype_is_csv(formattype)); + fmttype_is_csv(formattype) || --- End diff -- The check of format options should not be hard coded. ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436378 --- Diff: src/backend/commands/tablecmds.c --- @@ -17615,14 +17979,16 @@ static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool i errhint("Writable external tables may use \'gpfdist(s)\' URIs only."), errOmitLocation(true))); - if(uri->protocol != URI_CUSTOM && iswritable && strchr(uri->path, '*')) + if(uri->protocol != URI_CUSTOM && uri->protocol != URI_MAGMA && iswritable && strchr(uri->path, '*')) --- End diff -- No magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436303 --- Diff: src/backend/commands/tablecmds.c --- @@ -17528,6 +17845,15 @@ static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool i errOmitLocation(true))); } + // Oushu will not support pxf anymore --- End diff -- Apache HAWQ still has pxf available; remove this ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436195 --- Diff: src/backend/commands/tablecmds.c --- @@ -1222,20 +1529,24 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) elog(ERROR, "unrecognized node type: %d", nodeTag(dencoding->arg)); } + if (!dencoding && isExternalMagma) --- End diff -- No magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436174 --- Diff: src/backend/commands/tablecmds.c --- @@ -1210,6 +1507,16 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) else if (IsA(dencoding->arg, String)) { encoding_name = strVal(dencoding->arg); + + /* custom format */ + if (!fmttype_is_text(formattype) && !fmttype_is_csv(formattype) && --- End diff -- There should be no hard coded format name in pluggable storage framework ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208436164 --- Diff: src/backend/commands/tablecmds.c --- @@ -1199,6 +1489,13 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) { encoding = intVal(dencoding->arg); encoding_name = pg_encoding_to_char(encoding); + + /* custom format */ + if (!fmttype_is_text(formattype) && !fmttype_is_csv(formattype)) --- End diff -- There should be no hard coded format name in pluggable storage framework ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435854 --- Diff: src/backend/commands/tablecmds.c --- @@ -1174,6 +1426,44 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) fmtErrTblOid = InvalidOid; /* no err tbl was requested */ } + /* +* Parse and validate FORMAT clause. +* +* We force formatter as 'custom' if it is external hdfs protocol +*/ + formattype = (isExternalHdfs || isExternalMagma || isExternalHive) ? +'b' : transformFormatType(createExtStmt->format); + + if ((formattype == 'b') && --- End diff -- There should be no hard coded format name in pluggable storage framework ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435805 --- Diff: src/backend/access/external/fileam.c --- @@ -1290,6 +1397,97 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss) return NULL; } +static HeapTuple +externalgettup_custom_noextprot(FileScanDesc scan, + ExternalSelectDesc desc, + ScanState *ss) +{ + HeapTuple tuple; + CopyState pstate = scan->fs_pstate; --- End diff -- incorrect indent ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435735 --- Diff: src/backend/access/external/fileam.c --- @@ -782,6 +864,28 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot) void external_insert_finish(ExternalInsertDesc extInsertDesc) { + /* Tell formatter to close */ + if (extInsertDesc->ext_formatter_data != NULL && + (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF) == 0) + { + Datum d; + FunctionCallInfoData fcinfo; + + extInsertDesc->ext_formatter_data->fmt_mask |= FMT_WRITE_END; + + /* per call formatter prep */ + FunctionCallPrepareFormatter(&fcinfo, --- End diff -- incorrect indent ---
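The snippet under review uses PostgreSQL's fmgr convention: a FunctionCallInfoData struct is filled in once, then passed by address (`&fcinfo`) to the prepare/invoke helpers. A stripped-down imitation of that calling pattern; all names and fields below are hypothetical, not fmgr's real layout:

```c
#include <assert.h>

/* Toy analogue of FunctionCallInfoData: the real struct carries argument
 * arrays, null flags, and context pointers. */
typedef struct CallInfo
{
    int (*fn)(struct CallInfo *fcinfo);  /* resolved target function */
    int arg;                             /* single argument slot */
} CallInfo;

/* Targets receive the whole call-info struct, as fmgr targets do. */
static int double_it(struct CallInfo *fcinfo) { return fcinfo->arg * 2; }

/* Invocation goes through the struct's function pointer, passing the
 * struct by address -- the &fcinfo idiom seen in the diff. */
static int invoke(CallInfo *fcinfo) { return fcinfo->fn(fcinfo); }
```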
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435758 --- Diff: src/backend/commands/tablecmds.c --- @@ -1105,53 +1340,70 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) char* protname = uri->customprotocol; Oid ptcId = LookupExtProtocolOid(protname, false); AclResult aclresult; - + /* Check we have the right permissions on this protocol */ if (!pg_extprotocol_ownercheck(ptcId, ownerId)) - { + { AclMode mode = (iswritable ? ACL_INSERT : ACL_SELECT); - + aclresult = pg_extprotocol_aclcheck(ptcId, ownerId, mode); - + if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_EXTPROTOCOL, protname); } } + /* magma follow the same ack check as HDFS */ + else if (uri->protocol == URI_HDFS || uri->protocol == URI_MAGMA) --- End diff -- No magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435710 --- Diff: src/backend/commands/tablecmds.c --- @@ -939,38 +983,217 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt) char* commandString = NULL; char rejectlimittype = '\0'; char formattype; + char* formattername = NULL; int rejectlimit = -1; int encoding = -1; int preferred_segment_num = -1; bool issreh = false; /* is single row error handling requested? */ + bool isexternal = createExtStmt->isexternal; bool iswritable = createExtStmt->iswritable; bool isweb = createExtStmt->isweb; + bool forceCreateDir = createExtStmt->forceCreateDir; + + bool isExternalHdfs = false; + bool isExternalMagma = false; + bool isExternalHive = false; + char* location = NULL; + int location_len = NULL; /* * now set the parameters for keys/inheritance etc. Most of these are * uninteresting for external relations... */ - createStmt->relation = createExtStmt->relation; - createStmt->tableElts = createExtStmt->tableElts; - createStmt->inhRelations = NIL; - createStmt->constraints = NIL; - createStmt->options = NIL; - createStmt->oncommit = ONCOMMIT_NOOP; - createStmt->tablespacename = NULL; + createStmt->base = createExtStmt->base; + // external table options is not compatible with internal table + // set NIL here + createStmt->base.options = NIL; createStmt->policy = createExtStmt->policy; /* policy was set in transform */ - + +/* +* Recognize formatter option if there are some tokens found in parser. +* This design is to give CREATE EXTERNAL TABLE DDL the flexiblity to +* support user defined external formatter options. +*/ + recognizeExternalRelationFormatterOptions(createExtStmt); + + /* +* Get tablespace, database, schema for the relation +*/ + RangeVar *rel = createExtStmt->base.relation; + // get tablespace name for the relation + Oid tablespace_id = (gp_upgrade_mode) ? 
DEFAULTTABLESPACE_OID : GetDefaultTablespace(); + if (!OidIsValid(tablespace_id)) + { + tablespace_id = get_database_dts(MyDatabaseId); + } + char *tablespace_name = get_tablespace_name(tablespace_id); + + // get database name for the relation + char *database_name = rel->catalogname ? rel->catalogname : get_database_name(MyDatabaseId); + + // get schema name for the relation + char *schema_name = get_namespace_name(RangeVarGetCreationNamespace(rel)); + + // get table name for the relation + char *table_name = rel->relname; + + /* +* Do some special logic when we use custom +*/ + if (exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION) + { + if (exttypeDesc->location_list == NIL) + { + if (dfs_url == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), +errmsg("Cannot create table on HDFS when the service is not available"), +errhint("Check HDFS service and hawq_dfs_url configuration"), +errOmitLocation(true))); + } + + location_len = strlen(PROTOCOL_HDFS) + /* hdfs:// */ + strlen(dfs_url) + /* hawq_dfs_url */ + // 1 + strlen(filespace_name) + /* '/' + filespace name */ + 1 + strlen(tablespace_name) + /* '/' + tablespace name */ + 1 + strlen(database_name) + /* '/' + database name */ + 1 + strlen(schema_name) + /* '/' + schema name */ + 1 + strlen(table_name) + 1; /* '/' + table name + '\0' */ + + char *path; + +
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435571 --- Diff: src/backend/commands/tablecmds.c --- @@ -632,9 +665,20 @@ DefineRelation_int(CreateStmt *stmt, */ descriptor = BuildDescForRelation(schema); - localHasOids = interpretOidsOption(stmt->options); + localHasOids = interpretOidsOption(stmt->base.options); descriptor->tdhasoid = (localHasOids || parentOidCount > 0); + /* +* Check supported data types for pluggable format, i.e., orc +* Need to remove this check if all data types are supported for orc format. +*/ + if ((relkind == RELKIND_RELATION) && (relstorage == RELSTORAGE_EXTERNAL) && + formattername && pg_strncasecmp(formattername, "text", strlen("text")) && --- End diff -- TEXT/CSV formats in pluggable storage should have no data type constraints; remove this check. ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435430 --- Diff: src/backend/access/external/fileam.c --- @@ -305,11 +308,53 @@ external_beginscan(ExternalScan *extScan, scan->errcontext.previous = error_context_stack; //pgstat_initstats(relation); - + external_populate_formatter_actionmask(scan->fs_pstate, scan->fs_formatter); return scan; } +/* + * external_populate_formatter_actionmask + * + */ +void external_populate_formatter_actionmask(struct CopyStateData *pstate, --- End diff -- use CopyState pstate instead. ---
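The suggestion to write "CopyState pstate" rather than "struct CopyStateData *pstate" relies on PostgreSQL's convention of pairing a struct with a pointer typedef. A minimal sketch; the field is illustrative only, as the real CopyStateData is much larger:

```c
#include <assert.h>

/* Hypothetical stand-in for CopyStateData. */
typedef struct CopyStateData
{
    long processed;   /* illustrative field */
} CopyStateData;

/* The pointer typedef is the spelling the reviewer asks for. */
typedef CopyStateData *CopyState;

/* "CopyState pstate" means the same as "struct CopyStateData *pstate",
 * but matches the style used elsewhere in the tree. */
static void count_row(CopyState pstate)
{
    pstate->processed++;
}
```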
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435255 --- Diff: src/backend/commands/analyze.c --- @@ -442,6 +455,17 @@ void analyzeStmt(VacuumStmt *stmt, List *relids, int preferred_seg_num) "Please run ANALYZE on the root partition table.", get_rel_name(relationOid; } + else if (!isExternalHDFSORMAGMAProtocol(relationOid)) + { + /* +* Support analyze for external table. +* For now, HDFS protocol external table is supported. +*/ + ereport(WARNING, +(errmsg("skipping \"%s\" --- cannot analyze external table with non-HDFS or non-MAGMA protocol. " --- End diff -- No "Magma" ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435117 --- Diff: src/backend/commands/analyze.c --- @@ -442,6 +455,17 @@ void analyzeStmt(VacuumStmt *stmt, List *relids, int preferred_seg_num) "Please run ANALYZE on the root partition table.", get_rel_name(relationOid; } + else if (!isExternalHDFSORMAGMAProtocol(relationOid)) --- End diff -- Use "isExternalHDFSProtocol" ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208435034 --- Diff: src/backend/commands/analyze.c --- @@ -153,9 +160,15 @@ static List*buildExplicitAttributeNames(Oid relationOid, VacuumStmt *stmt); static void gp_statistics_estimate_reltuples_relpages_heap(Relation rel, float4 *reltuples, float4 *relpages); static void gp_statistics_estimate_reltuples_relpages_ao_rows(Relation rel, float4 *reltuples, float4 *relpages); static void gp_statistics_estimate_reltuples_relpages_parquet(Relation rel, float4 *reltuples, float4 *relpages); +static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages); static void analyzeEstimateReltuplesRelpages(Oid relationOid, float4 *relTuples, float4 *relPages, bool rootonly); static void analyzeEstimateIndexpages(Oid relationOid, Oid indexOid, float4 *indexPages); +static void getExternalRelTuples(Oid relationOid, float4 *relTuples); +static void getExternalRelPages(Oid relationOid, float4 *relPages , Relation rel); +static float4 getExtrelPagesHDFS(Uri *uri); +static bool isExternalHDFSORMAGMAProtocol(Oid relOid); --- End diff -- Implement only HDFS protocol, not HDFS and Magma ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434847 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation, return; } +static void InvokeHDFSProtocolBlockLocation(Oid procOid, +List *locs, +List **blockLocations) +{ + ExtProtocolValidatorData *validator_data; + FmgrInfo *validator_udf; + FunctionCallInfoData fcinfo; + + validator_data = (ExtProtocolValidatorData *) +palloc0 (sizeof(ExtProtocolValidatorData)); + validator_udf = palloc(sizeof(FmgrInfo)); + fmgr_info(procOid, validator_udf); + + validator_data->type= T_ExtProtocolValidatorData; + validator_data->url_list= locs; + validator_data->format_opts = NULL; + validator_data->errmsg = NULL; + validator_data->direction = EXT_VALIDATE_READ; + validator_data->action = EXT_VALID_ACT_GETBLKLOC; + + InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, +/* FmgrInfo */ validator_udf, +/* nArgs */ 0, +/* Call Context */ (Node *) validator_data, +/* ResultSetInfo */ NULL); + + /* invoke validator. if this function returns - validation passed */ + FunctionCallInvoke(&fcinfo); + + ExtProtocolBlockLocationData *bls = + (ExtProtocolBlockLocationData *)(fcinfo.resultinfo); + /* debug output block location.
*/ + if (bls != NULL) + { + ListCell *c; + foreach(c, bls->files) + { + blocklocation_file *blf = (blocklocation_file *)(lfirst(c)); + elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks", +blf->file_uri, blf->block_num); + for ( int i = 0 ; i < blf->block_num ; ++i ) + { + BlockLocation *pbl = &(blf->locations[i]); + elog(DEBUG3, "DEBUG LOCATION for block %d : %d, " +INT64_FORMAT ", " INT64_FORMAT ", %d", +i, +pbl->corrupt, pbl->length, pbl->offset, +pbl->numOfNodes); + for ( int j = 0 ; j < pbl->numOfNodes ; ++j ) + { + elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s", +i, +pbl->hosts[j], pbl->names[j], + pbl->topologyPaths[j]); + } + } + } + } + elog(DEBUG3, "after invoking get block location API"); + + /* get location data from fcinfo.resultinfo. */ + if (bls != NULL) + { + Assert(bls->type == T_ExtProtocolBlockLocationData); + while(list_length(bls->files) > 0) + { + void *v = lfirst(list_head(bls->files)); + bls->files = list_delete_first(bls->files); + *blockLocations = lappend(*blockLocations, v); + } + } + pfree(validator_data); + pfree(validator_udf); +} + +Oid +LookupCustomProtocolBlockLocationFunc(char *protoname) +{ + List* funcname= NIL; + Oid procOid = InvalidOid; + Oid argList[1]; + Oid returnOid; + + char* new_func_name = (char *)palloc0(strlen(protoname) + 16); + sprintf(new_func_name, "%s_blocklocation", protoname); + funcname = lappend(funcname, makeString(new_func_name)); + returnOid = VOIDOID; + procOid = LookupFuncName(funcname, 0, argList, true); + + if (!OidIsValid(procOid)) + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("protocol function %s was not found.", + new_func_name), + errhint("Create it with CREATE FUNCTION."), +
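LookupCustomProtocolBlockLocationFunc above derives the catalog function name from the protocol name by the "<protocol>_blocklocation" convention before resolving it with LookupFuncName. Just that naming step, sketched in plain C with malloc/snprintf standing in for palloc0/sprintf:

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Build "<protocol>_blocklocation"; caller frees.  strlen("_blocklocation")
 * is 14, so the original's "+ 16" leaves room for the suffix plus the
 * terminating NUL with one byte to spare. */
static char *blocklocation_func_name(const char *protoname)
{
    size_t len = strlen(protoname) + 16;
    char *name = malloc(len);
    snprintf(name, len, "%s_blocklocation", protoname);
    return name;
}
```

The resulting name is what gets looked up in the catalog; if no such function exists, the code errors out with a hint to CREATE FUNCTION it.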
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434889 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -4039,6 +4398,9 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife, result->planner_segments = 0; result->datalocalityInfo = makeStringInfo(); result->datalocalityTime = 0; + // result->hivehost = NULL; --- End diff -- Remove ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434912 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -4104,6 +4468,12 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife, /* get block location and calculate relation size*/ get_block_locations_and_claculte_table_size(); + if(context.chsl_context.relations){ + Relation_Data* tmp = (Relation_Data*) lfirst(context.chsl_context.relations->tail); + // result->hivehost = (tmp->hostname) ? pstrdup(tmp->hostname):(char*)NULL; --- End diff -- Remove ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434835 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation, return; } +static void InvokeHDFSProtocolBlockLocation(OidprocOid, +List *locs, +List **blockLocations) +{ + ExtProtocolValidatorData *validator_data; + FmgrInfo *validator_udf; + FunctionCallInfoDatafcinfo; + + validator_data = (ExtProtocolValidatorData *) +palloc0 (sizeof(ExtProtocolValidatorData)); + validator_udf = palloc(sizeof(FmgrInfo)); + fmgr_info(procOid, validator_udf); + + validator_data->type= T_ExtProtocolValidatorData; + validator_data->url_list= locs; + validator_data->format_opts = NULL; + validator_data->errmsg = NULL; + validator_data->direction = EXT_VALIDATE_READ; + validator_data->action = EXT_VALID_ACT_GETBLKLOC; + + InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, +/* FmgrInfo */ validator_udf, +/* nArgs */ 0, +/* Call Context */ (Node *) validator_data, +/* ResultSetInfo */ NULL); + + /* invoke validator. if this function returns - validation passed */ + FunctionCallInvoke(); + + ExtProtocolBlockLocationData *bls = + (ExtProtocolBlockLocationData *)(fcinfo.resultinfo); + /* debug output block location. 
*/ + if (bls != NULL) + { + ListCell *c; + foreach(c, bls->files) + { + blocklocation_file *blf = (blocklocation_file *)(lfirst(c)); + elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks", +blf->file_uri, blf->block_num); + for ( int i = 0 ; i < blf->block_num ; ++i ) + { + BlockLocation *pbl = &(blf->locations[i]); + elog(DEBUG3, "DEBUG LOCATION for block %d : %d, " +INT64_FORMAT ", " INT64_FORMAT ", %d", +i, +pbl->corrupt, pbl->length, pbl->offset, +pbl->numOfNodes); + for ( int j = 0 ; j < pbl->numOfNodes ; ++j ) + { + elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s", +i, +pbl->hosts[j], pbl->names[j], + pbl->topologyPaths[j]); + } + } + } + } + elog(DEBUG3, "after invoking get block location API"); + + /* get location data from fcinfo.resultinfo. */ + if (bls != NULL) + { + Assert(bls->type == T_ExtProtocolBlockLocationData); + while(list_length(bls->files) > 0) + { + void *v = lfirst(list_head(bls->files)); + bls->files = list_delete_first(bls->files); + *blockLocations = lappend(*blockLocations, v); + } + } + pfree(validator_data); + pfree(validator_udf); +} + +Oid +LookupCustomProtocolBlockLocationFunc(char *protoname) +{ + List* funcname= NIL; + Oid procOid = InvalidOid; + Oid argList[1]; + Oid returnOid; + + char* new_func_name = (char *)palloc0(strlen(protoname) + 16); + sprintf(new_func_name, "%s_blocklocation", protoname); + funcname = lappend(funcname, makeString(new_func_name)); + returnOid = VOIDOID; + procOid = LookupFuncName(funcname, 0, argList, true); + + if (!OidIsValid(procOid)) + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("protocol function %s was not found.", + new_func_name), + errhint("Create it with CREATE FUNCTION."), +
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434723

--- Diff: src/backend/cdb/cdbdatalocality.c ---
@@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation,
 	return;
 }

+static void InvokeHDFSProtocolBlockLocation(Oid procOid,
+                                            List *locs,
+                                            List **blockLocations)
+{
+	ExtProtocolValidatorData *validator_data;
+	FmgrInfo *validator_udf;
+	FunctionCallInfoData fcinfo;
+
+	validator_data = (ExtProtocolValidatorData *)
+		palloc0(sizeof(ExtProtocolValidatorData));

--- End diff --

Indent needs to be fixed

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434493

--- Diff: src/backend/access/external/fileam.c ---
@@ -86,14 +86,17 @@ static void InitParseState(CopyState pstate, Relation relation,
 			char *uri, int rejectlimit,
 			bool islimitinrows, Oid fmterrtbl,
 			ResultRelSegFileInfo *segfileinfo, int encoding);
-static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
-					 int nArgs,
-					 CopyState pstate,
-					 FormatterData* formatter,
-					 Relation rel,
-					 TupleDesc tupDesc,
-					 FmgrInfo *convFuncs,
-					 Oid *typioparams);
+static void
+FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
+                             int nArgs,
+                             CopyState pstate,
+                             FormatterData *formatter,
+                             Relation rel,
+                             TupleDesc tupDesc,
+                             FmgrInfo *convFuncs,
+                             Oid *typioparams,

--- End diff --

indent is not consistent

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434427

--- Diff: src/backend/cdb/cdbdatalocality.c ---
@@ -844,36 +918,17 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
 	/*
 	 * We only consider the data stored in HDFS.
 	 */
-	if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
-		Relation_Data *rel_data = NULL;
-		/*
-		 * Get pg_appendonly information for this table.
-		 */
-		AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
-
-		rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
+	bool isDataStoredInHdfs = dataStoredInHdfs(rel);
+	if (isDataStoredInHdfs ) {
+		GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
+		Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
 		rel_data->relid = rel_oid;
 		rel_data->files = NIL;
 		rel_data->partition_parent_relid = 0;
 		rel_data->block_count = 0;
-
-		GpPolicy *targetPolicy = NULL;
-		targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
-		/*
-		 * Based on the pg_appendonly information, calculate the data
-		 * location information associated with this relation.
-		 */
-		if (RelationIsAoRows(rel)) {
-			rel_data->type = DATALOCALITY_APPENDONLY;
-			AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
-					aoEntry->splitsize, rel_data, ,
-					, targetPolicy);
-		} else {
-			rel_data->type = DATALOCALITY_PARQUET;
-			ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
-					context->split_size, rel_data, ,
-					, targetPolicy);
-		}
+		rel_data->port = 0;
+		rel_data->hivepath = NULL;

--- End diff --

No "rel_data->hivepath = NULL;"

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user Librago commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434274

--- Diff: contrib/exthdfs/exthdfs.c ---
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "postgres.h"
+
+#include "common.h"
+#include "access/extprotocol.h"
+#include "cdb/cdbdatalocality.h"
+#include "storage/fd.h"
+#include "storage/filesystem.h"
+#include "utils/uri.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
+PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
+{
+	// Build the result instance
+	int nsize = 0;
+	int numOfBlock = 0;
+	ExtProtocolBlockLocationData *bldata =
+		palloc0(sizeof(ExtProtocolBlockLocationData));
+	if (bldata == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+		            "cannot allocate due to no memory");
+	}
+	bldata->type = T_ExtProtocolBlockLocationData;
+	fcinfo->resultinfo = bldata;
+
+	ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *)
+		(fcinfo->context);
+
+	// Parse URI of the first location; we expect all locations to use the same
+	// name node server. This is checked in the validation function.
+	char *first_uri_str =
+		(char *) strVal(lfirst(list_head(pvalidator_data->url_list)));
+	Uri *uri = ParseExternalTableUri(first_uri_str);
+
+	elog(DEBUG3, "hdfsprotocol_blocklocation : "
+	             "extracted HDFS name node address %s:%d",
+	     uri->hostname, uri->port);
+
+	// Create file system instance
+	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+	if (fs == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+		            "failed to create HDFS instance to connect to %s:%d",
+		     uri->hostname, uri->port);
+	}
+
+	// Clean up uri instance as we don't need it any longer
+	pfree(uri);
+
+	// Check all locations to get files to fetch location.
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		// Parse current location URI.
+		char *url = (char *) strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+			            "invalid URI encountered %s", url);
+		}
+
+		// NOTICE: We temporarily support only directories as locations. We plan
+		//         to extend the logic to specifying single file as one location
+		//         very soon.
+
+		// get files contained in the path.
+		hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path, &nsize);
+		if (fiarray == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+			            "failed to get files of path %s",
+			     uri->path);
+		}
+
+		int i = 0;
+		// Call block location api to get data location for each file
+		for (i = 0; i < nsize; i++)
+		{
+			hdfsFileInfo *fi = &fiarray[i];
+
+			// break condition of this for loop
+			if (fi == NULL) { break; }
+
+			// Build file name full path.
+			const char *fname =
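The code above parses only the first URI for the name node address, because validation has already guaranteed that every location points at the same name node. A hypothetical, self-contained version of that consistency check is sketched below — simple string parsing only, not the backend's `ParseExternalTableUri`, and the function name is invented for illustration.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical check that every location URI shares one name node
 * authority (host:port). Compares the text between "://" and the
 * next '/' across all URIs. */
bool same_name_node(const char *uris[], int n)
{
    char first[256] = "";
    for (int i = 0; i < n; i++) {
        const char *p = strstr(uris[i], "://");   /* skip the scheme */
        if (p == NULL)
            return false;
        p += 3;
        const char *slash = strchr(p, '/');       /* authority ends at '/' */
        size_t len = slash ? (size_t)(slash - p) : strlen(p);
        if (len >= sizeof(first))
            return false;
        if (i == 0) {
            memcpy(first, p, len);
            first[len] = '\0';
        } else if (strlen(first) != len || strncmp(first, p, len) != 0) {
            return false;                         /* mismatched name node */
        }
    }
    return n > 0;
}
```

With `{"hdfs://nn1:8020/a", "hdfs://nn1:8020/b"}` the check passes; with a second URI on `nn2:8020` it fails.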
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434291

--- Diff: src/backend/cdb/cdbdatalocality.c ---
@@ -193,6 +212,9 @@ typedef struct Relation_Data {
 	List *files;
 	Oid partition_parent_relid;
 	int64 block_count;
+	int port;
+	char *hostname;
+	char *hivepath;

--- End diff --

No "char *hivepath"

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434247

--- Diff: src/backend/cdb/cdbdatalocality.c ---
@@ -123,10 +132,20 @@ typedef struct File_Split {
 	int64 logiceof;
 	int host;
 	bool is_local_read;
+	char *ext_file_uri;
+	char *lower_bound_inc;
+	char *upper_bound_exc;
+	int fs_lb_len;
+	int fs_ub_len;
 } File_Split;

 typedef enum DATALOCALITY_RELATION_TYPE {
-	DATALOCALITY_APPENDONLY, DATALOCALITY_PARQUET, DATALOCALITY_UNKNOWN
+	DATALOCALITY_APPENDONLY,
+	DATALOCALITY_PARQUET,
+	DATALOCALITY_HDFS,
+	DATALOCALITY_HIVE,

--- End diff --

No below relation type:
```
DATALOCALITY_HIVE,
DATALOCALITY_MAGMA,
```

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434124

--- Diff: src/backend/cdb/cdbdatalocality.c ---
@@ -123,10 +132,20 @@ typedef struct File_Split {
 	int64 logiceof;
 	int host;
 	bool is_local_read;
+	char *ext_file_uri;

--- End diff --

Remove below element:
```
char *lower_bound_inc;
char *upper_bound_exc;
int fs_lb_len;
int fs_ub_len;
```

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433943

--- Diff: src/backend/catalog/heap.c ---
@@ -2543,8 +2551,55 @@ heap_drop_with_catalog(Oid relid)
 	/*
 	 * External table? If so, delete the pg_exttable tuple.
 	 */
+/*	if (is_external_rel)

--- End diff --

Remove the commented-out line here

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433876

--- Diff: src/backend/catalog/cdb_external_extensions.sql ---
@@ -47,3 +47,38 @@ LANGUAGE C STABLE;
 CREATE OR REPLACE FUNCTION fixedwidth_out(record) RETURNS bytea
 AS '$libdir/fixedwidth.so', 'fixedwidth_out'
 LANGUAGE C STABLE;
+
+--
+-- external HDFS
+--
+CREATE OR REPLACE FUNCTION hdfs_validate() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_validate'
+LANGUAGE C STABLE;
+
+CREATE OR REPLACE FUNCTION hdfs_blocklocation() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_blocklocation'
+LANGUAGE C STABLE;
+
+--

--- End diff --

No TEXT/CSV format code here

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433762

--- Diff: contrib/exthdfs/exthdfs.c ---
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "postgres.h"
+
+#include "common.h"
+#include "access/extprotocol.h"
+#include "cdb/cdbdatalocality.h"
+#include "storage/fd.h"
+#include "storage/filesystem.h"
+#include "utils/uri.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
+PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
+{
+	// Build the result instance
+	int nsize = 0;
+	int numOfBlock = 0;
+	ExtProtocolBlockLocationData *bldata =
+		palloc0(sizeof(ExtProtocolBlockLocationData));
+	if (bldata == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+		            "cannot allocate due to no memory");
+	}
+	bldata->type = T_ExtProtocolBlockLocationData;
+	fcinfo->resultinfo = bldata;
+
+	ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *)
+		(fcinfo->context);
+
+	// Parse URI of the first location; we expect all locations to use the same
+	// name node server. This is checked in the validation function.
+	char *first_uri_str =
+		(char *) strVal(lfirst(list_head(pvalidator_data->url_list)));
+	Uri *uri = ParseExternalTableUri(first_uri_str);
+
+	elog(DEBUG3, "hdfsprotocol_blocklocation : "
+	             "extracted HDFS name node address %s:%d",

--- End diff --

Indent should follow the convention used in the previous code

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433663

--- Diff: contrib/exthdfs/exthdfs.c ---
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+

--- End diff --

Remove redundant blank line here

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433613

--- Diff: contrib/exthdfs/common.h ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _EXTHDFS_COMMON_H_
+#define _EXTHDFS_COMMON_H_
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/extprotocol.h"
+#include "access/fileam.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_exttable.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "miscadmin.h"
+

--- End diff --

Remove redundant blank line here

---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433457 --- Diff: contrib/extfmtcsv/Makefile --- @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one --- End diff -- Remove extfmtcsv related code if the current commit is for hdfs protocol. ---
[GitHub] incubator-hawq pull request #1384: HAWQ-1628. Add HDFS protocol for external...
Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208433287

--- Diff: contrib/Makefile ---
@@ -9,7 +9,9 @@ WANTED_DIRS = \
 	extprotocol \
 	gp_cancel_query \
 	formatter_fixedwidth \
-	hawq-hadoop
+	exthdfs\

--- End diff --

Use
```
exthdfs \
hawq-hadoop
```
Instead of
```
exthdfs\
extfmtcsv\
hawq-hadoop\
```

---
[GitHub] incubator-hawq pull request #1386: HAWQ-1646. Fixes travis CI issues
GitHub user frankgh opened a pull request: https://github.com/apache/incubator-hawq/pull/1386 HAWQ-1646. Fixes travis CI issues Travis CI is using a new default OSX image starting July 31st, which is causing compilation issues in javadocs. This PR sets the previously known working osx_image in .travis.yml. You can merge this pull request into a Git repository by running: $ git pull https://github.com/frankgh/incubator-hawq HAWQ-1646 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1386.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1386 commit 5b185559814a86b66ba29544a5ebe2d71dc8 Author: Francisco Guerrero Date: 2018-08-07T06:14:27Z HAWQ-1646. Fixes travis CI issues ---
[GitHub] incubator-hawq pull request #1385: HAWQ-1645. Remove code generation from PX...
GitHub user frankgh opened a pull request: https://github.com/apache/incubator-hawq/pull/1385 HAWQ-1645. Remove code generation from PXF and upgrade gradle version - Removes code generation from PXF to enable IntelliJ code intelligence - Upgrade gradle to version 4.9 You can merge this pull request into a Git repository by running: $ git pull https://github.com/frankgh/incubator-hawq HAWQ-1645 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1385.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1385 commit 41eec36532c3e5a2bb6fb5174b099f23669e87a2 Author: Pivotal Date: 2018-08-06T22:15:05Z HAWQ-1645. Use the latest available version of gradle (4.9) ---
[GitHub] incubator-hawq pull request #1383: HAWQ-1644. Make delegation token optional...
GitHub user shivzone opened a pull request: https://github.com/apache/incubator-hawq/pull/1383 HAWQ-1644. Make delegation token optional in PXF Make the delegation token property optional for PXF when used against secure Hadoop. The delegation token is a complementary means of authentication with Hadoop alongside Kerberos. There is not much value in using the delegation token with the masterless, segment-only PXF architecture, as each PXF JVM will need to establish the token anyway. Simply authenticating with Kerberos should be good enough in such a deployment mode. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shivzone/incubator-hawq HAWQ-1644 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1383.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1383 commit eb1aa6a0596b5b9a1f1ef2f8c3986aacd80f0f15 Author: Pivotal Date: 2018-07-31T21:28:56Z HAWQ-1644. Make delegation token optional in PXF ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user lavjain closed the pull request at: https://github.com/apache/incubator-hawq/pull/1379 ---
[GitHub] incubator-hawq pull request #1382: Relogin from keytab when security is enab...
GitHub user benchristel opened a pull request: https://github.com/apache/incubator-hawq/pull/1382 Relogin from keytab when security is enabled instead of using the TOKEN - We are now supporting Kerberos login with GPDB, which unlike HAWQ does not pass the TOKEN parameter. Thus, we re-login from the local keytab to authenticate the user. Co-authored-by: Lav Jain Co-authored-by: Ben Christel Co-authored-by: Shivram Mani You can merge this pull request into a Git repository by running: $ git pull https://github.com/lavjain/incubator-hawq kerberize_pxf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-hawq/pull/1382.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1382 commit ea87935819ac7952181869caab0e5fd82842fa05 Author: Lav Jain Date: 2018-07-25T21:06:37Z Relogin from keytab when security is enabled instead of using the TOKEN - We are now supporting Kerberos login with GPDB, which unlike HAWQ does not pass the TOKEN parameter. Thus, we re-login from the local keytab to authenticate the user. Co-authored-by: Lav Jain Co-authored-by: Ben Christel Co-authored-by: Shivram Mani ---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user benchristel commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202824082

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java ---
@@ -19,94 +19,114 @@
  * under the License.
  */
-
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;

-import javax.servlet.*;
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hawq.pxf.service.SessionId;
+import org.apache.hawq.pxf.service.UGICache;
 import org.apache.hawq.pxf.service.utilities.SecureLogin;
-
 /**
  * Listener on lifecycle events of our webapp
  */
 public class SecurityServletFilter implements Filter {

     private static final Log LOG = LogFactory.getLog(SecurityServletFilter.class);
     private static final String USER_HEADER = "X-GP-USER";
-    private static final String MISSING_HEADER_ERROR = String.format("Header %s is missing in the request", USER_HEADER);
-    private static final String EMPTY_HEADER_ERROR = String.format("Header %s is empty in the request", USER_HEADER);
+    private static final String SEGMENT_ID_HEADER = "X-GP-SEGMENT-ID";
+    private static final String TRANSACTION_ID_HEADER = "X-GP-XID";
+    private static final String FRAGMENT_INDEX_HEADER = "X-GP-FRAGMENT-INDEX";
+    private static final String FRAGMENT_COUNT_HEADER = "X-GP-FRAGMENT-COUNT";
+    private static final String MISSING_HEADER_ERROR = "Header %s is missing in the request";
+    private static final String EMPTY_HEADER_ERROR = "Header %s is empty in the request";
+    private static UGICache proxyUGICache;

--- End diff --

@denalex @lavjain What is the lifetime of the `SecurityServletFilter` instance? As I was thinking about making this non-static, I realized that if there's more than one instance per JVM, we might get redundant caches (and possible UGI resource leaks, if a `SecurityServletFilter` gets garbage-collected before all the UGIs in its cache are cleaned up).

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202195089

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Ticker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if they
+ * have not been accessed for 15 minutes.
+ *
+ * The motivation for caching is that destroying UGIs is slow. The alternative, creating and
+ * destroying a UGI per-request, is wasteful.
+ */
+public class UGICache {
+
+    private static final Log LOG = LogFactory.getLog(UGICache.class);
+    private Map cache = new ConcurrentHashMap<>();
+    @SuppressWarnings("unchecked")
+    // There is a separate DelayQueue for each segment (also being used for locking)
+    private final Map> queueMap = new HashMap<>();
+    private final UGIProvider ugiProvider;
+    private Ticker ticker;
+    private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
+
+    /**
+     * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which need
+     * to substitute a mock UGIProvider.
+     */
+    UGICache(UGIProvider provider, Ticker ticker) {
+        this.ticker = ticker;
+        this.ugiProvider = provider;
+    }
+
+    /**
+     * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use to
+     * create and destroy UserGroupInformation instances.
+     */
+    public UGICache() {
+        this(new UGIProvider(), Ticker.systemTicker());
+    }
+
+    /**
+     * Create new proxy UGI if not found in cache and increment reference count
+     */
+    public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
+        Integer segmentId = session.getSegmentId();
+        String user = session.getUser();
+        DelayQueue delayQueue = getExpirationQueue(segmentId);
+        synchronized (delayQueue) {
+            // Use the opportunity to cleanup any expired entries
+            cleanup(session);
+            Entry entry = cache.get(session);
+            if (entry == null) {
+                LOG.info(session.toString() + " Creating proxy user = " + user);
+                entry = new Entry(ticker, ugiProvider.createProxyUGI(user));
+                delayQueue.offer(entry);
+                cache.put(session, entry);
+            }
+            entry.acquireReference();
+            return entry.getUGI();
+        }
+    }
+
+    /**
+     * Decrement reference count for the given session's UGI. Resets the time at which the UGI will
+     * expire to 15 minutes in the future.
+     *
+     * @param session the session for which we want to release the UGI.
+     * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if it is
+     *        now unreferenced).
+     */
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {

--- End diff --

hmm ... not sure that would be better. We have a dilemma here -- the cache should not know about the caller, and the caller should not care about internal cache implementation, such as that
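The get-or-create-and-acquire flow in the quoted getUserGroupInformation() can be sketched in isolation. This is a minimal sketch, not the PR's actual code: RefCountedCache, CachedEntry, and the plain String session key stand in for UGICache, Entry, and SessionId, and the per-segment DelayQueue locking and expiration cleanup are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the cache's get-or-create pattern: look up the
// session's entry, create it on a miss, and bump its reference count before
// handing it out. All names here are hypothetical stand-ins.
public class RefCountedCache {
    static class CachedEntry {
        final String value;
        final AtomicInteger refs = new AtomicInteger(0);
        CachedEntry(String value) { this.value = value; }
    }

    private final Map<String, CachedEntry> cache = new ConcurrentHashMap<>();

    // One coarse lock replaces the PR's per-segment synchronized(delayQueue).
    public synchronized String get(String session) {
        CachedEntry entry = cache.get(session);
        if (entry == null) {
            // Stands in for ugiProvider.createProxyUGI(user).
            entry = new CachedEntry("ugi-for-" + session);
            cache.put(session, entry);
        }
        entry.refs.incrementAndGet();  // acquireReference()
        return entry.value;
    }

    public synchronized int refCount(String session) {
        CachedEntry e = cache.get(session);
        return e == null ? 0 : e.refs.get();
    }
}
```

Repeated gets for the same session return the same cached value while the reference count climbs, which is the behavior the review is discussing.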
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user benchristel commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202193131

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
[identical UGICache.java context trimmed]
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {

--- End diff --

I'd like the parameter names to allow someone reading this code to consider the cache in isolation from its callers, and from that perspective, `cleanImmediatelyIfNoRefs` or
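The release() contract the naming debate revolves around can be sketched as follows. This is a hedged sketch under assumptions, not the PR's implementation: UgiEntry, the destroyed flag, and the String key are illustrative, and the DelayQueue bookkeeping is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of release(session, cleanImmediatelyIfNoRefs): decrement the ref
// count, reset the expiration clock, and destroy the entry right away only
// when the caller asked for it AND no other request still holds a reference.
public class ReleaseSketch {
    static class UgiEntry {
        int refs;
        long expiresAt;
        boolean destroyed;
    }

    private final Map<String, UgiEntry> cache = new ConcurrentHashMap<>();
    private static final long EXPIRY_MILLIS = 15 * 60 * 1000L; // 15 minutes

    public synchronized UgiEntry acquire(String session) {
        UgiEntry e = cache.computeIfAbsent(session, k -> new UgiEntry());
        e.refs++;
        return e;
    }

    public synchronized void release(String session, boolean cleanImmediatelyIfNoRefs) {
        UgiEntry e = cache.get(session);
        if (e == null) return;
        e.expiresAt = System.currentTimeMillis() + EXPIRY_MILLIS; // resetTime()
        e.refs--;                                                 // releaseReference()
        if (cleanImmediatelyIfNoRefs && e.refs <= 0) {
            e.destroyed = true;  // stands in for destroying the proxy UGI
            cache.remove(session);
        }
    }
}
```

Note that `cleanImmediatelyIfNoRefs` alone is not sufficient to destroy the entry; the entry must also be unreferenced, which is exactly the coupling between caller intent and cache internals the reviewers are debating.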
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user benchristel commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202185416

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
[identical UGICache.java context trimmed]
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
+
+        Entry timedProxyUGI = cache.get(session);

--- End diff --

We are changing it to `entry`.

---
[GitHub] incubator-hawq pull request #1381: Remove autogeneration of version number f...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1381#discussion_r202179965

--- Diff: pxf/gradle/wrapper/gradle-wrapper.properties ---
@@ -1,23 +1,6 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-#Wed Aug 05 16:07:21 PDT 2015
+#Mon Jul 09 15:02:34 PDT 2018

--- End diff --

The lack of a license will trip the license checker unless you add this file to the exclusion list; better still, preserve the license.

---
[GitHub] incubator-hawq pull request #1381: Remove autogeneration of version number f...
GitHub user benchristel opened a pull request: https://github.com/apache/incubator-hawq/pull/1381

Remove autogeneration of version number from pxf/build.gradle

**NOT READY TO MERGE**. The purpose of this PR is to solicit feedback on our approach. We'll need to do more work to get the hardcoded version number out of this code.

This enables us to use IntelliJ's code navigation and refactoring features. Previously, IntelliJ would navigate to / refactor the generated files rather than the source files, since the generated files were the ones that actually got compiled.

We also upgraded Gradle to 3.0 because we suspected IntelliJ might not have good support for older Gradle versions. We may want to revert the change to the Gradle version if it wasn't the culprit.

Co-authored-by: Lav Jain
Co-authored-by: Ben Christel

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lavjain/incubator-hawq support-intellij-refactoring

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-hawq/pull/1381.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1381

commit c3ba4a5312283cca3bf820a312da5428458be704
Author: Francisco Guerrero
Date: 2018-07-12T20:59:55Z

    Remove autogeneration of version number from pxf/build.gradle

    This enables us to use IntelliJ's code navigation and refactoring features. Previously, IntelliJ would navigate to / refactor the generated files rather than the source files since the generated files were the ones that actually got compiled.

    Co-authored-by: Francisco Guerrero
    Co-authored-by: Ben Christel

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202134716

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java ---
@@ -52,69 +62,90 @@
      */
     @Override
     public void init(FilterConfig filterConfig) throws ServletException {
+        //TODO: initialize cache here
     }

     /**
      * If user impersonation is configured, examines the request for the presense of the expected security headers
      * and create a proxy user to execute further request chain. Responds with an HTTP error if the header is missing
      * or the chain processing throws an exception.
      *
-     * @param request http request
+     * @param request  http request
      * @param response http response
-     * @param chain filter chain
+     * @param chain    filter chain
      */
     @Override
-    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException {
+    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
+            throws IOException, ServletException {

         if (SecureLogin.isUserImpersonationEnabled()) {

             // retrieve user header and make sure header is present and is not empty
-            final String user = ((HttpServletRequest) request).getHeader(USER_HEADER);
-            if (user == null) {
-                throw new IllegalArgumentException(MISSING_HEADER_ERROR);
-            } else if (user.trim().isEmpty()) {
-                throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
+            final String gpdbUser = getHeaderValue(request, USER_HEADER);
+            String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER);
+            Integer segmentId = getHeaderValueInt(request, SEGMENT_ID_HEADER, true);
+            Integer fragmentCount = getHeaderValueInt(request, FRAGMENT_COUNT_HEADER, false);
+            Integer fragmentIndex = getHeaderValueInt(request, FRAGMENT_INDEX_HEADER, false);
+
+            SessionId session = new SessionId(segmentId, transactionId, gpdbUser);
+            if (LOG.isDebugEnabled() && fragmentCount != null) {
+                LOG.debug(session.toString() + " Fragment = " + fragmentIndex + " of " + fragmentCount);
             }

             // TODO refresh Kerberos token when security is enabled

-            // prepare pivileged action to run on behalf of proxy user
+            // prepare privileged action to run on behalf of proxy user
             PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
                 @Override
                 public Boolean run() throws IOException, ServletException {
-                    LOG.debug("Performing request chain call for proxy user = " + user);
+                    LOG.debug("Performing request chain call for proxy user = " + gpdbUser);
                     chain.doFilter(request, response);
                     return true;
                 }
             };

             // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user
-            UserGroupInformation proxyUGI = null;
+            UserGroupInformation ugi = cache.getUserGroupInformation(session);
             try {
-                LOG.debug("Creating proxy user = " + user);
-                proxyUGI = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
-                proxyUGI.doAs(action);
+                ugi.doAs(action);
             } catch (UndeclaredThrowableException ute) {
                 // unwrap the real exception thrown by the action
                 throw new ServletException(ute.getCause());
             } catch (InterruptedException ie) {
                 throw new ServletException(ie);
             } finally {
-                try {
-                    if (proxyUGI != null) {
-                        LOG.debug("Closing FileSystem for proxy user = " + proxyUGI.getUserName());
-                        FileSystem.closeAllForUGI(proxyUGI);
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Error closing FileSystem for proxy user = " + proxyUGI.getUserName());
-                }
+                // Optimization to cleanup the cache if it is the last fragment
+                boolean forceClean = (fragmentIndex != null && fragmentIndex.equals(fragmentCount));
+                cache.release(session, forceClean);

--- End diff --

here you're
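The last-fragment optimization in the filter's finally block can be sketched without the servlet API. This is an illustrative sketch: the Cache interface and handleRequest method below are hypothetical stand-ins for UGICache and doFilter, and the actual request-chain call is elided.

```java
// Sketch of the cleanup decision quoted above: after the request chain runs
// (or throws), always release the session's UGI, forcing immediate cleanup
// only when this request carried the final fragment of the query.
public class FragmentCleanupSketch {
    interface Cache {
        void release(String session, boolean forceClean);
    }

    static void handleRequest(Cache cache, String session,
                              Integer fragmentIndex, Integer fragmentCount) {
        try {
            // ... chain.doFilter(request, response) would run here ...
        } finally {
            // Force cleanup only on the last fragment; headers may be absent,
            // so both values are nullable and null never forces cleanup.
            boolean forceClean = fragmentIndex != null && fragmentIndex.equals(fragmentCount);
            cache.release(session, forceClean);
        }
    }
}
```

Putting the release in a finally block mirrors the PR's intent: the reference count is decremented even when the chain throws, so entries cannot leak.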
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202131530

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
[identical UGICache.java context trimmed]
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
+
+        Entry timedProxyUGI = cache.get(session);
+
+        if (timedProxyUGI == null) return;
+
+        timedProxyUGI.resetTime();
+        timedProxyUGI.releaseReference();
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202132916

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
[identical UGICache.java context trimmed]
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
+
+        Entry timedProxyUGI = cache.get(session);
+
+        if (timedProxyUGI == null) return;
+
+        timedProxyUGI.resetTime();
+        timedProxyUGI.releaseReference();
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1379#discussion_r202130797

--- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
@@ -0,0 +1,318 @@
[identical UGICache.java context trimmed]
+    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
+
+        Entry timedProxyUGI = cache.get(session);
+
+        if (timedProxyUGI == null) return;
+
+        timedProxyUGI.resetTime();
+        timedProxyUGI.releaseReference();
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202128478

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    +package org.apache.hawq.pxf.service;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.DelayQueue;
    +import java.util.concurrent.Delayed;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import com.google.common.base.Ticker;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +/**
    + * Stores UserGroupInformation instances for each active session. The UGIs are cleaned up if
    + * they have not been accessed for 15 minutes.
    + *
    + * The motivation for caching is that destroying UGIs is slow. The alternative, creating and
    + * destroying a UGI per-request, is wasteful.
    + */
    +public class UGICache {
    +
    +    private static final Log LOG = LogFactory.getLog(UGICache.class);
    +    private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
    +    @SuppressWarnings("unchecked")
    +    // There is a separate DelayQueue for each segment (also being used for locking)
    +    private final Map<Integer, DelayQueue<Entry>> queueMap = new HashMap<>();
    +    private final UGIProvider ugiProvider;
    +    private Ticker ticker;
    +    private final static long UGI_CACHE_EXPIRY = 15 * 60 * 1000L; // 15 Minutes
    +
    +    /**
    +     * Create a UGICache with the given {@link UGIProvider}. Intended for use by tests which
    +     * need to substitute a mock UGIProvider.
    +     */
    +    UGICache(UGIProvider provider, Ticker ticker) {
    +        this.ticker = ticker;
    +        this.ugiProvider = provider;
    +    }
    +
    +    /**
    +     * Create a UGICache. Automatically creates a {@link UGIProvider} that this cache will use
    +     * to create and destroy UserGroupInformation instances.
    +     */
    +    public UGICache() {
    +        this(new UGIProvider(), Ticker.systemTicker());
    +    }
    +
    +    /**
    +     * Create new proxy UGI if not found in cache and increment reference count
    +     */
    +    public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
    +        Integer segmentId = session.getSegmentId();
    +        String user = session.getUser();
    +        DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
    +        synchronized (delayQueue) {
    +            // Use the opportunity to cleanup any expired entries
    +            cleanup(session);
    +            Entry entry = cache.get(session);
    +            if (entry == null) {
    +                LOG.info(session.toString() + " Creating proxy user = " + user);
    +                entry = new Entry(ticker, ugiProvider.createProxyUGI(user));
    +                delayQueue.offer(entry);
    +                cache.put(session, entry);
    +            }
    +            entry.acquireReference();

    --- End diff --

    not the best name, IMO increaseRefCount was more meaningful

---
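The reference-count bookkeeping being debated here (acquireReference vs. increaseRefCount) is small enough to sketch in isolation. The class below is a hypothetical, stripped-down stand-in, not the actual Entry class from the PR; only the counting logic is shown:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of a ref-counted cache entry, loosely modeled on the
// Entry class under review. The real class also tracks a UGI and expiry time.
class RefCountedEntry {
    private final AtomicInteger referenceCount = new AtomicInteger();

    // "acquireReference" vs "increaseRefCount": same semantics, different name.
    void acquireReference() {
        referenceCount.incrementAndGet();
    }

    // Returns true when the last reference was released, i.e. the entry
    // is now eligible for immediate cleanup.
    boolean releaseReference() {
        int refs = referenceCount.decrementAndGet();
        if (refs < 0) {
            throw new IllegalStateException("released more times than acquired");
        }
        return refs == 0;
    }

    int getRefCount() {
        return referenceCount.get();
    }
}
```

Whatever the method is named, the invariant is the same: acquire on every request that hands out the UGI, release when the request finishes, and only consider destroying the UGI when the count reaches zero.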
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202126336

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/SessionId.java ---
    @@ -65,12 +74,15 @@ public int hashCode() {
          * {@inheritDoc}
          */
         @Override
    -    public boolean equals(Object other) {
    -        if (!(other instanceof SessionId)) {
    -            return false;
    -        }
    -        SessionId that = (SessionId) other;
    -        return this.sessionId.equals(that.sessionId);
    +    public boolean equals(Object obj) {
    +        if (obj == null) return false;
    +        if (obj == this) return true;
    +        if (obj.getClass() != getClass()) return false;
    +
    +        SessionId that = (SessionId) obj;
    +        return new EqualsBuilder()
    +                .append(sessionId, that.sessionId)
    +                .isEquals();

    --- End diff --

    yes, agree, my previous comment about EqualsBuilder was about the pattern of comparisons for the top of the method and the helper in case there are multiple fields. Here, with only one member, a direct comparison should suffice.

---
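For a class whose identity rests on a single field, the direct comparison the reviewer suggests might look like the sketch below. This is an illustrative stand-in for SessionId, not the PR's actual code; `Objects.equals` is used for null-safety in place of Commons Lang's EqualsBuilder:

```java
import java.util.Objects;

// Minimal sketch of the reviewer's suggestion: with a single significant
// field, a direct comparison replaces EqualsBuilder. Illustrative only.
final class SimpleSessionId {
    private final String sessionId;

    SimpleSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) return true;
        if (obj == null || obj.getClass() != getClass()) return false;
        SimpleSessionId that = (SimpleSessionId) obj;
        return Objects.equals(this.sessionId, that.sessionId); // direct, null-safe
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(sessionId);
    }
}
```

The getClass() check (rather than instanceof) preserves symmetry if the class is ever subclassed, which matches the pattern in the new code above.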
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202127945

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    [... UGICache.java context identical to the first quoted diff above ...]
    +            Entry entry = cache.get(session);
    +            if (entry == null) {
    +                LOG.info(session.toString() + " Creating proxy user = " + user);

    --- End diff --

    dial this down to debug for production code ?

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202132529

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    [... UGICache.java context identical to the first quoted diff above ...]
    +    /**
    +     * Decrement reference count for the given session's UGI. Resets the time at which the UGI
    +     * will expire to 15 minutes in the future.
    +     *
    +     * @param session the session for which we want to release the UGI.
    +     * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if
    +     *                                 it is now unreferenced).
    +     */
    +    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
    +
    +        Entry timedProxyUGI = cache.get(session);
    +
    +        if (timedProxyUGI == null) return;
    +
    +        timedProxyUGI.resetTime();
    +        timedProxyUGI.releaseReference();
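The 15-minute expiry in the quoted code rides on java.util.concurrent.DelayQueue: entries only become visible to poll() once their delay elapses. Below is a minimal sketch of that mechanism using the system clock directly, whereas the PR abstracts time behind a Guava Ticker so tests can fake it; the class and field names here are illustrative:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch of the cache's expiration mechanism: an entry is eligible for
// cleanup once its delay elapses, and DelayQueue.poll() returns only
// expired entries (or null if none are expired yet).
class ExpiringEntry implements Delayed {
    private volatile long expiresAtNanos;

    ExpiringEntry(long ttlNanos) {
        resetTime(ttlNanos);
    }

    // Mirrors resetTime() in the quoted release(): pushes expiry into the future.
    void resetTime(long ttlNanos) {
        expiresAtNanos = System.nanoTime() + ttlNanos;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

This is why release() calls resetTime() before decrementing the count: touching a session pushes its eviction deadline another 15 minutes out.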
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202126694

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    [... UGICache.java context identical to the first quoted diff above ...]
    +public class UGICache {
    +
    +    private static final Log LOG = LogFactory.getLog(UGICache.class);
    +    private Map<SessionId, Entry> cache = new ConcurrentHashMap<>();
    +    @SuppressWarnings("unchecked")

    --- End diff --

    do you still need this ? that was needed for arrays, should be ok for maps ?

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202129998

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    [... UGICache.java context identical to the first quoted diff above ...]
    +    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
    +
    +        Entry timedProxyUGI = cache.get(session);

    --- End diff --

    rename variable to cacheEntry ?

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202131246

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    [... UGICache.java context identical to the first quoted diff above ...]
    +    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
    +
    +        Entry timedProxyUGI = cache.get(session);
    +
    +        if (timedProxyUGI == null) return;
    +
    +        timedProxyUGI.resetTime();
    +        timedProxyUGI.releaseReference();
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202133845

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java ---
    @@ -52,69 +62,90 @@
          */
         @Override
         public void init(FilterConfig filterConfig) throws ServletException {
    +        //TODO: initialize cache here
         }

         /**
          * If user impersonation is configured, examines the request for the presence of the
          * expected security headers and creates a proxy user to execute the further request
          * chain. Responds with an HTTP error if the header is missing or the chain processing
          * throws an exception.
          *
    -     * @param request http request
    +     * @param request  http request
          * @param response http response
    -     * @param chain filter chain
    +     * @param chain    filter chain
          */
         @Override
    -    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain) throws IOException, ServletException {
    +    public void doFilter(final ServletRequest request, final ServletResponse response, final FilterChain chain)
    +            throws IOException, ServletException {

             if (SecureLogin.isUserImpersonationEnabled()) {

                 // retrieve user header and make sure header is present and is not empty
    -            final String user = ((HttpServletRequest) request).getHeader(USER_HEADER);
    -            if (user == null) {
    -                throw new IllegalArgumentException(MISSING_HEADER_ERROR);
    -            } else if (user.trim().isEmpty()) {
    -                throw new IllegalArgumentException(EMPTY_HEADER_ERROR);
    +            final String gpdbUser = getHeaderValue(request, USER_HEADER);
    +            String transactionId = getHeaderValue(request, TRANSACTION_ID_HEADER);
    +            Integer segmentId = getHeaderValueInt(request, SEGMENT_ID_HEADER, true);
    +            Integer fragmentCount = getHeaderValueInt(request, FRAGMENT_COUNT_HEADER, false);
    +            Integer fragmentIndex = getHeaderValueInt(request, FRAGMENT_INDEX_HEADER, false);
    +
    +            SessionId session = new SessionId(segmentId, transactionId, gpdbUser);
    +            if (LOG.isDebugEnabled() && fragmentCount != null) {
    +                LOG.debug(session.toString() + " Fragment = " + fragmentIndex + " of " + fragmentCount);
                 }

                 // TODO refresh Kerberos token when security is enabled

    -            // prepare pivileged action to run on behalf of proxy user
    +            // prepare privileged action to run on behalf of proxy user
                 PrivilegedExceptionAction<Boolean> action = new PrivilegedExceptionAction<Boolean>() {
                     @Override
                     public Boolean run() throws IOException, ServletException {
    -                    LOG.debug("Performing request chain call for proxy user = " + user);
    +                    LOG.debug("Performing request chain call for proxy user = " + gpdbUser);
                         chain.doFilter(request, response);
                         return true;
                     }
                 };

                 // create proxy user UGI from the UGI of the logged in user and execute the servlet chain as that user
    -            UserGroupInformation proxyUGI = null;
    +            UserGroupInformation ugi = cache.getUserGroupInformation(session);

    --- End diff --

    I'd call this variable here proxyUGI as this is what we are after

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202133483

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/servlet/SecurityServletFilter.java ---
    @@ -52,69 +62,90 @@
          */
         @Override
         public void init(FilterConfig filterConfig) throws ServletException {
    +        //TODO: initialize cache here

    --- End diff --

    please do that

---
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202129124

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -0,0 +1,318 @@
    [... UGICache.java context identical to the first quoted diff above ...]
    +    /**
    +     * Decrement reference count for the given session's UGI. Resets the time at which the UGI
    +     * will expire to 15 minutes in the future.
    +     *
    +     * @param session the session for which we want to release the UGI.
    +     * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if
    +     *                                 it is now unreferenced).
    +     */
    +    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {

    --- End diff --

    cleanImmediatelyIfNoRefs is our interpretation of the fact that we do not expect any more requests from this session, and this is only what's known to the caller of the function,
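To make the flag's semantics concrete: destroy the entry immediately only when the caller signals that no further requests are expected for the session AND the reference count has reached zero. The sketch below is a hypothetical, simplified rendering of that contract; all names are illustrative, and the real UGICache additionally resets the expiration timer and synchronizes on a per-segment DelayQueue:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the release(..) contract under discussion. Not the actual
// UGICache implementation; it only models the cleanImmediatelyIfNoRefs flag.
class MiniUgiCache {
    static class Entry {
        final AtomicInteger refs = new AtomicInteger();
        volatile boolean destroyed;
    }

    final Map<String, Entry> cache = new ConcurrentHashMap<>();

    Entry acquire(String session) {
        Entry e = cache.computeIfAbsent(session, s -> new Entry());
        e.refs.incrementAndGet();
        return e;
    }

    void release(String session, boolean cleanImmediatelyIfNoRefs) {
        Entry e = cache.get(session);
        if (e == null) return; // already cleaned up: nothing to do
        int remaining = e.refs.decrementAndGet();
        if (cleanImmediatelyIfNoRefs && remaining == 0) {
            cache.remove(session);
            e.destroyed = true; // stand-in for destroying the proxy UGI
        }
        // otherwise the entry lingers until the expiration queue reaps it
    }
}
```

Note that the flag is advisory: if other requests still hold references, the entry survives the call and is only destroyed once the last holder releases it.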
[GitHub] incubator-hawq pull request #1379: HAWQ-1622. Cache PXF proxy UGI so that cl...
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1379#discussion_r202124776

    --- Diff: pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/UGICache.java ---
    @@ -61,48 +64,51 @@
          * create and destroy UserGroupInformation instances.
          */
         public UGICache() {
    -        this(new UGIProvider());
    +        this(new UGIProvider(), Ticker.systemTicker());
         }

         /**
          * Create new proxy UGI if not found in cache and increment reference count
          */
    -    public Entry getTimedProxyUGI(SessionId session)
    -            throws IOException {
    -
    +    public UserGroupInformation getUserGroupInformation(SessionId session) throws IOException {
             Integer segmentId = session.getSegmentId();
             String user = session.getUser();
             DelayQueue<Entry> delayQueue = getExpirationQueue(segmentId);
             synchronized (delayQueue) {
                 // Use the opportunity to cleanup any expired entries
    -            cleanup(segmentId);
    +            cleanup(session);
                 Entry entry = cache.get(session);
                 if (entry == null) {
                     LOG.info(session.toString() + " Creating proxy user = " + user);
    -                entry = new Entry(ugiProvider.createProxyUGI(user), session);
    +                entry = new Entry(ticker, ugiProvider.createProxyUGI(user));
                     delayQueue.offer(entry);
                     cache.put(session, entry);
                 }
                 entry.acquireReference();
    -            return entry;
    +            return entry.getUGI();
             }
         }

         /**
    -     * Decrement reference count for the given Entry.
    +     * Decrement reference count for the given session's UGI. Resets the time at which the UGI
    +     * will expire to 15 minutes in the future.
          *
    -     * @param timedProxyUGI the cache entry to release
    -     * @param forceClean if true, destroys the UGI for the given Entry (only if it is now unreferenced).
    +     * @param session the session for which we want to release the UGI.
    +     * @param cleanImmediatelyIfNoRefs if true, destroys the UGI for the given session (only if
    +     *                                 it is now unreferenced).
          */
    -    public void release(Entry timedProxyUGI, boolean forceClean) {
    +    public void release(SessionId session, boolean cleanImmediatelyIfNoRefs) {
    +
    +        Entry timedProxyUGI = cache.get(session);
    +
    +        if (timedProxyUGI == null) return;

    --- End diff --

    I'd rather not have any assert statements in Java code, if that's what you mean.

---