Re: External transform API in Java SDK
If we pass in TypeDescriptor objects instead of type parameters that exist only for the compiler, we could match the returned coders against the given type descriptors at pipeline construction time. This would help prevent pipelines from failing with a ClassCastException in runners. I've created the JIRA ticket: https://issues.apache.org/jira/browse/BEAM-9048

On Mon, Dec 30, 2019 at 10:27 AM Luke Cwik wrote:
> Is there a way to ensure that the output type is actually compatible with
> the coder that was returned after expansion (this would likely require you
> to pass typing information into *withOutputType*, see TypeDescriptors[1])?
>
> 1: https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
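The construction-time check proposed in this message (BEAM-9048) can be illustrated without any Beam dependency. This is a minimal sketch under stated assumptions, not the API from the ticket: `TypeToken` is a hypothetical stand-in for Beam's TypeDescriptor (it uses the same anonymous-subclass trick to keep a generic type argument after erasure), `CheckedExternal` and its `coderOutputType` are hypothetical, `Map.Entry` stands in for `KV`, and types are compared by their type names for simplicity.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
 * Hypothetical stand-in for Beam's TypeDescriptor. An anonymous subclass such as
 * new TypeToken<Map.Entry<String, Long>>() {} records its type argument in the class
 * file, so it can be read back at runtime despite erasure.
 */
abstract class TypeToken<T> {
  final Type type;

  protected TypeToken() {
    ParameterizedType superclass = (ParameterizedType) getClass().getGenericSuperclass();
    this.type = superclass.getActualTypeArguments()[0];
  }
}

/**
 * Hypothetical external transform that knows the element type implied by the coder
 * returned from expansion. O is the compile-time output type.
 */
final class CheckedExternal<O> {
  private final Type coderOutputType; // assumption: derivable from the expanded coder

  CheckedExternal(Type coderOutputType) {
    this.coderOutputType = coderOutputType;
  }

  /** Fails at construction time if the declared output type disagrees with the coder. */
  <T> CheckedExternal<T> withOutputType(TypeToken<T> declared) {
    String expected = declared.type.getTypeName();
    String actual = coderOutputType.getTypeName();
    if (!expected.equals(actual)) {
      throw new IllegalStateException(
          "declared output type " + expected + " does not match coder type " + actual);
    }
    return new CheckedExternal<>(coderOutputType);
  }
}
```

Because a mismatch is detected while the transform is being built, the pipeline would fail before submission instead of with a ClassCastException inside a runner.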
Re: External transform API in Java SDK
On Mon, Dec 23, 2019 at 12:20 PM Heejong Lee wrote:
> On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik wrote:
>> How does the output type get preserved in this case (since Java's type
>> erasure would remove <KV<String, Long>> after compilation, and coder
>> inference in my opinion should either be broken or choose something
>> generic like serializable)?
>
> The expansion service is responsible for using cross-language compatible
> coders in the returned expanded transforms, and these are the coders used
> at runtime. The type information supplied through the additional methods is
> only for compile-time type safety of external transforms.

Note that *.<KV<String, Long>>withOutputType()* could be changed to *.<T>withOutputType()* for some unrelated type T and we would get a *PCollection<T>*, since *withOutputType* doesn't actually do anything at runtime and only makes the types align during compilation. Is there a way to ensure that the output type is actually compatible with the coder that was returned after expansion (this would likely require you to pass typing information into *withOutputType*, see TypeDescriptors[1])?

1: https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
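The observation in this message, that the type argument given to withOutputType() is never checked, can be reproduced in plain Java. In this sketch the static method `withOutputType` is a hypothetical stand-in that performs the same kind of unchecked generic re-typing, and the list of `Map.Entry` values stands in for elements decoded by the coder the expansion returned. The wrong type argument compiles, and the failure only surfaces as a ClassCastException when an element is first read.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;

final class PhantomRetype {
  /**
   * Hypothetical stand-in for `.<T>withOutputType()`: it only changes the compile-time
   * type. The cast is unchecked, so nothing is verified when it runs.
   */
  @SuppressWarnings("unchecked")
  static <T> List<T> withOutputType(List<?> decoded) {
    return (List<T>) decoded;
  }

  static String demo() {
    // What the coder returned by the expansion might actually decode: key/value pairs.
    List<Object> decoded = new ArrayList<>();
    decoded.add(new AbstractMap.SimpleEntry<>("3", 3L));

    // A wrong type argument compiles, and the re-typing itself does not fail.
    List<Integer> wrong = withOutputType(decoded);

    try {
      // The compiler inserts a cast to Integer here, which is where the failure shows up.
      int first = wrong.get(0);
      return "read " + first;
    } catch (ClassCastException e) {
      return "ClassCastException on first use";
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "ClassCastException on first use"
  }
}
```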
Re: External transform API in Java SDK
On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik wrote:
> What do side inputs look like?

A user first needs to pass the PCollections for side inputs into the external transform, in addition to the ordinary input PCollections, and then define the PCollectionViews inside the external transform, something like:

    PCollectionTuple pTuple =
        PCollectionTuple.of("main1", main1)
            .and("main2", main2)
            .and("side", side)
            .apply(External.of(...).withMultiOutputs());

    public static class TestTransform
        extends PTransform<PCollectionTuple, PCollectionTuple> {
      @Override
      public PCollectionTuple expand(PCollectionTuple input) {
        PCollectionView<String> sideView =
            input.<String>get("side").apply(View.asSingleton());
        PCollection<String> main =
            PCollectionList.<String>of(input.get("main1"))
                .and(input.get("main2"))
                .apply(Flatten.pCollections())
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void processElement(
                                  @Element String x,
                                  OutputReceiver<String> out,
                                  DoFn<String, String>.ProcessContext c) {
                                out.output(x + c.sideInput(sideView));
                              }
                            })
                        .withSideInputs(sideView));
        // ... (rest of expand() not shown; it must still return a PCollectionTuple)
      }
    }

> [...]
> How does the output type get preserved in this case (since Java's type
> erasure would remove <KV<String, Long>> after compilation, and coder
> inference in my opinion should either be broken or choose something generic
> like serializable)?

The expansion service is responsible for using cross-language compatible coders in the returned expanded transforms, and these are the coders used at runtime. The type information supplied through the additional methods is only for compile-time type safety of external transforms.
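To make the side-input wiring of TestTransform concrete, its data flow can be mimicked with ordinary Java collections. This is a plain-Java simulation, not Beam code: a `Map<String, List<String>>` stands in for the PCollectionTuple, `asSingleton` is a hypothetical helper that mirrors the exactly-one-element expectation of View.asSingleton(), and list order is kept only for easy checking (Flatten does not guarantee any order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SideInputSimulation {
  /** Hypothetical helper mirroring View.asSingleton(): exactly one element is expected. */
  static <T> T asSingleton(List<T> side) {
    if (side.size() != 1) {
      throw new IllegalArgumentException(
          "expected exactly one side-input element, got " + side.size());
    }
    return side.get(0);
  }

  /** Mirrors TestTransform.expand(): flatten "main1" and "main2", then append the side value. */
  static List<String> expand(Map<String, List<String>> input) {
    // The side input arrives as an ordinary named entry and is turned into a "view" here.
    String sideValue = asSingleton(input.get("side"));

    List<String> flattened = new ArrayList<>(input.get("main1"));
    flattened.addAll(input.get("main2"));

    List<String> out = new ArrayList<>();
    for (String x : flattened) {
      out.add(x + sideValue); // same body as the DoFn: x + c.sideInput(sideView)
    }
    return out;
  }
}
```

For example, main1 = [a, bb], main2 = [x] and side = [s] yields [as, bbs, xs].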
Re: External transform API in Java SDK
What do side inputs look like?

On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee wrote:
> I wanted to know if anybody has any comments on the external transform API
> for the Java SDK.
>
> `External.of()` creates an external transform for the Java SDK. Depending
> on the input and output types, two additional methods are provided:
> `withMultiOutputs()`, which specifies that the output is a
> PCollectionTuple, and `withOutputType()`, which specifies the type of the
> output element. Some examples:
>
>     PCollection<String> col =
>         testPipeline
>             .apply(Create.of("1", "2", "3"))
>             .apply(External.of(...));
>
> This works without additional methods since 1) the input and output types
> of the external transform can be inferred and 2) the output PCollection is
> singular.

How does the type/coder get inferred at runtime (doesn't Java's type erasure get rid of this information)?

>     PCollectionTuple pTuple =
>         testPipeline
>             .apply(Create.of(1, 2, 3, 4, 5, 6))
>             .apply(
>                 External.of(...).withMultiOutputs());
>
> This requires `withMultiOutputs()` since the output is a PCollectionTuple.

Shouldn't this require a mapping from "output" name to coder/type variable to be specified as an argument to withMultiOutputs?

>     PCollection<String> pCol =
>         testPipeline
>             .apply(Create.of("1", "2", "2", "3", "3", "3"))
>             .apply(
>                 External.of(...)
>                     .<KV<String, Long>>withOutputType())
>             .apply(
>                 "toString",
>                 MapElements.into(TypeDescriptors.strings()).via(
>                     x -> String.format("%s->%s", x.getKey(), x.getValue())));
>
> This requires `withOutputType()` since the output element type cannot be
> inferred through method chaining. I think some users may find it awkward to
> call a method with only a type parameter and empty parentheses. Without
> `withOutputType()`, the type of the output element will be
> java.lang.Object, which could still be forcibly cast to KV.

How does the output type get preserved in this case (since Java's type erasure would remove <KV<String, Long>> after compilation, and coder inference in my opinion should either be broken or choose something generic like serializable)?
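One reading of the suggestion to pass a name-to-type mapping into withMultiOutputs() is that the caller declares typed tags, and the expansion result is checked against them before the pipeline is built further. Everything in this sketch is hypothetical: `OutputTag` only loosely mirrors Beam's TupleTag, and `ExpandedOutputs` stands in for what an expansion might report. A `Class<T>` cannot describe generic element types such as `KV<String, Long>`, so a real version would need something TypeDescriptor-like instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical typed tag: an output name plus the element class the caller expects. */
final class OutputTag<T> {
  final String name;
  final Class<T> type;

  OutputTag(String name, Class<T> type) {
    this.name = name;
    this.type = type;
  }
}

/** Hypothetical expansion result: per output name, the coder's element class and values. */
final class ExpandedOutputs {
  private final Map<String, Class<?>> coderTypes;
  private final Map<String, List<Object>> values;

  ExpandedOutputs(Map<String, Class<?>> coderTypes, Map<String, List<Object>> values) {
    this.coderTypes = coderTypes;
    this.values = values;
  }

  /** Sketch of withMultiOutputs(tags...): reject unknown names or mismatched types up front. */
  ExpandedOutputs withMultiOutputs(OutputTag<?>... tags) {
    for (OutputTag<?> tag : tags) {
      Class<?> actual = coderTypes.get(tag.name);
      if (actual == null) {
        throw new IllegalStateException("expansion has no output named '" + tag.name + "'");
      }
      if (!tag.type.equals(actual)) {
        throw new IllegalStateException(
            "output '" + tag.name + "' has type " + actual.getName()
                + ", not " + tag.type.getName());
      }
    }
    return this;
  }

  /** Typed access; Class.cast performs a real runtime check, unlike an erased generic cast. */
  <T> List<T> get(OutputTag<T> tag) {
    List<T> out = new ArrayList<>();
    for (Object v : values.get(tag.name)) {
      out.add(tag.type.cast(v));
    }
    return out;
  }
}
```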
External transform API in Java SDK
I wanted to know if anybody has any comments on the external transform API for the Java SDK.

`External.of()` creates an external transform for the Java SDK. Depending on the input and output types, two additional methods are provided: `withMultiOutputs()`, which specifies that the output is a PCollectionTuple, and `withOutputType()`, which specifies the type of the output element. Some examples:

    PCollection<String> col =
        testPipeline
            .apply(Create.of("1", "2", "3"))
            .apply(External.of(...));

This works without additional methods since 1) the input and output types of the external transform can be inferred and 2) the output PCollection is singular.

    PCollectionTuple pTuple =
        testPipeline
            .apply(Create.of(1, 2, 3, 4, 5, 6))
            .apply(
                External.of(...).withMultiOutputs());

This requires `withMultiOutputs()` since the output is a PCollectionTuple.

    PCollection<String> pCol =
        testPipeline
            .apply(Create.of("1", "2", "2", "3", "3", "3"))
            .apply(
                External.of(...)
                    .<KV<String, Long>>withOutputType())
            .apply(
                "toString",
                MapElements.into(TypeDescriptors.strings()).via(
                    x -> String.format("%s->%s", x.getKey(), x.getValue())));

This requires `withOutputType()` since the output element type cannot be inferred through method chaining. I think some users may find it awkward to call a method with only a type parameter and empty parentheses. Without `withOutputType()`, the type of the output element will be java.lang.Object, which could still be forcibly cast to KV.

Thanks,
Heejong
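The inference behaviour described here follows from how Java resolves generic method type arguments: a call assigned directly to a typed variable (or passed as an argument to such a call) can take its type arguments from that target, but a call used as the receiver of another method call gets no target type, so unconstrained type variables fall back to their bounds (here Object). This stdlib-only sketch shows both cases; `Coll`, `Xform`, and the URN strings are hypothetical stand-ins for PCollection, External, and real URNs, and the `Function` held by `Xform` plays the role of the expanded transform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for PCollection<T>: the runtime values carry no static type. */
final class Coll<T> {
  private final List<Object> values;

  private Coll(List<Object> values) {
    this.values = values;
  }

  @SafeVarargs
  static <T> Coll<T> of(T... elements) {
    List<Object> copy = new ArrayList<>(Arrays.asList(elements));
    return new Coll<>(copy);
  }

  /** Like applying an external transform: O comes only from the transform's type argument. */
  <O> Coll<O> apply(Xform<? super T, O> transform) {
    List<Object> out = new ArrayList<>();
    for (Object v : values) {
      out.add(transform.impl.apply(v));
    }
    return new Coll<>(out);
  }

  /** Like MapElements: a local, typed step whose lambda needs T to be known. */
  <O> Coll<O> map(Function<? super T, ? extends O> fn) {
    List<Object> out = new ArrayList<>();
    for (Object v : values) {
      @SuppressWarnings("unchecked")
      T typed = (T) v; // unchecked: T is erased, so this cast verifies nothing
      out.add(fn.apply(typed));
    }
    return new Coll<>(out);
  }

  List<Object> asList() {
    return values;
  }
}

/** Hypothetical stand-in for External: impl plays the role of the expanded transform. */
final class Xform<I, O> {
  final String urn;
  final Function<Object, Object> impl;

  private Xform(String urn, Function<Object, Object> impl) {
    this.urn = urn;
    this.impl = impl;
  }

  /** Like External.of(...): no argument mentions I or O, so they must come from context. */
  static <I, O> Xform<I, O> of(String urn, Function<Object, Object> impl) {
    return new Xform<>(urn, impl);
  }

  /** Like withOutputType(): changes only the compile-time output type. */
  <O2> Xform<I, O2> withOutputType() {
    return new Xform<>(urn, impl);
  }
}

final class InferenceDemo {
  public static void main(String[] args) {
    Coll<String> words = Coll.of("a", "bb");

    // 1) Last step, assigned to a typed variable: O is inferred as Integer from the target.
    Coll<Integer> lengths =
        words.apply(Xform.of("hypothetical:length", x -> ((String) x).length()));

    // 2) Mid-chain: the receiver of .map() gets no target type, so without the witness
    //    O would be Object and `n * 10` would not compile. withOutputType() supplies O.
    Coll<String> described =
        words
            .apply(
                Xform.of("hypothetical:length", x -> ((String) x).length())
                    .<Integer>withOutputType())
            .map(n -> "len=" + (n * 10));

    System.out.println(lengths.asList()); // [1, 2]
    System.out.println(described.asList()); // [len=10, len=20]
  }
}
```

As in the real API, the witness type is trusted rather than checked: the values are whatever the stand-in for the expanded transform produced.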