abdullah alamoudi has submitted this change and it was merged.

Change subject: ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection
......................................................................
ASTERIXDB-1302 ASTERIXDB-1301 Fix Socket Feed Connection

A bug caused a read lock to never be released when a feed was connected
with "wait-for-completion" set to false. The bug was fixed and a test
case was added. Another bug prevented the socket feed from receiving
connections correctly; it was likewise fixed and covered by a new test
case. Additionally, this change ensures that adapters are assigned
absolute partitions, keeping them consistent with the feed log manager.

Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Reviewed-on: https://asterix-gerrit.ics.uci.edu/660
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ildar Absalyamov <[email protected]>
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
A asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
A asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm
M asterix-app/src/test/resources/runtimets/testsuite.xml
A asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java
A asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java
M asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java
A asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java
M asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
50 files changed, 1,183 insertions(+), 216 deletions(-)

Approvals:
  Ildar Absalyamov: Looks good to me, approved
  Jenkins: Verified
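Before the diff itself, a minimal self-contained sketch of the locking pattern that the ASTERIXDB-1302 fix moves to may be helpful. The names below (FEED_LATCH, submitConnectJob, awaitFeedEnded) are simplified stand-ins for the real MetadataLockManager and QueryTranslator members, not the actual API. The old code tracked the latch with a readLatchAcquired flag that was cleared after the metadata commit, so the finally block skipped the release whenever "wait-for-completion" was false; the fix releases the latch unconditionally in finally:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the fixed connect-feed locking discipline (simplified names).
public final class ConnectFeedLatchSketch {
    private static final ReadWriteLock FEED_LATCH = new ReentrantReadWriteLock();

    public static void connectFeed(boolean waitForCompletion) {
        FEED_LATCH.readLock().lock(); // connectFeedBegin(...) in the real code
        try {
            submitConnectJob(); // stand-in for running the feed connect job
            if (waitForCompletion) {
                awaitFeedEnded(); // blocking, like waiting on the FEED_ENDED event
            }
        } finally {
            // Always runs: the wait-for-completion=false path can no longer
            // leak the read lock (ASTERIXDB-1302).
            FEED_LATCH.readLock().unlock(); // connectFeedEnd(...)
        }
    }

    private static void submitConnectJob() {
        // stand-in for submitting the feed connect job
    }

    private static void awaitFeedEnded() {
        // stand-in for waiting on the feed lifecycle event
    }
}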
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..be9452b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,8 @@
         if (tempPath.endsWith(File.separator)) {
             tempPath = tempPath.substring(0, tempPath.length() - 1);
         }
-        //get initial partitions from properties
+        System.err.println("Using the path: " + tempPath);
+        // get initial partitions from properties
         String[] nodeStores = propertiesAccessor.getStores().get(ncName);
         if (nodeStores == null) {
             throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +98,7 @@
             tempDirPath += File.separator;
         }
         for (int p = 0; p < nodeStores.length; p++) {
-            //create IO devices based on stores
+            // create IO devices based on stores
             String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
             File ioDeviceDir = new File(iodevicePath);
             ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index c0245d7..5cd490a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedMessage;
@@ -37,9 +38,11 @@
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
 import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.file.JobSpecificationUtils;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -49,6 +52,9 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 
 /**
@@ -251,4 +257,17 @@
                 completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
         return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations);
     }
+
+    public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
+                locations);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
+                .splitProviderAndPartitionConstraints(feedLogFileSplits);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
+        spec.addRoot(frod);
+        return spec;
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 9f024e9..ea50221 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@
ASYNC_DEFERRED } - public static final boolean IS_DEBUG_MODE = false;//true + public static final boolean IS_DEBUG_MODE = false;// true private final List<Statement> statements; private final SessionConfig sessionConfig; private Dataverse activeDefaultDataverse; @@ -593,8 +593,9 @@ } if (compactionPolicy == null) { if (filterField != null) { - // If the dataset has a filter and the user didn't specify a merge policy, then we will pick the - // correlated-prefix as the default merge policy. + //If the dataset has a filter and the user didn't specify a merge + //policy, then we will pick the + //correlated-prefix as the default merge policy. compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME; compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES; } @@ -659,10 +660,10 @@ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { //#. execute compensation operations - // remove the index in NC - // [Notice] - // As long as we updated(and committed) metadata, we should remove any effect of the job - // because an exception occurs during runJob. + //remove the index in NC + //[Notice] + //As long as we updated(and committed) metadata, we should remove any effect of the job + //because an exception occurs during runJob. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -679,7 +680,7 @@ } } - // remove the record from the metadata. + //remove the record from the metadata. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { @@ -803,7 +804,7 @@ String indexName = null; JobSpecification spec = null; Dataset ds = null; - // For external datasets + //For external datasets ArrayList<ExternalFile> externalFilesSnapshot = null; boolean firstExternalDatasetIndex = false; boolean filesIndexReplicated = false; @@ -880,8 +881,10 @@ } } - // Checks whether a user is trying to create an inverted secondary index on a dataset with a variable-length primary key. - // Currently, we do not support this. Therefore, as a temporary solution, we print an error message and stop. + //Checks whether a user is trying to create an inverted secondary index on a dataset + //with a variable-length primary key. + //Currently, we do not support this. Therefore, as a temporary solution, we print an + //error message and stop. 
if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX @@ -891,7 +894,7 @@ IAType keyType = aRecordType.getSubFieldType(partitioningKey); ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType); - // If it is not a fixed length + //If it is not a fixed length if (typeTrait.getFixedLength() < 0) { throw new AlgebricksException("The keyword or ngram index -" + indexName + " cannot be created on the dataset -" + datasetName @@ -904,27 +907,27 @@ if (ds.getDatasetType() == DatasetType.INTERNAL) { validateIfResourceIsActiveInFeed(dataverseName, datasetName); } else { - // External dataset - // Check if the dataset is indexible + //External dataset + //Check if the dataset is indexible if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) { throw new AlgebricksException( "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter() + " Adapter can't be indexed"); } - // check if the name of the index is valid + //Check if the name of the index is valid if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) { throw new AlgebricksException("external dataset index name is invalid"); } - // Check if the files index exist + //Check if the files index exist filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); firstExternalDatasetIndex = (filesIndex == null); - // lock external dataset + //Lock external dataset ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex); datasetLocked = true; if (firstExternalDatasetIndex) { - // verify that no one has created an index before we acquire the lock + //Verify that no one has created an index before we acquire the lock filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); if (filesIndex != null) { @@ -934,20 +937,20 @@ } } if (firstExternalDatasetIndex) { - // Get snapshot from External File System + //Get snapshot from External File System externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds); - // Add an entry for the files index + //Add an entry for the files index filesIndex = new Index(dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, IMetadataEntity.PENDING_ADD_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); - // Add files to the external files index + //Add files to the external files index for (ExternalFile file : externalFilesSnapshot) { MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); } - // This is the first index for the external dataset, replicate the files index + //This is the first index for the external dataset, replicate the files index spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot, metadataProvider, true); if (spec == null) { @@ -1025,13 +1028,14 @@ indexName); index.setPendingOp(IMetadataEntity.PENDING_NO_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); - // add 
another new files index with PendingNoOp after deleting the index with PendingAddOp + //add another new files index with PendingNoOp after deleting the index with + //PendingAddOp if (firstExternalDatasetIndex) { MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, filesIndex.getIndexName()); filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); - // update transaction timestamp + //update transaction timestamp ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date()); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); } @@ -1041,7 +1045,7 @@ if (bActiveTxn) { abort(e, e, mdTxnCtx); } - // If files index was replicated for external dataset, it should be cleaned up on NC side + //If files index was replicated for external dataset, it should be cleaned up on NC side if (filesIndexReplicated) { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; @@ -1063,7 +1067,7 @@ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { //#. execute compensation operations - // remove the index in NC + //remove the index in NC mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -1086,7 +1090,7 @@ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { - // Drop External Files from metadata + //Drop External Files from metadata MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e2) { @@ -1098,7 +1102,7 @@ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { - // Drop the files index from metadata + //Drop the files index from metadata MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1110,7 +1114,7 @@ + ") couldn't be removed from the metadata", e); } } - // remove the record from the metadata. + //remove the record from the metadata. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { @@ -1183,7 +1187,6 @@ MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName); List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>(); try { - Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); if (dv == null) { if (stmtDelete.getIfExists()) { @@ -1216,6 +1219,9 @@ + connection.getDatasetName() + ". Encountered exception " + exception); } } + //prepare job to remove feed log storage + jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob( + MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName()))); } } @@ -1239,7 +1245,7 @@ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); } else { - // External dataset + //External dataset List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); for (int k = 0; k < indexes.size(); k++) { @@ -1260,8 +1266,9 @@ } jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider)); //#. 
mark PendingDropOp on the dataverse record by - // first, deleting the dataverse record from the DATAVERSE_DATASET - // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET + //first, deleting the dataverse record from the DATAVERSE_DATASET + //second, inserting the dataverse record with the PendingDropOp value into the + //DATAVERSE_DATASET MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP)); @@ -1295,7 +1302,7 @@ } //#. execute compensation operations - // remove the all indexes in NC + //remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { JobUtils.runJob(hcc, jobSpec, true); @@ -1305,7 +1312,7 @@ e.addSuppressed(e2); } - // remove the record from the metadata. + //remove the record from the metadata. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); try { MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); @@ -1352,7 +1359,7 @@ Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>(); if (ds.getDatasetType() == DatasetType.INTERNAL) { - // prepare job spec(s) that would disconnect any active feeds involving the dataset. + //prepare job spec(s) that would disconnect any active feeds involving the dataset. List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null); if (feedConnections != null && !feedConnections.isEmpty()) { for (FeedConnectionId connection : feedConnections) { @@ -1363,6 +1370,10 @@ LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset " + datasetName + " as dataset is being dropped"); } + //prepare job to remove feed log storage + jobsToExecute + .add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx, + connection.getFeedId().getDataverse(), connection.getFeedId().getFeedName()))); } } @@ -1404,7 +1415,7 @@ bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); } else { - // External dataset + //External dataset ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); //#. prepare jobs to drop the datatset and the indexes in NC List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); @@ -1447,7 +1458,7 @@ //#. finally, delete the dataset. MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); - // Drop the associated nodegroup + //Drop the associated nodegroup String nodegroup = ds.getNodeGroupName(); if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) { MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName); @@ -1461,7 +1472,7 @@ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { //#. execute compensation operations - // remove the all indexes in NC + //remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { JobUtils.runJob(hcc, jobSpec, true); @@ -1471,7 +1482,7 @@ e.addSuppressed(e2); } - // remove the record from the metadata. + //remove the record from the metadata. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { @@ -1506,7 +1517,7 @@ MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." 
+ datasetName); String indexName = null; - // For external index + //For external index boolean dropFilesIndex = false; List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>(); try { @@ -1573,7 +1584,7 @@ //#. finally, delete the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); } else { - // External dataset + //External dataset indexName = stmtIndexDrop.getIndexName().getValue(); Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); if (index == null) { @@ -1593,7 +1604,7 @@ datasetName); if (datasetIndexes.size() == 2) { dropFilesIndex = true; - // only one index + the files index, we need to delete both of the indexes + //only one index + the files index, we need to delete both of the indexes for (Index externalIndex : datasetIndexes) { if (ExternalIndexingOperations.isFileIndex(externalIndex)) { cds = new CompiledIndexDropStatement(dataverseName, datasetName, @@ -1636,7 +1647,7 @@ //#. finally, delete the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); if (dropFilesIndex) { - // delete the files index too + //delete the files index too MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); @@ -1652,7 +1663,7 @@ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { //#. execute compensation operations - // remove the all indexes in NC + //remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { JobUtils.runJob(hcc, jobSpec, true); @@ -1662,7 +1673,7 @@ e.addSuppressed(e2); } - // remove the record from the metadata. + //remove the record from the metadata. mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { @@ -1916,11 +1927,11 @@ ICompiledDmlStatement stmt) throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException { - // Query Rewriting (happens under the same ongoing metadata transaction) + //Query Rewriting (happens under the same ongoing metadata transaction) Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionConfig); - // Query Compilation (happens under the same ongoing metadata transaction) + //Query Compilation (happens under the same ongoing metadata transaction) JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first, reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt); @@ -1930,14 +1941,12 @@ private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { - CreateFeedStatement cfs = (CreateFeedStatement) stmt; String dataverseName = getActiveDataverse(cfs.getDataverseName()); String feedName = cfs.getFeedName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." 
+ feedName); - Feed feed = null; try { feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); @@ -2065,6 +2074,9 @@ throw new AlgebricksException("Feed " + feedId + " is currently active and connected to the following dataset(s) \n" + builder.toString()); } else { + JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob( + MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName())); + JobUtils.runJob(hcc, spec, true); MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName); } @@ -2120,7 +2132,6 @@ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - boolean readLatchAcquired = true; boolean subscriberRegistered = false; IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber(); FeedConnectionId feedConnId = null; @@ -2149,7 +2160,7 @@ FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx); - // All Metadata checks have passed. Feed connect request is valid. // + //All Metadata checks have passed. Feed connect request is valid. // FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties()); Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName, @@ -2165,19 +2176,20 @@ feedId.getFeedName()); Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor); - // adapter configuration are valid at this stage - // register the feed joints (these are auto-de-registered) + //adapter configuration are valid at this stage + //register the feed joints (these are auto-de-registered) for (IFeedJoint fj : triple.third) { FeedLifecycleListener.INSTANCE.registerFeedJoint(fj); } JobUtils.runJob(hcc, pair.first, false); - /* TODO: Fix record tracking + /* + * TODO: Fix record tracking * IFeedAdapterFactory adapterFactory = pair.second; - if (adapterFactory.isRecordTrackingEnabled()) { - FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId, - adapterFactory.createIntakeProgressTracker()); - } - */ + * if (adapterFactory.isRecordTrackingEnabled()) { + * FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId, + * adapterFactory.createIntakeProgressTracker()); + * } + */ eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED); } else { for (IFeedJoint fj : triple.third) { @@ -2186,18 +2198,9 @@ } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - readLatchAcquired = false; eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED); if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) { eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call - } - String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION); - boolean waitForCompletion = waitForCompletionParam == null ? false - : Boolean.valueOf(waitForCompletionParam); - if (waitForCompletion) { - MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName, - dataverseName + "." + feedName); - readLatchAcquired = false; } } catch (Exception e) { if (bActiveTxn) { @@ -2205,10 +2208,8 @@ } throw e; } finally { - if (readLatchAcquired) { - MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." 
+ datasetName, - dataverseName + "." + feedName); - } + MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName, + dataverseName + "." + feedName); if (subscriberRegistered) { FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber); } @@ -2242,7 +2243,7 @@ boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey); if (!isFeedJointAvailable) { sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey); - if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable. + if (sourceFeedJoint == null) { //the feed is currently not being ingested, i.e., it is unavailable. connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE; FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName()); @@ -2262,8 +2263,8 @@ functionsToApply.add(f); } } - // register the compute feed point that represents the final output from the collection of - // functions that will be applied. + //register the compute feed point that represents the final output from the collection of + //functions that will be applied. if (!functionsToApply.isEmpty()) { FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply); IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(), @@ -2435,7 +2436,7 @@ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), ds.getItemTypeDataverseName(), itemTypeName); - // Prepare jobs to compact the datatset and its indexes + //Prepare jobs to compact the datatset and its indexes List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); if (indexes.size() == 0) { throw new AlgebricksException( @@ -2523,9 +2524,9 @@ ResultReader resultReader = new ResultReader(hcc, hdc); resultReader.open(jobId, metadataProvider.getResultSetId()); - // In this case (the normal case), we don't use the - // "response" JSONObject - just stream the results - // to the "out" PrintWriter + //In this case (the normal case), we don't use the + //"response" JSONObject - just stream the results + //to the "out" PrintWriter if (sessionConfig.fmt() == OutputFormat.CSV && sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER)) { ResultUtils.displayCSVHeader(metadataProvider.findOutputRecordType(), sessionConfig); @@ -2554,7 +2555,7 @@ throw e; } finally { MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets()); - // release external datasets' locks acquired during compilation of the query + //release external datasets' locks acquired during compilation of the query ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -2615,55 +2616,56 @@ ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); - // Dataset exists ? + //Dataset exists ? if (ds == null) { throw new AlgebricksException( "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); } - // Dataset external ? + //Dataset external ? if (ds.getDatasetType() != DatasetType.EXTERNAL) { throw new AlgebricksException( "dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset"); } - // Dataset has indexes ? + //Dataset has indexes ? 
indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); if (indexes.size() == 0) { throw new AlgebricksException("External dataset " + datasetName + " in dataverse " + dataverseName + " doesn't have any index"); } - // Record transaction time + //Record transaction time Date txnTime = new Date(); - // refresh lock here + //refresh lock here ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds); lockAquired = true; - // Get internal files + //Get internal files metadataFiles = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds); deletedFiles = new ArrayList<ExternalFile>(); addedFiles = new ArrayList<ExternalFile>(); appendedFiles = new ArrayList<ExternalFile>(); - // Compute delta - // Now we compare snapshot with external file system + //Compute delta + //Now we compare snapshot with external file system if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles, appendedFiles)) { ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - // latch will be released in the finally clause + //latch will be released in the finally clause return; } - // At this point, we know data has changed in the external file system, record transaction in metadata and start + //At this point, we know data has changed in the external file system, record + //transaction in metadata and start transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds); /* * Remove old dataset record and replace it with a new one */ MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); - // Add delta files to the metadata + //Add delta files to the metadata for (ExternalFile file : addedFiles) { MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); } @@ -2674,7 +2676,7 @@ MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); } - // Create the files index update job + //Create the files index update job spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles, appendedFiles, metadataProvider); @@ -2694,10 +2696,10 @@ } } - // all index updates has completed successfully, record transaction state + //all index updates has completed successfully, record transaction state spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, metadataProvider); - // Aquire write latch again -> start a transaction and record the decision to commit + //Aquire write latch again -> start a transaction and record the decision to commit mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); bActiveTxn = true; @@ -2708,9 +2710,9 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT; - // We don't release the latch since this job is expected to be quick + //We don't release the latch since this job is expected to be quick JobUtils.runJob(hcc, spec, true); - // Start a new metadata transaction to record the final state of the transaction + //Start a new metadata transaction to record the final state of the transaction mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); bActiveTxn = true; @@ -2723,11 +2725,11 @@ while (iterator.hasNext()) { ExternalFile appendedFile = iterator.next(); if (file.getFileName().equals(appendedFile.getFileName())) { - // delete existing file 
+ //delete existing file MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); - // delete existing appended file + //delete existing appended file MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile); - // add the original file with appended information + //add the original file with appended information appendedFile.setFileNumber(file.getFileNumber()); appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP); MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile); @@ -2737,24 +2739,24 @@ } } - // remove the deleted files delta + //remove the deleted files delta for (ExternalFile file : deletedFiles) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); } - // insert new files + //insert new files for (ExternalFile file : addedFiles) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP); MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); } - // mark the transaction as complete + //mark the transaction as complete ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()) .setState(ExternalDatasetTransactionState.COMMIT); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); - // commit metadata transaction + //commit metadata transaction MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); success = true; } catch (Exception e) { @@ -2766,12 +2768,12 @@ + datasetName + ") refresh couldn't carry out the commit phase", e); } if (transactionState == ExternalDatasetTransactionState.COMMIT) { - // Nothing to do , everything should be clean + //Nothing to do , everything should be clean throw e; } if (transactionState == ExternalDatasetTransactionState.BEGIN) { - // transaction failed, need to do the following - // clean NCs removing transaction components + //transaction failed, need to do the following + //clean NCs removing transaction components mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); @@ -2781,12 +2783,12 @@ try { JobUtils.runJob(hcc, spec, true); } catch (Exception e2) { - // This should never happen -- fix throw illegal + //This should never happen -- fix throw illegal e.addSuppressed(e2); throw new IllegalStateException("System is in inconsistent state. Failed to abort refresh", e); } - // remove the delta of files - // return the state of the dataset to committed + //remove the delta of files + //return the state of the dataset to committed try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); for (ExternalFile file : deletedFiles) { @@ -2799,7 +2801,7 @@ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); } MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds); - // commit metadata transaction + //commit metadata transaction MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e2) { abort(e, e2, mdTxnCtx); @@ -2852,19 +2854,20 @@ datasetNameFrom, datasetNameTo, mdTxnCtx); String pregelixHomeKey = "PREGELIX_HOME"; - // Finds PREGELIX_HOME in system environment variables. + //Finds PREGELIX_HOME in system environment variables. String pregelixHome = System.getenv(pregelixHomeKey); - // Finds PREGELIX_HOME in Java properties. + //Finds PREGELIX_HOME in Java properties. if (pregelixHome == null) { pregelixHome = System.getProperty(pregelixHomeKey); } - // Finds PREGELIX_HOME in AsterixDB configuration. + //Finds PREGELIX_HOME in AsterixDB configuration. 
if (pregelixHome == null) { - // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null. + //Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, + //pregelixHome can never be null. pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome(); } - // Constructs the pregelix command line. + //Constructs the pregelix command line. List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom, dataverseNameTo, datasetNameTo); ProcessBuilder pb = new ProcessBuilder(cmd); @@ -2873,9 +2876,9 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - // Executes the Pregelix command. + //Executes the Pregelix command. int resultState = executeExternalShellProgram(pb); - // Checks the return state of the external Pregelix command. + //Checks the return state of the external Pregelix command. if (resultState != 0) { throw new AlgebricksException( "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. " @@ -2893,12 +2896,12 @@ } } - // Prepares to run a program on external runtime. + //Prepares to run a program on external runtime. private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc, RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom, String datasetNameTo, MetadataTransactionContext mdTxnCtx) throws AlgebricksException, AsterixException, Exception { - // Validates the source/sink dataverses and datasets. + //Validates the source/sink dataverses and datasets. Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom); if (fromDataset == null) { throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom @@ -2911,7 +2914,7 @@ } try { - // Find the primary index of the sink dataset. + //Find the primary index of the sink dataset. Index toIndex = null; List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt.getDatasetNameTo().getValue()); @@ -2924,7 +2927,7 @@ if (toIndex == null) { throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo); } - // Cleans up the sink dataset -- Drop and then Create. + //Cleans up the sink dataset -- Drop and then Create. DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(), true); this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc); @@ -2941,12 +2944,12 @@ throw new AlgebricksException("Error cleaning the result dataset. This should not happen."); } - // Flushes source dataset. + //Flushes source dataset. FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, datasetNameFrom); } - // Executes external shell commands. + //Executes external shell commands. private int executeExternalShellProgram(ProcessBuilder pb) throws IOException, AlgebricksException, InterruptedException { Process process = pb.start(); @@ -2972,15 +2975,15 @@ } process.waitFor(); } - // Gets the exit value of the program. + //Gets the exit value of the program. int resultState = process.exitValue(); return resultState; } - // Constructs a Pregelix command line. + //Constructs a Pregelix command line. 
private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName, String fromDatasetName, String toDataverseName, String toDatasetName) { - // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset. + //Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset. AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties(); AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE; String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp(); @@ -2995,7 +2998,7 @@ asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ","); asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,"); - // construct command + //construct command List<String> cmds = new ArrayList<String>(); cmds.add("bin/pregelix"); cmds.add(pregelixStmt.getParameters().get(0)); // jar @@ -3008,7 +3011,7 @@ String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,"; boolean custPropAdded = false; boolean meetCustProp = false; - // User parameters. + //User parameters. for (String s : pregelixStmt.getParameters().get(2).split(" ")) { if (meetCustProp) { if (!s.contains(inputConverterClassKey)) { @@ -3030,10 +3033,10 @@ if (!custPropAdded) { cmds.add(customizedPregelixProperty); - // Appends default converter classes to asterixdbParameterBuilder. + //Appends default converter classes to asterixdbParameterBuilder. asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue); asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue); - // Remove the last comma. + //Remove the last comma. asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1, asterixdbParameterBuilder.length()); cmds.add(asterixdbParameterBuilder.toString()); diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java index 976ca70..8d020e7 100644 --- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java +++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java @@ -27,7 +27,6 @@ import org.apache.asterix.common.config.AsterixTransactionProperties; import org.apache.asterix.test.aql.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; -import org.apache.asterix.testframework.xml.TestGroup; import org.apache.commons.lang3.StringUtils; import org.junit.AfterClass; import org.junit.BeforeClass; diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql new file mode 100644 index 0000000..70322cb --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.1.ddl.aql @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Drop a dataverse with disconnected feed + * Expected Res : Success + * Date : 24th Feb 2016 + */ +drop dataverse experiments if exists; +create dataverse experiments; +use dataverse experiments; + +create type TwitterUserType as closed { + screen-name: string, + lang: string, + friends_count: int32, + statuses_count: int32, + name: string, + followers_count: int32 +} + +create type TweetMessageType as closed { + tweetid: int64, + user: TwitterUserType, + sender-location: point, + send-time: datetime, + referred-topics: {{ string }}, + message-text: string +} + +create dataset Tweets(TweetMessageType) primary key tweetid; + +create feed TweetFeed using socket_adapter +( + ("sockets"="127.0.0.1:10001"), + ("address-type"="IP"), + ("type-name"="TweetMessageType"), + ("format"="adm") +); \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql new file mode 100644 index 0000000..4d2f9c4 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Drop a dataverse with disconnected feed + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; +set wait-for-completion-feed "false"; + +connect feed TweetFeed to dataset Tweets; diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql new file mode 100644 index 0000000..e70df33 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Drop a dataverse with disconnected feed + * Expected Res : Success + * Date : 24th Feb 2016 + */ +3000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql new file mode 100644 index 0000000..34d6285 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Drop a dataverse with disconnected feed + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; +disconnect feed TweetFeed from dataset Tweets; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql new file mode 100644 index 0000000..5684b1c --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Drop a dataverse with disconnected feed + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; +drop dataverse experiments; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql new file mode 100644 index 0000000..547085f --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.1.ddl.aql @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. 
+ * Expected Res : Success + * Date : 24th Feb 2016 + */ +drop dataverse experiments if exists; +create dataverse experiments; +use dataverse experiments; + +create type TwitterUserType as closed { + screen-name: string, + lang: string, + friends_count: int32, + statuses_count: int32, + name: string, + followers_count: int32 +} + +create type TweetMessageType as closed { + tweetid: string, + tweetid-copy:string, + user: TwitterUserType, + sender-location: point, + send-time: datetime, + send-time-copy:datetime, + referred-topics: {{ string }}, + message-text: string +} + +create dataset Tweets(TweetMessageType) primary key tweetid; + +create feed TweetFeed using socket_adapter +( + ("sockets"="127.0.0.1:10001"), + ("address-type"="IP"), + ("type-name"="TweetMessageType"), + ("format"="adm") +); \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql new file mode 100644 index 0000000..3d7fdbf --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.2.update.aql @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; +set wait-for-completion-feed "false"; + +connect feed TweetFeed to dataset Tweets; diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql new file mode 100644 index 0000000..eb18795 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + * Date : 24th Feb 2016 + */ +3000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql new file mode 100644 index 0000000..578d458 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + * Date : 24th Feb 2016 + */ +start client 10001 file-client localhost data/twitter/tw_messages.adm 500 50 1000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql new file mode 100644 index 0000000..18bbbbc --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. 
+ * Expected Res : Success + * Date : 24th Feb 2016 + */ +10000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql new file mode 100644 index 0000000..0862bae --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; +disconnect feed TweetFeed from dataset Tweets; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql new file mode 100644 index 0000000..fd8926b --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. 
+ * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; + +for $x in dataset Tweets +order by $x.tweetid +return $x; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql new file mode 100644 index 0000000..6753868 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. + * Expected Res : Success + * Date : 24th Feb 2016 + */ + +stop 10001 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql new file mode 100644 index 0000000..1295b97 --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Create a socket feed with a client that pushes + * 10 records. The feed is connected to a dataset that is then + * queried for the data. 
+ * Expected Res : Success + * Date : 24th Feb 2016 + */ + +use dataverse experiments; +drop dataverse experiments; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm new file mode 100644 index 0000000..7047dbc --- /dev/null +++ b/asterix-app/src/test/resources/runtimets/results/feeds/feed-push-socket/feed-push-socket.1.adm @@ -0,0 +1,10 @@ +{ "tweetid": "1", "tweetid-copy": "1", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("42.13,80.43"), "send-time": datetime("2005-12-05T21:06:41.000Z"), "send-time-copy": datetime("2005-12-05T21:06:41.000Z"), "referred-topics": {{ "samsung", "plan" }}, "message-text": " love samsung the plan is amazing" } +{ "tweetid": "10", "tweetid-copy": "10", "user": { "screen-name": "Rolldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstful", "followers_count": 3311368i32 }, "sender-location": point("46.94,93.98"), "send-time": datetime("2011-04-07T14:08:46.000Z"), "send-time-copy": datetime("2011-04-07T14:08:46.000Z"), "referred-topics": {{ "t-mobile", "signal" }}, "message-text": " like t-mobile the signal is good" } +{ "tweetid": "2", "tweetid-copy": "2", "user": { "screen-name": "RollandEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "David Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("28.86,70.44"), "send-time": datetime("2007-08-15T06:44:17.000Z"), "send-time-copy": datetime("2007-08-15T06:44:17.000Z"), "referred-topics": {{ "sprint", "voice-clarity" }}, "message-text": " like sprint its voice-clarity is mind-blowing" } +{ "tweetid": "3", "tweetid-copy": "3", "user": { "screen-name": "RollandEckhard#500", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Hetfield", "followers_count": 3311368i32 }, "sender-location": point("39.84,86.48"), "send-time": datetime("2008-12-24T00:07:04.000Z"), "send-time-copy": datetime("2008-12-24T00:07:04.000Z"), "referred-topics": {{ "verizon", "voice-command" }}, "message-text": " can't stand verizon its voice-command is terrible:(" } +{ "tweetid": "4", "tweetid-copy": "4", "user": { "screen-name": "RollandEckhardstein#221", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardstinz", "followers_count": 3311368i32 }, "sender-location": point("27.67,87.32"), "send-time": datetime("2007-02-05T16:39:13.000Z"), "send-time-copy": datetime("2007-02-05T16:39:13.000Z"), "referred-topics": {{ "t-mobile", "customer-service" }}, "message-text": " love t-mobile its customer-service is mind-blowing" } +{ "tweetid": "5", "tweetid-copy": "5", "user": { "screen-name": "RollandEcstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckhardst", "followers_count": 3311368i32 }, "sender-location": point("27.3,92.77"), "send-time": datetime("2010-09-12T06:15:28.000Z"), "send-time-copy": datetime("2010-09-12T06:15:28.000Z"), "referred-topics": {{ "t-mobile", "customization" }}, "message-text": " like t-mobile the customization is amazing:)" } +{ "tweetid": "6", "tweetid-copy": "6", "user": { "screen-name": "Rollkhardstein#211", "lang": "en", "friends_count": 
3657079i32, "statuses_count": 268i32, "name": "Kirk Hammette ", "followers_count": 3311368i32 }, "sender-location": point("45.62,84.78"), "send-time": datetime("2012-01-23T06:23:13.000Z"), "send-time-copy": datetime("2012-01-23T06:23:13.000Z"), "referred-topics": {{ "iphone", "network" }}, "message-text": " like iphone its network is awesome:)" } +{ "tweetid": "7", "tweetid-copy": "7", "user": { "screen-name": "andEckhardstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland khardstein", "followers_count": 3311368i32 }, "sender-location": point("44.12,81.46"), "send-time": datetime("2012-02-17T17:30:26.000Z"), "send-time-copy": datetime("2012-02-17T17:30:26.000Z"), "referred-topics": {{ "t-mobile", "network" }}, "message-text": " hate t-mobile the network is bad" } +{ "tweetid": "8", "tweetid-copy": "8", "user": { "screen-name": "Rolltein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Ron Eckhardstein", "followers_count": 3311368i32 }, "sender-location": point("36.86,90.71"), "send-time": datetime("2009-03-12T13:18:04.000Z"), "send-time-copy": datetime("2009-03-12T13:18:04.000Z"), "referred-topics": {{ "at&t", "touch-screen" }}, "message-text": " dislike at&t its touch-screen is OMG" } +{ "tweetid": "9", "tweetid-copy": "9", "user": { "screen-name": "Roldstein#211", "lang": "en", "friends_count": 3657079i32, "statuses_count": 268i32, "name": "Rolland Eckdstein", "followers_count": 3311368i32 }, "sender-location": point("29.07,97.05"), "send-time": datetime("2012-08-15T20:19:46.000Z"), "send-time-copy": datetime("2012-08-15T20:19:46.000Z"), "referred-topics": {{ "verizon", "speed" }}, "message-text": " hate verizon its speed is bad" } \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml index a3a1fba..96a2c37 100644 --- a/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -36,6 +36,16 @@ </compilation-unit> </test-case> --> <test-case FilePath="feeds"> + <compilation-unit name="feed-push-socket"> + <output-dir compare="Text">feed-push-socket</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="feeds"> + <compilation-unit name="drop-dataverse-with-disconnected-feed"> + <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="feeds"> <compilation-unit name="feed-with-external-parser"> <output-dir compare="Text">feed-with-external-parser</output-dir> </compilation-unit> diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java new file mode 100644 index 0000000..765dc71 --- /dev/null +++ b/asterix-common/src/test/java/org/apache/asterix/test/client/FileFeedSocketAdapterClient.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.client; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class FileFeedSocketAdapterClient implements ITestClient { + private final int port; + private final int wait; + private final String url; + private Socket socket; + private String path; + private int batchSize; + private int maxCount; + private OutputStream out = null; + + // expected args: url, source-file-path, max-count, batch-size, wait + public FileFeedSocketAdapterClient(int port, String[] args) throws Exception { + this.port = port; + if (args.length != 5) { + throw new Exception( + "Invalid arguments for FileFeedSocketAdapterClient. Expected arguments <url> <source-file-path> <max-count> <batch-size> <wait>"); + } + this.url = args[0]; + this.path = args[1]; + this.maxCount = Integer.parseInt(args[2]); + this.batchSize = Integer.parseInt(args[3]); + this.wait = Integer.parseInt(args[4]); + } + + @Override + public void start() { + try { + socket = new Socket(url, port); + } catch (IOException e) { + System.err.println("Problem in creating socket against host " + url + " on the port " + port); + e.printStackTrace(); + } + + int recordCount = 0; + BufferedReader br = null; + try { + out = socket.getOutputStream(); + br = new BufferedReader(new FileReader(path)); + String nextRecord; + while ((nextRecord = br.readLine()) != null) { + ByteBuffer b = StandardCharsets.UTF_8.encode(nextRecord); + if (wait >= 1 && recordCount % batchSize == 0) { + Thread.sleep(wait); + } + out.write(b.array(), 0, b.limit()); + recordCount++; + if (recordCount == maxCount) { + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + @Override + public void stop() throws Exception { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + try { + if (socket != null) { + socket.close(); + } + } catch (IOException e) { + System.err.println("Problem in closing socket against host " + url + " on the port " + port); + e.printStackTrace(); + } + } +} diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java new file mode 100644 index 0000000..56d626d --- /dev/null +++ b/asterix-common/src/test/java/org/apache/asterix/test/client/ITestClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.client; + +public interface ITestClient { + + public void start() throws Exception; + + public void stop() throws Exception; + +} diff --git a/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java new file mode 100644 index 0000000..d26351b --- /dev/null +++ b/asterix-common/src/test/java/org/apache/asterix/test/client/TestClientProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.client; + +import java.util.Arrays; + +public class TestClientProvider { + + public static ITestClient createTestClient(String[] args, int port) throws Exception { + if (args.length < 1) { + throw new Exception("Unspecified test client"); + } + String clientName = args[0]; + String[] clientArgs = Arrays.copyOfRange(args, 1, args.length); + switch (clientName) { + case "file-client": + return new FileFeedSocketAdapterClient(port, clientArgs); + default: + throw new Exception("Unknown test client: " + clientName); + } + } + +} diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java index f40cce4..ba32af2 100644 --- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java +++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java @@ -26,10 +26,10 @@ import java.net.Socket; public class FileTestServer implements ITestServer { - private String[] paths; - private final int port; - private ServerSocket serverSocket; - private Thread listenerThread; + protected String[] paths; + protected final int port; + protected ServerSocket serverSocket; + protected Thread listenerThread; public FileTestServer(int port) { this.port = port; diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java index 18a4969..b3b1183 100644 --- a/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java +++ b/asterix-common/src/test/java/org/apache/asterix/test/server/ITestServer.java @@ -20,7 +20,7 @@ public interface ITestServer { - public void configure(String[] 
args); + public void configure(String[] args) throws Exception; public void start() throws Exception; diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java new file mode 100644 index 0000000..1c2cef6 --- /dev/null +++ b/asterix-common/src/test/java/org/apache/asterix/test/server/OpenSocketFileTestServer.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.server; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; + +public class OpenSocketFileTestServer extends FileTestServer { + + private boolean closed; + + public OpenSocketFileTestServer(int port) { + super(port); + } + + @Override + public void start() throws IOException { + serverSocket = new ServerSocket(port); + listenerThread = new Thread(new Runnable() { + @Override + public void run() { + while (!serverSocket.isClosed()) { + try { + Socket socket = serverSocket.accept(); + new Thread(new SocketThread(socket)).start(); + } catch (IOException e) { + e.printStackTrace(); + // Nothing to do here: the accept failed, most likely because the server + // socket was closed (possibly by a stop() call); loop around and check + // whether the server is still listening. + } + } + } + }); + listenerThread.start(); + } + + private class SocketThread implements Runnable { + private Socket socket; + private OutputStream os; + + public SocketThread(Socket socket) { + this.socket = socket; + } + + @Override + public void run() { + try { + os = socket.getOutputStream(); + byte[] chunk = new byte[1024]; + for (String path : paths) { + try (FileInputStream fin = new FileInputStream(new File(path))) { + int read = fin.read(chunk); + while (read > 0) { + os.write(chunk, 0, read); + read = fin.read(chunk); + } + } + } + } catch (Throwable th) { + th.printStackTrace(); + // There are two possibilities here: + // 1. The socket was closed from the other end. + // 2. Server.close() was called. 
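+ // In either case the thread falls through to the finally block below, where it waits + // on the serverSocket monitor until stop() sets the closed flag and calls notifyAll(); + // only then does it release its output stream and socket, keeping the connection open + // until the test framework stops the server.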
+ } finally { + synchronized (serverSocket) { + if (!closed) { + try { + serverSocket.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + try { + os.close(); + } catch (Throwable th) { + th.printStackTrace(); + } + try { + socket.close(); + } catch (Throwable th) { + th.printStackTrace(); + } + } + } + } + + @Override + public void stop() throws IOException, InterruptedException { + synchronized (serverSocket) { + closed = true; + try { + serverSocket.close(); + if (listenerThread.isAlive()) { + listenerThread.join(); + } + } finally { + serverSocket.notifyAll(); + } + } + } +} diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java new file mode 100644 index 0000000..3312d1b --- /dev/null +++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestClientServer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.server; + +import org.apache.asterix.test.client.ITestClient; +import org.apache.asterix.test.client.TestClientProvider; + +public class TestClientServer implements ITestServer { + + // port of the server to connect to + private final int port; + private ITestClient client; + + public TestClientServer(int port) { + this.port = port; + } + + @Override + public void configure(String[] args) throws Exception { + client = TestClientProvider.createTestClient(args, port); + } + + @Override + public void start() throws Exception { + client.start(); + } + + @Override + public void stop() throws Exception { + client.stop(); + } + +} diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java index 60c1c11..0bdb74e 100644 --- a/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java +++ b/asterix-common/src/test/java/org/apache/asterix/test/server/TestServerProvider.java @@ -26,6 +26,10 @@ return new FileTestServer(port); case "rss": return new RSSTestServer(port); + case "open-socket-file": + return new OpenSocketFileTestServer(port); + case "client": + return new TestClientServer(port); default: throw new Exception("Unknown test server"); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index d5b1c6e..851acd4 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -38,7 +38,7 @@ import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.dataflow.std.file.FileSplit; @@ -66,7 +66,7 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { return dataSourceFactory.getPartitionConstraint(); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java index 17916e5..3965e5e 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java @@ -22,7 +22,7 @@ import java.util.Map; import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; /** @@ -50,7 +50,7 @@ * In the former case, the IP address is translated to a node controller id * running on the node with the given IP address. */ - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception; + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception; /** * Creates an instance of IDatasourceAdapter. diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java index 370ea93..1487cf1 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java @@ -19,9 +19,12 @@ package org.apache.asterix.external.api; import java.io.Serializable; +import java.util.ArrayList; import java.util.Map; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.asterix.om.util.AsterixAppContextInfo; +import org.apache.asterix.om.util.AsterixClusterProperties; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; public interface IExternalDataSourceFactory extends Serializable { @@ -45,7 +48,7 @@ * @return * @throws Exception */ - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception; + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception; /** * Configure the data parser factory. 
The passed map contains key value pairs from the @@ -63,4 +66,32 @@ return false; } + public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints( + AlgebricksAbsolutePartitionConstraint constraints, int count) { + if (constraints == null) { + ArrayList<String> locs = new ArrayList<String>(); + Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores(); + int i = 0; + while (i < count) { + for (String node : stores.keySet()) { + int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node); + for (int k = 0; k < numIODevices; k++) { + locs.add(node); + i++; + if (i == count) { + break; + } + } + if (i == count) { + break; + } + } + } + String[] cluster = new String[locs.size()]; + cluster = locs.toArray(cluster); + constraints = new AlgebricksAbsolutePartitionConstraint(cluster); + } + return constraints; + } + } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java index adb2602..fdc54d6 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java @@ -25,5 +25,4 @@ public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception; public Class<? extends T> getRecordClass(); - } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 5b3828d..6e3ead2 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.hdfs.dataflow.ConfFactory; @@ -51,7 +50,7 @@ implements IInputStreamProviderFactory, IRecordReaderFactory<Object>, IIndexibleExternalDataSource { protected static final long serialVersionUID = 1L; - protected transient AlgebricksPartitionConstraint clusterLocations; + protected transient AlgebricksAbsolutePartitionConstraint clusterLocations; protected String[] readSchedule; protected boolean read[]; protected InputSplitsFactory inputSplitsFactory; @@ -76,7 +75,7 @@ JobConf conf = HDFSUtils.configureHDFSJobConf(configuration); confFactory = new ConfFactory(conf); clusterLocations = getPartitionConstraint(); - int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length; + int numPartitions = clusterLocations.getLocations().length; // if files list was set, we restrict the splits to the list InputSplit[] inputSplits; if (files == null) { @@ -99,7 +98,8 @@ } } - // Used to tell the factory to restrict the splits to the intersection between this list and the actual files on hdfs side + // Used to tell the factory to restrict the splits to the intersection between this list and the + // actual files on 
hdfs side @Override public void setSnapshot(List<ExternalFile> files, boolean indexingOp) { this.files = files; @@ -108,7 +108,8 @@ /* * The method below was modified to take care of the following - * 1. when target files are not null, it generates a file aware input stream that validate against the files + * 1. when target files are not null, it generates a file-aware input stream that validates + * against the files * 2. if the data is binary, it returns a generic reader */ @Override @@ -135,7 +136,7 @@ * @return */ @Override - public AlgebricksPartitionConstraint getPartitionConstraint() { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations); return clusterLocations; } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java index b9b6f65..b715a26 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java @@ -29,7 +29,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.om.util.AsterixClusterProperties; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import com.couchbase.client.core.CouchbaseCore; @@ -71,7 +71,7 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { return AsterixClusterProperties.INSTANCE.getClusterLocations(); } @@ -100,7 +100,8 @@ } /* - * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion. + * We distribute the work of streaming vbuckets among all the partitions in a round-robin + * fashion. 
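+ * For example, with P partitions such an assignment would map vbucket i to partition + * i % P; schedule() below computes the actual mapping.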
*/ private void schedule() { schedule = new int[numOfVBuckets]; diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java index c302b9b..22488f7 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java @@ -28,14 +28,14 @@ import org.apache.asterix.external.util.HDFSUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobConf; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.hdfs.dataflow.ConfFactory; public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> { protected static final long serialVersionUID = 1L; - protected transient AlgebricksPartitionConstraint clusterLocations; + protected transient AlgebricksAbsolutePartitionConstraint clusterLocations; protected ConfFactory confFactory; protected Map<String, String> configuration; @@ -48,7 +48,7 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations); return clusterLocations; } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java index bbe485c..beceea8 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java @@ -22,11 +22,11 @@ import java.util.List; import java.util.Map; +import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import com.sun.syndication.feed.synd.SyndEntryImpl; @@ -36,6 +36,7 @@ private static final long serialVersionUID = 1L; private Map<String, String> configuration; private List<String> urls = new ArrayList<String>(); + private transient AlgebricksAbsolutePartitionConstraint clusterLocations; @Override public DataSourceType getDataSourceType() { @@ -43,8 +44,10 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - return new AlgebricksCountPartitionConstraint(urls.size()); + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { + int count = urls.size(); + clusterLocations = 
IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count); + return clusterLocations; } @Override diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java index f02bd93..d02de03 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java @@ -29,7 +29,7 @@ import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; public abstract class AbstractStreamRecordReaderFactory<T> @@ -51,7 +51,7 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { return inputStreamFactory.getPartitionConstraint(); } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java index 9b2d095..f41486e 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java @@ -77,7 +77,7 @@ inString = false; depth = 0; do { - int startPosn = bufferPosn; //starting from where we left off the last time + int startPosn = bufferPosn; // starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; bufferLength = reader.read(inputBuffer); @@ -87,7 +87,7 @@ } } if (!hasStarted) { - for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin + for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin if (inputBuffer[bufferPosn] == recordStart) { startPosn = bufferPosn; hasStarted = true; @@ -108,7 +108,7 @@ } } if (hasStarted) { - for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin + for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin if (inString) { // we are in a string, we only care about the string end if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) { diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index f38c2cb..a2a4742 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -23,6 +23,7 @@ import java.util.logging.Logger; 
import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.util.ExternalDataConstants; @@ -30,8 +31,7 @@ import org.apache.asterix.external.util.TwitterUtil; import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants; import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import twitter4j.Status; @@ -46,6 +46,7 @@ private Map<String, String> configuration; private boolean pull; + private transient AlgebricksAbsolutePartitionConstraint clusterLocations; @Override public DataSourceType getDataSourceType() { @@ -53,8 +54,9 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { - return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY); + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { + clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY); + return clusterLocations; } @Override diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java index e780c95..89008aa 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java @@ -20,16 +20,28 @@ import java.io.IOException; import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; public class AInputStreamReader extends InputStreamReader { private AInputStream in; + private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; + private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE); + private CharsetDecoder decoder; + private boolean done = false; public AInputStreamReader(AInputStream in) { super(in); this.in = in; + this.decoder = StandardCharsets.UTF_8.newDecoder(); + this.byteBuffer.flip(); } public boolean skipError() throws Exception { @@ -51,4 +63,33 @@ public void setFeedLogManager(FeedLogManager feedLogManager) { in.setFeedLogManager(feedLogManager); } + + @Override + public int read(char cbuf[]) throws IOException { + return read(cbuf, 0, cbuf.length); + } + + @Override + public int read(char cbuf[], int offset, int length) throws IOException { + if (done) { + return -1; + } + charBuffer.clear(); + if (byteBuffer.hasRemaining()) { + decoder.decode(byteBuffer, charBuffer, false); + System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position()); + return 
charBuffer.position(); + } + int len = in.read(bytes, 0, bytes.length); + if (len == -1) { + done = true; + return len; + } + byteBuffer.clear(); + byteBuffer.position(len); + byteBuffer.flip(); + decoder.decode(byteBuffer, charBuffer, false); + System.arraycopy(charBuffer.array(), 0, cbuf, offset, charBuffer.position()); + return charBuffer.position(); + } } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java index 1e86f39..cf8d339 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java @@ -25,7 +25,9 @@ import java.util.Map; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; +import org.apache.asterix.external.util.ExternalDataExceptionUtils; import org.apache.asterix.external.util.FeedLogManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class SocketInputStream extends AInputStream { private ServerSocket server; @@ -34,8 +36,13 @@ public SocketInputStream(ServerSocket server) throws IOException { this.server = server; - socket = server.accept(); - connectionStream = socket.getInputStream(); + socket = new Socket(); + connectionStream = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; } @Override @@ -56,20 +63,31 @@ @Override public int read(byte b[]) throws IOException { - int read = connectionStream.read(b, 0, b.length); - while (read < 0) { - accept(); - read = connectionStream.read(b, 0, b.length); - } - return read; + return read(b, 0, b.length); } @Override public int read(byte b[], int off, int len) throws IOException { - int read = connectionStream.read(b, off, len); - while (read < 0) { - accept(); + if (server == null) { + return -1; + } + int read = -1; + try { read = connectionStream.read(b, off, len); + } catch (IOException e) { + e.printStackTrace(); + read = -1; + } + while (read < 0) { + if (!accept()) { + return -1; + } + try { + read = connectionStream.read(b, off, len); + } catch (IOException e) { + e.printStackTrace(); + read = -1; + } } return read; } @@ -85,22 +103,57 @@ } @Override - public void close() throws IOException { - connectionStream.close(); - socket.close(); - server.close(); + public synchronized void close() throws IOException { + HyracksDataException hde = null; + try { + if (connectionStream != null) { + connectionStream.close(); + } + connectionStream = null; + } catch (IOException e) { + hde = new HyracksDataException(e); + } + try { + if (socket != null) { + socket.close(); + } + socket = null; + } catch (IOException e) { + hde = ExternalDataExceptionUtils.suppress(hde, e); + } + try { + if (server != null) { + server.close(); + } + } catch (IOException e) { + hde = ExternalDataExceptionUtils.suppress(hde, e); + } finally { + server = null; + } + if (hde != null) { + throw hde; + } } - private void accept() throws IOException { - connectionStream.close(); - socket.close(); - socket = server.accept(); - connectionStream = socket.getInputStream(); + private boolean accept() throws IOException { + try { + connectionStream.close(); + connectionStream = null; + socket.close(); + socket = null; + socket = server.accept(); + connectionStream = socket.getInputStream(); + return true; + } catch (Exception e) { + close(); + return false; + 
} } @Override public boolean stop() throws Exception { - return false; + close(); + return true; } @Override diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java index 3f70ce1..5c1583e 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java @@ -34,7 +34,6 @@ import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.external.util.NodeResolverFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.dataflow.std.file.FileSplit; @@ -48,7 +47,8 @@ protected static INodeResolver nodeResolver; protected Map<String, String> configuration; protected FileSplit[] inputFileSplits; - protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage + protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log + // storage protected boolean isFeed; protected String expression; // transient fields (They don't need to be serialized and transferred) @@ -84,7 +84,7 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception { return constraints; } diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java index ea60f43..6fdc42d 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java @@ -35,7 +35,6 @@ import org.apache.asterix.om.util.AsterixRuntimeUtil; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -106,7 +105,7 @@ } @Override - public AlgebricksPartitionConstraint getPartitionConstraint() { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { List<String> locations = new ArrayList<String>(); for (Pair<String, Integer> socket : sockets) { locations.add(socket.first); diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java index 484626a..95378cb 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java +++ 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
index 484626a..95378cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.external.input.stream.provider.TwitterFirehoseInputStreamProvider;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -54,7 +53,7 @@
     private Map<String, String> configuration;
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
         String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
         String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
         String[] locations = null;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index e39b507..cd4a3c1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -58,7 +58,6 @@
 
     @Override
     public AInputStream getInputStream() throws Exception {
-        twitterServer.start();
         return twitterServer;
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 7e28c35..d0348c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -149,7 +149,7 @@
     private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
         int waitCycleCount = 0;
         ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 10) {
+        while (ingestionRuntime == null && waitCycleCount < 1000) {
             try {
                 Thread.sleep(3000);
                 waitCycleCount++;
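The FeedCollectOperatorDescriptor change keeps the same 3-second polling interval but raises the retry cap from 10 to 1000 cycles, since 30 seconds can be too short for the intake runtime to register on a busy cluster. The underlying bounded-polling helper could look like the sketch below; this is a generic illustration, not AsterixDB code:

    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    public final class Poll {
        // Re-check a lookup every sleepMillis until it yields a value or the
        // cycle budget runs out; mirrors the collector waiting for the intake
        // runtime (3000 ms sleeps, cap raised from 10 to 1000 in this patch).
        public static <T> T until(Supplier<T> lookup, long sleepMillis, int maxCycles)
                throws InterruptedException {
            T value = lookup.get();
            int cycles = 0;
            while (value == null && cycles < maxCycles) {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
                cycles++;
                value = lookup.get();
            }
            return value; // still null if the runtime never registered
        }
    }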
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
index 3cb5d64..36c11e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -241,7 +241,8 @@
         FeedRuntimeId runtimeId = null;
         FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
         if (endFeedMessage.isCompleteDisconnection()) {
-            // subscribableRuntimeType represents the location at which the feed connection receives data
+            // subscribableRuntimeType represents the location at which the feed connection receives
+            // data
             FeedRuntimeType runtimeType = null;
             switch (subscribableRuntimeType) {
                 case INTAKE:
@@ -257,15 +258,19 @@
             runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
             CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
                     .getFeedRuntime(connectionId, runtimeId);
-            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            if (feedRuntime != null) {
+                feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            }
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
             }
         } else {
-            // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+            // subscribaleRuntimeType represents the location for data hand-off in presence of
+            // subscribers
             switch (subscribableRuntimeType) {
                 case INTAKE:
-                    // illegal state as data hand-off from one feed to another does not happen at intake
+                    // illegal state as data hand-off from one feed to another does not happen at
+                    // intake
                     throw new IllegalStateException("Illegal State, invalid runtime type " + subscribableRuntimeType);
                 case COMPUTE:
                     // feed could be primary or secondary, doesn't matter
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index c128545..50d8ac0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -58,13 +58,12 @@
 
     public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
             AlgebricksPartitionConstraint partitionConstraints) throws Exception {
-        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String[] locations = null;
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints");
-        } else {
-            locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         }
+        File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
+        String[] locations = null;
+        locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<FileSplit>();
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         int i = 0;
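The FeedUtils.splitsForAdapter rewrite moves the COUNT-constraint check to the top so the method fails fast before doing any work, then derives one split per named location. In outline, with illustrative names and string paths standing in for the real FileSplit API:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class FeedSplits {
        // Fail fast when there are no named locations (the COUNT-constraint
        // case), then build one log split per node, mirroring the new shape
        // of splitsForAdapter.
        static List<String> splitsFor(String dataverse, String feed, String[] locations,
                String storageDir) {
            if (locations == null) {
                throw new IllegalArgumentException(
                        "Can't create file splits for adapter with count partitioning constraints");
            }
            List<String> splits = new ArrayList<>();
            for (String node : locations) {
                // one deterministic feed-log directory per adapter partition
                splits.add(node + ":" + new File(storageDir, dataverse + File.separator + feed));
            }
            return splits;
        }
    }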
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 7ac0428..9a72135 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -42,7 +42,6 @@
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -199,8 +198,8 @@
         return conf;
     }
 
-    public static AlgebricksPartitionConstraint getPartitionConstraints(
-            AlgebricksPartitionConstraint clusterLocations) {
+    public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
+            AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
             ArrayList<String> locs = new ArrayList<String>();
             Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index e34a09b..6b11d21 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -23,14 +23,14 @@
 
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,14 +48,17 @@
 
     private Map<String, String> configuration;
 
+    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+
     @Override
     public String getAlias() {
         return "test_typed";
     }
 
     @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
+        return clusterLocations;
     }
 
     @Override
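TestTypedAdapterFactory now resolves its single-partition requirement to concrete node names via IExternalDataSourceFactory.getPartitionConstraints and caches the result in a transient field, instead of returning an anonymous count constraint. A sketch of the caching idea follows; the helper's shape here is an assumption for illustration, and the real method's signature may differ:

    import java.util.ArrayList;
    import java.util.List;

    public class ConstraintResolver {
        private String[] cached;

        // Resolve "count instances per node" to concrete node names once,
        // then reuse the cached answer so every call pins the same partitions.
        String[] getPartitionConstraints(String[] clusterNodes, int countPerNode) {
            if (cached == null) {
                List<String> locations = new ArrayList<>();
                for (String node : clusterNodes) {
                    for (int i = 0; i < countPerNode; i++) {
                        locations.add(node);
                    }
                }
                cached = locations.toArray(new String[0]);
            }
            return cached;
        }
    }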
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/660
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I8f6e982440d3577343f2479c3779653a9c3db614
Gerrit-PatchSet: 11
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Ildar Absalyamov <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>