metasploit-framework/lib/snmp/varbind.rb

580 lines
12 KiB
Ruby
Raw Normal View History

#
# Copyright (c) 2004 David R. Halliday
# All rights reserved.
#
# This SNMP library is free software. Redistribution is permitted under the
# same terms and conditions as the standard Ruby distribution. See the
# COPYING file in the Ruby distribution for details.
#
require 'snmp/ber'
include SNMP::BER
module SNMP
##
# Raised when a decoded value carries a tag for which no decoder class is
# registered (see VarBind.decode_value).
#
class UnsupportedValueTag < RuntimeError
end
##
# Raised by IpAddress when its input is neither four raw octets nor a
# dotted string with four in-range parts.
#
class InvalidIpAddress < ArgumentError
end
##
# An Array of VarBind objects that knows how to encode itself as, and
# decode itself from, a BER sequence.
#
class VarBindList < Array
  ##
  # Decodes a sequence of varbinds from +data+. Returns the populated list
  # together with whatever data followed the sequence.
  #
  def self.decode(data)
    decoded = VarBindList.new
    pending, remainder = decode_sequence(data)
    # Each VarBind.decode call consumes one varbind and hands back the rest.
    until pending == ""
      varbind, pending = VarBind.decode(pending)
      decoded << varbind
    end
    return decoded, remainder
  end

  ##
  # Builds the list from one of:
  # * a string-like object, treated as a dotted object id;
  # * a single object responding to +to_varbind+;
  # * an enumerable whose items are either of the above.
  #
  def initialize(varbind_list=[])
    super()
    if varbind_list.respond_to?(:to_str)
      self << ObjectId.new(varbind_list.to_str).to_varbind
    elsif varbind_list.respond_to?(:to_varbind)
      self << varbind_list.to_varbind
    else
      varbind_list.each do |entry|
        # String-like entries are promoted to object ids first.
        source = entry.respond_to?(:to_str) ? ObjectId.new(entry.to_str) : entry
        self << source.to_varbind
      end
    end
  end

  def asn1_type
    "VarBindList"
  end

  ##
  # Concatenates the encoding of every varbind and wraps the result in a
  # sequence.
  #
  def encode
    payload = inject("") { |buffer, varbind| buffer << varbind.encode }
    encode_sequence(payload)
  end
end
##
# SNMP INTEGER value. Wraps a native integer (obtained with +to_i+) and
# compares against anything that responds to +to_i+.
#
# Inside the SNMP module this class shadows Ruby's core Integer, so the core
# class must be written as ::Integer here.
#
class Integer
  include Comparable

  ##
  # Builds an Integer from the value octets of a BER-encoded integer.
  #
  def self.decode(value_data)
    Integer.new(decode_integer_value(value_data))
  end

  def asn1_type
    "INTEGER"
  end

  ##
  # +value+ may be anything that responds to +to_i+.
  #
  def initialize(value)
    @value = value.to_i
  end

  def <=>(other)
    @value <=> other.to_i
  end

  ##
  # Supports Ruby's numeric coercion protocol so that native numbers can be
  # combined with this object. Native integers are paired with the wrapped
  # integer; everything else falls back to floats.
  #
  def coerce(other)
    # ::Integer rather than Fixnum: Fixnum was removed in Ruby 3.2, and a
    # bare Integer would resolve to this SNMP class.
    if other.kind_of? ::Integer
      [other, @value]
    else
      [other.to_f, to_f]
    end
  end

  def to_s
    @value.to_s
  end

  def to_i
    @value
  end

  def to_f
    @value.to_f
  end

  def encode
    encode_integer(@value)
  end

  ##
  # Returns a single-element ObjectId holding this value.
  # Raises RangeError when the value is negative.
  #
  def to_oid
    raise RangeError, "#{@value} cannot be an OID (must be >= 0)" if @value < 0
    ObjectId.new([@value])
  end
end
##
# Integer restricted to the signed 32-bit range
# (-2147483648 through 2147483647).
#
class Integer32 < Integer
  ##
  # Raises ArgumentError when +value+ falls outside the signed 32-bit range.
  #
  def initialize(value)
    super
    if value < -2_147_483_648 || value > 2_147_483_647
      raise ArgumentError, "Out of range: #{value}"
    end
  end
end
##
# SNMP OCTET STRING value; a String that can encode itself and convert its
# bytes into an object id.
#
class OctetString < String
  ##
  # Wraps the raw value octets in an OctetString.
  #
  def self.decode(value_data)
    OctetString.new(value_data)
  end

  def asn1_type
    'OCTET STRING'
  end

  def encode
    encode_octet_string(self)
  end

  ##
  # Returns an ObjectId with one sub-identifier per byte of this string.
  #
  def to_oid
    each_byte.each_with_object(ObjectId.new) { |byte, oid| oid << byte }
  end
