From 501af62661be751b451cf166f346bcdac8daa9d8 Mon Sep 17 00:00:00 2001 From: Junegunn Choi Date: Mon, 21 Apr 2014 02:44:26 +0900 Subject: [PATCH] Parallelize search using multiple processes This commit is a proof-of-concept implementation of parallelized search that takes advantage of multiple cores of the system. A multi-threaded program written in Ruby cannot saturate more than a single CPU core because of the GIL (Global Interpreter Lock). So the only way we can use multiple cores is to spawn multiple processes, split the work, and merge the partial results. The result of each child process is passed to the parent via a pipe in serialized form. This serialization and deserialization of the result is the unavoidable overhead of using multiple processes instead of multiple threads. Unfortunately, Ruby's default serialization method, Marshal.dump, turned out to be so slow that it often overshadows the gain, especially when the query is not very selective and the result to be transferred is large. So in this implementation I chose to use MessagePack, which is at least 5 times faster than Marshal. However, this will make installing fzf much trickier for many people. So I'm not sure if I want to merge this code into the master branch, considering that fzf is already pretty fast in most cases thanks to its caching scheme and the benefit of parallelization is only noticeable when the list has more than 50k items. --- fzf | 200 ++++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 161 insertions(+), 39 deletions(-) diff --git a/fzf b/fzf index bcdc077..e54d7b6 100755 --- a/fzf +++ b/fzf @@ -7,7 +7,7 @@ # / __/ / /_/ __/ # /_/ /___/_/ Fuzzy finder for your shell # -# Version: 0.8.3 (April 3, 2014) +# Version: 0.9.0-alpha (April 21, 2014) # # Author: Junegunn Choi # URL: https://github.com/junegunn/fzf @@ -39,6 +39,7 @@ require 'thread' require 'curses' require 'set' +require 'msgpack' unless String.method_defined? :force_encoding class String @@ -74,6 +75,16 @@ class FZF end end + # TODO + def cores? 
+ cores = `sysctl -n hw.physicalcpu 2> /dev/null`.chomp + if $?.exitstatus == 0 + cores.to_i + else + 1 + end + end + def initialize argv, source = $stdin @rxflag = nil @sort = ENV.fetch('FZF_DEFAULT_SORT', 1000).to_i @@ -88,6 +99,9 @@ class FZF @filter = nil @nth = nil @delim = nil + @pids = [] + @procs = cores? + @parallel_min = 10000 argv = if opts = ENV['FZF_DEFAULT_OPTS'] @@ -148,6 +162,16 @@ class FZF @sort = $1.to_i when '-e', '--extended-exact' then @extended = :exact when '+e', '--no-extended-exact' then @extended = nil + when '-p', '--parallel' + usage 1, 'number of processes required' unless pnum = argv.shift + @procs = pnum.to_i + when /^-p([1-9][0-9]*)$/, /^--parallel=([1-9][0-9]*)$/ + @procs = $1.to_i + when '--parallel-min' + usage 1, 'number of processes required' unless pmin = argv.shift + @parallel_min = pmin.to_i + when /^--parallel-min=([1-9][0-9]*)$/ + @parallel_min = $1.to_i else usage 1, "illegal option: #{o}" end @@ -223,7 +247,7 @@ class FZF end def filter_list list - matches = matcher.match(list, @filter, '', '') + matches = matcher.match(list, @filter) if @sort && matches.length <= @sort matches = FZF.sort(matches) end @@ -667,6 +691,103 @@ class FZF end end + def cached lists, q, prefix, suffix + cnt = 0 + cached = lists.inject({}) do |sum, l| + cached = matcher.cached(l, q, prefix, suffix) + cnt += cached ? cached.length : l.length + sum[l] = cached + sum + end + [cnt, cached] + end + + def search lists, q, cx + cache_count, cached = cached(lists, q, q[0, cx], q[cx..-1]) + if @procs <= 1 || lists.empty? 
|| lists.length < @procs || cache_count < @parallel_min + search_sequential lists, q, cached + else + search_parallel lists, q, cached + end + end + + def search_sequential lists, q, cached + progress = 0 + started_at = Time.now + + found = [] + skip = false + cnt = 0 + lists.each do |list| + cnt += list.length + skip = @mtx.synchronize { @events[:key] } + break if skip + + if (progress = 100 * cnt / @count.get) < 100 && Time.now - started_at > 0.5 + render { print_info " (#{progress}%)" } + end + matches = matcher.match(list, q, cached[list]) + matcher.cache list, q, matches + found.concat matches + end + return :skip if skip + found + end + + def search_parallel lists, q, cached + list_map = lists.inject({}) { |h, l| h[l.object_id] = l; h } + slice_size = lists.length / @procs + slices = lists.each_slice(slice_size) + + triples = slices.map do |lists| + read, write = IO.pipe + [fork do + read.close + running = true + Signal.trap('USR1') do + running = false + end + matches = {} + lists.each do |list| + break unless running + matches[list.object_id] = matcher.match(list, q, cached[list]) + end + write << MessagePack.pack(matches) + exit! running + end, read, write] + end + + matches = [] + @pids = triples.map { |t| t.first } + mutex = Mutex.new + skip = false + triples.map { |pid, read, write| + Thread.new do + write.close + result = read.read + _, status = Process.wait2(pid) + raise if result.empty? + if status.exitstatus == 0 + MessagePack.unpack(result).each do |list_object_id, data| + mutex.synchronize do + matches.concat data + end + matcher.cache list_map[list_object_id], q, data + end + else + skip = true + end + end + }.each(&:join) + skip ? 
:skip : matches + end + + def signal_children + while pid = @pids.pop + Process.kill 'USR1', pid rescue nil + end + end + def start_search &callback Thread.new do lists = [] @@ -704,39 +825,26 @@ class FZF new_search = events[:key] || events.delete(:new) user_input = events[:key] - progress = 0 - started_at = Time.now if updated = new_search && !lists.empty? - q, cx = events.delete(:key) || [q, 0] - empty = matcher.empty?(q) + q, cx = events.delete(:key) || [q, 0] unless matches = fcache[q] - found = [] - skip = false - cnt = 0 - lists.each do |list| - cnt += list.length - skip = @mtx.synchronize { @events[:key] } - break if skip - - if !empty && (progress = 100 * cnt / @count.get) < 100 && Time.now - started_at > 0.5 - render { print_info " (#{progress}%)" } + # Simply concats the list + if matcher.empty?(q) + matches = lists.inject([]) { |cc, l| cc.concat l } + else + matches ||= search(lists, q, cx) + next if matches == :skip + matches = @sort ? matches : matches.reverse + if @sort && matches.length <= @sort + matches = FZF.sort(matches) end - - found.concat(q.empty? ? list : - matcher.match(list, q, q[0, cx], q[cx..-1])) end - next if skip - matches = @sort ? 
found : found.reverse - if !empty && @sort && matches.length <= @sort - matches = FZF.sort(matches) - end - fcache[q] = matches end # Atomic update - @matches.set matches - end#new_search + @matches.set fcache[q] = matches + end callback = nil if callback && (updated || events[:loaded]) && @@ -1083,7 +1191,10 @@ class FZF upd = actions.fetch(key, actions[:default]).call(key) # Dispatch key event - emit(:key) { [@query.set(input.dup), cursor] } if upd + if upd + signal_children + emit(:key) { [@query.set(input.dup), cursor] } + end end end ensure @@ -1176,9 +1287,7 @@ class FZF end end - def match list, q, prefix, suffix - regexp = fuzzy_regex q - + def cached list, q, prefix, suffix cache = @caches[list.object_id] prefix_cache = nil (prefix.length - 1).downto(1) do |len| @@ -1190,10 +1299,17 @@ class FZF break if suffix_cache = cache[suffix[idx..-1]] end unless suffix.empty? - partial_cache = [prefix_cache, - suffix_cache].compact.sort_by { |e| e.length }.first - cache[q] ||= (partial_cache ? - partial_cache.map { |e| e.first } : list).map { |line| + [prefix_cache, suffix_cache].compact.sort_by { |e| e.length }.first + end + + def cache list, q, data + @caches[list.object_id][q] = data + end + + def match list, q, partial_cache = nil + regexp = fuzzy_regex q + + (partial_cache ? partial_cache.map { |e| e.first } : list).map { |line| # Ignore errors: e.g. invalid byte sequence in UTF-8 md = do_match(line, regexp) md && [line, [md.offset(0)]] @@ -1249,7 +1365,11 @@ class FZF Regexp.new(sanitize(Regexp.escape(w)), rxflag_for(w)) end - def match list, q, prefix, suffix + def cache list, q, data + @caches[list.object_id][Set[parse q]] = data + end + + def cached list, q, prefix, suffix regexps = parse q # Look for prefix cache cache = @caches[list.object_id] @@ -1258,10 +1378,12 @@ class FZF (prefix.length - 1).downto(1) do |len| break if prefix_cache = cache[Set[@regexps[prefix[0, len]]]] end + prefix_cache + end - cache[Set[regexps]] ||= (prefix_cache ? 
- prefix_cache.map { |e| e.first } : - list).map { |line| + def match list, q, partial_cache = nil + regexps = parse q + (partial_cache ? partial_cache.map { |e| e.first } : list).map { |line| offsets = [] regexps.all? { |pair| regexp, invert = pair