This overhauls the database reporting code - all report_* functions now add their write operation to a queue that is processed sequentially in the background. This prevents concurrency issues around database writes, but prevents modules from immediately using the results of their report_* calls in the database. Still todo is a method for waiting on the write to have occurred

git-svn-id: file:///home/svn/framework3/trunk@7997 4d416f70-5f16-0410-b530-b9f4589650da
unstable
HD Moore 2009-12-28 00:21:21 +00:00
parent b85b585098
commit 5757216f9a
4 changed files with 281 additions and 63 deletions

View File

@ -49,14 +49,16 @@ module Auxiliary::Report
def report_host(opts)
return if not db
addr = opts[:host] || return
framework.db.report_host_state(self, addr, Msf::HostState::Alive)
host = nil
opts.delete(:host)
if (opts.length > 0)
host = framework.db.report_host(self, addr, opts)
end
host
framework.db.queue Proc.new {
framework.db.report_host_state(self, addr, Msf::HostState::Alive)
host = nil
opts.delete(:host)
if (opts.length > 0)
host = framework.db.report_host(self, addr, opts)
end
}
end
def get_host(addr)
@ -78,10 +80,10 @@ module Auxiliary::Report
return if not db
addr = opts.delete(:host) || return
framework.db.report_host_state(self, addr, Msf::HostState::Alive)
cli = framework.db.report_client(self, addr, opts)
return cli
framework.db.queue Proc.new {
framework.db.report_host_state(self, addr, Msf::HostState::Alive)
cli = framework.db.report_client(self, addr, opts)
}
end
def get_client(addr, ua_string)
@ -100,31 +102,46 @@ module Auxiliary::Report
name = opts[:name]
state = opts[:state] || 'open'
info = opts[:info]
hname = opts[:host_name]
maddr = opts[:host_mac]
framework.db.report_host_state(self, addr, Msf::HostState::Alive)
framework.db.queue Proc.new {
framework.db.report_host_state(self, addr, Msf::HostState::Alive)
serv = framework.db.report_service_state(
self,
addr,
proto,
port,
state
)
changed = false
serv = framework.db.report_service_state(
self,
addr,
proto,
port,
state
)
if (name and name.length > 1)
serv.name = name.downcase
changed = true
end
changed = false
if(hname)
self.host.name = hname
changed = true
end
if (info and info.length > 1)
serv.info = info
changed = true
end
if(maddr)
self.host.mac = maddr
changed = true
end
serv.host.save! if changed
serv.save! if changed
serv
changed = false
if (name and name.length > 1)
serv.name = name.downcase
changed = true
end
if (info and info.length > 1)
serv.info = info
changed = true
end
serv.save! if changed
}
end
def report_note(opts={})
@ -133,40 +150,42 @@ module Auxiliary::Report
ntype = opts[:type] || return
data = opts[:data] || return
host = framework.db.report_host_state(self, addr, Msf::HostState::Alive)
note = framework.db.get_note(self, host, ntype, data)
note
framework.db.queue Proc.new {
host = framework.db.report_host_state(self, addr, Msf::HostState::Alive)
note = framework.db.get_note(self, host, ntype, data)
}
end
def report_vuln_service(opts={})
return if not db
serv = report_service(opts)
return if not serv
vname = opts[:vname]
vdata = opts[:vdata] || ''
framework.db.queue Proc.new {
serv = report_service(opts)
return if not serv
host = serv.host
vuln = framework.db.get_vuln(self, host, serv, vname, vdata)
vname = opts[:vname]
vdata = opts[:vdata] || ''
framework.db.vuln_add_refs(self, vuln, opts[:refs])
host = serv.host
vuln = framework.db.get_vuln(self, host, serv, vname, vdata)
vuln
framework.db.vuln_add_refs(self, vuln, opts[:refs])
}
end
def report_vuln_host(opts={})
return if not db
addr = opts[:host] || return
host = framework.db.report_host_state(self, addr, Msf::HostState::Alive)
vname = opts[:vname]
vdata = opts[:vdata] || ''
framework.db.queue Proc.new {
host = framework.db.report_host_state(self, addr, Msf::HostState::Alive)
vname = opts[:vname]
vdata = opts[:vdata] || ''
vuln = framework.db.get_vuln(self, host, nil, vname, vdata)
vuln = framework.db.get_vuln(self, host, nil, vname, vdata)
framework.db.vuln_add_refs(self, vuln, opts[:refs])
vuln
framework.db.vuln_add_refs(self, vuln, opts[:refs])
}
end
def report_auth_info(opts={})

