That seems to be an issue with how the commit is being restarted in Samza
and not with the Kafka source.
On Thu, Jul 11, 2019 at 4:44 PM Deshpande, Omkar
wrote:
> Yes, we are resuming from Samza's last commit. But the problem is that the
> last commit covered data in a window that had not been completely
> processed.
>
>
>
> *From: *Lukasz Cwik
> *Date: *Wednesday, July 10, 2019 at 11:07 AM
> *To: *dev
> *Cc: *"LeVeck, Matt" , "Deshpande, Omkar" <
> omkar_deshpa...@intuit.com>, Xinyu Liu , Xinyu Liu
> , Samarth Shetty , "Audo,
> Nicholas" , "Cesar, Scott" <
> scott_ce...@intuit.com>, "Ho, Tom" , "
> dev@samza.apache.org"
> *Subject: *Re: Beam/Samza Ensuring At Least Once semantics
>
>
>
>
>
>
> When you restart the application, are you resuming it from Samza's last
> commit?
>
>
>
> Since the exception is thrown after the GBK, all the data could be read
> from Kafka and forwarded to the GBK operator inside of Samza and
> checkpointed in Kafka before the exception is ever thrown.
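>
> That failure mode can be sketched with a tiny stand-alone Python simulation
> (illustrative only; the `checkpoint` dict and function names are made up and
> are not Samza or Beam APIs):

```python
# Simulates offsets being committed as soon as records reach the window
# buffer, i.e. before the grouped window result is actually processed.
checkpoint = {"offset": 0}  # stand-in for Samza's checkpoint topic

def run(records, fail_on):
    window = []
    for offset in range(checkpoint["offset"], len(records)):
        window.append(records[offset])
        checkpoint["offset"] = offset + 1  # committed on read, pre-processing
    processed = []
    for value in window:  # the "GBK" fires only after offsets are committed
        if value == fail_on:
            raise RuntimeError("simulated transform failure")
        processed.append(value)
    return processed

records = ["a", "b", "c", "m", "d", "e"]
try:
    run(records, fail_on="m")  # crashes while processing 'm'
except RuntimeError:
    pass

# "Restart": the checkpoint already points past 'm', so nothing is replayed.
replayed = run(records, fail_on="m")
print(replayed)  # [] -- 'm', 'd', 'e' are silently lost
```

> The sketch only shows why commit-on-read breaks at-least-once; committing
> only after the window output is durably written would avoid the loss.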
>
>
>
> On Tue, Jul 9, 2019 at 8:34 PM Benenson, Mikhail <
> mikhail_benen...@intuit.com> wrote:
>
> Hi
>
>
>
> I have run a few experiments to verify whether 'at least once' processing is
> guaranteed on Beam 2.13.0 with the Samza Runner 1.1.0.
>
>
>
> The Beam application is a slightly modified streaming Word Count from the
> Beam examples:
>
>- read strings from input Kafka topic, print (topic, partition,
>offset, value)
>- convert values to pairs (value, 1)
>- grouping in Fixed Windows with duration 30 sec
>- sum per key
>- throw exception, if key starts with 'm'
>- write (key, sum) to output Kafka topic
>
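>
> In case it helps, the per-window logic of the steps above can be restated as
> a small plain-Python stand-in (the attached source is the real app;
> everything here is illustrative):

```python
from collections import Counter

def process_window(values):
    """One fired 30-sec window: map each value to (value, 1), sum per key,
    and fail on any key starting with 'm' (the injected application error)."""
    counts = Counter(values)  # (value, 1) pairs summed per key
    for key in counts:
        if key.startswith("m"):
            raise RuntimeError(f"bad key: {key}")
    return dict(counts)  # would be written to the output Kafka topic

print(process_window(["a", "b", "a"]))  # {'a': 2, 'b': 1}
```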
>
>
> I tried KafkaIO.read() with and without commitOffsetsInFinalize(); there is
> no difference in the results.
>
>
>
> Please see the attached source code.
>
>
>
> Environment:
>
>- Run with local zk & kafka, pre-create input & output topics with 1
>partition.
>- samza.properties contains "task.commit.ms=2000". According to the Samza
>docs, "this property determines how often a checkpoint is written. The value
>is the time between checkpoints, in milliseconds". See
>
> http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.
>Please see the attached Samza config file and run script.
>
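>
> For context, the checkpoint-related fragment of samza.properties might look
> like this (illustrative; the task.checkpoint.factory line is the standard
> Kafka checkpoint manager and is my assumption, see the attached config for
> the actual values):

```properties
# How often Samza writes a checkpoint; the time between checkpoints, in ms
task.commit.ms=2000
# Checkpoints themselves are stored in a Kafka topic
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
```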
>
>
>
>
> *Scenario 1: Exception in transformation*
>
>
>
> Run
>
>- Write 'a', 'b', 'c', 'm', 'd', 'e' into input topic
>- start Beam app
>- verify that the app log contains "read from topic=XXX, part=0,
>offset=100, val: e". Because the input topic has only one partition, this
>means all data has been read from Kafka.
>- wait until the app terminates because of the exception thrown while
>processing 'm'
>
>
>
> Expectation
>
> The order of processing after grouping is not specified, so some data
> could be written to the output topic before the application terminates, but I
> expect that value=m at offset 98 and all later records must NOT be marked
> as processed; so if I restart the Beam app, I expect it to throw the
> exception again when processing value=m.
>
> Comment: throwing an exception in a transformation is not a good idea, but
> such an exception could be the result of an application error. So the
> expectation is that after fixing the error and restarting the Beam app, it
> should process the record that caused the error.
>
>
>
> Results
>
> After I restarted the app, it did NOT re-process value 'm' and did not
> throw an exception. If I add a new value 'f' into the input topic, I see "read
> from topic=XXX, part=0, offset=101, val: f", and after some time I see 'f'
> in the output topic. So the record with value 'm' is NOT processed.
>
>
>
>
>
> *Scenario 2: App termination*
>
>
>
> Run
>
>- Write 'g', 'h', 'i', 'j' into the input topic
>- start the Beam app
>- verify that the app log contains "read from topic=XXX, part=0,
>offset=105, val: j". Because the input topic has only one partition, this
>means all data has been read from Kafka.
>- wait about 10 sec, then terminate the Beam app. The idea is to terminate
>the app while 'g', 'h', 'i', 'j' are still waiting in the 30-sec Fixed
>Window, but after task.commit.ms=2000 has passed, so the offsets are
>committed.
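>
> The timing in the last step can be checked with a quick back-of-the-envelope
> Python sketch (purely illustrative numbers matching the setup above):

```python
WINDOW_SEC = 30          # fixed window duration
COMMIT_INTERVAL_SEC = 2  # task.commit.ms=2000
KILL_AT_SEC = 10         # when the app is terminated

read_time = {"g": 0.1, "h": 0.2, "i": 0.3, "j": 0.4}  # seconds after start

# The last checkpoint written before the kill covers everything read earlier.
last_commit = (KILL_AT_SEC // COMMIT_INTERVAL_SEC) * COMMIT_INTERVAL_SEC
committed = {v for v, t in read_time.items() if t < last_commit}

# The window would only fire at t=30s, i.e. after the kill, so nothing
# has actually been processed yet.
processed = committed if KILL_AT_SEC >= WINDOW_SEC else set()

lost_on_restart = committed - processed
print(sorted(lost_on_restart))  # ['g', 'h', 'i', 'j']
```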
>
>
>
> Expectation
>
> As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the app
> is restarted, it again reads 'g', 'h', 'i', 'j' from the input topic and
> processes these records.
>
>
>
> Results
>
> After I restarted the app, it did NOT re-process values 'g', 'h', 'i', 'j'.
> If I add a new value 'k' into the input topic, I see "read from topic=XXX,
> part=0, offset=106, val: k", and after some time I see 'k' in the output
> topic. So the records with values 'g', 'h', 'i', 'j' are NOT processed.
>
>
>
>
>
> Based on these results I am inclined to conclude that Beam with the Samza
> runner does NOT provide an 'at least once' processing guarantee.
>
>
>
> Have I missed something?
>
>
>
> --
>
> Michael Benenson
>
>
>
>
>
> *From: *"LeVeck, Matt"
> *Date: *Monday, July 1,