end
##
# SNMP OBJECT IDENTIFIER; an Array of integer sub-identifiers.
#
class ObjectId < Array
  include Comparable

  ##
  # Builds an ObjectId from the value octets of a BER-encoded object id.
  #
  def self.decode(value_data)
    ObjectId.new(decode_object_id_value(value_data))
  end

  def asn1_type
    "OBJECT IDENTIFIER"
  end

  ##
  # Create an object id.  The input is expected to be either a string
  # in the format "n.n.n.n.n.n" or an array of integers.
  #
  # String components are always read as decimal numbers, so "010" means
  # ten and "08" is accepted.
  #
  # Raises ArgumentError when the input is nil or a component is not a
  # valid number.
  #
  def initialize(id=[])
    raise ArgumentError if id.nil?
    parts = id.respond_to?(:to_str) ? id.to_str.split(".") : id.to_ary
    super(make_integers(parts))
  rescue ArgumentError
    # Re-raise with a message that shows the offending input.
    raise ArgumentError, "#{id.inspect}:#{id.class} not a valid object ID"
  end

  ##
  # Returns a VarBind named by this object id with a Null value.
  #
  def to_varbind
    VarBind.new(self, Null)
  end

  def to_oid
    self
  end

  ##
  # Returns the dotted string form, e.g. "1.3.6.1".
  #
  def to_s
    join('.')
  end

  def inspect
    "[#{self.to_s}]"
  end

  def encode
    encode_object_id(self)
  end

  ##
  # Returns true if this ObjectId is a subtree of the provided parent tree
  # ObjectId.  For example, "1.3.6.1.5" is a subtree of "1.3.6.1".
  # An object id is also considered a subtree of itself.
  #
  def subtree_of?(parent_tree)
    parent_tree = make_object_id(parent_tree)
    return false if parent_tree.length > length
    parent_tree.each_with_index.all? { |part, i| part == self[i] }
  end

  ##
  # Returns an index based on the difference between this ObjectId
  # and the provided parent ObjectId.
  #
  # For example, ObjectId.new("1.3.6.1.5").index("1.3.6.1") returns an
  # ObjectId of "5".
  #
  # Raises ArgumentError when this object id is not below +parent_tree+ or
  # is identical to it.
  #
  def index(parent_tree)
    parent_tree = make_object_id(parent_tree)
    unless subtree_of?(parent_tree)
      raise ArgumentError, "#{self.to_s} not a subtree of #{parent_tree.to_s}"
    end
    raise ArgumentError, "OIDs are the same" if length == parent_tree.length
    ObjectId.new(self[parent_tree.length..-1])
  end

  private

  ##
  # Converts every element of +list+ to an integer.
  #
  def make_integers(list)
    list.map do |n|
      # Base 10 is forced for strings: without it Kernel#Integer honours
      # radix prefixes, reading "010" as octal 8 and rejecting "08".
      # A base may only be given for string arguments.
      n.respond_to?(:to_str) ? Integer(n.to_str, 10) : Integer(n)
    end
  end

  def make_object_id(oid)
    oid.kind_of?(ObjectId) ? oid : ObjectId.new(oid)
  end
end
##
# SNMP IpAddress value, stored internally as a four-octet binary string.
#
class IpAddress
  class << self
    ##
    # Builds an IpAddress from the raw value octets.
    #
    def decode(value_data)
      IpAddress.new(value_data)
    end
  end

  def asn1_type
    "IpAddress"
  end

  ##
  # Create an IpAddress object.  The constructor accepts either a raw
  # four-octet string or a formatted string of integers separated by dots
  # (i.e. "10.1.2.3").
  #
  # Raises InvalidIpAddress for anything else.
  #
  def initialize(value_data)
    ip = value_data.to_str
    # Count bytes, not characters: four raw octets can form fewer than four
    # characters when the string is tagged with a multi-byte encoding.
    if ip.bytesize > 4
      ip = parse_string(ip)
    elsif ip.bytesize != 4
      raise InvalidIpAddress, "Expected 4 octets or formatted string, got #{value_data.inspect}"
    end
    # Keep a private binary copy so later changes to the caller's string do
    # not leak in, and so equality and hashing do not depend on the
    # encoding the octets arrived in.
    @value = ip.dup.force_encoding(Encoding::BINARY)
  end

  ##
  # Returns a raw four-octet string representing this IpAddress.
  #
  def to_str
    @value.dup
  end

  ##
  # Returns a formatted, dot-separated string representing this IpAddress.
  #
  def to_s
    octets = []
    @value.each_byte { |b| octets << b.to_s }
    octets.join('.')
  end

  ##
  # Returns an ObjectId with one sub-identifier per octet.
  #
  def to_oid
    oid = ObjectId.new
    @value.each_byte { |b| oid << b }
    oid
  end

  ##
  # True when +other+ converts (via +to_str+) to the same four octets.
  # The comparison is byte-wise, whatever encoding +other+ is tagged with.
  #
  def ==(other)
    return false unless other.respond_to?(:to_str)
    @value == other.to_str.dup.force_encoding(Encoding::BINARY)
  end

  def eql?(other)
    self == other
  end

  def hash
    @value.hash
  end

  def encode
    encode_tlv(IpAddress_TAG, @value)
  end

  private

  ##
  # Converts a dotted string into four raw octets. Each part is read with
  # String#to_i, so a non-numeric part counts as 0.
  # Raises InvalidIpAddress unless there are exactly four parts, each in
  # 0..255.
  #
  def parse_string(ip_string)
    parts = ip_string.split(".")
    raise InvalidIpAddress, ip_string.inspect if parts.length != 4
    octets = parts.map do |part|
      octet = part.to_i
      raise InvalidIpAddress, ip_string.inspect if octet > 255
      raise InvalidIpAddress, ip_string.inspect if octet < 0
      octet
    end
    # pack("C*") yields a binary string of one byte per octet.
    octets.pack("C*")
  end