View File

@ -1,5 +1,6 @@
require 'msf/core'
require 'msf/core/db'
require 'msf/core/task_manager'
module Msf
@ -30,6 +31,9 @@ class DBManager
# Stores the error message for why the db was not loaded
attr_accessor :error
# Stores a TaskManager for serializing database events
attr_accessor :sink
def initialize(framework)
self.framework = framework
@ -68,10 +72,15 @@ class DBManager
# Determine what drivers are available
#
initialize_drivers
#
# Instantiate the database sink
#
initialize_sink
end
#
#
# Scan through available drivers
#
def initialize_drivers
self.drivers = []
@ -93,6 +102,22 @@ class DBManager
end
end
#
# Create a new database sink and initialize it
#
def initialize_sink
self.sink = TaskManager.new(framework)
self.sink.start
end
#
# Add a new task to the sink
#
def queue(proc)
self.sink.queue_proc(proc)
end
# Verify that sqlite3 is ready
def driver_check_sqlite3
require 'sqlite3'

View File

@ -0,0 +1,185 @@
module Msf
###
#
# This class provides a task manager
#
###
class TaskManager
class Task
attr_accessor :timeout
attr_accessor :created
attr_accessor :completed
attr_accessor :status
attr_accessor :proc
attr_accessor :source
attr_accessor :exception
#
# Create a new task
#
def initialize(proc,timeout=nil)
self.proc = proc
self.status = :new
self.created = Time.now
self.timeout = timeout
self.source = caller
end
#
# Task duration in seconds (float)
#
def duration
etime = self.completed || Time.now
etime.to_f - self.created.to_f
end
end
attr_accessor :processing
attr_accessor :queue
attr_accessor :thread
attr_accessor :framework
#
# Create a new TaskManager
#
def initialize(framework)
self.framework = framework
self.flush
end
#
# Add a new task via proc
#
def queue_proc(proc)
queue_task(Task.new(proc))
end
#
# Add a new task to the queue
#
def queue_task(task)
self.queue.push(task)
end
#
# Flush the queue
#
def flush
self.queue = []
end
#
# Stop processing events
#
def stop
return if not self.thread
self.processing = false
self.thread.join
self.thread = nil
end
#
# Forcefully kill the processing thread
#
def kill
return if not self.thread
self.processing = false
self.thread.kill
self.thread = nil
end
#
# Start processing tasks
#
def start
return if self.thread
self.processing = true
self.thread = Thread.new do
process_tasks
end
end
#
# Restart the task processor
#
def restart
stop
start
end
#
# Retrieve the number of tasks in the queue
#
def backlog
self.queue.length
end
#
# Process the actual queue
#
def process_tasks
while(self.processing)
while(task = self.queue.shift)
ret = process_task(task)
case ret
when :requeue
self.queue.push(task)
when :drop, :done
# Processed or dropped
end
end
select(nil, nil, nil, 0.10)
end
self.thread = nil
end
#
# Process a specific task from the queue
#
def process_task(task)
begin
if(task.timeout)
::Timeout.timeout(task.timeout) do
task.proc.call(self, task)
end
else
task.proc.call(self, task)
end
rescue ::Exception => e
if(e.class == ::Timeout::Error)
elog("taskmanager: task #{task.inspect} timed out after #{task.timeout} seconds")
task.status = :timeout
task.completed = Time.now
return :drop
end
elog("taskmanager: task #{task.inspect} triggered an exception: #{e.class} #{e}")
dlog("Call stack:\n#{$@.join("\n")}")
task.status = :dropped
task.exception = e
return :drop
end
task.status = :done
task.completed = Time.now
return :done
end
def log_error(msg)
elog(msg, 'core')
end
def log_debug(msg)
dlog(msg, 'core')
end
end
end

View File

@ -213,27 +213,16 @@ class Metasploit3 < Msf::Auxiliary
app = 'Sentinel'
end
s = report_service(
report_service(
:host => pkt[1],
:host_mac => (maddr and maddr != '00:00:00:00:00:00') ? maddr : nil,
:host_name => (hname) ? hname.downcase : nil,
:port => pkt[2],
:proto => 'udp',
:name => app,
:info => inf
)
changed = false
if (s and maddr and maddr != '00:00:00:00:00:00')
s.host.mac = maddr
changed = true
end
if (s and hname)
s.host.name = hname.downcase
changed = true
end
s.host.save! if changed
print_status("Discovered #{app} on #{pkt[1]} (#{inf})")
end