http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java new file mode 100644 index 0000000..025797b --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java @@ -0,0 +1,94 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; + +import org.apache.hawq.pxf.api.Metadata; + +/** + * Utility class for converting {@link Metadata} into a JSON format. 
+ */ +public class MetadataResponseFormatter { + + private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class); + + /** + * Converts list of {@link Metadata} to JSON String format. + * + * @param metadataList list of metadata objects to convert + * @return JSON formatted response + * @throws IOException if converting the data to JSON fails + */ + public static MetadataResponse formatResponse(List<Metadata> metadataList, String path) throws IOException { + /* print the fragment list to log when in debug level */ + if (LOG.isDebugEnabled()) { + MetadataResponseFormatter.printMetadata(metadataList, path); + } + + return new MetadataResponse(metadataList); + } + + /** + * Converts metadata list to a readable string. + * Intended for debugging purposes only. + */ + private static void printMetadata(List<Metadata> metadataList, String path) { + LOG.debug("Metadata List for path " + path + ": "); + + if (null == metadataList || metadataList.isEmpty()) { + LOG.debug("No metadata"); + return; + } + + for(Metadata metadata: metadataList) { + StringBuilder result = new StringBuilder(); + + if (metadata == null) { + result.append("None"); + LOG.debug(result); + continue; + } + + result.append("Metadata for item \"").append(metadata.getItem()).append("\": "); + + if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) { + result.append("None"); + } else { + int i = 0; + for (Metadata.Field field : metadata.getFields()) { + result.append("Field #").append(++i).append(": [") + .append("Name: ").append(field.getName()) + .append(", Type: ").append(field.getType().getTypeName()) + .append(", Source type: ").append(field.getSourceType()).append("] "); + } + } + LOG.debug(result); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java new file mode 100644 index 0000000..01a95ab --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java @@ -0,0 +1,179 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.hawq.pxf.api.utilities.Utilities; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.nio.charset.CharacterCodingException; +import java.util.LinkedList; +import java.util.zip.ZipException; + +/** + * ReadBridge class creates appropriate accessor and resolver. It will then + * create the correct output conversion class (e.g. Text or GPDBWritable) and + * get records from accessor, let resolver deserialize them and reserialize them + * using the output conversion class. <br> + * The class handles BadRecordException and other exception type and marks the + * record as invalid for HAWQ. + */ +public class ReadBridge implements Bridge { + ReadAccessor fileAccessor = null; + ReadResolver fieldsResolver = null; + BridgeOutputBuilder outputBuilder = null; + LinkedList<Writable> outputQueue = null; + + private static final Log LOG = LogFactory.getLog(ReadBridge.class); + + /** + * C'tor - set the implementation of the bridge. + * + * @param protData input containing accessor and resolver names + * @throws Exception if accessor or resolver can't be instantiated + */ + public ReadBridge(ProtocolData protData) throws Exception { + outputBuilder = new BridgeOutputBuilder(protData); + outputQueue = new LinkedList<Writable>(); + fileAccessor = getFileAccessor(protData); + fieldsResolver = getFieldsResolver(protData); + } + + /** + * Accesses the underlying HDFS file. 
+ */ + @Override + public boolean beginIteration() throws Exception { + return fileAccessor.openForRead(); + } + + /** + * Fetches next object from file and turn it into a record that the HAWQ + * backend can process. + */ + @Override + public Writable getNext() throws Exception { + Writable output = null; + OneRow onerow = null; + + if (!outputQueue.isEmpty()) { + return outputQueue.pop(); + } + + try { + while (outputQueue.isEmpty()) { + onerow = fileAccessor.readNextObject(); + if (onerow == null) { + fileAccessor.closeForRead(); + output = outputBuilder.getPartialLine(); + if (output != null) { + LOG.warn("A partial record in the end of the fragment"); + } + // if there is a partial line, return it now, otherwise it + // will return null + return output; + } + + // we checked before that outputQueue is empty, so we can + // override it. + outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow)); + if (!outputQueue.isEmpty()) { + output = outputQueue.pop(); + break; + } + } + } catch (IOException ex) { + if (!isDataException(ex)) { + fileAccessor.closeForRead(); + throw ex; + } + output = outputBuilder.getErrorOutput(ex); + } catch (BadRecordException ex) { + String row_info = "null"; + if (onerow != null) { + row_info = onerow.toString(); + } + if (ex.getCause() != null) { + LOG.debug("BadRecordException " + ex.getCause().toString() + + ": " + row_info); + } else { + LOG.debug(ex.toString() + ": " + row_info); + } + output = outputBuilder.getErrorOutput(ex); + } catch (Exception ex) { + fileAccessor.closeForRead(); + throw ex; + } + + return output; + } + + public static ReadAccessor getFileAccessor(InputData inputData) + throws Exception { + return (ReadAccessor) Utilities.createAnyInstance(InputData.class, + inputData.getAccessor(), inputData); + } + + public static ReadResolver getFieldsResolver(InputData inputData) + throws Exception { + return (ReadResolver) Utilities.createAnyInstance(InputData.class, + inputData.getResolver(), inputData); 
+ } + + /* + * There are many exceptions that inherit IOException. Some of them like + * EOFException are generated due to a data problem, and not because of an + * IO/connection problem as the father IOException might lead us to believe. + * For example, an EOFException will be thrown while fetching a record from + * a sequence file, if there is a formatting problem in the record. Fetching + * record from the sequence-file is the responsibility of the accessor so + * the exception will be thrown from the accessor. We identify this cases by + * analyzing the exception type, and when we discover that the actual + * problem was a data problem, we return the errorOutput GPDBWritable. + */ + private boolean isDataException(IOException ex) { + return (ex instanceof EOFException + || ex instanceof CharacterCodingException + || ex instanceof CharConversionException + || ex instanceof UTFDataFormatException || ex instanceof ZipException); + } + + @Override + public boolean setNext(DataInputStream inputStream) { + throw new UnsupportedOperationException("setNext is not implemented"); + } + + @Override + public boolean isThreadSafe() { + boolean result = ((Plugin) fileAccessor).isThreadSafe() + && ((Plugin) fieldsResolver).isThreadSafe(); + LOG.debug("Bridge is " + (result ? 
"" : "not ") + "thread safe"); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java new file mode 100644 index 0000000..d5ae66a --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java @@ -0,0 +1,131 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.DataInputStream; +import java.util.BitSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.AnalyzeUtils; +import org.apache.hawq.pxf.service.utilities.ProtocolData; + +/** + * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output + * records, based on a ratio sample. 
The sample to pass or discard a record is
 * done after all of the processing is completed (
 * {@code accessor -> resolver -> output builder}) to make sure there are no
 * chunks of data instead of single records. <br>
 * The goal is to get as uniform as possible sampling. This is achieved by
 * creating a bit map matching the precision of the sampleRatio, so that for a
 * ratio of 0.034, a bit-map of 1000 bits will be created, and 34 bits will be
 * set. This map is matched against each read record, discarding ones with a 0
 * bit and continuing until a 1 bit record is read.
 */
public class ReadSamplingBridge implements Bridge {

    /*
     * The bounds are float constants on purpose: sampleRatio is a float, and
     * comparing it with the double literal 0.0001 rejects the float 0.0001f,
     * because that float widens to a double slightly below 0.0001.
     */
    private static final float MIN_SAMPLE_RATIO = 0.0001f;
    private static final float MAX_SAMPLE_RATIO = 1.0f;

    /* Largest bit set used, matching the four decimal digits of precision. */
    private static final int MAX_BIT_SET_SIZE = 10000;
    /* The bit set is never reduced below this size. */
    private static final int MIN_BIT_SET_SIZE = 100;

    ReadBridge bridge;

    float sampleRatio;
    BitSet sampleBitSet;
    int bitSetSize;
    int sampleSize;
    int curIndex;

    private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class);

    /**
     * C'tor - creates the wrapped {@link ReadBridge}, validates the sampling
     * ratio and builds the sampling bit set.
     *
     * @param protData input containing sampling ratio
     * @throws IllegalArgumentException if the sampling ratio is NaN or outside
     *         the range 0.0001 to 1.0 (both ends included)
     * @throws Exception if the wrapped bridge can't be created
     */
    public ReadSamplingBridge(ProtocolData protData) throws Exception {
        bridge = new ReadBridge(protData);

        this.sampleRatio = protData.getStatsSampleRatio();
        // written as a negated "in range" test so that NaN is rejected too
        if (!(sampleRatio >= MIN_SAMPLE_RATIO && sampleRatio <= MAX_SAMPLE_RATIO)) {
            throw new IllegalArgumentException(
                    "sampling ratio must be a value between 0.0001 and 1.0. "
                            + "(value = " + sampleRatio + ")");
        }

        calculateBitSetSize();

        this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize,
                sampleSize);
        this.curIndex = 0;
    }

    /**
     * Derives {@code bitSetSize} and {@code sampleSize} from the ratio. Both
     * start at a precision of 1/10000 and are divided by 10 while the sample
     * size is a multiple of 10 and the bit set is larger than 100 bits.
     */
    private void calculateBitSetSize() {

        // Round to the nearest integer instead of truncating: the float
        // product can land just below the intended integer (a ratio such as
        // 0.0007f multiplied by 10000 comes out slightly under 7), and a
        // plain (int) cast would then drop one sample.
        sampleSize = Math.round(sampleRatio * MAX_BIT_SET_SIZE);
        bitSetSize = MAX_BIT_SET_SIZE;

        while ((bitSetSize > MIN_BIT_SET_SIZE) && (sampleSize % 10 == 0)) {
            bitSetSize /= 10;
            sampleSize /= 10;
        }
        LOG.debug("bit set size = " + bitSetSize + " sample size = "
                + sampleSize);
    }

    /**
     * Fetches next sample, according to the sampling ratio. Records whose
     * position maps to a 0 bit are read from the wrapped bridge and dropped.
     *
     * @return the next sampled record, or null when the wrapped bridge has no
     *         more records
     */
    @Override
    public Writable getNext() throws Exception {
        Writable output = bridge.getNext();

        // sample - if bit is false, advance to the next object
        while (!sampleBitSet.get(curIndex)) {

            if (output == null) {
                // end of data: stop skipping
                break;
            }
            incIndex();
            output = bridge.getNext();
        }

        incIndex();
        return output;
    }

    /** Moves to the next bit, wrapping around at the end of the bit set. */
    private void incIndex() {
        curIndex = (curIndex + 1) % bitSetSize;
    }

    /** Delegates to the wrapped bridge. */
    @Override
    public boolean beginIteration() throws Exception {
        return bridge.beginIteration();
    }

    /** Delegates to the wrapped bridge. */
    @Override
    public boolean setNext(DataInputStream inputStream) throws Exception {
        return bridge.setNext(inputStream);
    }

    /** Delegates to the wrapped bridge. */
    @Override
    public boolean isThreadSafe() {
        return bridge.isThreadSafe();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java new file mode 100644 index 0000000..c3ee731 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java @@ -0,0 +1,117 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


import org.apache.hawq.pxf.api.*;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.service.io.Writable;
import org.apache.hawq.pxf.service.utilities.ProtocolData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.DataInputStream;
import java.util.List;

/**
 * WriteBridge class creates appropriate accessor and resolver.
 * It reads data from inputStream by the resolver,
 * and writes it to the Hadoop storage with the accessor.
 */
public class WriteBridge implements Bridge {
    private static final Log LOG = LogFactory.getLog(WriteBridge.class);
    WriteAccessor fileAccessor = null;
    WriteResolver fieldsResolver = null;
    BridgeInputBuilder inputBuilder;

    /**
     * C'tor - set the implementation of the bridge.
     *
     * @param protocolData input containing accessor and resolver names
     * @throws Exception if accessor or resolver can't be instantiated
     */
    public WriteBridge(ProtocolData protocolData) throws Exception {

        inputBuilder = new BridgeInputBuilder(protocolData);
        /* plugins accept InputData parameters */
        fileAccessor = getFileAccessor(protocolData);
        fieldsResolver = getFieldsResolver(protocolData);

    }

    /**
     * Opens the accessor for writing.
     *
     * @return whatever the accessor's {@code openForWrite()} returns
     */
    @Override
    public boolean beginIteration() throws Exception {
        return fileAccessor.openForWrite();
    }

    /**
     * Reads one record from the stream, converts it using WriteResolver into
     * a OneRow object, and passes it to WriteAccessor to be written.
     *
     * @param inputStream stream to read the record from
     * @return true if a record was written; false if the input builder
     *         produced no record or the resolver produced no row (the accessor
     *         is closed in both cases)
     * @throws BadRecordException if the accessor reports that the row was not
     *         written; the accessor is closed first
     * @throws Exception if reading, resolving, writing or closing fails
     */
    @Override
    public boolean setNext(DataInputStream inputStream) throws Exception {

        List<OneField> record = inputBuilder.makeInput(inputStream);
        if (record == null) {
            close();
            return false;
        }

        OneRow onerow = fieldsResolver.setFields(record);
        if (onerow == null) {
            close();
            return false;
        }
        if (!fileAccessor.writeNextObject(onerow)) {
            close();
            throw new BadRecordException();
        }
        return true;
    }

    /**
     * Closes the accessor. A failure is logged and then rethrown.
     *
     * @throws Exception whatever {@code closeForWrite()} threw
     */
    private void close() throws Exception {
        try {
            fileAccessor.closeForWrite();
        } catch (Exception e) {
            // pass the exception itself so the log keeps the stack trace;
            // the message alone may be null and hides where the close failed
            LOG.error("Failed to close bridge resources: " + e.getMessage(), e);
            throw e;
        }
    }

    /** Instantiates the accessor class named by {@code inputData.getAccessor()}. */
    private static WriteAccessor getFileAccessor(InputData inputData) throws Exception {
        return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
    }

    /** Instantiates the resolver class named by {@code inputData.getResolver()}. */
    private static WriteResolver getFieldsResolver(InputData inputData) throws Exception {
        return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
    }

    /**
     * Not supported by the write bridge.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Writable getNext() {
        throw new UnsupportedOperationException("getNext is not implemented");
    }

    /**
     * The bridge is thread safe only if both the accessor and the resolver
     * report that they are.
     */
    @Override
    public boolean isThreadSafe() {
        return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java new file mode 100644 index 0000000..6b911f2 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java @@ -0,0 +1,98 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.UnsupportedOperationException; + +/** + * A serializable object for transporting a byte array through the Bridge + * framework + */ +public class BufferWritable implements Writable { + + byte[] buf = null; + + /** + * Constructs a BufferWritable. Copies the buffer reference and not the + * actual bytes. This class is used when we intend to transport a buffer + * through the Bridge framework without copying the data each time the + * buffer is passed between the Bridge objects. + * + * @param inBuf buffer + */ + public BufferWritable(byte[] inBuf) { + buf = inBuf; + } + + /** + * Serializes the fields of this object to <code>out</code>. + * + * @param out <code>DataOutput</code> to serialize this object into. + * @throws IOException if the buffer was not set + */ + @Override + public void write(DataOutput out) throws IOException { + if (buf == null) + throw new IOException("BufferWritable was not set"); + out.write(buf); + } + + /** + * Deserializes the fields of this object from <code>in</code>. + * <p> + * For efficiency, implementations should attempt to re-use storage in the + * existing object where possible. 
+ * </p> + * + * @param in <code>DataInput</code> to deserialize this object from + * @throws UnsupportedOperationException this function is not supported + */ + @Override + public void readFields(DataInput in) { + throw new UnsupportedOperationException( + "BufferWritable.readFields() is not implemented"); + } + + /** + * Appends given app's buffer to existing buffer. + * <br> + * Not efficient - requires copying both this and the appended buffer. + * + * @param app buffer to append + */ + public void append(byte[] app) { + if (buf == null) { + buf = app; + return; + } + if (app == null) { + return; + } + + byte[] newbuf = new byte[buf.length + app.length]; + System.arraycopy(buf, 0, newbuf, 0, buf.length); + System.arraycopy(app, 0, newbuf, buf.length, app.length); + buf = newbuf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java new file mode 100644 index 0000000..5bc26f1 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java @@ -0,0 +1,893 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.*; +import java.util.Arrays; + +import static org.apache.hawq.pxf.api.io.DataType.*; + + +/** + * This class represents a GPDB record in the form of + * a Java object. + */ +public class GPDBWritable implements Writable { + /* + * GPDBWritable is using the following serialization form: + * Total Length | Version | Error Flag | # of columns | Col type |...| Col type | Null Bit array | Col val... + * 4 byte | 2 byte | 1 byte | 2 byte | 1 byte |...| 1 byte | ceil(# of columns/8) byte | Fixed or Var length + * + * For fixed length type, we know the length. + * In the col val, we align pad according to the alignment requirement of the type. + * For var length type, the alignment is always 4 byte. 
+ * For var length type, col val is <4 byte length><payload val> + */ + + private static final Log LOG = LogFactory.getLog(GPDBWritable.class); + private static final int EOF = -1; + + /* + * Enum of the Database type + */ + private enum DBType { + BIGINT(8, 8), + BOOLEAN(1, 1), + FLOAT8(8, 8), + INTEGER(4, 4), + REAL(4, 4), + SMALLINT(2, 2), + BYTEA(4, -1), + TEXT(4, -1); + + private final int typelength; // -1 means var length + private final int alignment; + + DBType(int align, int len) { + this.typelength = len; + this.alignment = align; + } + + public int getTypeLength() { + return typelength; + } + + public boolean isVarLength() { + return typelength == -1; + } + + // return the alignment requirement of the type + public int getAlignment() { + return alignment; + } + } + + /* + * Constants + */ + private static final int PREV_VERSION = 1; + private static final int VERSION = 2; /* for backward compatibility */ + private static final String CHARSET = "UTF-8"; + + /* + * Local variables + */ + protected int[] colType; + protected Object[] colValue; + protected int alignmentOfEightBytes = 8; + protected byte errorFlag = 0; + protected int pktlen = EOF; + + public int[] getColType() { + return colType; + } + + /** + * An exception class for column type definition and + * set/get value mismatch. + */ + public class TypeMismatchException extends IOException { + public TypeMismatchException(String msg) { + super(msg); + } + } + + /** + * Empty Constructor + */ + public GPDBWritable() { + initializeEightByteAlignment(); + } + + /** + * Constructor to build a db record. colType defines the schema + * + * @param columnType the table column types + */ + public GPDBWritable(int[] columnType) { + initializeEightByteAlignment(); + colType = columnType; + colValue = new Object[columnType.length]; + } + + /** + * Constructor to build a db record from a serialized form. + * + * @param data a record in the serialized form + * @throws IOException if the data is malformatted. 
+ */ + public GPDBWritable(byte[] data) throws IOException { + initializeEightByteAlignment(); + ByteArrayInputStream bis = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(bis); + + readFields(dis); + } + + /* + * Read first 4 bytes, and verify it's a valid packet length. + * Upon error returns EOF. + */ + private int readPktLen(DataInput in) throws IOException { + pktlen = EOF; + + try { + pktlen = in.readInt(); + } catch (EOFException e) { + LOG.debug("Reached end of stream (EOFException)"); + return EOF; + } + if (pktlen == EOF) { + LOG.debug("Reached end of stream (returned -1)"); + } + + return pktlen; + } + + @Override + public void readFields(DataInput in) throws IOException { + /* + * extract pkt len. + * + * GPSQL-1107: + * The DataInput might already be empty (EOF), but we can't check it beforehand. + * If that's the case, pktlen is updated to -1, to mark that the object is still empty. + * (can be checked with isEmpty()). + */ + pktlen = readPktLen(in); + if (isEmpty()) { + return; + } + + /* extract the version and col cnt */ + int version = in.readShort(); + int curOffset = 4 + 2; + int colCnt; + + /* !!! Check VERSION !!! 
*/ + if (version != GPDBWritable.VERSION && version != GPDBWritable.PREV_VERSION) { + throw new IOException("Current GPDBWritable version(" + + GPDBWritable.VERSION + ") does not match input version(" + + version + ")"); + } + + if (version == GPDBWritable.VERSION) { + errorFlag = in.readByte(); + curOffset += 1; + } + + colCnt = in.readShort(); + curOffset += 2; + + /* Extract Column Type */ + colType = new int[colCnt]; + DBType[] coldbtype = new DBType[colCnt]; + for (int i = 0; i < colCnt; i++) { + int enumType = (in.readByte()); + curOffset += 1; + if (enumType == DBType.BIGINT.ordinal()) { + colType[i] = BIGINT.getOID(); + coldbtype[i] = DBType.BIGINT; + } else if (enumType == DBType.BOOLEAN.ordinal()) { + colType[i] = BOOLEAN.getOID(); + coldbtype[i] = DBType.BOOLEAN; + } else if (enumType == DBType.FLOAT8.ordinal()) { + colType[i] = FLOAT8.getOID(); + coldbtype[i] = DBType.FLOAT8; + } else if (enumType == DBType.INTEGER.ordinal()) { + colType[i] = INTEGER.getOID(); + coldbtype[i] = DBType.INTEGER; + } else if (enumType == DBType.REAL.ordinal()) { + colType[i] = REAL.getOID(); + coldbtype[i] = DBType.REAL; + } else if (enumType == DBType.SMALLINT.ordinal()) { + colType[i] = SMALLINT.getOID(); + coldbtype[i] = DBType.SMALLINT; + } else if (enumType == DBType.BYTEA.ordinal()) { + colType[i] = BYTEA.getOID(); + coldbtype[i] = DBType.BYTEA; + } else if (enumType == DBType.TEXT.ordinal()) { + colType[i] = TEXT.getOID(); + coldbtype[i] = DBType.TEXT; + } else { + throw new IOException("Unknown GPDBWritable.DBType ordinal value"); + } + } + + /* Extract null bit array */ + byte[] nullbytes = new byte[getNullByteArraySize(colCnt)]; + in.readFully(nullbytes); + curOffset += nullbytes.length; + boolean[] colIsNull = byteArrayToBooleanArray(nullbytes, colCnt); + + /* extract column value */ + colValue = new Object[colCnt]; + for (int i = 0; i < colCnt; i++) { + if (!colIsNull[i]) { + /* Skip the alignment padding */ + int skipbytes = roundUpAlignment(curOffset, 
coldbtype[i].getAlignment()) - curOffset; + for (int j = 0; j < skipbytes; j++) { + in.readByte(); + } + curOffset += skipbytes; + + /* For fixed length type, increment the offset according to type type length here. + * For var length type (BYTEA, TEXT), we'll read 4 byte length header and the + * actual payload. + */ + int varcollen = -1; + if (coldbtype[i].isVarLength()) { + varcollen = in.readInt(); + curOffset += 4 + varcollen; + } else { + curOffset += coldbtype[i].getTypeLength(); + } + + switch (DataType.get(colType[i])) { + case BIGINT: { + colValue[i] = in.readLong(); + break; + } + case BOOLEAN: { + colValue[i] = in.readBoolean(); + break; + } + case FLOAT8: { + colValue[i] = in.readDouble(); + break; + } + case INTEGER: { + colValue[i] = in.readInt(); + break; + } + case REAL: { + colValue[i] = in.readFloat(); + break; + } + case SMALLINT: { + colValue[i] = in.readShort(); + break; + } + + /* For BYTEA column, it has a 4 byte var length header. */ + case BYTEA: { + colValue[i] = new byte[varcollen]; + in.readFully((byte[]) colValue[i]); + break; + } + /* For text formatted column, it has a 4 byte var length header + * and it's always null terminated string. + * So, we can remove the last "\0" when constructing the string. 
+ */ + case TEXT: { + byte[] data = new byte[varcollen]; + in.readFully(data, 0, varcollen); + colValue[i] = new String(data, 0, varcollen - 1, CHARSET); + break; + } + + default: + throw new IOException("Unknown GPDBWritable ColType"); + } + } + } + + /* Skip the ending alignment padding */ + int skipbytes = roundUpAlignment(curOffset, 8) - curOffset; + for (int j = 0; j < skipbytes; j++) { + in.readByte(); + } + curOffset += skipbytes; + + if (errorFlag != 0) { + throw new IOException("Received error value " + errorFlag + " from format"); + } + } + + @Override + public void write(DataOutput out) throws IOException { + int numCol = colType.length; + boolean[] nullBits = new boolean[numCol]; + int[] colLength = new int[numCol]; + byte[] enumType = new byte[numCol]; + int[] padLength = new int[numCol]; + byte[] padbytes = new byte[8]; + + /** + * Compute the total payload and header length + * header = total length (4 byte), Version (2 byte), Error (1 byte), #col (2 byte) + * col type array = #col * 1 byte + * null bit array = ceil(#col/8) + */ + int datlen = 4 + 2 + 1 + 2; + datlen += numCol; + datlen += getNullByteArraySize(numCol); + + for (int i = 0; i < numCol; i++) { + /* Get the enum type */ + DBType coldbtype; + switch (DataType.get(colType[i])) { + case BIGINT: + coldbtype = DBType.BIGINT; + break; + case BOOLEAN: + coldbtype = DBType.BOOLEAN; + break; + case FLOAT8: + coldbtype = DBType.FLOAT8; + break; + case INTEGER: + coldbtype = DBType.INTEGER; + break; + case REAL: + coldbtype = DBType.REAL; + break; + case SMALLINT: + coldbtype = DBType.SMALLINT; + break; + case BYTEA: + coldbtype = DBType.BYTEA; + break; + default: + coldbtype = DBType.TEXT; + } + enumType[i] = (byte) (coldbtype.ordinal()); + + /* Get the actual value, and set the null bit */ + if (colValue[i] == null) { + nullBits[i] = true; + colLength[i] = 0; + } else { + nullBits[i] = false; + + /* + * For fixed length type, we get the fixed length. 
+ * For var len binary format, the length is in the col value. + * For text format, we must convert encoding first. + */ + if (!coldbtype.isVarLength()) { + colLength[i] = coldbtype.getTypeLength(); + } else if (!isTextForm(colType[i])) { + colLength[i] = ((byte[]) colValue[i]).length; + } else { + colLength[i] = ((String) colValue[i]).getBytes(CHARSET).length; + } + + /* calculate and add the type alignment padding */ + padLength[i] = roundUpAlignment(datlen, coldbtype.getAlignment()) - datlen; + datlen += padLength[i]; + + /* for variable length type, we add a 4 byte length header */ + if (coldbtype.isVarLength()) { + datlen += 4; + } + } + datlen += colLength[i]; + } + + /* + * Add the final alignment padding for the next record + */ + int endpadding = roundUpAlignment(datlen, 8) - datlen; + datlen += endpadding; + + /* Construct the packet header */ + out.writeInt(datlen); + out.writeShort(VERSION); + out.writeByte(errorFlag); + out.writeShort(numCol); + + /* Write col type */ + for (int i = 0; i < numCol; i++) { + out.writeByte(enumType[i]); + } + + /* Nullness */ + byte[] nullBytes = boolArrayToByteArray(nullBits); + out.write(nullBytes); + + /* Column Value */ + for (int i = 0; i < numCol; i++) { + if (!nullBits[i]) { + /* Pad the alignment byte first */ + if (padLength[i] > 0) { + out.write(padbytes, 0, padLength[i]); + } + + /* Now, write the actual column value */ + switch (DataType.get(colType[i])) { + case BIGINT: + out.writeLong(((Long) colValue[i])); + break; + case BOOLEAN: + out.writeBoolean(((Boolean) colValue[i])); + break; + case FLOAT8: + out.writeDouble(((Double) colValue[i])); + break; + case INTEGER: + out.writeInt(((Integer) colValue[i])); + break; + case REAL: + out.writeFloat(((Float) colValue[i])); + break; + case SMALLINT: + out.writeShort(((Short) colValue[i])); + break; + + /* For BYTEA format, add 4byte length header at the beginning */ + case BYTEA: + out.writeInt(colLength[i]); + out.write((byte[]) colValue[i]); + break; + + /* For 
text format, add 4byte length header. string is already '\0' terminated */ + default: { + out.writeInt(colLength[i]); + byte[] data = ((String) colValue[i]).getBytes(CHARSET); + out.write(data); + break; + } + } + } + } + + /* End padding */ + out.write(padbytes, 0, endpadding); + } + + /** + * Private helper to convert boolean array to byte array + */ + private static byte[] boolArrayToByteArray(boolean[] data) { + int len = data.length; + byte[] byts = new byte[getNullByteArraySize(len)]; + + for (int i = 0, j = 0, k = 7; i < data.length; i++) { + byts[j] |= (data[i] ? 1 : 0) << k--; + if (k < 0) { + j++; + k = 7; + } + } + return byts; + } + + /** + * Private helper to determine the size of the null byte array + */ + private static int getNullByteArraySize(int colCnt) { + return (colCnt / 8) + (colCnt % 8 != 0 ? 1 : 0); + } + + /** + * Private helper to convert byte array to boolean array + */ + private static boolean[] byteArrayToBooleanArray(byte[] data, int colCnt) { + boolean[] bools = new boolean[colCnt]; + for (int i = 0, j = 0, k = 7; i < bools.length; i++) { + bools[i] = ((data[j] >> k--) & 0x01) == 1; + if (k < 0) { + j++; + k = 7; + } + } + return bools; + } + + /** + * Private helper to round up alignment for the given length + */ + private int roundUpAlignment(int len, int align) { + int commonAlignment = align; + if (commonAlignment == 8) { + commonAlignment = alignmentOfEightBytes; + } + return (((len) + ((commonAlignment) - 1)) & ~((commonAlignment) - 1)); + } + + /** + * Getter/Setter methods to get/set the column value + */ + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setLong(int colIdx, Long val) + throws TypeMismatchException { + checkType(BIGINT, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. 
+ * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setBoolean(int colIdx, Boolean val) + throws TypeMismatchException { + checkType(BOOLEAN, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setBytes(int colIdx, byte[] val) + throws TypeMismatchException { + checkType(BYTEA, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setString(int colIdx, String val) + throws TypeMismatchException { + checkType(TEXT, colIdx, true); + if (val != null) { + colValue[colIdx] = val + "\0"; + } else { + colValue[colIdx] = val; + } + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setFloat(int colIdx, Float val) + throws TypeMismatchException { + checkType(REAL, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setDouble(int colIdx, Double val) + throws TypeMismatchException { + checkType(FLOAT8, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. 
+ * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setInt(int colIdx, Integer val) + throws TypeMismatchException { + checkType(INTEGER, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Sets the column value of the record. + * + * @param colIdx the column index + * @param val the value + * @throws TypeMismatchException the column type does not match + */ + public void setShort(int colIdx, Short val) + throws TypeMismatchException { + checkType(SMALLINT, colIdx, true); + colValue[colIdx] = val; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Long getLong(int colIdx) + throws TypeMismatchException { + checkType(BIGINT, colIdx, false); + return (Long) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Boolean getBoolean(int colIdx) + throws TypeMismatchException { + checkType(BOOLEAN, colIdx, false); + return (Boolean) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public byte[] getBytes(int colIdx) + throws TypeMismatchException { + checkType(BYTEA, colIdx, false); + return (byte[]) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public String getString(int colIdx) + throws TypeMismatchException { + checkType(TEXT, colIdx, false); + return (String) colValue[colIdx]; + } + + /** + * Gets the column value of the record. 
+ * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Float getFloat(int colIdx) + throws TypeMismatchException { + checkType(REAL, colIdx, false); + return (Float) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Double getDouble(int colIdx) + throws TypeMismatchException { + checkType(FLOAT8, colIdx, false); + return (Double) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Integer getInt(int colIdx) + throws TypeMismatchException { + checkType(INTEGER, colIdx, false); + return (Integer) colValue[colIdx]; + } + + /** + * Gets the column value of the record. + * + * @param colIdx the column index + * @return column value + * @throws TypeMismatchException the column type does not match + */ + public Short getShort(int colIdx) + throws TypeMismatchException { + checkType(SMALLINT, colIdx, false); + return (Short) colValue[colIdx]; + } + + /** + * Sets the error field. + * + * @param errorVal the error value + */ + public void setError(boolean errorVal) { + errorFlag = errorVal ? (byte) 1 : (byte) 0; + } + + /** + * Returns a string representation of the object. + */ + @Override + public String toString() { + if (colType == null) { + return null; + } + StringBuilder result = new StringBuilder(); + for (int i = 0; i < colType.length; i++) { + result.append("Column ").append(i).append(":"); + if (colValue[i] != null) { + result.append(colType[i] == BYTEA.getOID() + ? 
byteArrayInString((byte[]) colValue[i]) + : colValue[i]); + } + result.append("\n"); + } + return result.toString(); + } + + /** + * Helper printing function + */ + private static String byteArrayInString(byte[] data) { + StringBuilder result = new StringBuilder(); + for (Byte b : data) { + result.append(b.intValue()).append(" "); + } + return result.toString(); + } + + /** + * Private Helper to check the type mismatch + * If the expected type is stored as string, then it must be set + * via setString. + * Otherwise, the type must match. + */ + private void checkType(DataType inTyp, int idx, boolean isSet) + throws TypeMismatchException { + if (idx < 0 || idx >= colType.length) { + throw new TypeMismatchException("Column index is out of range"); + } + + int exTyp = colType[idx]; + + if (isTextForm(exTyp)) { + if (inTyp != TEXT) { + throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), TEXT.getOID(), isSet)); + } + } else if (inTyp != DataType.get(exTyp)) { + throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), exTyp, isSet)); + } + } + + private String formErrorMsg(int inTyp, int colTyp, boolean isSet) { + return isSet + ? "Cannot set " + getTypeName(inTyp) + " to a " + getTypeName(colTyp) + " column" + : "Cannot get " + getTypeName(inTyp) + " from a " + getTypeName(colTyp) + " column"; + } + + /** + * Private Helper routine to tell whether a type is Text form or not + * + * @param type the type OID that we want to check + */ + private boolean isTextForm(int type) { + return !Arrays.asList(BIGINT, BOOLEAN, BYTEA, FLOAT8, INTEGER, REAL, SMALLINT).contains(DataType.get(type)); + } + + /** + * Helper to get the type name. + * If a given oid is not in the commonly used list, we + * would expect a TEXT for it (for the error message). 
+ * + * @param oid type OID + * @return type name + */ + public static String getTypeName(int oid) { + switch (DataType.get(oid)) { + case BOOLEAN: + return "BOOLEAN"; + case BYTEA: + return "BYTEA"; + case CHAR: + return "CHAR"; + case BIGINT: + return "BIGINT"; + case SMALLINT: + return "SMALLINT"; + case INTEGER: + return "INTEGER"; + case TEXT: + return "TEXT"; + case REAL: + return "REAL"; + case FLOAT8: + return "FLOAT8"; + case BPCHAR: + return "BPCHAR"; + case VARCHAR: + return "VARCHAR"; + case DATE: + return "DATE"; + case TIME: + return "TIME"; + case TIMESTAMP: + return "TIMESTAMP"; + case NUMERIC: + return "NUMERIC"; + default: + return "TEXT"; + } + } + + /* + * Get alignment from command line to match to the alignment + * the C code uses (see gphdfs/src/protocol_formatter/common.c). + */ + private void initializeEightByteAlignment() { + String alignment = System.getProperty("greenplum.alignment"); + if (alignment == null) { + return; + } + alignmentOfEightBytes = Integer.parseInt(alignment); + } + + /** + * Returns if the writable object is empty, + * based on the pkt len as read from stream. + * -1 means nothing was read (eof). + * + * @return whether the writable object is empty + */ + public boolean isEmpty() { + return pktlen == EOF; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java new file mode 100644 index 0000000..253b525 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java @@ -0,0 +1,399 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.*; +import java.util.Arrays; + +/** + * This class stores text using standard UTF8 encoding. It provides methods to + * serialize, deserialize. The type of length is integer and is serialized using + * zero-compressed format. 
+ */ +public class Text implements Writable { + + // for write + private byte[] buf; + private static final Log LOG = LogFactory.getLog(Text.class); + int curLoc; + private static final char LINE_DELIMITER = '\n'; + private static final int BUF_SIZE = 1024; + private static final int EOF = -1; + + private static final byte[] EMPTY_BYTES = new byte[0]; + private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() { + @Override + protected CharsetEncoder initialValue() { + return Charset.forName("UTF-8").newEncoder().onMalformedInput( + CodingErrorAction.REPORT).onUnmappableCharacter( + CodingErrorAction.REPORT); + } + }; + private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() { + @Override + protected CharsetDecoder initialValue() { + return Charset.forName("UTF-8").newDecoder().onMalformedInput( + CodingErrorAction.REPORT).onUnmappableCharacter( + CodingErrorAction.REPORT); + } + }; + private byte[] bytes; + private int length; + + public Text() { + bytes = EMPTY_BYTES; + buf = new byte[BUF_SIZE]; + } + + /** + * Construct from a string. + * + * @param string input string + */ + public Text(String string) { + set(string); + } + + /** + * Construct from another text. + * + * @param utf8 text to copy + */ + public Text(Text utf8) { + set(utf8); + } + + /** + * Construct from a byte array. + * + * @param utf8 input byte array + */ + public Text(byte[] utf8) { + set(utf8); + } + + public static boolean isNegativeVInt(byte value) { + return value < -120 || (value >= -112 && value < 0); + } + + public static long readVLong(DataInput stream) throws IOException { + byte firstByte = stream.readByte(); + int len = decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len - 1; idx++) { + byte b = stream.readByte(); + i = i << 8; + i = i | (b & 0xFF); + } + return (isNegativeVInt(firstByte) ? 
(i ^ -1L) : i); + } + + public static int decodeVIntSize(byte value) { + if (value >= -112) { + return 1; + } else if (value < -120) { + return -119 - value; + } + return -111 - value; + } + + public static String decode(byte[] utf8, int start, int length) + throws CharacterCodingException { + return decode(ByteBuffer.wrap(utf8, start, length), true); + } + + /** + * Converts the provided byte array to a String using the UTF-8 encoding. If + * <code>replace</code> is true, then malformed input is replaced with the + * substitution character, which is U+FFFD. Otherwise the method throws a + * MalformedInputException. + * + * @param utf8 UTF-8 encoded byte array + * @param start start point + * @param length length of array + * @param replace whether to replace malformed input with substitution + * character + * @return decoded string + * @throws MalformedInputException if a malformed input is used + * @throws CharacterCodingException if the conversion failed + */ + public static String decode(byte[] utf8, int start, int length, + boolean replace) + throws CharacterCodingException { + return decode(ByteBuffer.wrap(utf8, start, length), replace); + } + + private static String decode(ByteBuffer utf8, boolean replace) + throws CharacterCodingException { + CharsetDecoder decoder = DECODER_FACTORY.get(); + if (replace) { + decoder.onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE); + decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + } + String str = decoder.decode(utf8).toString(); + // set decoder back to its default value: REPORT + if (replace) { + decoder.onMalformedInput(CodingErrorAction.REPORT); + decoder.onUnmappableCharacter(CodingErrorAction.REPORT); + } + return str; + } + + /** + * Converts the provided String to bytes using the UTF-8 encoding. If the + * input is malformed, invalid chars are replaced by a default value. 
+ * + * @param string string to encode + * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is + * ByteBuffer.limit() + * @throws CharacterCodingException if conversion failed + */ + public static ByteBuffer encode(String string) + throws CharacterCodingException { + return encode(string, true); + } + + /** + * Converts the provided String to bytes using the UTF-8 encoding. If + * <code>replace</code> is true, then malformed input is replaced with the + * substitution character, which is U+FFFD. Otherwise the method throws a + * MalformedInputException. + * + * @param string string to encode + * @param replace whether to replace malformed input with substitution + * character + * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is + * ByteBuffer.limit() + * @throws MalformedInputException if a malformed input is used + * @throws CharacterCodingException if the conversion failed + */ + public static ByteBuffer encode(String string, boolean replace) + throws CharacterCodingException { + CharsetEncoder encoder = ENCODER_FACTORY.get(); + if (replace) { + encoder.onMalformedInput(CodingErrorAction.REPLACE); + encoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + } + ByteBuffer bytes = encoder.encode(CharBuffer.wrap(string.toCharArray())); + if (replace) { + encoder.onMalformedInput(CodingErrorAction.REPORT); + encoder.onUnmappableCharacter(CodingErrorAction.REPORT); + } + return bytes; + } + + /** + * Returns the raw bytes; however, only data up to {@link #getLength()} is + * valid. + * + * @return raw bytes of byte array + */ + public byte[] getBytes() { + return bytes; + } + + /** + * Returns the number of bytes in the byte array + * + * @return number of bytes in byte array + */ + public int getLength() { + return length; + } + + /** + * Sets to contain the contents of a string. 
+ * + * @param string input string + */ + public void set(String string) { + try { + ByteBuffer bb = encode(string, true); + bytes = bb.array(); + length = bb.limit(); + } catch (CharacterCodingException e) { + throw new RuntimeException("Should not have happened " + + e.toString()); + } + } + + /** + * Sets to a UTF-8 byte array. + * + * @param utf8 input UTF-8 byte array + */ + public void set(byte[] utf8) { + set(utf8, 0, utf8.length); + } + + /** + * Copies a text. + * + * @param other text object to copy. + */ + public void set(Text other) { + set(other.getBytes(), 0, other.getLength()); + } + + /** + * Sets the Text to range of bytes. + * + * @param utf8 the data to copy from + * @param start the first position of the new string + * @param len the number of bytes of the new string + */ + public void set(byte[] utf8, int start, int len) { + setCapacity(len, false); + System.arraycopy(utf8, start, bytes, 0, len); + this.length = len; + } + + /** + * Appends a range of bytes to the end of the given text. + * + * @param utf8 the data to copy from + * @param start the first position to append from utf8 + * @param len the number of bytes to append + */ + public void append(byte[] utf8, int start, int len) { + setCapacity(length + len, true); + System.arraycopy(utf8, start, bytes, length, len); + length += len; + } + + /** + * Clears the string to empty. + */ + public void clear() { + length = 0; + } + + /* + * Sets the capacity of this Text object to <em>at least</em> + * <code>len</code> bytes. If the current buffer is longer, then the + * capacity and existing content of the buffer are unchanged. If + * <code>len</code> is larger than the current capacity, the Text object's + * capacity is increased to match. 
+ * + * @param len the number of bytes we need + * + * @param keepData should the old data be kept + */ + private void setCapacity(int len, boolean keepData) { + if (bytes == null || bytes.length < len) { + byte[] newBytes = new byte[len]; + if (bytes != null && keepData) { + System.arraycopy(bytes, 0, newBytes, 0, length); + } + bytes = newBytes; + } + } + + /** + * Convert text back to string + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + try { + return decode(bytes, 0, length); + } catch (CharacterCodingException e) { + throw new RuntimeException("Should not have happened " + + e.toString()); + } + } + + @Override + public void write(DataOutput out) throws IOException { + byte[] bytes = getBytes(); + out.write(bytes, 0, getLength()); + } + + /** + * deserialize + */ + @Override + public void readFields(DataInput inputStream) throws IOException { + + byte c; + curLoc = 0; + clear(); + while ((c = (byte) ((DataInputStream) inputStream).read()) != EOF) { + buf[curLoc] = c; + curLoc++; + + if (c == LINE_DELIMITER) { + LOG.trace("read one line, size " + curLoc); + break; + } + + if (isBufferFull()) { + flushBuffer(); + } + } + + if (!isBufferEmpty()) { + // the buffer doesn't end with a line break. + if (c == EOF) { + LOG.warn("Stream ended without line break"); + } + flushBuffer(); + } + } + + private boolean isBufferEmpty() { + return (curLoc == 0); + } + + private boolean isBufferFull() { + return (curLoc == BUF_SIZE); + } + + private void flushBuffer() { + append(buf, 0, curLoc); + curLoc = 0; + } + + /** + * Returns true iff <code>o</code> is a Text with the same contents. 
+ */ + @Override + public boolean equals(Object o) { + return (o instanceof Text && Arrays.equals(bytes, ((Text) o).bytes)); + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java new file mode 100644 index 0000000..038da9c --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java @@ -0,0 +1,50 @@ +package org.apache.hawq.pxf.service.io; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A serializable object which implements a simple, efficient, serialization + * protocol, based on {@link DataInput} and {@link DataOutput}. + */ +public interface Writable { + + /** + * Serialize the fields of this object to <code>out</code>. 
+ * + * @param out <code>DataOutput</code> to serialize this object into. + * @throws IOException if I/O error occurs + */ + void write(DataOutput out) throws IOException; + + /** + * Deserialize the fields of this object from <code>in</code>. + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deserialize this object from. + * @throws IOException if I/O error occurs + */ + void readFields(DataInput in) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java new file mode 100644 index 0000000..3a062c3 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java @@ -0,0 +1,189 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +import javax.servlet.ServletContext; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; + +import org.apache.catalina.connector.ClientAbortException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hawq.pxf.service.Bridge; +import org.apache.hawq.pxf.service.ReadBridge; +import org.apache.hawq.pxf.service.ReadSamplingBridge; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.SecuredHDFS; + +/* + * This class handles the subpath /<version>/Bridge/ of this + * REST component + */ +@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Bridge/") +public class BridgeResource extends RestResource { + + private static final Log LOG = LogFactory.getLog(BridgeResource.class); + /** + * Lock is needed here in the case of a non-thread-safe plugin. Using + * synchronized methods is not enough because the bridge work is called by + * jetty ({@link StreamingOutput}), after we are getting out of this class's + * context. + * <p/> + * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on + * the isThreadSafe parameter that is determined by the bridge. + */ + private static final ReentrantLock BRIDGE_LOCK = new ReentrantLock(); + + public BridgeResource() { + } + + /** + * Used to be HDFSReader. Creates a bridge instance and iterates over its + * records, printing it out to outgoing stream. Outputs GPDBWritable or + * Text. + * + * Parameters come through HTTP header. 
+ * + * @param servletContext Servlet context contains attributes required by + * SecuredHDFS + * @param headers Holds HTTP headers from request + * @return response object containing stream that will output records + * @throws Exception in case of wrong request parameters, or failure to + * initialize bridge + */ + @GET + @Produces(MediaType.APPLICATION_OCTET_STREAM) + public Response read(@Context final ServletContext servletContext, + @Context HttpHeaders headers) throws Exception { + // Convert headers into a regular map + Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders()); + + LOG.debug("started with parameters: " + params); + + ProtocolData protData = new ProtocolData(params); + SecuredHDFS.verifyToken(protData, servletContext); + Bridge bridge; + float sampleRatio = protData.getStatsSampleRatio(); + if (sampleRatio > 0) { + bridge = new ReadSamplingBridge(protData); + } else { + bridge = new ReadBridge(protData); + } + String dataDir = protData.getDataSource(); + // THREAD-SAFE parameter has precedence + boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe(); + LOG.debug("Request for " + dataDir + " will be handled " + + (isThreadSafe ? 
"without" : "with") + " synchronization"); + + return readResponse(bridge, protData, isThreadSafe); + } + + Response readResponse(final Bridge bridge, ProtocolData protData, + final boolean threadSafe) { + final int fragment = protData.getDataFragment(); + final String dataDir = protData.getDataSource(); + + // Creating an internal streaming class + // which will iterate the records and put them on the + // output stream + final StreamingOutput streaming = new StreamingOutput() { + @Override + public void write(final OutputStream out) throws IOException, + WebApplicationException { + long recordCount = 0; + + if (!threadSafe) { + lock(dataDir); + } + try { + + if (!bridge.beginIteration()) { + return; + } + + Writable record; + DataOutputStream dos = new DataOutputStream(out); + LOG.debug("Starting streaming fragment " + fragment + + " of resource " + dataDir); + while ((record = bridge.getNext()) != null) { + record.write(dos); + ++recordCount; + } + LOG.debug("Finished streaming fragment " + fragment + + " of resource " + dataDir + ", " + recordCount + + " records."); + } catch (ClientAbortException e) { + // Occurs whenever client (HAWQ) decides the end the + // connection + LOG.error("Remote connection closed by HAWQ", e); + } catch (Exception e) { + LOG.error("Exception thrown when streaming", e); + throw new IOException(e.getMessage()); + } finally { + LOG.debug("Stopped streaming fragment " + fragment + + " of resource " + dataDir + ", " + recordCount + + " records."); + if (!threadSafe) { + unlock(dataDir); + } + } + } + }; + + return Response.ok(streaming, MediaType.APPLICATION_OCTET_STREAM).build(); + } + + /** + * Locks BRIDGE_LOCK + * + * @param path path for the request, used for logging. + */ + private void lock(String path) { + LOG.trace("Locking BridgeResource for " + path); + BRIDGE_LOCK.lock(); + LOG.trace("Locked BridgeResource for " + path); + } + + /** + * Unlocks BRIDGE_LOCK + * + * @param path path for the request, used for logging. 
+ */ + private void unlock(String path) { + LOG.trace("Unlocking BridgeResource for " + path); + BRIDGE_LOCK.unlock(); + LOG.trace("Unlocked BridgeResource for " + path); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java new file mode 100644 index 0000000..1280c09 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java @@ -0,0 +1,148 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.catalina.connector.ClientAbortException; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +/** + * Class enhances the API of the HBASE rest server. + * Example for querying API getClusterNodesInfo from a web client + * <code>curl "http://localhost:51200/pxf/{version}/HadoopCluster/getNodesInfo"</code> + * /pxf/ is made part of the path when there is a webapp by that name in tcServer. + */ +@Path("/" + Version.PXF_PROTOCOL_VERSION + "/HadoopCluster/") +public class ClusterNodesResource { + private static final Log LOG = LogFactory.getLog(ClusterNodesResource.class); + + public ClusterNodesResource() { + } + + /** + * Function queries the Hadoop namenode with the getDataNodeStats API It + * gets the host's IP and REST port of every HDFS data node in the cluster. + * Then, it packs the results in JSON format and writes to the HTTP response + * stream. 
Response Examples:<br> + * <ol> + * <li>When there are no datanodes - getDataNodeStats returns an empty array + * <code>{"regions":[]}</code></li> + * <li>When there are datanodes + * <code>{"regions":[{"host":"1.2.3.1","port":50075},{"host":"1.2.3.2","port" + * :50075}]}</code></li> + * </ol> + * + * @return JSON response with nodes info + * @throws Exception if failed to retrieve info + */ + @GET + @Path("getNodesInfo") + @Produces("application/json") + public Response read() throws Exception { + LOG.debug("getNodesInfo started"); + StringBuilder jsonOutput = new StringBuilder("{\"regions\":["); + try { + /* + * 1. Initialize the HADOOP client side API for a distributed file + * system + */ + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + DistributedFileSystem dfs = (DistributedFileSystem) fs; + + /* + * 2. Query the namenode for the datanodes info. Only live nodes are + * returned - in accordance with the results returned by + * org.apache.hadoop.hdfs.tools.DFSAdmin#report(). + */ + DatanodeInfo[] liveNodes = dfs.getDataNodeStats(DatanodeReportType.LIVE); + + /* + * 3. Pack the datanodes info in a JSON text format and write it to + * the HTTP output stream. 
+ */ + String prefix = ""; + for (DatanodeInfo node : liveNodes) { + verifyNode(node); + // write one node to the HTTP stream + jsonOutput.append(prefix).append(writeNode(node)); + prefix = ","; + } + jsonOutput.append("]}"); + LOG.debug("getNodesCluster output: " + jsonOutput); + } catch (NodeDataException e) { + LOG.error("Nodes verification failed", e); + throw e; + } catch (ClientAbortException e) { + LOG.error("Remote connection closed by HAWQ", e); + throw e; + } catch (java.io.IOException e) { + LOG.error("Unhandled exception thrown", e); + throw e; + } + + return Response.ok(jsonOutput.toString(), + MediaType.APPLICATION_JSON_TYPE).build(); + } + + private class NodeDataException extends java.io.IOException { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public NodeDataException(String paramString) { + super(paramString); + } + } + + private void verifyNode(DatanodeInfo node) throws NodeDataException { + int port = node.getInfoPort(); + String ip = node.getIpAddr(); + + if (StringUtils.isEmpty(ip)) { + throw new NodeDataException("Invalid IP: " + ip + " (Node " + node + + ")"); + } + + if (port <= 0) { + throw new NodeDataException("Invalid port: " + port + " (Node " + + node + ")"); + } + } + + String writeNode(DatanodeInfo node) throws java.io.IOException { + return "{\"host\":\"" + node.getIpAddr() + "\",\"port\":" + + node.getInfoPort() + "}"; + } +}