Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-03 Thread Jan Lukavský
Yes, I think we can definitely start with a Spark-specific config option, 
but there could be value for other runners in knowing whether the output of 
@ProcessElement is somewhat limited in size (e.g. can be included in a 
single bundle) or needs to be actively split. This could then be 
incorporated into the naive bounded implementation that is reused by 
multiple runners [1], which currently does not do any (active) splits of 
the running restriction. But this might be a different discussion.
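
As a rough sketch, such a Spark-specific option could be exposed as a regular 
PipelineOptions extension; the interface and option name below are purely 
illustrative, not an existing SparkPipelineOptions field:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface SparkSdfOutputOptions extends PipelineOptions {

  @Description(
      "If true, translate SDFs to a two-threaded (push-style) execution so that "
          + "the output of a single @ProcessElement call does not need to fit in memory.")
  @Default.Boolean(false)
  Boolean getUseAsyncSdfOutput();

  void setUseAsyncSdfOutput(Boolean value);
}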


 Jan

[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java#L111

Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-03 Thread Jozef Vilcek
Regarding splitting, I think SDFs are being split on the Spark runner, but I
agree with Jan's comments about the split contract. A specific SDF is also free
to decide how big the minimal split will be, and the runner should be able to
process that with reasonable resources. E.g. ParquetIO splits on the format's
row groups; if a row group is large and the format contains a lot of
well-compressed column data, it will challenge memory resources.

Jan, as for the suggested options to implement it, I have an MR with approach
1), translating all SDFs to two-threaded execution. I did consider something
like option 3), but I was not sure whether it makes sense in general for other
runners as well as for Spark. It raises the question for me whether it ever
makes sense to create an SDF and want it, on Spark, not to use the two-thread
execution and possibly apply memory pressure.


Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Jan Lukavský
There are different translations of streaming and batch Pipelines in 
SparkRunner, and this thread was focused on the batch part, if I understand 
it correctly. Unbounded PCollections are not supported in batch Spark 
(by definition). I agree that fixing the splitting is a valid option, 
though it still requires an unnecessarily big heap for buffering and/or 
might induce some overhead from splitting the restriction. Not to mention 
that splitting is somewhat optional in the SDF contract (the DoFn might not 
support it, if it is bounded), so it might not solve the issue for all 
SDFs. The source might not even be splittable at all (e.g. a completely 
compressed blob, without any blocks).


 Jan


Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Daniel Collins via dev
If Spark's SDF solution doesn't support splitting, fixing that seems like
the best solution to me. Splitting is the mechanism exposed by the model to
actually limit the amount of data produced in a bundle. If it is unsupported,
then unbounded-per-element SDFs wouldn't be supported at all.

-Daniel


Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Jan Lukavský

Hi Jozef,

I agree that this issue is most likely specific to Spark, because of the 
way Spark uses a functional style for flatMap().


It could be fixed with the following two options:

 a) SparkRunner's SDF implementation does not use splitting - it could 
be fixed so that the SDF is stopped after N buffered elements via 
trySplit, the buffer gets flushed, and the restriction is resumed


 b) alternatively, use two threads and a BlockingQueue between them, 
which is what you propose


The number of output elements per input element is bounded (we are 
talking about the batch case anyway), but bounded does not mean it has to 
fit in memory. Furthermore, unnecessary buffering of a large number of 
elements is memory-inefficient, which is why I think that the two-thread 
approach (b) should be the most efficient. Option (a) seems orthogonal 
and might be implemented as well.
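
For illustration, a minimal self-contained sketch of the two-thread approach 
(b); the class and element names are made up and this is not the actual 
SparkRunner code, it only shows how a bounded BlockingQueue keeps at most a 
fixed number of outputs buffered between the producing @ProcessElement and 
the consuming Iterator:

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public final class TwoThreadOutputSketch {

  private static final String EOS = "__end_of_output__"; // sentinel marking end of output

  // Runs a stand-in for @ProcessElement in a producer thread; the returned
  // Iterator is what the Spark side would pull from. At most `capacity`
  // output elements are ever buffered at the same time.
  public static Iterator<String> outputs(int capacity) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 1_000_000; i++) {   // possibly huge per-element output
          queue.put("element-" + i);            // blocks when the consumer lags behind
        }
        queue.put(EOS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.setDaemon(true);
    producer.start();

    return new Iterator<String>() {
      private String next;
      private boolean done;

      @Override
      public boolean hasNext() {
        if (done) return false;
        if (next != null) return true;
        try {
          String taken = queue.take();          // waits for the producer
          if (EOS.equals(taken)) {
            done = true;
            return false;
          }
          next = taken;
          return true;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }

      @Override
      public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        String result = next;
        next = null;
        return result;
      }
    };
  }

  public static void main(String[] args) {
    Iterator<String> it = outputs(1024);
    long count = 0;
    while (it.hasNext()) { it.next(); count++; }
    System.out.println("consumed " + count + " elements with bounded buffering");
  }
}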


This raises the question of how to determine whether the runner should do 
some special translation of an SDF in this case. There are probably only 
these options:


 1) translate all SDFs to two-thread execution

 2) add a runtime flag that turns the translation on (once turned on, 
it translates all SDFs) - this is the current proposal


 3) extend the @DoFn.BoundedPerElement annotation with some kind of 
(optional) hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), 
with the default being Bounded.FITS_IN_MEMORY (the current behavior)


Approach (3) seems to give more information to all runners and might 
enable various optimizations across multiple runners, so I'd say that this 
might be the ideal variant.
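
To make (3) concrete, a hypothetical sketch of what such an extended 
annotation could look like follows; neither this Bounded enum nor the 
annotation parameter exists in Beam today, it is only an illustration of 
the proposal:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical extended annotation; in the proposal this would live on
// DoFn.BoundedPerElement rather than being a standalone type.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface BoundedPerElement {
  enum Bounded { FITS_IN_MEMORY, POSSIBLY_HUGE }
  Bounded value() default Bounded.FITS_IN_MEMORY; // today's implicit behavior
}

// A DoFn with potentially huge per-element output would opt in like this,
// and a runner could inspect the hint during translation and choose the
// two-thread (push) execution only for POSSIBLY_HUGE fns.
@BoundedPerElement(BoundedPerElement.Bounded.POSSIBLY_HUGE)
class HugeOutputFn { /* stands in for a real DoFn implementation */ }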


  Jan


Re: SparkRunner - ensure SDF output does not need to fit in memory

2022-12-29 Thread Jozef Vilcek
I am surprised to hear that the Dataflow runner (which I have never used)
would have this kind of limitation. I see that the `OutputManager` interface
is implemented to write to a `Receiver` [1], which follows the push model. Do
you have a reference I can look at to review the must-fit-in-memory limitation?

In Spark, the problem is that the leaf operator pulls data from previous
ones by consuming an `Iterator` of values. As per your suggestion, this is
not a problem with `sources`, because they hold e.g. a source file and can
pull data as it is being requested. This gets problematic exactly with SDFs
and flatMaps, not sources. It could be one of the reasons why SDFs performed
badly on Spark, where the community reported performance degradation [2] and
increased memory use [3].

My proposed solution is to, similarly to Dataflow, use a `Receiver`-like
implementation for DoFns which can output a large number of elements. For
now, this WIP targets SDFs only.
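
For illustration, a minimal sketch of the two models; the class names below
are made up and are not the actual Beam, Dataflow or Spark classes:

import java.util.Iterator;
import java.util.List;

// Push model: each output element is handed downstream immediately, so
// memory usage does not depend on how many elements one ProcessElement
// call emits.
interface Receiver<T> {
  void process(T element);
}

final class PushingOutputManager<T> {
  private final Receiver<T> downstream;

  PushingOutputManager(Receiver<T> downstream) {
    this.downstream = downstream;
  }

  void output(T element) {
    downstream.process(element); // forwarded right away, nothing buffered here
  }
}

// Pull model (roughly what the Spark translation relies on today): the
// downstream operator consumes an Iterator, so all outputs of a single
// input element must be available, e.g. buffered in a List, before the
// iterator is handed over.
final class PullingOutput<T> {
  Iterator<T> outputsOf(List<T> bufferedOutputs) {
    return bufferedOutputs.iterator();
  }
}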

[1]
https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
[2] https://github.com/apache/beam/pull/14755
[3]
https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005



Re: SparkRunner - ensure SDF output does not need to fit in memory

2022-12-28 Thread Daniel Collins via dev
I believe that for the Dataflow runner the result of processElement must also
fit in memory, so this is not just a constraint for the Spark runner.

The best approach at present might be to convert the source from a flatMap
to an SDF that reads out chunks of the file at a time and supports runner
checkpointing (i.e. with a file seek point to resume from), to chunk your
data in a way that doesn't require the runner to support unbounded outputs
from any individual @ProcessElements downcall.
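
For illustration, a minimal sketch of such an SDF using the standard
OffsetRange restriction; the class name and chunk sizes are made up, and a
real reader would also need to handle records that straddle split boundaries:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Reads a file path element as lines, chunked by byte offset ranges, so the
// runner can split/checkpoint the work instead of receiving one huge output.
@DoFn.BoundedPerElement
class ReadFileBySplitsFn extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String path) throws IOException {
    return new OffsetRange(0, Files.size(Paths.get(path)));
  }

  @SplitRestriction
  public void splitRestriction(
      @Restriction OffsetRange range, OutputReceiver<OffsetRange> splits) {
    for (OffsetRange split : range.split(64 * 1024 * 1024, 1024 * 1024)) {
      splits.output(split); // ~64 MB chunks the runner can schedule independently
    }
  }

  @ProcessElement
  public void process(
      @Element String path,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out)
      throws IOException {
    try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
      long position = tracker.currentRestriction().getFrom();
      file.seek(position);
      // Claim the start offset of each record before emitting it; the loop
      // stops as soon as the tracker refuses a claim (end of range or split).
      while (tracker.tryClaim(position)) {
        String line = file.readLine();
        if (line == null) {
          break; // defensive; the restriction never extends past EOF
        }
        out.output(line);
        position = file.getFilePointer();
      }
    }
  }
}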

-Daniel



SparkRunner - ensure SDF output does not need to fit in memory

2022-12-28 Thread Jozef Vilcek
Hello,

I am working on an issue which currently limits the Spark runner by requiring
the result of processElement to fit in memory [1]. This is problematic e.g.
for flatMap, where the input element is a file split and generates a possibly
large output.

The intended fix is to add an option to run DoFn processing over the input in
one thread, with consumption of the outputs and forwarding them to downstream
operators in another thread. One challenge for me is to identify which DoFns
should use this async approach.

Here [2] is a WIP commit which uses async processing only for the SDF naive
expansion. I would like to get feedback on:

1) does the approach make sense overall

2) to target DoFns which need async processing (i.e. generate a possibly
large output), I am currently just checking whether the DoFn is of the SDF
naive-expansion type [3]. I failed to find a better / more systematic approach
for identifying which DoFns should benefit from this. I would appreciate any
thoughts on how to make this better.

3) Config option and validatesRunner tests - do we want to make it possible
to turn the async DoFn off? If yes, do we want to run validatesRunner tests
for both options? How do I make sure of that?

Looking forward to the feedback.
Best,
Jozef

[1] https://github.com/apache/beam/issues/23852
[2]
https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
[3]
https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362