PHOENIX-3406 CSV BulkLoad MR job incorrectly handles ROW_TIMESTAMP

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cec7e1cf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cec7e1cf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cec7e1cf

Branch: refs/heads/master
Commit: cec7e1cf8794e7ec0ee5c8be9a32e33cd211ec3b
Parents: c8cbb5e
Author: Sergey Soldatov <s...@apache.org>
Authored: Tue Oct 25 14:09:54 2016 -0700
Committer: Sergey Soldatov <s...@apache.org>
Committed: Tue Sep 5 12:46:47 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CsvBulkLoadToolIT.java      | 38 ++++++++++++++++++++
 .../mapreduce/FormatToBytesWritableMapper.java  |  1 +
 .../mapreduce/FormatToKeyValueReducer.java      |  7 ++--
 3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cec7e1cf/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 5a186a0..40fe900 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -92,6 +92,44 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         rs.close();
         stmt.close();
     }
+    @Test
+    public void testImportWithRowTimestamp() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE S.TABLE9 (ID INTEGER NOT NULL , NAME VARCHAR, T DATE NOT NULL," +
+                " " +
+                "CONSTRAINT PK PRIMARY KEY (ID, T ROW_TIMESTAMP))");
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.println("2,Name 2,1971/01/01");
+        printWriter.println("3,Name 2,1972/01/01");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input1.csv",
+                "--table", "table9",
+                "--schema", "s",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table9 WHERE T < to_date" +
+                "('1972-01-01') AND T > to_date('1970-01-01') ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1971-01-01"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
 
     @Test
     public void testImportWithTabs() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cec7e1cf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 1dae981..360859e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -314,6 +314,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
                     continue;
                 }
                 outputStream.writeByte(cell.getTypeByte());
+                WritableUtils.writeVLong(outputStream,cell.getTimestamp());
                 WritableUtils.writeVInt(outputStream, i);
                 WritableUtils.writeVInt(outputStream, cell.getValueLength());
                 outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cec7e1cf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 07cf285..72af1a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -144,6 +144,7 @@ public class FormatToKeyValueReducer
             DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
             while (input.available() != 0) {
                 byte type = input.readByte();
+                long timestamp = WritableUtils.readVLong(input);
                 int index = WritableUtils.readVInt(input);
                 ImmutableBytesWritable family;
                 ImmutableBytesWritable cq;
@@ -161,10 +162,10 @@ public class FormatToKeyValueReducer
                 KeyValue.Type kvType = KeyValue.Type.codeToType(type);
                 switch (kvType) {
                     case Put: // not null value
-                        kv = builder.buildPut(key.getRowkey(), family, cq, value);
+                        kv = builder.buildPut(key.getRowkey(), family, cq, timestamp, value);
                         break;
                     case DeleteColumn: // null value
-                        kv = builder.buildDeleteColumns(key.getRowkey(), family, cq);
+                        kv = builder.buildDeleteColumns(key.getRowkey(), family, cq, timestamp);
                         break;
                     default:
                         throw new IOException("Unsupported KeyValue type " + kvType);
@@ -180,4 +181,4 @@ public class FormatToKeyValueReducer
             if (++index % 100 == 0) context.setStatus("Wrote " + index);
         }
     }
-}
\ No newline at end of file
+}

Reply via email to