Support padding on the CAN bus.

Also use a hash for passing options around instead of individual params.
bug/bundler_fix
Pearce Barry 2017-08-06 18:05:59 -05:00
parent 83cd0bc977
commit cfd377fbd4
3 changed files with 114 additions and 52 deletions

View File

@ -100,11 +100,13 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Array] All supported pids from Mode $01 get current data # @return [Array] All supported pids from Mode $01 get current data
def get_current_data_pids(bus, src_id, dst_id) def get_current_data_pids(bus, src_id, dst_id, opt={})
pids = [] pids = []
packets = get_current_data(bus, src_id, dst_id, 0, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0, opt)
return pids if packets.nil? return pids if packets.nil?
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
@ -114,7 +116,7 @@ module UDS
end end
end end
if pids.include? 0x20 if pids.include? 0x20
packets = get_current_data(bus, src_id, dst_id, 0x20, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x20, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -124,7 +126,7 @@ module UDS
end end
end end
if pids.include? 0x40 if pids.include? 0x40
packets = get_current_data(bus, src_id, dst_id, 0x40, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x40, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -134,7 +136,7 @@ module UDS
end end
end end
if pids.include? 0x60 if pids.include? 0x60
packets = get_current_data(bus, src_id, dst_id, 0x60, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x60, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -144,7 +146,7 @@ module UDS
end end
end end
if pids.include? 0x80 if pids.include? 0x80
packets = get_current_data(bus, src_id, dst_id, 0x80, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x80, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -154,7 +156,7 @@ module UDS
end end
end end
if pids.include? 0xA0 if pids.include? 0xA0
packets = get_current_data(bus, src_id, dst_id, 0xA0, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0xA0, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -164,7 +166,7 @@ module UDS
end end
end end
if pids.include? 0xC0 if pids.include? 0xC0
packets = get_current_data(bus, src_id, dst_id, 0xC0, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0xC0, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -182,10 +184,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "MIL" => true|false "DTC_COUNT" => 0 } # @return [Hash] Packet Hash with { "MIL" => true|false "DTC_COUNT" => 0 }
def get_monitor_status(bus, src_id, dst_id) def get_monitor_status(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x01, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x01, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -200,10 +204,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "TEMP_C" => <Celcious Temp>, "TEMP_F" => <Fahrenheit TEmp> } # @return [Hash] Packet Hash with { "TEMP_C" => <Celcious Temp>, "TEMP_F" => <Fahrenheit TEmp> }
def get_engine_coolant_temp(bus, src_id, dst_id) def get_engine_coolant_temp(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x05, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x05, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -220,10 +226,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "RPM" => <RPMs> } # @return [Hash] Packet Hash with { "RPM" => <RPMs> }
def get_rpms(bus, src_id, dst_id) def get_rpms(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x0C, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x0C, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -237,10 +245,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "SPEED_K" => <km/h>, "SPEED_M" => <mph> } # @return [Hash] Packet Hash with { "SPEED_K" => <km/h>, "SPEED_M" => <mph> }
def get_vehicle_speed(bus, src_id, dst_id) def get_vehicle_speed(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x0D, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x0D, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -256,10 +266,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] Description of standard # @return [String] Description of standard
def get_obd_standards(bus, src_id, dst_id) def get_obd_standards(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x1C, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x1C, opt)
return "" if packets.nil? return "" if packets.nil?
if packets.key? "error" if packets.key? "error"
print_error("OBD ERR: #{packets['error']}") print_error("OBD ERR: #{packets['error']}")
@ -532,11 +544,13 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Array] Array of PIDS supported by Mode $09 # @return [Array] Array of PIDS supported by Mode $09
def get_vinfo_supported_pids(bus, src_id, dst_id) def get_vinfo_supported_pids(bus, src_id, dst_id, opt = {})
opt['MAXPKTS'] = 1
pids = [] pids = []
packets = get_vehicle_info(bus, src_id, dst_id, 0, { "MAXPKTS" => 1 }) packets = get_vehicle_info(bus, src_id, dst_id, 0, opt)
return pids if packets.nil? return pids if packets.nil?
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
unless packets["Packets"][0]["DATA"][1].hex == 0x49 unless packets["Packets"][0]["DATA"][1].hex == 0x49
@ -558,10 +572,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] VIN as ASCII # @return [String] VIN as ASCII
def get_vin(bus, src_id, dst_id) def get_vin(bus, src_id, dst_id, opt = {})
packets = get_vehicle_info(bus, src_id, dst_id, 0x02) packets = get_vehicle_info(bus, src_id, dst_id, 0x02, opt)
return "" if packets.nil? return "" if packets.nil?
return "UDS ERR: #{packets['error']}" if packets.key? "error" return "UDS ERR: #{packets['error']}" if packets.key? "error"
data = response_hash_to_data_array(dst_id.to_s(16), packets) data = response_hash_to_data_array(dst_id.to_s(16), packets)
@ -575,10 +590,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] Calibration ID as ASCII # @return [String] Calibration ID as ASCII
def get_calibration_id(bus, src_id, dst_id) def get_calibration_id(bus, src_id, dst_id, opt = {})
packets = get_vehicle_info(bus, src_id, dst_id, 0x04) packets = get_vehicle_info(bus, src_id, dst_id, 0x04, opt)
return "" if packets.nil? return "" if packets.nil?
return "UDS ERR: #{packets['error']}" if packets.key? "error" return "UDS ERR: #{packets['error']}" if packets.key? "error"
data = response_hash_to_data_array(dst_id.to_s(16), packets) data = response_hash_to_data_array(dst_id.to_s(16), packets)
@ -592,10 +608,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] ECU Name as ASCII # @return [String] ECU Name as ASCII
def get_ecu_name(bus, src_id, dst_id) def get_ecu_name(bus, src_id, dst_id, opt = {})
packets = get_vehicle_info(bus, src_id, dst_id, 0x0A) packets = get_vehicle_info(bus, src_id, dst_id, 0x0A, opt)
return "" if packets.nil? return "" if packets.nil?
return "UDS ERR: #{packets['error']}" if packets.key? "error" return "UDS ERR: #{packets['error']}" if packets.key? "error"
data = response_hash_to_data_array(dst_id.to_s(16), packets) data = response_hash_to_data_array(dst_id.to_s(16), packets)
@ -616,9 +633,10 @@ module UDS
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param level [Integer] The desired DSC level # @param level [Integer] The desired DSC level
# @param opt [Hash] Optional arguments. PADDING if set uses this hex value for padding
# #
# @return [Hash] client.automtoive response # @return [Hash] client.automtoive response
def set_dsc(bus, src_id, dst_id, level) def set_dsc(bus, src_id, dst_id, level, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -631,9 +649,12 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["TIMEOUT"] = 20 opt["TIMEOUT"] = 20
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 1
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x10, level], opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x10, level], opt)
end end
@ -674,10 +695,11 @@ module UDS
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ] # @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ]
# @param show_error [Boolean] If an error, return the Packet hash instead, Default false # @param opt [Hash] Additional Options. SHOW_ERROR (Returns packet hash instead, default false)
# PADDING if set uses this hex value for padding
# #
# @return [Array] Data retrieved. If show_error is true and an error is detected, then packet hash will be returned instead # @return [Array] Data retrieved. If show_error is true and an error is detected, then packet hash will be returned instead
def read_data_by_id(bus, src_id, dst_id, id, show_error = false) def read_data_by_id(bus, src_id, dst_id, id, opt = {})
data = [] data = []
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
@ -702,14 +724,22 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
show_error = false
padding = nil
show_error = true if opt.key? 'SHOW_ERROR'
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 15 opt["MAXPKTS"] = 15
opt["PADDING"] = padding unless padding.nil?
packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x22] + id, opt) packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x22] + id, opt)
return [] if packets.nil? return [] if packets.nil?
if packets.key? "error" if packets.key? "error"
return packets if show_error return packets if show_error
else else
data = response_hash_to_data_array(dst_id, packets) data = response_hash_to_data_array(dst_id, packets)
if id.size > 1 # Remove IDs from return
data = data[(id.size-1)..data.size]
end
end end
data data
end end
@ -723,9 +753,10 @@ module UDS
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param level [Integer] Requested security access level. Default is 1 # @param level [Integer] Requested security access level. Default is 1
# @param opt [Hash] Optional settings. PADDING if set uses this hex value for padding
# #
# @return [Hash] Packet Hash with { "SEED" => [ XX, XX ] } # @return [Hash] Packet Hash with { "SEED" => [ XX, XX ] }
def get_security_token(bus, src_id, dst_id, level = 1) def get_security_token(bus, src_id, dst_id, level = 1, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -738,8 +769,11 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 2
opt["PADDING"] = padding unless padding.nil?
packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, level], opt) packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, level], opt)
return {} if packets.nil? return {} if packets.nil?
unless packets.key? "error" unless packets.key? "error"
@ -754,11 +788,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# param key [Array] Array of Hex to be used as the key. Same size as the seed # @param key [Array] Array of Hex to be used as the key. Same size as the seed
# @param response_level [Integer] Requested security access level response. Usually level + 1. Default is 2 # @param response_level [Integer] Requested security access level response. Usually level + 1. Default is 2
# @param opt [Hash] Optional settings. PADDING if set uses this hex value for padding
# #
# @return [Hash] packet response from client.automotoive # @return [Hash] packet response from client.automotoive
def send_security_token_response(bus, src_id, dst_id, key, response_level = 2) def send_security_token_response(bus, src_id, dst_id, key, response_level = 2, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -776,8 +811,11 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 2
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, response_level] + key, opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, response_level] + key, opt)
end end
@ -791,9 +829,10 @@ module UDS
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ] # @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ]
# @param data [Array] Array of bytes to write # @param data [Array] Array of bytes to write
# @param opt [Hash] Optional settings. PADDING if set uses this hex value for padding
# #
# @return [Hash] Packet hash from client.automotive # @return [Hash] Packet hash from client.automotive
def write_data_by_id(bus, src_id, dst_id, id, data) def write_data_by_id(bus, src_id, dst_id, id, data, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -815,8 +854,11 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 1
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27] + id + data, opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27] + id + data, opt)
end end
@ -871,10 +913,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param suppress_response [Boolean] By default suppress ACK from ECU. Set to false if you want confirmation # @param opt [Hash] Optional arguments such as: PADDING if set uses this hex value for padding
# SUPPRESS_RESPONSE By default suppress ACK from ECU. Set to false if you want confirmation
# #
# @return [Hash] Packet hash from client.automotive. Typically blank unless suppress_response is false # @return [Hash] Packet hash from client.automotive. Typically blank unless suppress_response is false
def send_tester_present(bus, src_id, dst_id, suppress_response = true) def send_tester_present(bus, src_id, dst_id, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -886,10 +929,13 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
suppress = 0x80 suppress = 0x80
suppress = 0 unless suppress_response suppress = 0 unless (opt.key? 'SUPRESS_RESPONSE') && opt['SUPRESS_RESPONSE'] == false
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 1
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x3E, suppress], opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x3E, suppress], opt)
end end

