[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383270#comment-16383270
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user asfgit closed the pull request at:

https://github.com/apache/trafodion/pull/1461


> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -
>
> Key: TRAFODION-2917
> URL: https://issues.apache.org/jira/browse/TRAFODION-2917
> Project: Apache Trafodion
>  Issue Type: New Feature
>  Components: sql-general
>Reporter: Selvaganesan Govindarajan
>Priority: Major
> Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text-formatted Hive tables.
> The compiler returns a list of scan ranges, along with the begin range and the 
> number of ranges to be processed by each TCB instance, in the TDB. This list of 
> scan ranges may also be re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or from different hdfs files. The 
> TCB creates two threads to read these ranges. Two ranges (for the TCB) are 
> initially assigned to these threads. As soon as a range is completed, the 
> next range (assigned to the TCB) is picked up by the thread. Ranges are read 
> in multiples of the hdfs scan buffer size at the TCB level; the default hdfs scan 
> buffer size is 64 MB. Rows from the hdfs scan buffer are processed and moved into 
> the up queue. If a range ends in the middle of a record (a record split), the 
> range is extended by up to the range tail IO size so that the full row can be 
> read. The range that holds the latter part of that row ignores it, because the 
> former range processes it. A record split at the file level is not possible 
> and/or not supported.
>  For compression, the compiler returns the range info such that the hdfs scan 
> buffer can hold the full uncompressed buffer.
>  Cons:
> The reader-threads feature is too complex to maintain in C++.
> Error handling at the layer below the TCB is missing, or errors are not 
> propagated to the work method, causing incorrect results.
> Possible multiple copying of data.
> Libhdfs calls are not optimized. It was observed that the method IDs are 
> obtained many times. Need to check whether this problem still exists.
> Now that we clearly know what is expected, it could be optimized better:
>   - Reduced scan buffer size for smoother data flow
>   - Better thread utilization
>   - Avoid multiple copying of data.
> It is hard to see the need for two threads for pre-fetch, especially when 
> one range is completed fully before the data from the next range is processed.
>  Following are the hdfs calls used by programs in the exp and executor directories.
>   U hdfsCloseFile
>   U hdfsConnect
>   U hdfsDelete
>   U hdfsExists
>   U hdfsFlush
>   U hdfsFreeFileInfo
>   U hdfsGetPathInfo
>   U hdfsListDirectory
>   U hdfsOpenFile
>   U hdfsPread
>   U hdfsRename
>   U hdfsWrite
>   U hdfsCreateDirectory
>  New implementation
>  Make changes to use direct Java APIs for these calls. However, come up with a 
> better mechanism to move data between Java and JNI, avoid unnecessary 
> copying of data, and get better thread management via the Executor concepts in 
> Java. Hence it won’t be a direct mapping of these calls to the hdfs Java API. 
> Instead, use an abstraction similar to what is being done for HBase access.
>  I believe the newer implementation will be better optimized and hence improve 
> performance (though not by many fold).
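
To make the proposed direction concrete, here is a minimal, self-contained
sketch of the double-buffering pattern described above, built on the Hadoop
Java API and a Java ExecutorService. This is not the Trafodion HdfsScan
implementation; the class name, buffer size, and read loop are illustrative
only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoubleBufferedHdfsReader
{
   private final ExecutorService ioPool = Executors.newCachedThreadPool();
   private final byte[][] buf = new byte[2][];

   public DoubleBufferedHdfsReader(int bufSize)
   {
      buf[0] = new byte[bufSize];
      buf[1] = new byte[bufSize];
   }

   // Reads [pos, pos+len) of one file, alternating between the two buffers:
   // while the caller consumes the buffer that just completed, the IO task is
   // already filling the other one.
   public void scan(FileSystem fs, Path file, long pos, long len) throws Exception
   {
      try (FSDataInputStream in = fs.open(file)) {
         int cur = 0;
         long remaining = len;
         Future<Integer> pending = submitRead(in, pos, buf[cur], remaining);
         while (remaining > 0) {
            int bytesRead = pending.get();        // wait for the IO task
            if (bytesRead <= 0)
               break;                             // end of file
            remaining -= bytesRead;
            pos += bytesRead;
            int next = 1 - cur;
            if (remaining > 0)                    // prefetch into the other buffer
               pending = submitRead(in, pos, buf[next], remaining);
            process(buf[cur], bytesRead);         // row parsing would happen here
            cur = next;
         }
      }
   }

   private Future<Integer> submitRead(FSDataInputStream in, long pos, byte[] b, long remaining)
   {
      int toRead = (int) Math.min(b.length, remaining);
      Callable<Integer> ioTask = () -> in.read(pos, b, 0, toRead);  // positioned read
      return ioPool.submit(ioTask);
   }

   private void process(byte[] b, int n)
   {
      // consume n bytes of b
   }

   public static void main(String[] args) throws Exception
   {
      FileSystem fs = FileSystem.get(new Configuration());
      // args[0]: path of an HDFS text file to scan end to end
      new DoubleBufferedHdfsReader(64 * 1024 * 1024).scan(fs, new Path(args[0]), 0, Long.MAX_VALUE);
   }
}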





[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16382787#comment-16382787
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

GitHub user selvaganesang opened a pull request:

https://github.com/apache/trafodion/pull/1461

 [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for 
text formatted hive tables

 Removed the dependency on libhdfs from some more portions of the Trafodion 
code.

Also, introduced HDFS Scan, which uses the Java layer to read HDFS data and 
provides the following features:
a) Prefetch the data using a double-buffering concept
b) Avoid unnecessary data copies
c) Ensure that the read into one buffer in the Java layer is initiated while 
the other buffer is being processed on the native side, without any stalls.

This HDFS Scan is still being tested and hence is turned off by 
default.

Also changed the code to obtain millisecond-resolution modification 
timestamps for HDFS files.

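As a point of reference for that last change, a minimal sketch (not the
actual Trafodion code; the file path comes from the command line) of obtaining
a millisecond-resolution modification time through the Hadoop Java API.
FileStatus.getModificationTime() returns milliseconds since the epoch, which
is presumably the motivation, since libhdfs' hdfsGetPathInfo() reports
mLastMod only at second granularity.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsModTime
{
   public static void main(String[] args) throws Exception
   {
      FileSystem fs = FileSystem.get(new Configuration());
      FileStatus st = fs.getFileStatus(new Path(args[0]));
      // getModificationTime() returns milliseconds since the epoch.
      System.out.println(st.getModificationTime() + " ms");
   }
}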

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/selvaganesang/trafodion libhdfs_removal

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/trafodion/pull/1461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1461


commit ac7066074611a09da33adf88673c2f023e7dda7d
Author: selvaganesang 
Date:   2018-02-28T19:27:40Z

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text 
formatted hive tables

commit e303b3a083154779bcde8a84e1e2abff12d365e8
Author: selvaganesang 
Date:   2018-02-28T22:25:43Z

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text 
formatted hive tables

Removed the dependency on libhdfs from some more portions of the Trafodion 
code.

Also, introduced HDFS Scan, which uses the Java layer to read HDFS data and 
provides the following features:
a) Prefetch the data using a double-buffering concept
b) Avoid unnecessary data copies
c) Ensure that the read into one buffer in the Java layer is initiated while 
the other buffer is being processed on the native side, without any stalls.

This HDFS Scan is still being tested and hence is turned off by default.

Also changed the code to obtain millisecond-resolution modification 
timestamps for HDFS files.





[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367807#comment-16367807
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user asfgit closed the pull request at:

https://github.com/apache/trafodion/pull/1417







[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360349#comment-16360349
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474575
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
@@ -0,0 +1,289 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read hdfs files
+// Trafodion ExHdfsScan operator provides a range of scans to be performed.
+// The range consists of a hdfs filename, offset and length to be read
+// This class takes in two ByteBuffers. These ByteBuffer can be either 
direct buffers
+// backed up native buffers or indirect buffer backed by java arrays.
+// All the ranges are read alternating between the two buffers using 
ExecutorService
+// using CachedThreadPool mechanism. 
+// For a given HdfsScan instance, only one thread(IO thread) is scheduled 
to read
+// the next full or partial buffer while the main thread processes the 
previously
+// read information from the other buffer
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan 
+{
+   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+   private ByteBuffer buf_[];
+   private int bufLen_[];
+   private HDFSClient hdfsClient_[];
+   private int currRange_;
+   private long currPos_;
+   private long lenRemain_;
+   private int lastBufCompleted_ = -1;
+   private boolean scanCompleted_;
+   
+   class HdfsScanRange 
+   {
+  String filename_;
+  long pos_;
+  long len_;
+  int tdbRangeNum_;
+  
+  HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+  {
+ filename_ = filename;
+ pos_ = pos;
+ len_ = len;
+ tdbRangeNum_ = tdbRangeNum;
+  }
+   }
+   
+   private HdfsScanRange hdfsScanRanges_[];
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+   }
+
+   public HdfsScan() 
+   {
+   }
+
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String 
filename[], long pos[], long len[], int rangeNum[]) throws IOException
+   {
+  buf_ = new ByteBuffer[2];
+  bufLen_ = new int[2];
+
+  buf_[0] = buf1;
+  buf_[1] = buf2;
+
+  for (int i = 0; i < 2 ; i++) {
+  if (buf_[i].hasArray())
+ bufLen_[i] = buf_[i].array().length;
+  else
+ bufLen_[i] = buf_[i].capacity();
+  }
+  hdfsClient_ = new HDFSClient[2];
+  hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
+  for (int i = 0; i < filename.length; i++) {
+ 
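
For orientation, a hypothetical caller of the setScanRanges() signature
visible in the diff above. In Trafodion the native ExHdfsScan operator drives
this through JNI; the file name, buffer sizes, and range values below are
illustrative only, and the read loop itself is not visible in the truncated
diff, so it is omitted.

import java.nio.ByteBuffer;
import org.trafodion.sql.HdfsScan;

public class HdfsScanCaller
{
   public static void main(String[] args) throws Exception
   {
      // Two direct buffers; HdfsScan alternates reads between them.
      ByteBuffer buf1 = ByteBuffer.allocateDirect(16 * 1024 * 1024);
      ByteBuffer buf2 = ByteBuffer.allocateDirect(16 * 1024 * 1024);
      HdfsScan scan = new HdfsScan();
      // One entry per range: file name, start offset, length, TDB range number.
      scan.setScanRanges(buf1, buf2,
         new String[] {"/user/hive/warehouse/t1/part-00000"},   // example path
         new long[]   {0L},
         new long[]   {128L * 1024 * 1024},
         new int[]    {0});
   }
}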

[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360350#comment-16360350
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474597
  
--- Diff: core/sql/executor/SequenceFileReader.h ---
@@ -199,15 +185,6 @@ class SequenceFileWriter : public JavaObjectInterface
   
   // Close the file.
  SFW_RetCode close();
-
-  SFW_RetCode hdfsCreate(const char* path, NABoolean compress);
-  SFW_RetCode hdfsWrite(const char* data, Int64 size);
-  SFW_RetCode hdfsMergeFiles(const NAString& srcPath,
- const NAString& dstPath);
-  SFW_RetCode hdfsDeletePath(const NAString& delPath);
-  SFW_RetCode hdfsCleanUnloadPath(const NAString& uldPath );
-  SFW_RetCode hdfsExists(const NAString& uldPath,  NABoolean & exists );
-  SFW_RetCode hdfsClose();
  SFW_RetCode release();
 
   virtual char*  getErrorText(SFW_RetCode errEnum);
--- End diff --

Yes. It can be static







[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360344#comment-16360344
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167474246
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private int rangeNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+   private int isEOF_ = 0; 
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  HDFSRead() 
+  {
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ int totalBytesRead = 0;
+ if (! buf_.hasArray()) {
--- End diff --

HdfsScan supports both direct and non-direct buffers. Currently we use 
direct byte buffers, where the ByteBuffer wraps the native array on the C++ side. 
However, it is possible for HdfsScan to use a non-direct ByteBuffer, which is a 
wrapper for a byte array in JVM memory.
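
A small, plain-Java illustration of that distinction (not Trafodion code):

import java.nio.ByteBuffer;

public class BufferKinds
{
   public static void main(String[] args)
   {
      // Direct buffer: memory lives outside the Java heap, so JNI code can
      // obtain its address (GetDirectBufferAddress) or wrap native memory
      // into one (NewDirectByteBuffer).
      ByteBuffer direct = ByteBuffer.allocateDirect(1024);
      // Non-direct buffer: backed by a byte[] inside the JVM heap.
      ByteBuffer heap = ByteBuffer.wrap(new byte[1024]);
      System.out.println(direct.isDirect() + " " + direct.hasArray()); // true false
      System.out.println(heap.isDirect() + " " + heap.hasArray());     // false true
   }
}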



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360340#comment-16360340
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473755
  
--- Diff: core/sql/executor/ExExeUtilGet.cpp ---
@@ -3521,13 +3521,9 @@ 
ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb(
  ex_globals * glob)
  : ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob)
 {
-  int jniDebugPort = 0;
-  int jniDebugTimeout = 0;
   ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)exe_util_tdb.server(), 
-   (char*)exe_util_tdb.zkPort(),
-jniDebugPort,
-jniDebugTimeout);
+   (char*)exe_util_tdb.zkPort());
--- End diff --

The debug port needs to be initialized as part of the JVM invocation; see 
JavaObjectInterface::createJVM. This debugPort is never used. Because it is per 
process, it needs to be made static.







[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360339#comment-16360339
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473694
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1149,8 +1297,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (hdfsScanTdb().continueOnError())
{
  if ((pentry_down->downState.request == ex_queue::GET_N) &&
- (pentry_down->downState.requestValue == matches_))
-   step_ = CLOSE_FILE;
+ (pentry_down->downState.requestValue == matches_)) {
+ if (useLibhdfsScan_)
+step_ = CLOSE_HDFS_CURSOR;
--- End diff --

No. Thanks for catching it. Will change it back to CLOSE_HDFS_CURSOR.







[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360334#comment-16360334
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167473572
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 
 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
   step_ = DONE;
-else
-  step_ = INIT_HDFS_CURSOR;
+else {
+  if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+  else
+ step_ = SETUP_HDFS_SCAN;
+}
   }
   break;
-
+case SETUP_HDFS_SCAN:
+  {   
+ if (hdfsScan_ != NULL)
+NADELETE(hdfsScan_, HdfsScan, getHeap());
+ hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), 
hdfsScanBuf_, hdfsScanBufMaxSize_, 
+_, beginRangeNum_, 
numRanges_, hdfsScanTdb().rangeTailIOSize_, 
+hdfsStats_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1; 
+ currRangeBytesRead_ = 0;   
+ recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
+ step_ = TRAF_HDFS_READ;
+  } 
+  break;
+case TRAF_HDFS_READ:
+  {
+ hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap 
*)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+step_ = DONE;
+break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+ bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED];
+ if (retArray_[RANGE_NO] != prevRangeNum_) {  
+currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+if (hdfo->getStartOffset() == 0)
+   recordSkip_ = FALSE;
+else
+   recordSkip_ = TRUE; 
+ } else {
+currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - 
headRoomCopied_;
+recordSkip_ = FALSE;
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+extraBytesRead_ = currRangeBytesRead_ - 
hdfo->getBytesToRead(); 
+ else
+extraBytesRead_ = 0;
+ // headRoom_ is the number of extra bytes read 
(rangeTailIOSize)
+ // If EOF is reached while reading the range and the 
extraBytes read
+ // is less than headRoom_, then process all the data till EOF 
+ if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+extraBytesRead_ = 0;
+ bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
+ if (recordSkip_) {
+   hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_, 
+  (char *)bufEnd_,
+ checkRangeDelimiter_, 
+ hdfsScanTdb().getHiveScanMode(), );
+if (hdfsBufNextRow_ == NULL) {
--- End diff --

Yes



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360293#comment-16360293
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r165533175
  
--- Diff: core/sql/executor/ExExeUtilGet.cpp ---
@@ -3521,13 +3521,9 @@ 
ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb(
  ex_globals * glob)
  : ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob)
 {
-  int jniDebugPort = 0;
-  int jniDebugTimeout = 0;
   ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)exe_util_tdb.server(), 
-   (char*)exe_util_tdb.zkPort(),
-jniDebugPort,
-jniDebugTimeout);
+   (char*)exe_util_tdb.zkPort());
--- End diff --

Could you please comment on how we can attach jdb to, say, sqlci to debug 
some code in HBaseClient.java or HiveClient.java? I was under the impression 
that jniDebugPort helped with that. I see the two removed instance members have 
now been made static class variables of JOI. I could not see where they are 
initialized, though. Please point me to that code.






[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360296#comment-16360296
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167412036
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 
 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
   step_ = DONE;
-else
-  step_ = INIT_HDFS_CURSOR;
+else {
+  if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+  else
+ step_ = SETUP_HDFS_SCAN;
+}
   }
   break;
-
+case SETUP_HDFS_SCAN:
+  {   
+ if (hdfsScan_ != NULL)
+NADELETE(hdfsScan_, HdfsScan, getHeap());
+ hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), 
hdfsScanBuf_, hdfsScanBufMaxSize_, 
+_, beginRangeNum_, 
numRanges_, hdfsScanTdb().rangeTailIOSize_, 
+hdfsStats_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1; 
+ currRangeBytesRead_ = 0;   
+ recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
+ step_ = TRAF_HDFS_READ;
+  } 
+  break;
+case TRAF_HDFS_READ:
+  {
+ hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap 
*)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+step_ = DONE;
+break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
+  currContext->getJniErrorStr(), NULL);
  
+step_ = HANDLE_ERROR_AND_DONE;
+break;
+ } 
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+ bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED];
+ if (retArray_[RANGE_NO] != prevRangeNum_) {  
+currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+if (hdfo->getStartOffset() == 0)
+   recordSkip_ = FALSE;
+else
+   recordSkip_ = TRUE; 
+ } else {
+currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
+bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - 
headRoomCopied_;
+recordSkip_ = FALSE;
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+extraBytesRead_ = currRangeBytesRead_ - 
hdfo->getBytesToRead(); 
+ else
+extraBytesRead_ = 0;
+ // headRoom_ is the number of extra bytes read 
(rangeTailIOSize)
+ // If EOF is reached while reading the range and the 
extraBytes read
+ // is less than headRoom_, then process all the data till EOF 
+ if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+extraBytesRead_ = 0;
+ bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
+ if (recordSkip_) {
+   hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_, 
+  (char *)bufEnd_,
+ checkRangeDelimiter_, 
+ hdfsScanTdb().getHiveScanMode(), );
+if (hdfsBufNextRow_ == NULL) {
--- End diff --

The last record in a file sometimes has no recordDelimiter. Hive accepts 
this. After some trial and error, the libhdfs approach does too. Are we 
handling 

[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360294#comment-16360294
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167412178
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -118,15 +119,39 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   , dataModCheckDone_(FALSE)
   , loggingErrorDiags_(NULL)
   , loggingFileName_(NULL)
+  , hdfsClient_(NULL)
+  , hdfsScan_(NULL)
+  , hdfsStats_(NULL)
   , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), 
hdfsScanTdb.getHdfsFileInfoList()->numEntries())
 {
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
+  useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
+  if (isSequenceFile())
+ useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
-  const int readBufSize =  (Int32)hdfsScanTdb.hdfsBufSize_;
-  hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ]; 
-  hdfsScanBuffer_[readBufSize] = '\0';
-
+  hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
+  headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
+
+  if (useLibhdfsScan_) {
+ hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; 
+ hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0';
+  } else {
+ hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * 
(headRoom_)];
--- End diff --

Could we please test this logic with extremely wide rows? Currently we have 
a limitation that the maximum row size cannot be larger than 
hdfsScanBufMaxSize_. It would be a good test to have 10 rows of this size, say 
in 2 or more files, and check whether we can process them. The logic seems good; 
this is a test suggestion. As you know, the scan buffer max size can be reduced 
with a CQD, to avoid having to deal with rows that are several MB wide.
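
A rough sketch of generating data for such a test (the path, row width, and
row counts are made up, and the CQD that reduces the scan buffer size is not
named here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WideRowTestData
{
   public static void main(String[] args) throws Exception
   {
      FileSystem fs = FileSystem.get(new Configuration());
      int rowWidth = 8 * 1024 * 1024;            // ~8 MB of payload per row
      char[] filler = new char[rowWidth];
      Arrays.fill(filler, 'x');
      String payload = new String(filler);
      // Two files with five wide rows each, under a hypothetical table location.
      for (int f = 0; f < 2; f++) {
         Path p = new Path("/user/hive/warehouse/widerows/part-0000" + f);
         try (FSDataOutputStream out = fs.create(p, true)) {
            for (int r = 0; r < 5; r++) {
               String row = r + "|" + payload + "\n";   // id|payload
               out.write(row.getBytes(StandardCharsets.UTF_8));
            }
         }
      }
   }
}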



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360298#comment-16360298
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167467102
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private int rangeNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+   private int isEOF_ = 0; 
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static 
block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  HDFSRead() 
+  {
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ int totalBytesRead = 0;
+ if (! buf_.hasArray()) {
+try {
+  fsdis_.seek(pos_);
--- End diff --

Should we set readahead on this stream?
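
For context, FSDataInputStream does expose a readahead hint. A sketch only;
whether the hint is honored depends on the underlying stream implementation,
and this is not a statement about what the PR should do:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadaheadHint
{
   public static void main(String[] args) throws Exception
   {
      FileSystem fs = FileSystem.get(new Configuration());
      try (FSDataInputStream in = fs.open(new Path(args[0]))) {
         // Hint 4 MB of readahead; streams that cannot honor the hint throw
         // UnsupportedOperationException.
         in.setReadahead(4L * 1024 * 1024);
         byte[] b = new byte[64 * 1024];
         in.read(0L, b, 0, b.length);            // positioned read
      }
   }
}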



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360303#comment-16360303
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167465177
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
--- End diff --

Thank you for the comments. Much appreciated.



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360299#comment-16360299
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167457124
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1149,8 +1297,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (hdfsScanTdb().continueOnError())
{
  if ((pentry_down->downState.request == ex_queue::GET_N) &&
- (pentry_down->downState.requestValue == matches_))
-   step_ = CLOSE_FILE;
+ (pentry_down->downState.requestValue == matches_)) {
+ if (useLibhdfsScan_)
+step_ = CLOSE_HDFS_CURSOR;
--- End diff --

Is this intentional, to change from CLOSE_FILE to CLOSE_HDFS_CURSOR?




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360302#comment-16360302
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r167467533
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---
@@ -0,0 +1,289 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read hdfs files
+// Trafodion ExHdfsScan operator provides a range of scans to be performed.
+// The range consists of a hdfs filename, offset and length to be read
+// This class takes in two ByteBuffers. These ByteBuffers can be either direct buffers
+// backed by native buffers or indirect buffers backed by Java arrays.
+// All the ranges are read alternating between the two buffers using ExecutorService
+// with the CachedThreadPool mechanism.
+// For a given HdfsScan instance, only one thread (IO thread) is scheduled to read
+// the next full or partial buffer while the main thread processes the previously
+// read information from the other buffer.
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan 
+{
+   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+   private ByteBuffer buf_[];
+   private int bufLen_[];
+   private HDFSClient hdfsClient_[];
+   private int currRange_;
+   private long currPos_;
+   private long lenRemain_;
+   private int lastBufCompleted_ = -1;
+   private boolean scanCompleted_;
+   
+   class HdfsScanRange 
+   {
+  String filename_;
+  long pos_;
+  long len_;
+  int tdbRangeNum_;
+  
+  HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+  {
+ filename_ = filename;
+ pos_ = pos;
+ len_ = len;
+ tdbRangeNum_ = tdbRangeNum;
+  }
+   }
+   
+   private HdfsScanRange hdfsScanRanges_[];
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+   }
+
+   public HdfsScan() 
+   {
+   }
+
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
+   {
+  buf_ = new ByteBuffer[2];
+  bufLen_ = new int[2];
+
+  buf_[0] = buf1;
+  buf_[1] = buf2;
+
+  for (int i = 0; i < 2 ; i++) {
+  if (buf_[i].hasArray())
+ bufLen_[i] = buf_[i].array().length;
+  else
+ bufLen_[i] = buf_[i].capacity();
+  }
+  hdfsClient_ = new HDFSClient[2];
+  hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
+  for (int i = 0; i < filename.length; i++) {
+ 
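
The class header comments above describe the double-buffer protocol: while the caller drains one buffer, a single IO task on the executor fills the other. A stripped-down sketch of that alternation, with invented names (this is not the actual HdfsScan/HDFSClient API):

   // Illustrative sketch only; names are invented and readNextChunk() is a stub.
   import java.nio.ByteBuffer;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   class DoubleBufferReader {
      private final ExecutorService pool = Executors.newCachedThreadPool();
      private final ByteBuffer[] bufs;
      private Future<Integer> pending = null;   // read currently in flight
      private int filling = 0;                  // index of the buffer being filled

      DoubleBufferReader(ByteBuffer b0, ByteBuffer b1) {
         bufs = new ByteBuffer[] { b0, b1 };
      }

      private Future<Integer> schedule(final int bufNo) {
         return pool.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
               bufs[bufNo].clear();
               return readNextChunk(bufs[bufNo]);   // e.g. a positioned read into the buffer
            }
         });
      }

      // Wait for the buffer being filled, hand it to the caller, and immediately
      // start filling the other buffer so IO overlaps with row processing.
      ByteBuffer nextFilledBuffer() throws Exception {
         if (pending == null)
            pending = schedule(filling);           // prime the first buffer
         int bytesRead = pending.get();            // block until the IO task finishes
         if (bytesRead <= 0)
            return null;                           // end of the assigned ranges
         ByteBuffer ready = bufs[filling];
         ready.flip();                             // switch the buffer to draining mode
         filling = 1 - filling;                    // flip to the other buffer
         pending = schedule(filling);              // next read runs while caller drains 'ready'
         return ready;
      }

      // Stub standing in for the real HDFS read of the next full or partial buffer.
      private int readNextChunk(ByteBuffer buf) throws Exception { return -1; }
   }

A real implementation would also need executor shutdown and error propagation back to the caller, which this sketch omits.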

[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347522#comment-16347522
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r165170894
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
--- End diff --

HdfsClient is meant for both reads and writes of hdfs text files. Need to check if we can use it for other file formats too. HdfsClient is also used to create Hdfs files.

I have made a 2nd commit that might help to understand it better, with some comments in ExHdfsScan.h.



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342290#comment-16342290
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164280205
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
--- End diff --

Could you please explain how the classes HdfsClient, HdfsClient.HDFSRead and HdfsScan are related? Thank you for the nice comments in HdfsScan.java. I took HdfsClient to be the class that contains all the methods that we removed from SequenceFileWriter. If that is true, why do we need an HDFSRead subclass? Is this for the future, or is there some functionality that I missed? For error row logging, do we need read?



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342288#comment-16342288
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164280041
  
--- Diff: core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+
+   static {
+  String confFile = System.getProperty("trafodion.log4j.configFile");
+  System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+  if (confFile == null) {
+ confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+  }
+  PropertyConfigurator.configure(confFile);
+  config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+  executorService_ = Executors.newCachedThreadPool();
+  try {
+ defaultFs_ = FileSystem.get(config_);
+  }
+  catch (IOException ioe) {
+ throw new RuntimeException("Exception in HDFSClient static block", ioe);
+  }
+   }
+
+   class HDFSRead implements Callable 
+   {
+  int length_;
+
+  HDFSRead(int length) 
+  {
+ length_ = length;
+  }
+ 
+  public Object call() throws IOException 
+  {
+ int bytesRead;
+ if (buf_.hasArray())
+bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
+ else
+ {
+buf_.limit(bufOffset_ + length_);
+bytesRead = fsdis_.read(buf_);
+ }
+ return new Integer(bytesRead);
+  }
+   }
+   
+   public HDFSClient() 
+   {
+   }
+ 
+   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+   {
+  bufNo_ = bufNo; 
+  filename_ = filename;
+  Path filepath = new 

[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342270#comment-16342270
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164278860
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1948,6 +1948,54 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode )
   return 0;
 }
 
+void ExHdfsScanTcb::handleException(NAHeap *heap,
--- End diff --

JavaObjectInterface is the base class for all these classes.




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342238#comment-16342238
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164276782
  
--- Diff: core/sql/executor/HdfsClient_JNI.cpp ---
@@ -0,0 +1,452 @@
+//**
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+// **
+
+#include "QRLogger.h"
+#include "Globals.h"
+#include "jni.h"
+#include "HdfsClient_JNI.h"
+
+// ===
+// = Class HdfsScan
+// ===
+
+JavaMethodInit* HdfsScan::JavaMethods_ = NULL;
+jclass HdfsScan::javaClass_ = 0;
+bool HdfsScan::javaMethodsInitialized_ = false;
+pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
+
+static const char* const hdfsScanErrorEnumStr[] = 
+{
--- End diff --

I am surprised that this is empty. Is that because HdfsScan java side is 
now in preview and error handling has not been introduced yet?



[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342235#comment-16342235
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164276621
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -1948,6 +1948,54 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode )
   return 0;
 }
 
+void ExHdfsScanTcb::handleException(NAHeap *heap,
--- End diff --

Makes me wish we had a common base class for HBaseAccess, HdfsClient and maybe SequenceFileRead/Write: anything that requires Java/JNI to go to another file format. Could some of this logic be shared in that common base class? This is not a request for change, simply a question that will help me understand future refactoring choices.




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342233#comment-16342233
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user sureshsubbiah commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164276571
  
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -283,6 +285,8 @@ void ExHdfsScanTcb::freeResources()
  ExpLOBinterfaceCleanup(lobGlob_, (NAHeap *)getGlobals()->getDefaultHeap());
  lobGlob_ = NULL;
   }
+  if (hdfsClient_ != NULL) 
+ NADELETE(hdfsClient_, HdfsClient, getHeap());
--- End diff --

Same comment as Eric on ExHBaseAccess::freeResources(). Should we release 
loggingFileName_ here? Constructor guarantees it is never null.




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341705#comment-16341705
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164237633
  
--- Diff: core/sql/executor/ExFastTransport.h ---
@@ -407,6 +408,7 @@ class ExHdfsFastExtractTcb : public ExFastExtractTcb
   
   NABoolean isSequenceFile();
   void createSequenceFileError(Int32 sfwRetCode);
+  void createHdfsClientFileError(Int32 sfwRetCode);
--- End diff --

Will do in my next commit, though it is just a parameter name in the method declaration.




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341531#comment-16341531
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user selvaganesang commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164211946
  
--- Diff: core/sql/executor/ExHbaseAccess.cpp ---
@@ -502,6 +506,8 @@ void ExHbaseAccessTcb::freeResources()
  NADELETEBASIC(directRowBuffer_, getHeap());
   if (colVal_.val != NULL)
  NADELETEBASIC(colVal_.val, getHeap());
+  if (hdfsClient_ != NULL) 
+ NADELETE(hdfsClient_, HdfsClient, getHeap());
 }
--- End diff --

Yes, it is a good catch. I will fix this too in my next commit. This won't cause a memory leak as such because the heap is destroyed.




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341414#comment-16341414
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user eowhadi commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164189696
  
--- Diff: core/sql/executor/ExHbaseAccess.cpp ---
@@ -502,6 +506,8 @@ void ExHbaseAccessTcb::freeResources()
  NADELETEBASIC(directRowBuffer_, getHeap());
   if (colVal_.val != NULL)
  NADELETEBASIC(colVal_.val, getHeap());
+  if (hdfsClient_ != NULL) 
+ NADELETE(hdfsClient_, HdfsClient, getHeap());
 }
--- End diff --

I am wondering if there is a need to delete the newly introduced loggingFileName_ here?




[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341394#comment-16341394
 ] 

ASF GitHub Bot commented on TRAFODION-2917:
---

Github user eowhadi commented on a diff in the pull request:

https://github.com/apache/trafodion/pull/1417#discussion_r164186395
  
--- Diff: core/sql/executor/ExFastTransport.h ---
@@ -407,6 +408,7 @@ class ExHdfsFastExtractTcb : public ExFastExtractTcb
   
   NABoolean isSequenceFile();
   void createSequenceFileError(Int32 sfwRetCode);
+  void createHdfsClientFileError(Int32 sfwRetCode);
--- End diff --

should be hdfsClientRetCode instead of sfwRetCode





[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

2018-01-18 Thread Sandhya Sundaresan (JIRA)

[ 
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331099#comment-16331099
 ] 

Sandhya Sundaresan commented on TRAFODION-2917:
---

A description of the current flow is below. As you can see, the opens, the reads, and the 
main-thread processing are closely coordinated, so there is no waiting anywhere. 
If the new implementation can retain this level of parallel reading and still be 
a lot simpler, it is worth investing the time to get rid of this code, since it 
is pretty difficult to debug even though we have ironed out many hangs and bugs. 
So I think we should retain the existing code behind a toggle for a while and 
switch over to the new code after some exposure and extensive testing. 
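
Purely as an illustration of what such a toggle could look like (USE_LEGACY_HDFS_SCAN, 
LegacyHdfsScan, and NewHdfsScan are invented names, not actual Trafodion CQDs or classes):

    // Hypothetical sketch of an old/new scan toggle; identifiers are invented
    // for illustration and do not correspond to actual Trafodion code.
    #include <cstdlib>
    #include <cstring>
    #include <memory>

    struct HdfsScanner {                    // common interface for both paths
      virtual ~HdfsScanner() = default;
      virtual void scanRange(long offset, long len) = 0;
    };

    struct LegacyHdfsScan : HdfsScanner {   // current C++ reader-thread code
      void scanRange(long, long) override { /* existing logic */ }
    };

    struct NewHdfsScan : HdfsScanner {      // refactored Java/JNI-backed code
      void scanRange(long, long) override { /* new logic */ }
    };

    std::unique_ptr<HdfsScanner> makeScanner() {
      // A CQD-style switch would be consulted here; an env var stands in for it.
      const char *toggle = std::getenv("USE_LEGACY_HDFS_SCAN");
      if (toggle && std::strcmp(toggle, "ON") == 0)
        return std::make_unique<LegacyHdfsScan>();
      return std::make_unique<NewHdfsScan>();
    }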

 

Here is how the flow currently goes; a rough sketch follows step 4:

1) The main thread opens one range (64MB). This open sets up some initial 
structures, opens the hdfs file, and kicks off one worker thread on its mission to 
start fetching, say, 16KB buffers. The main thread does not wait for the worker 
thread to complete the fetches; it just returns.

 

2) The main thread determines that there will be one more range to work on after 
the current one completes, so it calls pre-open on the next range (i.e., the next 
128MB, a new file). This pre-open request just puts a new pre-open object in 
the pre-open list; nothing else happens at this point.

 

3) The main thread then issues a read. Since the worker thread already began 
fetching 16KB buffers in (1), the main thread most likely will not need to wait 
and the data will be ready. It keeps consuming the buffers and recycling them 
back into postFetchBufList.

 

4) When done, the main thread closes the cursor.
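
A minimal sketch of the main-thread side of steps 1-4, assuming invented names 
(RangeReader, prefetchBufList, and postFetchBufList here are only stand-ins for the 
real structures); it shows the sequencing, not the actual executor code:

    // Minimal sketch of the main-thread sequence in steps 1-4 above.
    // All identifiers are invented; this is not the actual Trafodion code.
    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    struct Buffer { std::vector<char> data; };       // one 16KB prefetch chunk

    class RangeReader {
    public:
      // Step 1: set up structures, "open" the file, kick off the worker
      // thread, and return without waiting for any data.
      void open(std::string file, long offset, long len) {
        worker_ = std::thread([this, file, offset, len] { fetchLoop(file, offset, len); });
      }

      // Step 2: queue a pre-open for the next range; nothing is read yet.
      // The worker services this list only after its current range is done.
      void preOpen(std::string nextFile) {
        std::lock_guard<std::mutex> lk(mtx_);
        preOpenList_.push_back(std::move(nextFile));
      }

      // Step 3: hand the next prefetched buffer to the caller; the worker is
      // usually ahead of us, so this normally does not block.
      bool read(Buffer &out) {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_.wait(lk, [this] { return !prefetchBufList_.empty() || rangeDone_; });
        if (prefetchBufList_.empty()) return false;  // range fully consumed
        out = std::move(prefetchBufList_.front());
        prefetchBufList_.pop_front();
        postFetchBufList_.push_back(Buffer{});       // recycle an empty buffer
        return true;
      }

      // Step 4: done with this range.
      void close() { if (worker_.joinable()) worker_.join(); }

    private:
      // Stand-in for the real fetch loop (see the worker-thread sketch below):
      // here it just fabricates a few 16KB buffers and signals completion.
      void fetchLoop(const std::string &, long, long) {
        for (int i = 0; i < 4; ++i) {
          Buffer b; b.data.resize(16 * 1024);
          std::lock_guard<std::mutex> lk(mtx_);
          prefetchBufList_.push_back(std::move(b));
          cv_.notify_one();
        }
        { std::lock_guard<std::mutex> lk(mtx_); rangeDone_ = true; }
        cv_.notify_all();
      }

      std::deque<Buffer> prefetchBufList_, postFetchBufList_;
      std::deque<std::string> preOpenList_;
      std::mutex mtx_;
      std::condition_variable cv_;
      bool rangeDone_ = false;
      std::thread worker_;
    };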

 

What goes on in the worker thread when it is kicked off in (1) is below; a rough sketch follows step 2:

 

1) The worker thread reads, say, 16KB of data at a time and buffers it in prefetchBufList. 
It continues doing this until the buffer limit is reached, the range is 
completely fetched, or it is asked to stop reading prematurely.

 

2) After the entire range is completely read, the worker thread processes the pre-open 
request that was queued in (2) above. A pre-open is simply another open, which kicks off 
the second worker thread to start fetching the next range. The first worker thread 
then goes back to sleep. The pre-open is necessary to avoid a delay when going 
from one range to another; the cost otherwise would be the I/O time to open the new 
hdfs file, position to the correct offset within that file, and then kick off 
the next worker thread. With pre-open, the main thread sees a continuous stream 
of already-fetched data when it does the next open.
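
The worker-thread side of steps 1-2, again only as a hedged sketch with invented names; 
the positioned read callback stands in for an hdfsPread-style call:

    // Sketch of the worker-thread loop in steps 1-2 above (invented names).
    #include <chrono>
    #include <deque>
    #include <mutex>
    #include <thread>
    #include <vector>

    struct Chunk { long offset; std::vector<char> data; };

    struct WorkerState {
      std::deque<Chunk> prefetchBufList;  // kept in strictly increasing offset order
      std::mutex mtx;
      size_t bufLimit = 64;               // max buffered 16KB chunks
      bool stopRequested = false;
    };

    // One pass over a single range. readAt stands in for a positioned,
    // hdfsPread-style read.
    void workerFetchRange(WorkerState &st, long rangeStart, long rangeLen,
                          long (*readAt)(long offset, char *buf, long len)) {
      const long kChunk = 16 * 1024;
      long pos = rangeStart, end = rangeStart + rangeLen;
      while (pos < end) {
        bool full = false;
        {
          std::lock_guard<std::mutex> lk(st.mtx);
          if (st.stopRequested) return;           // asked to stop prematurely
          full = st.prefetchBufList.size() >= st.bufLimit;
        }
        if (full) {                               // buffer limit reached;
          std::this_thread::sleep_for(std::chrono::milliseconds(1));  // real code blocks on a cv
          continue;
        }
        Chunk c; c.offset = pos; c.data.resize(kChunk);
        long n = readAt(pos, c.data.data(), kChunk);
        if (n <= 0) break;                        // end of file/range
        c.data.resize(n);
        pos += n;
        std::lock_guard<std::mutex> lk(st.mtx);
        st.prefetchBufList.push_back(std::move(c));  // stays offset-ordered
      }
      // Step 2: range fully read -- service any queued pre-open here by
      // kicking off the other worker thread on the next range, then sleep.
    }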

 

So with these two parallel activities, data is always prefetched up to one range 
in advance, but no more, because a new pre-open request is only scheduled 
at the time of a new open in the main thread.

 

Only one worker thread reads the data and puts it in the prefetchBufList at any given 
time. This is because the buffers in prefetchBufList are ordered sequentially by the 
offset of the 16KB reads from the hdfs file. The list cannot contain out-of-order 
buffers, because the main thread assumes that the next buffer it reads begins 
where the previous buffer ends. The second worker thread is needed so that when the 
first one processes a pre-open, it can kick off the second one to start prefetching.
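
To make that ordering assumption concrete, a consumer-side sanity check might look 
like this (purely illustrative):

    // Illustrative check of the invariant the main thread relies on: each
    // buffer in prefetchBufList must begin exactly where the previous ended.
    #include <cassert>
    #include <deque>
    #include <vector>

    struct Chunk { long offset; std::vector<char> data; };

    void verifyContiguous(const std::deque<Chunk> &prefetchBufList, long rangeStart) {
      long expected = rangeStart;
      for (const Chunk &c : prefetchBufList) {
        assert(c.offset == expected);    // no gaps, no out-of-order chunks
        expected += static_cast<long>(c.data.size());
      }
    }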

 

Worker threads go away when a shutdown request is scheduled. This happens in the 
LobGlobals destructor, which is called as part of the TCB destructor.
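
A sketch of that shutdown path; only the name LobGlobals comes from the text above, 
the members are invented for illustration:

    // Sketch of the shutdown path: the destructor schedules a stop request and
    // joins the worker threads.
    #include <atomic>
    #include <thread>
    #include <vector>

    class LobGlobals {
    public:
      ~LobGlobals() {
        stopRequested_.store(true);      // scheduled as part of the TCB teardown
        for (std::thread &t : workers_)
          if (t.joinable()) t.join();    // worker threads go away here
      }
    private:
      std::atomic<bool> stopRequested_{false};
      std::vector<std::thread> workers_;
    };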

> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -
>
> Key: TRAFODION-2917
> URL: https://issues.apache.org/jira/browse/TRAFODION-2917
> Project: Apache Trafodion
>  Issue Type: New Feature
>  Components: sql-general
>Reporter: Selvaganesan Govindarajan
>Priority: Major
> Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> Compiler returns a list of scan ranges and the begin range and number of 
> ranges to be done by each instance of TCB in TDB. This list of scan ranges is 
> also re-computed at run time possibly based on a CQD
> The scan range for a TCB can come from the same or different hdfs files.  TCB 
> creates two threads to read these ranges.Two ranges (for the TCB) are 
> initially assigned to these threads. As and when a range is completed, the 
> next range (assigned for the TCB) is picked up by the thread. Ranges are read 
> in multiples of hdfs scan buffer size at the TCB level. Default hdfs scan 
> buffer size is 64 MB. Rows from hdfs scan buffer is processed and moved into 
> up queue. If the range contains a record split, then the range is extended to 
> read up to range tail IO size to get the full row. The range that had the 
> latter part of the row ignores it because the former range processes it. 
> Record split at the