Land #9356. Remove ring buffers from command dispatcher.

4.x
asoto-r7 2018-07-06 13:12:13 -05:00 committed by Metasploit
parent 5d95172a81
commit e4aa20ac47
No known key found for this signature in database
GPG Key ID: CDFB5FA52007B954
8 changed files with 41 additions and 254 deletions

View File

@ -53,9 +53,10 @@ class CommandShell
self.platform ||= ""
self.arch ||= ""
self.max_threads = 1
@cleanup = false
datastore = opts[:datastore]
if datastore && !datastore["CommandShellCleanupCommand"].blank?
@cleanup_command = opts[:datastore]["CommandShellCleanupCommand"]
@cleanup_command = datastore["CommandShellCleanupCommand"]
end
super
end
@ -232,8 +233,6 @@ class CommandShell
# Read from the command shell.
#
def shell_read(length=-1, timeout=1)
return shell_read_ring(length,timeout) if self.respond_to?(:ring)
begin
rv = rstream.get_once(length, timeout)
framework.events.on_session_output(self, rv) if rv
@ -245,50 +244,6 @@ class CommandShell
end
end
#
# Read from the command shell.
#
def shell_read_ring(length=-1, timeout=1)
self.ring_buff ||= ""
# Short-circuit bad length values
return "" if length == 0
# Return data from the stored buffer if available
if self.ring_buff.length >= length and length > 0
buff = self.ring_buff.slice!(0,length)
return buff
end
buff = self.ring_buff
self.ring_buff = ""
begin
::Timeout.timeout(timeout) do
while( (length > 0 and buff.length < length) or (length == -1 and buff.length == 0))
ring.select
nseq,data = ring.read_data(self.ring_seq)
if data
self.ring_seq = nseq
buff << data
end
end
end
rescue ::Timeout::Error
rescue ::Rex::SocketError, ::EOFError, ::IOError, ::Errno::EPIPE => e
shell_close
raise e
end
# Store any leftovers in the ring buffer backlog
if length > 0 and buff.length > length
self.ring_buff = buff[length, buff.length - length]
buff = buff[0,length]
end
buff
end
##
# :category: Msf::Session::Provider::SingleCommandShell implementors
#
@ -323,11 +278,20 @@ class CommandShell
# Closes the shell.
#
def cleanup
return if @cleanup
@cleanup = true
if rstream
if !@cleanup_command.blank?
shell_command_token(@cleanup_command, 0)
# this is a best effort, since the session is possibly already dead
shell_command_token(@cleanup_command) rescue nil
# we should only ever cleanup once
@cleanup_command = nil
end
rstream.close
# this is also a best-effort
rstream.close rescue nil
rstream = nil
end
super
@ -364,10 +328,6 @@ class CommandShell
end
end
def reset_ring_sequence
self.ring_seq = 0
end
attr_accessor :arch
attr_accessor :platform
attr_accessor :max_threads
@ -381,11 +341,7 @@ protected
# shell_write instead of operating on rstream directly.
def _interact
framework.events.on_session_interact(self)
if self.respond_to?(:ring)
_interact_ring
else
_interact_stream
end
_interact_stream
end
##
@ -406,48 +362,6 @@ protected
Thread.pass
end
end
def _interact_ring
begin
rdr = framework.threads.spawn("RingMonitor", false) do
seq = nil
while self.interacting
# Look for any pending data from the remote ring
nseq,data = ring.read_data(seq)
# Update the sequence number if necessary
seq = nseq || seq
# Write output to the local stream if successful
user_output.print(data) if data
begin
# Wait for new data to arrive on this session
ring.wait(seq)
rescue EOFError => e
break
end
end
end
while self.interacting
# Look for any pending input or errors from the local stream
sd = Rex::ThreadSafe.select([ _local_fd ], nil, [_local_fd], 5.0)
# Write input to the ring's input mechanism
run_cmd(user_input.gets) if sd
end
ensure
rdr.kill
end
end
attr_accessor :ring_seq # This tracks the last seen ring buffer sequence (for shell_read)
attr_accessor :ring_buff # This tracks left over read data to maintain a compatible API
end
class CommandShellWindows < CommandShell

View File

@ -51,11 +51,6 @@ class MainframeShell < Msf::Sessions::CommandShell
# override shell_read to include decode of cp1047
#
def shell_read(length=-1, timeout=1)
#mfimpl
if self.respond_to?(:ring)
return Rex::Text.from_ibm1047(shell_read_ring(length,timeout))
end
begin
rv = Rex::Text.from_ibm1047(rstream.get_once(length, timeout))
framework.events.on_session_output(self, rv) if rv
@ -104,46 +99,5 @@ class MainframeShell < Msf::Sessions::CommandShell
protected
##
#
# _interact_ring overridden to include decoding of cp1047 data
#
def _interact_ring
begin
rdr = framework.threads.spawn("RingMonitor", false) do
seq = nil
while self.interacting
# Look for any pending data from the remote ring
nseq,data = ring.read_data(seq)
# Update the sequence number if necessary
seq = nseq || seq
# Write output to the local stream if successful
user_output.print(Rex::Text.from_ibm1047(data)) if data
begin
# Wait for new data to arrive on this session
ring.wait(seq)
rescue EOFError => e
print_error("EOFError: #{e.class}: #{e}")
break
end
end
end
while self.interacting
# Look for any pending input or errors from the local stream
sd = Rex::ThreadSafe.select([ _local_fd ], nil, [_local_fd], 5.0)
# Write input to the ring's input mechanism
shell_write(user_input.gets) if sd
end
ensure
rdr.kill
end
end
end
end

