[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed based on the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. The implied moral being that _"eligible for processing"_ does not imply _"should be deleted"_. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed based on the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. The implied moral being that "eligible for processing" {{!=}} "should be deleted". It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed based on the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't, since it's been deleted. The implied moral being that _"eligible for > processing"_ does not imply _"should be deleted"_. > It may be better
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed based on the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. The implied moral being that "eligible for processing" {{!=}} "should be deleted". It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed based on the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed based on the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't, since it's been deleted. The implied moral being that "eligible for > processing" {{!=}} "should be deleted". > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed based on the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed based on the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't, since it's been deleted. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to be deleted. -- This message was sent by
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed by using the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't, since it's been deleted. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to deleted. -- This message was sent by Atlassian JIRA
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to be deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed by using the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't, since it's been deleted. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to be deleted. -- This message was sent by Atlassian
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it needs to be re-run next time around, but won't since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed by using the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't since it's been deleted. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to deleted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Description: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it needs to be re-run next time around, but won't since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. was: {{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted. The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition: {code:java} timer.getTimestamp().isBefore(inputWatermark) {code} However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect. In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared. For instance, consider a timer that is found eligible for processing is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it needs to be re-run next time around, but won't since it's been deleted. It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted. > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed by using the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it needs to be re-run next time around, but won't since it's > been deleted. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to deleted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2859) Processing time based timers are not properly fired in case the watermark stays put
[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stas Levin updated BEAM-2859: - Affects Version/s: 2.0.0 > Processing time based timers are not properly fired in case the watermark > stays put > --- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 2.0.0, 2.1.0 >Reporter: Stas Levin >Assignee: Stas Levin > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed by using the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing is thus > deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it needs to be re-run next time around, but won't since it's > been deleted. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to deleted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)