Merge branch 'chore/MSP-11614/remove-msf-db-manager-sink' into feature/MSP-11605/lazy-thread-creation
MSP-11605 Conflicts: spec/lib/msf/core/task_manager_spec.rbbug/bundler_fix
commit
43511e648a
|
@ -17,7 +17,6 @@ require 'msf/core/database_event'
|
|||
require 'msf/core/db_import_error'
|
||||
require 'msf/core/host_state'
|
||||
require 'msf/core/service_state'
|
||||
require 'msf/core/task_manager'
|
||||
|
||||
# The db module provides persistent storage and events. This class should be instantiated LAST
|
||||
# as the active_suppport library overrides Kernel.require, slowing down all future code loads.
|
||||
|
@ -47,7 +46,6 @@ class Msf::DBManager
|
|||
autoload :Service, 'msf/core/db_manager/service'
|
||||
autoload :Session, 'msf/core/db_manager/session'
|
||||
autoload :SessionEvent, 'msf/core/db_manager/session_event'
|
||||
autoload :Sink, 'msf/core/db_manager/sink'
|
||||
autoload :Task, 'msf/core/db_manager/task'
|
||||
autoload :Vuln, 'msf/core/db_manager/vuln'
|
||||
autoload :VulnAttempt, 'msf/core/db_manager/vuln_attempt'
|
||||
|
@ -80,7 +78,6 @@ class Msf::DBManager
|
|||
include Msf::DBManager::Service
|
||||
include Msf::DBManager::Session
|
||||
include Msf::DBManager::SessionEvent
|
||||
include Msf::DBManager::Sink
|
||||
include Msf::DBManager::Task
|
||||
include Msf::DBManager::Vuln
|
||||
include Msf::DBManager::VulnAttempt
|
||||
|
@ -160,11 +157,6 @@ class Msf::DBManager
|
|||
#
|
||||
initialize_adapter
|
||||
|
||||
#
|
||||
# Instantiate the database sink
|
||||
#
|
||||
initialize_sink
|
||||
|
||||
true
|
||||
end
|
||||
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
module Msf::DBManager::Sink
|
||||
#
|
||||
# Attributes
|
||||
#
|
||||
|
||||
# Stores a TaskManager for serializing database events
|
||||
attr_accessor :sink
|
||||
|
||||
#
|
||||
# Instance Methods
|
||||
#
|
||||
|
||||
#
|
||||
# Create a new database sink and initialize it
|
||||
#
|
||||
def initialize_sink
|
||||
self.sink = Msf::TaskManager.new(framework)
|
||||
self.sink.start
|
||||
end
|
||||
|
||||
#
|
||||
# Add a new task to the sink
|
||||
#
|
||||
def queue(proc)
|
||||
self.sink.queue_proc(proc)
|
||||
end
|
||||
|
||||
#
|
||||
# Wait for all pending write to finish
|
||||
#
|
||||
def sync
|
||||
# There is no more queue.
|
||||
end
|
||||
end
|
|
@ -1,238 +0,0 @@
|
|||
# -*- coding: binary -*-
|
||||
module Msf
|
||||
|
||||
###
|
||||
#
|
||||
# This class provides a task manager
|
||||
#
|
||||
###
|
||||
|
||||
class TaskManager
|
||||
|
||||
class Task
|
||||
attr_accessor :timeout
|
||||
attr_accessor :created
|
||||
attr_accessor :completed
|
||||
attr_accessor :status
|
||||
attr_accessor :proc
|
||||
attr_accessor :source
|
||||
attr_accessor :exception
|
||||
|
||||
#
|
||||
# Create a new task
|
||||
#
|
||||
def initialize(proc,timeout=nil)
|
||||
self.proc = proc
|
||||
self.status = :new
|
||||
self.created = Time.now
|
||||
self.timeout = timeout
|
||||
self.source = caller
|
||||
end
|
||||
|
||||
#
|
||||
# Task duration in seconds (float)
|
||||
#
|
||||
def duration
|
||||
etime = self.completed || Time.now
|
||||
etime.to_f - self.created.to_f
|
||||
end
|
||||
|
||||
def wait
|
||||
while self.status == :new
|
||||
::IO.select(nil,nil,nil,0.10)
|
||||
end
|
||||
return self.status
|
||||
end
|
||||
|
||||
#
|
||||
# Run the associated proc
|
||||
#
|
||||
def run(*args)
|
||||
self.proc.call(*args) if self.proc
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
attr_accessor :processing
|
||||
attr_accessor :queue
|
||||
attr_accessor :thread
|
||||
attr_accessor :framework
|
||||
|
||||
#
|
||||
# Create a new TaskManager
|
||||
#
|
||||
def initialize(framework)
|
||||
self.framework = framework
|
||||
self.flush
|
||||
end
|
||||
|
||||
#
|
||||
# Add a new task via proc
|
||||
#
|
||||
def queue_proc(proc)
|
||||
task = Task.new(proc)
|
||||
queue_task(task)
|
||||
return task
|
||||
end
|
||||
|
||||
#
|
||||
# Add a new task to the queue unless we are called
|
||||
# by the queue thread itself.
|
||||
#
|
||||
def queue_task(task)
|
||||
if Thread.current[:task_manager]
|
||||
process_task(task)
|
||||
else
|
||||
self.queue.push(task)
|
||||
end
|
||||
end
|
||||
|
||||
#
|
||||
# Flush the queue
|
||||
#
|
||||
def flush
|
||||
self.queue = []
|
||||
end
|
||||
|
||||
#
|
||||
# Stop processing events
|
||||
#
|
||||
def stop
|
||||
return if not self.thread
|
||||
self.processing = false
|
||||
self.thread.join
|
||||
self.thread = nil
|
||||
end
|
||||
|
||||
#
|
||||
# Forcefully kill the processing thread
|
||||
#
|
||||
def kill
|
||||
return if not self.thread
|
||||
self.processing = false
|
||||
self.thread.kill
|
||||
self.thread = nil
|
||||
end
|
||||
|
||||
#
|
||||
# Start processing tasks
|
||||
#
|
||||
def start
|
||||
return if self.thread
|
||||
self.processing = true
|
||||
self.thread = framework.threads.spawn("TaskManager", true) do
|
||||
begin
|
||||
process_tasks
|
||||
rescue ::Exception => e
|
||||
elog("taskmanager: process_tasks exception: #{e.class} #{e} #{e.backtrace}")
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
# Mark this thread as the task manager
|
||||
self.thread[:task_manager] = true
|
||||
|
||||
# Return the thread object to the caller
|
||||
self.thread
|
||||
end
|
||||
|
||||
#
|
||||
# Restart the task processor
|
||||
#
|
||||
def restart
|
||||
stop
|
||||
start
|
||||
end
|
||||
|
||||
#
|
||||
# Retrieve the number of tasks in the queue
|
||||
#
|
||||
def backlog
|
||||
self.queue.length
|
||||
end
|
||||
|
||||
#
|
||||
# Process the actual queue
|
||||
#
|
||||
def process_tasks
|
||||
spin = 50
|
||||
ltask = nil
|
||||
|
||||
while(self.processing)
|
||||
cnt = 0
|
||||
while(task = self.queue.shift)
|
||||
stime = Time.now.to_f
|
||||
ret = process_task(task)
|
||||
etime = Time.now.to_f
|
||||
|
||||
case ret
|
||||
when :requeue
|
||||
self.queue.push(task)
|
||||
when :drop, :done
|
||||
# Processed or dropped
|
||||
end
|
||||
cnt += 1
|
||||
|
||||
ltask = task
|
||||
end
|
||||
|
||||
spin = (cnt == 0) ? (spin + 1) : 0
|
||||
|
||||
if spin > 10
|
||||
::IO.select(nil, nil, nil, 0.25)
|
||||
end
|
||||
|
||||
end
|
||||
self.thread = nil
|
||||
end
|
||||
|
||||
#
|
||||
# Process a specific task from the queue
|
||||
#
|
||||
def process_task(task)
|
||||
begin
|
||||
if(task.timeout)
|
||||
::Timeout.timeout(task.timeout) do
|
||||
task.run(self, task)
|
||||
end
|
||||
else
|
||||
task.run(self, task)
|
||||
end
|
||||
rescue ::ThreadError
|
||||
# Ignore these (caused by a return inside of the proc)
|
||||
rescue ::Exception => e
|
||||
|
||||
if(e.class == ::Timeout::Error)
|
||||
elog("taskmanager: task #{task.inspect} timed out after #{task.timeout} seconds")
|
||||
task.status = :timeout
|
||||
task.completed = Time.now
|
||||
return :drop
|
||||
end
|
||||
|
||||
elog("taskmanager: task triggered an exception: #{e.class} #{e}")
|
||||
elog("taskmanager: task proc: #{task.proc.inspect} ")
|
||||
elog("taskmanager: task Call stack: \n#{task.source.join("\n")} ")
|
||||
dlog("Call stack:\n#{$@.join("\n")}")
|
||||
|
||||
task.status = :dropped
|
||||
task.exception = e
|
||||
return :drop
|
||||
|
||||
end
|
||||
task.status = :done
|
||||
task.completed = Time.now
|
||||
return :done
|
||||
end
|
||||
|
||||
def log_error(msg)
|
||||
elog(msg, 'core')
|
||||
end
|
||||
|
||||
def log_debug(msg)
|
||||
dlog(msg, 'core')
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
@ -1,96 +0,0 @@
|
|||
# -*- coding:binary -*-
|
||||
|
||||
require 'msf/core'
|
||||
require 'msf/core/task_manager'
|
||||
|
||||
describe Msf::TaskManager do
|
||||
include_context 'Msf::Simple::Framework'
|
||||
|
||||
let(:tm) do
|
||||
Msf::TaskManager.new(framework)
|
||||
end
|
||||
|
||||
it "should have attributes" do
|
||||
tm.should respond_to("processing")
|
||||
tm.should respond_to("queue")
|
||||
tm.should respond_to("thread")
|
||||
tm.should respond_to("framework")
|
||||
tm.should respond_to("processing=")
|
||||
tm.should respond_to("queue=")
|
||||
tm.should respond_to("thread=")
|
||||
tm.should respond_to("framework=")
|
||||
end
|
||||
|
||||
it "should initialize with an empty queue" do
|
||||
tm.queue.length.should == 0
|
||||
tm.backlog.should == 0
|
||||
tm.backlog.should == tm.queue.length
|
||||
end
|
||||
|
||||
it "should add items to the queue and process them" do
|
||||
tm.queue_proc(Proc.new{ })
|
||||
tm.backlog.should == 1
|
||||
t = Msf::TaskManager::Task.new(Proc.new { })
|
||||
tm.queue_task(t)
|
||||
tm.backlog.should == 2
|
||||
tm.start
|
||||
t.wait
|
||||
tm.backlog.should == 0
|
||||
end
|
||||
|
||||
it "should add items to the queue and flush them" do
|
||||
tm.queue_proc(Proc.new{ })
|
||||
tm.backlog.should == 1
|
||||
tm.queue_proc(Proc.new{ })
|
||||
tm.backlog.should == 2
|
||||
tm.flush
|
||||
tm.backlog.should == 0
|
||||
end
|
||||
|
||||
it "should start and stop" do
|
||||
t = Msf::TaskManager::Task.new(Proc.new { })
|
||||
tm.queue_task(t)
|
||||
tm.backlog.should == 1
|
||||
tm.start
|
||||
t.wait
|
||||
tm.backlog.should == 0
|
||||
tm.stop
|
||||
1.upto 100 do |cnt|
|
||||
tm.queue_proc(Proc.new{ })
|
||||
tm.backlog.should == cnt
|
||||
end
|
||||
t = Msf::TaskManager::Task.new(Proc.new { })
|
||||
tm.queue_task(t)
|
||||
tm.start
|
||||
t.wait
|
||||
tm.backlog.should == 0
|
||||
end
|
||||
|
||||
it "should handle task timeouts" do
|
||||
t = Msf::TaskManager::Task.new(Proc.new { sleep(30) })
|
||||
t.timeout = 0.1
|
||||
|
||||
tm.start
|
||||
tm.queue_task(t)
|
||||
t.wait
|
||||
|
||||
t.status.should == :timeout
|
||||
t.duration.should <= 5.0
|
||||
end
|
||||
|
||||
it "should handle task exceptions" do
|
||||
t = Msf::TaskManager::Task.new(Proc.new { asdf1234() })
|
||||
tm.start
|
||||
tm.queue_task(t)
|
||||
t.wait
|
||||
t.status.should == :dropped
|
||||
t.exception.class.should == ::NoMethodError
|
||||
|
||||
t = Msf::TaskManager::Task.new(Proc.new { eval "'" })
|
||||
tm.queue_task(t)
|
||||
t.wait
|
||||
t.status.should == :dropped
|
||||
t.exception.should be_a ::SyntaxError
|
||||
end
|
||||
end
|
||||
|
|
@ -40,7 +40,6 @@ describe Msf::DBManager do
|
|||
it_should_behave_like 'Msf::DBManager::Service'
|
||||
it_should_behave_like 'Msf::DBManager::Session'
|
||||
it_should_behave_like 'Msf::DBManager::SessionEvent'
|
||||
it_should_behave_like 'Msf::DBManager::Sink'
|
||||
it_should_behave_like 'Msf::DBManager::Task'
|
||||
it_should_behave_like 'Msf::DBManager::Vuln'
|
||||
it_should_behave_like 'Msf::DBManager::VulnAttempt'
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
shared_examples_for 'Msf::DBManager::Sink' do
|
||||
it { is_expected.to respond_to :initialize_sink }
|
||||
it { is_expected.to respond_to :queue }
|
||||
it { is_expected.to respond_to :sink }
|
||||
it { is_expected.to respond_to :sink= }
|
||||
it { is_expected.to respond_to :sync }
|
||||
end
|
Loading…
Reference in New Issue