Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-23 Thread Yi Hu via dev
Hi,

It now appears that if we want a clean solution, we have to add a
fixed-size primitive type to the Beam atomic types. Otherwise, we end up
with a millis_instant logical type that has no to_language_type and
to_representation_type implementations. Any suggestions are welcome!
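
To make the first option concrete, here is a minimal Python sketch, not Beam code. FixedInt64 is a hypothetical atomic type and MillisInstant a hypothetical logical type; only the method names to_language_type and to_representation_type and the URN string come from this thread. The point it illustrates is that the logical type only converts values, while the bytes on the wire are decided by the representation's atomic type.

```python
import struct
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class FixedInt64:
    """Hypothetical fixed-size atomic type: always 8 bytes, big endian."""

    @staticmethod
    def encode(value: int) -> bytes:
        return struct.pack(">q", value)

    @staticmethod
    def decode(data: bytes) -> int:
        return struct.unpack(">q", data)[0]


class MillisInstant:
    """Hypothetical logical type; it converts values and nothing else."""

    urn = "beam:logical_type:millis_instant"
    representation = FixedInt64  # assumption: such an atomic type exists

    def to_representation_type(self, value: datetime) -> int:
        # Whole milliseconds since the epoch (floor division).
        return (value - EPOCH) // timedelta(milliseconds=1)

    def to_language_type(self, value: int) -> datetime:
        return EPOCH + timedelta(milliseconds=value)


def encode_field(logical_type, value) -> bytes:
    # The wire format comes from the representation, not the logical type.
    raw = logical_type.to_representation_type(value)
    return logical_type.representation.encode(raw)


def decode_field(logical_type, data: bytes):
    raw = logical_type.representation.decode(data)
    return logical_type.to_language_type(raw)


ts = datetime(2022, 8, 23, 12, 0, 0, 5000, tzinfo=timezone.utc)
wire = encode_field(MillisInstant(), ts)
print(len(wire), decode_field(MillisInstant(), wire) == ts)
```

With INT64 as the representation instead, the same two conversion methods would still work, but the bytes would be VarInt-encoded, which is the mismatch discussed in this thread.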

Best,
Yi

On Thu, Aug 18, 2022 at 10:07 AM Yi Hu  wrote:

>
>
> On Wed, Aug 17, 2022 at 5:14 PM Chamikara Jayalath 
> wrote:
>
>>
>> I think this is fine (even though it would add a small perf hit to
>> JdbcIO.Read). We also probably should make this conversion a utility method
>> that can be used elsewhere when we need to encode datetime fields.
>> We should also document that "beam:logical_type:datetime:v1" is not
>> portable (till we fix the incompatibility).
>>
>>
> +1 for utility method and documentation.
> If we were to change JDBC instead of making millis_instant compatible with
> InstantCoder, this would only fix JDBC cross-language timestamps. I expect
> this is still a problem for other IO connectors, which is why I would
> like to take a generic approach. In general, inside each SDK we would like
> to follow the language-specific convention of that SDK. I remember a
> related discussion about the timestamp types:
> https://github.com/apache/beam/pull/17380#discussion_r852422314 which
> reached the conclusion to follow the language convention for timestamp
> values, e.g. use milli-precision (long-backed) Instant in Java and
> micro-precision (float-backed) timestamp in Python.
>
> Best,
> Yi
>


Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-18 Thread Yi Hu via dev
On Wed, Aug 17, 2022 at 5:14 PM Chamikara Jayalath 
wrote:

>
> I think this is fine (even though it would add a small perf hit to
> JdbcIO.Read). We also probably should make this conversion a utility method
> that can be used elsewhere when we need to encode datetime fields.
> We should also document that "beam:logical_type:datetime:v1" is not
> portable (till we fix the incompatibility).
>
>
+1 for utility method and documentation.
If we were to change JDBC instead of making millis_instant compatible with
InstantCoder, this would only fix JDBC cross-language timestamps. I expect
this is still a problem for other IO connectors, which is why I would
like to take a generic approach. In general, inside each SDK we would like
to follow the language-specific convention of that SDK. I remember a
related discussion about the timestamp types:
https://github.com/apache/beam/pull/17380#discussion_r852422314 which
reached the conclusion to follow the language convention for timestamp
values, e.g. use milli-precision (long-backed) Instant in Java and
micro-precision (float-backed) timestamp in Python.
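
The precision gap between the two conventions can be sketched with the Python standard library alone. This is an illustration, not Beam code: the helper names are hypothetical, and datetime stands in for a microsecond-precision timestamp type.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_millis(dt: datetime) -> int:
    """Floor a timezone-aware datetime to whole milliseconds since the epoch."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


def millis_to_datetime(millis: int) -> datetime:
    """Rebuild a datetime from epoch milliseconds using integer arithmetic."""
    return EPOCH + timedelta(milliseconds=millis)


# A microsecond-precision value, as a Python-side timestamp could carry.
original = datetime(2022, 8, 18, 10, 7, 0, 123456, tzinfo=timezone.utc)
millis = datetime_to_millis(original)
restored = millis_to_datetime(millis)

# The sub-millisecond part does not survive the trip through millis.
print(original - restored)
```

Going through a millisecond representation is lossy in one direction only: a millisecond value round-trips exactly, while a microsecond value loses its last three digits.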

Best,
Yi


Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-17 Thread Chamikara Jayalath via dev
On Wed, Aug 17, 2022 at 12:59 PM Yi Hu via dev  wrote:

> Hi Brian,
>
> Currently Java's JDBCIO does nothing special for cross-language use. A
> DATETIME field type appears because the Row contains an Instant object, and
> the SDK always encodes the Instant object using InstantCoder. This is
> done for both Java pipelines and cross-language pipelines. To use
> millis_instant in JDBCIO without breaking Instant used elsewhere (like
> watermarks), I will need to change the type of timestamp returned by JDBC
> read from Instant to millis_instant.
>

I think this is fine (even though it would add a small perf hit to
JdbcIO.Read). We also probably should make this conversion a utility method
that can be used elsewhere when we need to encode datetime fields.
We should also document that "beam:logical_type:datetime:v1" is not
portable (till we fix the incompatibility).

Thanks,
Cham


>
> We could name the logical type "beam:logical_type:millis_instant", backed
> by a big-endian int64.
>

> Best,
> Yi
>
>
> On Fri, Aug 12, 2022 at 5:28 PM Brian Hulette  wrote:
>
>> Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't
>> simulate TimestampCoder with a logical type.
>>
>> I think the ideal end state would be to have a well-defined
>> beam:logical_type:millis_instant that we use for cross-language (when
>> appropriate), and never use DATETIME at cross-language boundaries. Would it
>> be possible to add millis_instant, and use that for JDBC read/write instead
>> of DATETIME?
>>
>> Separately we could consider how to resolve the conflicting definitions
>> of beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that
>> without breaking pipeline update.
>>
>> Brian
>>
>>
>> On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev 
>> wrote:
>>
>>> Hi Cham,
>>>
>>> Thanks for the comments.
>>>
>>>
>>>>
>>>>>
>>>>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
>>>>> implementation it will use BigEndianLongCoder to encode/decode the stream.
>>>>>
>>>>>
>>>> Is this to be compatible with the current Java implementation ? And we
>>>> have to update other SDKs to use big endian coders when encoding/decoding
>>>> the "beam:logical_type:instant:v1" logical type ?
>>>>
>>>>
>>> Yes, and the proposal is aimed to keep the Java SDK change minimal; we
>>> have to update other SDKs to make it work. Currently python and go sdk does
>>> not implement "beam:logical_type:datetime:v1" (will
>>> be "beam:logical_type:instant:v1") at all.
>>>
>>>
>>>>
>>>>
>>>>> For the second step ii, the problem is that there is a primitive type
>>>>> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
>>>>> INT64... are all backed by VarInt (and there is ongoing work to use fixed
>>>>> size big endian to encode INT8, INT16 (
>>>>> https://github.com/apache/beam/issues/19815)). Ideally I would think
>>>>> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT)
>>>>> primitive type is backed by VarInt. But this may be a more substantial
>>>>> change for the current code base.
>>>>>
>>>>
>>>> I'm a bit confused by this. Did you mean that there's *no* primitive
>>>> type backed by a fixed length integer coder ? Also, by primitive, I'm
>>>> assuming you mean Beam Schema types here.
>>>>
>>>>
>>> Yes I mean Beam Schema types here. The proto for datetime(instant)
>>> logical type is constructed here:
>>> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
>>> It is represented by an INT64 atomic type. In cross-language case,
>>> another SDK receives proto and decodes the stream according to the proto.
>>> Currently I do not see an atomic type that will be decoded using a
>>> fixed-length BigEndianLong coder. INT8, ..., INT64 will all be decoded with
>>> VarInt.
>>>
>>> As a workaround in the PR (#22561), in python's RowCoder I explicitly
>>> set the coder for "beam:logical_type:datetime:v1" (will
>>> be "beam:logical_type:instant:v1") to be TimestampCoder. I do not find a
>>> way to keep the logic contained in the logical type implementation, e.g. in
>>> to_language_type and to_representation_type method. To do this I will need
>>> an atomic type that is decoded using the BigEndianLong coder.
>>> Please point out if I was wrong.
>>>
>>> Best,
>>> Yi
>>>
>>


Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-17 Thread Yi Hu via dev
Hi Brian,

Currently Java's JDBCIO does nothing special for cross-language use. A
DATETIME field type appears because the Row contains an Instant object, and
the SDK always encodes the Instant object using InstantCoder. This is
done for both Java pipelines and cross-language pipelines. To use
millis_instant in JDBCIO without breaking Instant used elsewhere (like
watermarks), I will need to change the type of timestamp returned by JDBC
read from Instant to millis_instant.

We could name the logical type "beam:logical_type:millis_instant", backed
by a big-endian int64.

Best,
Yi


On Fri, Aug 12, 2022 at 5:28 PM Brian Hulette  wrote:

> Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't
> simulate TimestampCoder with a logical type.
>
> I think the ideal end state would be to have a well-defined
> beam:logical_type:millis_instant that we use for cross-language (when
> appropriate), and never use DATETIME at cross-language boundaries. Would it
> be possible to add millis_instant, and use that for JDBC read/write instead
> of DATETIME?
>
> Separately we could consider how to resolve the conflicting definitions of
> beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that
> without breaking pipeline update.
>
> Brian
>
>
> On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev  wrote:
>
>> Hi Cham,
>>
>> Thanks for the comments.
>>
>>
>>>
>>>>
>>>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
>>>> implementation it will use BigEndianLongCoder to encode/decode the stream.
>>>>
>>>>
>>> Is this to be compatible with the current Java implementation ? And we
>>> have to update other SDKs to use big endian coders when encoding/decoding
>>> the "beam:logical_type:instant:v1" logical type ?
>>>
>>>
>> Yes, and the proposal is aimed to keep the Java SDK change minimal; we
>> have to update other SDKs to make it work. Currently python and go sdk does
>> not implement "beam:logical_type:datetime:v1" (will
>> be "beam:logical_type:instant:v1") at all.
>>
>>
>>>
>>>
>>>> For the second step ii, the problem is that there is a primitive type
>>>> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
>>>> INT64... are all backed by VarInt (and there is ongoing work to use fixed
>>>> size big endian to encode INT8, INT16 (
>>>> https://github.com/apache/beam/issues/19815)). Ideally I would think
>>>> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT)
>>>> primitive type is backed by VarInt. But this may be a more substantial
>>>> change for the current code base.
>>>>
>>>
>>> I'm a bit confused by this. Did you mean that there's *no* primitive
>>> type backed by a fixed length integer coder ? Also, by primitive, I'm
>>> assuming you mean Beam Schema types here.
>>>
>>>
>> Yes I mean Beam Schema types here. The proto for datetime(instant)
>> logical type is constructed here:
>> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
>> It is represented by an INT64 atomic type. In cross-language case,
>> another SDK receives proto and decodes the stream according to the proto.
>> Currently I do not see an atomic type that will be decoded using a
>> fixed-length BigEndianLong coder. INT8, ..., INT64 will all be decoded with
>> VarInt.
>>
>> As a workaround in the PR (#22561), in python's RowCoder I explicitly set
>> the coder for "beam:logical_type:datetime:v1" (will
>> be "beam:logical_type:instant:v1") to be TimestampCoder. I do not find a
>> way to keep the logic contained in the logical type implementation, e.g. in
>> to_language_type and to_representation_type method. To do this I will need
>> an atomic type that is decoded using the BigEndianLong coder.
>> Please point out if I was wrong.
>>
>> Best,
>> Yi
>>
>


Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-12 Thread Brian Hulette via dev
Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't
simulate TimestampCoder with a logical type.

I think the ideal end state would be to have a well-defined
beam:logical_type:millis_instant that we use for cross-language (when
appropriate), and never use DATETIME at cross-language boundaries. Would it
be possible to add millis_instant, and use that for JDBC read/write instead
of DATETIME?

Separately we could consider how to resolve the conflicting definitions of
beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that
without breaking pipeline update.

Brian


On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev  wrote:

> Hi Cham,
>
> Thanks for the comments.
>
>
>>
>>>
>>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
>>> implementation it will use BigEndianLongCoder to encode/decode the stream.
>>>
>>>
>> Is this to be compatible with the current Java implementation ? And we
>> have to update other SDKs to use big endian coders when encoding/decoding
>> the "beam:logical_type:instant:v1" logical type ?
>>
>>
> Yes, and the proposal aims to keep the Java SDK change minimal; we
> have to update other SDKs to make it work. Currently the Python and Go SDKs
> do not implement "beam:logical_type:datetime:v1" (will
> be "beam:logical_type:instant:v1") at all.
>
>
>>
>>
>>> For the second step ii, the problem is that there is a primitive type
>>> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
>>> INT64... are all backed by VarInt (and there is ongoing work to use fixed
>>> size big endian to encode INT8, INT16 (
>>> https://github.com/apache/beam/issues/19815)). Ideally I would think
>>> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT)
>>> primitive type is backed by VarInt. But this may be a more substantial
>>> change for the current code base.
>>>
>>
>> I'm a bit confused by this. Did you mean that there's *no* primitive
>> type backed by a fixed length integer coder ? Also, by primitive, I'm
>> assuming you mean Beam Schema types here.
>>
>>
> Yes, I mean Beam Schema types here. The proto for the datetime (instant)
> logical type is constructed here:
> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
> It is represented by an INT64 atomic type. In the cross-language case,
> another SDK receives the proto and decodes the stream according to it.
> Currently I do not see an atomic type that is decoded using a fixed-length
> BigEndianLong coder. INT8, ..., INT64 are all decoded with VarInt.
>
> As a workaround in the PR (#22561), in Python's RowCoder I explicitly set
> the coder for "beam:logical_type:datetime:v1" (will
> be "beam:logical_type:instant:v1") to be TimestampCoder. I did not find a
> way to keep the logic contained in the logical type implementation, e.g. in
> the to_language_type and to_representation_type methods. To do this I would
> need an atomic type that is decoded using the BigEndianLong coder.
> Please point out if I am wrong.
>
> Best,
> Yi
>


Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-12 Thread Yi Hu via dev
Hi Cham,

Thanks for the comments.


>
>>
>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
>> implementation it will use BigEndianLongCoder to encode/decode the stream.
>>
>>
> Is this to be compatible with the current Java implementation ? And we
> have to update other SDKs to use big endian coders when encoding/decoding
> the "beam:logical_type:instant:v1" logical type ?
>
>
Yes, and the proposal aims to keep the Java SDK change minimal; we have
to update other SDKs to make it work. Currently the Python and Go SDKs do
not implement "beam:logical_type:datetime:v1" (will
be "beam:logical_type:instant:v1") at all.


>
>
>> For the second step ii, the problem is that there is a primitive type
>> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
>> INT64... are all backed by VarInt (and there is ongoing work to use fixed
>> size big endian to encode INT8, INT16 (
>> https://github.com/apache/beam/issues/19815)). Ideally I would think
>> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT)
>> primitive type is backed by VarInt. But this may be a more substantial
>> change for the current code base.
>>
>
> I'm a bit confused by this. Did you mean that there's *no* primitive type
> backed by a fixed length integer coder ? Also, by primitive, I'm assuming
> you mean Beam Schema types here.
>
>
Yes, I mean Beam Schema types here. The proto for the datetime (instant)
logical type is constructed here:
https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
It is represented by an INT64 atomic type. In the cross-language case,
another SDK receives the proto and decodes the stream according to it.
Currently I do not see an atomic type that is decoded using a fixed-length
BigEndianLong coder. INT8, ..., INT64 are all decoded with VarInt.

As a workaround in the PR (#22561), in Python's RowCoder I explicitly set
the coder for "beam:logical_type:datetime:v1" (will
be "beam:logical_type:instant:v1") to be TimestampCoder. I did not find a
way to keep the logic contained in the logical type implementation, e.g. in
the to_language_type and to_representation_type methods. To do this I would
need an atomic type that is decoded using the BigEndianLong coder.
Please point out if I am wrong.
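
The shape of that workaround can be sketched as follows. This is not the code from the PR and not Beam's RowCoder: decoder_for, decode_row, and the two lookup tables are hypothetical names, and the row layout is simplified to a bare concatenation of field values (no field count, no null bitmap). It only shows a row decoder special-casing a URN to pick a fixed-size decoder where the INT64 representation alone would select VarInt.

```python
import struct
from typing import Callable, Dict, List, Optional, Tuple

# (data, offset) -> (value, new offset)
Decoder = Callable[[bytes, int], Tuple[int, int]]


def decode_varint(data: bytes, offset: int) -> Tuple[int, int]:
    """Generic base-128 varint: 7 payload bits per byte, low bits first."""
    result, shift = 0, 0
    while True:
        byte = data[offset]
        offset += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, offset
        shift += 7


def decode_fixed_int64(data: bytes, offset: int) -> Tuple[int, int]:
    """Fixed-size 8-byte big-endian signed integer."""
    return struct.unpack_from(">q", data, offset)[0], offset + 8


ATOMIC_DECODERS: Dict[str, Decoder] = {"INT64": decode_varint}

# The workaround: the URN, not the representation type, selects the decoder.
LOGICAL_TYPE_OVERRIDES: Dict[str, Decoder] = {
    "beam:logical_type:datetime:v1": decode_fixed_int64,
}


def decoder_for(atomic_type: str, logical_urn: Optional[str] = None) -> Decoder:
    if logical_urn in LOGICAL_TYPE_OVERRIDES:
        return LOGICAL_TYPE_OVERRIDES[logical_urn]
    return ATOMIC_DECODERS[atomic_type]


def decode_row(data: bytes, fields: List[Tuple[str, Optional[str]]]) -> List[int]:
    offset, values = 0, []
    for atomic_type, logical_urn in fields:
        value, offset = decoder_for(atomic_type, logical_urn)(data, offset)
        values.append(value)
    return values


# One plain INT64 field (varint 42) followed by one datetime field.
payload = bytes([0x2A]) + struct.pack(">q", 1_660_000_000_000)
with_override = decode_row(
    payload, [("INT64", None), ("INT64", "beam:logical_type:datetime:v1")])
without_override = decode_row(payload, [("INT64", None), ("INT64", None)])
print(with_override, without_override)
```

The override lives in the row decoder rather than in the logical type, which is exactly the leak described above: the logical type cannot change how its INT64 representation is read from the stream.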

Best,
Yi


Re: Representation of logical type beam:logical_type:datetime:v1

2022-08-11 Thread Chamikara Jayalath via dev
On Thu, Aug 11, 2022 at 8:50 AM Yi Hu via dev  wrote:

> Hi,
>
> tl;dr There are two (or three) different representations used for the URN
> "beam:logical_type:datetime:v1" in the Beam Java SDK. Clarification or
> cleanup is needed.
>
> I recently tried to resolve a long-standing issue,
> https://github.com/apache/beam/issues/19817, for cross-language support of
> the datetime logical type. However, I noticed that in the Java SDK this
> URN is currently referenced in two places:
>
> (1)
> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java#L49
> where it has a representation of Row {Date: INT64, Time: INT64}
>
> (2)
> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L67
> where it is represented by a single INT64
>
> Moreover, there is a third, "actual" representation that is used when one
> encodes a joda.DateTime or joda.Instant object in a Row, which is an int64
> encoded as fixed-size big endian. Note that this is different from
> (2) because INT64 is decoded using a VarInt coder, which causes data
> corruption when another SDK (e.g. Python) pulls rows from a Java PTransform.
>
> To resolve this I tried to implement the "actual" representation of logical
> type "beam:logical_type:datetime:v1" (see
> https://github.com/apache/beam/pull/22561 for context). It then becomes
> necessary to resolve this inconsistency because we have reached the point of
> adding a known logical type to schemas.proto. I am considering some solutions:
>
> i. Rename the logical type of case (2) as "beam:logical_type:instant:v1"
> as suggested by Brian in an earlier review.
>

+1


>
> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
> implementation it will use BigEndianLongCoder to encode/decode the stream.
>
>
Is this to be compatible with the current Java implementation? And would we
have to update other SDKs to use big-endian coders when encoding/decoding
the "beam:logical_type:instant:v1" logical type?



> For the second step ii, the problem is that there is a primitive type
> backed by a fixed length integer coder. Currently INT8, INT16, INT32,
> INT64... are all backed by VarInt (and there is ongoing work to use fixed
> size big endian to encode INT8, INT16 (
> https://github.com/apache/beam/issues/19815)). Ideally I would think
> (INT8, INT16, INT32, INT64) are all fixed and having a generic (INT)
> primitive type is backed by VarInt. But this may be a more substantial
> change for the current code base.
>

I'm a bit confused by this. Did you mean that there's *no* primitive type
backed by a fixed-length integer coder? Also, by primitive, I'm assuming
you mean Beam Schema types here.

Thanks,
Cham


>
> I would like to have opinions from the community. Thanks for your
> attention!
>
> Regards,
> Yi
>
> --
>
> Yi Hu, (he/him/his)
>
> Software Engineer
>
>
>


Representation of logical type beam:logical_type:datetime:v1

2022-08-11 Thread Yi Hu via dev
Hi,

tl;dr There are two (or three) different representations used for the URN
"beam:logical_type:datetime:v1" in the Beam Java SDK. Clarification or
cleanup is needed.

I recently tried to resolve a long-standing issue,
https://github.com/apache/beam/issues/19817, for cross-language support of
the datetime logical type. However, I noticed that in the Java SDK this
URN is currently referenced in two places:

(1)
https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java#L49
where it has a representation of Row {Date: INT64, Time: INT64}

(2)
https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L67
where it is represented by a single INT64

Moreover, there is a third, "actual" representation that is used when one
encodes a joda.DateTime or joda.Instant object in a Row, which is an int64
encoded as fixed-size big endian. Note that this is different from
(2) because INT64 is decoded using a VarInt coder, which causes data
corruption when another SDK (e.g. Python) pulls rows from a Java PTransform.
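
The corruption can be reproduced outside Beam with a few lines of Python. This is a sketch under assumptions: the varint below is a generic base-128 encoding (7 payload bits per byte, low bits first), which is taken here to have the same shape as the VarInt coder mentioned above, and the fixed-size side is a plain 8-byte big-endian long with no further transformation.

```python
import struct
from typing import Tuple


def encode_big_endian_int64(value: int) -> bytes:
    """Fixed-size encoding: always 8 bytes, most significant byte first."""
    return struct.pack(">q", value)


def decode_big_endian_int64(data: bytes) -> int:
    return struct.unpack(">q", data[:8])[0]


def encode_varint(value: int) -> bytes:
    """Base-128 varint; the high bit is set on every byte except the last."""
    value &= 0xFFFFFFFFFFFFFFFF  # treat the int64 as unsigned
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(data: bytes) -> Tuple[int, int]:
    """Return (value, number of bytes consumed)."""
    result, shift = 0, 0
    for consumed, byte in enumerate(data, start=1):
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            if result >= 1 << 63:  # map back to a signed int64
                result -= 1 << 64
            return result, consumed
        shift += 7
    raise ValueError("truncated varint")


millis = 1_660_000_000_000          # an epoch-millisecond timestamp
wire = encode_big_endian_int64(millis)

# A reader that trusts the INT64 schema type decodes the same bytes as varint.
misread, consumed = decode_varint(wire)
print(wire.hex(), misread, consumed)
```

For this value the leading big-endian byte is 0x00, so the varint reader returns 0 after consuming a single byte. The remaining seven bytes are then interpreted as the start of the next field, so the damage is not limited to the timestamp itself.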

To resolve this I tried to implement the "actual" representation of logical
type "beam:logical_type:datetime:v1" (see
https://github.com/apache/beam/pull/22561 for context). It then becomes
necessary to resolve this inconsistency because we have reached the point of
adding a known logical type to schemas.proto. I am considering some solutions:

i. Rename the logical type of case (2) as "beam:logical_type:instant:v1" as
suggested by Brian in an earlier review.

ii. "beam:logical_type:instant:v1" is still backed by INT64, but in
implementation it will use BigEndianLongCoder to encode/decode the stream.

For the second step ii, the problem is that there is a primitive type
backed by a fixed length integer coder. Currently INT8, INT16, INT32,
INT64... are all backed by VarInt (and there is ongoing work to use fixed
size big endian to encode INT8, INT16 (
https://github.com/apache/beam/issues/19815)). Ideally, I would think (INT8,
INT16, INT32, INT64) should all be fixed-size, with a generic (INT) primitive
type backed by VarInt. But this may be a more substantial change for the
current code base.

I would like to have opinions from the community. Thanks for your attention!

Regards,
Yi

-- 

Yi Hu, (he/him/his)

Software Engineer