Merge branch 'landing/4205' into upstream-master

Land #4205

* Remove unused Msf::DBManager::Sink
* Part of improvements to thread management
bug/bundler_fix
Trevor Rosen 2014-11-18 08:59:32 -06:00
commit 267f93ff81
No known key found for this signature in database
GPG Key ID: 255ADB7A642D3928
6 changed files with 0 additions and 384 deletions

View File

@ -17,7 +17,6 @@ require 'msf/core/database_event'
require 'msf/core/db_import_error'
require 'msf/core/host_state'
require 'msf/core/service_state'
require 'msf/core/task_manager'
# The db module provides persistent storage and events. This class should be instantiated LAST
# as the active_suppport library overrides Kernel.require, slowing down all future code loads.
@ -47,7 +46,6 @@ class Msf::DBManager
autoload :Service, 'msf/core/db_manager/service'
autoload :Session, 'msf/core/db_manager/session'
autoload :SessionEvent, 'msf/core/db_manager/session_event'
autoload :Sink, 'msf/core/db_manager/sink'
autoload :Task, 'msf/core/db_manager/task'
autoload :Vuln, 'msf/core/db_manager/vuln'
autoload :VulnAttempt, 'msf/core/db_manager/vuln_attempt'
@ -80,7 +78,6 @@ class Msf::DBManager
include Msf::DBManager::Service
include Msf::DBManager::Session
include Msf::DBManager::SessionEvent
include Msf::DBManager::Sink
include Msf::DBManager::Task
include Msf::DBManager::Vuln
include Msf::DBManager::VulnAttempt
@ -160,11 +157,6 @@ class Msf::DBManager
#
initialize_adapter
#
# Instantiate the database sink
#
initialize_sink
true
end

View File

@ -1,34 +0,0 @@
module Msf::DBManager::Sink
#
# Attributes
#
# Stores a TaskManager for serializing database events
attr_accessor :sink
#
# Instance Methods
#
#
# Create a new database sink and initialize it
#
def initialize_sink
self.sink = Msf::TaskManager.new(framework)
self.sink.start
end
#
# Add a new task to the sink
#
def queue(proc)
self.sink.queue_proc(proc)
end
#
# Wait for all pending write to finish
#
def sync
# There is no more queue.
end
end

View File

@ -1,238 +0,0 @@
# -*- coding: binary -*-
module Msf
###
#
# This class provides a task manager
#
###
class TaskManager
class Task
attr_accessor :timeout
attr_accessor :created
attr_accessor :completed
attr_accessor :status
attr_accessor :proc
attr_accessor :source
attr_accessor :exception
#
# Create a new task
#
def initialize(proc,timeout=nil)
self.proc = proc
self.status = :new
self.created = Time.now
self.timeout = timeout
self.source = caller
end
#
# Task duration in seconds (float)
#
def duration
etime = self.completed || Time.now
etime.to_f - self.created.to_f
end
def wait
while self.status == :new
::IO.select(nil,nil,nil,0.10)
end
return self.status
end
#
# Run the associated proc
#
def run(*args)
self.proc.call(*args) if self.proc
end
end
attr_accessor :processing
attr_accessor :queue
attr_accessor :thread
attr_accessor :framework
#
# Create a new TaskManager
#
def initialize(framework)
self.framework = framework
self.flush
end
#
# Add a new task via proc
#
def queue_proc(proc)
task = Task.new(proc)
queue_task(task)
return task
end
#
# Add a new task to the queue unless we are called
# by the queue thread itself.
#
def queue_task(task)
if Thread.current[:task_manager]
process_task(task)
else
self.queue.push(task)
end
end
#
# Flush the queue
#
def flush
self.queue = []
end
#
# Stop processing events
#
def stop
return if not self.thread
self.processing = false
self.thread.join
self.thread = nil
end
#
# Forcefully kill the processing thread
#
def kill
return if not self.thread
self.processing = false
self.thread.kill
self.thread = nil
end
#
# Start processing tasks
#
def start
return if self.thread
self.processing = true
self.thread = framework.threads.spawn("TaskManager", true) do
begin
process_tasks
rescue ::Exception => e
elog("taskmanager: process_tasks exception: #{e.class} #{e} #{e.backtrace}")
retry
end
end
# Mark this thread as the task manager
self.thread[:task_manager] = true
# Return the thread object to the caller
self.thread
end
#
# Restart the task processor
#
def restart
stop
start
end
#
# Retrieve the number of tasks in the queue
#
def backlog
self.queue.length
end
#
# Process the actual queue
#
def process_tasks
spin = 50
ltask = nil
while(self.processing)
cnt = 0
while(task = self.queue.shift)
stime = Time.now.to_f
ret = process_task(task)
etime = Time.now.to_f
case ret
when :requeue
self.queue.push(task)
when :drop, :done
# Processed or dropped
end
cnt += 1
ltask = task
end
spin = (cnt == 0) ? (spin + 1) : 0
if spin > 10
::IO.select(nil, nil, nil, 0.25)
end
end
self.thread = nil
end
#
# Process a specific task from the queue
#
def process_task(task)
begin
if(task.timeout)
::Timeout.timeout(task.timeout) do
task.run(self, task)
end
else
task.run(self, task)
end
rescue ::ThreadError
# Ignore these (caused by a return inside of the proc)
rescue ::Exception => e
if(e.class == ::Timeout::Error)
elog("taskmanager: task #{task.inspect} timed out after #{task.timeout} seconds")
task.status = :timeout
task.completed = Time.now
return :drop
end
elog("taskmanager: task triggered an exception: #{e.class} #{e}")
elog("taskmanager: task proc: #{task.proc.inspect} ")
elog("taskmanager: task Call stack: \n#{task.source.join("\n")} ")
dlog("Call stack:\n#{$@.join("\n")}")
task.status = :dropped
task.exception = e
return :drop
end
task.status = :done
task.completed = Time.now
return :done
end
def log_error(msg)
elog(msg, 'core')
end
def log_debug(msg)
dlog(msg, 'core')
end
end
end

