
Last question first: A KTable is basically an infinite window over the
whole stream, providing a single result (that gets updated when new
data arrives). If you use windows, you cut the overall stream into
finite subsets and get one result per window. Thus, I guess you do not
need windows (if I understood your use case correctly).

However, in the current state of the Kafka Streams DSL, you will not
be able to use KTable directly (see the suggestion to fix this below),
because the DSL does not currently allow you to access the timestamp
of the current record (thus, you cannot know whether a record is late
or not). You would need to use the Processor API, which lets you
access the current record's timestamp via the context object given in
init().

Your reasoning about partitions and Streams instances is correct.
However, the following two points are not:

> - Because I'm using a KTable, the timestamp of the messages is
> extracted, and I'm not shown the older bid because I've already
> processed the later bid. The older bid is ignored.

and

> - Because of this, the replica already knows which timestamps it
> has processed, and is able to ignore the older messages.

Late-arriving records are not dropped but are processed regularly.
Thus, your KTable aggregation function will be called for the
late-arriving record, too (but as described above, you currently have
no way to know that it is a late record).


Last but not least, your last statement is a valid concern:

> Also, what will happen if bid 2 arrived and got processed, and then
> the particular replica crashed, and was restarted. The restarted
> replica won't have any memory of which timestamps it has previously
> processed.
> 
> So if bid 2 got processed, replica crashed and restarted, and then
> bid 1 arrived, what would happen in that case?

In order to make this work, you would need to store the timestamp in
your store next to the actual data. Then you can compare the timestamp
of the latest result (safely stored in operator state) with the
timestamp of the current record.

Does this make sense?
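To illustrate the idea, here is a minimal plain-Java sketch of the
"store the timestamp next to the value and ignore older records"
logic. The HashMap here just stands in for a fault-tolerant Kafka
Streams state store, and all class/method names are made up for the
example:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for the fault-tolerant
// state store, so the stored timestamp survives exactly as long as
// the store does (in Kafka Streams, a changelog-backed store would
// survive a crash/restart, which addresses the restart concern).
public class LatestBidStore {

    // Stored record: the bid value together with the timestamp it carried.
    public static final class TimestampedBid {
        public final double value;
        public final long timestamp;

        public TimestampedBid(double value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, TimestampedBid> store = new HashMap<>();

    // Apply a bid only if it is not older than the latest stored result;
    // a late record (smaller timestamp) leaves the store unchanged.
    public TimestampedBid update(String key, double value, long timestamp) {
        TimestampedBid current = store.get(key);
        if (current == null || timestamp >= current.timestamp) {
            current = new TimestampedBid(value, timestamp);
            store.put(key, current);
        }
        return current;
    }
}
```

With this, arrival order does not matter: a $5 bid at t=200 followed by
a late $3 bid at t=100 still leaves $5 as the result.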

To fix your issue, you could add a .transformValues() before your
KTable, which allows you to access the timestamp of a record. If you
add this timestamp to your value and pass it on to the KTable
afterwards, you can access it there, and it also gets stored reliably.

<bid_id : bid_value> => transformValues => <bid_id : {bid_value,
timestamp}> => aggregate
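The two steps of that pipeline can be sketched in plain Java like
this (the Kafka Streams plumbing is omitted; `withTimestamp` plays the
role of the transform step and `aggregate` is the per-record
aggregation function, both hypothetical names):

```java
public class BidPipeline {

    // Value type after the transform step: bid value plus its record timestamp.
    public static final class TimestampedValue {
        public final double value;
        public final long timestamp;

        public TimestampedValue(double value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // transformValues step: attach the record's timestamp to the value,
    // so downstream operators can see it.
    public static TimestampedValue withTimestamp(double bidValue, long timestamp) {
        return new TimestampedValue(bidValue, timestamp);
    }

    // aggregate step: fold one incoming record into the current result,
    // keeping whichever carries the newer timestamp.
    public static TimestampedValue aggregate(TimestampedValue current,
                                             TimestampedValue incoming) {
        if (current == null || incoming.timestamp >= current.timestamp) {
            return incoming;
        }
        return current;
    }
}
```

Calling aggregate for each record in arrival order yields the bid with
the greatest timestamp, regardless of the order in which bids arrive.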

Hope this helps.

-Matthias


On 10/11/16 9:12 PM, Ali Akhtar wrote:
> P.S, does my scenario require using windows, or can it be achieved
> using just KTable?
> 
> On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac...@gmail.com>
> wrote:
> 
>> Heya,
>> 
>> Say I'm building a live auction site, with different products.
>> Different users will bid on different products. And each time
>> they do, I want to update the product's price, so it should
>> always have the latest price in place.
>> 
>> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
>> the same product 100 ms later.
>> 
>> The second bid arrives first and the price is updated to $5. Then
>> the first bid arrives. I want the price to not be updated in this
>> case, as this bid is older than the one I've already processed.
>> 
>> Here's my understanding of how I can achieve this with Kafka
>> Streaming - is my understanding correct?
>> 
>> - I have a topic for receiving bids. The topic has N partitions,
>> and I have N replicas of my application which hooks up w/ Kafka
>> Streaming, up and running.
>> 
>> - I assume each replica of my app will listen to a different
>> partition of the topic.
>> 
>> - A user makes a bid on product A.
>> 
>> - This is pushed to the topic with the key bid_a
>> 
>> - Another user makes a bid. This is also pushed with the same key
>> (bid_a)
>> 
>> - The 2nd bid arrives first, and gets processed. Then the first
>> (older) bid arrives.
>> 
>> - Because I'm using a KTable, the timestamp of the messages is
>> extracted, and I'm not shown the older bid because I've already
>> processed the later bid. The older bid is ignored.
>> 
>> - All bids on product A go to the same topic partition, and hence
>> the same replica of my app, because they all have the key bid_a.
>> 
>> - Because of this, the replica already knows which timestamps it
>> has processed, and is able to ignore the older messages.
>> 
>> Is the above understanding correct?
>> 
>> Also, what will happen if bid 2 arrived and got processed, and
>> then the particular replica crashed, and was restarted. The
>> restarted replica won't have any memory of which timestamps it
>> has previously processed.
>> 
>> So if bid 2 got processed, replica crashed and restarted, and
>> then bid 1 arrived, what would happen in that case?
>> 
>> Thanks.
>> 
> 
