Re: External transform API in Java SDK

2020-01-02 Thread Heejong Lee
If we pass in TypeDescriptor objects instead of Java type information that
only the compiler sees, we could match the returned coders against the
given type descriptors at pipeline construction time. That would help
prevent pipelines from failing with a ClassCastException in runners. I've
created a Jira ticket: https://issues.apache.org/jira/browse/BEAM-9048
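
To make the idea concrete, here is a minimal plain-Java sketch of such a
construction-time check. It deliberately does not use Beam classes:
`TypeToken` and `ReturnedCoder` are hypothetical stand-ins for a
TypeDescriptor and for the coder returned after expansion, and
`checkCompatible` is an assumed helper name, not anything in the SDK.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

final class OutputTypeCheck {
  /** Hypothetical type token: an anonymous subclass keeps T in its class metadata. */
  abstract static class TypeToken<T> {
    final Type type;

    protected TypeToken() {
      ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
      this.type = superType.getActualTypeArguments()[0];
    }
  }

  /** Hypothetical stand-in for a coder returned by the expansion service. */
  static final class ReturnedCoder {
    final Type encodedType;

    ReturnedCoder(Type encodedType) {
      this.encodedType = encodedType;
    }
  }

  /** Fails at construction time instead of with a ClassCastException later. */
  static void checkCompatible(TypeToken<?> declared, ReturnedCoder coder) {
    if (!declared.type.equals(coder.encodedType)) {
      throw new IllegalArgumentException(
          "Declared output type " + declared.type.getTypeName()
              + " does not match coder type " + coder.encodedType.getTypeName());
    }
  }

  public static void main(String[] args) {
    Type listOfString = new TypeToken<List<String>>() {}.type;
    // Same declared and returned type: accepted.
    checkCompatible(new TypeToken<List<String>>() {}, new ReturnedCoder(listOfString));
    System.out.println("matching types accepted");
    try {
      // Declared List<Long> but the coder encodes List<String>: rejected.
      checkCompatible(new TypeToken<List<Long>>() {}, new ReturnedCoder(listOfString));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The anonymous-subclass trick is what lets the full generic type survive
erasure, which is why a descriptor object can be checked where a bare type
parameter cannot.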


Re: External transform API in Java SDK

2019-12-30 Thread Luke Cwik
On Mon, Dec 23, 2019 at 12:20 PM Heejong Lee  wrote:

>
>
> On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik  wrote:
>
>> How does the output type get preserved in this case (since Java's type
>> erasure would remove the <KV<...>> type argument after compilation, and
>> coder inference, in my opinion, should either be broken or be choosing
>> something generic like Serializable)?
>>
>
> The expansion service is responsible for using cross-language compatible
> coders in the expanded transforms it returns, and these are the coders
> used at runtime. The type information supplied through the additional
> methods here is only for compile-time type safety of external transforms.
>

Note that the type argument given to *withOutputType()* could be changed to
any other type and we would get a *PCollection* of that type, since
*withOutputType* doesn't actually do anything at runtime and only makes the
types line up during compilation.
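
The erasure point can be illustrated without Beam. In the sketch below,
`UntypedOutput` and this `withOutputType` are hypothetical stand-ins, not
the SDK's classes: the type argument only changes what the compiler
believes, and a wrong choice surfaces later as a ClassCastException at the
first typed read.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {
  /** Hypothetical stand-in for the untyped output of an external transform. */
  static final class UntypedOutput {
    final List<Object> elements = new ArrayList<>();
  }

  /** Only relabels the static type; after erasure there is nothing to check. */
  @SuppressWarnings("unchecked")
  static <T> List<T> withOutputType(UntypedOutput output) {
    return (List<T>) (List<?>) output.elements;
  }

  /** Returns true if reading the element as a String fails at runtime. */
  static boolean readAsStringFails(Object element) {
    UntypedOutput output = new UntypedOutput();
    output.elements.add(element);
    // This call always succeeds, whatever type argument is supplied.
    List<String> typed = ErasureDemo.<String>withOutputType(output);
    try {
      typed.get(0).length(); // the compiler-inserted cast to String happens here
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(readAsStringFails("ok"));
    System.out.println(readAsStringFails(42));
  }
}
```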

Is there a way to ensure that the output type is actually compatible with
the coder that was returned after expansion (this would likely require you
to pass in typing information into *withOutputType*, see
TypeDescriptors[1])?

1:
https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java




Re: External transform API in Java SDK

2019-12-23 Thread Heejong Lee
On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik  wrote:

> What do side inputs look like?
>

A user first needs to pass the PCollections for side inputs into the
external transform, in addition to the ordinary input PCollections, and
then define the PCollectionViews inside the external transform, something
like:

PCollectionTuple pTuple =
    PCollectionTuple.of("main1", main1)
        .and("main2", main2)
        .and("side", side)
        .apply(External.of(...).withMultiOutputs());

public static class TestTransform
    extends PTransform<PCollectionTuple, PCollectionTuple> {
  @Override
  public PCollectionTuple expand(PCollectionTuple input) {
    PCollectionView<String> sideView =
        input.get("side").apply(View.asSingleton());
    PCollection<String> main =
        PCollectionList.of(input.get("main1"))
            .and(input.get("main2"))
            .apply(Flatten.pCollections())
            .apply(
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void processElement(
                              @Element String x,
                              OutputReceiver<String> out,
                              DoFn.ProcessContext c) {
                            out.output(x + c.sideInput(sideView));
                          }
                        })
                    .withSideInputs(sideView));



>
> How does the output type get preserved in this case (since Java's type
> erasure would remove the <KV<...>> type argument after compilation, and
> coder inference, in my opinion, should either be broken or be choosing
> something generic like Serializable)?
>

The expansion service is responsible for using cross-language compatible
coders in the expanded transforms it returns, and these are the coders used
at runtime. The type information supplied through the additional methods
here is only for compile-time type safety of external transforms.




Re: External transform API in Java SDK

2019-12-20 Thread Luke Cwik
What do side inputs look like?

On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee  wrote:

> I wanted to know if anybody has any comments on the external transform API
> for the Java SDK.
>
> `External.of()` creates an external transform in the Java SDK. Depending on
> the input and output types, two additional methods are provided:
> `withMultiOutputs()`, which declares that the output is a PCollectionTuple
> rather than a single PCollection, and `withOutputType()`, which specifies
> the type of the output element. Some examples are:
>
> PCollection<String> col =
>     testPipeline
>         .apply(Create.of("1", "2", "3"))
>         .apply(External.of(...));
>
> This is okay without additional methods since 1) the input and output types
> of the external transform can be inferred and 2) there is a single output
> PCollection.
>

How does the type/coder get inferred at runtime (doesn't Java's type
erasure get rid of this information)?


> PCollectionTuple pTuple =
>     testPipeline
>         .apply(Create.of(1, 2, 3, 4, 5, 6))
>         .apply(External.of(...).withMultiOutputs());
>
> This requires `withMultiOutputs()` since the output is a PCollectionTuple.
>

Shouldn't this require a mapping from "output" name to coder/type variable
to be specified as an argument to withMultiOutputs?
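
As a rough illustration of what such a mapping could look like, here is a
plain-Java sketch. Nothing in it is Beam API: `MultiOutputSpec`,
`withOutput`, and `validate` are hypothetical names, the output names
"even" and "odd" are made up for the example, and element classes stand in
for coders/type variables.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MultiOutputSpec {
  private final Map<String, Class<?>> declared = new LinkedHashMap<>();

  /** Declares the expected element type for one named output. */
  MultiOutputSpec withOutput(String name, Class<?> elementType) {
    declared.put(name, elementType);
    return this;
  }

  /** Checks the outputs reported after expansion against the declared mapping. */
  void validate(Map<String, Class<?>> returned) {
    for (Map.Entry<String, Class<?>> entry : declared.entrySet()) {
      Class<?> actual = returned.get(entry.getKey());
      if (actual == null) {
        throw new IllegalArgumentException("Missing output: " + entry.getKey());
      }
      if (!entry.getValue().isAssignableFrom(actual)) {
        throw new IllegalArgumentException(
            "Output " + entry.getKey() + " has type " + actual.getName()
                + ", expected " + entry.getValue().getName());
      }
    }
  }

  public static void main(String[] args) {
    // Hypothetical output names, chosen only for this example.
    MultiOutputSpec spec =
        new MultiOutputSpec().withOutput("even", Integer.class).withOutput("odd", Integer.class);
    Map<String, Class<?>> returned = new LinkedHashMap<>();
    returned.put("even", Integer.class);
    returned.put("odd", Integer.class);
    spec.validate(returned); // passes: names and types line up
    System.out.println("outputs validated");
  }
}
```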


> PCollection<String> pCol =
>     testPipeline
>         .apply(Create.of("1", "2", "2", "3", "3", "3"))
>         .apply(External.of(...).<KV<...>>withOutputType())
>         .apply(
>             "toString",
>             MapElements.into(TypeDescriptors.strings())
>                 .via(x -> String.format("%s->%s", x.getKey(), x.getValue())));
>
> This requires `withOutputType()` since the output element type cannot be
> inferred from method chaining. I think some users may find it awkward to
> call a method with only a type parameter and empty parentheses. Without
> `withOutputType()`, the output element type will be java.lang.Object,
> which could still be forcibly cast to KV.
>

How does the output type get preserved in this case (since Java's type
erasure would remove the <KV<...>> type argument after compilation, and
coder inference, in my opinion, should either be broken or be choosing
something generic like Serializable)?



External transform API in Java SDK

2019-12-19 Thread Heejong Lee
I wanted to know if anybody has any comments on the external transform API
for the Java SDK.

`External.of()` creates an external transform in the Java SDK. Depending on
the input and output types, two additional methods are provided:
`withMultiOutputs()`, which declares that the output is a PCollectionTuple
rather than a single PCollection, and `withOutputType()`, which specifies
the type of the output element. Some examples are:

PCollection<String> col =
    testPipeline
        .apply(Create.of("1", "2", "3"))
        .apply(External.of(...));

This is okay without additional methods since 1) the input and output types
of the external transform can be inferred and 2) there is a single output
PCollection.

PCollectionTuple pTuple =
    testPipeline
        .apply(Create.of(1, 2, 3, 4, 5, 6))
        .apply(External.of(...).withMultiOutputs());

This requires `withMultiOutputs()` since the output is a PCollectionTuple.

PCollection<String> pCol =
    testPipeline
        .apply(Create.of("1", "2", "2", "3", "3", "3"))
        .apply(External.of(...).<KV<...>>withOutputType())
        .apply(
            "toString",
            MapElements.into(TypeDescriptors.strings())
                .via(x -> String.format("%s->%s", x.getKey(), x.getValue())));

This requires `withOutputType()` since the output element type cannot be
inferred from method chaining. I think some users may find it awkward to
call a method with only a type parameter and empty parentheses. Without
`withOutputType()`, the output element type will be java.lang.Object,
which could still be forcibly cast to KV.

Thanks,
Heejong