View File

@ -81,6 +81,18 @@ class Automotive < Extension
arr.map { |b| "%02x" % (b.respond_to?("hex") ? b.hex : b ) } arr.map { |b| "%02x" % (b.respond_to?("hex") ? b.hex : b ) }
end end
#
# Pad the end of a packet with a set byte until it is 8 bytes long
#
# @param data [Array] Packet to padd
# @param padding [Integer] Expected single byte 0x00 style argument
# @return [Array] Packet as data
def padd_packet(data, padding)
return data if padding.nil?
return data if data.size > 7
data + [ padding ] * (8 - data.size)
end
def set_active_bus(bus) def set_active_bus(bus)
self.active_bus = bus self.active_bus = bus
end end
@ -107,6 +119,7 @@ class Automotive < Extension
# TODO: Implement sending ISO-TP > 8 bytes # TODO: Implement sending ISO-TP > 8 bytes
data = [ data ] if data.is_a? Integer data = [ data ] if data.is_a? Integer
if data.size < 8 if data.size < 8
data = padd_packet(data, opt['PADDING']) if opt.key? 'PADDING'
data = array2hex(data).join data = array2hex(data).join
request_str = "/automotive/#{bus}/isotpsend_and_wait?srcid=#{src_id}&dstid=#{dst_id}&data=#{data}" request_str = "/automotive/#{bus}/isotpsend_and_wait?srcid=#{src_id}&dstid=#{dst_id}&data=#{data}"
request_str += "&timeout=#{opt['TIMEOUT']}" if opt.key? "TIMEOUT" request_str += "&timeout=#{opt['TIMEOUT']}" if opt.key? "TIMEOUT"

