[GitHub] carbondata issue #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppel...

2018-08-19 Thread ajithme
Github user ajithme commented on the issue:

https://github.com/apache/carbondata/pull/2522
  
Merged


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-08-19 Thread ajithme
Github user ajithme closed the pull request at:

https://github.com/apache/carbondata/pull/2522


---


[GitHub] carbondata issue #2631: [CARBONDATA-2826] SELECT support using distributed c...

2018-08-19 Thread ajithme
Github user ajithme commented on the issue:

https://github.com/apache/carbondata/pull/2631
  
Merged


---


[GitHub] carbondata pull request #2631: [CARBONDATA-2826] SELECT support using distri...

2018-08-19 Thread ajithme
Github user ajithme closed the pull request at:

https://github.com/apache/carbondata/pull/2631


---


[GitHub] carbondata pull request #2631: [CARBONDATA-2826] SELECT support using distri...

2018-08-12 Thread ajithme
GitHub user ajithme opened a pull request:

https://github.com/apache/carbondata/pull/2631

[CARBONDATA-2826] SELECT support using distributed carbon store

Provides SELECT support with column pruning and filter pushdown, 
using a new RDD for the distributed carbon store

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ajithme/carbondata sourcev2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/2631.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2631


commit c5cd3ccb663a7177ec393188aa0461620c254f4f
Author: Ajith 
Date:   2018-08-12T11:24:27Z

CARBONDATA-2826 SELECT support using distributed carbon store




---


[GitHub] carbondata pull request #2589: [CARBONDATA-2825][CARBONDATA-2828] CarbonStor...

2018-08-04 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207721977
  
--- Diff: 
store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java ---
@@ -145,26 +152,61 @@ private void createTable(TableInfo tableInfo, boolean 
ifNotExists) throws IOExce
 }
   }
 
-  public void dropTable(TableIdentifier table) throws IOException {
-String tablePath = store.getTablePath(table.getTableName(), 
table.getDatabaseName());
+  public void dropTable(TableIdentifier table) throws CarbonException {
+String tablePath = getTablePath(table.getTableName(), 
table.getDatabaseName());
 cache.remove(tablePath);
-FileFactory.deleteFile(tablePath);
+try {
+  FileFactory.deleteFile(tablePath);
+} catch (IOException e) {
+  throw new CarbonException(e);
+}
+  }
+
+  public TableInfo getTable(TableIdentifier table) throws CarbonException {
+return getTable(table, storeConf);
   }
 
-  public CarbonTable getTable(TableIdentifier table) throws IOException {
-String tablePath = store.getTablePath(table.getTableName(), 
table.getDatabaseName());
+  public static TableInfo getTable(TableIdentifier table, StoreConf 
storeConf)
+  throws CarbonException {
+String tablePath = getTablePath(table.getTableName(), 
table.getDatabaseName(), storeConf);
 if (cache.containsKey(tablePath)) {
   return cache.get(tablePath);
 } else {
-  org.apache.carbondata.format.TableInfo formatTableInfo =
-  
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+  org.apache.carbondata.format.TableInfo formatTableInfo = null;
+  try {
+formatTableInfo = 
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+  } catch (IOException e) {
+throw new CarbonException(e);
+  }
   SchemaConverter schemaConverter = new 
ThriftWrapperSchemaConverterImpl();
   TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
   formatTableInfo, table.getDatabaseName(), table.getTableName(), 
tablePath);
   tableInfo.setTablePath(tablePath);
-  CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
-  cache.put(tablePath, carbonTable);
-  return carbonTable;
+  cache.put(tablePath, tableInfo);
+  return tableInfo;
 }
   }
-}
\ No newline at end of file
+
+  public List listTable() throws CarbonException {
+throw new UnsupportedOperationException();
+  }
+
+  public TableDescriptor getDescriptor(TableIdentifier table) throws 
CarbonException {
+throw new UnsupportedOperationException();
+  }
+
+  public void alterTable(TableIdentifier table, TableDescriptor newTable) 
throws CarbonException {
+throw new UnsupportedOperationException();
+  }
+
+  public String getTablePath(String tableName, String databaseName) {
+Objects.requireNonNull(tableName);
+Objects.requireNonNull(databaseName);
+return String.format("%s/%s", storeConf.storeLocation(), tableName);
+  }
+  public static String getTablePath(String tableName, String databaseName, 
StoreConf storeConf) {
+Objects.requireNonNull(tableName);
+Objects.requireNonNull(databaseName);
+return String.format("%s/%s", storeConf.storeLocation(), tableName);
--- End diff --

The table path must include the database name; this format string uses only the store location and the table name.
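
A minimal sketch of what this comment asks for, assuming a layout of store location, then database name, then table name. The class TablePaths and the method tablePath are hypothetical names used only for illustration; the layout the project finally chose may differ.

```java
import java.util.Objects;

// Hypothetical helper, not part of the pull request.
final class TablePaths {
  private TablePaths() { }

  // Assumed layout: <storeLocation>/<databaseName>/<tableName>
  static String tablePath(String storeLocation, String databaseName, String tableName) {
    Objects.requireNonNull(storeLocation);
    Objects.requireNonNull(databaseName);
    Objects.requireNonNull(tableName);
    return String.format("%s/%s/%s", storeLocation, databaseName, tableName);
  }
}
```

With this layout, two tables with the same name in different databases no longer map to the same path, and so no longer share a cache entry.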


---


[GitHub] carbondata pull request #2589: [CARBONDATA-2825][CARBONDATA-2828] CarbonStor...

2018-08-04 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207721786
  
--- Diff: 
store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
 ---
@@ -20,14 +20,14 @@
 import java.io.IOException;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.impl.rpc.RegistryService;
-import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.impl.service.RegistryService;
+import 
org.apache.carbondata.store.impl.service.model.RegisterWorkerRequest;
+import 
org.apache.carbondata.store.impl.service.model.RegisterWorkerResponse;
 
 import org.apache.hadoop.ipc.ProtocolSignature;
 
 @InterfaceAudience.Internal
-class RegistryServiceImpl implements RegistryService {
+class egistryServiceImpl implements RegistryService {
--- End diff --

Typo: the class name should be RegistryServiceImpl, please rename it.


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-04 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207704316
  
--- Diff: 
hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---
@@ -252,9 +262,24 @@ public Segment getSegment() {
 if (dataMapWriterPathExists) {
   dataMapWritePath = in.readUTF();
 }
+boolean filePathExists = in.readBoolean();
+if (filePathExists) {
+  filePath = in.readUTF();
+} else {
+  filePath = super.getPath().toString();
+}
   }
 
   @Override public void write(DataOutput out) throws IOException {
+if (super.getPath() != null) {
+  super.write(out);
+} else {
+  // see HADOOP-13519, after Java deserialization, super.filePath is
+  // null, so write our filePath instead
+  Text.writeString(out, filePath);
+  out.writeLong(getStart());
+  out.writeLong(getLength());
+}
 super.write(out);
--- End diff --

This line should be deleted: the block above already writes the split header, so writing it a second time will make the read fail.
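
A minimal sketch of why the duplicate write matters, using only java.io streams. The class DuplicateHeaderDemo and its methods are hypothetical and stand in for the real CarbonInputSplit; the point is only that a reader which consumes the header once is left with unexpected bytes where its next field should be.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of the pull request.
final class DuplicateHeaderDemo {
  private DuplicateHeaderDemo() { }

  // Writes the split header (path, start, length) the given number of times.
  static byte[] write(String path, long start, long length, int copies) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      for (int i = 0; i < copies; i++) {
        out.writeUTF(path);
        out.writeLong(start);
        out.writeLong(length);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the header once and reports how many bytes remain unread.
  static int bytesLeftAfterOneRead(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      in.readUTF();
      in.readLong();
      in.readLong();
      return in.available();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

When the header is written once, nothing is left over after one read. When it is written twice, the leftover bytes would be interpreted as the fields that follow the header, which is the failure the comment warns about.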


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207699358
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/store/ScanUnit.java ---
@@ -15,26 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.store.impl.rpc;
+package org.apache.carbondata.sdk.store;
 
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
-
-import org.apache.hadoop.ipc.VersionedProtocol;
-
-@InterfaceAudience.Internal
-public interface StoreService extends VersionedProtocol {
-
-  long versionID = 1L;
+import java.io.Serializable;
 
-  BaseResponse loadData(LoadDataRequest request);
-
-  QueryResponse query(Scan scan);
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
 
-  ShutdownResponse shutdown(ShutdownRequest request);
+/**
+ * An unit for the scanner in Carbon Store
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface ScanUnit extends Serializable, Writable {
--- End diff --

The generics can be removed here.


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207699345
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/store/BlockScanUnit.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.store;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+/**
+ * It contains a block to scan, and a destination worker who should scan it
+ */
+@InterfaceAudience.Internal
+public class BlockScanUnit implements ScanUnit {
+
+  // the data block to scan
+  private CarbonInputSplit inputSplit;
+
+  // the worker who should scan this unit
+  private Schedulable schedulable;
--- End diff --

Serialize this field in the Writable methods as well; otherwise it will be null after deserialization.
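
A minimal sketch of the fix, assuming the unit serializes itself through write and readFields methods in the style of a Hadoop Writable. ScanUnitSketch is a hypothetical stand-in for BlockScanUnit, and plain strings replace the real CarbonInputSplit and Schedulable types so the example stays self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for BlockScanUnit, not part of the pull request.
final class ScanUnitSketch {
  String block;   // the data block to scan
  String worker;  // the worker that should scan it, may be null

  void write(DataOutput out) throws IOException {
    out.writeUTF(block);
    // A presence flag lets a null worker survive the round trip.
    out.writeBoolean(worker != null);
    if (worker != null) {
      out.writeUTF(worker);
    }
  }

  void readFields(DataInput in) throws IOException {
    block = in.readUTF();
    worker = in.readBoolean() ? in.readUTF() : null;
  }

  // Serializes the unit to bytes and reads it back into a fresh instance.
  static ScanUnitSketch copyOf(ScanUnitSketch unit) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      unit.write(new DataOutputStream(bytes));
      ScanUnitSketch copy = new ScanUnitSketch();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Any field that write skips is simply absent from the byte stream, so the receiving side has no way to restore it.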


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207699308
  
--- Diff: 
hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---
@@ -444,4 +444,16 @@ public void setFormat(FileFormat fileFormat) {
   public Blocklet makeBlocklet() {
 return new Blocklet(getPath().getName(), blockletId);
   }
+
+  public String[] preferredLocations() {
--- End diff --

The superclass field FileSplit.file is not serializable (see HADOOP-13519), so 
after Java serialization this may return an empty result.


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207501460
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/store/descriptor/ScanDescriptor.java
 ---
@@ -15,23 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.store.api.descriptor;
+package org.apache.carbondata.sdk.store.descriptor;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Objects;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
-public class SelectDescriptor {
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class ScanDescriptor implements Writable {
 
   private TableIdentifier table;
   private String[] projection;
   private Expression filter;
   private long limit;
--- End diff --

This should default to Long.MAX_VALUE.
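
One reading of this comment, sketched below: an uninitialized long field is 0, which a scanner could interpret as "return zero rows", whereas Long.MAX_VALUE behaves as "no limit". The class LimitSketch, the constant NO_LIMIT and the method rowsToReturn are hypothetical names, and the way the real scanner applies the limit is an assumption.

```java
// Hypothetical illustration, not part of the pull request.
final class LimitSketch {
  static final long NO_LIMIT = Long.MAX_VALUE;

  // Defaulting the field explicitly avoids the implicit value 0.
  private long limit = NO_LIMIT;

  long getLimit() {
    return limit;
  }

  void setLimit(long limit) {
    this.limit = limit;
  }

  // Assumed semantics: never return more rows than the limit allows.
  static long rowsToReturn(long availableRows, long limit) {
    return Math.min(availableRows, limit);
  }
}
```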


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207431095
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/store/ScannerImpl.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.service.DataService;
+import org.apache.carbondata.sdk.store.service.PruneService;
+import org.apache.carbondata.sdk.store.service.ServiceFactory;
+import org.apache.carbondata.sdk.store.service.model.PruneRequest;
+import org.apache.carbondata.sdk.store.service.model.PruneResponse;
+import org.apache.carbondata.sdk.store.service.model.ScanRequest;
+import org.apache.carbondata.sdk.store.service.model.ScanResponse;
+
+import org.apache.hadoop.conf.Configuration;
+
+class ScannerImpl implements Scanner {
+  private static final LogService LOGGER =
+  
LogServiceFactory.getLogService(ScannerImpl.class.getCanonicalName());
+
+  private PruneService pruneService;
+  private TableInfo tableInfo;
+
+  ScannerImpl(StoreConf conf, TableInfo tableInfo) throws IOException {
+this.pruneService = ServiceFactory.createPruneService(
+conf.masterHost(), conf.registryServicePort());
--- End diff --

This must be the prune service port, not the registry service port.


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207431252
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/store/service/StoreService.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.store.service;
+
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+@InterfaceAudience.Internal
+public interface StoreService extends VersionedProtocol {
+  long versionID = 1L;
+
+  void createTable(TableDescriptor descriptor) throws CarbonException;
+
+  void dropTable(TableIdentifier table) throws CarbonException;
+
+  CarbonTable getTable(TableIdentifier table) throws CarbonException;
--- End diff --

Hadoop RPC needs the response object to be serializable through 
org.apache.hadoop.io.serializer.WritableSerialization, that is, to be a Writable.


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] Refactor CarbonStore API

2018-08-03 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r207433215
  
--- Diff: 
store/sdk/src/main/java/org/apache/carbondata/sdk/store/ScannerImpl.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.service.DataService;
+import org.apache.carbondata.sdk.store.service.PruneService;
+import org.apache.carbondata.sdk.store.service.ServiceFactory;
+import org.apache.carbondata.sdk.store.service.model.PruneRequest;
+import org.apache.carbondata.sdk.store.service.model.PruneResponse;
+import org.apache.carbondata.sdk.store.service.model.ScanRequest;
+import org.apache.carbondata.sdk.store.service.model.ScanResponse;
+
+import org.apache.hadoop.conf.Configuration;
+
+class ScannerImpl implements Scanner {
+  private static final LogService LOGGER =
+  
LogServiceFactory.getLogService(ScannerImpl.class.getCanonicalName());
+
+  private PruneService pruneService;
+  private TableInfo tableInfo;
+
+  ScannerImpl(StoreConf conf, TableInfo tableInfo) throws IOException {
+this.pruneService = ServiceFactory.createPruneService(
+conf.masterHost(), conf.registryServicePort());
+this.tableInfo = tableInfo;
+  }
+
+  /**
+   * Trigger a RPC to Carbon Master to do pruning
+   * @param table table identifier
+   * @param filterExpression expression of filter predicate given by user
+   * @return list of ScanUnit
+   * @throws CarbonException if any error occurs
+   */
+  @Override
+  public List prune(TableIdentifier table, Expression 
filterExpression)
+  throws CarbonException {
+try {
+  Configuration configuration = new Configuration();
+  CarbonInputFormat.setTableName(configuration, table.getTableName());
--- End diff --

Use CarbonInputFormat.setTableInfo(configuration, tableInfo) here; otherwise 
org.apache.carbondata.hadoop.api.CarbonInputFormat#getAbsoluteTableIdentifier 
will return an identifier with an empty path.


---


[GitHub] carbondata pull request #2589: [WIP][CARBONSTORE] add CTable interface in Ca...

2018-07-31 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2589#discussion_r206492090
  
--- Diff: store/core/pom.xml ---
@@ -48,8 +48,8 @@
 org.apache.maven.plugins
 maven-compiler-plugin
 
-  1.7
-  1.7
+  8
--- End diff --

8 is the same as 1.8, since this value is passed to javac:
https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-07-19 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2522#discussion_r203618953
  
--- Diff: 
store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
 ---
@@ -42,20 +43,24 @@
   public ResponseEntity sql(@RequestBody SqlRequest request) 
throws StoreException {
 RequestValidator.validateSql(request);
 List rows;
+Dataset sqlDataFrame = null;
 try {
-  rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), 
request.getSqlStatement())
+  sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(),
+  request.getSqlStatement());
+  rows = sqlDataFrame
   .collectAsList();
--- End diff --

Done


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-07-19 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2522#discussion_r203618896
  
--- Diff: 
store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
 ---
@@ -42,20 +43,24 @@
   public ResponseEntity sql(@RequestBody SqlRequest request) 
throws StoreException {
 RequestValidator.validateSql(request);
 List rows;
+Dataset sqlDataFrame = null;
 try {
-  rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), 
request.getSqlStatement())
+  sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(),
+  request.getSqlStatement());
+  rows = sqlDataFrame
   .collectAsList();
 } catch (AnalysisException e) {
   throw new StoreException(e.getSimpleMessage());
 } catch (Exception e) {
   throw new StoreException(e.getMessage());
 }
-Object[][] result = new Object[rows.size()][];
+Object[][] result = new Object[rows.size()+1][];
+result[0] = sqlDataFrame.schema().fieldNames();
 for (int i = 0; i < rows.size(); i++) {
   Row row = rows.get(i);
-  result[i] = new Object[row.size()];
+  result[i+1] = new Object[row.size()];
   for (int j = 0; j < row.size(); j++) {
-result[i][j] = row.get(j);
+result[i+1][j] = row.get(j);
--- End diff --

Done


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-07-19 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2522#discussion_r203618925
  
--- Diff: 
store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
 ---
@@ -42,20 +43,24 @@
   public ResponseEntity sql(@RequestBody SqlRequest request) 
throws StoreException {
 RequestValidator.validateSql(request);
 List rows;
+Dataset sqlDataFrame = null;
 try {
-  rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), 
request.getSqlStatement())
+  sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(),
+  request.getSqlStatement());
+  rows = sqlDataFrame
   .collectAsList();
 } catch (AnalysisException e) {
   throw new StoreException(e.getSimpleMessage());
 } catch (Exception e) {
   throw new StoreException(e.getMessage());
 }
-Object[][] result = new Object[rows.size()][];
+Object[][] result = new Object[rows.size()+1][];
--- End diff --

Done


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-07-19 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2522#discussion_r203618873
  
--- Diff: 
store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
 ---
@@ -42,20 +43,24 @@
   public ResponseEntity sql(@RequestBody SqlRequest request) 
throws StoreException {
 RequestValidator.validateSql(request);
 List rows;
+Dataset sqlDataFrame = null;
 try {
-  rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), 
request.getSqlStatement())
+  sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(),
+  request.getSqlStatement());
+  rows = sqlDataFrame
   .collectAsList();
 } catch (AnalysisException e) {
   throw new StoreException(e.getSimpleMessage());
 } catch (Exception e) {
   throw new StoreException(e.getMessage());
 }
-Object[][] result = new Object[rows.size()][];
+Object[][] result = new Object[rows.size()+1][];
+result[0] = sqlDataFrame.schema().fieldNames();
 for (int i = 0; i < rows.size(); i++) {
--- End diff --

Replaced the loop with a stream construct.
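
A minimal sketch of what such a stream construct could look like, not the code that was actually committed. It assumes the rows are already available as Object arrays; in the controller they would come from the Spark Row objects. The class ResultTable and the method withHeader are hypothetical names.

```java
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper, not part of the pull request.
final class ResultTable {
  private ResultTable() { }

  // Puts the field names in row 0 and the data rows after it.
  static Object[][] withHeader(String[] fieldNames, List<Object[]> rows) {
    return Stream.concat(
            Stream.<Object[]>of((Object[]) fieldNames),
            rows.stream())
        .toArray(Object[][]::new);
  }
}
```

This removes the manual i+1 index arithmetic from the loop, since the header row and the data rows are concatenated in order.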


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-07-19 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2522#discussion_r203618808
  
--- Diff: zeppelin/README.txt ---
@@ -0,0 +1,18 @@
+Please follow below steps to integrate with zeppelin
--- End diff --

Done:

https://github.com/ajithme/carbondata/tree/zeppelinsupport/integration/zeppelin


---


[GitHub] carbondata pull request #2522: [CARBONDATA-2752][CARBONSTORE] Carbon provide...

2018-07-18 Thread ajithme
GitHub user ajithme opened a pull request:

https://github.com/apache/carbondata/pull/2522

[CARBONDATA-2752][CARBONSTORE] Carbon provide Zeppelin support

Apache Zeppelin is a popular open-source, web-based notebook that enables 
interactive data analytics. It is a favored choice for providing a UI 
frontend because it already supports engines such as Spark. Carbon can leverage 
it to provide a UI for its operations. Building on CARBONDATA-2688, which provides a 
Carbon REST server, we can add Zeppelin UI support to provide a complete 
solution.

 - [ ] Document update required? YES, need to update usage guide for 
Zeppelin integration

 - [ ] Testing done
1. Added UT
2. Tested manually by integrating with Zeppelin

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ajithme/carbondata zeppelinsupport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/2522.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2522


commit 16c69836c3903150875582f950f20cd1189fc69a
Author: Ajith 
Date:   2018-07-18T11:18:54Z

CARBONDATA-2752 Zeppelin support

commit 13801259e5c46b1a4cc736fe94c4bf3678d75794
Author: Ajith 
Date:   2018-07-18T11:26:29Z

update doc




---


[GitHub] carbondata issue #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integration wit...

2018-07-18 Thread ajithme
Github user ajithme commented on the issue:

https://github.com/apache/carbondata/pull/2495
  
Merged 
https://github.com/apache/carbondata/commit/9ac55a5a656ebe106697ca76a04916bea2ef3109


---


[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-18 Thread ajithme
Github user ajithme closed the pull request at:

https://github.com/apache/carbondata/pull/2495


---


[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-12 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2495#discussion_r202232048
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
 ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
 val df = sparkSession.sql(query)
 var sourceTable: CarbonTable = null
+var dataFrame: Option[DataFrame] = None
 
 // find the streaming source table in the query
 // and replace it with StreamingRelation
-val streamLp = df.logicalPlan transform {
+df.logicalPlan transform {
   case r: LogicalRelation
 if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&

r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource
 =>
-val (source, streamingRelation) = 
prepareStreamingRelation(sparkSession, r)
+val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Added method comments


---


[GitHub] carbondata pull request #2495: Added for kafka integration with Carbon Strea...

2018-07-11 Thread ajithme
GitHub user ajithme opened a pull request:

https://github.com/apache/carbondata/pull/2495

Added for kafka integration with Carbon StreamSQL

1. Pass source table properties to streamReader.load()
2. Do not pass the schema when calling sparkSession.readStream
3. Remove querySchema validation against the sink, as a dataFrame made from a kafka 
source will not have the schema (it is written in the value column of the schema)
4. Extract the dataframe from the kafka source, which contains the actual data schema, 
at writeStream

Be sure to do all of the following checklist to help us incorporate 
your contribution quickly and easily:

 - [ ] Any interfaces changed? NO
 
 - [ ] Any backward compatibility impacted? NO
 
 - [ ] Document update required? Yes: Need to use CSV parser

 - [ ] Testing done Done
   
 - [ ] For large changes, please consider breaking it into sub-tasks under 
an umbrella JIRA. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ajithme/carbondata kafkaStreamSQLIntegration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/2495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2495


commit 0560c5e69c61d6594a91994da918493335bd0cb4
Author: Ajith 
Date:   2018-07-12T03:47:22Z

Added for kafka integration with Carbon StreamSQL
1. Pass source table properties to streamReader.load()
2. Do not pass schema when sparkSession.readStream
3. Remove querySchema validation against sink as dataFrame made from kafka 
source will not have schema ( its written in value column of schema )
4. Extract the dataframe from kafka source which contain actual data schema 
@ writeStream




---