Abdullah Alamoudi has uploaded a new change for review: https://asterix-gerrit.ics.uci.edu/1149
Change subject: PLEASE EDIT to provide a meaningful commit message! ...................................................................... PLEASE EDIT to provide a meaningful commit message! The following commits from your working branch will be included: commit 7f7a592ab3f5decaee50326704fbfd082c9a7267 Author: Abdullah Alamoudi <bamou...@gmail.com> Date: Sun Sep 4 01:05:41 2016 +0300 Small Cleanup Change-Id: I7fdc43c6aed99cab7aedb1d900c0b8725abd7892 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java M 
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java 14 files changed, 143 insertions(+), 60 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/49/1149/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java index c8a9566..5df687a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java @@ -38,7 +38,6 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; -import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider; import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; @@ -71,6 +70,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.util.NonTaggedFormatUtil; import org.apache.asterix.runtime.util.AsterixAppContextInfo; +import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; @@ -90,6 +90,7 @@ import 
org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; @@ -255,13 +256,15 @@ * @param files * @param indexerDesc * @return - * @throws AsterixException + * @throws AlgebricksException + * @throws HyracksDataException * @throws Exception */ private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator( AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset, - List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException { + List<ExternalFile> files, RecordDescriptor indexerDesc) + throws HyracksDataException, AlgebricksException { ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); Map<String, String> configuration = externalDatasetDetails.getProperties(); IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory( @@ -273,7 +276,7 @@ public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp( JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType, - RecordDescriptor indexerDesc, List<ExternalFile> files) throws AsterixException { + RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException { if (files == null) { files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index c7bed8d..275825b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataFlowController; @@ -45,6 +44,7 @@ import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.std.file.FileSplit; @@ -75,7 +75,8 @@ } @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() + throws HyracksDataException, AlgebricksException { return dataSourceFactory.getPartitionConstraint(); } @@ -89,7 +90,7 @@ .getApplicationContext().getApplicationObject(); try { restoreExternalObjects(runtimeCtx.getLibraryManager()); - } catch (AsterixException e) { + } catch (Exception e) { throw new HyracksDataException(e); } if (isFeed) { @@ -107,7 +108,8 @@ } } - private void restoreExternalObjects(ILibraryManager libraryManager) throws AsterixException { + private void restoreExternalObjects(ILibraryManager libraryManager) + throws HyracksDataException, AlgebricksException { if 
(dataSourceFactory == null) { dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration); // create and configure parser factory @@ -126,7 +128,8 @@ } @Override - public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException { + public void configure(ILibraryManager libraryManager, Map<String, String> configuration) + throws HyracksDataException, AlgebricksException { this.configuration = configuration; ExternalDataUtils.validateDataSourceParameters(configuration); dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration); @@ -144,7 +147,7 @@ nullifyExternalObjects(); } - private void configureFeedLogManager() throws AsterixException { + private void configureFeedLogManager() throws HyracksDataException, AlgebricksException { this.isFeed = ExternalDataUtils.isFeed(configuration); if (isFeed) { feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration), diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java index 01fcfc2..6b69d9c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.Map; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.ILookupReaderFactory; import org.apache.asterix.external.api.ILookupRecordReader; @@ -34,6 +33,7 @@ import org.apache.asterix.external.provider.LookupReaderFactoryProvider; import 
org.apache.asterix.external.provider.ParserFactoryProvider; import org.apache.asterix.om.types.ARecordType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; @@ -77,7 +77,8 @@ } } - public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException { + public void configure(ILibraryManager libraryManager, Map<String, String> configuration) + throws HyracksDataException, AlgebricksException { this.configuration = configuration; readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration); dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(libraryManager, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java index 2d42ba9..3ea3bb1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java @@ -21,10 +21,10 @@ import java.io.Serializable; import java.util.Map; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -52,8 +52,12 @@ * constraint can be expressed as a node IP address or a node controller id. 
* In the former case, the IP address is translated to a node controller id * running on the node with the given IP address. + * + * @throws AlgebricksException + * @throws HyracksDataException */ - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException; + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() + throws HyracksDataException, AlgebricksException; /** * Creates an instance of IDatasourceAdapter. @@ -68,9 +72,11 @@ /** * @param libraryManager * @param configuration - * @throws Exception + * @throws AlgebricksException + * @throws HyracksDataException */ - public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException; + public void configure(ILibraryManager libraryManager, Map<String, String> configuration) + throws HyracksDataException, AlgebricksException; public void setOutputType(ARecordType outputType); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java index e4f21a6..6a237c6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java @@ -20,12 +20,16 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.runtime.util.AsterixAppContextInfo; import org.apache.asterix.runtime.util.AsterixClusterProperties; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import 
org.apache.hyracks.api.exceptions.HyracksDataException; public interface IExternalDataSourceFactory extends Serializable { @@ -49,7 +53,8 @@ * @return * @throws AsterixException */ - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException; + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() + throws AlgebricksException, HyracksDataException; /** * Configure the data parser factory. The passed map contains key value pairs from the @@ -58,7 +63,7 @@ * @param configuration * @throws AsterixException */ - public void configure(Map<String, String> configuration) throws AsterixException; + public void configure(Map<String, String> configuration) throws AlgebricksException, HyracksDataException; /** * Specify whether the external data source can be indexed @@ -69,30 +74,42 @@ return false; } + /** + * returns the passed partition constraints if not null, otherwise returns round robin absolute partition + * constraints that matches the count. 
+ * + * @param constraints + * @param count + * @return + * @throws AlgebricksException + */ public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints( - AlgebricksAbsolutePartitionConstraint constraints, int count) { + AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException { if (constraints == null) { - ArrayList<String> locs = new ArrayList<String>(); - Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores(); + ArrayList<String> locs = new ArrayList<>(); + Set<String> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores().keySet(); + if (stores.isEmpty()) { + throw new AlgebricksException("Configurations don't have any stores"); + } int i = 0; - while (i < count) { - for (String node : stores.keySet()) { + outer: while (i < count) { + Iterator<String> storeIt = stores.iterator(); + while (storeIt.hasNext()) { + String node = storeIt.next(); int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length; for (int k = 0; k < numIODevices; k++) { locs.add(node); i++; if (i == count) { - break; + break outer; } } - if (i == count) { - break; - } + } + if (i == 0) { + throw new AlgebricksException("All stores have 0 IO devices"); } } - String[] cluster = new String[locs.size()]; - cluster = locs.toArray(cluster); - constraints = new AlgebricksAbsolutePartitionConstraint(cluster); + return new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()])); } return constraints; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java index f9eedd1..2ded3fb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -45,7 +46,7 @@ } @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { int count = urls.size(); clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count); return clusterLocations; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java index f743a3f..4649559 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java @@ -20,13 +20,13 @@ import java.util.Map; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.provider.StreamRecordReaderProvider; import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format; import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -52,12 +52,13 @@ } @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() + throws HyracksDataException, AlgebricksException { return streamFactory.getPartitionConstraint(); } @Override - public void configure(Map<String, String> configuration) throws AsterixException { + public void configure(Map<String, String> configuration) throws HyracksDataException, AlgebricksException { this.configuration = configuration; streamFactory.configure(configuration); format = StreamRecordReaderProvider.getReaderFormat(configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 172b22b..73d1b39 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -31,11 +31,11 @@ import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants; import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; 
import twitter4j.FilterQuery; -import twitter4j.Status; public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> { @@ -55,7 +55,7 @@ } @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY); return clusterLocations; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java index 1eb760e..8ab8ead 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java @@ -33,6 +33,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.http.impl.conn.SystemDefaultDnsResolver; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -44,7 +45,7 @@ private List<Pair<String, Integer>> sockets; @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, sockets.size()); return clusterLocations; } diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java index 4cc8e33..059de63 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; @@ -30,7 +29,9 @@ import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.util.ExternalDataCompatibilityUtils; import org.apache.asterix.om.types.ARecordType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * This class represents the entry point to all things adapters @@ -39,7 +40,8 @@ // Adapters public static IAdapterFactory getAdapterFactory(ILibraryManager libraryManager, String adapterName, - Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AsterixException { + Map<String, String> configuration, ARecordType itemType, ARecordType metaType) + throws HyracksDataException, AlgebricksException { ExternalDataCompatibilityUtils.prepare(adapterName, configuration); GenericAdapterFactory adapterFactory = new GenericAdapterFactory(); adapterFactory.setOutputType(itemType); @@ -51,7 +53,7 @@ // Indexing Adapters public static IIndexingAdapterFactory getIndexingAdapterFactory(ILibraryManager libraryManager, String adapterName, 
Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp, - ARecordType metaType) throws AsterixException { + ARecordType metaType) throws HyracksDataException, AlgebricksException { ExternalDataCompatibilityUtils.prepare(adapterName, configuration); GenericAdapterFactory adapterFactory = new GenericAdapterFactory(); adapterFactory.setOutputType(itemType); @@ -64,7 +66,8 @@ // Lookup Adapters public static LookupAdapterFactory<?> getLookupAdapterFactory(ILibraryManager libraryManager, Map<String, String> configuration, ARecordType recordType, int[] ridFields, boolean retainInput, - boolean retainMissing, IMissingWriterFactory missingWriterFactory) throws AsterixException { + boolean retainMissing, IMissingWriterFactory missingWriterFactory) + throws HyracksDataException, AlgebricksException { LookupAdapterFactory<?> adapterFactory = new LookupAdapterFactory<>(recordType, ridFields, retainInput, retainMissing, missingWriterFactory); adapterFactory.configure(libraryManager, configuration); diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java index 87c187a..3467411 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java @@ -25,6 +25,7 @@ import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.input.record.RecordWithPK; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.context.IHyracksTaskContext; 
public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<RecordWithPK<char[]>> { @@ -33,7 +34,7 @@ private transient AlgebricksAbsolutePartitionConstraint clusterLocations; @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1); return clusterLocations; } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index d45097a..20cb2bc 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ -24,7 +24,6 @@ import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.IAsterixPropertiesProvider; -import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; @@ -36,6 +35,7 @@ import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -62,7 +62,7 @@ } @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException { + 
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1); return clusterLocations; } @@ -78,9 +78,11 @@ ADMDataParser parser; ITupleForwarder forwarder; ArrayTupleBuilder tb; - IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx - .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext() - .getApplicationObject(); + IAsterixPropertiesProvider propertiesProvider = + (IAsterixPropertiesProvider) ((NodeControllerService) ctx + .getJobletContext().getApplicationContext().getControllerService()) + .getApplicationContext() + .getApplicationObject(); ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions() .get(nodeId)[0]; parser = new ADMDataParser(outputType, true); diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java index a604315..c46b0be 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java @@ -19,6 +19,8 @@ package org.apache.asterix.lang.common.util; import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import org.apache.asterix.builders.OrderedListBuilder; @@ -39,6 +41,7 @@ import org.apache.asterix.om.base.AString; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -68,7 +71,7 @@ parseLiteral((LiteralExpr) expr, serialized); break; case RECORD_CONSTRUCTOR_EXPRESSION: - parseRecord((RecordConstructor) expr, serialized, true); + parseRecord((RecordConstructor) expr, serialized, true, Collections.emptyList()); break; case LIST_CONSTRUCTOR_EXPRESSION: parseList((ListConstructor) expr, serialized); @@ -82,7 +85,8 @@ } } - public static void parseRecord(RecordConstructor recordValue, ArrayBackedValueStorage serialized, boolean tagged) + public static void parseRecord(RecordConstructor recordValue, ArrayBackedValueStorage serialized, boolean tagged, + List<Pair<String, String>> defaults) throws HyracksDataException { AMutableString fieldNameString = new AMutableString(null); ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(); @@ -91,30 +95,48 @@ recordBuilder.reset(ARecordType.FULLY_OPEN_RECORD_TYPE); recordBuilder.init(); List<FieldBinding> fbList = recordValue.getFbList(); + HashSet<String> fieldNames = new HashSet<>(); for (FieldBinding fb : fbList) { fieldName.reset(); fieldValue.reset(); // get key - Expression keyExpr = fb.getLeftExpr(); - if (keyExpr.getKind() != Expression.Kind.LITERAL_EXPRESSION) { - throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR, - "JSON key can only be of type %1$s", Expression.Kind.LITERAL_EXPRESSION); + fieldNameString.setValue(exprToStringLiteral(fb.getLeftExpr()).getStringValue()); + if (!fieldNames.add(fieldNameString.getStringValue())) { + throw new HyracksDataException( + "Field " + fieldNameString.getStringValue() + " was specified multiple times"); } - LiteralExpr keyLiteralExpr = (LiteralExpr) keyExpr; - Literal keyLiteral = keyLiteralExpr.getValue(); - if (keyLiteral.getLiteralType() != Literal.Type.STRING) { - throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR, - "JSON key can only be of type %1$s", Literal.Type.STRING); - } - 
fieldNameString.setValue(keyLiteral.getStringValue()); stringSerde.serialize(fieldNameString, fieldName.getDataOutput()); // get value parseExpression(fb.getRightExpr(), fieldValue); recordBuilder.addField(fieldName, fieldValue); } + // defaults + for (Pair<String, String> kv : defaults) { + if (!fieldNames.contains(kv.first)) { + fieldName.reset(); + fieldValue.reset(); + stringSerde.serialize(new AString(kv.first), fieldName.getDataOutput()); + stringSerde.serialize(new AString(kv.second), fieldValue.getDataOutput()); + recordBuilder.addField(fieldName, fieldValue); + } + } recordBuilder.write(serialized.getDataOutput(), tagged); } + public static Literal exprToStringLiteral(Expression expr) throws HyracksDataException { + if (expr.getKind() != Expression.Kind.LITERAL_EXPRESSION) { + throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR, + "Expected expression can only be of type %1$s", Expression.Kind.LITERAL_EXPRESSION); + } + LiteralExpr keyLiteralExpr = (LiteralExpr) expr; + Literal keyLiteral = keyLiteralExpr.getValue(); + if (keyLiteral.getLiteralType() != Literal.Type.STRING) { + throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR, + "Expected Literal can only be of type %1$s", Literal.Type.STRING); + } + return keyLiteral; + } + private static void parseList(ListConstructor valueExpr, ArrayBackedValueStorage serialized) throws HyracksDataException { if (valueExpr.getType() != ListConstructor.Type.ORDERED_LIST_CONSTRUCTOR) { diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java index c69c89e..55c9d13 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java +++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.RecordBuilder; @@ -39,6 +40,7 @@ import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.util.NonTaggedFormatUtil; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -210,6 +212,26 @@ confRecordBuilder.write(dataOutput, writeTypeTag); } + @SuppressWarnings("unchecked") + public static void serializeSimpleSchemalessRecord(List<Pair<String, String>> record, DataOutput dataOutput, + boolean writeTypeTag) + throws HyracksDataException { + ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.ASTRING); + RecordBuilder confRecordBuilder = new RecordBuilder(); + confRecordBuilder.reset(ARecordType.FULLY_OPEN_RECORD_TYPE); + ArrayBackedValueStorage fieldNameBytes = new ArrayBackedValueStorage(); + ArrayBackedValueStorage fieldValueBytes = new ArrayBackedValueStorage(); + for (int i = 0; i < record.size(); i++) { + fieldValueBytes.reset(); + fieldNameBytes.reset(); + stringSerde.serialize(new AString(record.get(i).first), fieldNameBytes.getDataOutput()); + stringSerde.serialize(new AString(record.get(i).second), fieldValueBytes.getDataOutput()); + confRecordBuilder.addField(fieldNameBytes, fieldValueBytes); + } + confRecordBuilder.write(dataOutput, writeTypeTag); + } + private IAObject[] mergeFields(IAObject[] closedFields, IAObject[] openFields) { IAObject[] fields = new IAObject[closedFields.length + 
openFields.length]; int i = 0; -- To view, visit https://asterix-gerrit.ics.uci.edu/1149 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7fdc43c6aed99cab7aedb1d900c0b8725abd7892 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>