From 876c59b192c569935b6ef884287b1f80b920afe1 Mon Sep 17 00:00:00 2001 From: James Lee Date: Thu, 19 Apr 2012 14:15:45 -0600 Subject: [PATCH] Make use of the new ActiveRecord 3.x concurrency contract All Database usage must go through framework.db (which should have been the case before, anyways) or explicitly checkout and checkin a connection. Failure to do so causes thread starvation and bizarre random failures when attempting to use the database. This commit also explicitly releases database connections at the end of all threads created via framework.threads.spawn, which should alleviate Deprecation Warning messages from ActiveRecord. [Fixes #6613] --- lib/msf/core/db.rb | 102 +++++++++++++++++++- lib/msf/core/db_manager.rb | 18 +++- lib/msf/core/session_manager.rb | 15 +-- lib/msf/core/thread_manager.rb | 55 ++++++++++- lib/msf/ui/console/command_dispatcher/db.rb | 11 ++- 5 files changed, 182 insertions(+), 19 deletions(-) diff --git a/lib/msf/core/db.rb b/lib/msf/core/db.rb index 4eecdf1b27..cfb8100bb4 100644 --- a/lib/msf/core/db.rb +++ b/lib/msf/core/db.rb @@ -191,27 +191,37 @@ class DBManager # Determines if the database is functional # def check + ::ActiveRecord::Base.connection_pool.with_connection { res = ::Mdm::Host.find(:first) + } end def default_workspace + ::ActiveRecord::Base.connection_pool.with_connection { ::Mdm::Workspace.default + } end def find_workspace(name) + ::ActiveRecord::Base.connection_pool.with_connection { ::Mdm::Workspace.find_by_name(name) + } end # # Creates a new workspace in the database # def add_workspace(name) + ::ActiveRecord::Base.connection_pool.with_connection { ::Mdm::Workspace.find_or_create_by_name(name) + } end def workspaces + ::ActiveRecord::Base.connection_pool.with_connection { ::Mdm::Workspace.find(:all) + } end # @@ -233,13 +243,15 @@ class DBManager address = opts[:addr] || opts[:address] || opts[:host] || return return address if address.kind_of? ::Mdm::Host end + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace if wspace.kind_of? String wspace = find_workspace(wspace) end - + address, scope = address.split('%', 2) return wspace.hosts.find_by_address(address) + } end # @@ -276,6 +288,7 @@ class DBManager return end + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace if wspace.kind_of? String wspace = find_workspace(wspace) @@ -333,6 +346,7 @@ class DBManager end host + } end @@ -361,6 +375,7 @@ class DBManager return end + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace if wspace.kind_of? String wspace = find_workspace(wspace) @@ -445,25 +460,30 @@ class DBManager end host + } end # # Iterates over the hosts table calling the supplied block with the host # instance of each entry. # def each_host(wspace=workspace, &block) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.hosts.each do |host| block.call(host) end + } end # # Returns a list of all hosts in the database # def hosts(wspace = workspace, only_up = false, addresses = nil) + ::ActiveRecord::Base.connection_pool.with_connection { conditions = {} conditions[:state] = [Msf::HostState::Alive, Msf::HostState::Unknown] if only_up conditions[:address] = addresses if addresses wspace.hosts.where(conditions).order(:address) + } end @@ -486,6 +506,7 @@ class DBManager # def report_service(opts) return if not active + #::ActiveRecord::Base.connection_pool.with_connection { |conn| addr = opts.delete(:host) || return hname = opts.delete(:host_name) hmac = opts.delete(:mac) @@ -541,12 +562,15 @@ class DBManager service.save! end ret[:service] = service + #} end def get_service(wspace, host, proto, port) + ::ActiveRecord::Base.connection_pool.with_connection { host = get_host(:workspace => wspace, :address => host) return if not host return host.services.find_by_proto_and_port(proto, port) + } end # @@ -554,15 +578,18 @@ class DBManager # service instance of each entry. # def each_service(wspace=workspace, &block) + ::ActiveRecord::Base.connection_pool.with_connection { services(wspace).each do |service| block.call(service) end + } end # # Returns a list of all services in the database # def services(wspace = workspace, only_up = false, proto = nil, addresses = nil, ports = nil, names = nil) + ::ActiveRecord::Base.connection_pool.with_connection { conditions = {} conditions[:state] = [ServiceState::Open] if only_up conditions[:proto] = proto if proto @@ -570,17 +597,20 @@ class DBManager conditions[:port] = ports if ports conditions[:name] = names if names wspace.services.includes(:host).where(conditions).order("hosts.address, port") + } end # Returns a session based on opened_time, host address, and workspace # (or returns nil) def get_session(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts[:workspace] || opts[:wspace] || workspace addr = opts[:addr] || opts[:address] || opts[:host] || return host = get_host(:workspace => wspace, :host => addr) time = opts[:opened_at] || opts[:created_at] || opts[:time] || return ::Mdm::Session.find_by_host_id_and_opened_at(host.id, time) + } end # Record a new session in the database @@ -591,6 +621,7 @@ class DBManager # def report_session(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { if opts[:session] raise ArgumentError.new("Invalid :session, expected Msf::Session") unless opts[:session].kind_of? Msf::Session session = opts[:session] @@ -666,6 +697,7 @@ class DBManager end s + } end # @@ -687,6 +719,7 @@ class DBManager raise ArgumentError.new("Expected an :etype") unless opts[:etype] session = nil + ::ActiveRecord::Base.connection_pool.with_connection { if opts[:session].respond_to? :db_record session = opts[:session].db_record if session.nil? @@ -712,6 +745,7 @@ class DBManager end s = ::Mdm::SessionEvent.create(event_data) + } end def report_session_route(session, route) @@ -725,9 +759,11 @@ class DBManager raise ArgumentError.new("Invalid :session, expected Session object got #{session.class}") end + ::ActiveRecord::Base.connection_pool.with_connection { subnet, netmask = route.split("/") s.routes.create(:subnet => subnet, :netmask => netmask) + } end def report_session_route_remove(session, route) @@ -741,16 +777,20 @@ class DBManager raise ArgumentError.new("Invalid :session, expected Session object got #{session.class}") end + ::ActiveRecord::Base.connection_pool.with_connection { subnet, netmask = route.split("/") r = s.routes.find_by_subnet_and_netmask(subnet, netmask) r.destroy if r + } end def get_client(opts) + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace host = get_host(:workspace => wspace, :host => opts[:host]) || return client = host.clients.where({:ua_string => opts[:ua_string]}).first() return client + } end def find_or_create_client(opts) @@ -773,6 +813,7 @@ class DBManager # def report_client(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { addr = opts.delete(:host) || return wspace = opts.delete(:workspace) || workspace report_host(:workspace => wspace, :host => addr) @@ -805,6 +846,7 @@ class DBManager client.save! end ret[:client] = client + } end # @@ -812,30 +854,38 @@ class DBManager # vuln instance of each entry. # def each_vuln(wspace=workspace,&block) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.vulns.each do |vulns| block.call(vulns) end + } end # # This methods returns a list of all vulnerabilities in the database # def vulns(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.vulns + } end # # This methods returns a list of all credentials in the database # def creds(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { Mdm::Cred.includes({:service => :host}).where("hosts.workspace_id = ?", wspace.id) + } end # # This method returns a list of all exploited hosts in the database. # def exploited_hosts(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.exploited_hosts + } end # @@ -843,9 +893,11 @@ class DBManager # note instance of each entry. # def each_note(wspace=workspace, &block) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.notes.each do |note| block.call(note) end + } end # @@ -882,6 +934,7 @@ class DBManager # def report_note(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace if wspace.kind_of? String wspace = find_workspace(wspace) @@ -1001,13 +1054,16 @@ class DBManager msf_import_timestamps(opts,note) note.save! ret[:note] = note + } end # # This methods returns a list of all notes in the database # def notes(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.notes + } end # This is only exercised by MSF3 XML importing for now. Needs the wait @@ -1019,6 +1075,7 @@ class DBManager raise DBImportError.new("Missing required option :addr") unless addr wspace = opts.delete(:wspace) raise DBImportError.new("Missing required option :wspace") unless wspace + ::ActiveRecord::Base.connection_pool.with_connection { if wspace.kind_of? String wspace = find_workspace(wspace) end @@ -1041,6 +1098,7 @@ class DBManager tag.critical = !!crit tag.hosts = tag.hosts | [host] tag.save! if tag.changed? + } end # @@ -1194,15 +1252,19 @@ class DBManager # cred instance of each entry. # def each_cred(wspace=workspace,&block) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.creds.each do |cred| block.call(cred) end + } end def each_exploited_host(wspace=workspace,&block) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.exploited_hosts.each do |eh| block.call(eh) end + } end # @@ -1227,6 +1289,7 @@ class DBManager raise ArgumentError.new("Deprecated data column for vuln, use .info instead") if opts[:data] name = opts[:name] || return info = opts[:info] + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace exploited_at = opts[:exploited_at] || opts["exploited_at"] rids = nil @@ -1301,10 +1364,12 @@ class DBManager msf_import_timestamps(opts,vuln) vuln.save! end + } end def get_vuln(wspace, host, service, name, data='') raise RuntimeError, "Not workspace safe: #{caller.inspect}" + ::ActiveRecord::Base.connection_pool.with_connection { vuln = nil if (service) vuln = ::Mdm::Vuln.find.where("name = ? and service_id = ? and host_id = ?", name, service.id, host.id).order("vulns.id DESC").first() @@ -1313,6 +1378,7 @@ class DBManager end return vuln + } end # @@ -1323,15 +1389,19 @@ class DBManager ret[:ref] = get_ref(opts[:name]) return ret[:ref] if ret[:ref] + ::ActiveRecord::Base.connection_pool.with_connection { ref = ::Mdm::Ref.find_or_initialize_by_name(opts[:name]) if ref and ref.changed? ref.save! end ret[:ref] = ref + } end def get_ref(name) + ::ActiveRecord::Base.connection_pool.with_connection { ::Mdm::Ref.find_by_name(name) + } end # report_exploit() used to be used to track sessions and which modules @@ -1349,9 +1419,11 @@ class DBManager # Deletes a host and associated data matching this address/comm # def del_host(wspace, address, comm='') + ::ActiveRecord::Base.connection_pool.with_connection { address, scope = address.split('%', 2) host = wspace.hosts.find_by_address_and_comm(address, comm) host.destroy if host + } end # @@ -1362,37 +1434,48 @@ class DBManager host = get_host(:workspace => wspace, :address => address) return unless host + ::ActiveRecord::Base.connection_pool.with_connection { host.services.where({:proto => proto, :port => port}).each { |s| s.destroy } + } end # # Find a reference matching this name # def has_ref?(name) + ::ActiveRecord::Base.connection_pool.with_connection { Mdm::Ref.find_by_name(name) + } end # # Find a vulnerability matching this name # def has_vuln?(name) + ::ActiveRecord::Base.connection_pool.with_connection { Mdm::Vuln.find_by_name(name) + } end # # Look for an address across all comms # def has_host?(wspace,addr) + ::ActiveRecord::Base.connection_pool.with_connection { address, scope = addr.split('%', 2) wspace.hosts.find_by_address(addr) + } end def events(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.events.find :all, :order => 'created_at ASC' + } end def report_event(opts = {}) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace return if not wspace # Temp fix? uname = opts.delete(:username) @@ -1402,6 +1485,7 @@ class DBManager end ::Mdm::Event.create(opts.merge(:workspace_id => wspace[:id], :username => uname)) + } end # @@ -1412,9 +1496,11 @@ class DBManager # instance of each entry. # def each_loot(wspace=workspace, &block) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.loots.each do |note| block.call(note) end + } end # @@ -1426,6 +1512,7 @@ class DBManager def report_loot(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace path = opts.delete(:path) || (raise RuntimeError, "A loot :path is required") @@ -1478,13 +1565,16 @@ class DBManager end ret[:loot] = loot + } end # # This methods returns a list of all loot in the database # def loots(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.loots + } end # @@ -1496,6 +1586,7 @@ class DBManager def report_task(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace path = opts.delete(:path) || (raise RuntimeError, "A task :path is required") @@ -1530,13 +1621,16 @@ class DBManager end task.save! ret[:task] = task + } end # # This methods returns a list of all tasks in the database # def tasks(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.tasks + } end @@ -1549,6 +1643,7 @@ class DBManager def report_report(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { wspace = opts.delete(:workspace) || workspace path = opts.delete(:path) || (raise RuntimeError, "A report :path is required") @@ -1565,13 +1660,16 @@ class DBManager report.save! ret[:task] = report + } end # # This methods returns a list of all reports in the database # def reports(wspace=workspace) + ::ActiveRecord::Base.connection_pool.with_connection { wspace.reports + } end # @@ -1601,6 +1699,7 @@ class DBManager # def report_web_site(opts) return if not active + ::ActiveRecord::Base.connection_pool.with_connection { |conn| wspace = opts.delete(:workspace) || workspace vhost = opts.delete(:vhost) @@ -1672,6 +1771,7 @@ class DBManager site.save! ret[:web_site] = site + } end # diff --git a/lib/msf/core/db_manager.rb b/lib/msf/core/db_manager.rb index 46692bc37e..4a7fd688cb 100644 --- a/lib/msf/core/db_manager.rb +++ b/lib/msf/core/db_manager.rb @@ -44,7 +44,8 @@ class DBManager # Returns true if we are ready to load/store data def active return false if not @usable - (ActiveRecord::Base.connected? && ActiveRecord::Base.connection.active? && migrated) rescue false + # We have established a connection, some connection is active, and we have run migrations + (ActiveRecord::Base.connected? && ActiveRecord::Base.connection_pool.connected? && migrated)# rescue false end # Returns true if the prerequisites have been installed @@ -175,7 +176,8 @@ class DBManager nopts['port'] = nopts['port'].to_i end - nopts['pool'] = 75 + # Prefer the config file's pool setting + nopts['pool'] ||= 75 begin self.migrated = false @@ -221,7 +223,10 @@ class DBManager # Try to force a connection to be made to the database, if it succeeds # then we know we don't need to create it :) ActiveRecord::Base.establish_connection(opts) - conn = ActiveRecord::Base.connection + # Do the checkout, checkin dance here to make sure this thread doesn't + # hold on to a connection we don't need + conn = ActiveRecord::Base.connection_pool.checkout + ActiveRecord::Base.connection_pool.checkin(conn) end rescue ::Exception => e errstr = e.to_s @@ -275,8 +280,11 @@ class DBManager success = true begin - ActiveRecord::Migration.verbose = verbose - ActiveRecord::Migrator.migrate(temp_dir, nil) + + ::ActiveRecord::Base.connection_pool.with_connection { + ActiveRecord::Migration.verbose = verbose + ActiveRecord::Migrator.migrate(temp_dir, nil) + } rescue ::Exception => e self.error = e elog("DB.migrate threw an exception: #{e}") diff --git a/lib/msf/core/session_manager.rb b/lib/msf/core/session_manager.rb index 51dcac771c..4e3e753958 100644 --- a/lib/msf/core/session_manager.rb +++ b/lib/msf/core/session_manager.rb @@ -127,11 +127,13 @@ class SessionManager < Hash # Clean out any stale sessions that have been orphaned by a dead # framework instance. # - ::Mdm::Session.find_all_by_closed_at(nil).each do |db_session| - if db_session.last_seen.nil? or ((Time.now.utc - db_session.last_seen) > (2*LAST_SEEN_INTERVAL)) - db_session.closed_at = db_session.last_seen || Time.now.utc - db_session.close_reason = "Orphaned" - db_session.save + ::ActiveRecord::Base.connection_pool.with_connection do |conn| + ::Mdm::Session.find_all_by_closed_at(nil).each do |db_session| + if db_session.last_seen.nil? or ((Time.now.utc - db_session.last_seen) > (2*LAST_SEEN_INTERVAL)) + db_session.closed_at = db_session.last_seen || Time.now.utc + db_session.close_reason = "Orphaned" + db_session.save + end end end end @@ -141,7 +143,8 @@ class SessionManager < Hash # rescue ::Exception => e respawn_cnt += 1 - elog("Exception #{respawn_cnt}/#{respawn_max} in monitor thread #{e.class} #{e} #{e.backtrace.join("\n")}") + elog("Exception #{respawn_cnt}/#{respawn_max} in monitor thread #{e.class} #{e}") + elog("Call stack: \n#{e.backtrace.join("\n")}") if respawn_cnt < respawn_max ::IO.select(nil, nil, nil, 10.0) retry diff --git a/lib/msf/core/thread_manager.rb b/lib/msf/core/thread_manager.rb index e268c32bfe..94bbad8bac 100644 --- a/lib/msf/core/thread_manager.rb +++ b/lib/msf/core/thread_manager.rb @@ -1,5 +1,40 @@ require 'msf/core/plugin' +=begin +require 'active_record' +# +# This monkeypatch can help to diagnose errors involving connection pool +# exhaustion and other strange ActiveRecord including errors like: +# +# DEPRECATION WARNING: Database connections will not be closed automatically, please close your +# database connection at the end of the thread by calling `close` on your +# connection. For example: ActiveRecord::Base.connection.close +# +# and +# +# ActiveRecord::StatementInvalid NoMethodError: undefined method `fields' for nil:NilClass: SELECT "workspaces".* FROM "workspaces" WHERE "workspaces"."id" = 24 LIMIT 1 +# +# +# Based on this code: https://gist.github.com/1364551 linked here: +# http://bibwild.wordpress.com/2011/11/14/multi-threading-in-rails-activerecord-3-0-3-1/ +module ActiveRecord + class Base + class << self + def connection + unless connection_pool.active_connection? + $stdout.puts("AR::B.connection implicit checkout") + $stdout.puts(caller.join("\n")) + raise ImplicitConnectionForbiddenError.new("Implicit ActiveRecord checkout attempted!") + end + retrieve_connection + end + end + end + class ImplicitConnectionForbiddenError < ActiveRecord::ConnectionTimeoutError ; end +end +=end + + module Msf ### @@ -66,10 +101,17 @@ class ThreadManager < Array elog("thread exception: #{::Thread.current[:tm_name]} critical=#{::Thread.current[:tm_crit]} error:#{e.class} #{e} source:#{::Thread.current[:tm_call].inspect}") elog("Call Stack\n#{e.backtrace.join("\n")}") raise e + ensure + if framework.db and framework.db.active + # NOTE: despite the Deprecation Warning's advice, this should *NOT* + # be ActiveRecord::Base.connection.close which causes unrelated + # threads to raise ActiveRecord::StatementInvalid exceptions at + # some point in the future, presumably due to the pool manager + # believing that the connection is still usable and handing it out + # to another thread. + ::ActiveRecord::Base.connection_pool.release_connection + end end - if framework.db and framework.db.active - ::ActiveRecord::Base.connection.close if ActiveRecord::Base.connection - end end else t = ::Thread.new(name, crit, caller, *args) do |*argv| @@ -77,6 +119,13 @@ class ThreadManager < Array ::Thread.current[:tm_crit] = argv.shift ::Thread.current[:tm_call] = argv.shift ::Thread.current[:tm_time] = Time.now + # Calling spawn without a block means we cannot force a database + # connection release when the thread completes, so doing so can + # potentially use up all database resources and starve all subsequent + # threads that make use of the database. Log a warning so we can track + # down this kind of usage. + dlog("Thread spawned without a block!") + dlog("Call stack: \n#{::Thread.current[:tm_call].join("\n")}") end end diff --git a/lib/msf/ui/console/command_dispatcher/db.rb b/lib/msf/ui/console/command_dispatcher/db.rb index b27c4b9afb..9dfa5cd545 100644 --- a/lib/msf/ui/console/command_dispatcher/db.rb +++ b/lib/msf/ui/console/command_dispatcher/db.rb @@ -1334,10 +1334,13 @@ class Db # def cmd_db_status(*args) return if not db_check_driver - if ActiveRecord::Base.connected? and ActiveRecord::Base.connection.active? - if ActiveRecord::Base.connection.respond_to? :current_database - cdb = ActiveRecord::Base.connection.current_database - end + if ActiveRecord::Base.connected? + cdb = "" + ::ActiveRecord::Base.connection_pool.with_connection { |conn| + if conn.respond_to? :current_database + cdb = conn.current_database + end + } print_status("#{framework.db.driver} connected to #{cdb}") else print_status("#{framework.db.driver} selected, no connection")