Re: [Proposal] | Move FileIO and TextIO from :sdks:java:core to :sdks:java:io:file

2022-12-15 Thread Cristian Constantinescu
Counter argument to the "in one box" thing.

I would like to point out that "having things in one box" is not a reason
to have the code residing in the same module/project.

What the user sees and how the code is structured are two very different
things in my opinion. Beam can certainly have modules developed at
different speeds and packaged "in one box" before the release. The Spring
Framework is a good example of that practice.

I would also like to show a very simple, current example where the Beam user
experience is lacking and unpredictable. In other words, where the
integration between Beam components is non-existent, even though everything is
currently "in one box". Consider this code:

var options = PipelineOptionsFactory.fromArgs(args).create();
var p = Pipeline.create(options);
p.getCoderRegistry().registerCoderForClass(FooAvroRecord.class,
    SerializableCoder.of(FooAvroRecord.class));
var file = App.class.getClassLoader().getResource("avro1.avro").toURI().getPath();
var read = p.apply(AvroIO
    .read(FooAvroRecord.class)
    .from(file)
);
System.out.println("Using coder:" + read.getCoder());

Can you guess what coder this simple pipeline will output? If you guessed
SerializableCoder, you'd be wrong... it's "Using
coder:org.apache.beam.sdk.coders.AvroCoder@8b0f130", even though the user
explicitly specified the coder they want to be used.

Going by the argument that there is better integration because "everything
is in one box", there shouldn't be this disconnect between AvroIO and the
CoderRegistry, but here we are.
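
For what it's worth, one workaround sketch (untested in this exact form) is to force the coder on the output PCollection directly, continuing the snippet above:

// Override the inferred AvroCoder on the read output explicitly; unlike the
// CoderRegistry registration above, an explicit setCoder should stick.
read.setCoder(SerializableCoder.of(FooAvroRecord.class));
System.out.println("Using coder:" + read.getCoder()); // now reports SerializableCoder

But that is exactly the kind of boilerplate the CoderRegistry was supposed to make unnecessary.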

There are countless examples of these user experience issues that I can
provide.

Even more frustrating is that not only is everything in one box, but it's
mostly a **closed** box. A simple example: I want to extend the *Utils classes
(AvroUtils, POJOUtils, etc.) so that their respective methods that return a
Beam Schema or SchemaCoder use the NanosInstant logical type for all
properties of type java.time.Instant, because I don't use joda.time.Instant
anywhere in my code. It would be nice to override a given method, inject an
implementation that Beam internals will use, or at least have some
configuration-based solution to achieve this. Yet, to my knowledge, that
simply is not possible right now, so things like the code below are broken
and very hard to work around.

var options = PipelineOptionsFactory.fromArgs(args).create();
var p = Pipeline.create(options);
var file = App.class.getClassLoader().getResource("avro1.avro").toURI().getPath();
var read = p.apply(AvroIO
    .read(FooAvroRecord.class)
    .withBeamSchemas(true)
    .from(file)
);
System.out.println("Using coder:" + read.getCoder());

This will crash and burn with the following:

Using coder:SchemaCoder
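
To make the earlier ask concrete, here is a purely hypothetical sketch of the kind of hook I have in mind; nothing like this exists in Beam today, and the interface and registration method names are invented:

// Hypothetical API, invented for illustration only -- not part of Beam.
public interface JavaTimeFieldMapping {
    // Return a Beam FieldType for the given Java class, or null to fall back to the defaults.
    Schema.FieldType fieldTypeFor(Class<?> javaType);
}

// Hypothetical registration, done once before schema inference runs:
// AvroUtils.registerFieldMapping(javaType ->
//     java.time.Instant.class.equals(javaType)
//         ? Schema.FieldType.logicalType(new NanosInstant()) // NanosInstant is an existing Beam logical type
//         : null);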
wrote:

> I agree with Sachin. Keeping together components that users will have to bring
> together anyway leads to a better user experience. A counter example to that
> is the GCP libraries, in my opinion. It was a frequent struggle for users to
> find a working set of libraries until there was a BOM. And even after the
> BOM it is still somewhat of a struggle for users and the developers of
> those various libraries need to take on some of the toil of testing those
> various libraries together anyway.
>
> re: Take it with a grain of salt since I'm not even a committer - All
> inputs are welcome here. I do not think my comments should carry more
> weight just because I am a committer.
>
> On Wed, Dec 14, 2022 at 9:36 AM Sachin Agarwal via dev <
> dev@beam.apache.org> wrote:
>
>> I strongly believe that we should continue to have Beam optimize for the
>> user - and while having separate components would allow those of us who are
>> contributors and committers move faster, the downsides of not having
>> everything "in one box" for a new user where the components are all
>> relatively guaranteed to work together at that version level are very high.
>>
>> Beam having everything included is absolutely a competitive advantage for
>> Beam and I would not want to lose that.
>>
>> On Wed, Dec 14, 2022 at 9:31 AM Byron Ellis via dev 
>> wrote:
>>
>>> Take it with a grain of salt since I'm not even a committer, but is
>>> perhaps the reorganization of Beam into smaller components the real work of
>>> a 3.0 effort? Splitting of Beam into smaller more independently managed
>>> components would be a pretty huge breaking change from a dependency
>>> management perspective which would potentially be largely separate from any
>>> code changes.
>>>
>>> Best,
>>> B
>>>
>>> On Wed, Dec 14, 2022 at 9:23 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 On 12 Dec 2022, at 22:23, Robert Bradshaw via dev 
 wrote:


 Saving up all the breaking changes until a major release definitely
 has its downsides (look at Python 3). The migration path is often as
 important (if not more so) than the final destination.


 Actually, it proves that the major releases *should 

Re: [Proposal] | Move FileIO and TextIO from :sdks:java:core to :sdks:java:io:file

2022-12-12 Thread Cristian Constantinescu
Hi,

"As for the pipeline update feature, we've long discussed
having "pick-your-implementation" transforms that specify
alternative, equivalent implementations."

Could someone point me to where this was discussed please? I seem to have
missed that whole topic. Is it like a dependency injection type of thing?
If so, it's one thing I would love to see in Beam.

Thanks,
Cristian

On Mon, Dec 12, 2022 at 4:23 PM Robert Bradshaw via dev 
wrote:

> Saving up all the breaking changes until a major release definitely
> has its downsides (look at Python 3). The migration path is often as
> important (if not more so) than the final destination.
>
> As for this particular change, I would question how the benefit (it's
> unclear what the exact benefit is--better internal organization?)
> exceeds the pain of making every user refactor their code. I think a
> stronger case can be made for things like the Avro dependency that
> cause real pain.
>
> As for the pipeline update feature, we've long discussed having
> "pick-your-implementation" transforms that specify alternative,
> equivalent implementations. Upgrades can choose the old one whereas
> new pipelines can get the latest and greatest. It won't solve all
> issues, and requires keeping old codepaths around, but could be an
> important step forward.
>
> On Mon, Dec 12, 2022 at 10:20 AM Kenneth Knowles  wrote:
> >
> > I agree with Mortiz. To answer a few specifics in my own words:
> >
> >  - It is a perfectly sensible refactor, but as a counterpoint without
> file-based IO the SDK isn't functional so it is also a reasonable design
> point to have this included. There are other things in the core SDK that
> are far less "core" and could be moved out with greater benefit. The main
> goal for any separation of modules would be lighter weight transitive
> dependencies, IMO.
> >
> >  - No, Beam has not made any deliberate breaking changes of this nature.
> Hence we are still on major version 2. We have made some bugfixes for data
> loss risks that could be called "breaking changes" but since the feature
> was unsafe to use in the first place we did not bump the major version.
> >
> >  - It is sometimes possible to do such a refactor and have the
> deprecated location proxy to the new location. In this case that seems hard
> to achieve.
> >
> >  - It is not actually necessary to maintain both locations, as we can
> declare the old location will be unmaintained (but left alone) and all new
> development goes to the new location. That isn't a great choice for users
> who may simply upgrade their SDK version and not notice that their old code
> is now pointing at a version that will not receive e.g. security updates.
> >
> >  - I like the style where if/when we transition from Beam 2 to Beam 3 we
> should have the exact functionality of Beam 3 available as an opt-in flag
> first. So if a user passes --beam-3 they get exactly what will be the
> default functionality when we bump the major version. It really is a
> problem to do a whole bunch of stuff feverishly before a major version
> bump. The other style that I think works well is the linux kernel style
> where major versions alternate between stable and unstable (in other words,
> returning to the 0.x style with every alternating version).
> >
> >  - I do think Beam suffers from fear and inability to do significant
> code gardening. I don't think backwards compatibility in the code sense is
> the biggest blocker. I think the "pipeline update" feature is perhaps the
> thing most holding Beam back from making radical rapid forward progress.
> >
> > Kenn
> >
> > On Mon, Dec 12, 2022 at 2:25 AM Moritz Mack  wrote:
> >>
> >> Hi Damon,
> >>
> >>
> >>
> >> I fear the current release / versioning strategy of Beam doesn’t lend
> itself well to such breaking changes. Alexey and I have spent quite some
> time discussing how to proceed with the problematic Avro dependency in core
> (and respectively AvroIO, of course).
> >>
> >> Such changes essentially always require duplicating code to continue
> supporting a deprecated legacy code path to not break users’ code. But this
> comes at a very high price. Until the deprecated code path can be finally
> removed again, it must be maintained in two places.
> >>
> >> Unfortunately, the removal of deprecated code is rather problematic
> without a major version release, as it would break semantic versioning and
> people’s expectations. With that, deprecations bear the inherent risk of
> unintentionally depleting quality rather than improving it.
> >>
> >> I’d therefore recommend against such efforts unless there are very strong
> reasons to do so.
> >>
> >>
> >>
> >> Best, Moritz
> >>
> >>
> >>
> >> On 07.12.22, 18:05, "Damon Douglas via dev" 
> wrote:
> >>
> >>
> >>
> >> Hello Everyone, If you identify yourself on the Beam learning journey,
> even if this is your first day, please see yourself as a welcome
> participant in this conversation and consider reviewing the bottom portion
> of this 

Re: [DISCUSSION][JAVA] Current state of Java 17 support

2022-12-01 Thread Cristian Constantinescu
Hi,

I came across some Kafka info and would like to share for those
unaware. Kafka is planning to drop support for Java 8 in Kafka 4 (Java
8 is deprecated in Kafka 3), see KIP-750 [1].

I'm not sure when Kafka 4 is scheduled to be released (probably a few
years down the road), but when it happens, KafkaIO may not be able to
support it if we maintain Java 8 compatibility unless it remains on
Kafka 3.

Anyways, if not already done, I think it's a good idea to start
putting up serious warning flags around Beam used with Java 8, even
for Google cloud customers ;)

Cheers,
Cristian

[1] https://issues.apache.org/jira/browse/KAFKA-12894

On Wed, Nov 30, 2022 at 12:59 PM Kenneth Knowles  wrote:
>
> An important thing is to ensure that we do not accidentally depend on 
> something that would break Java 8 support.
>
> Currently our Java 11 and 17 tests build the code with Java 8 (just like our 
> released artifacts) and then compile and run the test code with the newer 
> JDK. This roughly matches the user scenario, I think. So it is a little more 
> complex than just having separate test runs for different JDK versions. But 
> it would be good to make this more symmetrical between JDK versions to 
> develop the mindset that JDK is always explicit.
>
> Kenn
>
> On Wed, Nov 30, 2022 at 9:48 AM Alexey Romanenko  
> wrote:
>>
>>
>> On 30 Nov 2022, at 03:56, Tomo Suzuki via dev  wrote:
>>
>> > Do we still need to support Java 8 SDK?
>>
>> Yes, for Google Cloud customers who still use Java 8, I want Apache Beam to 
>> support Java 8. Do you observe any special burden maintaining Java 8?
>>
>>
>> I can only think of the additional resources costs if we will test all 
>> supported JDKs, as Austin mentioned above. Imho, we should do that for all 
>> JDK that are officially supported.
>> Another less-costly way is to run the Java tests for all JDKs only during 
>> the release preparation stage.
>>
>> I agree that it would make sense to continue to support Java 8 until a 
>> significant number of users are using it.
>>
>> —
>> Alexey
>>
>>
>>
>> Regards,
>> Tomo
>>
>> On Tue, Nov 29, 2022 at 21:48 Austin Bennett  wrote:
>>>
>>> -1 for ongoing Java8 support [ or, said another way, +1 for dropping 
>>> support of Java8 ]
>>>
>>> +1 for having tests that run for ANY JDK that we say we support.  Is there 
>>> any reason the resources to support are too costly [ or outweigh the 
>>> benefits of additional confidence in ensuring we support what we say we do 
>>> ]?  I am not certain on whether this would only be critical for releases, 
>>> or should be done as part of regular CI.
>>>
>>> On Tue, Nov 29, 2022 at 8:51 AM Alexey Romanenko  
>>> wrote:

 Hello,

 I’m sorry if it’s already discussed somewhere but I find myself a little 
 bit lost in the subject.
 So, I’d like to clarify this - what is a current official state of Java 17 
 support at Beam?

 I recall that a great job was done to make Beam compatible with Java 17 
 [1] and Beam already provides “beam_java17_sdk” Docker image [2] but, 
 iiuc, Java 8 is still the default JVM to run all Java tests on Jenkins 
 ("Java PreCommit" in the first order) and there are only limited number of 
 tests that are running with JDK 11 and 17 on Jenkins by dedicated jobs.

 So, my question would sound like if Beam officially supports Java 17 (and 
 11), do we need to run all Beam Java SDK related tests (VR and IT test 
 including) against all supported Java SDKs?

 Do we still need to support Java 8 SDK?

 In the same time, as we are heading to move everything from Jenkins to 
 GitHub actions, what would be the default JDK there or we will run all 
 Java-related actions against all supported JDKs?

 —
 Alexey

 [1] https://issues.apache.org/jira/browse/BEAM-12240
 [2] https://hub.docker.com/r/apache/beam_java17_sdk



>> --
>> Regards,
>> Tomo
>>
>>


Re: Using unbounded source as a side input for a DoFn

2022-07-21 Thread Cristian Constantinescu
Disclaimer: I am not an expert, but I kinda worked on something similar.

A few points I'd like to bring up:
- Side inputs do not trigger the processElement function when new elements
are added to them. That means that if the side input doesn't yet contain the
desired item at the time the main element is processed, you're out of luck,
unless you use timers to reprocess it at a later time, when the side input
might have more data.

- If your goal is to somehow combine two PCollections, I would suggest you
look into CoGroupByKey [1] and its schema-aware sibling CoGroup [2]; you can
then use a global window that triggers on every element
(AfterProcessingTime.pastFirstElementInPane). A rough sketch follows after
the pseudocode below.

- The input of a PTransform can also be a PCollectionTuple, and in your
inner ParDo you can loop through the items of either collection. Something
like this (pseudocode):

class FooTransform extends PTransform<PCollectionTuple, PCollection<T>> {
    private TupleTag<A> aTag = new TupleTag<A>() {}; // <- curly brackets are important as far as I know
    private TupleTag<B> bTag = new TupleTag<B>() {};
    // getters for the above fields

    public PCollection<T> expand(PCollectionTuple input) {
        return input.apply(new FooDoFn(aTag, bTag));
    }

    private static class FooDoFn extends DoFn<..., T> {
        constructor(TupleTag<A> aTag, TupleTag<B> bTag) {
            // set fields
        }

        public void processElement(Context ctx) {
            var itemFromA = ctx.element().get(this.aTag);
            if (itemFromA != null) { logic }

            var itemFromB = ctx.element().get(this.bTag);
            if (itemFromB != null) { logic } // adding these to a state variable would effectively be an unbounded side input
        }
    }
}
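
And the rough sketch for the CoGroupByKey approach from the second bullet above. It assumes two keyed inputs, left as PCollection<KV<String, A>> and right as PCollection<KV<String, B>>, where A and B are placeholders for your own types; both sides need the same windowing/triggering for the join to be valid:

TupleTag<A> aTag = new TupleTag<A>() {};
TupleTag<B> bTag = new TupleTag<B>() {};

// Global window that emits a pane for every incoming element.
PCollection<KV<String, A>> triggeredLeft = left.apply(
    Window.<KV<String, A>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
// ... apply the same Window transform to "right" to get triggeredRight

PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(aTag, triggeredLeft)
        .and(bTag, triggeredRight)
        .apply(CoGroupByKey.create());

joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, Void>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) {
        Iterable<A> as = ctx.element().getValue().getAll(aTag);
        Iterable<B> bs = ctx.element().getValue().getAll(bTag);
        // combine whatever has arrived so far for this key
    }
}));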

Hope it helps,
Cristian

[1]
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/
[2]
https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html

On Thu, Jul 21, 2022 at 1:45 AM Sahil Modak 
wrote:

> Hi,
>
> We are looking to use the side input feature for one of our DoFns. The
> side input has to be a PCollection which is being constructed from a
> subscription using PubsubIO.read
>
> We want our primary DoFn which operates in a global window KV pair to
> access this side input.
> The goal is to have all the messages of this unbounded source (side input)
> to be available across all the KV pairs in our input DoFn which will use
> this side input.
>
> Is it possible to have an unbounded source (like pubsub) as a side input?
>
> Thanks,
> Sahil
>


Re: [DISCUSS] Next steps for update of Avro dependency in Beam

2022-05-13 Thread Cristian Constantinescu
Hey everyone,

I appreciate that the Beam community is having another look at this.

I don't have any strong opinions on these options, but I think
that extracting Avro into its own extension is the better choice.

I've already taken a stab at extracting Avro into its own module (PR 12748
[1]). The way I see it, this is less of a technical challenge and more of a
Beam community consensus challenge, because of backwards compatibility and
historical reasons. If the Beam community does reach consensus, I'd be more
than happy to do the work.

Cheers,
Cristian

[1] https://github.com/apache/beam/pull/12748

On Fri, May 13, 2022 at 8:56 AM Moritz Mack  wrote:

> Thanks so much for all these pointers, Alexey. Having that context really
> helps!
>
>
>
> Skimming through the past conversations, this one key consideration hasn’t
> changed and seems still critical:
>
> AvroCoder is the de facto standard for encoding complex user types (with
> SchemaCoder still being experimental).
>
> Consequently, any backwards incompatible change in that respect would
> massively impact users.
>
>
>
> Having that in mind, here’s my 2 cents on the options mentioned:
>
>
>
>1. Bump Avro in core:
>   1. This will seamlessly work for users that just need a coder for
>   complex types, but don’t use Avro themselves.
>   2. Anyone using Avro APIs from user code might get into trouble.
>2. Support different Avro versions & making the dependency provided
>   1. In terms of user impact, making Avro provided will be very
>   troublesome. Unless Avro remains available on the classpath through
>   different ways, this would break things in a really bad way. Worst of 
> all,
>   it might not even break at compile time but at runtime only.
>   2. I don’t think this option requires Avro being a provided
>   dependency. If the default is kept as is, anyone can (safely) bump Avro 
> to
>   a higher (supported) version on their classpath. In that case it would 
> be
>   good to warn users about CVEs in logs if they remain on an old version 
> of
>   Avro. But I’m not sure how feasible (reliable) it is to detect the Avro
>   minor version given that things might be repackaged into fat jars.
>3. Extract Avro as an extension
>   1. I agree with Brian, I don’t think there’s a need to keep a
>   shaded/vendored Avro in core… but
>   2. It’s pretty hard to get this done without causing trouble for
>   users. Making this a seamless migration would require core to depend on 
> the
>   new extension module and to maintain all the old public user facing 
> APIs.
>   3. Duplicating packages between core and the new extension module
>   is going to cause issues with Java 11. That pretty much means existing
>   public user facing APIs have to remain as a (deprecated) façade in core
>   using the code in the new extension.
>   4. The above makes the effort almost look worthless in the short
>   term, but it would certainly help to have a well-defined scope that can 
> be
>   tested for compatibility with multiple different Avro versions.
>
>
>
> To finally make progress on this topic, I’d suggest starting off with
> supporting multiple different Avro versions in core (2) but to keep the
> default Avro version as is.
>
> At the same time, extracting Avro as an extension in a *backwards compatible*
> way seems to be the right path forward long term, together with a well
> communicated deprecation plan.
>
>
>
> Best,
>
> Moritz
>
>
>
>
>
> *From: *Brian Hulette 
> *Date: *Thursday, 12. May 2022 at 23:21
> *To: *dev 
> *Subject: *Re: [DISCUSS] Next steps for update of Avro dependency in Beam
>
> Regarding Option (3) "but keep and shade Avro for “core” needs as v.1.8.2
> (still have an issue with CVEs)"
>
>
>
> Do we actually need to keep avro in core for any reason? I thought we only
> had it in core for AvroCoder, schema support, and IOs, which I think are
> all reasonable to separate out into an extension (this would be comparable
> to the protobuf extension). To confirm I just grepped for files in core
> that import avro:
>
> ❯ grep -liIrn 'import org\.apache\.avro' sdks/java/core/src/main
>
> sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
>
> sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java
>
> sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
>
> 

Re: Beam 2.36.0 + Flink 1.13 appears to be broken

2022-02-08 Thread Cristian Constantinescu
Hey Tomo,

Thanks for the tip! It turns out my deployment project (the one that
creates the fat jar) and my pipelines project (the one with actual code)
had mismatching Beam versions.

User error, sorry about that.

Thanks for your help,
Cristian

On Tue, Feb 8, 2022 at 3:32 PM Tomo Suzuki  wrote:

> IncompatibleClassChangeError usually occurs when a class comes from
> an older version of a JAR file.
>
> Do you know which JAR file provides
> "org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum"
> when the exception happens?
>
> I often use "getProtectionDomain()"
> https://stackoverflow.com/a/56000383/975074 to find the JAR file from a
> class.
>
>
> On Tue, Feb 8, 2022 at 3:18 PM Cristian Constantinescu 
> wrote:
>
>> Hi everyone,
>>
>> I am very excited about the 2.36 release, especially the stopReadOffset
>> addition to the KafkaSourceDescriptors. With it, I can read sections of a
>> topic and create state, effectively having a bounded Kafka source, before
>> reading new items that need processing.
>>
>> Unfortunately, running the pipeline from the Flink CLI produces the
>> following error:
>>
>> Pretty printing Flink args:
>> --detached
>> --class=namespace.pipeline.App
>> /opt/bns/etf/data-platform/user_code/pipelines/cerberus-etl-pipelines.jar
>> --configFilePath=/path/to/config.properties
>> --runner=FlinkRunner
>> --streaming
>> --checkpointingInterval=3
>> --stateBackend=filesystem
>> --stateBackendStoragePath=file:///path/to/state
>> --numberOfExecutionRetries=2
>> --fasterCopy
>> --debugThrowExceptions
>> java.lang.IncompatibleClassChangeError: Class
>> org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum does
>> not implement the requested interface
>> org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum
>> at
>> org.apache.beam.sdk.transforms.resourcehints.ResourceHints.getUrn(ResourceHints.java:50)
>> at
>> org.apache.beam.sdk.transforms.resourcehints.ResourceHints.<init>(ResourceHints.java:54)
>> at org.apache.beam.sdk.Pipeline.<init>(Pipeline.java:523)
>> at org.apache.beam.sdk.Pipeline.create(Pipeline.java:157)
>> at lines containing Pipeline.create(options) <--- my code
>> at namespace.pipeline.App.main(App.java:42) <-- my code
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
>> Source)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>> Source)
>> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>
>> Any advice would be appreciated.
>>
>> Thank you,
>> Cristian
>>
>
>
> --
> Regards,
> Tomo
>


Beam 2.36.0 + Flink 1.13 appears to be broken

2022-02-08 Thread Cristian Constantinescu
Hi everyone,

I am very excited about the 2.36 release, especially the stopReadOffset
addition to the KafkaSourceDescriptors. With it, I can read sections of a
topic and create state, effectively having a bounded Kafka source, before
reading new items that need processing.
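
(For context, the kind of bounded read I mean looks roughly like the sketch below. This is written from memory, so the exact KafkaSourceDescriptor.of signature and argument order are worth double-checking against the 2.36 javadoc.)

// Sketch: read a fixed slice of one topic partition using the new stop offset,
// before kicking off the regular unbounded read.
var slice = p
    .apply(Create.of(KafkaSourceDescriptor.of(
        new TopicPartition("my-topic", 0),
        0L,      // startReadOffset
        null,    // startReadTime
        1000L,   // stopReadOffset
        null,    // stopReadTime
        null)))  // bootstrapServers (rely on the transform's setting below)
    .apply(KafkaIO.<String, String>readSourceDescriptors()
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withBootstrapServers("broker:9092"));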

Unfortunately, running the pipeline from the Flink CLI produces the
following error:

Pretty printing Flink args:
--detached
--class=namespace.pipeline.App
/opt/bns/etf/data-platform/user_code/pipelines/cerberus-etl-pipelines.jar
--configFilePath=/path/to/config.properties
--runner=FlinkRunner
--streaming
--checkpointingInterval=3
--stateBackend=filesystem
--stateBackendStoragePath=file:///path/to/state
--numberOfExecutionRetries=2
--fasterCopy
--debugThrowExceptions
java.lang.IncompatibleClassChangeError: Class
org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum does
not implement the requested interface
org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum
at
org.apache.beam.sdk.transforms.resourcehints.ResourceHints.getUrn(ResourceHints.java:50)
at
org.apache.beam.sdk.transforms.resourcehints.ResourceHints.<init>(ResourceHints.java:54)
at org.apache.beam.sdk.Pipeline.<init>(Pipeline.java:523)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:157)
at lines containing Pipeline.create(options) <--- my code
at namespace.pipeline.App.main(App.java:42) <-- my code
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Any advice would be appreciated.

Thank you,
Cristian


Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Cristian Constantinescu
Hi everyone,

I did a little more testing.

Passing the Flink "-s" flag to the Flink CLI when submitting the job correctly
restores it from the given checkpoint:
flink run --detached -s "checkpoint path" -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2 ... other flink parameters as specified in [1]

Passing the Beam Flink pipeline option "savepoint_path" when submitting the
job via the Flink CLI does NOT correctly restore it from the given checkpoint.
For the same reason as before, "flink_master" is not being set.
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2  --savepoint_path="checkpoint path" etc...

However, setting "flink_master" doesn't seem to work with the Flink CLI; it
fails with "The RemoteEnvironment cannot be instantiated when running in a
pre-defined context (such as Command Line Client, Scala Shell or
TestEnvironment)."
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2  --flink_master=localhost:portnumber
--savepoint_path="checkpoint path" etc...

As a workaround for now, I will use the -s parameter with the Flink CLI.
I'm surprised this is only coming up now; I'd think that restoring from a
savepoint/checkpoint with Flink and Beam is a pretty common usage scenario.
I guess people use the Flink parameters when possible. In my case, I
prefer to find the latest checkpoint/savepoint in code and call
options.setSavepointPath(path) programmatically before the
Pipeline.create(options) call is made.
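
Roughly what I mean by the programmatic approach; findLatestCheckpointPath() is a hypothetical helper that scans the checkpoint directory for the newest chk-* entry:

// Sketch: resolve the most recent checkpoint in code and set it as the
// savepoint path before the pipeline is created.
var options = PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);

String latest = findLatestCheckpointPath("/path/to/state"); // hypothetical helper
if (latest != null) {
    options.setSavepointPath(latest);
}

var p = Pipeline.create(options);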

Cheers,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/


On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu 
wrote:

> Hey Jan,
>
> I agree that silently ignoring the parameter is misleading and, in my
> case, time consuming.
>
> I will gladly create the JIRA and PR. I do have some other things I want
> to contribute to Beam. Will get to them soon.
>
> On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský  wrote:
>
>> +dev 
>>
>> Hi Cristian,
>>
>> the savepointPath should not be ignored. We need to verify if local
>> environment supports savepoints (I suppose it does) and in that case we
>> should use it. In the case it does not we should throw exception as silent
>> ignoring of the savepoint is misleading.
>>
>> Would you file a JIRA? Or possibly create a PR to fix this?
>>
>> Best,
>>
>>  Jan
>> On 2/3/22 07:12, Cristian Constantinescu wrote:
>>
>> Hi everyone,
>>
>> I've done some digging within the Beam source code. It looks like when
>> the flinkMaster argument is not set, the savepointPath is not used at all.
>> [1]
>>
>> In fact the only time the savepointPath argument is used within all of
>> Beam's source code is on lines 183 and 186 of the same file. [2]
>>
>> Of course, I did all my testing locally on my dev box with the embedded
>> Flink cluster that Beam starts, which from the looks of it, does NOT use
>> the savepointPath at all.
>>
>> If someone familiar with the code can confirm this finding, I can update
>> the documentation to explicitly state that savepoint resuming is not
>> supported locally.
>>
>> I will do more testing around this with a real Flink cluster and see if
>> the behavior is different than the one described in my first email.
>>
>> Thanks,
>> Cristian
>>
>> [1]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
>> [2]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>>
>> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu 
>> wrote:
>>
>>> Hey Pavel,
>>>
>>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>>> from the IDE, copying by hand:
>>>
>>> KafkaIO.read()
>>> .withBootStrapServer("address")
>>> .withTopic("topic")
>>>
>>> .withKeyDeserializer(StringDeserializer.class)
>>>
>>> .withValueDeserializer(ConfluentSchemaRegistryDesProvider.of(...))
>>>
>>> .withConsumerConfigUpdates(map)
>>> .withReadCommitted()
>>> .commitOffsetInFinalize()
>>>
>>> .withProcessingTime();
>>>
>>>
>>> The config map is:
>>> enable.auto.commit -> false
>>> group.id -> some group
>>> auto.offset.reset -> earliest
>>> specific.avro.reader -> false
>>>
>>>
>>> On Wed, Feb 2, 202

Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Cristian Constantinescu
Hey Jan,

I agree that silently ignoring the parameter is misleading and, in my case,
time consuming.

I will gladly create the JIRA and PR. I do have some other things I want to
contribute to Beam. Will get to them soon.

On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský  wrote:

> +dev 
>
> Hi Cristian,
>
> the savepointPath should not be ignored. We need to verify if local
> environment supports savepoints (I suppose it does) and in that case we
> should use it. In the case it does not we should throw exception as silent
> ignoring of the savepoint is misleading.
>
> Would you file a JIRA? Or possibly create a PR to fix this?
>
> Best,
>
>  Jan
> On 2/3/22 07:12, Cristian Constantinescu wrote:
>
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when the
> flinkMaster argument is not set, the savepointPath is not used at all. [1]
>
> In fact the only time the savepointPath argument is used within all of
> Beam's source code is on lines 183 and 186 of the same file. [2]
>
> Of course, I did all my testing locally on my dev box with the embedded
> Flink cluster that Beam starts, which from the looks of it, does NOT use
> the savepointPath at all.
>
> If someone familiar with the code can confirm this finding, I can update
> the documentation to explicitly state that savepoint resuming is not
> supported locally.
>
> I will do more testing around this with a real Flink cluster and see if
> the behavior is different than the one described in my first email.
>
> Thanks,
> Cristian
>
> [1]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
> [2]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>
> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu 
> wrote:
>
>> Hey Pavel,
>>
>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>> from the IDE, copying by hand:
>>
>> KafkaIO.read()
>> .withBootStrapServer("address")
>> .withTopic("topic")
>>
>> .withKeyDeserializer(StringDeserializer.class)
>>
>> .withValueDeserializer(ConfluentSchemaRegistryDesProvider.of(...))
>>
>> .withConsumerConfigUpdates(map)
>> .withReadCommitted()
>> .commitOffsetInFinalize()
>>
>> .withProcessingTime();
>>
>>
>> The config map is:
>> enable.auto.commit -> false
>> group.id -> some group
>> auto.offset.reset -> earliest
>> specific.avro.reader -> false
>>
>>
>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin 
>> wrote:
>>
>>> Hello Christian,
>>>
>>> Thanks for posting here the detailed scenario of your experiments. I
>>> think it may be important to share your KafkaIO configuration here too. For
>>> example, are you setting this config anyhow?
>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu 
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>> Flink Classic. Would appreciate some help with the below.
>>>>
>>>> My understanding is that on recovery (whether from a checkpoint or
>>>> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
>>>> with whatever state they had when the pipeline crashed. For example the
>>>> Kafka operator might contain the latest *safe* offset to restart from. But
>>>> I'm not seeing this when I introduce exceptions in the pipeline.
>>>>
>>>> My pipeline is as follows:
>>>> 1. Read a Kafka topic from start
>>>> 2. Have a DoFn that stores all incoming messages in a BagState
>>>> 3. Above DoFn triggers a timer set in such a way that it triggers after
>>>> there are a few checkpoints created and kept because of
>>>> --externalizeCheckpointsEnabled = true. This timer handler then outputs the
>>>> elements to the next operator, in this case KafkaIo.Write.
>>>> 4. Before the timer in #3 

Re: [DISCUSS] Migrate Jira to GitHub Issues?

2022-01-31 Thread Cristian Constantinescu
I've been semi-following this thread, apologies if this has been raised
already.

From a user point of view, in some corporate environments (that I've worked
at), Github is blocked. That includes the issues part. The Apache Jira is
not blocked and does at times provide value while investigating issues.

Obviously, users stuck in those unfortunate circumstances can just use
their personal device. Not advocating any direction on the matter, just
putting this out there.

On Mon, Jan 31, 2022 at 12:21 PM Zachary Houfek  wrote:

> I added a suggestion that I don't think was discussed here:
>
> I know that we currently can link multiple PRs to a single Jira, but
> GitHub assumes a PR linked to an issue fixes the issue. You also need write
> access to the repository to link the PR outside of using a "closing
> keyword". (For reference: Linking a pull request to an issue
> 
> )
>
> I'm not sure how much this could sway the decisions but thought it was
> worth bringing up.
>
> Regards,
> Zach
>
> On Mon, Jan 31, 2022 at 12:06 PM Jarek Potiuk  wrote:
>
>> Just a comment here to clarify the labels from someone who has been using
>> both - ASF (and not only) JIRA and GitHub.
>>
>> The experience from  JIRA labels might be awfully misleading. The JIRA
>> labels are a mess in the ASF because they are shared between projects and
>> everyone can create a new label. "Mess" is actually quite an understatement
>> IMHO.
>>
>> The labels in GitHub Issues are "per-project" and they can only be added
>> and modified by maintainers (and only maintainers and "issue triagers" can
>> actually assign them other than the initial assignment when you create an
>> issue.
>>
>> Thanks to that, it is much easier to agree on the common "conventions" to
>> use and avoid creating new ones accidentally.
>>
>> We have quite a success with using the labels in Airflow as we use some
>> of the stuff below:
>>
>> Re - some fancy enforcement/management, yeah. There are good techniques
>> to control how/when the labels are attached:
>>
>> 1) You can create separate templates for Bugs/Features that can have
>> different labels pre-assigned. See here:
>> https://github.com/apache/airflow/tree/main/.github/ISSUE_TEMPLATE -
>> this way you can delegate to the users to make basic "label choice" when
>> they enter issues (though limited - 4-5 types of issues to choose from is
>> really maximum what is reasonable).
>> 2) The same "Issue Templates" already have the option to choose
>> "selectable fields" at entry - you can define free-form entries, drop-down,
>> checkboxes and a few others. This is as close as it can get to "fields".
>> Then (this is something you'd have to code) you could easily write or use
>> an existing GithubAction or bot that will assign the labels based on the
>> initial selection done by the user at entry. We have not done it yet but we
>> might.
>> 3) In PRs you can (and we do that in Airflow) write your bot or use
>> existing GitHub Actions to automatically select the labels based on the
>> "files" that have been changed in the PR: We are doing precisely that in
>> airflow and it works pretty well:
>> https://github.com/apache/airflow/blob/main/.github/boring-cyborg.yml
>>
>> You are in full control, and you can choose the convention and approach
>> for the project.
>> There are literally hundreds of GitHub Actions out there and you can
>> easily write a new one to manage it and you do not need anything but PR
>> merged to the repository to enable and configure those actions.
>> As maintainers, you do not have to create an INFRA JIRA(ehm!) tickets to
>> manage them. You do not have to share anything with other projects.
>>
>> That's my 2 cents :)
>>
>> J.
>>
>>
>> On Mon, Jan 31, 2022 at 5:45 PM Kenneth Knowles  wrote:
>>
>>> Maybe controversial: I think some things implemented "via labels"
>>> shouldn't get full credit so I suggested changing them from green to yellow
>>> :-)
>>>
>>> There's a really big difference between a free-form label and a field
>>> where you know that there is exactly one value and the value is from a
>>> limited set of options. For example priorities could be missing, duplicate
>>> (both "P1" and "P2") or invented ("P8") unless labels have the ability to
>>> have some fancy enforcement (I haven't looked at this). Same for resolution
>>> status (is "Not a problem" just a label added as commentary or is it a
>>> resolution status?) and issue type (something could be marked "bug" and
>>> "feature request").
>>>
>>> Kenn
>>>
>>> On Mon, Jan 31, 2022 at 8:38 AM Kenneth Knowles  wrote:
>>>
 Great. I added a few items to the "summary of discussion" even though
 they weren't discussed here just to identify things that I care about and
 how you might do them in GitHub Issues.

 Kenn

 On Sat, Jan 29, 2022 at 6:20 PM Aizhamal Nurmamat kyzy <
 aizha...@apache.org> wrote:

> 

Potential bug: AutoValue + Memoized fields

2021-10-28 Thread Cristian Constantinescu
Hi everyone,

Looks like Beam has a little bit of an issue when using AutoValues with
@Memoized (cached) fields. It's not a big issue, and the workaround is
simply not using @Memoized fields, at the cost of a little performance (see
the comment in the code snippet below).

The code further below produces this exception:
Exception in thread "main" java.lang.NullPointerException
at
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:155)
at
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:145)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:146)
at
java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
at
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:140)
at
org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:72)
at org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
at org.apache.beam.sdk.values.RowWithGetters.<init>(RowWithGetters.java:66)
at
org.apache.beam.sdk.values.Row$Builder.withFieldValueGetters(Row.java:835)
at
org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$ToRowWithValueGetters.apply(GetterBasedSchemaProvider.java:64)
at
org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$ToRowWithValueGetters.apply(GetterBasedSchemaProvider.java:49)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at
org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at
org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:413)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:370)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:277)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
at org.whatever.testing.App.main(App.java:24)


package org.whatever.testing;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Arrays;

public class App {

public static void main(String[] args) {
var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

var p = Pipeline.create(options);
p
    .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
    .apply(Convert.to(FooAutoValue.class))
    .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
        System.out.println(i.toString());
        return i;
    }))
    ;
p.run().waitUntilFinish();
}

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public static abstract class FooAutoValue {
public abstract String getDummyProp();

@Memoized // <-- commenting this line makes everything work
public String getSomething(){
return "sldj";
}

@SchemaCreate
public static FooAutoValue create(String dummyProp) {
return builder()
.setDummyProp(dummyProp)
.build();
}

public static Builder builder() {
return new AutoValue_App_FooAutoValue.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setDummyProp(String newDummyProp);

public abstract FooAutoValue build();
}
}
}


From what I can see, instead of getting the fields from the abstract class,
it's 

Re: Potential Bug: Beam + Flink + AutoValue Builders

2021-10-26 Thread Cristian Constantinescu
Hi Reuven,

Thanks for the quick reply.

Could you elaborate on why Beam needs to create the Builder dynamically
through reflection (basically using reflection to create an instance of a
package-private class)? As far as AutoValue goes, it looks like an
anti-pattern to try to create an instance of the generated builder by
calling the AutoValue-generated class (AutoValue_App_FooAutoValue in this
case). I think that normally, the only place that can call the
auto-generated builder constructor is the user-code abstract class
(FooAutoValue), through:

public static Builder builder() {
return new AutoValue_App_FooAutoValue.Builder();
}

In fact, this method is called directly when using the @SchemaCreate
approach, regardless of whether the create method is invoked through
reflection or not. I guess what I'm asking is: could Beam not call
FooAutoValue.builder() dynamically if calling it directly is not possible?


On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax  wrote:

> Beam needs to create these elements dynamically. when decoding records, so
> it can't easily call the builder directly.
>
> My first guess is that there's a classloader issue here. Flink does some
> fancy classloader munging, and that might be breaking an assumption in this
> code. Passing in the correct classloader should hopefully fix this.
>
> Reuven
>
>
> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu 
> wrote:
>
>> Hi everyone,
>>
>> Not sure if anyone is using Beam with the Flink Runner and AutoValue
>> builders. For me, it doesn't work. I have some questions and a workaround
>> for anyone in the same boat.
>>
>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>
>> Here's the code:
>>
>> package org.whatever.testing;
>>
>> import com.google.auto.value.AutoValue;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>> import org.apache.beam.sdk.schemas.transforms.Convert;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.MapElements;
>> import org.apache.beam.sdk.values.TypeDescriptor;
>>
>> import java.util.Arrays;
>>
>> public class App {
>>
>> public static void main(String[] args) {
>> var options = 
>> PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>
>> var p = Pipeline.create(options);
>> p
>> 
>> .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build(
>> .apply(Convert.to(FooAutoValue.class))
>> 
>> .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>> System.out.println(i.toString());
>> return i;
>> }))
>> ;
>> p.run().waitUntilFinish();
>> }
>> @AutoValue
>> @DefaultSchema(AutoValueSchema.class)
>> public static abstract class FooAutoValue {
>> public abstract String getDummyProp();
>>
>> //@SchemaCreate
>> //public static FooAutoValue create(String dummyProp) {
>> //return builder()
>> //.setDummyProp(dummyProp)
>> //.build();
>> //}
>>
>> public static Builder builder() {
>> return new AutoValue_App_FooAutoValue.Builder();
>> }
>>
>> @AutoValue.Builder
>> public abstract static class Builder {
>> public abstract Builder setDummyProp(String newDummyProp);
>>
>> public abstract FooAutoValue build();
>> }
>> }
>> }
>>
>> Note that it doesn't matter if FooAutoValue is an inner class or in its
>> own file as a top level non static class. For simplicity here I'm
>> converting the objects to the same class, in prod code the input is of
>> another type with equivalent schema.
>>
>> And the stack trace:
>>
>> Caused by: java.lang.IllegalAccessError: failed to access class
>> org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
>> (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
>> module of loader 'app';
>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
>> unnamed module of loader org.apache.flink.util.

Potential Bug: Beam + Flink + AutoValue Builders

2021-10-26 Thread Cristian Constantinescu
Hi everyone,

Not sure if anyone is using Beam with the Flink Runner and AutoValue
builders. For me, it doesn't work. I have some questions and a workaround
for anyone in the same boat.

Beam 2.31, Flink 1.13, AutoValue 1.8.2

Here's the code:

package org.whatever.testing;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Arrays;

public class App {

public static void main(String[] args) {
var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

var p = Pipeline.create(options);
p
    .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
    .apply(Convert.to(FooAutoValue.class))
    .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
        System.out.println(i.toString());
        return i;
    }))
    ;
p.run().waitUntilFinish();
}
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public static abstract class FooAutoValue {
public abstract String getDummyProp();

//@SchemaCreate
//public static FooAutoValue create(String dummyProp) {
//return builder()
//.setDummyProp(dummyProp)
//.build();
//}

public static Builder builder() {
return new AutoValue_App_FooAutoValue.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setDummyProp(String newDummyProp);

public abstract FooAutoValue build();
}
}
}

Note that it doesn't matter if FooAutoValue is an inner class or in its own
file as a top-level, non-static class. For simplicity, here I'm converting
the objects to the same class; in prod code the input is of another type
with an equivalent schema.

And the stack trace:

Caused by: java.lang.IllegalAccessError: failed to access class
org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
(org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
module of loader 'app';
org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
unnamed module of loader org.apache.flink.util.ChildFirstClassLoader
@26f4afda)
at
org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown
Source)
at
org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
at
org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
at
org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at
org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
at
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
at
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
at
org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
at
org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
at
org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
at
org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
at
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
at
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)


Workaround:
- Uncomment the @SchemaCreate method
- Compile the code with "-parameters"; if using Maven:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <version>3.8.1</version>
  <configuration>
    <compilerArgs>
      <arg>-parameters</arg>
    </compilerArgs>
  </configuration>
</plugin>

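As far as I can tell, the flag matters because Beam matches the @SchemaCreate arguments to schema fields by parameter name via reflection, and without -parameters javac only emits arg0/arg1-style names, so the mapping fails. A quick, standalone way to check what your build actually produced (assuming the @SchemaCreate create(String) factory above is uncommented):

import java.lang.reflect.Parameter;

public class ParamCheck {
    public static void main(String[] args) throws Exception {
        var create = App.FooAutoValue.class.getMethod("create", String.class);
        for (Parameter param : create.getParameters()) {
            // Prints "dummyProp" when compiled with -parameters, "arg0" otherwise.
            System.out.println(param.getName() + " (isNamePresent=" + param.isNamePresent() + ")");
        }
    }
}
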
My questions:
1. Why is Beam trying to get the generated AutoValue Builder through
reflection using AutoValueUtils.getAutoValueGeneratedName (builds
"AutoValue_App_FooAutoValue$Builder") instead of calling
FooAutoValue.builder() directly without reflection?
2. With flink, given the fancy classloader work Flink does [1], in the
AutoValueUtils.createBuilderCreator 

Re: Why is Avro Date field using InstantCoder?

2021-10-22 Thread Cristian Constantinescu
Hi everyone,

I'm still trying to figure out what the best path forward is for my project.
While doing some research, I came across the Confluent interoperability
page [1]. Beam is currently using version 5.3.2 of the Confluent libs, which
reaches end of support on July 19, 2022. It's the last version that
supports Avro 1.8 (actually 1.8.1). The 5.4.x Confluent libs move to Avro
1.9.1 and 6.2.x (latest) goes to Avro 1.10.1 (latest being 1.10.2).

What's the plan in this case, where dependencies reach their end of life?
Does Beam plan on staying on the 5.3.2 version even if it is no longer
supported?

Thanks,
Cristian

[1]
https://docs.confluent.io/platform/current/installation/versions-interoperability.html

On Mon, Oct 18, 2021 at 2:42 PM Brian Hulette  wrote:

> Note there was some work done to make Beam work with Avro 1.9 [1]
> (presumably this only works with joda time though? certainly the bytebuddy
> generated code would). Avro 1.9 compatibility is not verified continuously
> though, there was an effort a while ago to test Beam against multiple
> versions of Avro [2] but I don't think it went anywhere. There's also some
> discussion about API compatibility concerns in [2].
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-9144
> [2]
> https://lists.apache.org/thread.html/r2739eb540ea2b8d8c50db71850361b8b1347df66e4357001b334987b%40%3Cdev.beam.apache.org%3E
>
> On Mon, Oct 18, 2021 at 11:19 AM Daniel Collins 
> wrote:
>
>> > I see a lot of static calls
>>
>> A lot of these calls bottom out in usage of ServiceLoader
>> <http://docs.oracle.com/javase/6/docs/api/java/util/ServiceLoader.html>,
>> effectively acting as an ergonomic API on top of it, with AutoService
>> <https://github.com/google/auto/tree/master/service> used to register
>> new handlers. If they're not currently, and there's some extension point
>> which would be helpful to you, its quite likely that you could get buy in
>> to adding another such extension point.
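
For readers less familiar with that pattern, a minimal sketch (FooFormatProvider
and CsvFooFormatProvider are made-up names, not Beam extension points; each type
would live in its own file):

// A made-up extension point; it only shows the ServiceLoader/AutoService pattern.
public interface FooFormatProvider {
  String name();
}

// The implementation registers itself at compile time via AutoService,
// so no central wiring is required.
@com.google.auto.service.AutoService(FooFormatProvider.class)
public class CsvFooFormatProvider implements FooFormatProvider {
  @Override
  public String name() {
    return "csv";
  }
}

// A static helper can then discover every registered implementation at runtime.
for (FooFormatProvider provider : java.util.ServiceLoader.load(FooFormatProvider.class)) {
  System.out.println(provider.name());
}
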
>>
>> > after I find a workaround
>>
>> I think if the issue is that AvroUtils currently uses an old version of
>> avro, forking it into your own package and version bumping it might be a
>> good place to start for a workaround. Any changes committed to the beam
>> repo will take 1-2 months to make it to a non-snapshot build even if you do
>> find a long term solution acceptable to all interested parties.
>>
>> -Daniel
>>
>> On Mon, Oct 18, 2021 at 1:46 PM Cristian Constantinescu 
>> wrote:
>>
>>> I will have a look after I find a workaround as I really need to deliver
>>> some things and using Avro 1.8 isn't really an option.
>>>
>>> But once that's done, I'd love to find ways to make Beam less dependent
>>> on Avro 1.8 considering it was released in 2017.
>>>
>>> On Mon, Oct 18, 2021 at 12:34 PM Reuven Lax  wrote:
>>>
>>>> Do you know if it's easy to detect which version of Avro is being used?
>>>>
>>>> On Sun, Oct 17, 2021 at 10:20 PM Cristian Constantinescu <
>>>> zei...@gmail.com> wrote:
>>>>
>>>>> If I had to change things, I would:
>>>>>
>>>>> 1. When deriving the SCHEMA add a few new types (JAVA_TIME, JAVA_DATE
>>>>> or something along those lines).
>>>>> 2. RowCoderGenerator around line 159 calls
>>>>> "SchemaCoder.coderForFieldType(schema.getField(rowIndex).getType().withNullable(false));"
>>>>> which eventually gets to SchemaCoderHelpers.coderForFieldType. There,
>>>>> CODER_MAP has a hard reference on InstantCoder for DATETIME. Maybe that 
>>>>> map
>>>>> can be augmented (possibly dynamically) with new
>>>>> fieldtypes-coder combinations to take care of the new types from #1.
>>>>>
>>>>> I would also like to ask. Looking through the Beam code, I see a lot
>>>>> of static calls. Just wondering why it's done this way. I'm used to
>>>>> projects having some form of dependency injection involved and static 
>>>>> calls
>>>>> being frowned upon (lack of mockability, hidden dependencies, tight
>>>>> coupling etc). The only reason I can think of is serializability given
>>>>> Beam's multi-node processing?
>>>>>
>>>>>
>>>>> On Sat, Oct 16, 2021 at 3:11 AM Reuven Lax  wrote:
>>>>>
>>>>>> Is the Schema inference the only reason we can't upgrade Avro, or are
>>>>>> there other blockers? Is there any way we can tell at runtime which 
>&

Re: Why is Avro Date field using InstantCoder?

2021-10-17 Thread Cristian Constantinescu
If I had to change things, I would:

1. When deriving the SCHEMA add a few new types (JAVA_TIME, JAVA_DATE or
something along those lines).
2. RowCoderGenerator around line 159 calls
"SchemaCoder.coderForFieldType(schema.getField(rowIndex).getType().withNullable(false));"
which eventually gets to SchemaCoderHelpers.coderForFieldType. There,
CODER_MAP has a hard-coded reference to InstantCoder for DATETIME. Maybe that map
could be augmented (possibly dynamically) with new
field-type-to-coder combinations to take care of the new types from #1 (see the
sketch below).
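
To make the current behavior concrete, here is a small illustration of that
hard-wired DATETIME-to-InstantCoder mapping using only public Beam APIs (the
field name is made up; this is an editorial sketch, not code from the thread):

// A schema field declared as DATETIME is backed by joda-time at encoding time.
Schema schema = Schema.builder().addDateTimeField("somedate").build();

// Building a Row with a joda Instant works fine...
Row ok = Row.withSchema(schema).addValue(org.joda.time.Instant.now()).build();

// ...whereas a java.time value routed into such a field (which is what the Avro
// mapping ends up doing) triggers the "cannot be cast to org.joda.time.Instant"
// failure discussed in this thread.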

I would also like to ask. Looking through the Beam code, I see a lot of
static calls. Just wondering why it's done this way. I'm used to projects
having some form of dependency injection involved and static calls being
frowned upon (lack of mockability, hidden dependencies, tight coupling
etc). The only reason I can think of is serializability given Beam's
multi-node processing?


On Sat, Oct 16, 2021 at 3:11 AM Reuven Lax  wrote:

> Is the Schema inference the only reason we can't upgrade Avro, or are
> there other blockers? Is there any way we can tell at runtime which version
> of Avro is running? Since we generate the conversion code at runtime with
> ByteBuddy, we could potentially just generate different conversions
> depending on the Avro version.
>
> On Fri, Oct 15, 2021 at 11:56 PM Cristian Constantinescu 
> wrote:
>
>> Those are fair points. However please consider that there might be new
>> users who will decide that Beam isn't suitable because of things like
>> requiring Avro 1.8, Joda time, old Confluent libraries, and, when I started
>> using Beam about a year ago, Java 8 (I think we're okay with Java 11 now).
>>
>> I guess what I'm saying is that there's definitely a non-negligible cost
>> associated with old 3rd party libs in Beam's code (even if efforts are put
>> in to minimize them).
>>
>> On Sat, Oct 16, 2021 at 2:33 AM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Fri, Oct 15, 2021 at 11:13 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> To my knowledge and reading through AVRO's Jira[1], it does not support
>>>> jodatime anymore.
>>>>
>>>> It seems everything related to this Avro 1.8 dependency is tricky. If
>>>> you recall, it also prevents us from upgrading to the latest Confluent
>>>> libs... for enabling Beam to use protobufs schemas with the schema
>>>> registry. (I was also the one who brought that issue up, also made an
>>>> exploratory PR to move AVRO outside of Beam core.)
>>>>
>>>> I understand that Beam tries to maintain public APIs stable, but I'd
>>>> like to put forward two points:
>>>> 1) Schemas are experimental, hence there shouldn't be any API
>>>> stability guarantees there.
>>>>
>>>
>>> Unfortunately at this point, they aren't really. As a community we've
>>> been bad about removing the Experimental label - many core, core parts of
>>> Beam are still labeled experimental (sources, triggering, state, timers).
>>> Realistically they are no longer experimental.
>>>
>>> 2) Maybe this is the perfect opportunity for the Beam community to think
>>>> about the long term effects of old dependencies within Beam's codebase, and
>>>> especially how to deal with them. Perhaps starting/maintaining an
>>>> "experimental" branch/maven-published-artifacts where Beam does not
>>>> guarantee backwards compatibility (or maintains it for a shorter period of
>>>> time) is something to think about.
>>>>
>>>
>>> This is why we usually try to prevent third-party libraries from being
>>> in our public API. However in this case, that's tricky.
>>>
>>> The beam community can of course decide to break backwards
>>> compatibility. However as stated today, it is maintained. The last time we
>>> broke backwards compatibility was when the old Dataflow API was
>>> transitioned to Beam, and it was very painful. It took multiple years to
>>> get some users onto the Beam API due to the code changes required to
>>> migrate (and those required code changes weren't terribly invasive).
>>>
>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/AVRO-2335
>>>>
>>>> On Sat, Oct 16, 2021 at 12:40 AM Reuven Lax  wrote:
>>>>
>>>>> Does this mean more recent versions of avro aren't backwards
>>>>> compatible with avro 1.8? If so, this might be tricky to fix, since Beam
>>>>> maintains backwards compatibi

Re: Why is Avro Date field using InstantCoder?

2021-10-16 Thread Cristian Constantinescu
Those are fair points. However please consider that there might be new
users who will decide that Beam isn't suitable because of things like
requiring Avro 1.8, Joda time, old Confluent libraries, and, when I started
using Beam about a year ago, Java 8 (I think we're okay with Java 11 now).

I guess what I'm saying is that there's definitely a non-negligible cost
associated with old 3rd party libs in Beam's code (even if efforts are put
in to minimize them).

On Sat, Oct 16, 2021 at 2:33 AM Reuven Lax  wrote:

>
>
> On Fri, Oct 15, 2021 at 11:13 PM Cristian Constantinescu 
> wrote:
>
>> To my knowledge and reading through AVRO's Jira[1], it does not support
>> jodatime anymore.
>>
>> It seems everything related to this Avro 1.8 dependency is tricky. If you
>> recall, it also prevents us from upgrading to the latest Confluent libs...
>> for enabling Beam to use protobufs schemas with the schema registry. (I was
>> also the one who brought that issue up, also made an exploratory PR to move
>> AVRO outside of Beam core.)
>>
>> I understand that Beam tries to maintain public APIs stable, but I'd like
>> to put forward two points:
>> 1) Schemas are experimental, hence there shouldn't be any API
>> stability guarantees there.
>>
>
> Unfortunately at this point, they aren't really. As a community we've been
> bad about removing the Experimental label - many core, core parts of Beam
> are still labeled experimental (sources, triggering, state, timers).
> Realistically they are no longer experimental.
>
> 2) Maybe this is the perfect opportunity for the Beam community to think
>> about the long term effects of old dependencies within Beam's codebase, and
>> especially how to deal with them. Perhaps starting/maintaining an
>> "experimental" branch/maven-published-artifacts where Beam does not
>> guarantee backwards compatibility (or maintains it for a shorter period of
>> time) is something to think about.
>>
>
> This is why we usually try to prevent third-party libraries from being in
> our public API. However in this case, that's tricky.
>
> The beam community can of course decide to break backwards compatibility.
> However as stated today, it is maintained. The last time we broke backwards
> compatibility was when the old Dataflow API was transitioned to Beam, and
> it was very painful. It took multiple years to get some users onto the Beam
> API due to the code changes required to migrate (and those required code
> changes weren't terribly invasive).
>
>
>>
>> [1] https://issues.apache.org/jira/browse/AVRO-2335
>>
>> On Sat, Oct 16, 2021 at 12:40 AM Reuven Lax  wrote:
>>
>>> Does this mean more recent versions of avro aren't backwards compatible
>>> with avro 1.8? If so, this might be tricky to fix, since Beam maintains
>>> backwards compatibility on its public API.
>>>
>>> On Fri, Oct 15, 2021 at 5:38 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've created a small demo project to show the issue[1].
>>>>
>>>> I've looked at the beam code and all the avro tests use avro 1.8...
>>>> which is hardcoded to Joda, hence why all the tests pass. Avro changed to
>>>> java.time (as most other recent java projects) after 1.8. Is there a plan
>>>> for Beam to do the same?
>>>>
>>>> Does anyone use Avro with java.time instead of joda.time that could
>>>> give me ideas how to make it work?
>>>>
>>>> Thank you,
>>>> Cristian
>>>>
>>>> [1] https://github.com/zeidoo/beamjavadates-poc
>>>>
>>>> On Thu, Oct 14, 2021 at 5:35 PM Brian Hulette 
>>>> wrote:
>>>>
>>>>> +dev 
>>>>>
>>>>> This sounds like a bug in the Avro schema mapping. I *think* the
>>>>> intention is that we generate logic for converting Date to/from Instant
>>>>> when making a getters for a RowWithGetters backed by Avro.
>>>>>
>>>>> Brian
>>>>>
>>>>> On Thu, Oct 14, 2021 at 4:43 AM Cristian Constantinescu <
>>>>> zei...@gmail.com> wrote:
>>>>>
>>>>>> A little bit more color on this.
>>>>>>
>>>>>> That field is nested inside a avro record like so (not syntactically
>>>>>> correct):
>>>>>> {
>>>>>> type: record,
>>>>>> fields: [
>>>>>>  {
>>>>>>   name: whatever,
&g

Re: Why is Avro Date field using InstantCoder?

2021-10-16 Thread Cristian Constantinescu
To my knowledge and reading through AVRO's Jira[1], it does not support
jodatime anymore.

It seems everything related to this Avro 1.8 dependency is tricky. If you
recall, it also prevents us from upgrading to the latest Confluent libs...
for enabling Beam to use protobufs schemas with the schema registry. (I was
also the one who brought that issue up, also made an exploratory PR to move
AVRO outside of Beam core.)

I understand that Beam tries to maintain public APIs stable, but I'd like
to put forward two points:
1) Schemas are experimental, hence there shouldn't be any API
stability guarantees there.
2) Maybe this is the perfect opportunity for the Beam community to think
about the long term effects of old dependencies within Beam's codebase, and
especially how to deal with them. Perhaps starting/maintaining an
"experimental" branch/maven-published-artifacts where Beam does not
guarantee backwards compatibility (or maintains it for a shorter period of
time) is something to think about.

[1] https://issues.apache.org/jira/browse/AVRO-2335

On Sat, Oct 16, 2021 at 12:40 AM Reuven Lax  wrote:

> Does this mean more recent versions of avro aren't backwards compatible
> with avro 1.8? If so, this might be tricky to fix, since Beam maintains
> backwards compatibility on its public API.
>
> On Fri, Oct 15, 2021 at 5:38 PM Cristian Constantinescu 
> wrote:
>
>> Hi all,
>>
>> I've created a small demo project to show the issue[1].
>>
>> I've looked at the beam code and all the avro tests use avro 1.8...
>> which is hardcoded to Joda, hence why all the tests pass. Avro changed to
>> java.time (as most other recent java projects) after 1.8. Is there a plan
>> for Beam to do the same?
>>
>> Does anyone use Avro with java.time instead of joda.time that could give
>> me ideas how to make it work?
>>
>> Thank you,
>> Cristian
>>
>> [1] https://github.com/zeidoo/beamjavadates-poc
>>
>> On Thu, Oct 14, 2021 at 5:35 PM Brian Hulette 
>> wrote:
>>
>>> +dev 
>>>
>>> This sounds like a bug in the Avro schema mapping. I *think* the
>>> intention is that we generate logic for converting Date to/from Instant
>>> when making a getters for a RowWithGetters backed by Avro.
>>>
>>> Brian
>>>
>>> On Thu, Oct 14, 2021 at 4:43 AM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> A little bit more color on this.
>>>>
>>>> That field is nested inside a avro record like so (not syntactically
>>>> correct):
>>>> {
>>>> type: record,
>>>> fields: [
>>>>  {
>>>>   name: whatever,
>>>>   type: record {
>>>>fields: [
>>>>{
>>>>  "name": "somedate",
>>>>  "type: {"type": "int", "logicalType": "date"}
>>>> }
>>>>]
>>>> }
>>>> ]
>>>>
>>>> The outer layer record uses SchemaCoderHelper.coderForFieldType on all
>>>> it's fields to create the coder. However, that method when called for the
>>>> inner record field just returns a SchemaCoder.of(fieldType.getRowSchema())
>>>> which doesn't take into account LogicalTypes.
>>>>
>>>> I think that's where the problem is. If anyone who knows that code
>>>> could have a look and let me know their thoughts, I can try to fix the
>>>> issue if we agree that there is one.
>>>>
>>>> On Thu, Oct 14, 2021 at 7:12 AM Cristian Constantinescu <
>>>> zei...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have the following field in one of my avro schemas:
>>>>>
>>>>> {
>>>>> "name": "somedate",
>>>>> "type: {"type": "int", "logicalType": "date"}
>>>>> }
>>>>>
>>>>> This generates a java.time.LocalDate field in the corresponding java
>>>>> class (call it Foo).
>>>>>
>>>>> AvroUtils.toBeamSchema(FooClass.getSchema()) will return that field as
>>>>> DATETIME in the Beam schema. because of AvroUtils.toFieldType (around line
>>>>> 275, where TimestampMillis and Date are both stored as DATETIME).
>>>>>
>>>>> My problem is that in SchemaCoderHelpers, DATETIME is set to use
>>>>> InstantCoder which expects a joda Instant as input not a LocalDate. So 
>>>>> when
>>>>> my PTransform returns Foo objects it crashes with "class
>>>>> java.time.LocalDate cannot be cast to class org.joda.time.Instant..." when
>>>>> trying to encode that field using InstantCoder.encode().
>>>>>
>>>>> Is there a workaround for this issue?
>>>>>
>>>>> Thank you,
>>>>> Cristian
>>>>>
>>>>> PS: I did search the mailing list and google, but didn't find anything
>>>>> related except a thread on AvroCoder.JodaTimestampConversion, which I 
>>>>> don't
>>>>> think it applies to here.
>>>>>
>>>>


Re: Why is Avro Date field using InstantCoder?

2021-10-15 Thread Cristian Constantinescu
Hi all,

I've created a small demo project to show the issue[1].

I've looked at the Beam code and all the Avro tests use Avro 1.8...
which is hardcoded to Joda, which is why all the tests pass. Avro switched to
java.time (as most other recent Java projects have) after 1.8. Is there a plan
for Beam to do the same?

Does anyone use Avro with java.time instead of joda.time who could give me
ideas on how to make it work?

Thank you,
Cristian

[1] https://github.com/zeidoo/beamjavadates-poc

On Thu, Oct 14, 2021 at 5:35 PM Brian Hulette  wrote:

> +dev 
>
> This sounds like a bug in the Avro schema mapping. I *think* the intention
> is that we generate logic for converting Date to/from Instant when making a
> getters for a RowWithGetters backed by Avro.
>
> Brian
>
> On Thu, Oct 14, 2021 at 4:43 AM Cristian Constantinescu 
> wrote:
>
>> A little bit more color on this.
>>
>> That field is nested inside a avro record like so (not syntactically
>> correct):
>> {
>> type: record,
>> fields: [
>>  {
>>   name: whatever,
>>   type: record {
>>fields: [
>>{
>>  "name": "somedate",
>>  "type: {"type": "int", "logicalType": "date"}
>> }
>>]
>> }
>> ]
>>
>> The outer layer record uses SchemaCoderHelper.coderForFieldType on all
>> it's fields to create the coder. However, that method when called for the
>> inner record field just returns a SchemaCoder.of(fieldType.getRowSchema())
>> which doesn't take into account LogicalTypes.
>>
>> I think that's where the problem is. If anyone who knows that code could
>> have a look and let me know their thoughts, I can try to fix the issue if
>> we agree that there is one.
>>
>> On Thu, Oct 14, 2021 at 7:12 AM Cristian Constantinescu 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have the following field in one of my avro schemas:
>>>
>>> {
>>> "name": "somedate",
>>> "type: {"type": "int", "logicalType": "date"}
>>> }
>>>
>>> This generates a java.time.LocalDate field in the corresponding java
>>> class (call it Foo).
>>>
>>> AvroUtils.toBeamSchema(FooClass.getSchema()) will return that field as
>>> DATETIME in the Beam schema. because of AvroUtils.toFieldType (around line
>>> 275, where TimestampMillis and Date are both stored as DATETIME).
>>>
>>> My problem is that in SchemaCoderHelpers, DATETIME is set to use
>>> InstantCoder which expects a joda Instant as input not a LocalDate. So when
>>> my PTransform returns Foo objects it crashes with "class
>>> java.time.LocalDate cannot be cast to class org.joda.time.Instant..." when
>>> trying to encode that field using InstantCoder.encode().
>>>
>>> Is there a workaround for this issue?
>>>
>>> Thank you,
>>> Cristian
>>>
>>> PS: I did search the mailing list and google, but didn't find anything
>>> related except a thread on AvroCoder.JodaTimestampConversion, which I don't
>>> think it applies to here.
>>>
>>


Re: Schemas with uppercase fields in beans

2021-09-28 Thread Cristian Constantinescu
Hey Brian,

I can't remember precisely, but I think that's exactly what I did in point #1;
I forgot to mention that I also decorated the class with
@DefaultSchema(JavaBeanSchema.class). So to be clear, I used both
@DefaultSchema(JavaBeanSchema.class) and
@SchemaCaseFormat(CaseFormat.UPPER_UNDERSCORE) and still got null getters/setters.

I can write up a small project to illustrate the scenarios and confirm
exactly how things work.

On Tue, Sep 28, 2021 at 2:00 PM Brian Hulette  wrote:

> > 1. Decorate my class with @SchemaCaseFormat(CaseFormat.UPPER_UNDERSCORE)
> and have my fields/getters/setters follow camelCase naming. Unfortunately,
> JavaBeanUtils.validateJavaBean sees all the setters as NULL. I'm not sure
> why. I could investigate after I deliver a few things.
>
> I think this is what you need, your use-case sounds like the exact case
> that SchemaCaseFormat was created for. Did you try using
> @SchemaCaseFormat(CaseFormat.UPPER_UNDERSCORE) in conjunction with
> camel-cased getters/setters, i.e.:
>
> @DefaultSchema(JavaBeanSchema.class)
> @SchemaCaseFormat(CaseFormat.UPPER_UNDERSCORE)
> public class Foo {
>   public String getActionId(){..}
>   private void  setActionId(String actionId){..}
> }
>
> Brian
>
> On Sun, Sep 12, 2021 at 9:16 AM Cristian Constantinescu 
> wrote:
>
>> Hey all,
>>
>> I might have found a bug in the way we create schemas from Java Beans.
>>
>> My schema has to have uppercase field names.
>>
>> So I have something like :
>>
>> @DefaultSchema(JavaBeanSchema.class)
>> public class Foo {
>> private String ACTION_ID;
>>
>> private String getACTION_ID(){..}
>> private void  setACTION_ID(String ACTION_ID){..}
>> }
>>
>> Problem is that once the schema is parsed, all the fields have their
>> first letter lowercased. I think it's because of the
>> ReflectUtils.stripPrefix method.
>>
>> As workarounds, I have tried:
>>
>> 1. Decorate my class with @SchemaCaseFormat(CaseFormat.UPPER_UNDERSCORE)
>> and have my fields/getters/setters follow camelCase naming. Unfortunately,
>> JavaBeanUtils.validateJavaBean sees all the setters as NULL. I'm not sure
>> why. I could investigate after I deliver a few things.
>>
>> 2. Use AutoValue. Unfortunately with Flink, I get things like this:
>> Failed to access class package.AutoValue_Foo$Builder from class
>> package.SchemaUserTypeCreator@SchemaCodeGen$someRandomString
>> package.AutoValue_Foo$Builder is in unnamed module of loader 'app';
>> package.SchemaUserTypeCreator@SchemaCodeGen$someRandomString is in
>> unnamed module of loader org.apache.flink.util.ChildFirstClassLoader . I
>> would love for this to work. Very sad it doesn't.
>>
>> 3. Use Pojos with @DefaultSchema(JavaFieldSchema.class), this seems to
>> work as expected, not 100% sure yet.
>>
>> If this is indeed a bug, could someone create a Jira and I could have a
>> look into it in the near future.
>>
>> Thanks,
>> Cristian
>>
>


Re: Possible bug in GetterBasedSchemaProvider.fromRowFunction

2021-09-15 Thread Cristian Constantinescu
I'll give it another shot and let you know if it works. I could update the
documentation accordingly afterwards.

On Wed, Sep 15, 2021 at 6:08 PM Reuven Lax  wrote:

> Yes you would - that's the rationale for adding support for generic types
> in schema inference.
>
> On Wed, Sep 15, 2021 at 3:06 PM Cristian Constantinescu 
> wrote:
>
>> I think I tried that, but can't remember for sure (I'm like 80% sure,
>> sorry for the uncertainty, I've been trying many things for various
>> problems). And it didn't work. However, if I understand this solution
>> correctly, that means that I would have to create these join classes for
>> every type I want to join. Is that right?
>>
>> On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax  wrote:
>>
>>> Could you actually fill in the generic type for Iterable? e.g.
>>> Iterable lhs; I think without that, the schema won't match.
>>>
>>> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> Thanks for getting back to me.
>>>>
>>>> To answer your question my initial Joined pojo is:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>   public class JoinedValue {
>>>> public JoinedKey key;
>>>>
>>>> public Iterable lhs;
>>>> public Iterable rhs;
>>>>   }
>>>>
>>>>
>>>> Which is exactly the same as the documentation page, minus the field
>>>> names. This is my concern mainly, following the steps documentation does
>>>> not work when running the pipeline. I'll try to set up a sample project to
>>>> illustrate this if you think it would be helpful.
>>>>
>>>> On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax  wrote:
>>>>
>>>>>
>>>>>
>>>>> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <
>>>>> zei...@gmail.com> wrote:
>>>>>
>>>>>> Hello everyone,
>>>>>>
>>>>>> As I'm continuing to remove my usage of Row and replacing it with
>>>>>> Pojos, I'm following the documentation for the CoGroup transform [1].
>>>>>>
>>>>>> As per the documentation, I have created a JoinedKey and a
>>>>>> JoinedValue, exactly as the examples given in the documentation except 
>>>>>> that
>>>>>> the key has propA. B and C.
>>>>>>
>>>>>> I then execute this code:
>>>>>> PCollectionTyple.of("lhs", lhs).and("rhs",
>>>>>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>>>>>
>>>>>> And I get this:
>>>>>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>>>>>> between types that don't have equivalent schemas. input schema: Fields:
>>>>>> Field{name=key, description=, type=ROW>>>>> propC STRING> NOT NULL, options={{}}}
>>>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Encoding positions:
>>>>>> {lhs=1, rhs=2, key=0}
>>>>>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>>>>>> Fields:
>>>>>> Field{name=key, description=, type=ROW>>>>> propC STRING> NOT NULL, options={{}}}
>>>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>>>> Encoding positions:
>>>>>> {lhs=1, rhs=2, key=0}
>>>>>>
>>>>>> This is probably because lhs and rhs are Iterable and it's trying to
>>>>>> compare the schemas from the CoGroup Rows for lhs and rhs and the 
>>>>>> Iterable
>>>>>> properties from the Joined pojo. We should update the documentation as it
>>>>>> doesn't reflect how the code actually behaves (Unless I missed 
>>>>>> something).
>>>>>>
>>>>>
>>>>> I'm not sure I  understand the issue here. What exactly does your
>>>>> Joined pojo look like?
>>>>>
>>>>>>
>>>>>> My next step was to try to make the Joined Pojo 

Re: Possible bug in GetterBasedSchemaProvider.fromRowFunction

2021-09-15 Thread Cristian Constantinescu
I think I tried that, but can't remember for sure (I'm like 80% sure, sorry
for the uncertainty, I've been trying many things for various problems).
And it didn't work. However, if I understand this solution correctly, that
means that I would have to create these join classes for every type I want
to join. Is that right?

On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax  wrote:

> Could you actually fill in the generic type for Iterable? e.g.
> Iterable lhs; I think without that, the schema won't match.
>
> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu 
> wrote:
>
>> Hi Reuven,
>>
>> Thanks for getting back to me.
>>
>> To answer your question my initial Joined pojo is:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>>   public class JoinedValue {
>> public JoinedKey key;
>>
>> public Iterable lhs;
>> public Iterable rhs;
>>   }
>>
>>
>> Which is exactly the same as the documentation page, minus the field
>> names. This is my concern mainly, following the steps documentation does
>> not work when running the pipeline. I'll try to set up a sample project to
>> illustrate this if you think it would be helpful.
>>
>> On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> As I'm continuing to remove my usage of Row and replacing it with
>>>> Pojos, I'm following the documentation for the CoGroup transform [1].
>>>>
>>>> As per the documentation, I have created a JoinedKey and a JoinedValue,
>>>> exactly as the examples given in the documentation except that the key has
>>>> propA. B and C.
>>>>
>>>> I then execute this code:
>>>> PCollectionTyple.of("lhs", lhs).and("rhs",
>>>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>>>
>>>> And I get this:
>>>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>>>> between types that don't have equivalent schemas. input schema: Fields:
>>>> Field{name=key, description=, type=ROW>>> propC STRING> NOT NULL, options={{}}}
>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Encoding positions:
>>>> {lhs=1, rhs=2, key=0}
>>>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>>>> Fields:
>>>> Field{name=key, description=, type=ROW>>> propC STRING> NOT NULL, options={{}}}
>>>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>>>> Encoding positions:
>>>> {lhs=1, rhs=2, key=0}
>>>>
>>>> This is probably because lhs and rhs are Iterable and it's trying to
>>>> compare the schemas from the CoGroup Rows for lhs and rhs and the Iterable
>>>> properties from the Joined pojo. We should update the documentation as it
>>>> doesn't reflect how the code actually behaves (Unless I missed something).
>>>>
>>>
>>> I'm not sure I  understand the issue here. What exactly does your Joined
>>> pojo look like?
>>>
>>>>
>>>> My next step was to try to make the Joined Pojo generic. Like this:
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class Joined {
>>>> public JoinedKey key;
>>>> public Iterable lhs;
>>>> public Iterable rhs;
>>>> }
>>>>
>>>
>>> Unfortunately schema inference doesn't work today with generic classes.
>>> I believe that it's possible to fix this (e.g. we do support Coder
>>> inference in such cases), but today this won't work.
>>>
>>>
>>>>
>>>> And then execute this code:
>>>>
>>>> var joinedTypeDescriptor = new TypeDescriptor>>> MyRhsPojo>>(){};
>>>>
>>>>var keyCoder = SchemaCoder.of(keySchema,
>>>> TypeDescriptor.of(JoinedKey.class), new
>>>> JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)), new
>>>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>>>> var valueCoder = SchemaCoder.of(keySchema,
>>>> joinedTypeDescriptor, new
>

Re: Possible bug in GetterBasedSchemaProvider.fromRowFunction

2021-09-15 Thread Cristian Constantinescu
Hi Reuven,

Thanks for getting back to me.

To answer your question my initial Joined pojo is:

@DefaultSchema(JavaFieldSchema.class)
  public class JoinedValue {
public JoinedKey key;

public Iterable lhs;
public Iterable rhs;
  }


Which is exactly the same as the documentation page, minus the field names.
This is mainly my concern: following the steps in the documentation does not work
when running the pipeline. I'll try to set up a sample project to
illustrate this if you think it would be helpful.

On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax  wrote:

>
>
> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu 
> wrote:
>
>> Hello everyone,
>>
>> As I'm continuing to remove my usage of Row and replacing it with Pojos,
>> I'm following the documentation for the CoGroup transform [1].
>>
>> As per the documentation, I have created a JoinedKey and a JoinedValue,
>> exactly as the examples given in the documentation except that the key has
>> propA. B and C.
>>
>> I then execute this code:
>> PCollectionTyple.of("lhs", lhs).and("rhs",
>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>
>> And I get this:
>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>> between types that don't have equivalent schemas. input schema: Fields:
>> Field{name=key, description=, type=ROW> STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>> Fields:
>> Field{name=key, description=, type=ROW> propC STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>>
>> This is probably because lhs and rhs are Iterable and it's trying to
>> compare the schemas from the CoGroup Rows for lhs and rhs and the Iterable
>> properties from the Joined pojo. We should update the documentation as it
>> doesn't reflect how the code actually behaves (Unless I missed something).
>>
>
> I'm not sure I  understand the issue here. What exactly does your Joined
> pojo look like?
>
>>
>> My next step was to try to make the Joined Pojo generic. Like this:
>> @DefaultSchema(JavaFieldSchema.class)
>> public class Joined {
>> public JoinedKey key;
>> public Iterable lhs;
>> public Iterable rhs;
>> }
>>
>
> Unfortunately schema inference doesn't work today with generic classes. I
> believe that it's possible to fix this (e.g. we do support Coder inference
> in such cases), but today this won't work.
>
>
>>
>> And then execute this code:
>>
>> var joinedTypeDescriptor = new TypeDescriptor> MyRhsPojo>>(){};
>>
>>var keyCoder = SchemaCoder.of(keySchema,
>> TypeDescriptor.of(JoinedKey.class), new
>> JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)), new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>> var valueCoder = SchemaCoder.of(keySchema, joinedTypeDescriptor,
>> new JavaFieldSchema().toRowFunction(joinedTypeDescriptor), new
>> JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>> var mixed = PCollectionTyple.of("lhs", lhs).and("rhs", rhs)
>> .apply(Cogroupby...)
>> .apply(Convert.to(joinedTypeDescriptor))
>>
>> But this give me:
>> Exception in thread "main" java.lang.ClassCastException: class
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>> cannot be cast to class java.lang.Class
>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>> is in unnamed module of loader 'app'; java.lang.Class is in module
>> java.base of loader 'bootstrap')
>> at
>> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>> (irrelevant stacktrace omitted for brevity)
>>
>> It looks like GetterBasedSchemaProvider.fromRowFunction has an explicit
>> cast to "Class" but there could be instances where a guava type is passed
>> in.
>>
>> So my workaround for now, as elegant as a roadkill, is to do things
>> manually as below. (actual class names replaced with RhsPojo)
>>
>>  va

Schemas with uppercase fields in beans

2021-09-12 Thread Cristian Constantinescu
Hey all,

I might have found a bug in the way we create schemas from Java Beans.

My schema has to have uppercase field names.

So I have something like :

@DefaultSchema(JavaBeanSchema.class)
public class Foo {
private String ACTION_ID;

private String getACTION_ID(){..}
private void  setACTION_ID(String ACTION_ID){..}
}

Problem is that once the schema is parsed, all the fields have their first
letter lowercased. I think it's because of the ReflectUtils.stripPrefix
method.

As workarounds, I have tried:

1. Decorate my class with @SchemaCaseFormat(CaseFormat.UPPER_UNDERSCORE)
and have my fields/getters/setters follow camelCase naming. Unfortunately,
JavaBeanUtils.validateJavaBean sees all the setters as NULL. I'm not sure
why. I could investigate after I deliver a few things.

2. Use AutoValue. Unfortunately with Flink, I get things like this: Failed
to access class package.AutoValue_Foo$Builder from class
package.SchemaUserTypeCreator@SchemaCodeGen$someRandomString
package.AutoValue_Foo$Builder is in unnamed module of loader 'app';
package.SchemaUserTypeCreator@SchemaCodeGen$someRandomString is in
unnamed module of loader org.apache.flink.util.ChildFirstClassLoader . I
would love for this to work. Very sad it doesn't.

3. Use POJOs with @DefaultSchema(JavaFieldSchema.class); this seems to work
as expected, but I'm not 100% sure yet.

If this is indeed a bug, could someone create a Jira? I could have a
look at it in the near future.

Thanks,
Cristian


Re: [PROPOSAL] Stable URL for "current" API Documentation

2021-06-17 Thread Cristian Constantinescu
Big +1 here. In the past few days I've replaced the 2.*.0 part of the
Google-found javadoc URL with 2.29.0 more times than I could count. I
should have made a pipeline with a session window to count those
replacements though :P

On Thu, Jun 17, 2021 at 12:18 PM Robert Bradshaw 
wrote:

> This makes a lot of sense to me.
>
> On Thu, Jun 17, 2021 at 9:03 AM Brian Hulette  wrote:
> >
> > Hi everyone,
> > You may have noticed that our API Documentation could really use some
> SEO. It's possible to search for Beam APIs (e.g. "beam dataframe read_csv"
> [1] or "beam java ParquetIO" [2]) and you will be directed to some
> documentation, but it almost always takes you to an old version. I think
> this could be significantly improved if we just make one change: rather
> than making https://beam.apache.org/releases/javadoc/current redirect to
> the latest release, we should just always stage the latest documentation
> there.
> >
> > To be clear I'm not 100% sure this will help. I haven't talked to any
> search engineers or SEO experts about it. I'm just looking at other
> projects as a point of reference. I've found that I never have trouble
> finding the latest pandas documentation (e.g. "pandas read_csv" [3]) since
> it always directs to "pandas-docs/stable/" rather than a particular version
> number.
> >
> > We should also make sure the version number shows up in the page title,
> it looks like this isn't the case for Python right now.
> >
> > Would there be any objections to making this change?
> >
> > Also are there thoughts on how to make the change? Presumably this is
> something we'd have to update in the release process.
> >
> > Thanks,
> > Brian
> >
> > [1]
> https://beam.apache.org/releases/pydoc/2.25.0/apache_beam.dataframe.io.html
> > [2]
> https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html
> > [3]
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
>


Tutorial - How to run a Beam pipeline with Flink on Kubernetes Natively

2021-03-22 Thread Cristian Constantinescu
Hi everyone,

I spent the weekend putting the pieces together to run Beam with the Flink
Runner on Kubernetes. While I did find very good articles and videos about
Beam and Flink and Kubernetes separately, I didn't find one that mixes all
three of them in the same pot.

So, I wrote a small demo project/tutorial combining all the bits and pieces
of information I found on my quest. It's here
https://github.com/zeidoo/tutorial-bean-flink-kubernetes-pojo. Any feedback
is welcome.

I hope it helps someone.

Cheers everyone!

PS: I couldn't find anything on the Beam documentation website mentioning that
the easiest way to pass PoJos between transforms is to make them implement
Serializable. I've put a small section about that in the tutorial.
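
For reference, a minimal sketch of what that PS boils down to (the class and its
fields are made up):

// A plain PoJo; because it implements Serializable, Beam's coder inference can
// fall back to SerializableCoder and pass it between transforms with no extra setup.
public class MyPojo implements java.io.Serializable {
  public String name;
  public int count;
}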


Re: Beam with Confluent Schema Registry and protobuf

2021-01-12 Thread Cristian Constantinescu
Hi Alexey,

The short answer is that I hope the code in my sample project makes its way
back into Beam, but it may take some time. Hopefully, until then, people who
are in a similar position have a demo of a workaround they can work with.

The longer answer is:

The history of this is a bit lengthy; let me summarize it for those who
are not aware of it.

1. Beam uses old versions (5.3) of Confluent. Those versions do not support
protobuf or JSON schemas. That was added in 5.5. Confluent is now at
version 6.

2. The newer Confluent libs depend on Avro 1.9 (last time I checked), which
conflicts with Beam's dependency on Avro 1.8.

3. Avro 1.9+ has breaking changes from 1.8.

4. There were at least two attempts at updating Beam to use Avro 1.9. One
of those was mine. It included an attempt to move all of Avro outside of
Beam core to its own module, like Beam's protobuf support.

5. Unfortunately, that isn't a viable path forward for the community as it
breaks Beam's backwards compatibility. But it did provide insights on the
spread of Avro inside of Beam.

6. The latest agreement is that some tests need to be
added to Beam to check compatibility with Avro 1.9+.

I wanted to take this task up, but didn't get to it yet. Furthermore, I'm
not sure how those tests are going to help with the upgrade of the
Confluent libs as I'm not sure we can have two versions of Avro inside of
Beam. I'll have to check to see what's possible.

Once the Confluent libs are updated, adding support for protobufs and the
Confluent schema registry looks trivial. We just need to decide whether we use
the ConfluentSchemaRegistryDeserializerProvider for the task or if we have
a separate class for similar functionality using protobuf.



On Tue., Jan. 12, 2021, 09:58 Alexey Romanenko, 
wrote:

> Hi Cristian,
>
> Great!
>
> Would you be interested to add this functionality into
> original ConfluentSchemaRegistryDeserializerProvider (along with Json
> support maybe) [1] ?
> Though, it seems that it will require an update of Confluent deps to at
> least version 5.5 [2] and it can be blocked by Beam Avro deps update.
>
> [1] https://issues.apache.org/jira/browse/BEAM-9330
> [2] https://www.confluent.io/blog/introducing-confluent-platform-5-5/
>
> On 9 Jan 2021, at 05:16, Cristian Constantinescu  wrote:
>
> Hi everyone,
>
> Beam currently has a dependency on older versions of the Confluent libs.
> It makes it difficult to use Protobufs with the Confluent Schema Registry
> as ConfluentSchemaRegistryDeserializerProvider only supports Avro.
>
> I put together a very simple project to demo how it can be done without
> touching any files inside of Beam. You can find it here:
> https://github.com/zeidoo/beam-confluent-schema-registry-protobuf
>
> Any comments are welcome, especially if there are better ways of doing
> things.
>
> Cheers!
>
>
>


Beam with Confluent Schema Registry and protobuf

2021-01-08 Thread Cristian Constantinescu
Hi everyone,

Beam currently has a dependency on older versions of the Confluent libs. It
makes it difficult to use Protobufs with the Confluent Schema Registry as
ConfluentSchemaRegistryDeserializerProvider only supports Avro.
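
For context, this is roughly how the existing Avro-only provider plugs into
KafkaIO today (the broker address, topic, and subject below are placeholders):

// Works for Avro subjects only; there is currently no equivalent for Protobuf or JSON.
p.apply(KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("broker:9092")
    .withTopic("my-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(
        ConfluentSchemaRegistryDeserializerProvider.of(
            "http://schema-registry:8081", "my-topic-value")));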

I put together a very simple project to demo how it can be done without
touching any files inside of Beam. You can find it here:
https://github.com/zeidoo/beam-confluent-schema-registry-protobuf

Any comments are welcome, especially if there are better ways of doing
things.

Cheers!


Re: [DISCUSS] Move Avro dependency out of core Beam

2020-09-21 Thread Cristian Constantinescu
All the proposed solutions seem reasonable. I'm not sure if one has an edge
over the other. I guess it depends on how cautiously the community would
like to move.

Maybe it's just my impression, but it seems to me that there are a few
changes that are held back for the sake of backwards compatibility. If this
is truly the case, maybe we can start thinking about the next major version
of Beam where:
- the dependency report email (Beam Dependency Check Report (2020-09-21))
can be tackled. There are quite a few deps in there that need to be
updated. I'm sure there will be more breaking changes.
- review some architectural decisions that are difficult to correct without
breaking things (eg: Avro in Core)
- compatibility with Java 11+
- features that we can't implement with our current code base

I'm not sure what Beam's roadmap is, but maybe we could set up a branch in
the main repo (and the checks) and try to tackle all this work, so we get a
better idea of the scope (and the unforeseen issues that will surface) of
what's really needed for a potential RC build in the short-term future.

On Wed, Sep 16, 2020 at 6:40 AM Robert Bradshaw  wrote:

> An adapter seems a reasonable approach, and shouldn't be too hard.
>
> If the breakage is "we no longer provide Avro 1.8 by default; please
> depend on it explicitly if this breaks you" that seems reasonable to me, as
> it's easy to detect and remedy.
>
> On Tue, Sep 15, 2020 at 2:42 PM Ismaël Mejía  wrote:
>
>> Avro differences in our implementation are pretty minimal if you look at
>> the PR,
>> to the point that an Adapter should be really tiny if even needed.
>>
>> The big backwards incompatible changes in Avro > 1.8 were to remove
>> external
>> libraries from the public APIs e.g. guava, jackson and joda-time. Of
>> course this
>> does not seem to be much but almost every project using Avro was using
>> some of
>> these dependencies, luckily for Beam it was only joda-time and that is
>> already
>> fixed.
>>
>> Keeping backwards compatibility by making Avro part of an extension that
>> is
>> optional for core and using only Avro 1.8 compatible features on Beam's
>> source
>> code is the simplest path, and allow us to avoid all the issues, notice
>> that the
>> dependency that triggered the need for Avro 1.9 (and this thread) is a
>> runtime
>> dependency used by Confluent Schema Registry and this is an issue because
>> sdks-java-core is leaking Avro. Apart from this I am not aware of any
>> feature in
>> any other project that obliges anyone to use Avro 1.9 or 1.10 specific
>> code.
>>
>> Of course a really valid reason to want to use a more recent version of
>> Avro is
>> that Avro 1.8 leaks also unmaintained dependencies with serious security
>> issues
>> (Jackson 1.x).
>>
>> So in the end my main concern is breaking existing users code, this has
>> less
>> impact for us (Talend) but probably more for the rest of the community,
>> but if
>> we agree to break backwards compatibility for the sake of cleanliness
>> well we
>> should probably proceed, and of course give users also some warning.
>>
>> On Mon, Sep 14, 2020 at 7:13 PM Luke Cwik  wrote:
>> >
>> > In the Kafka module we reflectively figure out which version of Kafka
>> exists[1] on the classpath and then reflectively invoke some APIs to work
>> around differences in Kafka allowing our users to bring whichever version
>> they want.
>> >
>> > We could do something similar here and reflectively figure out which
>> Avro is on the classpath and invoke the appropriate methods. If we are
>> worried about performance of using reflection, we can write and compile two
>> different versions of an Avro adapter class and choose which one to use
>> (using reflection only once during class loading).
>> >
>> > e.g.
>> > AvroAdapter {
>> >   static final AvroAdapter INSTANCE;
>> >   static {
>> >     if (avro19?) {
>> >       INSTANCE = new Avro19Adapter();
>> >     } else {
>> >       INSTANCE = new Avro18Adapter();
>> >     }
>> >   }
>> >
>> >   ... methods needed for AvroAdapter implementations ...
>> > }
>> >
>> > Using reflection allows the user to choose which version they use and
>> pushes down the incompatibility issue from Apache Beam to our deps (e.g.
>> Spark).
>> >
>> > 1:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
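
To make that concrete, a minimal sketch of such a check for Avro (the adapter
names are the hypothetical ones from the pseudocode above, and it assumes the
Avro jar's manifest exposes an Implementation-Version; a Class.forName probe for
a version-specific marker class would be an alternative):

// Decide once, at class-load time, which adapter matches the Avro on the classpath.
String avroVersion = org.apache.avro.Schema.class.getPackage().getImplementationVersion();
AvroAdapter adapter =
    (avroVersion != null && avroVersion.startsWith("1.8"))
        ? new Avro18Adapter()
        : new Avro19Adapter();
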
>> >
>> > On Fri, Sep 11, 2020 at 11:42 AM Kenneth Knowles 
>> wrote:
>> >>
>> >> I am not deep on the details myself but have reviewed various Avro
>> upgrade changes such as https://github.com/apache/beam/pull/9779 and
>> also some internal that I cannot link to. I believe the changes are small
>> and quite possibly we can create sdks/java/extensions/avro that works with
>> both Avro 1.8 and 1.9 and make Dataflow worker compatible with whatever the
>> user chooses. (I would expect Spark is trying to get to that point too?)
>> >>
>> >> So then if we have that can we achieve the goals? Spark runner users
>> that do not use Avro 

Re: [DISCUSS] Move Avro dependency out of core Beam

2020-09-11 Thread Cristian Constantinescu
Hi everyone,

PR https://github.com/apache/beam/pull/12748 now passes all the checks, and
could potentially be merged (not advocating this, just saying). I've
rebased on the latest master as of today. I've also left a comment in the
PR with the high level changes for ALL the modules. I encourage all the
interested parties to skim through that and raise any concerns they might
have.

Also note that while I'm pretty good at refactoring things, Java isn't my
strongest language. Please keep that in mind as you review the code changes.

That being said, my main goal is to get Beam to play nicely with the new
Confluent Schema libraries that include support for Protobuf and JSON
schemas. But the Confluent libs depend on Avro 1.9, and Beam is on 1.8.
Upgrading Beam to use Avro 1.9 has proven difficult (see
https://github.com/apache/beam/pull/9779), which is why Avro should be taken
out of core.

If you have any concerns or any particular tests I should run, please let
me know.

Thank you!

On Fri, Sep 11, 2020 at 5:48 AM Brian Hulette  wrote:

>
>
> On Tue, Sep 8, 2020 at 9:18 AM Robert Bradshaw 
> wrote:
>
>> IIRC Dataflow (and perhaps others) implicitly depend on Avro to write
>> out intermediate files (e.g. for non-shuffle Fusion breaks). Would
>> this break if we just removed it?
>>
>
> I think Dataflow would just need to declare a dependency on the new
> extension.
>
>
>>
>> On Thu, Sep 3, 2020 at 10:51 PM Reuven Lax  wrote:
>> >
>> > As for 2, maybe it's time to remove @Experimental from SchemaCoder?
>> >
>>
>
> Probably worth a separate thread about dropping `@Experimental` on
> SchemaCoder. I'd be ok with that, the only breaking change I have in mind
> is that I think we should deprecate and remove the DATETIME primitive type,
> replacing it with a logical type.
>
>
>> > 1 is tricky though. Changes like this have caused a lot of trouble for
>> users in the past, and I think some users still have unpleasant memories of
>> being told "you just have to change some package names and imports."
>> >
>>
>
> We could mitigate this by first adding the new extension module and
> deprecating the core Beam counterpart for a release (or multiple releases).
>
>
>> > On Thu, Sep 3, 2020 at 6:18 PM Brian Hulette 
>> wrote:
>> >>
>> >> Hi everyone,
>> >> The fact that core Beam has a dependency on Avro has led to a lot of
>> headaches when users (or runners) are using a different version. zeidoo [1]
>> was generous enough to put up a WIP PR [2] that moves everything that
>> depends on Avro (primarily AvroCoder and the Avro SchemaProvider I believe)
>> out of core Beam and into a separate extensions module. This way we could
>> have multiple extensions for different versions of Avro in the future.
>> >>
>> >> As I understand it, the downsides to making this change are:
>> >> 1) It's a breaking change, users with AvroCoder in their pipeline will
>> need to change their build dependencies and import statements.
>> >> 2) AvroCoder is the only (non-experimental) coder in core Beam that
>> can encode complex user types. So new users will need to dabble with the
>> Experimental SchemaCoder or add a second dependency to build a pipeline
>> with their own types.
>> >>
>> >> I think these costs are outweighed by the benefit of removing the
>> dependency in core Beam, but I wanted to reach out to the community to see
>> if there are any objections.
>> >>
>> >> Brian
>> >>
>> >> [1] github.com/zeidoo
>> >> [2] https://github.com/apache/beam/pull/12748
>>
>


BEAM-9330

2020-08-28 Thread Cristian Constantinescu
Hello,

I would like to contribute to
https://issues.apache.org/jira/browse/BEAM-9330, getting Beam to support
PROTOBUF and JSON schemas with Confluent Schema Registry.

I'm a bit of a Java newb (C# and Go are more up my alley), and this would be
my first contribution to any ASF project. As such, if anyone a bit more
experienced is interested in a bit of pair programming with me or has time
for a bit of hand holding, I'd really appreciate it.

Thanks!