Storm is using curator.

Given an ensemble of size 5, what if a single client which is connected to
zk member 0 performs a write quorum operation which succeeds after updating
zk members 0, 1, 2, then loses it's connection to zk member 0, and
reconnects (using curator) to zk member 3 and performs a read for the same
zknode it recently updates. Zk Member 3 has not yet been synchronized with
that latest write. Unless the client requests a forced sync, in my
understanding it will read an older value. Is this not true? When does the
synchronization happen.


On Thu, Apr 10, 2014 at 3:05 AM, Ted Dunning <[email protected]> wrote:

> Jason,
>
> A single client is guaranteed to have strict consistency in its own reads
> and writes.  If the write has occurred according to the client, then all
> subsequent reads by that client will show it.  This applies even if the
> client does a write, is disconnected from ZK, reconnects automatically and
> then reads.  The only place that the weaker consistency applies is that if
> A successfully writes to ZK and then sends an out-of-band message to B and
> B looks at ZK upon receiving the notification from A.  In that case, B may
> not see the update from A right away.
>
> The most common source of hangs such as you describe that I know about are
> cases where change notification handlers are not coded correctly and they
> lose the watcher on a status variable by forgetting to reset it when
> handling a change.  This can happen due to exceptions.  The best way to
> avoid such problems is to use a higher level library such as Curator.  I
> forget if Storm already uses Curator, but I seem to remember not.
>
>
>
>
> On Thu, Apr 10, 2014 at 1:39 AM, Jason Jackson <[email protected]>wrote:
>
>> My idea for the bug was that trident expects to read from zookeeper what
>> was recently written zookeeper for the same zknode, and due to sequential
>> consistency it sometimes reads an older value even though it just wrote a
>> newer value. I could be way off the mark though, it's just an idea to
>> explore more.
>>
>>
>> On Thu, Apr 10, 2014 at 1:36 AM, Jason Jackson <[email protected]>wrote:
>>
>>> Hi Ted, thanks for clearing up the language, I intended to express
>>> sequential consistency then.
>>>
>>> Yes you could do a forced sync too, that would be another way good test.
>>>
>>> Taylor, the bug that I witnessed only occurs after you leave a trident
>>> topology running for at least a day. One day it'll just stop making
>>> progress and re-attempt the same batch forever.  Unfortunately I can't send
>>> the particular trident code to you, but I don't think there's anything
>>> unique about it. I suspect any trident topology could reproduce the bug if
>>> ran for a week. Other correlated factors may include that the trident
>>> topology has to occasionally fail batches, the zookeeper cluster has to be
>>> under significant load from other applications beyond trident. I don't many
>>> much details unfortunately.
>>>
>>> -Jason
>>>
>>>
>>>
>>>
>>> On Wed, Apr 9, 2014 at 3:03 PM, Ted Dunning <[email protected]>wrote:
>>>
>>>>
>>>> In what sense do you mean when you say that reads in ZK are eventually
>>>> consistent?
>>>>
>>>> You may get a slightly old value, but you are guaranteed to see a
>>>> consistent history.  That is, if a value has values (which include version
>>>> numbers) v_1 ... v_n, then if you see v_i, you will never see v_j where 
>>>> j<i.
>>>>
>>>> You can also guarantee that you don't even see delayed values by using
>>>> sync.
>>>>
>>>> Normally when people say "eventually consistent" they mean that two
>>>> participants can see inconsistent histories under partition.  That isn't
>>>> possible in ZK.  As I understand it, ZK would be better described as
>>>> providing sequential consistency since all observers will see all updates
>>>> in the same order.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Apr 9, 2014 at 2:50 PM, Jason Jackson <[email protected]>wrote:
>>>>
>>>>> I have one theory that because reads in zookeeper are eventually
>>>>> consistent, this is a necessary condition for the bug to manifest. So one
>>>>> way to test this hypothesis is to run a zookeeper ensemble with 1 node, or
>>>>> a zookeeper ensemble configured for 5 nodes, but take 2 of them offline, 
>>>>> so
>>>>> that every write operation only succeeds if every member of the ensemble
>>>>> sees the write. This should produce strong consistent reads. If you run
>>>>> this test, let me know what the results are. (Clearly this isn't a good
>>>>> production system though as you're making a tradeoff for lower 
>>>>> availability
>>>>> in favor of greater consistency, but the results could help narrow down 
>>>>> the
>>>>> bug)
>>>>>
>>>>>
>>>>> On Wed, Apr 9, 2014 at 2:43 PM, Jason Jackson <[email protected]>wrote:
>>>>>
>>>>>> Yah it's probably a bug in trident. It would be amazing if someone
>>>>>> figured out the fix for this. I spent about 6 hours looking into, but
>>>>>> couldn't figure out why it was occuring.
>>>>>>
>>>>>> Beyond fixing this, one thing you could do to buy yourself time is
>>>>>> disable batch retries in trident. There's no option for this in the API,
>>>>>> but it's like a 1 or 2 line change to the code. Obviously you loose 
>>>>>> exactly
>>>>>> once semantics, but at least you would have a system that never falls
>>>>>> behind real-time.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 1:10 AM, Danijel Schiavuzzi <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks Jason. However, I don't think that was the case in my stuck
>>>>>>> topology, otherwise I'd have seen exceptions (thrown by my Trident
>>>>>>> functions) in the worker logs.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 9, 2014 at 3:02 AM, Jason Jackson 
>>>>>>> <[email protected]>wrote:
>>>>>>>
>>>>>>>> An example of "corrupted input" that causes a batch to fail would
>>>>>>>> be for example if you expected a schema to your data that you read off
>>>>>>>> kafka, or some queue, and for whatever reason the data didn't conform 
>>>>>>>> to
>>>>>>>> your schema and the function that you implement that you pass to
>>>>>>>> stream.each throws an exception when this unexpected situation occurs. 
>>>>>>>> This
>>>>>>>> would cause the batch to be retried, but it's deterministically 
>>>>>>>> failing, so
>>>>>>>> the batch will be retried forever.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 7, 2014 at 10:37 AM, Danijel Schiavuzzi <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Jason,
>>>>>>>>>
>>>>>>>>> Could you be more specific -- what do you mean by "corrupted
>>>>>>>>> input"? Do you mean that there's a bug in Trident itself that causes 
>>>>>>>>> the
>>>>>>>>> tuples in a batch to somehow become corrupted?
>>>>>>>>>
>>>>>>>>> Thanks a lot!
>>>>>>>>>
>>>>>>>>> Danijel
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Monday, April 7, 2014, Jason Jackson <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This could happen if you have corrupted input that always causes
>>>>>>>>>> a batch to fail and be retried.
>>>>>>>>>>
>>>>>>>>>> I have seen this behaviour before and I didn't see corrupted
>>>>>>>>>> input. It might be a bug in trident, I'm not sure. If you figure it 
>>>>>>>>>> out
>>>>>>>>>> please update this thread and/or submit a patch.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 31, 2014 at 7:39 AM, Danijel Schiavuzzi <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> To (partially) answer my own question -- I still have no idea on
>>>>>>>>>> the cause of the stuck topology, but re-submitting the topology 
>>>>>>>>>> helps --
>>>>>>>>>> after re-submitting my topology is now running normally.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 26, 2014 at 6:04 PM, Danijel Schiavuzzi <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Also, I did have multiple cases of my IBackingMap workers dying
>>>>>>>>>> (because of RuntimeExceptions) but successfully restarting 
>>>>>>>>>> afterwards (I
>>>>>>>>>> throw RuntimeExceptions in the BackingMap implementation as my 
>>>>>>>>>> strategy in
>>>>>>>>>> rare SQL database deadlock situations to force a worker restart and 
>>>>>>>>>> to
>>>>>>>>>> fail+retry the batch).
>>>>>>>>>>
>>>>>>>>>> From the logs, one such IBackingMap worker death (and subsequent
>>>>>>>>>> restart) resulted in the Kafka spout re-emitting the pending tuple:
>>>>>>>>>>
>>>>>>>>>>     2014-03-22 16:26:43 s.k.t.TridentKafkaEmitter [INFO]
>>>>>>>>>> re-emitting batch, attempt 29698959:736
>>>>>>>>>>
>>>>>>>>>> This is of course the normal behavior of a transactional
>>>>>>>>>> topology, but this is the first time I've encountered a case of a 
>>>>>>>>>> batch
>>>>>>>>>> retrying indefinitely. This is especially suspicious since the 
>>>>>>>>>> topology has
>>>>>>>>>> been running fine for 20 days straight, re-emitting batches and 
>>>>>>>>>> restarting
>>>>>>>>>> IBackingMap workers quite a number of times.
>>>>>>>>>>
>>>>>>>>>> I can see in my IBackingMap backing SQL database that the batch
>>>>>>>>>> with the exact txid value 29698959 has been committed -- but I 
>>>>>>>>>> suspect that
>>>>>>>>>> could come from another BackingMap, since there are two BackingMap
>>>>>>>>>> instances running (paralellismHint 2).
>>>>>>>>>>
>>>>>>>>>> However, I have no idea why the batch is being retried
>>>>>>>>>> indefinitely now nor why it hasn't been successfully acked by 
>>>>>>>>>> Trident.
>>>>>>>>>>
>>>>>>>>>> Any suggestions on the area (topology component) to focus my
>>>>>>>>>> research on?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 26, 2014 at 5:32 PM, Danijel Schiavuzzi <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I'm having problems with my transactional Trident topology. It
>>>>>>>>>> has been running fine for about 20 days, and suddenly is stuck 
>>>>>>>>>> processing a
>>>>>>>>>> single batch, with no tuples being emitted nor tuples being 
>>>>>>>>>> persisted by
>>>>>>>>>> the TridentState (IBackingMap).
>>>>>>>>>>
>>>>>>>>>> It's a simple topology which consumes messages off a Kafka queue.
>>>>>>>>>> The spout is an instance of storm-kafka-0.8-plus
>>>>>>>>>> TransactionalTridentKafkaSpout and I use the trident-mssql 
>>>>>>>>>> transactional
>>>>>>>>>> TridentState implementation to persistentAggregate() data into a SQL
>>>>>>>>>> database.
>>>>>>>>>>
>>>>>>>>>> In Zookeeper I can see Storm is re-trying a batch, i.e.
>>>>>>>>>>
>>>>>>>>>>      "/transactional/<myTopologyName>/coordinator/currattempts"
>>>>>>>>>> is "{"29698959":6487}"
>>>>>>>>>>
>>>>>>>>>> ... and the attempt count keeps increasing. It seems the batch
>>>>>>>>>> with txid 29698959 is stuck, as the attempt count in Zookeeper keeps
>>>>>>>>>> increasing -- seems like the batch isn't being acked by Trident and 
>>>>>>>>>> I have
>>>>>>>>>> no idea why, especially since the topology has been running 
>>>>>>>>>> successfully
>>>>>>>>>> the last 20 days.
>>>>>>>>>>
>>>>>>>>>> I did rebalance the topology on one occasion, after which it
>>>>>>>>>> continued running normally. Other than that, no other modifications 
>>>>>>>>>> were
>>>>>>>>>> done. Storm is at version 0.9.0.1.
>>>>>>>>>>
>>>>>>>>>> Any hints on how to debug the stuck topology? Any other useful
>>>>>>>>>> info I might provide?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Danijel Schiavuzzi
>>>>>>>>>>
>>>>>>>>>> E: [email protected]
>>>>>>>>>> W: www.schiavuzzi.com
>>>>>>>>>> T: +385989035562
>>>>>>>>>> Skype: danijel.schiavuzzi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Danijel Schiavuzzi
>>>>>>>>>>
>>>>>>>>>> E: danije
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Danijel Schiavuzzi
>>>>>>>>>
>>>>>>>>> E: [email protected]
>>>>>>>>> W: www.schiavuzzi.com
>>>>>>>>> T: +385989035562
>>>>>>>>> Skype: danijels7
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Danijel Schiavuzzi
>>>>>>>
>>>>>>> E: [email protected]
>>>>>>> W: www.schiavuzzi.com
>>>>>>> T: +385989035562
>>>>>>> Skype: danijels7
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to