I needed to support Sphinx's distributed indexes to improve search
times for our 15-million-record index. Thinking Sphinx (TS) doesn't
appear to support them out of the box, so I had to patch it. Below is
my patch of the 3 affected modules to build the correct configuration
for our purposes. Perhaps it could be integrated into TS itself?
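It only creates the distributed/parallel index setup if there are
multiple define_index blocks for a model; otherwise it reverts to the
default behavior. The delta indexes are still searched sequentially
(they stay local indexes of the distributed index), but the core
indexes are searched in parallel (each one is listed as a remote agent).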
require 'active_record'
require 'thinking_sphinx'
require 'thinking_sphinx/active_record'
# Patched ThinkingSphinx::ActiveRecord configuration builders.
#
# When a model has more than one Sphinx index definition, every definition
# gets its own numbered core index ("<name>_core0", "<name>_core1", ...) and
# the distributed index lists those cores as remote agents on this searchd's
# own address/port, while delta indexes stay in its local index list. With a
# single definition the un-numbered "<name>_core" / "<name>_delta" names are
# used and both are listed as local indexes.
module ThinkingSphinx
  module ActiveRecord
    # Include hook: defines the to_riddle* builders as class methods on +base+.
    def self.included(base)
      base.class_eval do
        class << self
          # Builds all Riddle index objects for this model.
          #
          # offset - passed through unchanged to the per-source builders.
          #
          # Returns an Array: the core indexes, then the delta indexes (only
          # when sphinx_delta? is true), then the distributed index last.
          def to_riddle(offset)
            sphinx_database_adapter.setup

            indexes = to_riddle_for_core(offset)
            indexes.concat to_riddle_for_delta(offset) if sphinx_delta?
            indexes << to_riddle_for_distributed
          end

          # Builds one core index per entry of sphinx_indexes whose model is
          # this class. Names carry a numeric suffix only when there is more
          # than one such entry.
          #
          # Returns an Array of Riddle::Configuration::Index.
          def to_riddle_for_core(offset)
            logger.debug "Using patch"

            own_indexes = sphinx_indexes.select { |ts_index| ts_index.model == self }
            numbered    = own_indexes.size > 1

            indexes = []
            own_indexes.each_with_index do |ts_index, i|
              name  = numbered ? "#{sphinx_name}_core#{i}" : "#{sphinx_name}_core"
              index = Riddle::Configuration::Index.new(name)
              index.path = File.join(
                ThinkingSphinx::Configuration.instance.searchd_file_path, index.name
              )

              set_configuration_options_for_indexes index
              set_field_settings_for_indexes index

              index.sources += ts_index.sources.collect { |source|
                source.to_riddle_for_core(offset, i)
              }
              indexes << index
            end
            indexes
          end

          # Builds one delta index for every entry of sphinx_indexes that
          # reports delta?; each delta's parent is the core index with the
          # same position number.
          #
          # NOTE(review): this walks the unfiltered sphinx_indexes list while
          # to_riddle_for_core filters by model, so positions (and the
          # "more than one" test) can differ between the two if sphinx_indexes
          # ever holds entries for another model - confirm whether that can
          # happen.
          #
          # Returns an Array of Riddle::Configuration::Index.
          def to_riddle_for_delta(offset)
            numbered = sphinx_indexes.size > 1

            indexes = []
            sphinx_indexes.each_with_index do |ts_index, i|
              next unless ts_index.delta?

              suffix = numbered ? i.to_s : ""
              delta  = "#{sphinx_name}_delta#{suffix}"
              core   = "#{sphinx_name}_core#{suffix}"

              index = Riddle::Configuration::Index.new(delta)
              index.parent = core
              index.path = File.join(
                ThinkingSphinx::Configuration.instance.searchd_file_path, delta
              )
              index.sources += ts_index.sources.collect { |source|
                source.to_riddle_for_delta(offset, i)
              }
              indexes << index
            end
            indexes
          end

          # Builds the distributed index named after the model.
          #
          # Multiple definitions: every core becomes a remote agent at the
          # configured address/port, and each delta that to_riddle actually
          # emits is prepended to the local list. A delta is only emitted when
          # both sphinx_delta? and the entry's own delta? hold, so the same
          # condition is used here to avoid naming indexes that do not exist.
          #
          # Single definition: "<name>_core" is local, with "<name>_delta"
          # prepended when sphinx_delta? is true.
          #
          # Returns a Riddle::Configuration::DistributedIndex.
          def to_riddle_for_distributed
            index   = Riddle::Configuration::DistributedIndex.new(sphinx_name)
            port    = ThinkingSphinx::Configuration.instance.port
            address = ThinkingSphinx::Configuration.instance.address

            if sphinx_indexes.size > 1
              sphinx_indexes.each_with_index do |ts_index, i|
                index.remote_indexes << Riddle::Configuration::RemoteIndex.new(
                  address, port, "#{sphinx_name}_core#{i}"
                )
                if sphinx_delta? && ts_index.delta?
                  index.local_indexes.unshift "#{sphinx_name}_delta#{i}"
                end
              end
            else
              index.local_indexes << "#{sphinx_name}_core"
              index.local_indexes.unshift "#{sphinx_name}_delta" if sphinx_delta?
            end
            index
          end
        end
      end
    end
  end
end
module Riddle
  class Configuration
    class DistributedIndex < Riddle::Configuration::Section
      # Builds one agent string per remote index, in the same order as
      # remote_indexes, by joining each entry's own +remote+ and +name+ with a
      # colon.
      #
      # Each remote is paired with its OWN index name. Pairing the list of
      # remotes with names popped off the end of a copy would match the first
      # remote with the last name whenever the remotes are not all identical.
      #
      # Returns an Array of Strings.
      def agent
        remote_indexes.collect { |index| "#{index.remote}:#{index.name}" }
      end
    end
  end
end
module ThinkingSphinx
  module Deltas
    class DelayedDelta < ThinkingSphinx::Deltas::DefaultDelta
      # Enqueues the background jobs for a delta update.
      #
      # model    - passed to delta_index_name / core_index_name to obtain the
      #            index names.
      # instance - optional record. When given, a FlagAsDeletedJob is also
      #            enqueued for its sphinx_document_id, and if the record
      #            itself defines delta_index_name (public or private) that
      #            value replaces the model-level delta index name.
      #
      # Returns true.
      def index(model, instance = nil)
        delta_name = delta_index_name(model)

        # Object#method raises NameError for an undefined method rather than
        # returning nil, so the optional per-instance override has to be
        # probed with respond_to?. The second argument keeps private methods
        # eligible, as they were when looked up via Object#method.
        if instance && instance.respond_to?(:delta_index_name, true)
          delta_name = instance.send(:delta_index_name)
        end

        priority = ThinkingSphinx::Configuration.instance.delayed_job_priority

        ThinkingSphinx::Deltas::Job.enqueue(
          ThinkingSphinx::Deltas::DeltaJob.new(delta_name),
          priority
        )

        # NOTE(review): core_index_name(model) is not visible here; when the
        # patched configuration numbers the cores ("<name>_core0", ...) confirm
        # that the name it returns still matches an existing index.
        if instance
          Delayed::Job.enqueue(
            ThinkingSphinx::Deltas::FlagAsDeletedJob.new(
              core_index_name(model), instance.sphinx_document_id
            ),
            priority
          )
        end

        true
      end
    end
  end
end
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"Thinking Sphinx" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/thinking-sphinx?hl=en
-~----------~----~----~----~------~----~------~--~---