Land #8801, Support padding on the CAN bus.

bug/bundler_fix
Pearce Barry 2017-08-06 21:03:28 -05:00
commit f71ca924c4
No known key found for this signature in database
GPG Key ID: 0916F4DEA5C5DE0A
4 changed files with 119 additions and 53 deletions

View File

@ -24,7 +24,11 @@ PIDs to ASCII.
**CLEAR_DTCS*** **CLEAR_DTCS***
If any Diagnostic Trouble Codes (DTCs) are present it will clear those and reset the MIL (Enginge Light) If any Diagnostic Trouble Codes (DTCs) are present it will clear those and reset the MIL (Engine Light).
**PADDING**
Optional byte-value to use for padding all CAN bus packets to an 8-byte length. Padding is disabled by default.
## Scenarios ## Scenarios

View File

@ -100,11 +100,13 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Array] All supported pids from Mode $01 get current data # @return [Array] All supported pids from Mode $01 get current data
def get_current_data_pids(bus, src_id, dst_id) def get_current_data_pids(bus, src_id, dst_id, opt={})
pids = [] pids = []
packets = get_current_data(bus, src_id, dst_id, 0, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0, opt)
return pids if packets.nil? return pids if packets.nil?
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
@ -114,7 +116,7 @@ module UDS
end end
end end
if pids.include? 0x20 if pids.include? 0x20
packets = get_current_data(bus, src_id, dst_id, 0x20, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x20, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -124,7 +126,7 @@ module UDS
end end
end end
if pids.include? 0x40 if pids.include? 0x40
packets = get_current_data(bus, src_id, dst_id, 0x40, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x40, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -134,7 +136,7 @@ module UDS
end end
end end
if pids.include? 0x60 if pids.include? 0x60
packets = get_current_data(bus, src_id, dst_id, 0x60, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x60, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -144,7 +146,7 @@ module UDS
end end
end end
if pids.include? 0x80 if pids.include? 0x80
packets = get_current_data(bus, src_id, dst_id, 0x80, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0x80, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -154,7 +156,7 @@ module UDS
end end
end end
if pids.include? 0xA0 if pids.include? 0xA0
packets = get_current_data(bus, src_id, dst_id, 0xA0, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0xA0, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -164,7 +166,7 @@ module UDS
end end
end end
if pids.include? 0xC0 if pids.include? 0xC0
packets = get_current_data(bus, src_id, dst_id, 0xC0, { "MAXPKTS" => 1 }) packets = get_current_data(bus, src_id, dst_id, 0xC0, opt)
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
hexpids = packets["Packets"][0]["DATA"][3, 6] hexpids = packets["Packets"][0]["DATA"][3, 6]
hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s hexpids = hexpids.join.hex.to_s(2).rjust(32, '0').split('') # Array of 1s and 0s
@ -182,10 +184,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "MIL" => true|false "DTC_COUNT" => 0 } # @return [Hash] Packet Hash with { "MIL" => true|false "DTC_COUNT" => 0 }
def get_monitor_status(bus, src_id, dst_id) def get_monitor_status(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x01, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x01, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -200,10 +204,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "TEMP_C" => <Celcious Temp>, "TEMP_F" => <Fahrenheit TEmp> } # @return [Hash] Packet Hash with { "TEMP_C" => <Celcious Temp>, "TEMP_F" => <Fahrenheit TEmp> }
def get_engine_coolant_temp(bus, src_id, dst_id) def get_engine_coolant_temp(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x05, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x05, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -220,10 +226,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "RPM" => <RPMs> } # @return [Hash] Packet Hash with { "RPM" => <RPMs> }
def get_rpms(bus, src_id, dst_id) def get_rpms(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x0C, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x0C, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -237,10 +245,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Hash] Packet Hash with { "SPEED_K" => <km/h>, "SPEED_M" => <mph> } # @return [Hash] Packet Hash with { "SPEED_K" => <km/h>, "SPEED_M" => <mph> }
def get_vehicle_speed(bus, src_id, dst_id) def get_vehicle_speed(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x0D, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x0D, opt)
return {} if packets.nil? return {} if packets.nil?
return packets if packets.key? "error" return packets if packets.key? "error"
return packets unless packets.key? "Packets" return packets unless packets.key? "Packets"
@ -256,10 +266,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] Description of standard # @return [String] Description of standard
def get_obd_standards(bus, src_id, dst_id) def get_obd_standards(bus, src_id, dst_id, opt = {})
packets = get_current_data(bus, src_id, dst_id, 0x1C, { "MAXPKTS" => 1 }) opt['MAXPKTS'] = 1
packets = get_current_data(bus, src_id, dst_id, 0x1C, opt)
return "" if packets.nil? return "" if packets.nil?
if packets.key? "error" if packets.key? "error"
print_error("OBD ERR: #{packets['error']}") print_error("OBD ERR: #{packets['error']}")
@ -532,11 +544,13 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [Array] Array of PIDS supported by Mode $09 # @return [Array] Array of PIDS supported by Mode $09
def get_vinfo_supported_pids(bus, src_id, dst_id) def get_vinfo_supported_pids(bus, src_id, dst_id, opt = {})
opt['MAXPKTS'] = 1
pids = [] pids = []
packets = get_vehicle_info(bus, src_id, dst_id, 0, { "MAXPKTS" => 1 }) packets = get_vehicle_info(bus, src_id, dst_id, 0, opt)
return pids if packets.nil? return pids if packets.nil?
if (packets.key? "Packets") && !packets["Packets"].empty? if (packets.key? "Packets") && !packets["Packets"].empty?
unless packets["Packets"][0]["DATA"][1].hex == 0x49 unless packets["Packets"][0]["DATA"][1].hex == 0x49
@ -558,10 +572,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] VIN as ASCII # @return [String] VIN as ASCII
def get_vin(bus, src_id, dst_id) def get_vin(bus, src_id, dst_id, opt = {})
packets = get_vehicle_info(bus, src_id, dst_id, 0x02) packets = get_vehicle_info(bus, src_id, dst_id, 0x02, opt)
return "" if packets.nil? return "" if packets.nil?
return "UDS ERR: #{packets['error']}" if packets.key? "error" return "UDS ERR: #{packets['error']}" if packets.key? "error"
data = response_hash_to_data_array(dst_id.to_s(16), packets) data = response_hash_to_data_array(dst_id.to_s(16), packets)
@ -575,10 +590,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] Calibration ID as ASCII # @return [String] Calibration ID as ASCII
def get_calibration_id(bus, src_id, dst_id) def get_calibration_id(bus, src_id, dst_id, opt = {})
packets = get_vehicle_info(bus, src_id, dst_id, 0x04) packets = get_vehicle_info(bus, src_id, dst_id, 0x04, opt)
return "" if packets.nil? return "" if packets.nil?
return "UDS ERR: #{packets['error']}" if packets.key? "error" return "UDS ERR: #{packets['error']}" if packets.key? "error"
data = response_hash_to_data_array(dst_id.to_s(16), packets) data = response_hash_to_data_array(dst_id.to_s(16), packets)
@ -592,10 +608,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param opt [Hash] Additional options to be passed to automotive.send_isotp_and_wait_for_response
# #
# @return [String] ECU Name as ASCII # @return [String] ECU Name as ASCII
def get_ecu_name(bus, src_id, dst_id) def get_ecu_name(bus, src_id, dst_id, opt = {})
packets = get_vehicle_info(bus, src_id, dst_id, 0x0A) packets = get_vehicle_info(bus, src_id, dst_id, 0x0A, opt)
return "" if packets.nil? return "" if packets.nil?
return "UDS ERR: #{packets['error']}" if packets.key? "error" return "UDS ERR: #{packets['error']}" if packets.key? "error"
data = response_hash_to_data_array(dst_id.to_s(16), packets) data = response_hash_to_data_array(dst_id.to_s(16), packets)
@ -616,9 +633,10 @@ module UDS
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param level [Integer] The desired DSC level # @param level [Integer] The desired DSC level
# @param opt [Hash] Optional arguments. PADDING if set uses this hex value for padding
# #
# @return [Hash] client.automtoive response # @return [Hash] client.automtoive response
def set_dsc(bus, src_id, dst_id, level) def set_dsc(bus, src_id, dst_id, level, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -631,9 +649,12 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["TIMEOUT"] = 20 opt["TIMEOUT"] = 20
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 1
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x10, level], opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x10, level], opt)
end end
@ -674,10 +695,11 @@ module UDS
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ] # @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ]
# @param show_error [Boolean] If an error, return the Packet hash instead, Default false # @param opt [Hash] Additional Options. SHOW_ERROR (Returns packet hash instead, default false)
# PADDING if set uses this hex value for padding
# #
# @return [Array] Data retrieved. If show_error is true and an error is detected, then packet hash will be returned instead # @return [Array] Data retrieved. If show_error is true and an error is detected, then packet hash will be returned instead
def read_data_by_id(bus, src_id, dst_id, id, show_error = false) def read_data_by_id(bus, src_id, dst_id, id, opt = {})
data = [] data = []
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
@ -702,14 +724,22 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
show_error = false
padding = nil
show_error = true if opt.key? 'SHOW_ERROR'
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 15 opt["MAXPKTS"] = 15
opt["PADDING"] = padding unless padding.nil?
packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x22] + id, opt) packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x22] + id, opt)
return [] if packets.nil? return [] if packets.nil?
if packets.key? "error" if packets.key? "error"
return packets if show_error return packets if show_error
else else
data = response_hash_to_data_array(dst_id, packets) data = response_hash_to_data_array(dst_id, packets)
if id.size > 1 # Remove IDs from return
data = data[(id.size-1)..data.size]
end
end end
data data
end end
@ -723,9 +753,10 @@ module UDS
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param level [Integer] Requested security access level. Default is 1 # @param level [Integer] Requested security access level. Default is 1
# @param opt [Hash] Optional settings. PADDING if set uses this hex value for padding
# #
# @return [Hash] Packet Hash with { "SEED" => [ XX, XX ] } # @return [Hash] Packet Hash with { "SEED" => [ XX, XX ] }
def get_security_token(bus, src_id, dst_id, level = 1) def get_security_token(bus, src_id, dst_id, level = 1, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -738,8 +769,11 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 2
opt["PADDING"] = padding unless padding.nil?
packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, level], opt) packets = client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, level], opt)
return {} if packets.nil? return {} if packets.nil?
unless packets.key? "error" unless packets.key? "error"
@ -754,11 +788,12 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# param key [Array] Array of Hex to be used as the key. Same size as the seed # @param key [Array] Array of Hex to be used as the key. Same size as the seed
# @param response_level [Integer] Requested security access level response. Usually level + 1. Default is 2 # @param response_level [Integer] Requested security access level response. Usually level + 1. Default is 2
# @param opt [Hash] Optional settings. PADDING if set uses this hex value for padding
# #
# @return [Hash] packet response from client.automotoive # @return [Hash] packet response from client.automotoive
def send_security_token_response(bus, src_id, dst_id, key, response_level = 2) def send_security_token_response(bus, src_id, dst_id, key, response_level = 2, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -776,8 +811,11 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 2
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, response_level] + key, opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27, response_level] + key, opt)
end end
@ -791,9 +829,10 @@ module UDS
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ] # @param id [Array] 2 Bytes in an array of the identifier. Example [ 0xF1, 0x90 ]
# @param data [Array] Array of bytes to write # @param data [Array] Array of bytes to write
# @param opt [Hash] Optional settings. PADDING if set uses this hex value for padding
# #
# @return [Hash] Packet hash from client.automotive # @return [Hash] Packet hash from client.automotive
def write_data_by_id(bus, src_id, dst_id, id, data) def write_data_by_id(bus, src_id, dst_id, id, data, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -815,8 +854,11 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 1
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27] + id + data, opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x27] + id + data, opt)
end end
@ -871,10 +913,11 @@ module UDS
# @param bus [String] unique CAN bus identifier # @param bus [String] unique CAN bus identifier
# @param src_id [Integer] Integer representation of the Sending CAN ID # @param src_id [Integer] Integer representation of the Sending CAN ID
# @param dst_id [Integer] Integer representation of the receiving CAN ID # @param dst_id [Integer] Integer representation of the receiving CAN ID
# @param suppress_response [Boolean] By default suppress ACK from ECU. Set to false if you want confirmation # @param opt [Hash] Optional arguments such as: PADDING if set uses this hex value for padding
# SUPPRESS_RESPONSE By default suppress ACK from ECU. Set to false if you want confirmation
# #
# @return [Hash] Packet hash from client.automotive. Typically blank unless suppress_response is false # @return [Hash] Packet hash from client.automotive. Typically blank unless suppress_response is false
def send_tester_present(bus, src_id, dst_id, suppress_response = true) def send_tester_present(bus, src_id, dst_id, opt = {})
unless client.automotive unless client.automotive
print_error("Not an automotive hwbridge session") print_error("Not an automotive hwbridge session")
return {} return {}
@ -886,10 +929,13 @@ module UDS
print_line("No active bus, use 'connect' or specify bus via the options") print_line("No active bus, use 'connect' or specify bus via the options")
return {} return {}
end end
padding = nil
suppress = 0x80 suppress = 0x80
suppress = 0 unless suppress_response suppress = 0 unless (opt.key? 'SUPRESS_RESPONSE') && opt['SUPRESS_RESPONSE'] == false
padding = opt['PADDING'] if opt.key? 'PADDING'
opt = {} opt = {}
opt["MAXPKTS"] = 1 opt["MAXPKTS"] = 1
opt["PADDING"] = padding unless padding.nil?
client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x3E, suppress], opt) client.automotive.send_isotp_and_wait_for_response(bus, src_id, dst_id, [0x3E, suppress], opt)
end end

