[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-05 Thread sansanichfb
Github user sansanichfb closed the pull request at:

https://github.com/apache/incubator-hawq/pull/1326


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-05 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r166175619
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
--- End diff --

Added.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-05 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r166173537
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
  * @param fsp file split to be serialized
  * @return byte serialization of fsp
  * @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
  */
 public static byte[] prepareFragmentMetadata(FileSplit fsp)
 throws IOException {
-ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-ObjectOutputStream objectStream = new ObjectOutputStream(
-byteArrayStream);
-objectStream.writeLong(fsp.getStart());
-objectStream.writeLong(fsp.getLength());
-objectStream.writeObject(fsp.getLocations());
+
+return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+}
+
+public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
--- End diff --

Both functions are used, so I would rather keep them both for the sake of compatibility.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-02 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r165799526
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+private Job job;
+
+public ParquetDataFragmenter(InputData md) {
+super(md);
+JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+try {
+job = Job.getInstance(jobConf);
+} catch (IOException e) {
+throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+}
+}
+
+
+@Override
+public List<Fragment> getFragments() throws Exception {
+String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
+
+for (InputSplit split : splits) {
+FileSplit fsp = (FileSplit) split;
+
+String filepath = fsp.getPath().toUri().getPath();
+String[] hosts = fsp.getLocations();
+
+Path file = new Path(filepath);
+
+ParquetMetadata metadata = ParquetFileReader.readFooter(
+job.getConfiguration(), file, ParquetMetadataConverter.NO_FILTER);
+MessageType schema = metadata.getFileMetaData().getSchema();
+
+byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
--- End diff --

This method is needed to support the `org.apache.hadoop.mapreduce.lib.input.FileSplit` type.
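
For context, a minimal sketch of the overload pattern under discussion (illustrative only, not the committed code): the old-API org.apache.hadoop.mapred.FileSplit and the new-API org.apache.hadoop.mapreduce.lib.input.FileSplit are distinct, incompatible types, so an overload over the primitive fields lets both call sites reuse one serialization path.

    import org.apache.hadoop.mapred.FileSplit;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    public class FragmentMetadataSketch {

        // Existing entry point for old-API (mapred) splits.
        public static byte[] prepareFragmentMetadata(FileSplit fsp) throws IOException {
            return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
        }

        // New entry point: a mapreduce.lib.input.FileSplit caller extracts
        // the same three fields itself and passes them in directly.
        public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
                throws IOException {
            ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
            ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
            objectStream.writeLong(start);
            objectStream.writeLong(length);
            objectStream.writeObject(locations);
            objectStream.flush();  // drain the ObjectOutputStream buffer into the byte array
            return byteArrayStream.toByteArray();
        }
    }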


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-02 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r165783402
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
  * @param fsp file split to be serialized
  * @return byte serialization of fsp
  * @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
  */
 public static byte[] prepareFragmentMetadata(FileSplit fsp)
 throws IOException {
-ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-ObjectOutputStream objectStream = new ObjectOutputStream(
-byteArrayStream);
-objectStream.writeLong(fsp.getStart());
-objectStream.writeLong(fsp.getLength());
-objectStream.writeObject(fsp.getLocations());
+
+return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+}
+
+public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
+throws IOException {
+
+ByteArrayOutputStream byteArrayStream = writeBaseFragmentInfo(start, length, locations);
 
 return byteArrayStream.toByteArray();
+
+}
+
+private static ByteArrayOutputStream writeBaseFragmentInfo(long start, long length, String[] locations) throws IOException {
+ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+objectStream.writeLong(start);
+objectStream.writeLong(length);
+objectStream.writeObject(locations);
+return byteArrayStream;
+}
+
+public static byte[] prepareFragmentMetadata(long start,
--- End diff --

Thanks, deleted as unnecessary.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-02 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r165782793
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+private Job job;
+
+public ParquetDataFragmenter(InputData md) {
+super(md);
+JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+try {
+job = Job.getInstance(jobConf);
+} catch (IOException e) {
+throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+}
+}
+
+
+@Override
+public List<Fragment> getFragments() throws Exception {
+String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
--- End diff --

Thanks, updated.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-02-02 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r165782764
  
--- Diff: pxf/pxf-service/src/scripts/pxf-env.sh ---
@@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO}
 # Parent directory of Hadoop client installation (optional)
 # used in case of tarball-based installation when all clients are under a common parent directory
 export HADOOP_ROOT=${HADOOP_ROOT}
+
+export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--- End diff --

Sure, deleted.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308210
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
  * @param fsp file split to be serialized
  * @return byte serialization of fsp
  * @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
  */
 public static byte[] prepareFragmentMetadata(FileSplit fsp)
 throws IOException {
-ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-ObjectOutputStream objectStream = new ObjectOutputStream(
-byteArrayStream);
-objectStream.writeLong(fsp.getStart());
-objectStream.writeLong(fsp.getLength());
-objectStream.writeObject(fsp.getLocations());
+
+return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+}
+
+public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
--- End diff --

Or better: incorporate the two lines from this function into the parent function, if it is only used once.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159306278
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+private Job job;
+
+public ParquetDataFragmenter(InputData md) {
+super(md);
+JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+try {
+job = Job.getInstance(jobConf);
+} catch (IOException e) {
+throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+}
+}
+
+
+@Override
+public List<Fragment> getFragments() throws Exception {
+String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ArrayList<InputSplit> splits = getSplits(new Path(absoluteDataPath));
--- End diff --

Usually it is best to declare the type as List, especially since there are no direct-access calls that would justify narrowing this to ArrayList.
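
A hypothetical before/after of that suggestion (self-contained; the names are illustrative, not from the PR):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class DeclareAgainstInterface {
        public static void main(String[] args) {
            // Narrow: couples this code to one concrete class for no benefit.
            ArrayList<String> narrow = new ArrayList<>(Arrays.asList("host1", "host2"));

            // Preferred: the loop only needs iteration, which the List
            // interface already provides, so the implementation can change.
            List<String> splits = new ArrayList<>(Arrays.asList("host1", "host2"));
            for (String split : splits) {
                System.out.println(split);
            }
        }
    }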


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread denalex
Github user denalex commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159308579
  
--- Diff: pxf/pxf-service/src/scripts/pxf-env.sh ---
@@ -54,3 +54,5 @@ export HADOOP_DISTRO=${HADOOP_DISTRO}
 # Parent directory of Hadoop client installation (optional)
 # used in case of tarball-based installation when all clients are under a common parent directory
 export HADOOP_ROOT=${HADOOP_ROOT}
+
+export CATALINA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--- End diff --

This is for debugging and should not be committed!
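
If the script ever needs built-in debugging support, one hedged alternative is to gate the agent behind an opt-in toggle (PXF_DEBUG here is hypothetical, not an existing PXF setting):

    # Attach the JDWP debug agent only when a developer explicitly opts in.
    if [ "${PXF_DEBUG:-false}" = "true" ]; then
        export CATALINA_OPTS="${CATALINA_OPTS} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
    fi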


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159298493
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
--- End diff --

Also mention what exactly the accessor call returns (a record, a chunk, etc.).
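
For instance, the class javadoc could spell that out; a possible wording (illustrative, based on the usage visible in this thread):

    /**
     * Parquet file accessor. Reads a Parquet file split row group by row
     * group and returns its data one record at a time: each read yields a
     * single OneRow whose value is a Parquet example Group (one record,
     * not a chunk of rows).
     */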


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297714
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
  * @param fsp file split to be serialized
  * @return byte serialization of fsp
  * @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
  */
 public static byte[] prepareFragmentMetadata(FileSplit fsp)
 throws IOException {
-ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-ObjectOutputStream objectStream = new ObjectOutputStream(
-byteArrayStream);
-objectStream.writeLong(fsp.getStart());
-objectStream.writeLong(fsp.getLength());
-objectStream.writeObject(fsp.getLocations());
+
+return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+}
+
+public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
--- End diff --

This can be made private.


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159299485
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetFileAccessor.java ---
@@ -0,0 +1,168 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Parquet file accessor.
+ */
+public class ParquetFileAccessor extends Plugin implements ReadAccessor {
+private ParquetFileReader reader;
+private MessageColumnIO columnIO;
+private RecordIterator recordIterator;
+private MessageType schema;
+
+
+private class RecordIterator implements Iterator<OneRow> {
+
+private final ParquetFileReader reader;
+private PageReadStore currentRowGroup;
+private RecordReader<Group> recordReader;
+private long rowsRemainedInRowGroup;
+
+public RecordIterator(ParquetFileReader reader) {
+this.reader = reader;
+readNextRowGroup();
+}
+
+@Override
+public boolean hasNext() {
+return rowsRemainedInRowGroup > 0;
+}
+
+@Override
+public OneRow next() {
+return new OneRow(null, readNextGroup());
+}
+
+@Override
+public void remove() {
+throw new UnsupportedOperationException();
+}
+
+private void readNextRowGroup() {
+try {
+currentRowGroup = reader.readNextRowGroup();
+} catch (IOException e) {
+throw new RuntimeException("Error occurred during reading new row group", e);
+}
+if (currentRowGroup == null)
+return;
+rowsRemainedInRowGroup = currentRowGroup.getRowCount();
+recordReader = columnIO.getRecordReader(currentRowGroup, new GroupRecordConverter(schema));
+}
+
+private Group readNextGroup() {
+Group g = null;
+if (rowsRemainedInRowGroup == 0) {
+readNextRowGroup();
+if (currentRowGroup != null) {
+g = recordReader.read();
+}
+} else {
+g = recordReader.read();
+if (g == null) {
--- End diff --

Instead of this, why don't we simply invoke recordReader.read() currentRowGroup.getRowCount() times? Decrementing rowsRemainedInRowGroup is what makes this code look complicated.
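
One possible shape of that simplification (a sketch against the fields in the quoted diff: reader, columnIO, schema, currentRowGroup, rowsRemainedInRowGroup; not the final code): let getRowCount() drive the reads, so the null-check fallback disappears.

    // Within a row group, recordReader.read() is invoked exactly
    // currentRowGroup.getRowCount() times, so no null fallback is needed.
    private Group readNextGroup() {
        if (rowsRemainedInRowGroup == 0 && !advanceToNextRowGroup()) {
            return null;  // no more row groups: end of input
        }
        rowsRemainedInRowGroup--;
        return recordReader.read();
    }

    private boolean advanceToNextRowGroup() {
        try {
            currentRowGroup = reader.readNextRowGroup();
        } catch (IOException e) {
            throw new RuntimeException("Error occurred during reading new row group", e);
        }
        if (currentRowGroup == null) {
            return false;  // file exhausted
        }
        rowsRemainedInRowGroup = currentRowGroup.getRowCount();
        recordReader = columnIO.getRecordReader(currentRowGroup, new GroupRecordConverter(schema));
        return true;
    }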


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159298191
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ParquetDataFragmenter.java ---
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParquetDataFragmenter extends Fragmenter {
+private Job job;
+
+public ParquetDataFragmenter(InputData md) {
+super(md);
+JobConf jobConf = new JobConf(new Configuration(), ParquetDataFragmenter.class);
+try {
+job = Job.getInstance(jobConf);
+} catch (IOException e) {
+throw new RuntimeException("Unable to instantiate a job for reading fragments", e);
+}
+}
+
+
+@Override
--- End diff --

Comments describing the components of the Fragment data would be useful here.
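
For example (a hypothetical comment block; the Fragment constructor arguments and the fragments list are inferred from the PXF Fragmenter API, not quoted in this hunk):

    // Each Fragment built per file split carries:
    //   filepath         - the source file this fragment belongs to
    //   hosts            - the HDFS hosts holding the split's blocks
    //   fragmentMetadata - serialized (start, length, locations) of the split
    //   userData         - the Parquet message schema, so downstream plugins
    //                      can interpret records without re-reading the footer
    Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata, schema.toString().getBytes());
    fragments.add(fragment);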


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2018-01-02 Thread shivzone
Github user shivzone commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159297386
  
--- Diff: pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---
@@ -151,18 +153,42 @@ public static boolean isThreadSafe(String dataDir, String compCodec) {
  * @param fsp file split to be serialized
  * @return byte serialization of fsp
  * @throws IOException if I/O errors occur while writing to the underlying
- * stream
+ * stream
  */
 public static byte[] prepareFragmentMetadata(FileSplit fsp)
 throws IOException {
-ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-ObjectOutputStream objectStream = new ObjectOutputStream(
-byteArrayStream);
-objectStream.writeLong(fsp.getStart());
-objectStream.writeLong(fsp.getLength());
-objectStream.writeObject(fsp.getLocations());
+
+return prepareFragmentMetadata(fsp.getStart(), fsp.getLength(), fsp.getLocations());
+
+}
+
+public static byte[] prepareFragmentMetadata(long start, long length, String[] locations)
+throws IOException {
+
+ByteArrayOutputStream byteArrayStream = writeBaseFragmentInfo(start, length, locations);
 
 return byteArrayStream.toByteArray();
+
+}
+
+private static ByteArrayOutputStream writeBaseFragmentInfo(long start, long length, String[] locations) throws IOException {
+ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+objectStream.writeLong(start);
+objectStream.writeLong(length);
+objectStream.writeObject(locations);
+return byteArrayStream;
+}
+
+public static byte[] prepareFragmentMetadata(long start,
--- End diff --

I don't see this function used anywhere?


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2017-12-29 Thread sansanichfb
Github user sansanichfb commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159110011
  
--- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
@@ -184,4 +184,13 @@ under the License.
 
 <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
 </plugins>
 </profile>
+<profile>
+    <name>Parquet</name>
--- End diff --

The storage layer is determined in the fragmenter and accessor; the resolver should remain the same. I would leave it as Parquet and reconsider storage-vs-format naming conventions for all profiles later.
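
For reference, a hedged sketch of what the profile entry could look like in pxf-profiles-default.xml (the fragmenter and accessor classes appear in this PR's diffs; the resolver line is omitted because its class is not quoted in this thread):

    <profile>
        <name>Parquet</name>
        <plugins>
            <fragmenter>org.apache.hawq.pxf.plugins.hdfs.ParquetDataFragmenter</fragmenter>
            <accessor>org.apache.hawq.pxf.plugins.hdfs.ParquetFileAccessor</accessor>
        </plugins>
    </profile>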


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2017-12-29 Thread lisakowen
Github user lisakowen commented on a diff in the pull request:

https://github.com/apache/incubator-hawq/pull/1326#discussion_r159073951
  
--- Diff: pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---
@@ -184,4 +184,13 @@ under the License.
 
 <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
 </plugins>
 </profile>
+<profile>
+    <name>Parquet</name>
--- End diff --

If someone could create another connector/profile that accesses Parquet files from a different external data store, we should consider qualifying the profile name, e.g. HdfsParquet. (Along the same lines, perhaps we should qualify the profiles named Avro and Json.)


---


[GitHub] incubator-hawq pull request #1326: HAWQ-1575. Implemented readable Parquet p...

2017-12-28 Thread sansanichfb
GitHub user sansanichfb opened a pull request:

https://github.com/apache/incubator-hawq/pull/1326

HAWQ-1575. Implemented readable Parquet profile for PXF.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sansanichfb/incubator-hawq HAWQ-1575

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-hawq/pull/1326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1326


commit f03bbb062b0e1d522dbae6d30d84cd0364f2af1c
Author: Alex Diachenko 
Date:   2017-11-30T22:14:26Z

HAWQ-1575. Implemented readable Parquet profile for PXF.




---