Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-02-07 Thread Lakshmi Manasa
Thank you, Yi, Jagadish, and Bharath, for your reviews and +1s on the SEP.
I will close this discuss thread and start a vote thread.

On Tue, Feb 7, 2023 at 9:53 AM Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:

> +1 on my end.
>
> Looks good to me.
> Thanks for putting this together, Manasa!
>
> Cheers,
> Bharath

Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-02-07 Thread Bharath Kumara Subramanian
+1 on my end.

Looks good to me.
Thanks for putting this together, Manasa!

Cheers,
Bharath



On Mon, Feb 6, 2023 at 11:51 PM Jagadish Venkatraman wrote:

> Thank you, Manasa, for the proposal. I reviewed it and it looks good to me.
> Nice work!
>
> +1 (approve) from my end.

Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-02-06 Thread Jagadish Venkatraman
Thank you, Manasa, for the proposal. I reviewed it and it looks good to me.
Nice work!

+1 (approve) from my end.



On Mon, Feb 6, 2023 at 11:41 PM Yi Pan wrote:

> Hi, Manasa,
>
> Sorry for the late reply. The revision LGTM. Thanks for the great work!
>
> Best,
>
> -Yi

Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-02-06 Thread Yi Pan
Hi, Manasa,

Sorry for the late reply. The revision LGTM. Thanks for the great work!

Best,

-Yi

On Mon, Jan 30, 2023 at 12:11 PM Lakshmi Manasa wrote:

> Hi Yi,
>
> I have updated SEP-32 to include all the feedback on the above questions.
> Please let me know if there are any follow-up questions.
>
> Thanks,
> Manasa

Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-01-30 Thread Lakshmi Manasa
Hi Yi,

I have updated SEP-32 to include all the feedback on the above questions.
Please let me know if there are any follow-up questions.

Thanks,
Manasa

On Mon, Jan 23, 2023 at 8:56 AM Lakshmi Manasa wrote:

> Hi Yi,
>
> Thank you for raising these questions. Please find my answers inline
> below.

Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-01-23 Thread Lakshmi Manasa
Hi Yi,

Thank you for raising these questions. Please find my answers inline below.

*a) How are states for the virtual tasks managed during split/merge?*
For this SEP, stateful job elasticity is future work; SEP-32 currently
deals only with stateless elasticity.
The idea for state-preserving elasticity is to support only jobs that can
guarantee a bijective mapping between state key and input key.
This requirement is needed so that when input keys move from one virtual
task to another, it is easy to identify which state keys should be present
in the store of each virtual task for correct operation.
Additionally, stateful elasticity is supported only for jobs that rely on
the blob store for backup and restore.
Furthermore, for stateful jobs the elasticity factor is increased or
decreased only in steps of 2.
With these restrictions in place, when a job starts with elasticity factor
2, the state blob of the original task is copied to both virtual tasks
during the split.
For a merge, when two virtual tasks merge into one (virtual or original)
task, the state blob of the new task needs to be stitched together from the
older blobs. This will be done by leveraging the bijective state key to
input key mapping, which determines, for each state key in the new blob,
which older blob its value should come from (each older blob belonged to a
virtual task that consumed input keys based on that virtual task's
keyBucket).
That said, the design for stateful jobs needs more work and is planned for
a follow-up SEP; the current SEP-32 focuses only on stateless jobs.
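To make the split and merge steps above concrete, here is a minimal Python sketch. It is an illustration under stated assumptions, not SEP-32 code: the crc32-based key_bucket routing, the dict-based state blobs, and the input_key_for callback (standing in for the bijective state key to input key mapping) are all hypothetical.

```python
import zlib
from typing import Callable, Dict, List

StateBlob = Dict[str, int]  # hypothetical: a state blob modelled as key -> value


def key_bucket(input_key: str, elasticity_factor: int) -> int:
    """Hypothetical routing: the virtual task owning bucket b consumes keys with
    crc32(key) % elasticity_factor == b (crc32 is used because it is deterministic)."""
    return zlib.crc32(input_key.encode("utf-8")) % elasticity_factor


def split(original: StateBlob, elasticity_factor: int = 2) -> List[StateBlob]:
    """Split: each new virtual task starts from its own full copy of the original blob."""
    return [dict(original) for _ in range(elasticity_factor)]


def merge(blobs: List[StateBlob], input_key_for: Callable[[str], str]) -> StateBlob:
    """Merge: stitch one blob from the blobs of the virtual tasks being merged.

    blobs[b] belongs to the virtual task that owned key bucket b. input_key_for is
    the assumed bijective state key -> input key mapping; it identifies which virtual
    task consumed the input key, and therefore which blob holds the current value.
    """
    factor = len(blobs)
    merged: StateBlob = {}
    all_state_keys = set().union(*blobs)
    for state_key in sorted(all_state_keys):
        owner = key_bucket(input_key_for(state_key), factor)
        if state_key in blobs[owner]:  # absent means the owner deleted it
            merged[state_key] = blobs[owner][state_key]
    return merged
```

For example, with state keys of the form "count:<input key>", input_key_for can simply strip the "count:" prefix. After each virtual task updates only the keys in its own bucket, merge keeps each owner's fresh value instead of the stale copy the split left in the other blob, which is why the bijective mapping is required.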

*b) What's the perf impact when we have 2 virtual tasks on the same SSP in the
same container, while one virtual task is much faster than the other?*
The SystemConsumer subscribes to the input system at the partition level.
Because both virtual tasks consume the same SSP, the system consumer of a
container fetches new messages for that SSP only once the entire SSP buffer
is empty, even if one virtual task is much faster than the other.
This means that even though one virtual task is much faster, the
performance will be determined by the slower virtual task.
However, this is no worse than the performance of the pre-elastic job, and
if the number of containers is increased, the fast virtual task can improve
performance when the slow and fast virtual tasks are placed in different
containers (and therefore use different system consumers).
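The fetch behaviour above can be illustrated with a toy throughput model in Python. The per-message costs, the names, and the assumption that each virtual task works through its share of a fetched batch in parallel are hypothetical; this is arithmetic under a simplified model, not a Samza benchmark.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class VirtualTask:
    name: str
    ms_per_message: float  # hypothetical per-message processing cost


def shared_buffer_batch_ms(tasks: List[VirtualTask], share: List[int]) -> float:
    """Time for one fetched batch to drain when the virtual tasks share one SSP buffer.

    Model (an assumption, not Samza code): each virtual task works through its own
    share of the batch in parallel, and the container's SystemConsumer refetches the
    SSP only once the whole buffer is empty, so the slowest virtual task gates it.
    """
    return max(t.ms_per_message * n for t, n in zip(tasks, share))


def shared_buffer_throughput(tasks: List[VirtualTask], share: List[int]) -> float:
    """Messages/second for the SSP when the virtual tasks share one container."""
    return sum(share) * 1000.0 / shared_buffer_batch_ms(tasks, share)


def separate_containers_throughput(tasks: List[VirtualTask]) -> float:
    """Messages/second when each virtual task has its own container and consumer,
    assuming a backlog, so a fast task never waits for a slow one (the cost of
    reading and skipping the other task's keys is ignored)."""
    return sum(1000.0 / t.ms_per_message for t in tasks)
```

With a 1 ms and a 4 ms virtual task each owning 100 messages of a batch, the shared buffer drains in 400 ms (500 messages/s), so the fast task idles for 300 ms per batch; in separate containers the model gives 1000 + 250 = 1250 messages/s.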

*c) What's the reason that a virtual task cannot filter older messages
from a previous offset, in case the container restarts from a smaller
offset from another virtual task consuming the same SSP?*
IIUC, this question is about a container with two virtual tasks that
committed checkpoints for the same SSP, where the fast virtual task
committed a newer offset and the slow virtual task committed an older
offset.
In this scenario, the SEP says there could be duplicate processing, as the
SystemConsumer will start consuming from the older offset for the SSP.
Yes, an improvement can be made so that the virtual task that committed the
newer offset processes only messages after its own checkpointed offset and
filters out the older messages.
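A minimal Python sketch of the filtering improvement described above, assuming integer offsets and that a checkpoint records the last offset a virtual task has passed in the SSP. The replay function and the owner_of callback (standing in for keyBucket routing) are hypothetical names, not Samza APIs.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def replay(messages: Iterable[Tuple[int, str]],
           checkpoints: Dict[str, int],
           owner_of: Callable[[str], str],
           filter_old: bool = True) -> Dict[str, List[int]]:
    """Return, per virtual task, the offsets it processes after a container restart.

    checkpoints maps each virtual task sharing the SSP to the last offset it had
    checkpointed. The shared SystemConsumer must resume after the *oldest* checkpoint,
    otherwise the slower virtual task would lose messages.
    """
    resume_from = min(checkpoints.values()) + 1
    processed: Dict[str, List[int]] = {task: [] for task in checkpoints}
    for offset, key in messages:
        if offset < resume_from:
            continue  # not re-read by the SystemConsumer
        task = owner_of(key)  # keyBucket routing, abstracted as a callback
        if filter_old and offset <= checkpoints[task]:
            continue  # this task already passed this offset before the restart
        processed[task].append(offset)
    return processed
```

With checkpoints {"fast": 7, "slow": 3} and keys alternating between the two tasks, the consumer resumes at offset 4; without filtering the fast task reprocesses offsets 4 and 6, while with filtering it starts at 8.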

*d) How do we compare this w/ an alternative idea that implements a
KeyedOrderedExecutor w/ multiple parallel threads within the single task's
main event loop to increase the parallelism?*
Is this similar to the per-key parallelism option (in the rejected
solutions section), with the difference that the number of threads is fixed
for a single task (as opposed to one thread per key in the rejected
solution)?
This KeyOrderedExecutor is better than the parallelism that
task.max.concurrency currently offers, as it gives in-order execution per
key. However, with the KeyOrderedExecutor solution the number of containers
will still be <= the number of tasks. This means:
(a) to increase throughput for one key, all other keys should also be
processed faster (this is partially present in elasticity too, as discussed
in answer b) above, but it can be mitigated with a higher elasticity factor
and more containers);
(b) network, disk and I/O contention will be larger than with elasticity:
virtual tasks can be spread across hosts, whereas with a key-ordered
executor all keys of a single task sit on the same host, so increased
throughput increases the load on that host; and
(c) if one or more of the parallel units (threads here) needs more
resources, the result is a large container, which makes scheduling harder
because finding large chunks of resources in a cluster takes longer,
whereas with virtual tasks we can have smaller containers for the virtual
tasks.
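For comparison, here is a hypothetical Python sketch of the KeyOrderedExecutor idea from the question: a fixed number of single-threaded lanes inside one task, with each key pinned to one lane. The class, its methods, and the crc32 lane assignment are illustrative only; the point is that per-key order holds while every lane still shares the same task, container and host, which is the limitation described in (a) to (c) above.

```python
import zlib
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


class KeyOrderedExecutor:
    """Hypothetical sketch: a fixed number of single-threaded lanes within one task.

    Each key is pinned to one lane via crc32(key) % num_lanes, so work for the same key
    runs in submission order, while different keys may run in parallel on other lanes.
    """

    def __init__(self, num_lanes: int) -> None:
        if num_lanes < 1:
            raise ValueError("num_lanes must be >= 1")
        # One worker per lane: a single-thread executor runs its queue in FIFO order.
        self._lanes: List[ThreadPoolExecutor] = [
            ThreadPoolExecutor(max_workers=1) for _ in range(num_lanes)
        ]

    def lane_for(self, key: str) -> int:
        return zlib.crc32(key.encode("utf-8")) % len(self._lanes)

    def submit(self, key: str, fn: Callable[[], None]) -> Future:
        return self._lanes[self.lane_for(key)].submit(fn)

    def shutdown(self) -> None:
        for lane in self._lanes:
            lane.shutdown(wait=True)
```

Submitting many updates for a few keys and recording them shows each key's updates in submission order, even when several keys share a lane; adding lanes adds threads, but never more hosts or containers.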


Please let me know if the above answers make sense and if there are any
follow-ups for this SEP.

On Thu, Jan 19, 2023 at 10:33 PM Yi Pan wrote:

> Hey, Manasa,
>
> Sorry to chime in late. A few questions:

Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-01-19 Thread Yi Pan
Hey, Manasa,

Sorry to chime in late. A few questions:
a) How are states for the virtual tasks managed during split/merge?
b) What's the perf impact when we have 2 virtual tasks on the same SSP in
the same container, while one virtual task is much faster than the other?
c) What's the reason that a virtual task cannot filter older messages from
a previous offset, in case the container restarts from a smaller offset
from another virtual task consuming the same SSP?
d) How do we compare this w/ an alternative idea that implements a
KeyedOrderedExecutor w/ multiple parallel threads within the single task's
main event loop to increase the parallelism?

Best,

-Yi


On Thu, Jan 19, 2023 at 3:26 PM Lakshmi Manasa wrote:

> Hi all,
>
> If there are no concerns or questions about this SEP, I will start the
> vote email thread tomorrow.
>
> Thanks,
> Manasa


Re: [DISCUSS] SEP-32: Elasticity for Samza

2023-01-19 Thread Lakshmi Manasa
Hi all,

If there are no concerns or questions about this SEP, I will start the
vote email thread tomorrow.

Thanks,
Manasa

On Fri, Jan 6, 2023 at 8:08 AM Lakshmi Manasa wrote:

> Hi all,
> We have created SEP-32: Elasticity for Samza.
>
> Please find the SEP here:
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-32%3A+Elasticity+for+Samza
>
> Please take a look and provide feedback.
>
> Thanks,
> Manasa
>