end
##
# Base class for the unsigned 32-bit SNMP types; accepts values from 0
# through 4294967295.
#
class UnsignedInteger < Integer
  ##
  # Decodes the value octets as an unsigned integer and wraps the result in
  # the receiving class, so subclasses decode to their own type.
  #
  def self.decode(value_data)
    new(decode_uinteger_value(value_data))
  end

  ##
  # Raises ArgumentError when +value+ is negative or above 4294967295.
  #
  def initialize(value)
    super
    raise ArgumentError, "Negative integer invalid: #{value}" if value < 0
    raise ArgumentError, "Out of range: #{value}" if value > 4_294_967_295
  end
end
##
# Unsigned 32-bit value reported as "Counter32" and encoded with
# Counter32_TAG.
#
class Counter32 < UnsignedInteger
  def encode
    encode_tagged_integer(Counter32_TAG, @value)
  end

  def asn1_type
    'Counter32'
  end
end
##
# Unsigned 32-bit value reported as "Gauge32" and encoded with Gauge32_TAG.
#
class Gauge32 < UnsignedInteger
  def encode
    encode_tagged_integer(Gauge32_TAG, @value)
  end

  def asn1_type
    'Gauge32'
  end
end
##
# Unsigned 32-bit value reported as "Unsigned32" and encoded with
# Unsigned32_TAG.
#
class Unsigned32 < UnsignedInteger
  def encode
    encode_tagged_integer(Unsigned32_TAG, @value)
  end

  def asn1_type
    'Unsigned32'
  end
end
##
# Unsigned 32-bit tick count, formatted by +to_s+ as a duration measured in
# hundredths of a second.
#
class TimeTicks < UnsignedInteger
  def asn1_type
    'TimeTicks'
  end

  def encode
    encode_tagged_integer(TimeTicks_TAG, @value)
  end

  ##
  # Renders the tick count as "HH:MM:SS.hh", prefixed with "1 day, " or
  # "N days, " once a full day or more has elapsed.
  #
  def to_s
    # 8640000, 360000, 6000 and 100 are the number of hundredths of a
    # second in a day, an hour, a minute and a second.
    days, rest = @value.divmod(8640000)
    hours, rest = rest.divmod(360000)
    minutes, rest = rest.divmod(6000)
    seconds, hundredths = rest.divmod(100)
    clock = sprintf('%02d:%02d:%02d.%02d', hours, minutes, seconds, hundredths)
    if days < 1
      clock
    elsif days == 1
      '1 day, ' + clock
    else
      sprintf('%d days, ', days) + clock
    end
  end
end
##
# Octet string reported as "Opaque" and encoded with Opaque_TAG.
#
class Opaque < OctetString
  ##
  # Wraps the raw value octets in an Opaque.
  #
  def self.decode(value_data)
    Opaque.new(value_data)
  end

  def encode
    encode_tlv(Opaque_TAG, self)
  end

  def asn1_type
    'Opaque'
  end
end
##
# Unsigned 64-bit counter; accepts values from 0 through
# 18446744073709551615.
#
class Counter64 < Integer
  ##
  # Builds a Counter64 from the value octets.
  #
  # NOTE(review): this uses decode_integer_value, whereas
  # UnsignedInteger.decode uses decode_uinteger_value. Confirm that
  # counters at or above 2**63 cannot decode as negative here.
  #
  def self.decode(value_data)
    Counter64.new(decode_integer_value(value_data))
  end

  ##
  # Raises ArgumentError when +value+ is negative or above
  # 18446744073709551615.
  #
  def initialize(value)
    super
    raise ArgumentError, "Negative integer invalid: #{value}" if value < 0
    raise ArgumentError, "Out of range: #{value}" if value > 18_446_744_073_709_551_615
  end

  def asn1_type
    'Counter64'
  end

  def encode
    encode_tagged_integer(Counter64_TAG, @value)
  end
