Make use of the new ActiveRecord 3.x concurrency contract

All Database usage must go through framework.db (which should have been
the case before, anyways) or explicitly checkout and checkin a
connection.  Failure to do so causes thread starvation and bizarre
random failures when attempting to use the database.

This commit also explicitly releases database connections at the end of
all threads created via framework.threads.spawn, which should alleviate
Deprecation Warning messages from ActiveRecord.

[Fixes #6613]
unstable
James Lee 2012-04-19 14:15:45 -06:00
parent 8d1d63dda8
commit 876c59b192
5 changed files with 182 additions and 19 deletions

View File

@ -191,27 +191,37 @@ class DBManager
# Determines if the database is functional
#
def check
::ActiveRecord::Base.connection_pool.with_connection {
res = ::Mdm::Host.find(:first)
}
end
def default_workspace
::ActiveRecord::Base.connection_pool.with_connection {
::Mdm::Workspace.default
}
end
def find_workspace(name)
::ActiveRecord::Base.connection_pool.with_connection {
::Mdm::Workspace.find_by_name(name)
}
end
#
# Creates a new workspace in the database
#
def add_workspace(name)
::ActiveRecord::Base.connection_pool.with_connection {
::Mdm::Workspace.find_or_create_by_name(name)
}
end
def workspaces
::ActiveRecord::Base.connection_pool.with_connection {
::Mdm::Workspace.find(:all)
}
end
#
@ -233,13 +243,15 @@ class DBManager
address = opts[:addr] || opts[:address] || opts[:host] || return
return address if address.kind_of? ::Mdm::Host
end
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
if wspace.kind_of? String
wspace = find_workspace(wspace)
end
address, scope = address.split('%', 2)
return wspace.hosts.find_by_address(address)
}
end
#
@ -276,6 +288,7 @@ class DBManager
return
end
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
if wspace.kind_of? String
wspace = find_workspace(wspace)
@ -333,6 +346,7 @@ class DBManager
end
host
}
end
@ -361,6 +375,7 @@ class DBManager
return
end
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
if wspace.kind_of? String
wspace = find_workspace(wspace)
@ -445,25 +460,30 @@ class DBManager
end
host
}
end
#
# Iterates over the hosts table calling the supplied block with the host
# instance of each entry.
#
def each_host(wspace=workspace, &block)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.hosts.each do |host|
block.call(host)
end
}
end
#
# Returns a list of all hosts in the database
#
def hosts(wspace = workspace, only_up = false, addresses = nil)
::ActiveRecord::Base.connection_pool.with_connection {
conditions = {}
conditions[:state] = [Msf::HostState::Alive, Msf::HostState::Unknown] if only_up
conditions[:address] = addresses if addresses
wspace.hosts.where(conditions).order(:address)
}
end
@ -486,6 +506,7 @@ class DBManager
#
def report_service(opts)
return if not active
#::ActiveRecord::Base.connection_pool.with_connection { |conn|
addr = opts.delete(:host) || return
hname = opts.delete(:host_name)
hmac = opts.delete(:mac)
@ -541,12 +562,15 @@ class DBManager
service.save!
end
ret[:service] = service
#}
end
def get_service(wspace, host, proto, port)
::ActiveRecord::Base.connection_pool.with_connection {
host = get_host(:workspace => wspace, :address => host)
return if not host
return host.services.find_by_proto_and_port(proto, port)
}
end
#
@ -554,15 +578,18 @@ class DBManager
# service instance of each entry.
#
def each_service(wspace=workspace, &block)
::ActiveRecord::Base.connection_pool.with_connection {
services(wspace).each do |service|
block.call(service)
end
}
end
#
# Returns a list of all services in the database
#
def services(wspace = workspace, only_up = false, proto = nil, addresses = nil, ports = nil, names = nil)
::ActiveRecord::Base.connection_pool.with_connection {
conditions = {}
conditions[:state] = [ServiceState::Open] if only_up
conditions[:proto] = proto if proto
@ -570,17 +597,20 @@ class DBManager
conditions[:port] = ports if ports
conditions[:name] = names if names
wspace.services.includes(:host).where(conditions).order("hosts.address, port")
}
end
# Returns a session based on opened_time, host address, and workspace
# (or returns nil)
def get_session(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts[:workspace] || opts[:wspace] || workspace
addr = opts[:addr] || opts[:address] || opts[:host] || return
host = get_host(:workspace => wspace, :host => addr)
time = opts[:opened_at] || opts[:created_at] || opts[:time] || return
::Mdm::Session.find_by_host_id_and_opened_at(host.id, time)
}
end
# Record a new session in the database
@ -591,6 +621,7 @@ class DBManager
#
def report_session(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
if opts[:session]
raise ArgumentError.new("Invalid :session, expected Msf::Session") unless opts[:session].kind_of? Msf::Session
session = opts[:session]
@ -666,6 +697,7 @@ class DBManager
end
s
}
end
#
@ -687,6 +719,7 @@ class DBManager
raise ArgumentError.new("Expected an :etype") unless opts[:etype]
session = nil
::ActiveRecord::Base.connection_pool.with_connection {
if opts[:session].respond_to? :db_record
session = opts[:session].db_record
if session.nil?
@ -712,6 +745,7 @@ class DBManager
end
s = ::Mdm::SessionEvent.create(event_data)
}
end
def report_session_route(session, route)
@ -725,9 +759,11 @@ class DBManager
raise ArgumentError.new("Invalid :session, expected Session object got #{session.class}")
end
::ActiveRecord::Base.connection_pool.with_connection {
subnet, netmask = route.split("/")
s.routes.create(:subnet => subnet, :netmask => netmask)
}
end
def report_session_route_remove(session, route)
@ -741,16 +777,20 @@ class DBManager
raise ArgumentError.new("Invalid :session, expected Session object got #{session.class}")
end
::ActiveRecord::Base.connection_pool.with_connection {
subnet, netmask = route.split("/")
r = s.routes.find_by_subnet_and_netmask(subnet, netmask)
r.destroy if r
}
end
def get_client(opts)
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
host = get_host(:workspace => wspace, :host => opts[:host]) || return
client = host.clients.where({:ua_string => opts[:ua_string]}).first()
return client
}
end
def find_or_create_client(opts)
@ -773,6 +813,7 @@ class DBManager
#
def report_client(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
addr = opts.delete(:host) || return
wspace = opts.delete(:workspace) || workspace
report_host(:workspace => wspace, :host => addr)
@ -805,6 +846,7 @@ class DBManager
client.save!
end
ret[:client] = client
}
end
#
@ -812,30 +854,38 @@ class DBManager
# vuln instance of each entry.
#
def each_vuln(wspace=workspace,&block)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.vulns.each do |vulns|
block.call(vulns)
end
}
end
#
# This methods returns a list of all vulnerabilities in the database
#
def vulns(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.vulns
}
end
#
# This methods returns a list of all credentials in the database
#
def creds(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
Mdm::Cred.includes({:service => :host}).where("hosts.workspace_id = ?", wspace.id)
}
end
#
# This method returns a list of all exploited hosts in the database.
#
def exploited_hosts(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.exploited_hosts
}
end
#
@ -843,9 +893,11 @@ class DBManager
# note instance of each entry.
#
def each_note(wspace=workspace, &block)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.notes.each do |note|
block.call(note)
end
}
end
#
@ -882,6 +934,7 @@ class DBManager
#
def report_note(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
if wspace.kind_of? String
wspace = find_workspace(wspace)
@ -1001,13 +1054,16 @@ class DBManager
msf_import_timestamps(opts,note)
note.save!
ret[:note] = note
}
end
#
# This methods returns a list of all notes in the database
#
def notes(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.notes
}
end
# This is only exercised by MSF3 XML importing for now. Needs the wait
@ -1019,6 +1075,7 @@ class DBManager
raise DBImportError.new("Missing required option :addr") unless addr
wspace = opts.delete(:wspace)
raise DBImportError.new("Missing required option :wspace") unless wspace
::ActiveRecord::Base.connection_pool.with_connection {
if wspace.kind_of? String
wspace = find_workspace(wspace)
end
@ -1041,6 +1098,7 @@ class DBManager
tag.critical = !!crit
tag.hosts = tag.hosts | [host]
tag.save! if tag.changed?
}
end
#
@ -1194,15 +1252,19 @@ class DBManager
# cred instance of each entry.
#
def each_cred(wspace=workspace,&block)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.creds.each do |cred|
block.call(cred)
end
}
end
def each_exploited_host(wspace=workspace,&block)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.exploited_hosts.each do |eh|
block.call(eh)
end
}
end
#
@ -1227,6 +1289,7 @@ class DBManager
raise ArgumentError.new("Deprecated data column for vuln, use .info instead") if opts[:data]
name = opts[:name] || return
info = opts[:info]
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
exploited_at = opts[:exploited_at] || opts["exploited_at"]
rids = nil
@ -1301,10 +1364,12 @@ class DBManager
msf_import_timestamps(opts,vuln)
vuln.save!
end
}
end
def get_vuln(wspace, host, service, name, data='')
raise RuntimeError, "Not workspace safe: #{caller.inspect}"
::ActiveRecord::Base.connection_pool.with_connection {
vuln = nil
if (service)
vuln = ::Mdm::Vuln.find.where("name = ? and service_id = ? and host_id = ?", name, service.id, host.id).order("vulns.id DESC").first()
@ -1313,6 +1378,7 @@ class DBManager
end
return vuln
}
end
#
@ -1323,15 +1389,19 @@ class DBManager
ret[:ref] = get_ref(opts[:name])
return ret[:ref] if ret[:ref]
::ActiveRecord::Base.connection_pool.with_connection {
ref = ::Mdm::Ref.find_or_initialize_by_name(opts[:name])
if ref and ref.changed?
ref.save!
end
ret[:ref] = ref
}
end
def get_ref(name)
::ActiveRecord::Base.connection_pool.with_connection {
::Mdm::Ref.find_by_name(name)
}
end
# report_exploit() used to be used to track sessions and which modules
@ -1349,9 +1419,11 @@ class DBManager
# Deletes a host and associated data matching this address/comm
#
def del_host(wspace, address, comm='')
::ActiveRecord::Base.connection_pool.with_connection {
address, scope = address.split('%', 2)
host = wspace.hosts.find_by_address_and_comm(address, comm)
host.destroy if host
}
end
#
@ -1362,37 +1434,48 @@ class DBManager
host = get_host(:workspace => wspace, :address => address)
return unless host
::ActiveRecord::Base.connection_pool.with_connection {
host.services.where({:proto => proto, :port => port}).each { |s| s.destroy }
}
end
#
# Find a reference matching this name
#
def has_ref?(name)
::ActiveRecord::Base.connection_pool.with_connection {
Mdm::Ref.find_by_name(name)
}
end
#
# Find a vulnerability matching this name
#
def has_vuln?(name)
::ActiveRecord::Base.connection_pool.with_connection {
Mdm::Vuln.find_by_name(name)
}
end
#
# Look for an address across all comms
#
def has_host?(wspace,addr)
::ActiveRecord::Base.connection_pool.with_connection {
address, scope = addr.split('%', 2)
wspace.hosts.find_by_address(addr)
}
end
def events(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.events.find :all, :order => 'created_at ASC'
}
end
def report_event(opts = {})
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
return if not wspace # Temp fix?
uname = opts.delete(:username)
@ -1402,6 +1485,7 @@ class DBManager
end
::Mdm::Event.create(opts.merge(:workspace_id => wspace[:id], :username => uname))
}
end
#
@ -1412,9 +1496,11 @@ class DBManager
# instance of each entry.
#
def each_loot(wspace=workspace, &block)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.loots.each do |note|
block.call(note)
end
}
end
#
@ -1426,6 +1512,7 @@ class DBManager
def report_loot(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
path = opts.delete(:path) || (raise RuntimeError, "A loot :path is required")
@ -1478,13 +1565,16 @@ class DBManager
end
ret[:loot] = loot
}
end
#
# This methods returns a list of all loot in the database
#
def loots(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.loots
}
end
#
@ -1496,6 +1586,7 @@ class DBManager
def report_task(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
path = opts.delete(:path) || (raise RuntimeError, "A task :path is required")
@ -1530,13 +1621,16 @@ class DBManager
end
task.save!
ret[:task] = task
}
end
#
# This methods returns a list of all tasks in the database
#
def tasks(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.tasks
}
end
@ -1549,6 +1643,7 @@ class DBManager
def report_report(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection {
wspace = opts.delete(:workspace) || workspace
path = opts.delete(:path) || (raise RuntimeError, "A report :path is required")
@ -1565,13 +1660,16 @@ class DBManager
report.save!
ret[:task] = report
}
end
#
# This methods returns a list of all reports in the database
#
def reports(wspace=workspace)
::ActiveRecord::Base.connection_pool.with_connection {
wspace.reports
}
end
#
@ -1601,6 +1699,7 @@ class DBManager
#
def report_web_site(opts)
return if not active
::ActiveRecord::Base.connection_pool.with_connection { |conn|
wspace = opts.delete(:workspace) || workspace
vhost = opts.delete(:vhost)
@ -1672,6 +1771,7 @@ class DBManager
site.save!
ret[:web_site] = site
}
end
#

View File

@ -44,7 +44,8 @@ class DBManager
# Returns true if we are ready to load/store data
def active
return false if not @usable
(ActiveRecord::Base.connected? && ActiveRecord::Base.connection.active? && migrated) rescue false
# We have established a connection, some connection is active, and we have run migrations
(ActiveRecord::Base.connected? && ActiveRecord::Base.connection_pool.connected? && migrated)# rescue false
end
# Returns true if the prerequisites have been installed
@ -175,7 +176,8 @@ class DBManager
nopts['port'] = nopts['port'].to_i
end
nopts['pool'] = 75
# Prefer the config file's pool setting
nopts['pool'] ||= 75
begin
self.migrated = false
@ -221,7 +223,10 @@ class DBManager
# Try to force a connection to be made to the database, if it succeeds
# then we know we don't need to create it :)
ActiveRecord::Base.establish_connection(opts)
conn = ActiveRecord::Base.connection
# Do the checkout, checkin dance here to make sure this thread doesn't
# hold on to a connection we don't need
conn = ActiveRecord::Base.connection_pool.checkout
ActiveRecord::Base.connection_pool.checkin(conn)
end
rescue ::Exception => e
errstr = e.to_s
@ -275,8 +280,11 @@ class DBManager
success = true
begin
ActiveRecord::Migration.verbose = verbose
ActiveRecord::Migrator.migrate(temp_dir, nil)
::ActiveRecord::Base.connection_pool.with_connection {
ActiveRecord::Migration.verbose = verbose
ActiveRecord::Migrator.migrate(temp_dir, nil)
}
rescue ::Exception => e
self.error = e
elog("DB.migrate threw an exception: #{e}")

View File

@ -127,11 +127,13 @@ class SessionManager < Hash
# Clean out any stale sessions that have been orphaned by a dead
# framework instance.
#
::Mdm::Session.find_all_by_closed_at(nil).each do |db_session|
if db_session.last_seen.nil? or ((Time.now.utc - db_session.last_seen) > (2*LAST_SEEN_INTERVAL))
db_session.closed_at = db_session.last_seen || Time.now.utc
db_session.close_reason = "Orphaned"
db_session.save
::ActiveRecord::Base.connection_pool.with_connection do |conn|
::Mdm::Session.find_all_by_closed_at(nil).each do |db_session|
if db_session.last_seen.nil? or ((Time.now.utc - db_session.last_seen) > (2*LAST_SEEN_INTERVAL))
db_session.closed_at = db_session.last_seen || Time.now.utc
db_session.close_reason = "Orphaned"
db_session.save
end
end
end
end
@ -141,7 +143,8 @@ class SessionManager < Hash
#
rescue ::Exception => e
respawn_cnt += 1
elog("Exception #{respawn_cnt}/#{respawn_max} in monitor thread #{e.class} #{e} #{e.backtrace.join("\n")}")
elog("Exception #{respawn_cnt}/#{respawn_max} in monitor thread #{e.class} #{e}")
elog("Call stack: \n#{e.backtrace.join("\n")}")
if respawn_cnt < respawn_max
::IO.select(nil, nil, nil, 10.0)
retry

View File

@ -1,5 +1,40 @@
require 'msf/core/plugin'
=begin
require 'active_record'
#
# This monkeypatch can help to diagnose errors involving connection pool
# exhaustion and other strange ActiveRecord including errors like:
#
# DEPRECATION WARNING: Database connections will not be closed automatically, please close your
# database connection at the end of the thread by calling `close` on your
# connection. For example: ActiveRecord::Base.connection.close
#
# and
#
# ActiveRecord::StatementInvalid NoMethodError: undefined method `fields' for nil:NilClass: SELECT "workspaces".* FROM "workspaces" WHERE "workspaces"."id" = 24 LIMIT 1
#
#
# Based on this code: https://gist.github.com/1364551 linked here:
# http://bibwild.wordpress.com/2011/11/14/multi-threading-in-rails-activerecord-3-0-3-1/
module ActiveRecord
class Base
class << self
def connection
unless connection_pool.active_connection?
$stdout.puts("AR::B.connection implicit checkout")
$stdout.puts(caller.join("\n"))
raise ImplicitConnectionForbiddenError.new("Implicit ActiveRecord checkout attempted!")
end
retrieve_connection
end
end
end
class ImplicitConnectionForbiddenError < ActiveRecord::ConnectionTimeoutError ; end
end
=end
module Msf
###
@ -66,10 +101,17 @@ class ThreadManager < Array
elog("thread exception: #{::Thread.current[:tm_name]} critical=#{::Thread.current[:tm_crit]} error:#{e.class} #{e} source:#{::Thread.current[:tm_call].inspect}")
elog("Call Stack\n#{e.backtrace.join("\n")}")
raise e
ensure
if framework.db and framework.db.active
# NOTE: despite the Deprecation Warning's advice, this should *NOT*
# be ActiveRecord::Base.connection.close which causes unrelated
# threads to raise ActiveRecord::StatementInvalid exceptions at
# some point in the future, presumably due to the pool manager
# believing that the connection is still usable and handing it out
# to another thread.
::ActiveRecord::Base.connection_pool.release_connection
end
end
if framework.db and framework.db.active
::ActiveRecord::Base.connection.close if ActiveRecord::Base.connection
end
end
else
t = ::Thread.new(name, crit, caller, *args) do |*argv|
@ -77,6 +119,13 @@ class ThreadManager < Array
::Thread.current[:tm_crit] = argv.shift
::Thread.current[:tm_call] = argv.shift
::Thread.current[:tm_time] = Time.now
# Calling spawn without a block means we cannot force a database
# connection release when the thread completes, so doing so can
# potentially use up all database resources and starve all subsequent
# threads that make use of the database. Log a warning so we can track
# down this kind of usage.
dlog("Thread spawned without a block!")
dlog("Call stack: \n#{::Thread.current[:tm_call].join("\n")}")
end
end

View File

@ -1334,10 +1334,13 @@ class Db
#
def cmd_db_status(*args)
return if not db_check_driver
if ActiveRecord::Base.connected? and ActiveRecord::Base.connection.active?
if ActiveRecord::Base.connection.respond_to? :current_database
cdb = ActiveRecord::Base.connection.current_database
end
if ActiveRecord::Base.connected?
cdb = ""
::ActiveRecord::Base.connection_pool.with_connection { |conn|
if conn.respond_to? :current_database
cdb = conn.current_database
end
}
print_status("#{framework.db.driver} connected to #{cdb}")
else
print_status("#{framework.db.driver} selected, no connection")