[GitHub] carbondata pull request #2134: [WIP][CARBONDATA-2310] Refactored code to imp...
Github user dhatchayani commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r179416485 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java --- @@ -52,11 +62,23 @@ private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); - public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException { + private boolean addRowToUnsafe = true; --- End diff -- This variable decides whether to write each row to unsafe memory as it arrives, or to collect all rows first and then write them to unsafe memory in one pass. ---
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r179341666 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java --- @@ -186,23 +193,28 @@ public void init(DataMapModel dataMapModel) throws IOException, MemoryException } } } -if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.finishWriting(); -} if (null != unsafeMemorySummaryDMStore) { addTaskSummaryRowToUnsafeMemoryStore( summaryRow, schemaBinary, filePath, fileName, segmentId); - unsafeMemorySummaryDMStore.finishWriting(); } LOGGER.info( "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + ( System.currentTimeMillis() - startTime)); } + @Override public void commit() throws MemoryException, IOException { --- End diff -- This is specific to BlockletDataMap, so let's keep it only in this class, not at the interface level. Also, I am not sure why we need a separate commit method. Is there any way to avoid it? ---
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r179341377 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java --- @@ -52,11 +62,23 @@ private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); - public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException { + private boolean addRowToUnsafe = true; --- End diff -- The class name itself is `UnsafeMemoryDMStore`, so it does not make sense to have this variable. Please create an abstract class and provide two implementations, safe and unsafe. ---
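The suggested split could look like the following sketch. The names `AbstractMemoryDMStore` and `SafeMemoryDMStore`, and the simplified row handling, are assumptions for illustration, not the actual CarbonData code: the real store serializes `DataMapRow` objects into off-heap `MemoryBlock`s, which is modeled here as a byte counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: callers choose safe or unsafe storage by
// instantiating the matching subclass instead of checking a boolean flag.
abstract class AbstractMemoryDMStore {
  abstract void addIndexRow(int[] row);
  abstract int getRowCount();
}

// Safe variant: rows stay on the Java heap.
class SafeMemoryDMStore extends AbstractMemoryDMStore {
  private final List<int[]> rows = new ArrayList<>();
  @Override void addIndexRow(int[] row) { rows.add(row); }
  @Override int getRowCount() { return rows.size(); }
}

// Unsafe variant: each row would be copied into an off-heap MemoryBlock;
// a running byte count stands in for the real unsafe writes here.
class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
  private int rowCount;
  private long runningLength;
  @Override void addIndexRow(int[] row) {
    runningLength += row.length * 4L; // pretend the row was copied off-heap
    rowCount++;
  }
  @Override int getRowCount() { return rowCount; }
}
```

With this shape, the `addRowToUnsafe` flag disappears: the write path is fixed by the concrete class chosen at construction time.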
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r179340305 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java --- @@ -62,6 +63,12 @@ */ void fireEvent(Event event); + /** + * Add the dataMap to cache + * @param dataMap + */ + void addDataMapToCache(DataMap dataMap) throws IOException, MemoryException; --- End diff -- Better to add these methods in another interface, CacheableDataMap, and implement it in BlockletDataMapFactory directly.
```
CacheableDataMap {
  void cache(List dataMaps)
  List getAllUncachedDataMaps()
}
```
---
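A minimal sketch of how the suggested interface could sit on the factory; the empty `DataMap` stub, the method bodies, and the `register` helper are illustrative assumptions, not the actual patch.

```java
import java.util.ArrayList;
import java.util.List;

// Stub standing in for org.apache.carbondata.core.datamap.dev.DataMap.
interface DataMap { }

// The caching contract lives in its own interface, so only factories that
// actually support caching (e.g. BlockletDataMapFactory) opt in.
interface CacheableDataMap {
  void cache(List<DataMap> dataMaps);
  List<DataMap> getAllUncachedDataMaps();
}

class BlockletDataMapFactory implements CacheableDataMap {
  private final List<DataMap> cached = new ArrayList<>();
  private final List<DataMap> uncached = new ArrayList<>();

  // Illustrative helper: a newly built dataMap starts out uncached.
  void register(DataMap dataMap) { uncached.add(dataMap); }

  @Override public void cache(List<DataMap> dataMaps) {
    cached.addAll(dataMaps);
    uncached.removeAll(dataMaps);
  }

  @Override public List<DataMap> getAllUncachedDataMaps() {
    return new ArrayList<>(uncached);
  }
}
```

The design point is that `DataMapFactory` stays caching-agnostic; callers that need caching test for `instanceof CacheableDataMap` and use it only then.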
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r179339971 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java --- @@ -67,4 +68,14 @@ */ void clear(); + /** + * This method will be required for dataMaps that require 2 stage of construction. + * Ideal scenario will be first stage contains all the processing logic and second + * stage includes updating to database. + * Method usage can differ based on scenario and implementation + * + * @throws MemoryException + */ + void commit() throws MemoryException, IOException; --- End diff -- I think it does not make sense to have this method at the interface level. Caching should be restricted to DataMapFactory, not pushed down to the DataMap level. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178724959 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java --- @@ -156,15 +167,18 @@ private ExtendedBlocklet getExtendedBlocklet(List toDistributable(Segment segment) { List distributables = new ArrayList<>(); + Map indexFiles = null; try { CarbonFile[] carbonIndexFiles; if (segment.getSegmentFileName() == null) { carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles( CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo())); } else { + // TODO: Buggy code, this code will not work as we need to list only the + // physically existing files --- End diff -- Remove this TODO and handle the partition case. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178724390 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java --- @@ -38,4 +38,5 @@ public BlockletDataMapDistributable(String indexFilePath) { public String getFilePath() { return filePath; } + --- End diff -- remove empty line ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178724054 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java --- @@ -32,9 +37,14 @@ /** * Store the data map row @{@link DataMapRow} data to unsafe. */ -public class UnsafeMemoryDMStore { +public class UnsafeMemoryDMStore implements Serializable { + + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; - private MemoryBlock memoryBlock; + private transient MemoryBlock memoryBlock; --- End diff -- Add reason for making it transient ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178723724 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java --- @@ -30,7 +30,12 @@ */ public class UnsafeDataMapRow extends DataMapRow { - private MemoryBlock block; + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + private transient MemoryBlock block; --- End diff -- Add the reason for making it transient. Reason: as it is an unsafe memory block, it is not recommended to serialize it. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178723563 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java --- @@ -124,10 +130,12 @@ private UnsafeMemoryDMStore unsafeMemorySummaryDMStore; - private SegmentProperties segmentProperties; + private transient SegmentProperties segmentProperties; --- End diff -- Add the reason why it is made transient. Reason: as it is a heavy object, it is not recommended to serialize it. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178723434 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java --- @@ -77,6 +78,11 @@ */ public class BlockletDataMap implements DataMap, Cacheable { + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; --- End diff -- generate proper serial version UID ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178723320 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java --- @@ -177,10 +204,45 @@ public UnsafeDataMapRow getUnsafeRow(int index) { return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]); } - public void finishWriting() throws MemoryException { + /** + * Add the index row to dataMapRows, basically to in memory. + * + * @param indexRow + * @return + */ + private void addDataMapRow(DataMapRow indexRow) throws MemoryException { +dataMapRows.add(indexRow); + } + + /** + * This method will write all the dataMapRows to unsafe + * + * @throws MemoryException + * @throws IOException + */ + private void adddataMapRowToUnsafe() throws MemoryException, IOException { --- End diff -- Correct the typo in method name and rename to addDataMapRowsToUnsafe ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178723089 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java --- @@ -97,12 +119,17 @@ public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException { int rowSize = indexRow.getTotalSizeInBytes(); // Check whether allocated memory is sufficient or not. ensureSize(rowSize); -int pointer = runningLength; +if (addRowToUnsafe) { + int pointer = runningLength; -for (int i = 0; i < schema.length; i++) { - addToUnsafe(schema[i], indexRow, i); + for (int i = 0; i < schema.length; i++) { +addToUnsafe(schema[i], indexRow, i); + } + pointers[rowCount++] = pointer; +} else { + // add dataMap rows to in memory + addDataMapRow(indexRow); --- End diff -- Rename the method to addDataMapRowToMemory. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178722797 --- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java --- @@ -206,6 +206,31 @@ public void invalidate(TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentif lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } + @Override public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, + BlockletDataMap blockletDataMap) throws IOException, MemoryException { +String uniqueTableSegmentIdentifier = +tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(); +Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier); +if (lock == null) { + lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier); +} +if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) { --- End diff -- add a comment explaining why the getIfPresent check is required. Reason: as the dataMap uses unsafe memory, it is not recommended to overwrite an existing entry, since clearing the unsafe memory of the old entry would then need to be taken care of. If a dataMap entry in the cache must be overwritten, use the invalidate interface first and then the put interface. ---
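The put-if-absent semantics described above can be sketched as follows. `DataMapCache` and its members are simplified stand-ins for `BlockletDataMapIndexStore`'s `lruCache` and `segmentLockMap`, not the actual patch code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative cache with put-if-absent semantics: an entry backed by unsafe
// memory must not be silently overwritten, because the old entry's off-heap
// memory would leak. Callers wanting to replace an entry invalidate() first.
class DataMapCache<K, V> {
  private final Map<K, V> lruCache = new ConcurrentHashMap<>();
  private final Map<K, Object> segmentLockMap = new ConcurrentHashMap<>();

  V getIfPresent(K key) {
    return lruCache.get(key);
  }

  void put(K key, V value) {
    // One lock object per key, created on demand.
    Object lock = segmentLockMap.computeIfAbsent(key, k -> new Object());
    synchronized (lock) {
      // Insert only when absent, so an existing entry's unsafe memory is
      // never orphaned by an overwrite.
      if (getIfPresent(key) == null) {
        lruCache.put(key, value);
      }
    }
  }

  void invalidate(K key) {
    lruCache.remove(key);
  }
}
```

So a second `put` for the same key is a no-op unless the caller explicitly invalidates the old entry first, which is exactly the contract the review comment asks to document.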
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178721874 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/BlockIndexStore.java --- @@ -229,6 +229,10 @@ private String getLruCacheKey(AbsoluteTableIdentifier absoluteTableIdentifier, .remove(getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo)); } + @Override public void put(TableBlockUniqueIdentifier key, AbstractIndex value) { + + } + --- End diff -- Throw UnsupportedOperationException. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178721900 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java --- @@ -140,6 +140,10 @@ public SegmentTaskIndexWrapper get(TableSegmentUniqueIdentifier tableSegmentUniq lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } + @Override public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value) { + + } + --- End diff -- Throw UnsupportedOperationException. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178721496 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java --- @@ -19,15 +19,21 @@ import java.io.Serializable; import java.util.List; +import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + /** * Distributable datamap job to execute the #DistributableDataMapFormat in cluster. it prunes the * datamaps distributably and returns the final blocklet list */ public interface DataMapJob extends Serializable { + List execute(FileInputFormat dataMapFormat, CarbonTable carbonTable); + --- End diff -- Change the argument order; carbonTable should be the first parameter. ---
Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2134#discussion_r178720946 --- Diff: core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCache.java --- @@ -168,6 +168,10 @@ public ReverseDictionaryCache(CarbonLRUCache carbonLRUCache) { CacheType.REVERSE_DICTIONARY)); } + @Override public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) { + + } + --- End diff -- Override the method in the abstract class and, in the implementation, throw UnsupportedOperationException for both the reverse and forward caches. ---