View File

@ -81,6 +81,18 @@ class Automotive < Extension
arr.map { |b| "%02x" % (b.respond_to?("hex") ? b.hex : b ) } arr.map { |b| "%02x" % (b.respond_to?("hex") ? b.hex : b ) }
end end
#
# Pad the end of a packet with a set byte until it is 8 bytes long
#
# @param data [Array] Packet to padd
# @param padding [Integer] Expected single byte 0x00 style argument
# @return [Array] Packet as data
def padd_packet(data, padding)
return data if padding.nil?
return data if data.size > 7
data + [ padding ] * (8 - data.size)
end
def set_active_bus(bus) def set_active_bus(bus)
self.active_bus = bus self.active_bus = bus
end end
@ -107,6 +119,7 @@ class Automotive < Extension
# TODO: Implement sending ISO-TP > 8 bytes # TODO: Implement sending ISO-TP > 8 bytes
data = [ data ] if data.is_a? Integer data = [ data ] if data.is_a? Integer
if data.size < 8 if data.size < 8
data = padd_packet(data, opt['PADDING']) if opt.key? 'PADDING'
data = array2hex(data).join data = array2hex(data).join
request_str = "/automotive/#{bus}/isotpsend_and_wait?srcid=#{src_id}&dstid=#{dst_id}&data=#{data}" request_str = "/automotive/#{bus}/isotpsend_and_wait?srcid=#{src_id}&dstid=#{dst_id}&data=#{data}"
request_str += "&timeout=#{opt['TIMEOUT']}" if opt.key? "TIMEOUT" request_str += "&timeout=#{opt['TIMEOUT']}" if opt.key? "TIMEOUT"