end
##
# The Null value. The class itself is used as the value: every method is
# defined on the class and +decode+ returns the class.
#
class Null
  ##
  # Ignores +value_data+ and returns the Null class.
  #
  def self.decode(value_data)
    Null
  end

  def self.encode
    encode_null
  end

  def self.asn1_type
    'Null'
  end

  def self.to_s
    asn1_type
  end
end
##
# The noSuchObject exception value. The class itself is used as the value:
# every method is defined on the class and +decode+ returns the class.
#
class NoSuchObject
  ##
  # Ignores +value_data+ and returns the NoSuchObject class.
  #
  def self.decode(value_data)
    NoSuchObject
  end

  def self.encode
    encode_exception(NoSuchObject_TAG)
  end

  def self.asn1_type
    'noSuchObject'
  end

  def self.to_s
    asn1_type
  end
end
##
# The noSuchInstance exception value. The class itself is used as the
# value: every method is defined on the class and +decode+ returns the
# class.
#
class NoSuchInstance
  ##
  # Ignores +value_data+ and returns the NoSuchInstance class.
  #
  def self.decode(value_data)
    NoSuchInstance
  end

  def self.encode
    encode_exception(NoSuchInstance_TAG)
  end

  def self.asn1_type
    'noSuchInstance'
  end

  def self.to_s
    asn1_type
  end
end
##
# The endOfMibView exception value. The class itself is used as the value:
# every method is defined on the class and +decode+ returns the class.
#
class EndOfMibView
  ##
  # Ignores +value_data+ and returns the EndOfMibView class.
  #
  def self.decode(value_data)
    EndOfMibView
  end

  def self.encode
    encode_exception(EndOfMibView_TAG)
  end

  def self.asn1_type
    'endOfMibView'
  end

  def self.to_s
    asn1_type
  end
end
##
# A variable binding: an object name paired with a value.
#
class VarBind
  attr_accessor :name, :value
  alias oid name

  class << self
    ##
    # Decodes one varbind from the front of +data+. Returns the VarBind and
    # the data that followed it.
    #
    def decode(data)
      body, trailing = decode_sequence(data)
      name, rest = decode_object_id(body)
      value, rest = decode_value(rest)
      # A varbind holds exactly a name and a value; nothing may follow.
      assert_no_remainder(rest)
      [VarBind.new(name, value), trailing]
    end

    # Maps each value tag to the class whose +decode+ handles it.
    ValueDecoderMap = {
      INTEGER_TAG => Integer,
      OCTET_STRING_TAG => OctetString,
      NULL_TAG => Null,
      OBJECT_IDENTIFIER_TAG => ObjectId,
      IpAddress_TAG => IpAddress,
      Counter32_TAG => Counter32,
      Gauge32_TAG => Gauge32,
      # note Gauge32 tag same as Unsigned32
      TimeTicks_TAG => TimeTicks,
      Opaque_TAG => Opaque,
      Counter64_TAG => Counter64,
      NoSuchObject_TAG => NoSuchObject,
      NoSuchInstance_TAG => NoSuchInstance,
      EndOfMibView_TAG => EndOfMibView
    }

    ##
    # Decodes one tagged value from the front of +data+. Returns the value
    # and the remaining data.
    # Raises UnsupportedValueTag when the tag has no entry in
    # ValueDecoderMap.
    #
    def decode_value(data)
      tag, payload, rest = decode_tlv(data)
      decoder = ValueDecoderMap[tag]
      raise UnsupportedValueTag, tag.to_s unless decoder
      [decoder.decode(payload), rest]
    end
  end

  ##
  # +name+ is kept as given when it is already an ObjectId; anything else
  # is converted to an ObjectName. +value+ defaults to Null.
  #
  def initialize(name, value=Null)
    @name = name.kind_of?(ObjectId) ? name : ObjectName.new(name)
    @value = value
  end

  def asn1_type
    "VarBind"
  end

  def to_varbind
    self
  end

  def to_s
    "[name=#{@name.to_s}, value=#{@value.to_s} (#{@value.asn1_type})]"
  end

  ##
  # Yields this varbind once.
  #
  def each
    yield self
  end

  ##
  # Encodes the name followed by the value, wrapped in a sequence.
  #
  def encode
    encode_sequence(encode_object_id(@name) << value.encode)
  end
end
##
# An ObjectId that reports its type as "ObjectName"; VarBind uses it for
# names that were not already ObjectId instances.
#
class ObjectName < ObjectId
  def asn1_type
    'ObjectName'
  end
end
end