Re: Beam/Samza Ensuring At Least Once semantics

2019-07-12 Thread Lukasz Cwik
That seems to be an issue with how the commit is being restarted in Samza
and not with the Kafka source.
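
For readers following the thread, the failure mode discussed in the quoted
messages can be illustrated with a toy model. This is only a sketch in plain
Python, not Samza or Beam code: the function name unprocessed_after_crash and
its commit_after_read flag are hypothetical, and the model simply assumes that
offsets may be committed as soon as records are read, while the records
themselves are still buffered in a window that has not fired.

```python
def unprocessed_after_crash(records, commit_after_read):
    """Toy model: return the records that are never emitted when the job
    crashes before the window fires and then restarts from the last
    committed offset. Hypothetical helper, not a Samza or Beam API."""
    committed = 0        # offset the job resumes from after a restart
    window_buffer = []   # records read but not yet emitted by the window
    emitted = []

    # First run: every record is read and buffered; the window never fires.
    for offset, value in enumerate(records):
        window_buffer.append(value)
        if commit_after_read:
            # The periodic commit runs while the data is still in the window.
            committed = offset + 1

    # Crash: the buffered window contents are lost and nothing was emitted.
    window_buffer = []

    # Second run: resume from the committed offset and let the window fire.
    for value in records[committed:]:
        window_buffer.append(value)
    emitted.extend(window_buffer)

    return [r for r in records if r not in emitted]


lost = unprocessed_after_crash(["g", "h", "i", "j"], commit_after_read=True)
kept = unprocessed_after_crash(["g", "h", "i", "j"], commit_after_read=False)
```

In this model, committing right after the read loses all four records, which
mirrors Scenario 2 in the quoted message; holding the commit back until the
window output exists causes the records to be re-read after the restart.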

On Thu, Jul 11, 2019 at 4:44 PM Deshpande, Omkar wrote:

> Yes, we are resuming from Samza's last commit. But the problem is that the
> last commit covered data in a window that had not been completely
> processed.
>
>
>
> *From: *Lukasz Cwik 
> *Date: *Wednesday, July 10, 2019 at 11:07 AM
> *To: *dev 
> *Cc: *"LeVeck, Matt", "Deshpande, Omkar" <omkar_deshpa...@intuit.com>,
> Xinyu Liu, Samarth Shetty, "Audo, Nicholas",
> "Cesar, Scott" <scott_ce...@intuit.com>, "Ho, Tom",
> "dev@samza.apache.org"
> *Subject: *Re: Beam/Samza Ensuring At Least Once semantics
>
>
>
> When you restart the application, are you resuming it from Samza's last
> commit?
>
>
>
> Since the exception is thrown after the GBK, all the data could be read
> from Kafka and forwarded to the GBK operator inside of Samza and
> checkpointed in Kafka before the exception is ever thrown.
>
>
>
> On Tue, Jul 9, 2019 at 8:34 PM Benenson, Mikhail <
> mikhail_benen...@intuit.com> wrote:
>
> Hi
>
>
>
> I have run a few experiments to verify whether 'at least once' processing
> is guaranteed on Beam 2.13.0 with Samza Runner 1.1.0.
>
>
>
> The Beam application is a slightly modified Stream Word Count from the Beam
> examples:
>
> - read strings from the input Kafka topic, print (topic, partition,
>   offset, value)
> - convert values to pairs (value, 1)
> - group in Fixed Windows with a duration of 30 sec
> - sum per key
> - throw an exception if the key starts with 'm'
> - write (key, sum) to the output Kafka topic
>
>
>
> I tried KafkaIO.read() with and without commitOffsetsInFinalize(); there is
> no difference in the results.
>
>
>
> Please see the source code attached.
>
>
>
> Environment:
>
> - Run with local ZooKeeper and Kafka; pre-create input and output topics
>   with 1 partition.
> - samza.properties contains "task.commit.ms=2000". According to the Samza
>   documentation, "this property determines how often a checkpoint is
>   written. The value is the time between checkpoints, in milliseconds". See
>   http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing
>   Please see the Samza config file and run script attached.
>
>
>
>
>
> *Scenario 1: Exception in transformation*
>
>
>
> Run
>
> - Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic
> - start the Beam app
> - verify that the app log contains "read from topic=XXX, part=0,
>   offset=100, val: e". Because the input topic has only one partition, this
>   means all data has been read from Kafka.
> - wait until the app terminates because of the exception thrown while
>   processing 'm'
>
>
>
> Expectation
>
> The order of processing after grouping is not specified, so some data
> could be written to the output topic before the application terminates, but
> I expect that value=m with offset 98 and all later records must NOT be
> marked as processed, so if I restart the Beam app, I expect it to throw the
> exception again when processing value=m.
>
> Comment: throwing an exception in a transformation is not a good idea, but
> such an exception could be the result of an application error. So the
> expectation is that after fixing the error and restarting the Beam app, it
> should process the record that caused the error.
>
>
>
> Results
>
> After I restarted the app, it does NOT re-process value 'm' and does not
> throw an exception. If I add a new value 'f' to the input topic, I see "read
> from topic=XXX, part=0, offset=101, val: f", and after some time I see 'm'
> in the output topic. So, the record with value 'm' is NOT processed.
>
>
>
>
>
> *Scenario 2: App termination*
>
>
>
> Run
>
> - Write 'g', 'h', 'i', 'j' into the input topic
> - start the Beam app
> - verify that the app log contains "read from topic=XXX, part=0,
>   offset=105, val: j". Because the input topic has only one partition, this
>   means that all data has been read from Kafka.
> - wait about 10 sec, then terminate the Beam app. The idea is to terminate
>   the app while 'g', 'h', 'i', 'j' are still waiting in the 30 sec Fixed
>   Window, but after the task.commit.ms=2000 interval has passed, so the
>   offsets are committed.
>
>
>
> Expectation
>
> As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the app
> is restarted, it again reads 'g', 'h', 'i', 'j' from the input topic and
> processes these records.
>
>
>
> Results
>
> After I restarted the app, it does NOT re-process the 'g', 'h', 'i', 'j'
> values. If I add a new value 'k' to the input topic, I see "read from
> topic=XXX, part=0, offset=106, val: k", and after some time I see 'k' in the
> output topic. So, the records with values 'g', 'h', 'i', 'j' are NOT
> processed.
>
>
>
>
>
> Based on these results I'm inclined to conclude that Beam with the Samza
> runner does NOT provide an 'at least once' guarantee for processing.
>
>
>
> Did I miss something?
>
>
>
> --
>
> Michael Benenson
>
>
>
>
>
> *From: *"LeVeck, Matt" 
> *Date: *Monday, July 1, 

[REPORT] Samza - July 2019

2019-07-12 Thread Yi Pan (Data Infrastructure)
## Description:
- Apache Samza is a distributed stream processing engine that is highly
  configurable to process events from various data sources, including
  real-time messaging systems (e.g. Kafka) and distributed file systems (e.g.
  HDFS).

## Issues:
- No issues require board attention

## Activity:
- Samza 1.2 has been released:
  http://samza.apache.org/blog/2019-06-11-announcing-the-release-of-apache-samza--1.2.0

## Health report:
- Project is in healthy status with 1.2 released in June 2019

## PMC changes:

- Currently 16 PMC members.
- Boris Shkolnik was added to the PMC on Thu Jun 06 2019

## Committer base changes:

- Currently 26 committers.
- New committers:
  - Bharath Kumarasubramanian was added as a committer on Mon Jun 24 2019
  - Cameron Lee was added as a committer on Thu Apr 11 2019
  - Rayman Preet Singh was added as a committer on Mon Jul 08 2019

## Releases:

- Last release was 1.2.0 on June 11 2019

## /dist/ errors: 9
- This has been fixed.

## Mailing list activity:
- dev@samza.apache.org:
  - 267 subscribers (down -3 in the last 3 months)
  - 215 emails sent to list (1011 in previous quarter)


## JIRA activity:

- 107 JIRA tickets created in the last 3 months
- 84 JIRA tickets closed/resolved in the last 3 months