View File

@ -23,6 +23,7 @@ class MetasploitModule < Msf::Post
register_options([ register_options([
OptInt.new('SRCID', [true, "Module ID to query", 0x7e0]), OptInt.new('SRCID', [true, "Module ID to query", 0x7e0]),
OptInt.new('DSTID', [false, "Expected reponse ID, defaults to SRCID + 8", 0x7e8]), OptInt.new('DSTID', [false, "Expected reponse ID, defaults to SRCID + 8", 0x7e8]),
OptInt.new('PADDING', [false, "Optinal end of packet padding", nil]),
OptBool.new('CLEAR_DTCS', [false, "Clear any DTCs and reset MIL if errors are present", false]), OptBool.new('CLEAR_DTCS', [false, "Clear any DTCs and reset MIL if errors are present", false]),
OptString.new('CANBUS', [false, "CAN Bus to perform scan on, defaults to connected bus", nil]) OptString.new('CANBUS', [false, "CAN Bus to perform scan on, defaults to connected bus", nil])
]) ])
@ -30,7 +31,9 @@ class MetasploitModule < Msf::Post
end end
def run def run
pids = get_current_data_pids(datastore["CANBUS"], datastore["SRCID"], datastore["DSTID"]) opt = {}
opt['PADDING'] = datastore["PADDING"] if datastore["PADDING"]
pids = get_current_data_pids(datastore["CANBUS"], datastore["SRCID"], datastore["DSTID"], opt)
if pids.size == 0 if pids.size == 0
print_status("No reported PIDs. You may not be properly connected") print_status("No reported PIDs. You may not be properly connected")
else else
@ -38,26 +41,26 @@ class MetasploitModule < Msf::Post
print_status(" #{pids.inspect}") print_status(" #{pids.inspect}")
end end
if pids.include? 1 if pids.include? 1
data = get_monitor_status(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_monitor_status(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" MIL (Engine Light) : #{data['MIL'] ? 'ON' : 'OFF'}") if data.key? "MIL" print_status(" MIL (Engine Light) : #{data['MIL'] ? 'ON' : 'OFF'}") if data.key? "MIL"
print_status(" Number of DTCs: #{data['DTC_COUNT']}") if data.key? "DTC_COUNT" print_status(" Number of DTCs: #{data['DTC_COUNT']}") if data.key? "DTC_COUNT"
end end
if pids.include? 5 if pids.include? 5
data = get_engine_coolant_temp(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_engine_coolant_temp(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" Engine Temp: #{data['TEMP_C']} \u00b0C / #{data['TEMP_F']} \u00b0F") if data.key? "TEMP_C" print_status(" Engine Temp: #{data['TEMP_C']} \u00b0C / #{data['TEMP_F']} \u00b0F") if data.key? "TEMP_C"
end end
if pids.include? 0x0C if pids.include? 0x0C
data = get_rpms(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_rpms(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" RPMS: #{data['RPM']}") if data.key? "RPM" print_status(" RPMS: #{data['RPM']}") if data.key? "RPM"
end end
if pids.include? 0x0D if pids.include? 0x0D
data = get_vehicle_speed(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) data = get_vehicle_speed(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status(" Speed: #{data['SPEED_K']} km/h / #{data['SPEED_M']} mph") if data.key? "SPEED_K" print_status(" Speed: #{data['SPEED_K']} km/h / #{data['SPEED_M']} mph") if data.key? "SPEED_K"
end end
if pids.include? 0x1C if pids.include? 0x1C
print_status("Supported OBD Standards: #{get_obd_standards(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'])}") print_status("Supported OBD Standards: #{get_obd_standards(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)}")
end end
dtcs = get_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) dtcs = get_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
unless dtcs.empty? unless dtcs.empty?
print_status("DTCS:") print_status("DTCS:")
dtcs.each do |dtc| dtcs.each do |dtc|
@ -66,7 +69,7 @@ class MetasploitModule < Msf::Post
print_status(" #{msg}") print_status(" #{msg}")
end end
end end
frozen_dtcs = get_frozen_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) frozen_dtcs = get_frozen_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
unless frozen_dtcs.empty? unless frozen_dtcs.empty?
print_status("Frozen DTCS:") print_status("Frozen DTCS:")
frozen_dtcs.each do |dtc| frozen_dtcs.each do |dtc|
@ -75,27 +78,27 @@ class MetasploitModule < Msf::Post
print_status(" #{msg}") print_status(" #{msg}")
end end
end end
pids = get_vinfo_supported_pids(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) pids = get_vinfo_supported_pids(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("Mode $09 Vehicle Info Supported PIDS: #{pids.inspect}") if pids.size > 0 print_status("Mode $09 Vehicle Info Supported PIDS: #{pids.inspect}") if pids.size > 0
pids.each do |pid| pids.each do |pid|
# Handle known pids # Handle known pids
if pid == 2 if pid == 2
vin = get_vin(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) vin = get_vin(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("VIN: #{vin}") print_status("VIN: #{vin}")
elsif pid == 4 elsif pid == 4
calid = get_calibration_id(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) calid = get_calibration_id(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("Calibration ID: #{calid}") print_status("Calibration ID: #{calid}")
elsif pid == 0x0A elsif pid == 0x0A
ecuname = get_ecu_name(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) ecuname = get_ecu_name(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("ECU Name: #{ecuname}") print_status("ECU Name: #{ecuname}")
else else
data = get_vehicle_info(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], pid) data = get_vehicle_info(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], pid, opt)
data = response_hash_to_data_array(datastore['DSTID'].to_s(16), data) data = response_hash_to_data_array(datastore['DSTID'].to_s(16), data)
print_status("PID #{pid} Response: #{data.inspect}") print_status("PID #{pid} Response: #{data.inspect}")
end end
end end
if datastore['CLEAR_DTCS'] == true if datastore['CLEAR_DTCS'] == true
clear_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID']) clear_dtcs(datastore['CANBUS'], datastore['SRCID'], datastore['DSTID'], opt)
print_status("Cleared DTCs and reseting MIL") print_status("Cleared DTCs and reseting MIL")
end end
end end