View File

@ -76,12 +76,6 @@ class RPC_Session < RPC_Base
# Reads the output of a shell session (such as a command output).
#
# @note Shell read is now a positon-aware reader of the shell's associated
# ring buffer. For more direct control of the pointer into a ring
# buffer, a client can instead use ring_read, and note the returned
# sequence number on their own (making multiple views into the same
# session possible, regardless of position in the stream)
# @see #rpc_ring_read
# @param [Integer] sid Session ID.
# @param [Integer] ptr Pointer.
# @raise [Msf::RPC::Exception] An error that could be one of these:
@ -94,17 +88,13 @@ class RPC_Session < RPC_Base
# @example Here's how you would use this from the client:
# rpc.call('session.shell_read', 2)
def rpc_shell_read( sid, ptr=nil)
_valid_session(sid,"shell")
# @session_sequence tracks the pointer into the ring buffer
# data of sessions (by sid) in order to emulate the old behavior
# of shell_read
@session_sequence ||= {}
@session_sequence[sid] ||= 0
ring_buffer = rpc_ring_read(sid,(ptr || @session_sequence[sid]))
if not (ring_buffer["seq"].nil? || ring_buffer["seq"].empty?)
@session_sequence[sid] = ring_buffer["seq"].to_i
s = _valid_session(sid,"shell")
begin
res = s.shell_read(data)
{ "write_count" => res.to_s}
rescue ::Exception => e
error(500, "Session Disconnected: #{e.class} #{e}")
end
return ring_buffer
end
@ -112,8 +102,6 @@ class RPC_Session < RPC_Base
# enf of your input so the system will process it.
# You may want to use #rpc_shell_read to retrieve the output.
#
# @note shell_write is a wrapper of #rpc_ring_put.
# @see #rpc_ring_put
# @raise [Msf::RPC::Exception] An error that could be one of these:
# * 500 Session ID is unknown.
# * 500 Invalid session type.
@ -125,8 +113,13 @@ class RPC_Session < RPC_Base
# @example Here's how you would use this from the client:
# rpc.call('session.shell_write', 2, "DATA")
def rpc_shell_write( sid, data)
_valid_session(sid,"shell")
rpc_ring_put(sid,data)
s = _valid_session(sid,"shell")
begin
res = s.shell_write(data)
{ "write_count" => res.to_s}
rescue ::Exception => e
error(500, "Session Disconnected: #{e.class} #{e}")
end
end
@ -177,7 +170,7 @@ class RPC_Session < RPC_Base
# Reads from a session (such as a command output).
#
# @param [Integer] sid Session ID.
# @param [Integer] ptr Pointer.
# @param [Integer] ptr Pointer (ignored)
# @raise [Msf::RPC::Exception] An error that could be one of these:
# * 500 Session ID is unknown.
# * 500 Invalid session type.
@ -187,11 +180,11 @@ class RPC_Session < RPC_Base
# * 'data' [String] Read data.
# @example Here's how you would use this from the client:
# rpc.call('session.ring_read', 2)
def rpc_ring_read( sid, ptr=nil)
def rpc_ring_read(sid, ptr = nil)
s = _valid_session(sid,"ring")
begin
res = s.ring.read_data(ptr)
{ "seq" => res[0].to_s, "data" => res[1].to_s }
res = s.shell_read()
{ "seq" => 0, "data" => res.to_s }
rescue ::Exception => e
error(500, "Session Disconnected: #{e.class} #{e}")
end
@ -210,7 +203,7 @@ class RPC_Session < RPC_Base
# * 'write_count' [String] Number of bytes written.
# @example Here's how you would use this from the client:
# rpc.call('session.ring_put', 2, "DATA")
def rpc_ring_put( sid, data)
def rpc_ring_put(sid, data)
s = _valid_session(sid,"ring")
begin
res = s.shell_write(data)
@ -230,9 +223,9 @@ class RPC_Session < RPC_Base
# * 'seq' [String] Sequence.
# @example Here's how you would use this from the client:
# rpc.call('session.ring_last', 2)
def rpc_ring_last( sid)
def rpc_ring_last(sid)
s = _valid_session(sid,"ring")
{ "seq" => s.ring.last_sequence.to_s }
{ "seq" => 0 }
end
@ -246,14 +239,8 @@ class RPC_Session < RPC_Base
# * 'result' [String] Either 'success' or 'failure'.
# @example Here's how you would use this from the client:
# rpc.call('session.ring_clear', 2)
def rpc_ring_clear( sid)
s = _valid_session(sid,"ring")
res = s.ring.clear_data
if res.compact.empty?
{ "result" => "success"}
else # Doesn't seem like this can fail. Maybe a race?
{ "result" => "failure"}
end
def rpc_ring_clear(sid)
{ "result" => "success" }
end

