Re: Incomplete Beam Schema -> Avro Schema conversion

2022-09-09 Thread Balázs Németh
It is still better to have an asymmetric conversion that supports more data
types than not having them implemented at all, right? This contribution seems
simple enough, but that's definitely not true for the other direction (...
and I'm also biased, I only need Beam->Avro).
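
For the record, this is roughly the mapping I have in mind for the Beam->Avro
direction (my own sketch, not the actual AvroUtils change; the class name and
the choice of micros precision for TIME are assumptions):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class BeamLogicalTypeToAvroSketch {

  // Maps the missing Beam logical type identifiers onto Avro logical types.
  // "beam:logical_type:date:v1" -> Avro `date` (int, days since the epoch).
  // "beam:logical_type:time:v1" -> Avro `time-micros` (long); millis would
  // also be a defensible choice, this is just an assumption for the sketch.
  static Schema toAvroFieldSchema(String beamLogicalTypeId) {
    switch (beamLogicalTypeId) {
      case "beam:logical_type:date:v1":
        return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
      case "beam:logical_type:time:v1":
        return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
      default:
        throw new IllegalArgumentException("Unhandled logical type " + beamLogicalTypeId);
    }
  }

  public static void main(String[] args) {
    // Prints {"type":"int","logicalType":"date"}
    System.out.println(toAvroFieldSchema("beam:logical_type:date:v1"));
  }
}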

Brian Hulette via dev  wrote (2022 Aug 23, Tue, 1:53):

> I don't think there's a reason for this, it's just that these logical
> types were defined after the Avro <-> Beam schema conversion. I think it
> would be worthwhile to add support for them, but we'd also need to look at
> the reverse (avro to beam) direction, which would map back to the catch-all
> DATETIME primitive type [1]. Changing that could break backwards
> compatibility.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L771-L776
>
> On Wed, Aug 17, 2022 at 2:53 PM Balázs Németh 
> wrote:
>
>> java.lang.RuntimeException: Unhandled logical type
>> beam:logical_type:date:v1
>>   at
>> org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:943)
>>   at
>> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
>>   at
>> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:341)
>>   at
>> org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java
>>
>> In
>> https://github.com/apache/beam/blob/7bb755906c350d77ba175e1bd990096fbeaf8e44/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L902-L944
>> it seems to me that some options are missing.
>>
>> For example
>> - FixedBytes.IDENTIFIER,
>> - EnumerationType.IDENTIFIER,
>> - OneOfType.IDENTIFIER
>> are there, but:
>> - org.apache.beam.sdk.schemas.logicaltypes.Date.IDENTIFIER
>> ("beam:logical_type:date:v1")
>> - org.apache.beam.sdk.schemas.logicaltypes.DateTime.IDENTIFIER
>> ("beam:logical_type:datetime:v1")
>> - org.apache.beam.sdk.schemas.logicaltypes.Time.IDENTIFIER
>> ("beam:logical_type:time:v1")
>> are missing.
>>
>> This is an example that fails:
>>
>>> import java.time.LocalDate;
>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
>>> import org.apache.beam.sdk.schemas.Schema;
>>> import org.apache.beam.sdk.schemas.Schema.FieldType;
>>> import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
>>> import org.apache.beam.sdk.schemas.utils.AvroUtils;
>>> import org.apache.beam.sdk.values.Row;
>>
>> // ...
>>
>>> final Schema schema =
>>>     Schema.builder()
>>>         .addField("ymd", FieldType.logicalType(SqlTypes.DATE))
>>>         .build();
>>>
>>> final Row row =
>>>     Row.withSchema(schema)
>>>         .withFieldValue("ymd", LocalDate.now())
>>>         .build();
>>>
>>> System.out.println(BigQueryUtils.toTableSchema(schema)); // works
>>> System.out.println(BigQueryUtils.toTableRow(row)); // works
>>>
>>> System.out.println(AvroUtils.toAvroSchema(schema)); // fails
>>> System.out.println(AvroUtils.toGenericRecord(row)); // fails
>>
>>
>> Am I missing a reason for that, or is it just not implemented yet? If it's
>> the latter, am I right to assume that they should be represented in the
>> Avro format the same way as the already existing cases?
>> "beam:logical_type:date:v1" vs "DATE"
>> "beam:logical_type:time:v1" vs "TIME"
>>
>>
>>


Incomplete Beam Schema -> Avro Schema conversion

2022-08-17 Thread Balázs Németh
java.lang.RuntimeException: Unhandled logical type beam:logical_type:date:v1
  at
org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:943)
  at
org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
  at
org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:341)
  at org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java

In
https://github.com/apache/beam/blob/7bb755906c350d77ba175e1bd990096fbeaf8e44/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L902-L944
it seems to me that some options are missing.

For example
- FixedBytes.IDENTIFIER,
- EnumerationType.IDENTIFIER,
- OneOfType.IDENTIFIER
are there, but:
- org.apache.beam.sdk.schemas.logicaltypes.Date.IDENTIFIER
("beam:logical_type:date:v1")
- org.apache.beam.sdk.schemas.logicaltypes.DateTime.IDENTIFIER
("beam:logical_type:datetime:v1")
- org.apache.beam.sdk.schemas.logicaltypes.Time.IDENTIFIER
("beam:logical_type:time:v1")
are missing.

This is an example that fails:

> import java.time.LocalDate;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.schemas.Schema.FieldType;
> import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
> import org.apache.beam.sdk.schemas.utils.AvroUtils;
> import org.apache.beam.sdk.values.Row;

// ...

> final Schema schema =
>     Schema.builder()
>         .addField("ymd", FieldType.logicalType(SqlTypes.DATE))
>         .build();
>
> final Row row =
>     Row.withSchema(schema)
>         .withFieldValue("ymd", LocalDate.now())
>         .build();
>
> System.out.println(BigQueryUtils.toTableSchema(schema)); // works
> System.out.println(BigQueryUtils.toTableRow(row)); // works
>
> System.out.println(AvroUtils.toAvroSchema(schema)); // fails
> System.out.println(AvroUtils.toGenericRecord(row)); // fails


Am I missing a reason for that, or is it just not implemented yet? If it's
the latter, am I right to assume that they should be represented in the
Avro format the same way as the already existing cases?
"beam:logical_type:date:v1" vs "DATE"
"beam:logical_type:time:v1" vs "TIME"


Re: KafkaIO.java.configuredKafkaCommit() inconsistency

2022-08-08 Thread Balázs Németh
thanks, see https://github.com/apache/beam/issues/22631 +
https://github.com/apache/beam/pull/22633

John Casey via dev  wrote (2022 Aug 8, Mon, 21:30):

> Which looking at your message again, would imply that the
> configuredKafkaCommit() method shouldn't inspect isolation.level
>
> On Mon, Aug 8, 2022 at 3:27 PM John Casey  wrote:
>
>> .withReadCommitted() doesn't commit messages when they are read; instead, it
>> specifies that the Kafka consumer should only read messages that have
>> themselves been committed to Kafka.
>>
>> Its use is for exactly-once applications.
>>
>>
>>
>> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh 
>> wrote:
>>
>>> I have been reading from Kafka and trying to figure out which offset
>>> management would be the best for my use-case. During that I noticed
>>> something odd.
>>>
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>>
>>> private boolean configuredKafkaCommit() {
>>>   return getConsumerConfig().get("isolation.level") == "read_committed"
>>>       || Boolean.TRUE.equals(
>>>           getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>>> }
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>>>
>>> The name of the method, and how it's being used in the code, certainly
>>> suggest that using the read_committed isolation level handles and commits
>>> Kafka offsets. That seemed strange, but I'm not a Kafka pro, so let's test
>>> it. Well, it does not.
>>>
>>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
>>> - using ONLY commitOffsetsInFinalize() does commit it
>>>
>>> - using ONLY withReadCommitted() does NOT commit it
>>>
>>> Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read
>>>
>>> So is it a bug, or what am I missing here?
>>>
>>> If it is indeed a bug, is it in the read_committed handling (i.e. it should
>>> commit, although I found no explicit documentation about that anywhere), or
>>> should that isolation level not take precedence over committing in finalize,
>>> meaning that method is wrong?
>>>
>>>
>>>
>>>


KafkaIO.java.configuredKafkaCommit() inconsistency

2022-08-08 Thread Balázs Németh
I have been reading from Kafka and trying to figure out which offset
management would be the best for my use-case. During that I noticed
something odd.

https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362

private boolean configuredKafkaCommit() {
  return getConsumerConfig().get("isolation.level") == "read_committed"
      || Boolean.TRUE.equals(
          getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}

https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334

The name of the method, and how it's being used in the code, certainly
suggest that using the read_committed isolation level handles and commits
Kafka offsets. That seemed strange, but I'm not a Kafka pro, so let's test
it. Well, it does not.

- using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
- using ONLY commitOffsetsInFinalize() does commit it

- using ONLY withReadCommitted() does NOT commit it

Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read
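
To make the comparison concrete, the three variants are configured roughly
like this (a sketch with placeholder servers/topic; only the commit-related
options differ):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitComparisonSketch {

  static KafkaIO.Read<String, String> baseRead() {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092") // placeholder
        .withTopic("my-topic")                  // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);
  }

  public static void main(String[] args) {
    // 1) consumer auto-commit -> offsets do get committed
    Map<String, Object> autoCommit = new HashMap<>();
    autoCommit.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    KafkaIO.Read<String, String> viaAutoCommit =
        baseRead().withConsumerConfigUpdates(autoCommit);

    // 2) commit offsets when bundles are finalized -> offsets do get committed
    KafkaIO.Read<String, String> viaFinalize = baseRead().commitOffsetsInFinalize();

    // 3) only restricts reads to transactionally committed records
    //    -> offsets are NOT committed
    KafkaIO.Read<String, String> viaReadCommitted = baseRead().withReadCommitted();
  }
}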

So is it a bug, or what am I missing here?

If it is indeed a bug, is it in the read_committed handling (i.e. it should
commit, although I found no explicit documentation about that anywhere), or
should that isolation level not take precedence over committing in finalize,
meaning that method is wrong?


Re: Questions regarding contribution: Support for reading Kafka topics from any startReadTime in Java

2022-06-28 Thread Balázs Németh
Do you mean whether the "startReadTime" configuration on its own already
shifts the watermark, even if we receive an older entry first? That I do not
know, but based on the code I would say it does not. So if I understand
correctly: if it did, my original idea would work fine, and we only have to
think further if it doesn't?

For SDF:
@GetInitialWatermarkEstimatorState is the one that has to consider
startReadTime(), similarly to
https://github.com/apache/beam/blob/25ca4f0fddd011bfc593e72eea6d32c040808b29/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java#L103-L106
?

For legacy read:
Setting the PartitionState.lastWatermark (not only the nextOffset) in
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.setupInitialOffset(PartitionState)
and/or modifying the return value
of org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.getWatermark() should
be enough then?
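
Just to illustrate the kind of clamping I have in mind for the legacy path
(a standalone sketch; clampWatermark is a made-up helper for illustration,
not an existing method in KafkaUnboundedReader):

import org.joda.time.Instant;

final class WatermarkClampSketch {

  // Never report a watermark earlier than startReadTime, so that configuring
  // startReadTime on its own already advances the output watermark.
  static Instant clampWatermark(Instant computedWatermark, Instant startReadTime) {
    if (startReadTime != null && computedWatermark.isBefore(startReadTime)) {
      return startReadTime;
    }
    return computedWatermark;
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2022-06-01T00:00:00Z");
    // Clamped up to startReadTime:
    System.out.println(clampWatermark(Instant.parse("2022-05-30T00:00:00Z"), start));
    // Left unchanged:
    System.out.println(clampWatermark(Instant.parse("2022-06-02T00:00:00Z"), start));
  }
}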



Kenneth Knowles  wrote (2022 Jun 16, Thu, 23:57):

> These last two emails make sense to me, from both of you.
>
> Does startReadTime impact the watermark in the right way so that late data
> is dropped, or whatnot? Because even if you have a partition and you _do_
> find an offset that works, late data could come in. So if you restrict the
> time to be in the past, and you treat late data consistently across
> partitions, then it will not be observable.
>
> I think it would be good to try to come up with some way of describing
> this as an invariant like "the observed behavior of startReadTime is the
> same even when X Y Z are different".
>
> Kenn
>
> On Wed, Jun 15, 2022 at 3:43 PM Balázs Németh 
> wrote:
>
>> Well, a sanity check to only allow a startReadTime in the past seemed
>> obvious to me. I can't really think of any common use case where you want to
>> start processing after a specific time in the future and only idle until
>> then, but the other direction has several.
>>
>> The question here, IMO, is how much of a "safety delay" from "now" - if any
>> - we have to keep if we were unable to find any newer offset. A message that
>> is older than the startReadTime might still be in flight and might not be
>> available at the Kafka cluster yet when we look up the offset. Either we do
>> no such check at all and let developers take care of this themselves, or we
>> need a filter that drops messages prior to the timestamp.
>>
>> Daniel Collins  wrote (2022 Jun 16, Thu, 0:19):
>>
>>> This may or may not be reasonable. Let's assume I pick startReadTime =
>>> now + 1 minute. Your logic will start returning a lot of records with
>>> values == now + 1ms. Is that reasonable for users of this API? Maybe, maybe
>>> not.
>>>
>>> -Daniel
>>>
>>> On Wed, Jun 15, 2022 at 6:16 PM Balázs Németh 
>>> wrote:
>>>
>>>> Not a single objection means my idea seems okay then? :)
>>>>
>>>> Balázs Németh  wrote (2022 May 26, Thu, 5:59):
>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-14518
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198
>>>>>
>>>>> Right now the 'startReadTime' config for KafkaIO.Read looks up an offset
>>>>> in every topic partition that is newer than or equal to that timestamp.
>>>>> The problem is that if we use a timestamp so new that there is no
>>>>> newer/equal message in the partition, the code fails with an exception.
>>>>> In certain cases that makes no sense, as we could actually make it work.
>>>>>
>>>>> If we don't get an offset from calling `consumer.offsetsForTimes`, we
>>>>> should call `endOffsets`, and use the returned offset + 1. That is actually
>>>>> the offset we will have to read next time.
>>>>>
>>>>> Even if `endOffsets` can't return an offset we could use 0 as the
>>>>> offset to read from.
>>>>>
>>>>>
>>>>>
>>>>> Am I missing something here? Is it okay to contribute this?
>>>>>
>>>>


Re: Questions regarding contribution: Support for reading Kafka topics from any startReadTime in Java

2022-06-15 Thread Balázs Németh
Well, a sanity check to only allow a startReadTime in the past seemed obvious
to me. I can't really think of any common use case where you want to start
processing after a specific time in the future and only idle until then, but
the other direction has several.

The question here, IMO, is how much of a "safety delay" from "now" - if any -
we have to keep if we were unable to find any newer offset. A message that is
older than the startReadTime might still be in flight and might not be
available at the Kafka cluster yet when we look up the offset. Either we do no
such check at all and let developers take care of this themselves, or we need
a filter that drops messages prior to the timestamp.
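
If we went with the filter option, I imagine something like this downstream
of the read (a rough sketch of my own, not an existing KafkaIO feature):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Drops records whose Kafka timestamp is older than the configured startReadTime.
class DropBeforeStartReadTimeFn<K, V> extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
  private final Instant startReadTime;

  DropBeforeStartReadTimeFn(Instant startReadTime) {
    this.startReadTime = startReadTime;
  }

  @ProcessElement
  public void processElement(
      @Element KafkaRecord<K, V> record, OutputReceiver<KafkaRecord<K, V>> out) {
    // KafkaRecord#getTimestamp() is epoch millis.
    if (record.getTimestamp() >= startReadTime.getMillis()) {
      out.output(record);
    }
  }
}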

Daniel Collins  wrote (2022 Jun 16, Thu, 0:19):

> This may or may not be reasonable. Let's assume I pick startReadTime =
> now + 1 minute. Your logic will start returning a lot of records with
> values == now + 1ms. Is that reasonable for users of this API? Maybe, maybe
> not.
>
> -Daniel
>
> On Wed, Jun 15, 2022 at 6:16 PM Balázs Németh 
> wrote:
>
>> Not a single objection means my idea seems okay then? :)
>>
>> Balázs Németh  wrote (2022 May 26, Thu, 5:59):
>>
>>> https://issues.apache.org/jira/browse/BEAM-14518
>>>
>>>
>>> https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198
>>>
>>> Right now the 'startReadTime' config for KafkaIO.Read looks up an offset
>>> in every topic partition that is newer than or equal to that timestamp. The
>>> problem is that if we use a timestamp so new that there is no newer/equal
>>> message in the partition, the code fails with an exception. In certain
>>> cases that makes no sense, as we could actually make it work.
>>>
>>> If we don't get an offset from calling `consumer.offsetsForTimes`, we
>>> should call `endOffsets`, and use the returned offset + 1. That is actually
>>> the offset we will have to read next time.
>>>
>>> Even if `endOffsets` can't return an offset we could use 0 as the offset
>>> to read from.
>>>
>>>
>>>
>>> Am I missing something here? Is it okay to contribute this?
>>>
>>


Re: Questions regarding contribution: Support for reading Kafka topics from any startReadTime in Java

2022-06-15 Thread Balázs Németh
Not a single objection means my idea seems okay then? :)

Balázs Németh  wrote (2022 May 26, Thu, 5:59):

> https://issues.apache.org/jira/browse/BEAM-14518
>
>
> https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198
>
> Right now the 'startReadTime' config for KafkaIO.Read looks up an offset
> in every topic partition that is newer than or equal to that timestamp. The
> problem is that if we use a timestamp so new that there is no newer/equal
> message in the partition, the code fails with an exception. In certain
> cases that makes no sense, as we could actually make it work.
>
> If we don't get an offset from calling `consumer.offsetsForTimes`, we
> should call `endOffsets`, and use the returned offset + 1. That is actually
> the offset we will have to read next time.
>
> Even if `endOffsets` can't return an offset we could use 0 as the offset
> to read from.
>
>
>
> Am I missing something here? Is it okay to contribute this?
>


Questions regarding contribution: Support for reading Kafka topics from any startReadTime in Java

2022-05-25 Thread Balázs Németh
https://issues.apache.org/jira/browse/BEAM-14518

https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198

Right now the 'startReadTime' config for KafkaIO.Read looks up an offset in
every topic partition that is newer than or equal to that timestamp. The
problem is that if we use a timestamp so new that there is no newer/equal
message in the partition, the code fails with an exception. In certain cases
that makes no sense, as we could actually make it work.

If we don't get an offset from calling `consumer.offsetsForTimes`, we
should call `endOffsets`, and use the returned offset + 1. That is actually
the offset we will have to read next time.

Even if `endOffsets` can't return an offset we could use 0 as the offset to
read from.
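
In code, the fallback I am thinking of would look roughly like this (a
standalone sketch against the plain Kafka consumer API, not the actual
ConsumerSpEL change; whether the end offset still needs a +1 adjustment
depends on the exact endOffsets semantics, so the sketch just uses what
endOffsets returns):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

final class StartReadTimeOffsetSketch {

  // Resolves the offset to start reading from for a single partition.
  static long offsetForTimeOrEnd(Consumer<?, ?> consumer, TopicPartition tp, long timestampMillis) {
    Map<TopicPartition, OffsetAndTimestamp> byTime =
        consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMillis));
    OffsetAndTimestamp offsetAndTimestamp = byTime.get(tp);
    if (offsetAndTimestamp != null) {
      // Existing behavior: first record with timestamp >= startReadTime.
      return offsetAndTimestamp.offset();
    }
    // Proposed fallback: nothing at/after the timestamp yet, so start from the
    // end of the partition instead of failing.
    Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
    Long endOffset = end.get(tp);
    // Last resort if even the end offset is unavailable: start from 0.
    return endOffset != null ? endOffset : 0L;
  }
}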



Am I missing something here? Is it okay to contribute this?