On Mon, Oct 31, 2011 at 2:01 PM, Rusty Klophaus <[email protected]> wrote:
> Thanks for your excellent description of the problem. We haven't seen this
> before to my knowledge, and this isn't expected behavior.
> Also, if you can share your code, or if you have a small script that can
> reproduce the failure, that would be extremely helpful.
>
I created a small test script that reliably reproduces the issue, but when I
created another version that uses truly independent clients (distinct
processes), I could not reproduce it. So the issue must lie somewhere in my
Fiber-based client software stack. Somewhere within em-synchrony or
EventMachine, some shared state must be getting clobbered at high request
rates, or the high rate is causing EventMachine to return a short read
under some circumstances.
Apologies for the false alarm.
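If the clobbered shared state lives in per-thread storage, it helps to know that Ruby scopes such storage in two different ways once fibers are involved. Below is a minimal, Riak-independent sketch; the :riak_socket key is a hypothetical name chosen for illustration, and thread_variable_get/thread_variable_set require Ruby 2.0 or later. In Ruby 1.9+ Thread.current[] is actually fiber-local, so a new fiber does not see the root fiber's value, while thread variables are shared by every fiber on the thread. It only illustrates the two scopes; it is not a diagnosis of the failure described above.

```ruby
# Store a value in both kinds of per-thread storage from the root fiber.
# :riak_socket is a hypothetical key used only for illustration.
Thread.current[:riak_socket] = :root_socket                    # fiber-local
Thread.current.thread_variable_set(:riak_socket, :root_socket) # whole thread (Ruby 2.0+)

# Read both back from inside a new fiber running on the same thread.
# Fiber#resume returns the value of the fiber's block.
seen_in_fiber = Fiber.new do
  [Thread.current[:riak_socket], Thread.current.thread_variable_get(:riak_socket)]
end.resume

p seen_in_fiber # prints [nil, :root_socket]
```

So a library that caches a socket in Thread.current[] gets one socket per fiber, whereas one that uses a thread variable (or any object shared across fibers) hands the same socket to every fiber on the thread.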
In case anyone is using Ruby and would like a Fiber-based client for some
parallelism, this is the code:
#!/opt/ruby/bin/ruby
require 'em-synchrony'
require 'riak'
require 'json'
# Riak's PB client uses thread-local storage. We change it to store the
# socket in the client.
module Riak
  class Client
    class ProtobuffsBackend
      def socket
        @riakpbc_socket ||= new_socket
      end

      def reset_socket
        socket.close
        @riakpbc_socket = nil
      end
    end
  end
end
# Riak's PB client mishandles encodings when storing strings that contain
# multibyte characters. It tries to transcode them to binary, which it
# can't do because they are multibyte, rather than forcing the binary
# encoding. It already records the original encoding correctly and forces
# it back when loading the object. Here we monkey-patch the offending
# method. We can't simply redefine it within its module, because that
# module is included by BeefcakeProtobuffsBackend and Ruby won't include
# a module twice, so we define it within the class that includes it.
# This is fixed in the latest beta version of the gem.
Riak::Client::BeefcakeProtobuffsBackend.class_exec do
  def maybe_encode(string)
    ENCODING ? string.force_encoding('BINARY') : string
  end
end
# replace default Socket code to use EventMachine Sockets instead
TCPSocket = EventMachine::Synchrony::TCPSocket
hosts = [ "riak1", "riak2", "riak3" ]
num_hosts = hosts.size
key = 'test_key'.encode('US-ASCII')
EM.synchrony do
  concurrency = 12
  fibers = []
  concurrency.times do |i|
    fibers << Fiber.new do
      # set up our client
      host = hosts[i % num_hosts]
      puts "Creating client to #{host}"
      c = Riak::Client.new :host => host, :protocol => "pbc"
      puts "Client created"
      raise "Could not connect to Riak." unless c.ping
      puts "Client ready."
      bucket = c['concur_test']
      EM::Synchrony.sleep(1)
      while true
        begin
          s = { rand(100000) => rand(100000) }
          o = bucket.get_or_new(key, :r => 2)
          # Here there be blowups.
          s = JSON.parse(o.raw_data).merge(s) if o.raw_data
          o.content_type = 'application/json'
          o.raw_data = s.to_json.encode('UTF-8')
          o.store(:returnbody => false, :w => 2, :dw => 0)
        rescue StopIteration
          break
        end
      end
    end
  end

  # Ruby has no method to wait for a set of fibers to all finish,
  # so we start them all and then poll them, sleeping between
  # checks, until they are all done.
  puts "Starting up clients."
  fibers.each { |fiber| fiber.resume }
  EM::Synchrony.sleep(1)
  puts "Waiting for them to finish"
  until fibers.empty?
    # Collect the finished fibers first: deleting from an array while
    # iterating over it with each skips elements.
    finished = fibers.reject { |fiber| fiber.alive? }
    finished.each { puts "Found finished fiber." }
    fibers -= finished
    EM::Synchrony.sleep(1)
  end
  EventMachine.stop
end
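The encoding patch in the script is easier to follow with a standalone example. This is a minimal sketch in plain Ruby with no Riak involved (the string and variable names are illustrative only). It contrasts transcoding a multibyte string to binary, which raises an EncodingError subclass, with relabelling its bytes via force_encoding, which is lossless and can be reversed by forcing the original encoding back on load.

```ruby
# "é" (U+00E9) is two bytes in UTF-8, so this 4-character string is 5 bytes.
original = "caf\u00e9"

# Transcoding to binary fails: U+00E9 has no binary equivalent.
encode_error =
  begin
    original.encode(Encoding::BINARY)
    nil
  rescue EncodingError => e
    e.class
  end

# Relabelling the same bytes as binary always works (dup, since
# force_encoding mutates its receiver).
wire_bytes = original.dup.force_encoding(Encoding::BINARY)

# On load, forcing the recorded encoding back restores the original text.
restored = wire_bytes.dup.force_encoding(Encoding::UTF_8)

puts encode_error         # prints the EncodingError subclass that was raised
puts wire_bytes.bytesize  # prints 5
puts restored == original # prints true
```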
_______________________________________________
riak-users mailing list
[email protected]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com