[GitHub] carbondata issue #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppel...
Github user ajithme commented on the issue: https://github.com/apache/carbondata/pull/2522 Merged ---
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
Github user ajithme closed the pull request at: https://github.com/apache/carbondata/pull/2522 ---
[GitHub] carbondata issue #2631: [CARBONDATA-2826] SELECT support using distributed c...
Github user ajithme commented on the issue: https://github.com/apache/carbondata/pull/2631 Merged ---
[GitHub] carbondata pull request #2631: [CARBONDATA-2826] SELECT support using distri...
Github user ajithme closed the pull request at: https://github.com/apache/carbondata/pull/2631 ---
[GitHub] carbondata pull request #2631: [CARBONDATA-2826] SELECT support using distri...
GitHub user ajithme opened a pull request: https://github.com/apache/carbondata/pull/2631
[CARBONDATA-2826] SELECT support using distributed carbon store
Provides SELECT support for the distributed carbon store through a new RDD, with column pruning (only the selected columns are read) and filter pushdown.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ajithme/carbondata sourcev2
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2631.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2631
commit c5cd3ccb663a7177ec393188aa0461620c254f4f
Author: Ajith
Date: 2018-08-12T11:24:27Z
CARBONDATA-2826 SELECT support using distributed carbon store
---
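Column pruning and filter pushdown mean the scan evaluates the predicate and drops unrequested columns where the data lives, instead of shipping whole rows back to the caller. A minimal in-memory sketch of that idea, assuming rows are plain `Object[]` arrays; `PushdownScanSketch` and its `scan` signature are hypothetical and are not the RDD added in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration only: not CarbonData's RDD or scanner API. */
final class PushdownScanSketch {

  private PushdownScanSketch() { }

  /**
   * Returns only the rows matching {@code filter}, each reduced to the
   * columns named in {@code projection} (in projection order).
   */
  static List<Object[]> scan(String[] schema, List<Object[]> rows,
                             String[] projection, Predicate<Object[]> filter) {
    List<String> columns = Arrays.asList(schema);
    // Resolve projected column names to positions once, before scanning.
    int[] indexes = new int[projection.length];
    for (int i = 0; i < projection.length; i++) {
      indexes[i] = columns.indexOf(projection[i]);
      if (indexes[i] < 0) {
        throw new IllegalArgumentException("Unknown column: " + projection[i]);
      }
    }
    List<Object[]> result = new ArrayList<>();
    for (Object[] row : rows) {
      if (!filter.test(row)) {
        continue; // filter pushdown: non-matching rows never leave the scan
      }
      Object[] pruned = new Object[indexes.length];
      for (int i = 0; i < indexes.length; i++) {
        pruned[i] = row[indexes[i]]; // column pruning: copy only requested columns
      }
      result.add(pruned);
    }
    return result;
  }
}
```

In a distributed store the same two steps run on each worker, so only the filtered, pruned rows cross the network.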
[GitHub] carbondata pull request #2589: [CARBONDATA-2825][CARBONDATA-2828] CarbonStor...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207721977 --- Diff: store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java --- @@ -145,26 +152,61 @@ private void createTable(TableInfo tableInfo, boolean ifNotExists) throws IOExce } } - public void dropTable(TableIdentifier table) throws IOException { -String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName()); + public void dropTable(TableIdentifier table) throws CarbonException { +String tablePath = getTablePath(table.getTableName(), table.getDatabaseName()); cache.remove(tablePath); -FileFactory.deleteFile(tablePath); +try { + FileFactory.deleteFile(tablePath); +} catch (IOException e) { + throw new CarbonException(e); +} + } + + public TableInfo getTable(TableIdentifier table) throws CarbonException { +return getTable(table, storeConf); } - public CarbonTable getTable(TableIdentifier table) throws IOException { -String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName()); + public static TableInfo getTable(TableIdentifier table, StoreConf storeConf) + throws CarbonException { +String tablePath = getTablePath(table.getTableName(), table.getDatabaseName(), storeConf); if (cache.containsKey(tablePath)) { return cache.get(tablePath); } else { - org.apache.carbondata.format.TableInfo formatTableInfo = - CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath)); + org.apache.carbondata.format.TableInfo formatTableInfo = null; + try { +formatTableInfo = CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath)); + } catch (IOException e) { +throw new CarbonException(e); + } SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo( formatTableInfo, table.getDatabaseName(), table.getTableName(), tablePath); tableInfo.setTablePath(tablePath); - CarbonTable carbonTable 
= CarbonTable.buildFromTableInfo(tableInfo); - cache.put(tablePath, carbonTable); - return carbonTable; + cache.put(tablePath, tableInfo); + return tableInfo; } } -} \ No newline at end of file + + public List listTable() throws CarbonException { +throw new UnsupportedOperationException(); + } + + public TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException { +throw new UnsupportedOperationException(); + } + + public void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException { +throw new UnsupportedOperationException(); + } + + public String getTablePath(String tableName, String databaseName) { +Objects.requireNonNull(tableName); +Objects.requireNonNull(databaseName); +return String.format("%s/%s", storeConf.storeLocation(), tableName); + } + public static String getTablePath(String tableName, String databaseName, StoreConf storeConf) { +Objects.requireNonNull(tableName); +Objects.requireNonNull(databaseName); +return String.format("%s/%s", storeConf.storeLocation(), tableName); --- End diff -- The path must include the database name: both `getTablePath` overloads check `databaseName` for null but then ignore it when building the path. ---
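With `String.format("%s/%s", storeConf.storeLocation(), tableName)`, tables with the same name in two databases resolve to the same directory. A sketch of a helper that includes the database segment, assuming a `<store>/<database>/<table>` layout; the store location is passed as a plain `String` instead of a `StoreConf` to keep it self-contained, and `TablePathSketch` is a hypothetical name.

```java
import java.util.Objects;

/** Hypothetical helper; the "<store>/<database>/<table>" layout is an assumption. */
final class TablePathSketch {

  private TablePathSketch() { }

  static String getTablePath(String storeLocation, String databaseName, String tableName) {
    Objects.requireNonNull(storeLocation, "storeLocation");
    Objects.requireNonNull(databaseName, "databaseName");
    Objects.requireNonNull(tableName, "tableName");
    // Including the database segment keeps db1.t and db2.t in different directories.
    return String.format("%s/%s/%s", storeLocation, databaseName, tableName);
  }
}
```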
[GitHub] carbondata pull request #2589: [CARBONDATA-2825][CARBONDATA-2828] CarbonStor...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207721786 --- Diff: store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java --- @@ -20,14 +20,14 @@ import java.io.IOException; import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.impl.rpc.RegistryService; -import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest; -import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse; +import org.apache.carbondata.store.impl.service.RegistryService; +import org.apache.carbondata.store.impl.service.model.RegisterWorkerRequest; +import org.apache.carbondata.store.impl.service.model.RegisterWorkerResponse; import org.apache.hadoop.ipc.ProtocolSignature; @InterfaceAudience.Internal -class RegistryServiceImpl implements RegistryService { +class egistryServiceImpl implements RegistryService { --- End diff -- Typo: please rename `egistryServiceImpl` back to `RegistryServiceImpl`. ---
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207704316 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java --- @@ -252,9 +262,24 @@ public Segment getSegment() { if (dataMapWriterPathExists) { dataMapWritePath = in.readUTF(); } +boolean filePathExists = in.readBoolean(); +if (filePathExists) { + filePath = in.readUTF(); +} else { + filePath = super.getPath().toString(); +} } @Override public void write(DataOutput out) throws IOException { +if (super.getPath() != null) { + super.write(out); +} else { + // see HADOOP-13519, after Java deserialization, super.filePath is + // null, so write our filePath instead + Text.writeString(out, filePath); + out.writeLong(getStart()); + out.writeLong(getLength()); +} super.write(out); --- End diff -- This trailing `super.write(out);` can be deleted; otherwise `write` emits fields that `readFields` does not expect, and reading will fail. ---
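The underlying rule is that `readFields` must consume exactly what `write` produced, in the same order. A sketch of that contract using the same "boolean flag + optional value" pattern as the diff, with only `java.io`; `OptionalPathSplit` is a hypothetical class, not `CarbonInputSplit`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical split type; not CarbonInputSplit itself. */
final class OptionalPathSplit {
  String filePath; // may be null
  long start;
  long length;

  void write(DataOutput out) throws IOException {
    out.writeLong(start);
    out.writeLong(length);
    // The flag tells readFields whether a path value follows.
    out.writeBoolean(filePath != null);
    if (filePath != null) {
      out.writeUTF(filePath);
    }
    // Writing anything else here (e.g. a second copy of the base fields)
    // would leave bytes that readFields does not consume in this order.
  }

  void readFields(DataInput in) throws IOException {
    // Mirror write(): same fields, same order, same types.
    start = in.readLong();
    length = in.readLong();
    filePath = in.readBoolean() ? in.readUTF() : null;
  }

  static byte[] toBytes(OptionalPathSplit split) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      split.write(out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static OptionalPathSplit fromBytes(byte[] data) {
    OptionalPathSplit split = new OptionalPathSplit();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      split.readFields(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return split;
  }
}
```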
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207699358 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/store/ScanUnit.java --- @@ -15,26 +15,27 @@ * limitations under the License. */ -package org.apache.carbondata.store.impl.rpc; +package org.apache.carbondata.sdk.store; -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.store.impl.rpc.model.BaseResponse; -import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; -import org.apache.carbondata.store.impl.rpc.model.QueryResponse; -import org.apache.carbondata.store.impl.rpc.model.Scan; -import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; -import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse; - -import org.apache.hadoop.ipc.VersionedProtocol; - -@InterfaceAudience.Internal -public interface StoreService extends VersionedProtocol { - - long versionID = 1L; +import java.io.Serializable; - BaseResponse loadData(LoadDataRequest request); - - QueryResponse query(Scan scan); +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.schema.table.Writable; - ShutdownResponse shutdown(ShutdownRequest request); +/** + * An unit for the scanner in Carbon Store + */ +@InterfaceAudience.User +@InterfaceStability.Unstable +public interface ScanUnit extends Serializable, Writable { --- End diff -- can remove Generics ---
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207699345 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/store/BlockScanUnit.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.store; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.CarbonInputSplit; + +/** + * It contains a block to scan, and a destination worker who should scan it + */ +@InterfaceAudience.Internal +public class BlockScanUnit implements ScanUnit { + + // the data block to scan + private CarbonInputSplit inputSplit; + + // the worker who should scan this unit + private Schedulable schedulable; --- End diff -- Please serialize this field in the `Writable` methods (`write`/`readFields`) as well; otherwise it will be null after deserialization. ---
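A sketch of a scan unit whose worker field takes part in serialization by delegating to the worker's own `write`/`readFields`. `SimpleWritable` is a local stand-in with the same two methods as `org.apache.hadoop.io.Writable`, used so the example needs no Hadoop jar; `WorkerAddress` and `BlockScanUnitSketch` are hypothetical names, not the classes in this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Stand-in with the same two methods as org.apache.hadoop.io.Writable. */
interface SimpleWritable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

/** Hypothetical description of the worker that should scan a unit. */
final class WorkerAddress implements SimpleWritable {
  String host;
  int port;

  WorkerAddress() { } // needed so readFields can populate a fresh instance

  WorkerAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override public void write(DataOutput out) throws IOException {
    out.writeUTF(host);
    out.writeInt(port);
  }

  @Override public void readFields(DataInput in) throws IOException {
    host = in.readUTF();
    port = in.readInt();
  }
}

/** Hypothetical scan unit: both fields take part in write/readFields. */
final class BlockScanUnitSketch implements SimpleWritable {
  String blockPath;
  WorkerAddress worker;

  @Override public void write(DataOutput out) throws IOException {
    out.writeUTF(blockPath);
    worker.write(out); // without this, the worker is never sent...
  }

  @Override public void readFields(DataInput in) throws IOException {
    blockPath = in.readUTF();
    worker = new WorkerAddress();
    worker.readFields(in); // ...and would stay null on the receiving side
  }

  static BlockScanUnitSketch roundTrip(BlockScanUnitSketch unit) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        unit.write(out);
      }
      BlockScanUnitSketch copy = new BlockScanUnitSketch();
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        copy.readFields(in);
      }
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```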
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207699308 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java --- @@ -444,4 +444,16 @@ public void setFormat(FileFormat fileFormat) { public Blocklet makeBlocklet() { return new Blocklet(getPath().getName(), blockletId); } + + public String[] preferredLocations() { --- End diff -- The superclass field `FileSplit.file` is not serializable (refer to HADOOP-13519), so after Java serialization it may come back empty. ---
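This follows from standard Java serialization rules: when a `Serializable` class extends a non-serializable one, the superclass's fields are not written, and on deserialization they are initialized by the superclass's no-arg constructor instead. The sketch reproduces that with two hypothetical classes, `BaseSplit` standing in for `FileSplit` and `SerializableSplit` for `CarbonInputSplit`; it is why the split keeps its own copy of the path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Plays the role of FileSplit: NOT Serializable, but has a no-arg constructor. */
class BaseSplit {
  String file;

  BaseSplit() { } // Java deserialization calls this, so 'file' comes back null

  BaseSplit(String file) {
    this.file = file;
  }
}

/** Plays the role of CarbonInputSplit: Serializable, keeps its own copy of the path. */
class SerializableSplit extends BaseSplit implements Serializable {
  private static final long serialVersionUID = 1L;

  String filePath; // serialized, because it is declared in a Serializable class

  SerializableSplit(String path) {
    super(path);
    this.filePath = path;
  }

  static SerializableSplit roundTrip(SerializableSplit split) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(split);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SerializableSplit) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```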
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207501460 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/ScanDescriptor.java --- @@ -15,23 +15,33 @@ * limitations under the License. */ -package org.apache.carbondata.store.api.descriptor; +package org.apache.carbondata.sdk.store.descriptor; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Objects; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.ObjectSerializationUtil; -public class SelectDescriptor { +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.User +@InterfaceStability.Evolving +public class ScanDescriptor implements Writable { private TableIdentifier table; private String[] projection; private Expression filter; private long limit; --- End diff -- `limit` must default to Long.MAX_VALUE. ---
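Presumably the concern is that an uninitialized Java `long` field is 0, so a descriptor built without an explicit limit would behave like `LIMIT 0`. A minimal sketch where `Long.MAX_VALUE` acts as "no limit"; `LimitSketch` and its `take` method are hypothetical and show only the limit handling, not `ScanDescriptor` itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical descriptor fragment; only the limit handling is shown. */
final class LimitSketch {
  // An uninitialized long field would be 0, which reads as "return no rows".
  // Long.MAX_VALUE means "no limit" unless the caller sets one.
  long limit = Long.MAX_VALUE;

  <T> List<T> take(Iterator<T> rows) {
    List<T> out = new ArrayList<>();
    long count = 0;
    while (count < limit && rows.hasNext()) {
      out.add(rows.next());
      count++;
    }
    return out;
  }
}
```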
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207431095 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/store/ScannerImpl.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.carbondata.sdk.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; +import org.apache.carbondata.sdk.store.service.DataService; +import org.apache.carbondata.sdk.store.service.PruneService; +import org.apache.carbondata.sdk.store.service.ServiceFactory; +import org.apache.carbondata.sdk.store.service.model.PruneRequest; +import org.apache.carbondata.sdk.store.service.model.PruneResponse; +import org.apache.carbondata.sdk.store.service.model.ScanRequest; +import org.apache.carbondata.sdk.store.service.model.ScanResponse; + +import org.apache.hadoop.conf.Configuration; + +class ScannerImpl implements Scanner { + private static final LogService LOGGER = + LogServiceFactory.getLogService(ScannerImpl.class.getCanonicalName()); + + private PruneService pruneService; + private TableInfo tableInfo; + + ScannerImpl(StoreConf conf, TableInfo tableInfo) throws IOException { +this.pruneService = ServiceFactory.createPruneService( +conf.masterHost(), conf.registryServicePort()); --- End diff -- This must be the prune service port, not `registryServicePort()`. ---
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207431252 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/StoreService.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.carbondata.sdk.store.service; + +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +import org.apache.hadoop.ipc.VersionedProtocol; + +@InterfaceAudience.Internal +public interface StoreService extends VersionedProtocol { + long versionID = 1L; + + void createTable(TableDescriptor descriptor) throws CarbonException; + + void dropTable(TableIdentifier table) throws CarbonException; + + CarbonTable getTable(TableIdentifier table) throws CarbonException; --- End diff -- Hadoop RPC needs the response object to be serializable with `org.apache.hadoop.io.serializer.WritableSerialization`, i.e. it must implement `Writable`. ---
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r207433215 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/store/ScannerImpl.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; +import org.apache.carbondata.sdk.store.service.DataService; +import org.apache.carbondata.sdk.store.service.PruneService; +import org.apache.carbondata.sdk.store.service.ServiceFactory; +import org.apache.carbondata.sdk.store.service.model.PruneRequest; +import org.apache.carbondata.sdk.store.service.model.PruneResponse; +import org.apache.carbondata.sdk.store.service.model.ScanRequest; +import org.apache.carbondata.sdk.store.service.model.ScanResponse; + +import org.apache.hadoop.conf.Configuration; + +class ScannerImpl implements Scanner { + private static final LogService LOGGER = + LogServiceFactory.getLogService(ScannerImpl.class.getCanonicalName()); + + private PruneService pruneService; + private TableInfo tableInfo; + + ScannerImpl(StoreConf conf, TableInfo tableInfo) throws IOException { +this.pruneService = ServiceFactory.createPruneService( +conf.masterHost(), conf.registryServicePort()); +this.tableInfo = tableInfo; + } + + /** + * Trigger a 
RPC to Carbon Master to do pruning + * @param table table identifier + * @param filterExpression expression of filter predicate given by user + * @return list of ScanUnit + * @throws CarbonException if any error occurs + */ + @Override + public List prune(TableIdentifier table, Expression filterExpression) + throws CarbonException { +try { + Configuration configuration = new Configuration(); + CarbonInputFormat.setTableName(configuration, table.getTableName()); --- End diff -- Use `CarbonInputFormat.setTableInfo(configuration, tableInfo);` here instead; otherwise `org.apache.carbondata.hadoop.api.CarbonInputFormat#getAbsoluteTableIdentifier` will return an identifier with an empty path. ---
[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] add CTable interface in Ca...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2589#discussion_r206492090 --- Diff: store/core/pom.xml --- @@ -48,8 +48,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.7 - 1.7 + 8 --- End diff -- `8` is the same as `1.8`: the value is passed to javac, which accepts `8` as a synonym for `1.8` (https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html) ---
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2522#discussion_r203618953 --- Diff: store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java --- @@ -42,20 +43,24 @@ public ResponseEntity sql(@RequestBody SqlRequest request) throws StoreException { RequestValidator.validateSql(request); List rows; +Dataset sqlDataFrame = null; try { - rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement()) + sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(), + request.getSqlStatement()); + rows = sqlDataFrame .collectAsList(); --- End diff -- Done ---
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2522#discussion_r203618896 --- Diff: store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java --- @@ -42,20 +43,24 @@ public ResponseEntity sql(@RequestBody SqlRequest request) throws StoreException { RequestValidator.validateSql(request); List rows; +Dataset sqlDataFrame = null; try { - rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement()) + sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(), + request.getSqlStatement()); + rows = sqlDataFrame .collectAsList(); } catch (AnalysisException e) { throw new StoreException(e.getSimpleMessage()); } catch (Exception e) { throw new StoreException(e.getMessage()); } -Object[][] result = new Object[rows.size()][]; +Object[][] result = new Object[rows.size()+1][]; +result[0] = sqlDataFrame.schema().fieldNames(); for (int i = 0; i < rows.size(); i++) { Row row = rows.get(i); - result[i] = new Object[row.size()]; + result[i+1] = new Object[row.size()]; for (int j = 0; j < row.size(); j++) { -result[i][j] = row.get(j); +result[i+1][j] = row.get(j); --- End diff -- Done ---
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2522#discussion_r203618925 --- Diff: store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java --- @@ -42,20 +43,24 @@ public ResponseEntity sql(@RequestBody SqlRequest request) throws StoreException { RequestValidator.validateSql(request); List rows; +Dataset sqlDataFrame = null; try { - rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement()) + sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(), + request.getSqlStatement()); + rows = sqlDataFrame .collectAsList(); } catch (AnalysisException e) { throw new StoreException(e.getSimpleMessage()); } catch (Exception e) { throw new StoreException(e.getMessage()); } -Object[][] result = new Object[rows.size()][]; +Object[][] result = new Object[rows.size()+1][]; --- End diff -- Done ---
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2522#discussion_r203618873 --- Diff: store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java --- @@ -42,20 +43,24 @@ public ResponseEntity sql(@RequestBody SqlRequest request) throws StoreException { RequestValidator.validateSql(request); List rows; +Dataset sqlDataFrame = null; try { - rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement()) + sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(), + request.getSqlStatement()); + rows = sqlDataFrame .collectAsList(); } catch (AnalysisException e) { throw new StoreException(e.getSimpleMessage()); } catch (Exception e) { throw new StoreException(e.getMessage()); } -Object[][] result = new Object[rows.size()][]; +Object[][] result = new Object[rows.size()+1][]; +result[0] = sqlDataFrame.schema().fieldNames(); for (int i = 0; i < rows.size(); i++) { --- End diff -- replaced with stream construct ---
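The controller change puts the column names in row 0 and shifts each data row down by one, and the author replied that the index loop was replaced with a stream construct. A sketch of that construction, assuming `List<List<Object>>` stands in for Spark's list of `Row`s and a `String[]` for `sqlDataFrame.schema().fieldNames()`; `ResultTableSketch` is hypothetical and not necessarily the merged code.

```java
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical helper; List<List<Object>> stands in for Spark's List<Row>. */
final class ResultTableSketch {

  private ResultTableSketch() { }

  /** Row 0 holds the column names; rows 1..n hold the values. */
  static Object[][] toResult(String[] fieldNames, List<List<Object>> rows) {
    return Stream.concat(
            Stream.<Object[]>of(fieldNames),          // header row as a single element
            rows.stream().map(row -> row.toArray()))  // one Object[] per data row
        .toArray(Object[][]::new);
  }
}
```

The explicit `Stream.<Object[]>of(...)` type witness matters: without it, `Stream.of(array)` would stream the individual column names instead of treating the array as one header row.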
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2522#discussion_r203618808 --- Diff: zeppelin/README.txt --- @@ -0,0 +1,18 @@ +Please follow below steps to integrate with zeppelin --- End diff -- Done: https://github.com/ajithme/carbondata/tree/zeppelinsupport/integration/zeppelin ---
[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...
GitHub user ajithme opened a pull request: https://github.com/apache/carbondata/pull/2522
[CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppelin support
Apache Zeppelin is a popular open-source, web-based notebook for interactive data analytics. It is a common choice for a UI front end because it already supports engines such as Spark, so Carbon can leverage it to provide a UI for its operations. With the Carbon REST server added in CARBONDATA-2688, Zeppelin support completes the solution.
- [ ] Document update required? YES, the usage guide needs to be updated for Zeppelin integration
- [ ] Testing done
1. Added UT
2. Tested manually by integrating with Zeppelin
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ajithme/carbondata zeppelinsupport
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2522.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2522
commit 16c69836c3903150875582f950f20cd1189fc69a
Author: Ajith
Date: 2018-07-18T11:18:54Z
CARBONDATA-2752 Zeppelin support
commit 13801259e5c46b1a4cc736fe94c4bf3678d75794
Author: Ajith
Date: 2018-07-18T11:26:29Z
update doc
---
[GitHub] carbondata issue #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integration wit...
Github user ajithme commented on the issue: https://github.com/apache/carbondata/pull/2495 Merged https://github.com/apache/carbondata/commit/9ac55a5a656ebe106697ca76a04916bea2ef3109 ---
[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...
Github user ajithme closed the pull request at: https://github.com/apache/carbondata/pull/2495 ---
[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2495#discussion_r202232048 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala --- @@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val df = sparkSession.sql(query) var sourceTable: CarbonTable = null +var dataFrame: Option[DataFrame] = None // find the streaming source table in the query // and replace it with StreamingRelation -val streamLp = df.logicalPlan transform { +df.logicalPlan transform { case r: LogicalRelation if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource => -val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r) +val (source, resolvedFrame) = prepareDataFrame(sparkSession, r) --- End diff -- Added method comments ---
[GitHub] carbondata pull request #2495: Added for kafka integration with Carbon Strea...
GitHub user ajithme opened a pull request: https://github.com/apache/carbondata/pull/2495
Added for kafka integration with Carbon StreamSQL
1. Pass the source table properties to streamReader.load().
2. Do not pass a schema to sparkSession.readStream.
3. Remove the querySchema validation against the sink, because a DataFrame made from a Kafka source does not carry the data schema (the data is written in the value column).
4. At writeStream, extract from the Kafka source a DataFrame that has the actual data schema.
Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:
- [ ] Any interfaces changed? NO
- [ ] Any backward compatibility impacted? NO
- [ ] Document update required? Yes: Need to use CSV parser
- [ ] Testing done: Done
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ajithme/carbondata kafkaStreamSQLIntegration
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2495.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2495
commit 0560c5e69c61d6594a91994da918493335bd0cb4
Author: Ajith
Date: 2018-07-12T03:47:22Z
Added for kafka integration with Carbon StreamSQL 1. Pass source table properties to streamReader.load() 2. Do not pass schema when sparkSession.readStream 3. Remove querySchema validation against sink as dataFrame made from kafka source will not have schema ( its written in value column of schema ) 4. Extract the dataframe from kafka source which contain actual data schema @ writeStream
---
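Points 3 and 4 hinge on the shape of Spark's Kafka source, which exposes fixed columns (`key`, `value`, `topic`, `partition`, `offset`, `timestamp`, `timestampType`). The record payload sits in the binary `value` column, so the sink's data schema has to be recovered by parsing `value`, and the checklist mentions a CSV parser for this. The sketch below shows that step for a single record in plain Java, assuming UTF-8 CSV values without quoting or escaping. `KafkaValueParserSketch` and `ColumnType` are hypothetical; in the real integration this happens on the streaming DataFrame, not record by record in user code.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical parser; not CarbonData's StreamSQL code. */
final class KafkaValueParserSketch {

  /** Column types of the (hypothetical) sink table. */
  enum ColumnType { INT, DOUBLE, STRING }

  private KafkaValueParserSketch() { }

  /** Turns one Kafka record value (a UTF-8 CSV line) into typed column values. */
  static Object[] parse(byte[] value, ColumnType[] schema) {
    String line = new String(value, StandardCharsets.UTF_8);
    // Naive split: no support for quoted fields or escaped commas.
    String[] fields = line.split(",", -1);
    if (fields.length != schema.length) {
      throw new IllegalArgumentException(
          "expected " + schema.length + " fields but got " + fields.length);
    }
    Object[] row = new Object[schema.length];
    for (int i = 0; i < schema.length; i++) {
      String field = fields[i].trim();
      switch (schema[i]) {
        case INT:
          row[i] = Integer.parseInt(field);
          break;
        case DOUBLE:
          row[i] = Double.parseDouble(field);
          break;
        default:
          row[i] = field;
      }
    }
    return row;
  }
}
```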