[CARBONDATA-2435] Remove SDK dependency on spark jars. [CARBONDATA-2435] Remove SDK dependency on spark jars. Problem and cause: When the SDK writer is used in a standalone cluster without the Spark jars, an exception is thrown while the reverse dictionary cache is being initialized.
Solution: The Carbon SDK doesn't support dictionary encoding, and this Spark dependency is needed only for dictionary encoding. Move the Spark-dependent code inside the dictionary-encoding if block, so that the SDK flow does not have to access any Spark class. This closes #2289 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ff5166ef Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ff5166ef Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ff5166ef Branch: refs/heads/spark-2.3 Commit: ff5166ef78c42ca0819d3d9fa439aa56d32953eb Parents: b2060c6 Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Wed May 9 18:07:56 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu May 10 16:23:12 2018 +0530 ---------------------------------------------------------------------- .../processing/datatypes/PrimitiveDataType.java | 12 ++++++---- .../impl/DictionaryFieldConverterImpl.java | 12 ++++++---- .../converter/impl/FieldEncoderFactory.java | 25 +++++++------------- .../converter/impl/RowConverterImpl.java | 13 ++-------- .../InputProcessorStepWithNoConverterImpl.java | 2 +- 5 files changed, 28 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ff5166ef/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index dee8968..e34c184 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -28,6 +28,8 @@ import java.util.Map; import 
org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -135,7 +137,6 @@ public class PrimitiveDataType implements GenericDataType<Object> { * @param parentname * @param columnId * @param carbonDimension - * @param cache * @param absoluteTableIdentifier * @param client * @param useOnePass @@ -144,9 +145,9 @@ public class PrimitiveDataType implements GenericDataType<Object> { * @param isEmptyBadRecords */ public PrimitiveDataType(CarbonColumn carbonColumn, String parentname, String columnId, - CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, - AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass, - Map<Object, Integer> localCache, String nullFormat, Boolean isEmptyBadRecords) { + CarbonDimension carbonDimension, AbsoluteTableIdentifier absoluteTableIdentifier, + DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache, + String nullFormat, Boolean isEmptyBadRecords) { this.name = carbonColumn.getColName(); this.parentname = parentname; this.columnId = columnId; @@ -163,6 +164,9 @@ public class PrimitiveDataType implements GenericDataType<Object> { dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(carbonDimension.getDataType())); } else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) { + CacheProvider cacheProvider = CacheProvider.getInstance(); + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache = + cacheProvider.createCache(CacheType.REVERSE_DICTIONARY); Dictionary 
dictionary = null; if (useOnePass) { if (CarbonUtil.isFileExistsForGivenColumn(identifier)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ff5166ef/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java index 1fb4086..0757f8a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java @@ -24,6 +24,8 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -64,16 +66,18 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert private boolean isEmptyBadRecord; public DictionaryFieldConverterImpl(DataField dataField, - Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, AbsoluteTableIdentifier absoluteTableIdentifier, String nullFormat, int index, - DictionaryClient client, boolean useOnePass, - Map<Object, Integer> localCache, boolean isEmptyBadRecord, - DictionaryColumnUniqueIdentifier identifier) throws IOException { + DictionaryClient client, boolean useOnePass, Map<Object, Integer> localCache, + boolean 
isEmptyBadRecord, DictionaryColumnUniqueIdentifier identifier) throws IOException { this.index = index; this.carbonDimension = (CarbonDimension) dataField.getColumn(); this.nullFormat = nullFormat; this.isEmptyBadRecord = isEmptyBadRecord; + CacheProvider cacheProvider = CacheProvider.getInstance(); + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache = + cacheProvider.createCache(CacheType.REVERSE_DICTIONARY); + // if use one pass, use DictionaryServerClientDictionary if (useOnePass) { if (CarbonUtil.isFileExistsForGivenColumn(identifier)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/ff5166ef/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java index dd28dc6..c6cea65 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import org.apache.carbondata.core.cache.Cache; -import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.dictionary.client.DictionaryClient; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -61,18 +59,15 @@ public class FieldEncoderFactory { * Creates the FieldConverter for all dimensions, for measures return null. * * @param dataField column schema - * @param cache dicionary cache. * @param absoluteTableIdentifier table identifier * @param index index of column in the row. 
* @param isEmptyBadRecord * @return */ public FieldConverter createFieldEncoder(DataField dataField, - Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat, - DictionaryClient client, Boolean useOnePass, - Map<Object, Integer> localCache, boolean isEmptyBadRecord) - throws IOException { + DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache, + boolean isEmptyBadRecord) throws IOException { // Converters are only needed for dimensions and measures it return null. if (dataField.getColumn().isDimension()) { if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) && @@ -88,7 +83,7 @@ public class FieldEncoderFactory { || dataField.getColumn().getColumnSchema().getParentColumnTableRelations().isEmpty()) { identifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType()); - return new DictionaryFieldConverterImpl(dataField, cache, absoluteTableIdentifier, + return new DictionaryFieldConverterImpl(dataField, absoluteTableIdentifier, nullFormat, index, client, useOnePass, localCache, isEmptyBadRecord, identifier); } else { @@ -110,12 +105,12 @@ public class FieldEncoderFactory { parentTableIdentifier); identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier, parentColumnIdentifier, dataField.getColumn().getDataType()); - return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier, + return new DictionaryFieldConverterImpl(dataField, parentAbsoluteTableIdentifier, nullFormat, index, null, false, null, isEmptyBadRecord, identifier); } } else if (dataField.getColumn().isComplex()) { return new ComplexFieldConverterImpl( - createComplexDataType(dataField, cache, absoluteTableIdentifier, + createComplexDataType(dataField, absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), 
index); } else { return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord); @@ -129,10 +124,9 @@ public class FieldEncoderFactory { * Create parser for the carbon column. */ public static GenericDataType createComplexDataType(DataField dataField, - Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) { - return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache, + return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords); } @@ -144,7 +138,6 @@ public class FieldEncoderFactory { */ private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName, - Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) { DataType dataType = carbonColumn.getDataType(); @@ -156,7 +149,7 @@ public class FieldEncoderFactory { new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId()); for (CarbonDimension dimension : listOfChildDimensions) { arrayDataType.addChildren( - createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier, + createComplexType(dimension, carbonColumn.getColName(), absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords)); } return arrayDataType; @@ -168,7 +161,7 @@ public class FieldEncoderFactory { new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId()); for (CarbonDimension dimension : dimensions) { structDataType.addChildren( - createComplexType(dimension, 
carbonColumn.getColName(), cache, absoluteTableIdentifier, + createComplexType(dimension, carbonColumn.getColName(), absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords)); } return structDataType; @@ -176,7 +169,7 @@ public class FieldEncoderFactory { throw new UnsupportedOperationException("Complex type Map is not supported yet"); } else { return new PrimitiveDataType(carbonColumn, parentName, carbonColumn.getColumnId(), - (CarbonDimension) carbonColumn, cache, absoluteTableIdentifier, client, useOnePass, + (CarbonDimension) carbonColumn, absoluteTableIdentifier, client, useOnePass, localCache, nullFormat, isEmptyBadRecords); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/ff5166ef/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java index 6e5c2e0..8f0557a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java @@ -29,11 +29,6 @@ import java.util.concurrent.Future; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.cache.Cache; -import org.apache.carbondata.core.cache.CacheProvider; -import org.apache.carbondata.core.cache.CacheType; -import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.dictionary.client.DictionaryClient; 
import org.apache.carbondata.core.dictionary.service.DictionaryOnePassService; @@ -72,8 +67,6 @@ public class RowConverterImpl implements RowConverter { private ExecutorService executorService; - private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; - private Map<Object, Integer>[] localCaches; public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration, @@ -85,8 +78,6 @@ public class RowConverterImpl implements RowConverter { @Override public void initialize() throws IOException { - CacheProvider cacheProvider = CacheProvider.getInstance(); - cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY); String nullFormat = configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT) .toString(); @@ -102,7 +93,7 @@ public class RowConverterImpl implements RowConverter { for (int i = 0; i < fields.length; i++) { localCaches[i] = new ConcurrentHashMap<>(); FieldConverter fieldConverter = FieldEncoderFactory.getInstance() - .createFieldEncoder(fields[i], cache, configuration.getTableIdentifier(), i, nullFormat, + .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord); fieldConverterList.add(fieldConverter); } @@ -221,7 +212,7 @@ public class RowConverterImpl implements RowConverter { FieldConverter fieldConverter = null; try { fieldConverter = FieldEncoderFactory.getInstance() - .createFieldEncoder(fields[i], cache, configuration.getTableIdentifier(), i, nullFormat, + .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/ff5166ef/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java 
---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index 8c24b7f..77f5260 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -107,7 +107,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce // create a ComplexDataType dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(), fieldConverterFactory - .createComplexDataType(srcDataField[i], null, configuration.getTableIdentifier(), + .createComplexDataType(srcDataField[i], configuration.getTableIdentifier(), null, false, null, i, nullFormat, isEmptyBadRecord)); } }