Major cleanups to the session manager, serialized processing of incoming connections, concurrent processing (up to a max scheduler thread count) of meterpreter initialization/scripts. This is to avoid a potential deadlock in openssl and ensure consistent, reliable session staging. This commit also fixes a bug that would mark database sessions as closed too early.

git-svn-id: file:///home/svn/framework3/trunk@12485 4d416f70-5f16-0410-b530-b9f4589650da
unstable
HD Moore 2011-04-30 18:51:50 +00:00
parent 39cab9b076
commit 2c3e85af05
4 changed files with 131 additions and 52 deletions

View File

@ -25,6 +25,9 @@ module MeterpreterOptions
def on_session(session) def on_session(session)
super super
# Defer the session initialization to the Session Manager scheduler
framework.sessions.schedule Proc.new {
session.init_ui(self.user_input, self.user_output) session.init_ui(self.user_input, self.user_output)
if (datastore['AutoLoadStdapi'] == true) if (datastore['AutoLoadStdapi'] == true)
@ -60,6 +63,8 @@ module MeterpreterOptions
session.execute_script(args.shift, *args) session.execute_script(args.shift, *args)
end end
end end
}
end end

View File

@ -476,20 +476,17 @@ class DBManager
ret = {} ret = {}
s = Msf::DBManager::Session.create(sess_data) s = Msf::DBManager::Session.new(sess_data)
s.save!
if opts[:session] if opts[:session]
session.db_record = s session.db_record = s
else
myret = s.save!
end end
ret[:session] = s
# If this is a live session, we know the host is vulnerable to something. # If this is a live session, we know the host is vulnerable to something.
# If the exploit used was multi/handler, though, we don't know what # If the exploit used was multi/handler, though, we don't know what
# it's vulnerable to, so it isn't really useful to save it. # it's vulnerable to, so it isn't really useful to save it.
if opts[:session] if opts[:session] and session.via_exploit and session.via_exploit != "exploit/multi/handler"
if session.via_exploit and session.via_exploit != "exploit/multi/handler"
return unless host return unless host
port = session.exploit_datastore["RPORT"] port = session.exploit_datastore["RPORT"]
service = (port ? host.services.find_by_port(port) : nil) service = (port ? host.services.find_by_port(port) : nil)
@ -516,9 +513,8 @@ class DBManager
} }
framework.db.report_exploit(exploit_info) framework.db.report_exploit(exploit_info)
end end
end
ret[:session] s
end end
# #

View File

@ -1,4 +1,5 @@
require 'rex/socket' require 'rex/socket'
require 'thread'
module Msf module Msf
module Handler module Handler
@ -55,7 +56,7 @@ module ReverseTcp
], Msf::Handler::ReverseTcp) ], Msf::Handler::ReverseTcp)
self.conn_threads = [] self.handler_queue = ::Queue.new
end end
# #
@ -131,12 +132,6 @@ module ReverseTcp
# #
def cleanup_handler def cleanup_handler
stop_handler stop_handler
# Kill any remaining handle_connection threads that might
# be hanging around
conn_threads.each { |thr|
thr.kill
}
end end
# #
@ -157,19 +152,22 @@ module ReverseTcp
# Increment the has connection counter # Increment the has connection counter
self.pending_connections += 1 self.pending_connections += 1
# Start a new thread and pass the client connection self.handler_queue.push( client )
# as the input and output pipe. Client's are expected
# to implement the Stream interface.
conn_threads << framework.threads.spawn("ReverseTcpHandlerSession", false, client) { |client_copy|
begin
handle_connection(client_copy)
rescue ::Exception
elog("Exception raised from handle_connection: #{$!}\n\n#{$@.join("\n")}")
end
}
end while true end while true
} }
self.handler_thread = framework.threads.spawn("ReverseTcpHandlerWorker-#{datastore['LPORT']}", false) {
while true
client = self.handler_queue.pop
begin
handle_connection(client)
rescue ::Exception
elog("Exception raised from handle_connection: #{$!}\n\n#{$@.join("\n")}")
end
end
}
end end
# #
@ -182,6 +180,12 @@ module ReverseTcp
self.listener_thread = nil self.listener_thread = nil
end end
# Terminate the handler thread
if (self.handler_thread and self.handler_thread.alive? == true)
self.handler_thread.kill
self.handler_thread = nil
end
if (self.listener_sock) if (self.listener_sock)
self.listener_sock.close self.listener_sock.close
self.listener_sock = nil self.listener_sock = nil
@ -192,8 +196,8 @@ protected
attr_accessor :listener_sock # :nodoc: attr_accessor :listener_sock # :nodoc:
attr_accessor :listener_thread # :nodoc: attr_accessor :listener_thread # :nodoc:
attr_accessor :conn_threads # :nodoc: attr_accessor :handler_thread # :nodoc:
attr_accessor :handler_queue # :nodoc:
end end
end end