View File

@ -1,96 +0,0 @@
# -*- coding:binary -*-
require 'msf/core'
require 'msf/core/task_manager'
describe Msf::TaskManager do
include_context 'Msf::Simple::Framework'
let(:tm) do
Msf::TaskManager.new(framework)
end
it "should have attributes" do
tm.should respond_to("processing")
tm.should respond_to("queue")
tm.should respond_to("thread")
tm.should respond_to("framework")
tm.should respond_to("processing=")
tm.should respond_to("queue=")
tm.should respond_to("thread=")
tm.should respond_to("framework=")
end
it "should initialize with an empty queue" do
tm.queue.length.should == 0
tm.backlog.should == 0
tm.backlog.should == tm.queue.length
end
it "should add items to the queue and process them" do
tm.queue_proc(Proc.new{ })
tm.backlog.should == 1
t = Msf::TaskManager::Task.new(Proc.new { })
tm.queue_task(t)
tm.backlog.should == 2
tm.start
t.wait
tm.backlog.should == 0
end
it "should add items to the queue and flush them" do
tm.queue_proc(Proc.new{ })
tm.backlog.should == 1
tm.queue_proc(Proc.new{ })
tm.backlog.should == 2
tm.flush
tm.backlog.should == 0
end
it "should start and stop" do
t = Msf::TaskManager::Task.new(Proc.new { })
tm.queue_task(t)
tm.backlog.should == 1
tm.start
t.wait
tm.backlog.should == 0
tm.stop
1.upto 100 do |cnt|
tm.queue_proc(Proc.new{ })
tm.backlog.should == cnt
end
t = Msf::TaskManager::Task.new(Proc.new { })
tm.queue_task(t)
tm.start
t.wait
tm.backlog.should == 0
end
it "should handle task timeouts" do
t = Msf::TaskManager::Task.new(Proc.new { sleep(30) })
t.timeout = 0.1
tm.start
tm.queue_task(t)
t.wait
t.status.should == :timeout
t.duration.should <= 5.0
end
it "should handle task exceptions" do
t = Msf::TaskManager::Task.new(Proc.new { asdf1234() })
tm.start
tm.queue_task(t)
t.wait
t.status.should == :dropped
t.exception.class.should == ::NoMethodError
t = Msf::TaskManager::Task.new(Proc.new { eval "'" })
tm.queue_task(t)
t.wait
t.status.should == :dropped
t.exception.should be_a ::SyntaxError
end
end

View File

@ -40,7 +40,6 @@ describe Msf::DBManager do
it_should_behave_like 'Msf::DBManager::Service'
it_should_behave_like 'Msf::DBManager::Session'
it_should_behave_like 'Msf::DBManager::SessionEvent'
it_should_behave_like 'Msf::DBManager::Sink'
it_should_behave_like 'Msf::DBManager::Task'
it_should_behave_like 'Msf::DBManager::Vuln'
it_should_behave_like 'Msf::DBManager::VulnAttempt'

View File

@ -1,7 +0,0 @@
shared_examples_for 'Msf::DBManager::Sink' do
it { is_expected.to respond_to :initialize_sink }
it { is_expected.to respond_to :queue }
it { is_expected.to respond_to :sink }
it { is_expected.to respond_to :sink= }
it { is_expected.to respond_to :sync }
end