Re: Representation of logical type beam:logical_type:datetime:v1
Hi,

It now appears that if we want a clean solution, we have to add a fixed-size primitive type to the Beam atomic types. Otherwise, we would have a millis_instant logical type that does not have to_language_type and to_representation_type implementations.

Any suggestions are welcome!

Best,
Yi

On Thu, Aug 18, 2022 at 10:07 AM Yi Hu wrote:

> On Wed, Aug 17, 2022 at 5:14 PM Chamikara Jayalath wrote:
>
>> I think this is fine (even though it would add a small perf hit to JdbcIO.Read). We also probably should make this conversion a utility method that can be used elsewhere when we need to encode datetime fields.
>> We should also document that "beam:logical_type:datetime:v1" is not portable (until we fix the incompatibility).
>
> +1 for the utility method and documentation.
>
> If we were to change JDBC instead of making millis_instant compatible with InstantCoder, this would only fix JDBC cross-language timestamps. I expect this is still a problem for other IO connectors, and that is why I would like to take a generic approach. In general, inside each SDK we would like to follow the language-specific convention of that SDK. I remember a related discussion about the timestamp types: https://github.com/apache/beam/pull/17380#discussion_r852422314, which reached the conclusion that timestamp values follow the language convention, e.g. use a milli-precision (long-backed) Instant in Java and a micro-precision (float-backed) timestamp in Python.
>
> Best,
> Yi
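As an illustration of the "clean solution" mentioned in this message, here is a minimal sketch in plain Python (standard library only). It is a toy model, not Beam code: the atomic type name `FIXED_INT64`, the `ATOMIC_CODECS` table, and the `MillisInstant` class are hypothetical. The point it tries to show is that, once a fixed-size atomic type exists, the logical type needs nothing beyond to_representation_type and to_language_type.

```python
import struct
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hypothetical atomic type table: name -> (encoder, decoder).
# "FIXED_INT64" is an assumed name for the proposed fixed-size type.
ATOMIC_CODECS: Dict[str, Tuple[Callable[[int], bytes], Callable[[bytes], int]]] = {
    "FIXED_INT64": (
        lambda value: struct.pack(">q", value),     # 8 bytes, big endian
        lambda data: struct.unpack(">q", data)[0],
    ),
}


@dataclass(frozen=True)
class MillisInstant:
    """Toy logical type: only value mapping, no wire-format knowledge."""
    urn: str = "beam:logical_type:millis_instant"
    representation: str = "FIXED_INT64"

    def to_representation_type(self, value: datetime) -> int:
        # Whole milliseconds since the epoch (sub-millisecond part is dropped).
        return (value - EPOCH) // timedelta(milliseconds=1)

    def to_language_type(self, millis: int) -> datetime:
        return EPOCH + timedelta(milliseconds=millis)


def encode(logical_type: MillisInstant, value: datetime) -> bytes:
    encoder, _ = ATOMIC_CODECS[logical_type.representation]
    return encoder(logical_type.to_representation_type(value))


def decode(logical_type: MillisInstant, data: bytes) -> datetime:
    _, decoder = ATOMIC_CODECS[logical_type.representation]
    return logical_type.to_language_type(decoder(data))
```

In this sketch the generic `encode` and `decode` functions never look at the URN; the wire format follows entirely from the declared representation type.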
Re: Representation of logical type beam:logical_type:datetime:v1
On Wed, Aug 17, 2022 at 5:14 PM Chamikara Jayalath wrote:

> I think this is fine (even though it would add a small perf hit to JdbcIO.Read). We also probably should make this conversion a utility method that can be used elsewhere when we need to encode datetime fields.
> We should also document that "beam:logical_type:datetime:v1" is not portable (until we fix the incompatibility).

+1 for the utility method and documentation.

If we were to change JDBC instead of making millis_instant compatible with InstantCoder, this would only fix JDBC cross-language timestamps. I expect this is still a problem for other IO connectors, and that is why I would like to take a generic approach. In general, inside each SDK we would like to follow the language-specific convention of that SDK. I remember a related discussion about the timestamp types: https://github.com/apache/beam/pull/17380#discussion_r852422314, which reached the conclusion that timestamp values follow the language convention, e.g. use a milli-precision (long-backed) Instant in Java and a micro-precision (float-backed) timestamp in Python.

Best,
Yi
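The per-language precision conventions mentioned above mean that a value crossing from a microsecond-precision SDK to a millisecond-precision one cannot always round-trip. A minimal sketch of that effect, using plain integers and hypothetical helper names rather than any Beam API:

```python
MICROS_PER_MILLI = 1_000


def micros_to_millis(micros: int) -> int:
    """Drop sub-millisecond precision (floor division)."""
    return micros // MICROS_PER_MILLI


def millis_to_micros(millis: int) -> int:
    """Widen millisecond precision to microseconds; this direction is lossless."""
    return millis * MICROS_PER_MILLI


python_side_micros = 1_660_233_000_123_456  # hypothetical microsecond timestamp
java_side_millis = micros_to_millis(python_side_micros)
round_tripped = millis_to_micros(java_side_millis)

print(java_side_millis)                    # 1660233000123
print(python_side_micros - round_tripped)  # 456 (microseconds lost)
```

Any cross-language timestamp type therefore has to state its precision explicitly, which is one reason a name such as millis_instant is less ambiguous than datetime.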
Re: Representation of logical type beam:logical_type:datetime:v1
On Wed, Aug 17, 2022 at 12:59 PM Yi Hu via dev wrote:

> Hi Brian,
>
> Currently Java's JdbcIO does not do anything special for cross-language. A DATETIME field type appears because the Row contains an Instant object, and the SDK will always encode the Instant object using InstantCoder. This is done for both Java pipelines and cross-language pipelines. To use millis_instant in JdbcIO and to avoid breaking Instant used elsewhere (like watermarks), I will need to change the type of timestamp returned by JDBC read from Instant to millis_instant.

I think this is fine (even though it would add a small perf hit to JdbcIO.Read). We also probably should make this conversion a utility method that can be used elsewhere when we need to encode datetime fields.
We should also document that "beam:logical_type:datetime:v1" is not portable (until we fix the incompatibility).

Thanks,
Cham

> We could name the logical type "beam:logical_type:millis_instant", backed by a big endian int64.
>
> Best,
> Yi
Re: Representation of logical type beam:logical_type:datetime:v1
Hi Brian,

Currently Java's JdbcIO does not do anything special for cross-language. A DATETIME field type appears because the Row contains an Instant object, and the SDK will always encode the Instant object using InstantCoder. This is done for both Java pipelines and cross-language pipelines. To use millis_instant in JdbcIO and to avoid breaking Instant used elsewhere (like watermarks), I will need to change the type of timestamp returned by JDBC read from Instant to millis_instant.

We could name the logical type "beam:logical_type:millis_instant", backed by a big endian int64.

Best,
Yi

On Fri, Aug 12, 2022 at 5:28 PM Brian Hulette wrote:

> Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't simulate TimestampCoder with a logical type.
>
> I think the ideal end state would be to have a well-defined beam:logical_type:millis_instant that we use for cross-language (when appropriate), and never use DATETIME at cross-language boundaries. Would it be possible to add millis_instant, and use that for JDBC read/write instead of DATETIME?
>
> Separately we could consider how to resolve the conflicting definitions of beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that without breaking pipeline update.
>
> Brian
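The conversion discussed in this message (a timestamp value turned into a millis_instant backed by a big endian int64) could be sketched as a small utility. This is only an illustration in plain Python under stated assumptions: the function names are hypothetical, the standard library `datetime` stands in for the SDK's own timestamp class, and the byte layout is simply "8 bytes, big endian, signed" as described in the thread.

```python
import struct
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_millis(value: datetime) -> int:
    """Return whole milliseconds since the Unix epoch for an aware datetime."""
    if value.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return (value - EPOCH) // timedelta(milliseconds=1)


def millis_to_datetime(millis: int) -> datetime:
    return EPOCH + timedelta(milliseconds=millis)


def encode_millis_instant(value: datetime) -> bytes:
    """Encode as a fixed-size, 8-byte big-endian signed integer."""
    return struct.pack(">q", datetime_to_millis(value))


def decode_millis_instant(data: bytes) -> datetime:
    (millis,) = struct.unpack(">q", data)
    return millis_to_datetime(millis)
```

Keeping the conversion in one place means every connector that needs it produces the same bytes, and any sub-millisecond part of the input is dropped in exactly one function.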
Re: Representation of logical type beam:logical_type:datetime:v1
Ah sorry, I forgot that INT64 is encoded with VarIntCoder, so we can't simulate TimestampCoder with a logical type.

I think the ideal end state would be to have a well-defined beam:logical_type:millis_instant that we use for cross-language (when appropriate), and never use DATETIME at cross-language boundaries. Would it be possible to add millis_instant, and use that for JDBC read/write instead of DATETIME?

Separately we could consider how to resolve the conflicting definitions of beam:logical_type:datetime:v1. I'm not quite sure how/if we can do that without breaking pipeline update.

Brian

On Fri, Aug 12, 2022 at 7:50 AM Yi Hu via dev wrote:

> Hi Cham,
>
> Thanks for the comments.
>
>>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in the implementation it will use BigEndianLongCoder to encode/decode the stream.
>>
>> Is this to be compatible with the current Java implementation? And do we have to update other SDKs to use big endian coders when encoding/decoding the "beam:logical_type:instant:v1" logical type?
>
> Yes, and the proposal aims to keep the Java SDK change minimal; we have to update other SDKs to make it work. Currently the Python and Go SDKs do not implement "beam:logical_type:datetime:v1" (which will become "beam:logical_type:instant:v1") at all.
>
>>> For the second step ii, the problem is that there is a primitive type backed by a fixed length integer coder. Currently INT8, INT16, INT32, INT64... are all backed by VarInt (and there is ongoing work to use fixed-size big endian to encode INT8 and INT16 (https://github.com/apache/beam/issues/19815)). Ideally I would think (INT8, INT16, INT32, INT64) should all be fixed-size, with a generic (INT) primitive type backed by VarInt. But this may be a more substantial change to the current code base.
>>
>> I'm a bit confused by this. Did you mean that there's *no* primitive type backed by a fixed length integer coder? Also, by primitive, I'm assuming you mean Beam Schema types here.
>
> Yes, I mean Beam Schema types here. The proto for the datetime (instant) logical type is constructed here:
> https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
> It is represented by an INT64 atomic type. In the cross-language case, another SDK receives the proto and decodes the stream according to the proto. Currently I do not see an atomic type that will be decoded using a fixed-length BigEndianLong coder. INT8, ..., INT64 will all be decoded with VarInt.
>
> As a workaround in the PR (#22561), in Python's RowCoder I explicitly set the coder for "beam:logical_type:datetime:v1" (which will become "beam:logical_type:instant:v1") to be TimestampCoder. I did not find a way to keep the logic contained in the logical type implementation, e.g. in the to_language_type and to_representation_type methods. To do this I would need an atomic type that is decoded using the BigEndianLong coder.
> Please point it out if I am wrong.
>
> Best,
> Yi
Re: Representation of logical type beam:logical_type:datetime:v1
Hi Cham,

Thanks for the comments.

>> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in the implementation it will use BigEndianLongCoder to encode/decode the stream.
>
> Is this to be compatible with the current Java implementation? And do we have to update other SDKs to use big endian coders when encoding/decoding the "beam:logical_type:instant:v1" logical type?

Yes, and the proposal aims to keep the Java SDK change minimal; we have to update other SDKs to make it work. Currently the Python and Go SDKs do not implement "beam:logical_type:datetime:v1" (which will become "beam:logical_type:instant:v1") at all.

>> For the second step ii, the problem is that there is a primitive type backed by a fixed length integer coder. Currently INT8, INT16, INT32, INT64... are all backed by VarInt (and there is ongoing work to use fixed-size big endian to encode INT8 and INT16 (https://github.com/apache/beam/issues/19815)). Ideally I would think (INT8, INT16, INT32, INT64) should all be fixed-size, with a generic (INT) primitive type backed by VarInt. But this may be a more substantial change to the current code base.
>
> I'm a bit confused by this. Did you mean that there's *no* primitive type backed by a fixed length integer coder? Also, by primitive, I'm assuming you mean Beam Schema types here.

Yes, I mean Beam Schema types here. The proto for the datetime (instant) logical type is constructed here:
https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L202
It is represented by an INT64 atomic type. In the cross-language case, another SDK receives the proto and decodes the stream according to the proto. Currently I do not see an atomic type that will be decoded using a fixed-length BigEndianLong coder. INT8, ..., INT64 will all be decoded with VarInt.

As a workaround in the PR (#22561), in Python's RowCoder I explicitly set the coder for "beam:logical_type:datetime:v1" (which will become "beam:logical_type:instant:v1") to be TimestampCoder. I did not find a way to keep the logic contained in the logical type implementation, e.g. in the to_language_type and to_representation_type methods. To do this I would need an atomic type that is decoded using the BigEndianLong coder.
Please point it out if I am wrong.

Best,
Yi
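The shape of the workaround described in this message can be pictured with a toy model. Everything in it is an assumption made for illustration (the `ToyLogicalType` class, the two lookup tables, and the `encode_field` function are hypothetical and do not mirror the actual RowCoder code in the PR). It only tries to show why the special case ends up in the row-level encoder: the logical type's conversion method maps values, while the bytes are chosen from the atomic representation type unless something overrides that by URN.

```python
import struct
from dataclasses import dataclass
from typing import Callable, Dict


def encode_varint(value: int) -> bytes:
    """Base-128 varint for non-negative ints (7 payload bits per byte)."""
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)  # set continuation bit
        value >>= 7
    out.append(value)
    return bytes(out)


# Toy stand-in for "each atomic type has exactly one wire encoding".
ATOMIC_ENCODERS: Dict[str, Callable[[int], bytes]] = {"INT64": encode_varint}


@dataclass(frozen=True)
class ToyLogicalType:
    urn: str
    representation: str  # name of the atomic type backing this logical type

    def to_representation_type(self, value: int) -> int:
        # A logical type can only map values here; it cannot pick the bytes.
        return value


# The workaround: a URN-keyed override consulted before the atomic encoder.
URN_ENCODER_OVERRIDES: Dict[str, Callable[[int], bytes]] = {
    "beam:logical_type:datetime:v1": lambda millis: struct.pack(">q", millis),
}


def encode_field(logical_type: ToyLogicalType, value: int) -> bytes:
    representation_value = logical_type.to_representation_type(value)
    override = URN_ENCODER_OVERRIDES.get(logical_type.urn)
    if override is not None:
        return override(representation_value)
    return ATOMIC_ENCODERS[logical_type.representation](representation_value)
```

In this model, two logical types that both declare INT64 as their representation produce different byte layouts, and the only place that difference can live is the override table.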
Re: Representation of logical type beam:logical_type:datetime:v1
On Thu, Aug 11, 2022 at 8:50 AM Yi Hu via dev wrote:

> Hi,
>
> tl;dr There are two (or three) different representations used for the URN "beam:logical_type:datetime:v1" in the Beam Java SDK. Clarification or cleanup is needed.
>
> I recently tried to resolve a long-standing issue, https://github.com/apache/beam/issues/19817, for cross-language support of the datetime logical type. However, I noticed that the Java SDK currently refers to this URN in two places:
>
> (1) https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java#L49 where it has a representation of Row {Date: INT64, Time: INT64}
>
> (2) https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L67 where it is represented by a single INT64
>
> Moreover, there is a third, "actual" representation that is used when one encodes a joda.DateTime or joda.Instant object in a Row, which is an int64 encoded as fixed-size big endian. Note that this is different from (2), because INT64 will be decoded using a VarInt coder, which causes data corruption when another SDK (e.g. Python) pulls rows from a Java PTransform.
>
> To resolve this, I tried to implement the "actual" representation of the logical type "beam:logical_type:datetime:v1" (see https://github.com/apache/beam/pull/22561 for context). It then becomes necessary to resolve this inconsistency, because we have reached the point of adding a known logical type in schemas.proto. I am considering some solutions:
>
> i. Rename the logical type of case (2) to "beam:logical_type:instant:v1", as suggested by Brian in an earlier review.

+1

> ii. "beam:logical_type:instant:v1" is still backed by INT64, but in the implementation it will use BigEndianLongCoder to encode/decode the stream.

Is this to be compatible with the current Java implementation? And do we have to update other SDKs to use big endian coders when encoding/decoding the "beam:logical_type:instant:v1" logical type?

> For the second step ii, the problem is that there is a primitive type backed by a fixed length integer coder. Currently INT8, INT16, INT32, INT64... are all backed by VarInt (and there is ongoing work to use fixed-size big endian to encode INT8 and INT16 (https://github.com/apache/beam/issues/19815)). Ideally I would think (INT8, INT16, INT32, INT64) should all be fixed-size, with a generic (INT) primitive type backed by VarInt. But this may be a more substantial change to the current code base.

I'm a bit confused by this. Did you mean that there's *no* primitive type backed by a fixed length integer coder? Also, by primitive, I'm assuming you mean Beam Schema types here.

Thanks,
Cham

> I would like to hear opinions from the community. Thanks for your attention!
>
> Regards,
> Yi
>
> --
> Yi Hu, (he/him/his)
> Software Engineer
Representation of logical type beam:logical_type:datetime:v1
Hi,

tl;dr There are two (or three) different representations used for the URN "beam:logical_type:datetime:v1" in the Beam Java SDK. Clarification or cleanup is needed.

I recently tried to resolve a long-standing issue, https://github.com/apache/beam/issues/19817, for cross-language support of the datetime logical type. However, I noticed that the Java SDK currently refers to this URN in two places:

(1) https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java#L49 where it has a representation of Row {Date: INT64, Time: INT64}

(2) https://github.com/apache/beam/blob/cf9ea1f442636f781b9f449e953016bb39622781/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L67 where it is represented by a single INT64

Moreover, there is a third, "actual" representation that is used when one encodes a joda.DateTime or joda.Instant object in a Row, which is an int64 encoded as fixed-size big endian. Note that this is different from (2), because INT64 will be decoded using a VarInt coder, which causes data corruption when another SDK (e.g. Python) pulls rows from a Java PTransform.

To resolve this, I tried to implement the "actual" representation of the logical type "beam:logical_type:datetime:v1" (see https://github.com/apache/beam/pull/22561 for context). It then becomes necessary to resolve this inconsistency, because we have reached the point of adding a known logical type in schemas.proto. I am considering some solutions:

i. Rename the logical type of case (2) to "beam:logical_type:instant:v1", as suggested by Brian in an earlier review.

ii. "beam:logical_type:instant:v1" is still backed by INT64, but in the implementation it will use BigEndianLongCoder to encode/decode the stream.

For the second step ii, the problem is that there is a primitive type backed by a fixed length integer coder. Currently INT8, INT16, INT32, INT64... are all backed by VarInt (and there is ongoing work to use fixed-size big endian to encode INT8 and INT16 (https://github.com/apache/beam/issues/19815)). Ideally I would think (INT8, INT16, INT32, INT64) should all be fixed-size, with a generic (INT) primitive type backed by VarInt. But this may be a more substantial change to the current code base.

I would like to hear opinions from the community. Thanks for your attention!

Regards,
Yi

--
Yi Hu, (he/him/his)
Software Engineer
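The data corruption described in this message can be reproduced without Beam. The sketch below uses only the Python standard library; the varint functions are a generic base-128 implementation written for this example (assumed here to be representative of a VarInt wire format, not copied from any SDK), and the timestamp value is arbitrary.

```python
import struct
from typing import Tuple


def encode_varint(value: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (7 payload bits per byte)."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            out.append(low_bits | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def decode_varint(data: bytes) -> Tuple[int, int]:
    """Decode one varint from the front of data; return (value, bytes_consumed)."""
    result = 0
    for index, byte in enumerate(data):
        result |= (byte & 0x7F) << (7 * index)
        if not byte & 0x80:
            return result, index + 1
    raise ValueError("truncated varint")


millis = 1_660_233_000_000  # an arbitrary epoch-millisecond value (August 2022)

# What the writer puts on the wire: a fixed-size, 8-byte big-endian int64.
fixed_bytes = struct.pack(">q", millis)

# What a reader does if it trusts a schema that says "INT64, varint-encoded".
misread_value, consumed = decode_varint(fixed_bytes)

print("written:", millis, "as", fixed_bytes.hex())
print("misread:", misread_value, "after consuming", consumed, "of", len(fixed_bytes), "bytes")
```

Besides returning the wrong value, the reader in this sketch stops before the end of the 8-byte field, so every field that follows in the same row would be decoded from the wrong offset.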