View File

@ -1,3 +1,5 @@
require 'thread'
module Msf module Msf
### ###
@ -17,17 +19,25 @@ class SessionManager < Hash
include Framework::Offspring include Framework::Offspring
LAST_SEEN_INTERVAL = 2.5 * 60 LAST_SEEN_INTERVAL = 60 * 2.5
SCHEDULER_THREAD_COUNT = 5
def initialize(framework) def initialize(framework)
self.framework = framework self.framework = framework
self.sid_pool = 0 self.sid_pool = 0
self.reaper_thread = framework.threads.spawn("SessionManager", true, self) do |manager| self.mutex = Mutex.new
self.scheduler_queue = ::Queue.new
self.initialize_scheduler_threads
self.monitor_thread = framework.threads.spawn("SessionManager", true) do
last_seen_timer = Time.now.utc last_seen_timer = Time.now.utc
begin begin
while true while true
#
# Process incoming data from all stream-based sessions and queue the
# data into the associated ring buffers.
#
rings = values.select{|s| s.respond_to?(:ring) and s.ring and s.rstream } rings = values.select{|s| s.respond_to?(:ring) and s.ring and s.rstream }
ready = ::IO.select(rings.map{|s| s.rstream}, nil, nil, 0.5) || [[],[],[]] ready = ::IO.select(rings.map{|s| s.rstream}, nil, nil, 0.5) || [[],[],[]]
@ -61,34 +71,58 @@ class SessionManager < Hash
s.rstream.close rescue nil s.rstream.close rescue nil
# Deregister the session # Deregister the session
manager.deregister(s, "Died from #{e.class}") deregister(s, "Died from #{e.class}")
end end
end end
#
# TODO: Call the dispatch entry point of each Meterpreter thread instead of
# dedicating specific processing threads to each session
#
#
# Check for closed / dead / terminated sessions # Check for closed / dead / terminated sessions
manager.values.each do |s| #
values.each do |s|
if not s.alive? if not s.alive?
manager.deregister(s, "Died") deregister(s, "Died")
wlog("Session #{s.sid} has died") wlog("Session #{s.sid} has died")
next next
end end
end
next if ((Time.now.utc - last_seen_timer) < LAST_SEEN_INTERVAL)
# Update the database entry for this session every 5 #
# minutes, give or take. This notifies other framework # Mark all open session as alive every LAST_SEEN_INTERVAL
# instances that this session is being maintained. #
if (Time.now.utc - last_seen_timer) >= LAST_SEEN_INTERVAL
# Update this timer BEFORE processing the session list, this will prevent
# processing time for large session lists from skewing our update interval.
last_seen_timer = Time.now.utc last_seen_timer = Time.now.utc
if framework.db.active and s.db_record values.each do |s|
s.db_record.last_seen = Time.now.utc # Update the database entry on a regular basis, marking alive threads
s.db_record.save # as recently seen. This notifies other framework instances that this
# session is being maintained.
if framework.db.active and s.db_record
s.db_record.last_seen = Time.now.utc
s.db_record.save
end
end end
end end
#
# Skip the database cleanup code below if there is no database # Skip the database cleanup code below if there is no database
#
next if not (framework.db and framework.db.active) next if not (framework.db and framework.db.active)
#
# Clean out any stale sessions that have been orphaned by a dead # Clean out any stale sessions that have been orphaned by a dead
# framewort instance. # framework instance.
#
Msf::DBManager::Session.find_all_by_closed_at(nil).each do |db_session| Msf::DBManager::Session.find_all_by_closed_at(nil).each do |db_session|
if db_session.last_seen.nil? or ((Time.now.utc - db_session.last_seen) > (2*LAST_SEEN_INTERVAL)) if db_session.last_seen.nil? or ((Time.now.utc - db_session.last_seen) > (2*LAST_SEEN_INTERVAL))
db_session.closed_at = db_session.last_seen || Time.now.utc db_session.closed_at = db_session.last_seen || Time.now.utc
@ -96,16 +130,45 @@ class SessionManager < Hash
db_session.save db_session.save
end end
end end
end end
#
# All session management falls apart when any exception is raised to this point. Log it.
#
rescue ::Exception => e rescue ::Exception => e
wlog("Exception in reaper thread #{e.class} #{e}") wlog("Exception in monitor thread #{e.class} #{e}")
wlog("Call Stack\n#{e.backtrace.join("\n")}", 'core', LEV_3) wlog("Call Stack\n#{e.backtrace.join("\n")}", 'core', LEV_3)
end end
end end
end end
#
# Dedicated worker threads for pulling data out of new sessions
#
def initialize_scheduler_threads
self.scheduler_threads = []
1.upto(SCHEDULER_THREAD_COUNT) do |i|
self.scheduler_threads << framework.threads.spawn("SessionScheduler-#{i}", true) do
while true
item = self.scheduler_queue.pop
begin
item.call()
rescue ::Exception => e
wlog("Exception in scheduler thread #{e.class} #{e}")
wlog("Call Stack\n#{e.backtrace.join("\n")}", 'core', LEV_3)
end
end
end
end
end
#
# Add a new task to the loader thread queue. Task is assumed to be
# a Proc or another object that responds to call()
#
def schedule(task)
self.scheduler_queue.push(task)
end
# #
# Enumerates the sorted list of keys. # Enumerates the sorted list of keys.
@ -138,7 +201,7 @@ class SessionManager < Hash
return nil return nil
end end
next_sid = (self.sid_pool += 1) next_sid = allocate_sid
# Initialize the session's sid and framework instance pointer # Initialize the session's sid and framework instance pointer
session.sid = next_sid session.sid = next_sid
@ -152,7 +215,6 @@ class SessionManager < Hash
# Notify the framework that we have a new session opening up... # Notify the framework that we have a new session opening up...
# Don't let errant event handlers kill our session # Don't let errant event handlers kill our session
begin begin
framework.events.on_session_open(session) framework.events.on_session_open(session)
rescue ::Exception => e rescue ::Exception => e
wlog("Exception in on_session_open event handler: #{e.class}: #{e}") wlog("Exception in on_session_open event handler: #{e.class}: #{e}")
@ -203,11 +265,23 @@ class SessionManager < Hash
def get(sid) def get(sid)
return self[sid.to_i] return self[sid.to_i]
end end
#
# Allocates the next Session ID
#
def allocate_sid
self.mutex.synchronize do
self.sid_pool += 1
end
end
protected protected
attr_accessor :sid_pool, :sessions # :nodoc: attr_accessor :sid_pool, :sessions # :nodoc:
attr_accessor :reaper_thread # :nodoc: attr_accessor :monitor_thread # :nodoc:
attr_accessor :scheduler_threads # :nodoc:
attr_accessor :scheduler_queue # :nodoc:
attr_accessor :mutex # :nodoc:
end end