From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17813 )
Change subject: [ASTERIXDB-3259][MTD] Include 'database' in DataSourceId & EntityId ...................................................................... [ASTERIXDB-3259][MTD] Include 'database' in DataSourceId & EntityId - user model changes: no - storage format changes: no - interface changes: yes Details: Include 'database' in DataSourceId, DataSourceIndex, and EntityId. - Change 'Feed' tuple translator to handle 'database' value and pass it to EntityId. Change-Id: Icf67d0950810fe0706e16e5bf27f2f9ceda703c6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17813 Integration-Tests: Jenkins <[email protected]> Tested-by: Ali Alsuliman <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 26 files changed, 137 insertions(+), 52 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ali Alsuliman: Looks good to me, but someone else must approve; Verified Jenkins: Verified diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java index 1d33961..fa82d37 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java +++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java @@ -28,18 +28,24 @@ */ public class EntityId implements Serializable { - private static final long serialVersionUID = 2L; + private static final long serialVersionUID = 3L; private final String extensionName; + private final String databaseName; private final DataverseName dataverseName; private final String entityName; - public EntityId(String extentionName, DataverseName dataverseName, String entityName) { - this.extensionName = extentionName; + public EntityId(String extensionName, String databaseName, DataverseName dataverseName, String entityName) { + this.extensionName = extensionName; + this.databaseName = databaseName; this.dataverseName = dataverseName; this.entityName = entityName; } + public String getDatabaseName() { + return databaseName; + } + public DataverseName getDataverseName() { return dataverseName; } @@ -57,13 +63,13 @@ return true; } EntityId other = (EntityId) o; - return Objects.equals(other.dataverseName, dataverseName) && Objects.equals(other.entityName, entityName) - && Objects.equals(other.extensionName, extensionName); + return Objects.equals(other.databaseName, databaseName) && Objects.equals(other.dataverseName, dataverseName) + && Objects.equals(other.entityName, entityName) && Objects.equals(other.extensionName, extensionName); } @Override public int hashCode() { - return Objects.hash(dataverseName, entityName, extensionName); + return Objects.hash(databaseName, dataverseName, entityName, extensionName); } @Override diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java index 25c97cc..58f85ee 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java +++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java @@ -366,7 +366,7 @@ context.getOutputTypeEnvironment(currentTop), index.getIndexDetails().isOverridingKeyFieldTypes()); } - DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp); + DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp); // Introduce the TokenizeOperator only when doing bulk-load, // and index type is keyword or n-gram. @@ -623,7 +623,7 @@ beforeOpFilterExpression = createAnyUnknownFilterExpression(originalKeyVarList, context.getOutputTypeEnvironment(originalAssignCoordinates), forceFilter); } - DataSourceIndex dataSourceIndex = new DataSourceIndex(index, dataverseName, datasetName, mp); + DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp); indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex, OperatorManipulationUtil .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()), diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index 90c5fad..0949c44 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -260,9 +260,9 @@ AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams(); jobGenParams.readFromFuncArgs(f.getArguments()); MetadataProvider mp = (MetadataProvider) context.getMetadataProvider(); - DataSourceId dataSourceId = - new DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName()); String database = MetadataUtil.resolveDatabase(null, 
jobGenParams.getDataverseName()); + DataSourceId dataSourceId = + new DataSourceId(database, jobGenParams.getDataverseName(), jobGenParams.getDatasetName()); Dataset dataset = mp.findDataset(database, jobGenParams.getDataverseName(), jobGenParams.getDatasetName()); IDataSourceIndex<String, DataSourceId> dsi = mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java index defb383..67c8423 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java @@ -1001,7 +1001,7 @@ /** * In case of a left outer join we look for a special GroupBy above the join operator - * (see {@link IntroduceJoinAccessMethodRule#checkAndApplyJoinTransformation(Mutable, IOptimizationContext)}. + * (see {@link IntroduceJoinAccessMethodRule#checkAndApplyJoinTransformation(Mutable, IOptimizationContext, boolean)}. * A "Special GroupBy" is a GroupBy that eliminates unjoined duplicates that might be produced by the secondary * index probe. We probe secondary indexes on each index partition and return a tuple with a right branch variable * set to MISSING (or NULL) if there's no match on that partition. 
Therefore if there's more than one partition @@ -2049,7 +2049,8 @@ unnestOp.setExecutionMode(ExecutionMode.PARTITIONED); //set the physical operator - DataSourceId dataSourceId = new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName()); + DataSourceId dataSourceId = + new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()); unnestOp.setPhysicalOperator(new ExternalDataLookupPOperator(dataSourceId, dataset, recordType, primaryKeyVars, false, retainInput, retainNull)); return unnestOp; diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java index a751aaf..a6f9005 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java @@ -231,7 +231,9 @@ return null; } - DataSourceId dsid = new DataSourceId(DataverseName.createFromCanonicalForm(dataverse), dataset); + DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverse); + String database = MetadataUtil.resolveDatabase(null, dataverseName); + DataSourceId dsid = new DataSourceId(database, dataverseName, dataset); MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); return metadataProvider.findDataSource(dsid); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index 09d5792..88be2d8 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -734,7 +734,7 @@ throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Cannot write output to an external " + dataset()); } - DataSourceId sourceId = new DataSourceId(dataverseName, datasetName); + DataSourceId sourceId = new DataSourceId(database, dataverseName, datasetName); String itemTypeDatabase = MetadataUtil.resolveDatabase(null, dataset.getItemTypeDataverseName()); String metaItemTypeDatabase = MetadataUtil.resolveDatabase(null, dataset.getMetaItemTypeDataverseName()); IAType itemType = metadataProvider.findType(itemTypeDatabase, dataset.getItemTypeDataverseName(), diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java index d6abf55..bbdde88 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java @@ -99,7 +99,8 @@ } variables.add(unnest.getVariable()); - DataSourceId dsid = new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName()); + DataSourceId dsid = + new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()); DataSource dataSource = metadataProvider.findDataSource(dsid); boolean hasMeta = dataSource.hasMeta(); if (hasMeta) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java index 035765f..66d8190 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java @@ -85,9 +85,9 @@ String targetDataset = ConstantExpressionUtil.getStringArgument(f, 
4); String outputType = ConstantExpressionUtil.getStringArgument(f, 5); MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); - DataSourceId asid = new DataSourceId(dataverseName, getTargetFeed); - String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); String database = MetadataUtil.resolveDatabase(null, dataverseName); + DataSourceId asid = new DataSourceId(database, dataverseName, getTargetFeed); + String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); FeedPolicyEntity policy = metadataProvider.findFeedPolicy(database, dataverseName, policyName); if (policy == null) { policy = BuiltinFeedPolicies.getFeedPolicy(policyName); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java index 5178c3e..b3889fd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java @@ -135,7 +135,7 @@ } private static DataSourceId createQueryIndexDataSourceId(Dataset dataset, String indexName) { - return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(), + return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(), new String[] { indexName, QueryIndexRewriter.QUERY_INDEX.getName() }); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 574448d..17988cf 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -3872,7 
+3872,7 @@ Map<String, String> configuration = cfs.getConfiguration(); ExternalDataUtils.normalize(configuration); ExternalDataUtils.validate(configuration); - feed = new Feed(dataverseName, feedName, configuration); + feed = new Feed(database, dataverseName, feedName, configuration); FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx, warningCollector); MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -4060,7 +4060,7 @@ try { metadataProvider.setMetadataTxnContext(mdTxnCtx); // Runtime handler - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName); // Feed & Feed Connections Feed feed = FeedMetadataUtil.validateIfFeedExists(database, dataverseName, feedName, metadataProvider.getMetadataTxnContext()); @@ -4106,8 +4106,9 @@ StopFeedStatement sfst = (StopFeedStatement) stmt; SourceLocation sourceLoc = sfst.getSourceLocation(); DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName()); + String database = MetadataUtil.resolveDatabase(null, dataverseName); String feedName = sfst.getFeedName().getValue(); - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName); ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); // Obtain runtime info from ActiveListener @@ -4208,7 +4209,7 @@ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); // Check whether feed is alive ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler - .getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)); + .getListener(new EntityId(Feed.EXTENSION_NAME, database, dataverseName, feedName)); if (listener != null && listener.isActive()) { throw 
new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, sourceLoc, feedName); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index 93da8c5..baecf79 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -86,7 +86,7 @@ static DataverseName dataverseName = MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME; static String database = MetadataUtil.databaseFor(dataverseName); static String entityName = "entityName"; - static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName); + static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, entityName); static Dataset firstDataset; static Dataset secondDataset; static List<Dataset> allDatasets; @@ -1523,7 +1523,7 @@ TestEventsListener[] additionalListeners = new TestEventsListener[3]; for (int i = 0; i < additionalListeners.length; i++) { String entityName = "entityName" + i; - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName); + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, database, dataverseName, entityName); ClusterControllerService ccService = Mockito.mock(ClusterControllerService.class); CCServiceContext ccServiceCtx = Mockito.mock(CCServiceContext.class); CcApplicationContext ccAppCtx = Mockito.mock(CcApplicationContext.class); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index cb123bf..e17a7fe 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -42,6 +42,7 @@ import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataUtil; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -74,8 +75,9 @@ @Test public void refreshStatsTest() throws Exception { // Entities to be used - EntityId entityId = - new EntityId("MockExtension", DataverseName.createSinglePartName("MockDataverse"), "MockEntity"); + DataverseName mockDataverse = DataverseName.createSinglePartName("MockDataverse"); + String mockDatabase = MetadataUtil.resolveDatabase(null, mockDataverse); + EntityId entityId = new EntityId("MockExtension", mockDatabase, mockDataverse, "MockEntity"); ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); List<Dataset> datasetList = new ArrayList<>(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java index dd1d1b7..de3fa57 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeed.java @@ -28,6 +28,8 @@ public String getFeedName(); + public String getDatabaseName(); + public DataverseName getDataverseName(); public EntityId getFeedId(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java index 73edc6e..a2f3b10 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java @@ -42,8 +42,8 @@ this.hash = toString().hashCode(); } - public FeedConnectionId(DataverseName dataverseName, String feedName, String datasetName) { - this(new EntityId(FEED_EXTENSION_NAME, dataverseName, feedName), datasetName); + public FeedConnectionId(String databaseName, DataverseName dataverseName, String feedName, String datasetName) { + this(new EntityId(FEED_EXTENSION_NAME, databaseName, dataverseName, feedName), datasetName); } public EntityId getFeedId() { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 0647763..dbc5714 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -85,7 +85,8 @@ public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) { super(spec, 0, 1); - this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName()); + this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDatabaseName(), primaryFeed.getDataverseName(), + primaryFeed.getFeedName()); this.adaptorFactory = adapterFactory; this.adapterOutputType = adapterOutputType; this.policyAccessor = policyAccessor; 
@@ -96,7 +97,8 @@ String adapterLibraryName, String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) { super(spec, 0, 1); - this.feedId = new EntityId(FEED_EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName()); + this.feedId = + new EntityId(FEED_EXTENSION_NAME, feed.getDatabaseName(), feed.getDataverseName(), feed.getFeedName()); this.adaptorFactoryClassName = adapterFactoryClassName; this.adaptorLibraryDataverse = adapterLibraryDataverse; this.adaptorLibraryName = adapterLibraryName; diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java index 30d3266..a72c1bf 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java @@ -34,6 +34,7 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.memory.ConcurrentFramePool; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataUtil; import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; @@ -72,8 +73,9 @@ private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer, FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException, AsterixException { DataverseName dvName = DataverseName.createSinglePartName(DATAVERSE_NAME); + String dbName = MetadataUtil.databaseFor(dvName); FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class); - EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dvName, FEED); 
+ EntityId feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dbName, dvName, FEED); FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET); ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.COLLECT.toString(), 0); return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java index c27ac42..b60682c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java @@ -26,6 +26,8 @@ public final class DataSourceId { + private final String databaseName; + private final DataverseName dataverseName; private final String datasourceName; @@ -35,13 +37,15 @@ /** * The original constructor taking * + * @param databaseName + * the database name * @param dataverseName - * the dataverse (namespace) for this datasource + * the dataverse (namespace) for this datasource * @param datasourceName - * the name for this datasource + * the name for this datasource */ - public DataSourceId(DataverseName dataverseName, String datasourceName) { - this(dataverseName, datasourceName, null); + public DataSourceId(String databaseName, DataverseName dataverseName, String datasourceName) { + this(databaseName, dataverseName, datasourceName, null); } /** @@ -50,7 +54,8 @@ * that would expose different behavior. It enables the definition of (compile-time) parameterized datasources. * Please note that the first 2 parameters still need to be 1) a dataverse name and 2) a datasource name. 
*/ - public DataSourceId(DataverseName dataverseName, String datasourceName, String[] parameters) { + public DataSourceId(String databaseName, DataverseName dataverseName, String datasourceName, String[] parameters) { + this.databaseName = databaseName; this.dataverseName = dataverseName; this.datasourceName = datasourceName; this.parameters = parameters; @@ -61,6 +66,10 @@ return dataverseName + "." + datasourceName + (parameters != null ? "." + String.join(".", parameters) : ""); } + public String getDatabaseName() { + return databaseName; + } + public DataverseName getDataverseName() { return dataverseName; } @@ -78,13 +87,13 @@ return false; } DataSourceId that = (DataSourceId) o; - return dataverseName.equals(that.dataverseName) && datasourceName.equals(that.datasourceName) - && Arrays.equals(parameters, that.parameters); + return Objects.equals(databaseName, that.databaseName) && dataverseName.equals(that.dataverseName) + && datasourceName.equals(that.datasourceName) && Arrays.equals(parameters, that.parameters); } @Override public int hashCode() { - int result = Objects.hash(dataverseName, datasourceName); + int result = Objects.hash(databaseName, dataverseName, datasourceName); result = 31 * result + Arrays.hashCode(parameters); return result; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java index 05498b9..80ca036 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceIndex.java @@ -27,15 +27,17 @@ public class DataSourceIndex implements IDataSourceIndex<String, DataSourceId> { private final Index index; + private final String database; private final DataverseName dataverseName; private final String datasetName; private final MetadataProvider 
metadataProvider; // Every transactions needs to work with its own instance of an // MetadataProvider. - public DataSourceIndex(Index index, DataverseName dataverseName, String datasetName, + public DataSourceIndex(Index index, String database, DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider) { this.index = index; + this.database = database; this.dataverseName = dataverseName; this.datasetName = datasetName; this.metadataProvider = metadataProvider; @@ -45,7 +47,7 @@ @Override public IDataSource<DataSourceId> getDataSource() { try { - DataSourceId sourceId = new DataSourceId(dataverseName, datasetName); + DataSourceId sourceId = new DataSourceId(database, dataverseName, datasetName); return metadataProvider.lookupSourceInMetadata(sourceId); } catch (Exception me) { return null; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java index d975404..628c745 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java @@ -191,8 +191,8 @@ throw new AlgebricksException("Feed not configured with a policy"); } feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName()); - FeedConnectionId feedConnectionId = - new FeedConnectionId(getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset()); + FeedConnectionId feedConnectionId = new FeedConnectionId(getId().getDatabaseName(), + getId().getDataverseName(), getId().getDatasourceName(), getTargetDataset()); FeedCollectOperatorDescriptor feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId, feedOutputType, feedDesc, feedPolicy.getProperties(), getLocation()); diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java index 690338c..683bf0f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java @@ -26,6 +26,7 @@ import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.metadata.MetadataUtil; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; import org.apache.asterix.external.api.IDataParserFactory; import org.apache.asterix.external.parser.factory.ADMDataParserFactory; @@ -135,6 +136,7 @@ } protected static DataSourceId createDataSourceId(FunctionIdentifier fid, String... parameters) { - return new DataSourceId(FunctionSignature.getDataverseName(fid), fid.getName(), parameters); + return new DataSourceId(MetadataUtil.resolveDatabase(null, FunctionSignature.getDataverseName(fid)), + FunctionSignature.getDataverseName(fid), fid.getName(), parameters); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java index 5fa2af4..20505bb 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java @@ -27,6 +27,7 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataUtil; import 
org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory; import org.apache.asterix.metadata.entities.Dataset; @@ -64,8 +65,9 @@ public LoadableDataSource(Dataset targetDataset, IAType itemType, IAType metaItemType, String adapter, Map<String, String> properties) throws AlgebricksException, IOException { - super(new DataSourceId(DataverseName.createSinglePartName(LOADABLE_DV), LOADABLE_DS), itemType, metaItemType, - Type.LOADABLE, null); + super(new DataSourceId(MetadataUtil.databaseFor(DataverseName.createSinglePartName(LOADABLE_DV)), + DataverseName.createSinglePartName(LOADABLE_DV), LOADABLE_DS), itemType, metaItemType, Type.LOADABLE, + null); this.targetDataset = targetDataset; this.adapter = adapter; this.adapterProperties = properties; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index a900366..3a9f6c1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -434,7 +434,7 @@ String database = dataset.getDatabaseName(); String datasetName = dataset.getDatasetName(); Index index = getIndex(database, dataverseName, datasetName, indexId); - return index != null ? new DataSourceIndex(index, dataverseName, datasetName, this) : null; + return index != null ? 
new DataSourceIndex(index, database, dataverseName, datasetName, this) : null; } public Index getIndex(String database, DataverseName dataverseName, String datasetName, String indexName) diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java index 3b2937b..ff6cffa 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java @@ -76,6 +76,7 @@ } private static DataSourceId createSampleDataSourceId(Dataset dataset, String sampleIndexName) { - return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(), new String[] { sampleIndexName }); + return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(), + new String[] { sampleIndexName }); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java index e78b7d4..5796df9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Feed.java @@ -41,8 +41,9 @@ /** Feed configurations */ private final Map<String, String> feedConfiguration; - public Feed(DataverseName dataverseName, String feedName, Map<String, String> feedConfiguration) { - this.feedId = new EntityId(EXTENSION_NAME, dataverseName, feedName); + public Feed(String databaseName, DataverseName dataverseName, String feedName, + Map<String, String> feedConfiguration) { + this.feedId = new EntityId(EXTENSION_NAME, databaseName, dataverseName, feedName); this.displayName = "(" + feedId + ")"; this.feedConfiguration = feedConfiguration; } @@ 
-53,6 +54,11 @@ } @Override + public String getDatabaseName() { + return feedId.getDatabaseName(); + } + + @Override public DataverseName getDataverseName() { return feedId.getDataverseName(); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java index 06ffce4..a0a288f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java @@ -58,7 +58,7 @@ this.policyName = policyName; this.whereClauseBody = whereClauseBody == null ? "" : whereClauseBody; this.outputType = outputType; - this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName); + this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, databaseName, dataverseName, feedName); } public List<FunctionSignature> getAppliedFunctions() { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java index bb1c9ad..0af8298 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java @@ -28,6 +28,7 @@ import org.apache.asterix.builders.RecordBuilder; import org.apache.asterix.builders.UnorderedListBuilder; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.metadata.MetadataUtil; import org.apache.asterix.metadata.bootstrap.FeedEntity; import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes; import org.apache.asterix.metadata.entities.Feed; @@ -59,6 +60,13 @@ String 
dataverseCanonicalName = ((AString) feedRecord.getValueByPos(feedEntity.dataverseNameIndex())).getStringValue(); DataverseName dataverseName = DataverseName.createFromCanonicalForm(dataverseCanonicalName); + int databaseNameIndex = feedEntity.databaseNameIndex(); + String databaseName; + if (databaseNameIndex >= 0) { + databaseName = ((AString) feedRecord.getValueByPos(databaseNameIndex)).getStringValue(); + } else { + databaseName = MetadataUtil.databaseFor(dataverseName); + } String feedName = ((AString) feedRecord.getValueByPos(feedEntity.feedNameIndex())).getStringValue(); AUnorderedList feedConfig = (AUnorderedList) feedRecord.getValueByPos(feedEntity.adapterConfigIndex()); @@ -74,7 +82,7 @@ adaptorConfiguration.put(key, value); } - return new Feed(dataverseName, feedName, adaptorConfiguration); + return new Feed(databaseName, dataverseName, feedName, adaptorConfiguration); } @Override @@ -83,6 +91,12 @@ // write the key in the first two fields of the tuple tupleBuilder.reset(); + + if (feedEntity.databaseNameIndex() >= 0) { + aString.setValue(feed.getDatabaseName()); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + } aString.setValue(dataverseCanonicalName); stringSerde.serialize(aString, tupleBuilder.getDataOutput()); tupleBuilder.addFieldEndOffset(); @@ -93,6 +107,12 @@ recordBuilder.reset(feedEntity.getRecordType()); + if (feedEntity.databaseNameIndex() >= 0) { + fieldValue.reset(); + aString.setValue(feed.getDatabaseName()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(feedEntity.databaseNameIndex(), fieldValue); + } // write dataverse name fieldValue.reset(); aString.setValue(dataverseCanonicalName); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17813 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: 
Icf67d0950810fe0706e16e5bf27f2f9ceda703c6 Gerrit-Change-Number: 17813 Gerrit-PatchSet: 3 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-CC: Till Westmann <[email protected]> Gerrit-MessageType: merged
