[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/trafodion/pull/1417


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-12 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167601286
  
--- Diff: core/sql/generator/GenRelScan.cpp ---
@@ -1391,6 +1391,9 @@ if (hTabStats->isOrcFile())
   hdfsscan_tdb->setUseCif(useCIF);
   hdfsscan_tdb->setUseCifDefrag(useCIFDegrag);
 
+  if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON)
--- End diff --

I need to confirm whether compression is already supported. Are there any tests 
for compression in the hive regression suite? If so, it is already handled.
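
For reference, whether a hive text file is compressed can be probed with Hadoop's 
CompressionCodecFactory; the minimal sketch below reads a compressed file through 
its codec. This is only an illustration against the standard Hadoop API, not the 
Trafodion hive regression tests, and the file path is invented.

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CompressionProbe
{
   public static void main(String[] args) throws Exception
   {
      Configuration conf = new Configuration();
      Path path = new Path("/hive/warehouse/t1/000000_0.gz");   // invented path
      FileSystem fs = FileSystem.get(path.toUri(), conf);
      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
      CompressionCodec codec = factory.getCodec(path);           // null => no codec for this extension
      InputStream in = (codec == null) ? fs.open(path)
                                       : codec.createInputStream(fs.open(path));
      BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
      System.out.println("codec: " + (codec == null ? "none" : codec.getClass().getName()));
      System.out.println("first row: " + reader.readLine());
      reader.close();
   }
}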


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-12 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167600705
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -118,15 +119,39 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   , dataModCheckDone_(FALSE)
   , loggingErrorDiags_(NULL)
   , loggingFileName_(NULL)
+  , hdfsClient_(NULL)
+  , hdfsScan_(NULL)
+  , hdfsStats_(NULL)
   , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), 
hdfsScanTdb.getHdfsFileInfoList()->numEntries())
 {
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
+  useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
+  if (isSequenceFile())
+ useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
-  const int readBufSize =  (Int32)hdfsScanTdb.hdfsBufSize_;
-  hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ]; 
-  hdfsScanBuffer_[readBufSize] = '\0';
-
+  hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
+  headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
+
+  if (useLibhdfsScan_) {
+ hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; 
+ hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0';
+  } else {
+ hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * 
(headRoom_)];
--- End diff --

This behavior remains unchanged. Let me check whether there is already a 
regression test suite for it.
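
As far as one can tell from the quoted executor code, the new allocation reserves 
headRoom_ bytes on either side of the hdfsScanBufMaxSize_ scan area, and the leading 
headroom later receives the copied tail of the previously processed buffer so a row 
that straddles two reads stays contiguous. A small Java sketch of that layout, 
purely illustrative (the real buffers are native byte arrays on the NAHeap, and the 
sizes below are made up):

import java.nio.ByteBuffer;

public class ScanBufLayout
{
   static final int BUF_MAX_SIZE = 16 * 1024 * 1024;   // stand-in for hdfsScanBufMaxSize_
   static final int HEAD_ROOM    = 16 * 1024;          // stand-in for rangeTailIOSize_

   public static void main(String[] args)
   {
      // Backing storage: [ headroom | scan area of BUF_MAX_SIZE | headroom ]
      ByteBuffer backing = ByteBuffer.allocateDirect(BUF_MAX_SIZE + 2 * HEAD_ROOM);
      backing.position(HEAD_ROOM);                      // IO lands after the leading headroom
      backing.limit(HEAD_ROOM + BUF_MAX_SIZE);
      ByteBuffer scanArea = backing.slice();            // region handed to the reader
      System.out.println("scan area capacity = " + scanArea.capacity());
   }
}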


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474597
  
--- Diff: core/sql/executor/SequenceFileReader.h ---
@@ -199,15 +185,6 @@ class SequenceFileWriter : public JavaObjectInterface
   
   // Close the file.
  SFW_RetCode close();
-
-  SFW_RetCode hdfsCreate(const char* path, NABoolean compress);
-  SFW_RetCode hdfsWrite(const char* data, Int64 size);
-  SFW_RetCode hdfsMergeFiles(const NAString& srcPath,
- const NAString& dstPath);
-  SFW_RetCode hdfsDeletePath(const NAString& delPath);
-  SFW_RetCode hdfsCleanUnloadPath(const NAString& uldPath );
-  SFW_RetCode hdfsExists(const NAString& uldPath,  NABoolean & exists );
-  SFW_RetCode hdfsClose();
  SFW_RetCode release();
 
   virtual char*  getErrorText(SFW_RetCode errEnum);
--- End diff --

Yes. It can be static.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474575
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
@@ -0,0 +1,289 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read hdfs files
+// Trafodion ExHdfsScan operator provides a range of scans to be performed.
+// The range consists of a hdfs filename, offset and length to be read
+// This class takes in two ByteBuffers. These ByteBuffer can be either 
direct buffers
+// backed up native buffers or indirect buffer backed by java arrays.
+// All the ranges are read alternating between the two buffers using 
ExecutorService
+// using CachedThreadPool mechanism. 
+// For a given HdfsScan instance, only one thread(IO thread) is scheduled 
to read
+// the next full or partial buffer while the main thread processes the 
previously
+// read information from the other buffer
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan 
+{
+   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+   private ByteBuffer buf_[];
+   private int bufLen_[];
+   private HDFSClient hdfsClient_[];
+   private int currRange_;
+   private long currPos_;
+   private long lenRemain_;
+   private int lastBufCompleted_ = -1;
+   private boolean scanCompleted_;
+   
+   class HdfsScanRange 
+   {
+  String filename_;
+  long pos_;
+  long len_;
+  int tdbRangeNum_;
+  
+  HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+  {
+ filename_ = filename;
+ pos_ = pos;
+ len_ = len;
+ tdbRangeNum_ = tdbRangeNum;
+  }
+   }
+   
+   private HdfsScanRange hdfsScanRanges_[];
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+   }
+
+   public HdfsScan() 
+   {
+   }
+
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String 
filename[], long pos[], long len[], int rangeNum[]) throws IOException
+   {
+  buf_ = new ByteBuffer[2];
+  bufLen_ = new int[2];
+
+  buf_[0] = buf1;
+  buf_[1] = buf2;
+
+  for (int i = 0; i < 2 ; i++) {
+  if (buf_[i].hasArray())
+ bufLen_[i] = buf_[i].array().length;
+  else
+ bufLen_[i] = buf_[i].capacity();
+  }
+  hdfsClient_ = new HDFSClient[2];
+  hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
+  for (int i = 0; i < filename.length; i++) {
+ hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], 
len[i], rangeNum[i]);
--- End diff --

I think it should work. Let me confirm it.
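
For context, here is a sketch of how a caller could drive the setScanRanges() method 
quoted above with two direct buffers and parallel range arrays. The file names, 
offsets and lengths are invented; in the product the caller is the C++ executor 
going through JNI.

import java.nio.ByteBuffer;
import org.trafodion.sql.HdfsScan;

public class HdfsScanDriver
{
   public static void main(String[] args) throws Exception
   {
      ByteBuffer buf1 = ByteBuffer.allocateDirect(16 * 1024 * 1024);
      ByteBuffer buf2 = ByteBuffer.allocateDirect(16 * 1024 * 1024);

      // One entry per range: file name, start offset, length, tdb range number.
      String[] files  = { "hdfs:///hive/warehouse/t1/f1", "hdfs:///hive/warehouse/t1/f2" };
      long[]   pos    = { 0L, 0L };
      long[]   len    = { 128L * 1024 * 1024, 64L * 1024 * 1024 };
      int[]    ranges = { 0, 1 };

      HdfsScan scan = new HdfsScan();
      scan.setScanRanges(buf1, buf2, files, pos, len, ranges);
      // Subsequent read calls alternate between buf1 and buf2, as described
      // in the class comment of HdfsScan.java.
   }
}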


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474490
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private int rangeNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+   private int isEOF_ = 0; 
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  HDFSRead() 
+  {
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ int totalBytesRead = 0;
+ if (! buf_.hasArray()) {
+try {
+  fsdis_.seek(pos_);
--- End diff --

I didn't understand the comment. We do read-ahead in HdfsScan: while one buffer is 
being processed, the other buffer is filled with the remaining portion of the range 
or with the next range.
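
To make the read-ahead scheme concrete, here is a minimal, self-contained sketch of 
the pattern: a single IO task fills one buffer while the main thread consumes the 
other, then the roles swap. It only illustrates the idea; the actual reads in 
HdfsScan go through HDFSClient and FSDataInputStream.

import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoubleBufferReadAhead
{
   static final ExecutorService ioPool = Executors.newCachedThreadPool();

   // Stand-in for the real range read; would call fsdis.read(buf) in practice.
   static Callable<Integer> readTask(final ByteBuffer buf)
   {
      return new Callable<Integer>() {
         public Integer call() {
            buf.clear();
            // ... fill buf from HDFS here ...
            return buf.position();       // bytes read
         }
      };
   }

   public static void main(String[] args) throws Exception
   {
      ByteBuffer[] buf = { ByteBuffer.allocateDirect(1 << 20), ByteBuffer.allocateDirect(1 << 20) };
      Future<Integer> pending = ioPool.submit(readTask(buf[0]));     // prime the first buffer
      for (int i = 0; i < 4; i++) {
         int cur = i % 2;
         int bytes = pending.get();                                  // wait for the IO thread
         pending = ioPool.submit(readTask(buf[(cur + 1) % 2]));      // schedule the other buffer
         System.out.println("processing buffer " + cur + ", " + bytes + " bytes");
      }
      pending.get();
      ioPool.shutdown();
   }
}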


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474246
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private int rangeNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+   private int isEOF_ = 0; 
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  HDFSRead() 
+  {
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ int totalBytesRead = 0;
+ if (! buf_.hasArray()) {
--- End diff --

HdfsScan supports both direct and non-direct buffers. Currently we use direct 
ByteBuffers, where the ByteBuffer wraps the native array allocated in C++. However, 
it is possible for HdfsScan to use a non-direct ByteBuffer, which wraps a byte array 
in JVM memory.
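
A short sketch of the two read paths being described, using the standard 
FSDataInputStream API: a positional read into the backing array for a heap 
(non-direct) buffer, and seek plus read(ByteBuffer) for a direct buffer. This 
mirrors the quoted HDFSRead.call() logic but is only an illustration.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

public class BufferReadPaths
{
   // Reads up to 'length' bytes starting at file offset 'pos' into 'buf'.
   static int readRange(FSDataInputStream in, ByteBuffer buf, long pos, int length)
         throws IOException
   {
      if (buf.hasArray()) {
         // Heap buffer: positional read straight into the backing byte[].
         return in.read(pos, buf.array(), buf.arrayOffset() + buf.position(), length);
      } else {
         // Direct buffer: seek, then let the stream fill the ByteBuffer
         // (requires the underlying stream to support ByteBufferReadable).
         in.seek(pos);
         buf.limit(buf.position() + length);
         return in.read(buf);
      }
   }
}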


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473755
  
--- Diff: core/sql/executor/ExExeUtilGet.cpp ---
@@ -3521,13 +3521,9 @@ 
ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb(
  ex_globals * glob)
  : ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob)
 {
-  int jniDebugPort = 0;
-  int jniDebugTimeout = 0;
   ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)exe_util_tdb.server(), 
-   (char*)exe_util_tdb.zkPort(),
-jniDebugPort,
-jniDebugTimeout);
+   (char*)exe_util_tdb.zkPort());
--- End diff --

The debug port needs to be initialized as part of JVM invocation; see 
JavaObjectInterface::createJVM. This debugPort is never used. Because it is per 
process, it needs to be made static.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473694
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1149,8 +1297,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (hdfsScanTdb().continueOnError())
{
  if ((pentry_down->downState.request == ex_queue::GET_N) &&
- (pentry_down->downState.requestValue == matches_))
-   step_ = CLOSE_FILE;
+ (pentry_down->downState.requestValue == matches_)) {
+ if (useLibhdfsScan_)
+step_ = CLOSE_HDFS_CURSOR;
--- End diff --

No. Thanks for catching it. Will change it back to CLOSE_HDFS_CURSOR.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473572
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 
 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
   step_ = DONE;
-else
-  step_ = INIT_HDFS_CURSOR;
+else {
+  if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+  else
+ step_ = SETUP_HDFS_SCAN;
+}
   }
   break;
-
+case SETUP_HDFS_SCAN:
+  {   
+ if (hdfsScan_ != NULL)
+NADELETE(hdfsScan_, HdfsScan, getHeap());
+ hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), 
hdfsScanBuf_, hdfsScanBufMaxSize_, 
+_, beginRangeNum_, 
numRanges_, hdfsScanTdb().rangeTailIOSize_, 
+hdfsStats_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1; 
+ currRangeBytesRead_ = 0;   
+ recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
+ step_ = TRAF_HDFS_READ;
+  } 
+  break;
+case TRAF_HDFS_READ:
+  {
+ hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap 
*)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+step_ = DONE;
+break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+ bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED];
+ if (retArray_[RANGE_NO] != prevRangeNum_) {  
+currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+if (hdfo->getStartOffset() == 0)
+   recordSkip_ = FALSE;
+else
+   recordSkip_ = TRUE; 
+ } else {
+currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - 
headRoomCopied_;
+recordSkip_ = FALSE;
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+extraBytesRead_ = currRangeBytesRead_ - 
hdfo->getBytesToRead(); 
+ else
+extraBytesRead_ = 0;
+ // headRoom_ is the number of extra bytes read 
(rangeTailIOSize)
+ // If EOF is reached while reading the range and the 
extraBytes read
+ // is less than headRoom_, then process all the data till EOF 
+ if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+extraBytesRead_ = 0;
+ bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
+ if (recordSkip_) {
+   hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_, 
+  (char *)bufEnd_,
+ checkRangeDelimiter_, 
+ hdfsScanTdb().getHiveScanMode(), );
+if (hdfsBufNextRow_ == NULL) {
--- End diff --

Yes


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473505
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -380,10 +407,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
   HdfsFileInfo *hdfo = NULL;
   Lng32 openType = 0;
   int changedLen = 0;
- ContextCli *currContext = 
getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
-   hdfsFS hdfs = 
currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
-   hdfsFileInfo *dirInfo = NULL;
-   Int32 hdfsErrorDetail = 0;//this is errno returned form underlying 
hdfsOpenFile call.
+  ContextCli *currContext = 
getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
+  hdfsFS hdfs = 
currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
--- End diff --

I didn't make any change to this line except for aligning it.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167464842
  
--- Diff: core/sql/executor/SequenceFileReader.h ---
@@ -199,15 +185,6 @@ class SequenceFileWriter : public JavaObjectInterface
   
   // Close the file.
  SFW_RetCode close();
-
-  SFW_RetCode hdfsCreate(const char* path, NABoolean compress);
-  SFW_RetCode hdfsWrite(const char* data, Int64 size);
-  SFW_RetCode hdfsMergeFiles(const NAString& srcPath,
- const NAString& dstPath);
-  SFW_RetCode hdfsDeletePath(const NAString& delPath);
-  SFW_RetCode hdfsCleanUnloadPath(const NAString& uldPath );
-  SFW_RetCode hdfsExists(const NAString& uldPath,  NABoolean & exists );
-  SFW_RetCode hdfsClose();
  SFW_RetCode release();
 
   virtual char*  getErrorText(SFW_RetCode errEnum);
--- End diff --

Should this be static instead of virtual? 


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167457124
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1149,8 +1297,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (hdfsScanTdb().continueOnError())
{
  if ((pentry_down->downState.request == ex_queue::GET_N) &&
- (pentry_down->downState.requestValue == matches_))
-   step_ = CLOSE_FILE;
+ (pentry_down->downState.requestValue == matches_)) {
+ if (useLibhdfsScan_)
+step_ = CLOSE_HDFS_CURSOR;
--- End diff --

Is this intentional, to change from CLOSE_FILE to CLOSE_HDFS_CURSOR?


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167412178
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -118,15 +119,39 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   , dataModCheckDone_(FALSE)
   , loggingErrorDiags_(NULL)
   , loggingFileName_(NULL)
+  , hdfsClient_(NULL)
+  , hdfsScan_(NULL)
+  , hdfsStats_(NULL)
   , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), 
hdfsScanTdb.getHdfsFileInfoList()->numEntries())
 {
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
+  useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
+  if (isSequenceFile())
+ useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
-  const int readBufSize =  (Int32)hdfsScanTdb.hdfsBufSize_;
-  hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ]; 
-  hdfsScanBuffer_[readBufSize] = '\0';
-
+  hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
+  headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
+
+  if (useLibhdfsScan_) {
+ hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; 
+ hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0';
+  } else {
+ hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * 
(headRoom_)];
--- End diff --

Could we please test this logic with extremely wide rows? Currently we have a 
limitation that the maximum row size cannot be larger than hdfsScanBufMaxSize_. It 
would be a good test to have 10 rows of this size, say in 2 or more files, and check 
whether we can process them. The logic seems good; this is a test suggestion. As you 
know, hdfsScanBufMaxSize_ can be reduced with a CQD, to avoid having to deal with 
rows that are several MBs wide.
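
One way to act on this suggestion is to generate a couple of files whose rows 
approach the scan buffer size and point an external hive table at their location. 
A hedged sketch of such a test-data generator follows; the HDFS location, row width 
and row count are invented.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WideRowGenerator
{
   public static void main(String[] args) throws Exception
   {
      final int rowWidth = 8 * 1024 * 1024;            // ~8 MB per row (illustrative)
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      char[] filler = new char[rowWidth];
      Arrays.fill(filler, 'x');

      for (int f = 0; f < 2; f++) {                    // two files, as suggested
         Path path = new Path("/user/trafodion/hive/widerows/part-" + f);   // invented location
         FSDataOutputStream out = fs.create(path, true);
         for (int r = 0; r < 5; r++)
            out.write((r + "|" + new String(filler) + "\n").getBytes("UTF-8"));
         out.close();
      }
   }
}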


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167466431
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private int rangeNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+   private int isEOF_ = 0; 
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  HDFSRead() 
+  {
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ int totalBytesRead = 0;
+ if (! buf_.hasArray()) {
--- End diff --

Does this condition mean that somehow the buffer we got from the C side is 
empty or not accessible?


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167465177
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
--- End diff --

Thank you for the comments. Much appreciated.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167412036
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 
 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
   step_ = DONE;
-else
-  step_ = INIT_HDFS_CURSOR;
+else {
+  if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+  else
+ step_ = SETUP_HDFS_SCAN;
+}
   }
   break;
-
+case SETUP_HDFS_SCAN:
+  {   
+ if (hdfsScan_ != NULL)
+NADELETE(hdfsScan_, HdfsScan, getHeap());
+ hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), 
hdfsScanBuf_, hdfsScanBufMaxSize_, 
+_, beginRangeNum_, 
numRanges_, hdfsScanTdb().rangeTailIOSize_, 
+hdfsStats_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1; 
+ currRangeBytesRead_ = 0;   
+ recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
+ step_ = TRAF_HDFS_READ;
+  } 
+  break;
+case TRAF_HDFS_READ:
+  {
+ hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap 
*)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+step_ = DONE;
+break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+ bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED];
+ if (retArray_[RANGE_NO] != prevRangeNum_) {  
+currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+if (hdfo->getStartOffset() == 0)
+   recordSkip_ = FALSE;
+else
+   recordSkip_ = TRUE; 
+ } else {
+currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - 
headRoomCopied_;
+recordSkip_ = FALSE;
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+extraBytesRead_ = currRangeBytesRead_ - 
hdfo->getBytesToRead(); 
+ else
+extraBytesRead_ = 0;
+ // headRoom_ is the number of extra bytes read 
(rangeTailIOSize)
+ // If EOF is reached while reading the range and the 
extraBytes read
+ // is less than headRoom_, then process all the data till EOF 
+ if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+extraBytesRead_ = 0;
+ bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
+ if (recordSkip_) {
+   hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_, 
+  (char *)bufEnd_,
+ checkRangeDelimiter_, 
+ hdfsScanTdb().getHiveScanMode(), );
+if (hdfsBufNextRow_ == NULL) {
--- End diff --

The last record in a file sometimes has no recordDelimiter. Hive accepts this, and 
after some trial and error, the libhdfs approach does too. Are we handling that case 
correctly? I cannot tell; this is a question.
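
For reference, the case being asked about amounts to treating end-of-data as an 
implicit delimiter once EOF is known. A small sketch of that decision, not the 
executor's hdfs_strchr code:

public class RecordBoundary
{
   // Returns the index one past the end of the record starting at 'start',
   // or -1 if the record is incomplete and more data must be read.
   static int nextRecordEnd(byte[] buf, int start, int end, byte delim, boolean isEOF)
   {
      for (int i = start; i < end; i++) {
         if (buf[i] == delim)
            return i + 1;                 // normal case: delimiter found
      }
      // No delimiter before 'end': at EOF the trailing bytes still form a
      // valid last record (Hive accepts such files); otherwise wait for more data.
      return isEOF ? end : -1;
   }
}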


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167467102
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private int rangeNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+   private int isEOF_ = 0; 
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  HDFSRead() 
+  {
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ int totalBytesRead = 0;
+ if (! buf_.hasArray()) {
+try {
+  fsdis_.seek(pos_);
--- End diff --

Should we set readahead on this stream?
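
If an explicit readahead hint is wanted on top of the double buffering, 
FSDataInputStream exposes setReadahead() through the CanSetReadahead interface in 
Hadoop 2.x. A hedged example; not every underlying stream supports the call, so it 
is wrapped defensively.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

public class ReadaheadHint
{
   static void hintReadahead(FSDataInputStream in, long bytes)
   {
      try {
         in.setReadahead(Long.valueOf(bytes));   // advisory; the filesystem may ignore it
      } catch (UnsupportedOperationException | IOException e) {
         // The underlying stream does not support the hint; fall back silently.
      }
   }
}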


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167401848
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -380,10 +407,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
   HdfsFileInfo *hdfo = NULL;
   Lng32 openType = 0;
   int changedLen = 0;
- ContextCli *currContext = 
getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
-   hdfsFS hdfs = 
currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
-   hdfsFileInfo *dirInfo = NULL;
-   Int32 hdfsErrorDetail = 0;//this is errno returned form underlying 
hdfsOpenFile call.
+  ContextCli *currContext = 
getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
+  hdfsFS hdfs = 
currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
--- End diff --

I could not find where the port_ member of the tdb is set. Can you please explain?


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-02-11 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167464350
  
--- Diff: core/sql/generator/GenRelScan.cpp ---
@@ -1391,6 +1391,9 @@ if (hTabStats->isOrcFile())
   hdfsscan_tdb->setUseCif(useCIF);
   hdfsscan_tdb->setUseCifDefrag(useCIFDegrag);
 
+  if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON)
--- End diff --

Should we add a check that says if a table has any file that is compressed, then we 
will not disable the libhdfs scan? Or maybe support for compressed files is coming 
soon, and for now we set this CQD OFF only if we think conditions warrant it?
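
If such a check is added, the decision boils down to asking whether any file of the 
table maps to a compression codec. Below is a Java-level illustration against the 
plain Hadoop API; the generator itself is C++ and would presumably consult the hive 
table stats instead, so this is only a sketch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CompressedFileCheck
{
   // True if any file under the table location maps to a known compression codec.
   static boolean hasCompressedFile(Configuration conf, Path tableDir) throws Exception
   {
      FileSystem fs = FileSystem.get(tableDir.toUri(), conf);
      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
      for (FileStatus f : fs.listStatus(tableDir)) {
         if (f.isFile() && factory.getCodec(f.getPath()) != null)
            return true;
      }
      return false;
   }
}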


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-31 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r165170894
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
--- End diff --

HdfsClient is meant for both reading and writing HDFS text files. I need to check 
whether we can use it for other file formats too. HdfsClient is also used to create 
HDFS files.

I have made a second commit that might help to understand it better, with some 
comments in ExHdfsScan.h.
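
Since HdfsClient also covers the create/write side that used to live in 
SequenceFileWriter, here is a hedged sketch of what that path reduces to in terms of 
the plain Hadoop FileSystem API. The path is invented and the HdfsClient method 
names are not reproduced here, so this is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteSketch
{
   public static void main(String[] args) throws Exception
   {
      Configuration conf = new Configuration();
      Path path = new Path("hdfs:///tmp/traf_unload/part-000");   // invented path
      FileSystem fs = FileSystem.get(path.toUri(), conf);

      FSDataOutputStream out = fs.create(path, true /* overwrite */);   // like the old hdfsCreate()
      out.write("a|b|c\n".getBytes("UTF-8"));                           // like the old hdfsWrite()
      out.close();

      boolean exists = fs.exists(path);     // like the old hdfsExists()
      fs.delete(path, false);               // like the old hdfsDeletePath()
      System.out.println("existed: " + exists);
   }
}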


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164280205
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
--- End diff --

Could you please explain how the classes HdfsClient, HdfsClient.HDFSRead and 
HdfsScan are related? Thank you for the nice comments in HdfsScan.java. I took 
HdfsClient to be the class that contains all the methods that we removed from 
SequenceFileWriter. If that is true, why do we need an HDFSRead inner class? Is this 
for the future, or is there some functionality that I missed? For error row logging, 
do we need read?


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164280041
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  int length_;
+
+  HDFSRead(int length) 
+  {
+ length_ = length;
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ if (buf_.hasArray())
+bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, 
length_);
+ else
+ {
+buf_.limit(bufOffset_ + length_);
+bytesRead = fsdis_.read(buf_);
+ }
+ return new Integer(bytesRead);
+  }
+   }
+   
+   public HDFSClient() 
+   {
+   }
+ 
+   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long 
position, int length) throws IOException
+   {
+  bufNo_ = bufNo; 
+  filename_ = filename;
+  Path filepath = new Path(filename_);
+  fs_ = FileSystem.get(filepath.toUri(),config_);
+  fsdis_ = fs_.open(filepath);
+  blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
+  buf_  = buffer;
+  bufOffset_ = 0;
+  pos_ = 

---

[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164279712
  
--- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
@@ -0,0 +1,452 @@
+//**
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+// **
+
+#include "QRLogger.h"
+#include "Globals.h"
+#include "jni.h"
+#include "HdfsClient_JNI.h"
+
+// 
===
+// = Class HdfsScan
+// 
===
+
+JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
+jclass HdfsScan::javaClass_ = 0;
+bool HdfsScan::javaMethodsInitialized_ = false;
+pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = 
PTHREAD_MUTEX_INITIALIZER;
+
+static const char* const hdfsScanErrorEnumStr[] = 
+{
+};
+
+ 

+//
+// 

+//
+HDFS_Scan_RetCode HdfsScan::init()
+{
+  static char className[]="org/trafodion/sql/HdfsScan";
+  HDFS_Scan_RetCode rc; 
+
+  if (javaMethodsInitialized_)
+return (HDFS_Scan_RetCode)JavaObjectInterface::init(className, 
javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); 
+  else
+  {
+pthread_mutex_lock(&javaMethodsInitMutex_);
+if (javaMethodsInitialized_)
+{
+  pthread_mutex_unlock(&javaMethodsInitMutex_);
+  return (HDFS_Scan_RetCode)JavaObjectInterface::init(className, 
javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
+}
+JavaMethods_ = new JavaMethodInit[JM_LAST];
+
+JavaMethods_[JM_CTOR  ].jm_name  = "";
+JavaMethods_[JM_CTOR  ].jm_signature = "()V";
+JavaMethods_[JM_INIT_SCAN_RANGES].jm_name  = "";
+JavaMethods_[JM_INIT_SCAN_RANGES].jm_signature = 
"(Ljava/lang/Object;Ljava/lang/Object;[Ljava/lang/String;[J[J)V";
+JavaMethods_[JM_TRAF_HDFS_READ].jm_name  = "trafHdfsRead";
+JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
+   
+rc = (HDFS_Scan_RetCode)JavaObjectInterface::init(className, 
javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
+javaMethodsInitialized_ = TRUE;
+pthread_mutex_unlock(&javaMethodsInitMutex_);
+  }
+  return rc;
+}
+
+char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
+{
+  if (errEnum < (HDFS_Scan_RetCode)JOI_LAST)
+return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum);
+  else
+return (char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST-1];
--- End diff --

I wonder why there is a "-1" here, but no equivalent "-1"  in 
HdfsClient::getErrorText()
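
For what it is worth, a small self-contained sketch of the offset arithmetic in
question; the enum values and messages below are invented, not the actual
Trafodion ones. The extra -1 is only right if the string table has no entry for
the boundary value HDFS_SCAN_FIRST itself:

    #include <cstdio>

    // Hypothetical stand-ins for JOI_LAST and the HdfsScan return codes.
    enum { JOI_LAST = 3 };

    typedef enum {
      HDFS_SCAN_FIRST = JOI_LAST,        // boundary value, no entry in the table below
      HDFS_SCAN_ERROR_INIT_SCAN_RANGES,  // first value described in the table
      HDFS_SCAN_ERROR_TRAF_HDFS_READ,
      HDFS_SCAN_LAST
    } HDFS_Scan_RetCode;

    static const char* const hdfsScanErrorEnumStr[] = {
      "Error in HdfsScan::initScanRanges().",  // HDFS_SCAN_ERROR_INIT_SCAN_RANGES
      "Error in HdfsScan::trafHdfsRead()."     // HDFS_SCAN_ERROR_TRAF_HDFS_READ
    };

    // Mirrors the shape of the quoted getErrorText(): codes below JOI_LAST go to
    // the base class; the -1 skips HDFS_SCAN_FIRST, which has no string of its
    // own. If the table's first entry described HDFS_SCAN_FIRST itself, the
    // lookup would drop the -1, which is the inconsistency being asked about.
    static const char* getErrorText(HDFS_Scan_RetCode errEnum) {
      if (errEnum < (HDFS_Scan_RetCode)JOI_LAST)
        return "handled by JavaObjectInterface::getErrorText()";
      return hdfsScanErrorEnumStr[errEnum - HDFS_SCAN_FIRST - 1];
    }

    int main() {
      // HDFS_SCAN_ERROR_INIT_SCAN_RANGES - HDFS_SCAN_FIRST - 1 == 0, the first entry.
      std::printf("%s\n", getErrorText(HDFS_SCAN_ERROR_INIT_SCAN_RANGES));
      return 0;
    }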


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164278889
  
--- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
@@ -0,0 +1,452 @@
+//**
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+// **
+
+#include "QRLogger.h"
+#include "Globals.h"
+#include "jni.h"
+#include "HdfsClient_JNI.h"
+
+// 
===
+// = Class HdfsScan
+// 
===
+
+JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
+jclass HdfsScan::javaClass_ = 0;
+bool HdfsScan::javaMethodsInitialized_ = false;
+pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = 
PTHREAD_MUTEX_INITIALIZER;
+
+static const char* const hdfsScanErrorEnumStr[] = 
+{
--- End diff --

This will be populated when HdfsScan JNI methods are introduced


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164278860
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1948,6 +1948,54 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc)
   return 0;
 }
 
+void ExHdfsScanTcb::handleException(NAHeap *heap,
--- End diff --

JavaObjectInterface is the base class for all these classes.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164276782
  
--- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
@@ -0,0 +1,452 @@
+//**
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+// **
+
+#include "QRLogger.h"
+#include "Globals.h"
+#include "jni.h"
+#include "HdfsClient_JNI.h"
+
+// 
===
+// = Class HdfsScan
+// 
===
+
+JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
+jclass HdfsScan::javaClass_ = 0;
+bool HdfsScan::javaMethodsInitialized_ = false;
+pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = 
PTHREAD_MUTEX_INITIALIZER;
+
+static const char* const hdfsScanErrorEnumStr[] = 
+{
--- End diff --

I am surprised that this is empty. Is that because the HdfsScan Java side is 
only a preview for now and error handling has not been introduced yet?


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164276621
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1948,6 +1948,54 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc)
   return 0;
 }
 
+void ExHdfsScanTcb::handleException(NAHeap *heap,
--- End diff --

Makes me wish we had a common base class for HBaseAccess, HdfsClient and 
maybe SequenceFileRead/Write, i.e. anything that requires Java/JNI to talk to 
another file format. Could some of this logic be shared in that common base class? This 
is not a request for change, simply a question that will help me understand 
future refactoring choices.
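
A rough sketch of the kind of sharing being asked about, with invented class
names (this is not the actual Trafodion hierarchy; per the reply above,
JavaObjectInterface is the common base): the JNI-backed accessors derive from
one base that owns the retcode-to-message step, so a TCB can report an error
from any of them the same way.

    #include <cstdio>

    // Hypothetical stand-in for a JavaObjectInterface-like base class that owns
    // the error-handling piece every JNI-backed accessor would otherwise duplicate.
    class JavaObjectBase {
    public:
      virtual ~JavaObjectBase() {}
      const char* errorText(int retCode) const {     // shared helper
        return (retCode == 0) ? "OK" : classSpecificErrorText(retCode);
      }
    protected:
      virtual const char* classSpecificErrorText(int retCode) const = 0;
    };

    class HdfsClientLike : public JavaObjectBase {
    protected:
      const char* classSpecificErrorText(int) const { return "HDFS client error"; }
    };

    class SequenceFileWriterLike : public JavaObjectBase {
    protected:
      const char* classSpecificErrorText(int) const { return "sequence file error"; }
    };

    // A TCB-style caller raises the same diagnostic regardless of which accessor failed.
    static void reportError(const JavaObjectBase& obj, int retCode) {
      std::printf("raising SQL error 8447: %s\n", obj.errorText(retCode));
    }

    int main() {
      HdfsClientLike hdfsClient;
      SequenceFileWriterLike sfWriter;
      reportError(hdfsClient, 1);
      reportError(sfWriter, 1);
      return 0;
    }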


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164275585
  
--- Diff: core/sql/executor/ExHbaseIUD.cpp ---
@@ -1158,7 +1158,7 @@ 
ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb(
   "traf_upsert_err",
   fileNum,
   loggingFileName_);
-   LoggingFileCreated_ = FALSE;
+   loggingFileCreated_ = FALSE;
--- End diff --

Is this line necessary now? loggingFileCreated_ belongs to the base class 
now, I gather; its constructor would have done this already.
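
A minimal sketch of the point, with invented names: once the flag lives in the
base class and its constructor initializes it, re-assigning it in the derived
constructor is harmless but redundant.

    class BaseAccessTcbSketch {            // hypothetical stand-in for the base TCB
    protected:
      bool loggingFileCreated_;
    public:
      BaseAccessTcbSketch() : loggingFileCreated_(false) {}   // already initialized here
    };

    class BulkLoadPrepTcbSketch : public BaseAccessTcbSketch {
    public:
      BulkLoadPrepTcbSketch() {
        loggingFileCreated_ = false;   // the line in question: redundant after the move
      }
    };

    int main() {
      BulkLoadPrepTcbSketch tcb;
      (void)tcb;
      return 0;
    }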


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-27 Thread sureshsubbiah
Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164275414
  
--- Diff: core/sql/executor/ExFastTransport.cpp ---
@@ -1248,6 +1279,23 @@ void 
ExHdfsFastExtractTcb::createSequenceFileError(Int32 sfwRetCode)
   updateWorkATPDiagsArea(diagsArea);
 }
 
+void ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 
hdfsClientRetCode)
+{
+  ContextCli *currContext = GetCliGlobals()->currContext();
+
+  ComDiagsArea * diagsArea = NULL;
+  char* errorMsg = 
hdfsClient_->getErrorText((HDFS_Client_RetCode)hdfsClientRetCode);
+  ExRaiseSqlError(getHeap(),
+  &diagsArea,
+  (ExeErrorCode)(8447),
+  NULL, NULL, NULL, NULL,
+  errorMsg,
+(char *)currContext->getJniErrorStr().data());
+  //ex_queue_entry *pentry_down = qParent_.down->getHeadEntry();
--- End diff --

Consider removing these 2 lines of comments in the next commit. There are 
similar lines in the createSequenceFileError() method that could be removed.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-26 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164237633
  
--- Diff: core/sql/executor/ExFastTransport.h ---
@@ -407,6 +408,7 @@ class ExHdfsFastExtractTcb : public ExFastExtractTcb
   
   NABoolean isSequenceFile();
   void createSequenceFileError(Int32 sfwRetCode);
+  void createHdfsClientFileError(Int32 sfwRetCode);
--- End diff --

Will do in my next commit, though it is just a parameter name in the method 
declaration.


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-26 Thread selvaganesang
Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164211946
  
--- Diff: core/sql/executor/ExHbaseAccess.cpp ---
@@ -502,6 +506,8 @@ void ExHbaseAccessTcb::freeResources()
  NADELETEBASIC(directRowBuffer_, getHeap());
   if (colVal_.val != NULL)
  NADELETEBASIC(colVal_.val, getHeap());
+  if (hdfsClient_ != NULL) 
+ NADELETE(hdfsClient_, HdfsClient, getHeap());
 }
--- End diff --

Yes. It is a good catch. I will fix this too in my next commit. This won't 
cause a memory leak as such, because the heap is destroyed.
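
A toy illustration of that point, with invented names (the real code frees
members with the NADELETE/NADELETEBASIC macros against the TCB heap): every
allocation is tracked by the heap, so destroying the heap at statement teardown
reclaims anything freeResources() misses, which is why the omission is untidy
rather than a process-level leak.

    #include <algorithm>
    #include <cstdlib>
    #include <vector>

    // Toy stand-in for an executor heap: it remembers every block it hands out,
    // so tearing the heap down releases anything the owner forgot to delete.
    class ToyHeap {
      std::vector<void*> blocks_;
    public:
      void* allocate(std::size_t n) {
        void* p = std::malloc(n);
        blocks_.push_back(p);
        return p;
      }
      void deallocate(void* p) {         // roughly what NADELETE boils down to
        blocks_.erase(std::remove(blocks_.begin(), blocks_.end(), p), blocks_.end());
        std::free(p);
      }
      ~ToyHeap() {                       // statement teardown frees the leftovers
        for (std::size_t i = 0; i < blocks_.size(); i++) std::free(blocks_[i]);
      }
    };

    int main() {
      ToyHeap heap;
      char* loggingFileName = (char*)heap.allocate(256);    // hypothetical member
      char* directRowBuffer = (char*)heap.allocate(1024);
      heap.deallocate(directRowBuffer);   // explicit delete in freeResources()
      (void)loggingFileName;              // never deallocated explicitly: the heap
                                          // destructor reclaims it anyway
      return 0;
    }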


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-26 Thread eowhadi
Github user eowhadi commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164189696
  
--- Diff: core/sql/executor/ExHbaseAccess.cpp ---
@@ -502,6 +506,8 @@ void ExHbaseAccessTcb::freeResources()
  NADELETEBASIC(directRowBuffer_, getHeap());
   if (colVal_.val != NULL)
  NADELETEBASIC(colVal_.val, getHeap());
+  if (hdfsClient_ != NULL) 
+ NADELETE(hdfsClient_, HdfsClient, getHeap());
 }
--- End diff --

I am wondering if there is a need to delete the newly introduced 
loggingFileName_ here?


---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-26 Thread eowhadi
Github user eowhadi commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164186395
  
--- Diff: core/sql/executor/ExFastTransport.h ---
@@ -407,6 +408,7 @@ class ExHdfsFastExtractTcb : public ExFastExtractTcb
   
   NABoolean isSequenceFile();
   void createSequenceFileError(Int32 sfwRetCode);
+  void createHdfsClientFileError(Int32 sfwRetCode);
--- End diff --

should be hdfsClientRetCode instead of sfwRetCode



---


[GitHub] trafodion pull request #1417: [TRAFODION-2917] Refactor Trafodion implementa...

2018-01-26 Thread selvaganesang
GitHub user selvaganesang opened a pull request:

https://github.com/apache/trafodion/pull/1417

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text 
formatted hive tables

Part-1 changes.

Created a new class org.trafodion.sql.HDFSClient. Any direct HDFS access
will be routed to this class via JNI instead of using libhdfs.

Modified the existing code, except for the following, to route HDFS 
requests via this class:
1. LOB access
2. Direct HDFS scan of the table
3. Sample data creation during update stats

Added a new class org.trafodion.sql.HdfsScan for scanning one or many ranges 
of a Hive text formatted table. This class will be used for direct HDFS scan 
in the near future.
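
As a rough illustration of the call-site change this describes (the class and
method names below are invented, modeled on the hdfsCreate/hdfsWrite/hdfsClose
methods this patch removes from SequenceFileWriter, not on a documented API):
code that used to call libhdfs directly would instead go through the C++
wrapper, which forwards each request to org.trafodion.sql.HDFSClient over JNI.

    #include <cstdio>

    // Hypothetical, simplified stand-in for the new C++ wrapper (the real class
    // is HdfsClient in core/sql/executor/HdfsClient_JNI.* and it returns
    // HDFS_Client_RetCode values); the bodies just log instead of making JNI calls.
    class HdfsClientSketch {
    public:
      bool hdfsCreate(const char* path, bool compress) {
        // Real code: JNI call into org.trafodion.sql.HDFSClient rather than
        // libhdfs hdfsOpenFile().
        std::printf("create %s (compress=%d)\n", path, (int)compress);
        return true;
      }
      bool hdfsWrite(const char* data, long long size) {
        (void)data;
        std::printf("write %lld bytes\n", size);
        return true;
      }
      bool hdfsClose() {
        std::printf("close\n");
        return true;
      }
    };

    int main() {
      // A call site that used hdfsConnect/hdfsOpenFile/hdfsWrite from libhdfs
      // would be rewritten along these lines.
      HdfsClientSketch client;
      client.hdfsCreate("/user/trafodion/bulkload/logs/traf_upsert_err", false);
      client.hdfsWrite("hello", 5);
      client.hdfsClose();
      return 0;
    }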

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/selvaganesang/trafodion hdfs_scan_improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/trafodion/pull/1417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1417


commit 60db153329d1ee7088f6805ef3c5eb9eb8b600de
Author: selvaganesang 
Date:   2018-01-26T16:40:37Z

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text 
formatted hive tables

Part-1 changes.

Created a new class org.trafodion.sql.HDFSClient. Any direct HDFS access
will be routed to this class via JNI instead of using libhdfs.

Modified the existing code, except for the following, to route HDFS 
requests via this class:
1. LOB access
2. Direct HDFS scan of the table
3. Sample data creation during update stats

Added a new class org.trafodion.sql.HdfsScan for scanning one or many ranges 
of a Hive text formatted table. This class will be used for direct HDFS scan 
in the near future.




---