Re: Beam/Samza Ensuring At Least Once semantics

2019-07-12 Thread Lukasz Cwik
That seems to be an issue with how the commit is being restarted in Samza
and not with the Kafka source.

On Thu, Jul 11, 2019 at 4:44 PM Deshpande, Omkar wrote:

> Yes, we are resuming from samza’s last commit. But the problem is that the
> last commit was done for data in the window that is not completely
> processed.

Re: Beam/Samza Ensuring At Least Once semantics

2019-07-10 Thread Lukasz Cwik
When you restart the application, are you resuming it from Samza's last
commit?

Since the exception is thrown after the GBK, all the data could be read
from Kafka and forwarded to the GBK operator inside of Samza and
checkpointed in Kafka before the exception is ever thrown.
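
Concretely, the summing step is what expands to the GBK here, so the rough
shape of the pipeline is:

  KafkaIO.read -> ParDo(log, pair) -> Window(30 sec) -> GBK -> Combine(sum)
    -> ParDo(throws on 'm') -> KafkaIO.write

Everything up to the GBK can complete, and the source offsets be committed,
before the throwing ParDo ever runs for a window.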


Re: Beam/Samza Ensuring At Least Once semantics

2019-07-09 Thread Benenson, Mikhail
Hi

I have run a few experiments to verify whether 'at least once' processing is
guaranteed on Beam 2.13.0 with Samza Runner 1.1.0.

The Beam application is a slightly modified Stream Word Count from the Beam
examples:

  *   read strings from the input Kafka topic, print (topic, partition, offset, value)
  *   convert values to pairs (value, 1)
  *   group in Fixed Windows with a duration of 30 sec
  *   sum per key
  *   throw an exception if the key starts with 'm'
  *   write (key, sum) to the output Kafka topic

I tried KafkaIO.read() with and without commitOffsetsInFinalize(); there is no
difference in the results.

Please see the source code attached.
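
For reference, a minimal sketch of such a pipeline (not the attached source;
the bootstrap server, topic names, and consumer group id are placeholders):

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

public class OnceDemoSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("once-input")            // pre-created with 1 partition
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // commitOffsetsInFinalize() requires a consumer group id
            .updateConsumerProperties(
                Collections.<String, Object>singletonMap("group.id", "once-demo"))
            .commitOffsetsInFinalize())         // tried with and without this
        // print (topic, partition, offset, value), then emit (value, 1)
        .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            KafkaRecord<String, String> r = c.element();
            System.out.printf("read from topic=%s, part=%d, offset=%d, val: %s%n",
                r.getTopic(), r.getPartition(), r.getOffset(), r.getKV().getValue());
            c.output(KV.of(r.getKV().getValue(), 1));
          }
        }))
        // 30-sec fixed windows, then sum per key (this step contains the GBK)
        .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply(Sum.integersPerKey())
        // simulate an application error for keys starting with 'm'
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            if (c.element().getKey().startsWith("m")) {
              throw new RuntimeException("boom on key " + c.element().getKey());
            }
            c.output(c.element());
          }
        }))
        .apply(KafkaIO.<String, Integer>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("once-output")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(IntegerSerializer.class));
    p.run().waitUntilFinish();
  }
}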

Environment:

  *   Run with local ZooKeeper & Kafka; pre-create the input & output topics with 1 partition.
  *   samza.properties contains "task.commit.ms=2000". According to the Samza
docs, "this property determines how often a checkpoint is written. The value is
the time between checkpoints, in milliseconds". See
http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.
Please see the Samza config file and run script attached.
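
For reference, the checkpoint-related entries of such a properties file
(presumably the same settings quoted in Matt's message at the bottom of this
thread):

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.commit.ms=2000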


Scenario 1: Exception in transformation

Run

  *   Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic
  *   start the Beam app
  *   verify that the app log contains "read from topic=XXX, part=0,
offset=100, val: e". Because the input topic has only one partition, this means
all data have been read from Kafka.
  *   wait until the app terminates because of the exception while processing 'm'

Expectation
The order of processing after grouping is not specified, so some data could be
written to the output topic before the application terminates. However, value=m
at offset 98 and all later records must NOT be marked as processed, so if I
restart the Beam app, I expect it to throw the exception again when processing
value=m.
Comment: throwing an exception in a transformation is not a good idea, but such
an exception could be the result of an application error. So the expectation is
that after fixing the error and restarting the Beam app, it should process the
record that caused the error.

Results
After I restarted the app, it did NOT re-process value 'm' and did not throw an
exception. If I add a new value 'f' into the input topic, I see "read from
topic=XXX, part=0, offset=101, val: f", and after some time I see 'm' in the
output topic. So the record with value 'm' is NOT processed.


Scenario 2: App termination

Run

  *   Write 'g', 'h', 'i', 'j' into the input topic
  *   start the Beam app
  *   verify that the app log contains "read from topic=XXX, part=0,
offset=105, val: j". Because the input topic has only one partition, this means
that all data has been read from Kafka.
  *   wait about 10 sec, then terminate the Beam app. The idea is to terminate
the app while 'g', 'h', 'i', 'j' are still waiting in the 30-sec Fixed Windows,
but after task.commit.ms=2000 has passed, so the offsets are committed.

Expectation
As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the app is
restarted, it again reads 'g', 'h', 'i', 'j' from the input topic and processes
these records.

Results
After I restarted the app, it did NOT re-process the 'g', 'h', 'i', 'j' values.
If I add a new value 'k' into the input topic, I see "read from topic=XXX,
part=0, offset=106, val: k", and after some time I see 'k' in the output topic.
So the records with values 'g', 'h', 'i', 'j' are NOT processed.


Based on these results I'm inclined to conclude that Beam with the Samza runner
does NOT provide an 'at least once' processing guarantee.

Have I missed something?

--
Michael Benenson


From: "LeVeck, Matt" 
Date: Monday, July 1, 2019 at 5:28 PM
To: "Deshpande, Omkar" , "Benenson, Mikhail" 
, Xinyu Liu , Xinyu Liu 
, Samarth Shetty , "Audo, Nicholas" 

Subject: Beam/Samza Ensuring At Least Once semantics

We're seeing some behavior when using Beam's KafkaIO and Samza as the runner
that suggests checkpoints are getting committed even when an error gets thrown
in the Beam pipeline while processing a batch. Do you all have a recommended
set of settings/patterns for using Beam with Samza to ensure that checkpoints
are only updated after successful processing (i.e. the transforms succeed and
the message is sent to the Beam pipeline's final output sink)?

Our current settings for Samza are:
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.shutdown.ms=1
task.commit.ms=2000

Nothing is specified with regard to checkpointing at the Beam level.
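
For what it's worth, the only offset-commit knob we see on KafkaIO itself is
commitOffsetsInFinalize(); a sketch of wiring it up (bootstrap server, topic,
and group id are placeholders):

import java.util.Collections;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

class ReadConfigSketch {
  static KafkaIO.Read<String, String> checkpointedRead() {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("input")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // offset commits require a consumer group id
        .updateConsumerProperties(
            Collections.<String, Object>singletonMap("group.id", "beam-app"))
        // commit offsets only when Beam finalizes a checkpoint
        .commitOffsetsInFinalize();
  }
}

Note that the experiments above tried this option both ways, with no difference
in the observed behavior.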

Thanks,
Matt


#!/bin/bash

java  -cp  
"target/data-strmprocess-samza-driver-0.1.0.jar:target/lib:target/lib/*"  \
  com.intuit.strmprocess.once.OnceDemoWordCount01  \
  --runner=SamzaRunner   \
  --jobName=driver-once \
  --jobInstance=001 \
  --maxSourceParallelism=10 \
  --samzaExecutionEnvironment=STANDALONE \
  --configFilePath=src/main/resources/samza-once-local.properties