View File

@ -35,11 +35,7 @@ protected
#
def _interact
framework.events.on_session_interact(self)
if self.respond_to?(:ring)
interact_ring(ring)
else
interact_stream(rstream)
end
interact_stream(rstream)
end
end

View File

@ -1,6 +1,5 @@
# -*- coding: binary -*-
require 'rex/ui'
require 'rex/io/ring_buffer'
module Msf
module Session
@ -26,8 +25,6 @@ module Interactive
# A nil is passed in the case of non-stream interactive sessions (Meterpreter)
if rstream
self.rstream = rstream
klass = opts[:udp_session] ? Rex::IO::RingBufferUdp : Rex::IO::RingBuffer
self.ring = klass.new(rstream, {:size => opts[:ring_size] || 100 })
end
super()
end
@ -98,11 +95,6 @@ module Interactive
#
attr_accessor :rstream
#
# The RingBuffer object used to allow concurrent access to this session
#
attr_accessor :ring
protected
#

View File

@ -45,7 +45,7 @@ module SingleCommandShell
def shell_read_until_token(token, wanted_idx=0, timeout=10)
return if timeout.to_i == 0
if (wanted_idx == 0)
if wanted_idx == 0
parts_needed = 2
else
parts_needed = 1 + (wanted_idx * 2)
@ -57,12 +57,13 @@ module SingleCommandShell
buf = ''
idx = nil
loop do
if (tmp = shell_read(-1, 2))
if (tmp = shell_read(-1))
buf << tmp
# see if we have the wanted idx
parts = buf.split(token, -1)
if (parts.length == parts_needed)
if parts.length == parts_needed
# cause another prompt to appear (just in case)
shell_write("\n")
return parts[wanted_idx]

View File

@ -49,7 +49,6 @@ class Core
"-k" => [ true, "Terminate sessions by session ID and/or range" ],
"-K" => [ false, "Terminate all sessions" ],
"-s" => [ true, "Run a script or module on the session given with -i, or all" ],
"-r" => [ false, "Reset the ring buffer for the session given with -i, or all" ],
"-u" => [ true, "Upgrade a shell to a meterpreter session on many platforms" ],
"-t" => [ true, "Set a response timeout (default: 15)" ],
"-S" => [ true, "Row search filter." ],
@ -1145,7 +1144,6 @@ class Core
sid = nil
cmds = []
script = nil
reset_ring = false
response_timeout = 15
search_term = nil
session_name = nil
@ -1204,10 +1202,6 @@ class Core
# Search for specific session
when "-S", "--search"
search_term = val
# Reset the ring buffer read pointer
when "-r"
reset_ring = true
method = 'reset_ring'
# Display help banner
when "-h"
cmd_sessions_help
@ -1463,14 +1457,6 @@ class Core
sleep(5)
end
end
when 'reset_ring'
sessions = sid ? [sid] : framework.sessions.keys
sessions.each do |sidx|
s = framework.sessions[sidx]
next unless (s && s.respond_to?(:ring_seq))
s.reset_ring_sequence
print_status("Reset the ring buffer pointer for Session #{sidx}")
end
when 'list', 'list_inactive', nil
print_line
print(Serializer::ReadableText.dump_sessions(framework, show_active: show_active, show_inactive: show_inactive, show_extended: show_extended, verbose: verbose, search_term: search_term))

View File

@ -216,49 +216,6 @@ protected
end
#
# Interacts between a local stream and a remote ring buffer. This has to use
# a secondary thread to prevent the select on the local stream from blocking
#
def interact_ring(ring)
begin
rdr = Rex::ThreadFactory.spawn("RingMonitor", false) do
seq = nil
while self.interacting
# Look for any pending data from the remote ring
nseq,data = ring.read_data(seq)
# Update the sequence number if necessary
seq = nseq || seq
# Write output to the local stream if successful
user_output.print(data) if data
# Wait for new data to arrive on this session
ring.wait(seq)
end
end
while self.interacting
# Look for any pending input from the local stream
sd = Rex::ThreadSafe.select([ _local_fd ], nil, [_local_fd], 5.0)
# Write input to the ring's input mechanism
if sd
data = user_input.gets
ring.put(data)
end
end
ensure
rdr.kill
end
end
#
# Installs a signal handler to monitor suspend signal notifications.
#