Revert "Revert "PHOENIX-4874 psql doesn't support date/time with values smaller than milliseconds(Rajeshbabu)""
This reverts commit 0ce663c53089efa9821a4423fb973b274ef67638. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4c63e129 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4c63e129 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4c63e129 Branch: refs/heads/omid2 Commit: 4c63e129277757929c4cf847b66fcd55b2c8e801 Parents: 77a7c2c Author: James Taylor <jamestay...@apache.org> Authored: Tue Oct 16 14:45:05 2018 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Tue Oct 16 14:45:05 2018 -0700 ---------------------------------------------------------------------- .../phoenix/util/csv/CsvUpsertExecutor.java | 20 +++++--- .../phoenix/util/json/JsonUpsertExecutor.java | 3 ++ .../util/AbstractUpsertExecutorTest.java | 51 +++++++++++++++----- 3 files changed, 54 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c63e129/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java index cd40b44..d2529f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java @@ -20,6 +20,7 @@ package org.apache.phoenix.util.csv; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Timestamp; import java.sql.Types; import java.util.List; import java.util.Properties; @@ -30,6 +31,7 @@ import org.apache.commons.csv.CSVRecord; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.expression.function.EncodeFormat; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.IllegalDataException; @@ -41,6 +43,7 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.UpsertExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,9 +128,9 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { private final String binaryEncoding; SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) { - Properties props; + ReadOnlyProps props; try { - props = conn.getClientInfo(); + props = conn.unwrap(PhoenixConnection.class).getQueryServices().getProps(); } catch (SQLException e) { throw new RuntimeException(e); } @@ -139,23 +142,23 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { String dateFormat; int dateSqlType = dataType.getResultSetSqlType(); if (dateSqlType == Types.DATE) { - dateFormat = props.getProperty(QueryServices.DATE_FORMAT_ATTRIB, + dateFormat = props.get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); } else if (dateSqlType == Types.TIME) { - dateFormat = props.getProperty(QueryServices.TIME_FORMAT_ATTRIB, + dateFormat = props.get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT); } else { - dateFormat = props.getProperty(QueryServices.TIMESTAMP_FORMAT_ATTRIB, + dateFormat = props.get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT); } - String timeZoneId = props.getProperty(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, + String timeZoneId = props.get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, QueryServicesOptions.DEFAULT_DATE_FORMAT_TIMEZONE); this.dateTimeParser = DateUtil.getDateTimeParser(dateFormat, dataType, timeZoneId); } else { this.dateTimeParser = null; } this.codec = codec; - this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, + this.binaryEncoding = props.get(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING); } @@ -165,6 +168,9 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { if (input == null || input.isEmpty()) { return null; } + if (dataType == PTimestamp.INSTANCE) { + return DateUtil.parseTimestamp(input); + } if (dateTimeParser != null) { long epochTime = dateTimeParser.parseDateTime(input); byte[] byteValue = new byte[dataType.getByteSize()]; http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c63e129/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java index ffa797d..fa14079 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java @@ -185,6 +185,9 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> { if (input == null) { return null; } + if (dataType == PTimestamp.INSTANCE) { + return DateUtil.parseTimestamp(input.toString()); + } if (dateTimeParser != null && input instanceof String) { final String s = (String) input; long epochTime = dateTimeParser.parseDateTime(s); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c63e129/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java index 2b2544d..3ea997b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java @@ -25,13 +25,20 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import java.io.IOException; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Timestamp; import java.sql.Types; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatterBuilder; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.TimeZone; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; @@ -41,6 +48,7 @@ import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PIntegerArray; +import org.apache.phoenix.schema.types.PTimestamp; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,6 +65,8 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles protected abstract UpsertExecutor<R, F> getUpsertExecutor(); protected abstract R createRecord(Object... columnValues) throws IOException; protected abstract UpsertExecutor<R, F> getUpsertExecutor(Connection conn); + + private static String TIMESTAMP_WITH_NANOS = "2006-11-03 00:00:00.001003000"; @Before public void setUp() throws SQLException { @@ -66,11 +76,14 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles new ColumnInfo("AGE", Types.INTEGER), new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType()), new ColumnInfo("BEARD", Types.BOOLEAN), - new ColumnInfo("PIC", Types.BINARY)); + new ColumnInfo("PIC", Types.BINARY), + new ColumnInfo("T", Types.TIMESTAMP)); preparedStatement = mock(PreparedStatement.class); upsertListener = mock(UpsertExecutor.UpsertListener.class); - conn = DriverManager.getConnection(getUrl()); + Properties properties = new Properties(); + properties.setProperty("phoenix.query.dateFormatTimeZone", DateUtil.DEFAULT_TIME_ZONE_ID); + conn = DriverManager.getConnection(getUrl(), properties); } @After @@ -82,8 +95,9 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles public void testExecute() throws Exception { byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); String encodedBinaryData = Base64.encodeBytes(binaryData); - getUpsertExecutor().execute(createRecord(123L, "NameValue", 42, - Arrays.asList(1, 2, 3), true, encodedBinaryData)); + getUpsertExecutor().execute( + createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3), true, encodedBinaryData, + Timestamp.valueOf(TIMESTAMP_WITH_NANOS))); verify(upsertListener).upsertDone(1L); verifyNoMoreInteractions(upsertListener); @@ -94,6 +108,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.TRUE); verify(preparedStatement).setObject(6, binaryData); + verify(preparedStatement).setObject(7, DateUtil.parseTimestamp(TIMESTAMP_WITH_NANOS)); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); } @@ -112,7 +127,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); String encodedBinaryData = Base64.encodeBytes(binaryData); R recordWithTooManyFields = createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3), - true, encodedBinaryData, "garbage"); + true, encodedBinaryData, Timestamp.valueOf(TIMESTAMP_WITH_NANOS), "garbage"); getUpsertExecutor().execute(recordWithTooManyFields); verify(upsertListener).upsertDone(1L); @@ -124,6 +139,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.TRUE); verify(preparedStatement).setObject(6, binaryData); + verify(preparedStatement).setObject(7, DateUtil.parseTimestamp(TIMESTAMP_WITH_NANOS)); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); } @@ -132,8 +148,9 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles public void testExecute_NullField() throws Exception { byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); String encodedBinaryData = Base64.encodeBytes(binaryData); - getUpsertExecutor().execute(createRecord(123L, "NameValue", null, - Arrays.asList(1, 2, 3), false, encodedBinaryData)); + getUpsertExecutor().execute( + createRecord(123L, "NameValue", null, Arrays.asList(1, 2, 3), false, encodedBinaryData, + Timestamp.valueOf(TIMESTAMP_WITH_NANOS))); verify(upsertListener).upsertDone(1L); verifyNoMoreInteractions(upsertListener); @@ -144,6 +161,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.FALSE); verify(preparedStatement).setObject(6, binaryData); + verify(preparedStatement).setObject(7, DateUtil.parseTimestamp(TIMESTAMP_WITH_NANOS)); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); } @@ -152,8 +170,9 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles public void testExecute_InvalidType() throws Exception { byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); String encodedBinaryData = Base64.encodeBytes(binaryData); - R recordWithInvalidType = createRecord(123L, "NameValue", "ThisIsNotANumber", - Arrays.asList(1, 2, 3), true, encodedBinaryData); + R recordWithInvalidType = + createRecord(123L, "NameValue", "ThisIsNotANumber", Arrays.asList(1, 2, 3), true, + encodedBinaryData, Timestamp.valueOf(TIMESTAMP_WITH_NANOS)); getUpsertExecutor().execute(recordWithInvalidType); verify(upsertListener).errorOnRecord(eq(recordWithInvalidType), any(Throwable.class)); @@ -164,7 +183,9 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles public void testExecute_InvalidBoolean() throws Exception { byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); String encodedBinaryData = Base64.encodeBytes(binaryData); - R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean,"+encodedBinaryData); + R csvRecordWithInvalidType = + createRecord("123,NameValue,42,1:2:3,NotABoolean," + encodedBinaryData + "," + + TIMESTAMP_WITH_NANOS); getUpsertExecutor().execute(csvRecordWithInvalidType); verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); @@ -173,7 +194,9 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles @Test public void testExecute_InvalidBinary() throws Exception { String notBase64Encoded="#@$df"; - R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,true,"+notBase64Encoded); + R csvRecordWithInvalidType = + createRecord("123,NameValue,42,1:2:3,true," + notBase64Encoded + "," + + TIMESTAMP_WITH_NANOS); getUpsertExecutor().execute(csvRecordWithInvalidType); verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); @@ -184,8 +207,9 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles String asciiValue="#@$df"; Properties info=new Properties(); info.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,"ASCII"); - getUpsertExecutor(DriverManager.getConnection(getUrl(),info)).execute(createRecord(123L, "NameValue", 42, - Arrays.asList(1, 2, 3), true, asciiValue)); + getUpsertExecutor(DriverManager.getConnection(getUrl(), info)).execute( + createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3), true, asciiValue, + Timestamp.valueOf(TIMESTAMP_WITH_NANOS))); verify(upsertListener).upsertDone(1L); verifyNoMoreInteractions(upsertListener); @@ -196,6 +220,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.TRUE); verify(preparedStatement).setObject(6, Bytes.toBytes(asciiValue)); + verify(preparedStatement).setObject(7, DateUtil.parseTimestamp(TIMESTAMP_WITH_NANOS)); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); }