View File

@ -23,6 +23,7 @@ class MetasploitModule < Msf::Post
register_options([ register_options([
OptInt.new('SRCID', [true, "Module ID to query", 0x7e0]), OptInt.new('SRCID', [true, "Module ID to query", 0x7e0]),
OptInt.new('DSTID', [false, "Expected reponse ID, defaults to SRCID + 8", 0x7e8]), OptInt.new('DSTID', [false, "Expected reponse ID, defaults to SRCID + 8", 0x7e8]),
OptInt.new('PADDING', [false, "Optinal end of packet padding", nil]),
OptBool.new('CLEAR_DTCS', [false, "Clear any DTCs and reset MIL if errors are present", false]), OptBool.new('CLEAR_DTCS', [false, "Clear any DTCs and reset MIL if errors are present", false]),
OptString.new('CANBUS', [false, "CAN Bus to perform scan on, defaults to connected bus", nil]) OptString.new('CANBUS', [false, "CAN Bus to perform scan on, defaults to connected bus", nil])
]) ])
@ -30,7 +31,9 @@ class MetasploitModule < Msf::Post
end end
def run def run
pids = get_current_data_pids(datastore["CANBUS"], datastore["SRCID"], datastore["DSTID"]) opt = {}
opt['PADDING'] = datastore["PADDING"] if datastore["PADDING"]
pids = get_current_data_pids(datastore["CANBUS"], datastore["SRCID"], datastore["DSTID"], opt)
if pids.size == 0 if pids.size == 0
print_status("No reported PIDs. You may not be properly connected") print_status("No reported PIDs. You may not be properly connected")
else else
@ -38,26 +41,26 @@ class MetasploitModule < Msf::Post
print_status(" #{pids.inspect}") print_status(" #{pids.inspect}")
end end
if pids.include? 1 if pids.include? 1
data = get_monitor_status(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_monitor_status(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" MIL (Engine Light) : #{data['MIL'] ? 'ON' : 'OFF'}") if data.key? "MIL" print_status(" MIL (Engine Light) : #{data['MIL'] ? 'ON' : 'OFF'}") if data.key? "MIL"
print_status(" Number of DTCs: #{data['DTC_COUNT']}") if data.key? "DTC_COUNT" print_status(" Number of DTCs: #{data['DTC_COUNT']}") if data.key? "DTC_COUNT"
end end
if pids.include? 5 if pids.include? 5
data = get_engine_coolant_temp(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_engine_coolant_temp(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" Engine Temp: #{data['TEMP_C']} \u00b0C / #{data['TEMP_F']} \u00b0F") if data.key? "TEMP_C" print_status(" Engine Temp: #{data['TEMP_C']} \u00b0C / #{data['TEMP_F']} \u00b0F") if data.key? "TEMP_C"
end end
if pids.include? 0x0C if pids.include? 0x0C
data = get_rpms(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_rpms(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" RPMS: #{data['RPM']}") if data.key? "RPM" print_status(" RPMS: #{data['RPM']}") if data.key? "RPM"
end end
if pids.include? 0x0D if pids.include? 0x0D
data = get_vehicle_speed(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_vehicle_speed(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" Speed: #{data['SPEED_K']} km/h / #{data['SPEED_M']} mph") if data.key? "SPEED_K" print_status(" Speed: #{data['SPEED_K']} km/h / #{data['SPEED_M']} mph") if data.key? "SPEED_K"
end end
if pids.include? 0x1C if pids.include? 0x1C
print_status("Supported OBD Standards: #{get_obd_standards(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'])}") print_status("Supported OBD Standards: #{get_obd_standards(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)}")
end end
dtcs = get_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) dtcs = get_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
unless dtcs.empty? unless dtcs.empty?
print_status("DTCS:") print_status("DTCS:")
dtcs.each do |dtc| dtcs.each do |dtc|
@ -66,7 +69,7 @@ class MetasploitModule < Msf::Post
print_status(" #{msg}") print_status(" #{msg}")
end end
end end
frozen_dtcs = get_frozen_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) frozen_dtcs = get_frozen_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
unless frozen_dtcs.empty? unless frozen_dtcs.empty?
print_status("Frozen DTCS:") print_status("Frozen DTCS:")
frozen_dtcs.each do |dtc| frozen_dtcs.each do |dtc|
@ -75,27 +78,27 @@ class MetasploitModule < Msf::Post
print_status(" #{msg}") print_status(" #{msg}")
end end
end end
pids = get_vinfo_supported_pids(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) pids = get_vinfo_supported_pids(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("Mode $09 Vehicle Info Supported PIDS: #{pids.inspect}") if pids.size > 0 print_status("Mode $09 Vehicle Info Supported PIDS: #{pids.inspect}") if pids.size > 0
pids.each do |pid| pids.each do |pid|
# Handle known pids # Handle known pids
if pid == 2 if pid == 2
vin = get_vin(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) vin = get_vin(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("VIN: #{vin}") print_status("VIN: #{vin}")
elsif pid == 4 elsif pid == 4
calid = get_calibration_id(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) calid = get_calibration_id(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("Calibration ID: #{calid}") print_status("Calibration ID: #{calid}")
elsif pid == 0x0A elsif pid == 0x0A
ecuname = get_ecu_name(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) ecuname = get_ecu_name(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("ECU Name: #{ecuname}") print_status("ECU Name: #{ecuname}")
else else
data = get_vehicle_info(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], pid) data = get_vehicle_info(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], pid, opt)
data = response_hash_to_data_array(datastore['DSTID'].to_s(16), data) data = response_hash_to_data_array(datastore['DSTID'].to_s(16), data)
print_status("PID #{pid} Response: #{data.inspect}") print_status("PID #{pid} Response: #{data.inspect}")
end end
end end
if datastore['CLEAR_DTCS'] == true if datastore['CLEAR_DTCS'] == true
clear_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) clear_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("Cleared DTCs and reseting MIL") print_status("Cleared DTCs and reseting MIL")
end end
end end