Parallelize search using multiple processes

This commit is a proof-of-concept implementation of parallelized search
that takes advantage of the multiple cores of the system. A
multi-threaded Ruby program cannot saturate more than a single CPU core
because of the GIL, so the only way to use multiple cores is to spawn
multiple processes, split the work among them, and merge the partial
results. The result of each child process is passed back to the parent
through a pipe in serialized form; this serialization and
deserialization is the unavoidable overhead of using multiple processes
instead of multiple
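
The scheme in miniature (a sketch with illustrative names, not the code
in this patch; the real implementation is search_parallel in the diff
below):

    items  = ('aa'..'zz').to_a
    slices = items.each_slice(items.length / 4 + 1).to_a

    readers = slices.map do |slice|
      read, write = IO.pipe
      fork do
        # Child: filter its slice and write the serialized result
        # to the pipe before exiting.
        read.close
        matches = slice.grep(/z/)
        write << Marshal.dump(matches)
        exit!
      end
      write.close
      read
    end

    # Parent: deserialize and merge the partial results.
    merged = readers.flat_map { |r| Marshal.load(r.read) }
    Process.waitall
    puts merged.length
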
Unfortunately, Ruby's default serialization method, Marshal.dump, turns
out to be so slow that it often cancels out the gain, especially when
the selectivity of the query is poor, i.e. when many lines match and a
large result set has to be serialized. So in this implementation I
chose MessagePack, which is at least 5 times faster than Marshal.
However, this new dependency will make installing fzf much trickier for
many people, so I'm not sure I want to merge this code into the master
branch: fzf is already pretty fast in most cases thanks to its caching
scheme, and the benefit of parallelization is only noticeable when the
list has more than 50k items.
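
The gap can be sanity-checked with a rough benchmark along these lines
(illustrative only, assuming the msgpack gem is installed; actual
numbers depend on the shape of the data and the gem versions):

    require 'benchmark'
    require 'msgpack'

    # Data shaped roughly like the match results sent over the pipe:
    # [line, [[start, end]]]
    data = Array.new(50_000) { |i| ["line #{i}", [[0, 4]]] }

    Benchmark.bm(12) do |b|
      b.report('Marshal')     { 10.times { Marshal.load(Marshal.dump(data)) } }
      b.report('MessagePack') { 10.times { MessagePack.unpack(MessagePack.pack(data)) } }
    end
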
Author: Junegunn Choi
Date:   2014-04-21 02:44:26 +09:00
Parent: fe22213b51
Commit: 501af62661

fzf

@@ -7,7 +7,7 @@
# / __/ / /_/ __/
# /_/ /___/_/ Fuzzy finder for your shell
#
# Version: 0.8.3 (April 3, 2014)
# Version: 0.9.0-alpha (April 21, 2014)
#
# Author: Junegunn Choi
# URL: https://github.com/junegunn/fzf
@@ -39,6 +39,7 @@
require 'thread'
require 'curses'
require 'set'
require 'msgpack'
unless String.method_defined? :force_encoding
class String
@@ -74,6 +75,16 @@ class FZF
end
end
# TODO
def cores?
cores = `sysctl -n hw.physicalcpu 2> /dev/null`.chomp
if $?.exitstatus == 0
cores.to_i
else
1
end
end
def initialize argv, source = $stdin
@rxflag = nil
@sort = ENV.fetch('FZF_DEFAULT_SORT', 1000).to_i
@@ -88,6 +99,9 @@ class FZF
@filter = nil
@nth = nil
@delim = nil
@pids = []
@procs = cores?
@parallel_min = 10000
argv =
if opts = ENV['FZF_DEFAULT_OPTS']
@@ -148,6 +162,16 @@ class FZF
@sort = $1.to_i
when '-e', '--extended-exact' then @extended = :exact
when '+e', '--no-extended-exact' then @extended = nil
when '-p', '--parallel'
usage 1, 'number of processes required' unless pnum = argv.shift
@procs = pnum.to_i
when /^-p([1-9][0-9]*)$/, /^--parallel=([1-9][0-9]*)$/
@procs = $1.to_i
when '--parallel-min'
usage 1, 'number of processes required' unless pmin = argv.shift
@parallel_min = pmin.to_i
when /^--parallel-min=([1-9][0-9]*)$/
@parallel_min = $1.to_i
else
usage 1, "illegal option: #{o}"
end
@@ -223,7 +247,7 @@ class FZF
end
def filter_list list
matches = matcher.match(list, @filter, '', '')
matches = matcher.match(list, @filter)
if @sort && matches.length <= @sort
matches = FZF.sort(matches)
end
@@ -667,6 +691,103 @@ class FZF
end
end
def cached lists, q, prefix, suffix
cnt = 0
cached = lists.inject({}) do |sum, l|
cached = matcher.cached(l, q, prefix, suffix)
cnt += cached ? cached.length : l.length
sum[l] = cached
sum
end
[cnt, cached]
end
def search lists, q, cx
cache_count, cached = cached(lists, q, q[0, cx], q[cx..-1])
if @procs <= 1 || lists.empty? || lists.length < @procs || cache_count < @parallel_min
search_sequential lists, q, cached
else
search_parallel lists, q, cached
end
end
def search_sequential lists, q, cached
progress = 0
started_at = Time.now
found = []
skip = false
cnt = 0
lists.each do |list|
cnt += list.length
skip = @mtx.synchronize { @events[:key] }
break if skip
if (progress = 100 * cnt / @count.get) < 100 && Time.now - started_at > 0.5
render { print_info " (#{progress}%)" }
end
matches = matcher.match(list, q, cached[list])
matcher.cache list, q, matches
found.concat matches
end
return :skip if skip
found
end
def search_parallel lists, q, cached
list_map = lists.inject({}) { |h, l| h[l.object_id] = l; h }
slice_size = lists.length / @procs
slices = lists.each_slice(slice_size)
triples = slices.map do |lists|
read, write = IO.pipe
[fork do
read.close
running = true
Signal.trap('USR1') do
running = false
end
matches = {}
lists.each do |list|
break unless running
matches[list.object_id] = matcher.match(list, q, cached[list])
end
write << MessagePack.pack(matches)
exit! running
end, read, write]
end
matches = []
@pids = triples.map { |t| t.first }
mutex = Mutex.new
skip = false
triples.map { |pid, read, write|
Thread.new do
write.close
result = read.read
_, status = Process.wait2(pid)
raise if result.empty?
if status.exitstatus == 0
MessagePack.unpack(result).each do |list_object_id, data|
mutex.synchronize do
matches.concat data
end
matcher.cache list_map[list_object_id], q, data
end
else
skip = true
end
end
}.each(&:join)
skip ? :skip : matches
end
def signal_children
while pid = @pids.pop
Process.kill 'USR1', pid rescue nil
end
end
def start_search &callback
Thread.new do
lists = []
@@ -704,39 +825,26 @@
new_search = events[:key] || events.delete(:new)
user_input = events[:key]
progress = 0
started_at = Time.now
if updated = new_search && !lists.empty?
q, cx = events.delete(:key) || [q, 0]
empty = matcher.empty?(q)
q, cx = events.delete(:key) || [q, 0]
unless matches = fcache[q]
found = []
skip = false
cnt = 0
lists.each do |list|
cnt += list.length
skip = @mtx.synchronize { @events[:key] }
break if skip
if !empty && (progress = 100 * cnt / @count.get) < 100 && Time.now - started_at > 0.5
render { print_info " (#{progress}%)" }
# Simply concats the list
if matcher.empty?(q)
matches = lists.inject([]) { |cc, l| cc.concat l }
else
matches ||= search(lists, q, cx)
next if matches == :skip
matches = @sort ? matches : matches.reverse
if @sort && matches.length <= @sort
matches = FZF.sort(matches)
end
found.concat(q.empty? ? list :
matcher.match(list, q, q[0, cx], q[cx..-1]))
end
next if skip
matches = @sort ? found : found.reverse
if !empty && @sort && matches.length <= @sort
matches = FZF.sort(matches)
end
fcache[q] = matches
end
# Atomic update
@matches.set matches
end#new_search
@matches.set fcache[q] = matches
end
callback = nil if callback &&
(updated || events[:loaded]) &&
@@ -1083,7 +1191,10 @@ class FZF
upd = actions.fetch(key, actions[:default]).call(key)
# Dispatch key event
emit(:key) { [@query.set(input.dup), cursor] } if upd
if upd
signal_children
emit(:key) { [@query.set(input.dup), cursor] }
end
end
end
ensure
@@ -1176,9 +1287,7 @@
end
end
def match list, q, prefix, suffix
regexp = fuzzy_regex q
def cached list, q, prefix, suffix
cache = @caches[list.object_id]
prefix_cache = nil
(prefix.length - 1).downto(1) do |len|
@@ -1190,10 +1299,17 @@
break if suffix_cache = cache[suffix[idx..-1]]
end unless suffix.empty?
partial_cache = [prefix_cache,
suffix_cache].compact.sort_by { |e| e.length }.first
cache[q] ||= (partial_cache ?
partial_cache.map { |e| e.first } : list).map { |line|
[prefix_cache, suffix_cache].compact.sort_by { |e| e.length }.first
end
def cache list, q, data
@caches[list.object_id][q] = data
end
def match list, q, partial_cache = nil
regexp = fuzzy_regex q
(partial_cache ? partial_cache.map { |e| e.first } : list).map { |line|
# Ignore errors: e.g. invalid byte sequence in UTF-8
md = do_match(line, regexp)
md && [line, [md.offset(0)]]
@@ -1249,7 +1365,11 @@
Regexp.new(sanitize(Regexp.escape(w)), rxflag_for(w))
end
def match list, q, prefix, suffix
def cache list, q, data
@caches[list.object_id][Set[parse q]] = data
end
def cached list, q, prefix, suffix
regexps = parse q
# Look for prefix cache
cache = @caches[list.object_id]
@@ -1258,10 +1378,12 @@
(prefix.length - 1).downto(1) do |len|
break if prefix_cache = cache[Set[@regexps[prefix[0, len]]]]
end
prefix_cache
end
cache[Set[regexps]] ||= (prefix_cache ?
prefix_cache.map { |e| e.first } :
list).map { |line|
def match list, q, partial_cache = nil
regexps = parse q
(partial_cache ? partial_cache.map { |e| e.first } : list).map { |line|
offsets = []
regexps.all? { |pair|
regexp, invert = pair