Browse Source

Replaced tabs with spaces

tags/0.3.1
Johann Schmitz 3 years ago
parent
commit
199b5395f9
6 changed files with 692 additions and 692 deletions
  1. +13
    -13
      setup.py
  2. +341
    -341
      src/smartcheck/check.py
  3. +35
    -35
      src/smartcheck/convert.py
  4. +77
    -77
      src/smartcheck/main.py
  5. +141
    -141
      tests/check.py
  6. +85
    -85
      tests/parsing.py

+ 13
- 13
setup.py View File

@@ -4,17 +4,17 @@
from setuptools import setup, find_packages

setup(
name='smart-check',
version='0.3',
description='A smart S.M.A.R.T. check',
author='Johann Schmitz',
author_email='johann@j-schmitz.net',
url='https://ercpe.de/projects/smart-check',
download_url='https://code.not-your-server.de/smart-check.git/tags/',
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
package_data = {'': ['*.yaml']},
zip_safe=False,
license='GPL-3',
name='smart-check',
version='0.3',
description='A smart S.M.A.R.T. check',
author='Johann Schmitz',
author_email='johann@j-schmitz.net',
url='https://ercpe.de/projects/smart-check',
download_url='https://code.not-your-server.de/smart-check.git/tags/',
packages=find_packages('src'),
package_dir={'': 'src'},
include_package_data=True,
package_data = {'': ['*.yaml']},
zip_safe=False,
license='GPL-3',
)

+ 341
- 341
src/smartcheck/check.py View File

@@ -14,16 +14,16 @@ TESTS_SECTION_START = 'SMART Self-test log structure revision number'
ATA_ERROR_COUNT = re.compile('^ATA Error Count: (\d+).*', re.MULTILINE | re.IGNORECASE)

INFORMATION_RE = [
("model_family", re.compile('Model Family: (.*)', re.UNICODE)),
("device_model", re.compile("(?:Device Model|Product): (.*)", re.UNICODE)),
("serial", re.compile("Serial Number: (.*)", re.UNICODE | re.IGNORECASE)),
("firmware_version", re.compile("Firmware version: (.*)", re.UNICODE)),
("ata_version", re.compile("ATA Version is: (.*)", re.UNICODE)),
("sata_version", re.compile("SATA Version is: (.*)", re.UNICODE)),
("model_family", re.compile('Model Family: (.*)', re.UNICODE)),
("device_model", re.compile("(?:Device Model|Product): (.*)", re.UNICODE)),
("serial", re.compile("Serial Number: (.*)", re.UNICODE | re.IGNORECASE)),
("firmware_version", re.compile("Firmware version: (.*)", re.UNICODE)),
("ata_version", re.compile("ATA Version is: (.*)", re.UNICODE)),
("sata_version", re.compile("SATA Version is: (.*)", re.UNICODE)),
]

DATA_RE = [
('overall_health_status', re.compile('SMART overall-health self-assessment test result: (.*)', re.UNICODE)),
('overall_health_status', re.compile('SMART overall-health self-assessment test result: (.*)', re.UNICODE)),
]
DATA_ATTRIBUTES_RE = re.compile(r"\s*(\d+)\s+([\w\d_\-]+)\s+([0-9a-fx]+)\s+(\d+)\s+(\d+)\s+(\d+)\s+([\w\d_\-]+)\s+([\w\d]+)\s+([\w\d_\-]+)\s+([^\r\n]*)", re.UNICODE)

@@ -31,352 +31,352 @@ TEST_RESULT_RE = re.compile(r"#\s*(\d+)\s+(.*?)\s{2,}(.*?)\s{2,}\s+([\d%]+)\s+(\


def toint(s, default=0):
try:
return int(s)
except ValueError:
return default
try:
return int(s)
except ValueError:
return default


class AttributeWarning(object):
Notice = 'NOTICE'
Warning = 'WARNING'
Critical = 'CRITICAL'
Notice = 'NOTICE'
Warning = 'WARNING'
Critical = 'CRITICAL'

def __init__(self, level=None, attribute_name=None, value=None, description=None):
self.level = level
self.field = attribute_name
self.value = value
self.description = (description or '').strip()
def __init__(self, level=None, attribute_name=None, value=None, description=None):
self.level = level
self.field = attribute_name
self.value = value
self.description = (description or '').strip()

@property
def short_message(self):
return "%s: %s=%s" % (self.level or '?', self.field, self.value)
@property
def short_message(self):
return "%s: %s=%s" % (self.level or '?', self.field, self.value)

@property
def long_message(self):
s = self.short_message
@property
def long_message(self):
s = self.short_message

if self.description:
s += ": %s" % self.description
if self.description:
s += ": %s" % self.description

return s
return s

def __str__(self):
return self.short_message
def __str__(self):
return self.short_message

def __repr__(self):
return self.short_message
def __repr__(self):
return self.short_message

def __eq__(self, other):
return isinstance(other, AttributeWarning) and \
self.level is not None and self.level == other.level and \
self.field is not None and self.field == other.field and \
self.value is not None and self.value == other.value
def __eq__(self, other):
return isinstance(other, AttributeWarning) and \
self.level is not None and self.level == other.level and \
self.field is not None and self.field == other.field and \
self.value is not None and self.value == other.value


class SMARTCheck(object):

def __init__(self, file_or_string, db_path=None):
if hasattr(file_or_string, 'read'):
self.raw = file_or_string.read()
elif isinstance(file_or_string, str) or (sys.version_info[0] == 2 and isinstance(file_or_string, unicode)):
self.raw = file_or_string
elif isinstance(file_or_string, bytes):
self.raw = file_or_string.decode('UTF-8')
else:
raise Exception("Unknown type: %s" % type(file_or_string))
self.parsed_sections = None
self.db_path = db_path
self._database = None
@property
def information(self):
return self.parsed.get('information', {})
@property
def smart_data(self):
return self.parsed.get('data', {})
@property
def self_tests(self):
return self.parsed.get('self_tests', {})
@property
def parsed(self):
if not self.parsed_sections:
self.parsed_sections = self.parse()
return self.parsed_sections
@property
def database(self):
if self._database is None:
if self.db_path:
with open(self.db_path) as f:
self._database = yaml.load(f) or {}
else:
self._database = []
return self._database
@property
def device_model(self):
return self.information['device_model']
@property
def ata_error_count(self):
return self.parsed['ata_error_count']
def exists_in_database(self):
return self.get_attributes_from_database(self.device_model) is not None
def get_attributes_from_database(self, device_model):
for dev in self.database:
device_regexprs = dev['model'] if isinstance(dev['model'], list) else [dev['model']]
if any(re.match(r, device_model, re.IGNORECASE) for r in device_regexprs):
logging.debug("Device exists in database (one of %s matches %s)" % (device_regexprs, self.device_model))
return dev['attributes']
logging.debug("Device does not exist in database")
return None
def parse(self):
return {
'information': self.parse_information_section(self.raw),
'data': self.parse_data_section(self.raw),
'self_tests': self.parse_tests_section(self.raw),
'ata_error_count': self.parse_ata_error_count(self.raw),
}
@property
def data_parsed(self):
return 'attributes' in self.smart_data
def parse_information_section(self, s):
if INFORMATION_SECTION_START not in s:
return {}
start = s.index(INFORMATION_SECTION_START)
if DATA_SECTION_START not in s:
end = len(s)
else:
end = s.index(DATA_SECTION_START)
information_text = s[start:end]
d = {}
for k, regex in INFORMATION_RE:
m = regex.search(information_text)
if m:
d[k] = m.group(1).strip() if m.group(1) else ''
return d
def parse_data_section(self, s):
if DATA_SECTION_START not in s:
logging.info("No data section found")
return {}
start = s.index(DATA_SECTION_START)
data_text = s[start:]
d = {}
for k, regex in DATA_RE:
m = regex.search(data_text)
if m:
d[k] = m.group(1).strip() if m.group(1) else ''
d['attributes'] = sorted(DATA_ATTRIBUTES_RE.findall(s), key=lambda t: int(t[0]))
return d
def parse_tests_section(self, s):
if TESTS_SECTION_START not in s:
return {
'test_results': []
}
start = s.index(TESTS_SECTION_START)
end = re.search(r'(\r\n\r\n|\n\n|\r\r)', s[start+1:], re.MULTILINE)
end = start + end.end(0) if end else len(s)
tests_text = s[start:end]
return {
'test_results': TEST_RESULT_RE.findall(tests_text)
}
def parse_ata_error_count(self, s):
m = ATA_ERROR_COUNT.search(s)
if m:
return int(m.group(1))
return 0
def check(self, ignore_attributes=None):
return len(self.check_attributes(ignore_attributes or [])) == 0 and self.check_tests() and self.ata_error_count == 0
def check_tests(self):
ok_test_results = [
'Completed without error',
'Interrupted (host reset)', # reboot during self test
'Aborted by host'
]
return not any([x[2] not in ok_test_results for x in self.self_tests['test_results']])
def check_attributes(self, ignore_attributes=None):
failed_attributes = self.check_generic_attributes()
if self.exists_in_database():
failed_attributes.update(self.check_device_attributes())
# remove every AttributeWarning from failed_attributes based on ignore_attributes
for attr_id_or_name in ignore_attributes or []:
del_keys = []
if isinstance(attr_id_or_name, int) or attr_id_or_name.isdigit():
del_keys = [k for k in failed_attributes.keys() if k[0] == int(attr_id_or_name)]
else:
del_keys = [k for k in failed_attributes.keys() if k[1] == attr_id_or_name]
for x in del_keys:
del failed_attributes[x]
return failed_attributes
def check_generic_attributes(self):
failed_attributes = {}
for attrid, name, flag, value, worst, thresh, attr_type, updated, when_failed, raw_value in self.smart_data['attributes']:
logging.debug("Attribute %s (%s): value=%s, raw value=%s" % (attrid, name, value, raw_value))
attrid = int(attrid)
attr_name = (name or '').lower()
int_value = toint(value)
int_raw_value = toint(raw_value)
int_thresh = toint(thresh)
# these tests are take from gsmartcontrol (storage_property_descr.cpp) and check for known pre-fail attributes
if attr_name in ('reallocated_sector_count', 'reallocated_sector_ct') and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name == 'spin_up_retry_count' and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"Your drive may have problems spinning up, which could lead to a complete mechanical failure.")
elif attr_name == "soft_read_error_rate" and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name in ("temperature_celsius", "temperature_celsius_x10"):
if 50 <= int_raw_value <= 120:
# Temperature (for some it may be 10xTemp, so limit the upper bound.)
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
int_raw_value,
"The temperature of the drive is higher than 50 degrees Celsius. " +
"This may shorten its lifespan and cause damage under severe load.")
elif int_raw_value > 500:
# Temperature (for some it may be 10xTemp, so limit the upper bound.)
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
int_raw_value,
"The temperature of the drive is higher than 50 degrees Celsius. " +
"This may shorten its lifespan and cause damage under severe load.")
elif attr_name == "reallocation_event_count" and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name in ("current_pending_sector", "current_pending_sector_count", "total_pending_sectors") and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name in ("offline_uncorrectable", "total_offline_uncorrectable") and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name == "ssd_life_left" and int_value < 50:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has less than half of its life left.")
else:
# execute a generic check for value < threshold
if int_value and int_thresh:
if int_value < int_thresh:
failed_attributes[(attrid, name)] = AttributeWarning(
AttributeWarning.Warning if attr_type == 'Pre-fail' else AttributeWarning.Notice,
name,
raw_value,
"Attribute value dropped below threshold of %s" % int_thresh)
logging.debug("Failed generic attributes: %s" % (failed_attributes, ))
return failed_attributes
def check_device_attributes(self):
device_model = self.device_model
device_db_attributes = self.get_attributes_from_database(device_model)
threshold_from = re.compile('^(\d+):$')
threshold_to = re.compile('^:(\d+)$')
threshold_from_to = re.compile('^(\d+):(\d+)$')
failed_attributes = {}
for attrid, name, flag, value, worst, tresh, type, updated, when_failed, raw_value in self.smart_data['attributes']:
attrid = int(attrid)
if attrid in device_db_attributes:
db_attrs = device_db_attributes[attrid]
if isinstance(db_attrs, list):
value_field, min_value, max_value = tuple(device_db_attributes[int(attrid)])
check_value = value if value_field == "VALUE" else raw_value
check_value = int(check_value or -1)
if not (int(min_value) <= check_value <= int(max_value)):
logging.info("Attribute %s (%s) failed: not %s <= %s <= %s" % (attrid, name, min_value, check_value, max_value))
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Critical, name, check_value)
elif isinstance(db_attrs, dict):
value_field = db_attrs.get('field', 'RAW_VALUE')
check_value = value if value_field == "VALUE" else raw_value
check_value = int(check_value or -1)
min_value = db_attrs.get('min', None)
max_value = db_attrs.get('max', None)
if min_value is None and max_value is None:
for failure_type, threshold_key in [('WARNING', 'warn_threshold'), ('CRITICAL', 'crit_threshold')]:
if threshold_key not in db_attrs:
continue
v = db_attrs.get(threshold_key)
from_m = threshold_from.match(v)
to_m = threshold_to.match(v)
from_to_m = threshold_from_to.match(v)
if (from_m and check_value >= int(from_m.group(1))) or \
(to_m and check_value <= int(to_m.group(1))) or \
(from_to_m and (int(from_to_m.group(1)) <= check_value <= int(from_to_m.group(2)))):
logging.info("Attribute %s (%s) failed with %s: not within treshold %s" % (attrid, name, failure_type, v))
failed_attributes[(attrid, name)] = AttributeWarning(failure_type, name, check_value)
else:
if (min_value is not None and check_value >= int(min_value)) or \
(max_value is not None and check_value <= int(max_value)):
logging.info("Attribute %s (%s) failed: not %s >= %s <= %s" % (attrid, name, min_value, check_value, max_value))
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Critical, name, check_value)
else:
raise ValueError("Unknown attribute specification: %s" % db_attrs)
return failed_attributes
def __init__(self, file_or_string, db_path=None):
if hasattr(file_or_string, 'read'):
self.raw = file_or_string.read()
elif isinstance(file_or_string, str) or (sys.version_info[0] == 2 and isinstance(file_or_string, unicode)):
self.raw = file_or_string
elif isinstance(file_or_string, bytes):
self.raw = file_or_string.decode('UTF-8')
else:
raise Exception("Unknown type: %s" % type(file_or_string))
self.parsed_sections = None
self.db_path = db_path
self._database = None
@property
def information(self):
return self.parsed.get('information', {})
@property
def smart_data(self):
return self.parsed.get('data', {})
@property
def self_tests(self):
return self.parsed.get('self_tests', {})
@property
def parsed(self):
if not self.parsed_sections:
self.parsed_sections = self.parse()
return self.parsed_sections
@property
def database(self):
if self._database is None:
if self.db_path:
with open(self.db_path) as f:
self._database = yaml.load(f) or {}
else:
self._database = []
return self._database
@property
def device_model(self):
return self.information['device_model']
@property
def ata_error_count(self):
return self.parsed['ata_error_count']
def exists_in_database(self):
return self.get_attributes_from_database(self.device_model) is not None
def get_attributes_from_database(self, device_model):
for dev in self.database:
device_regexprs = dev['model'] if isinstance(dev['model'], list) else [dev['model']]
if any(re.match(r, device_model, re.IGNORECASE) for r in device_regexprs):
logging.debug("Device exists in database (one of %s matches %s)" % (device_regexprs, self.device_model))
return dev['attributes']
logging.debug("Device does not exist in database")
return None
def parse(self):
return {
'information': self.parse_information_section(self.raw),
'data': self.parse_data_section(self.raw),
'self_tests': self.parse_tests_section(self.raw),
'ata_error_count': self.parse_ata_error_count(self.raw),
}
@property
def data_parsed(self):
return 'attributes' in self.smart_data
def parse_information_section(self, s):
if INFORMATION_SECTION_START not in s:
return {}
start = s.index(INFORMATION_SECTION_START)
if DATA_SECTION_START not in s:
end = len(s)
else:
end = s.index(DATA_SECTION_START)
information_text = s[start:end]
d = {}
for k, regex in INFORMATION_RE:
m = regex.search(information_text)
if m:
d[k] = m.group(1).strip() if m.group(1) else ''
return d
def parse_data_section(self, s):
if DATA_SECTION_START not in s:
logging.info("No data section found")
return {}
start = s.index(DATA_SECTION_START)
data_text = s[start:]
d = {}
for k, regex in DATA_RE:
m = regex.search(data_text)
if m:
d[k] = m.group(1).strip() if m.group(1) else ''
d['attributes'] = sorted(DATA_ATTRIBUTES_RE.findall(s), key=lambda t: int(t[0]))
return d
def parse_tests_section(self, s):
if TESTS_SECTION_START not in s:
return {
'test_results': []
}
start = s.index(TESTS_SECTION_START)
end = re.search(r'(\r\n\r\n|\n\n|\r\r)', s[start+1:], re.MULTILINE)
end = start + end.end(0) if end else len(s)
tests_text = s[start:end]
return {
'test_results': TEST_RESULT_RE.findall(tests_text)
}
def parse_ata_error_count(self, s):
m = ATA_ERROR_COUNT.search(s)
if m:
return int(m.group(1))
return 0
def check(self, ignore_attributes=None):
return len(self.check_attributes(ignore_attributes or [])) == 0 and self.check_tests() and self.ata_error_count == 0
def check_tests(self):
ok_test_results = [
'Completed without error',
'Interrupted (host reset)', # reboot during self test
'Aborted by host'
]
return not any([x[2] not in ok_test_results for x in self.self_tests['test_results']])
def check_attributes(self, ignore_attributes=None):
failed_attributes = self.check_generic_attributes()
if self.exists_in_database():
failed_attributes.update(self.check_device_attributes())
# remove every AttributeWarning from failed_attributes based on ignore_attributes
for attr_id_or_name in ignore_attributes or []:
del_keys = []
if isinstance(attr_id_or_name, int) or attr_id_or_name.isdigit():
del_keys = [k for k in failed_attributes.keys() if k[0] == int(attr_id_or_name)]
else:
del_keys = [k for k in failed_attributes.keys() if k[1] == attr_id_or_name]
for x in del_keys:
del failed_attributes[x]
return failed_attributes
def check_generic_attributes(self):
failed_attributes = {}
for attrid, name, flag, value, worst, thresh, attr_type, updated, when_failed, raw_value in self.smart_data['attributes']:
logging.debug("Attribute %s (%s): value=%s, raw value=%s" % (attrid, name, value, raw_value))
attrid = int(attrid)
attr_name = (name or '').lower()
int_value = toint(value)
int_raw_value = toint(raw_value)
int_thresh = toint(thresh)
# these tests are take from gsmartcontrol (storage_property_descr.cpp) and check for known pre-fail attributes
if attr_name in ('reallocated_sector_count', 'reallocated_sector_ct') and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name == 'spin_up_retry_count' and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"Your drive may have problems spinning up, which could lead to a complete mechanical failure.")
elif attr_name == "soft_read_error_rate" and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name in ("temperature_celsius", "temperature_celsius_x10"):
if 50 <= int_raw_value <= 120:
# Temperature (for some it may be 10xTemp, so limit the upper bound.)
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
int_raw_value,
"The temperature of the drive is higher than 50 degrees Celsius. " +
"This may shorten its lifespan and cause damage under severe load.")
elif int_raw_value > 500:
# Temperature (for some it may be 10xTemp, so limit the upper bound.)
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
int_raw_value,
"The temperature of the drive is higher than 50 degrees Celsius. " +
"This may shorten its lifespan and cause damage under severe load.")
elif attr_name == "reallocation_event_count" and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name in ("current_pending_sector", "current_pending_sector_count", "total_pending_sectors") and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name in ("offline_uncorrectable", "total_offline_uncorrectable") and int_raw_value > 0:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has a non-zero Raw value, but there is no SMART warning yet. " +
"This could be an indication of future failures and/or potential data loss in bad sectors.")
elif attr_name == "ssd_life_left" and int_value < 50:
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
name,
raw_value,
"The drive has less than half of its life left.")
else:
# execute a generic check for value < threshold
if int_value and int_thresh:
if int_value < int_thresh:
failed_attributes[(attrid, name)] = AttributeWarning(
AttributeWarning.Warning if attr_type == 'Pre-fail' else AttributeWarning.Notice,
name,
raw_value,
"Attribute value dropped below threshold of %s" % int_thresh)
logging.debug("Failed generic attributes: %s" % (failed_attributes, ))
return failed_attributes
def check_device_attributes(self):
device_model = self.device_model
device_db_attributes = self.get_attributes_from_database(device_model)
threshold_from = re.compile('^(\d+):$')
threshold_to = re.compile('^:(\d+)$')
threshold_from_to = re.compile('^(\d+):(\d+)$')
failed_attributes = {}
for attrid, name, flag, value, worst, tresh, type, updated, when_failed, raw_value in self.smart_data['attributes']:
attrid = int(attrid)
if attrid in device_db_attributes:
db_attrs = device_db_attributes[attrid]
if isinstance(db_attrs, list):
value_field, min_value, max_value = tuple(device_db_attributes[int(attrid)])
check_value = value if value_field == "VALUE" else raw_value
check_value = int(check_value or -1)
if not (int(min_value) <= check_value <= int(max_value)):
logging.info("Attribute %s (%s) failed: not %s <= %s <= %s" % (attrid, name, min_value, check_value, max_value))
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Critical, name, check_value)
elif isinstance(db_attrs, dict):
value_field = db_attrs.get('field', 'RAW_VALUE')
check_value = value if value_field == "VALUE" else raw_value
check_value = int(check_value or -1)
min_value = db_attrs.get('min', None)
max_value = db_attrs.get('max', None)
if min_value is None and max_value is None:
for failure_type, threshold_key in [('WARNING', 'warn_threshold'), ('CRITICAL', 'crit_threshold')]:
if threshold_key not in db_attrs:
continue
v = db_attrs.get(threshold_key)
from_m = threshold_from.match(v)
to_m = threshold_to.match(v)
from_to_m = threshold_from_to.match(v)
if (from_m and check_value >= int(from_m.group(1))) or \
(to_m and check_value <= int(to_m.group(1))) or \
(from_to_m and (int(from_to_m.group(1)) <= check_value <= int(from_to_m.group(2)))):
logging.info("Attribute %s (%s) failed with %s: not within treshold %s" % (attrid, name, failure_type, v))
failed_attributes[(attrid, name)] = AttributeWarning(failure_type, name, check_value)
else:
if (min_value is not None and check_value >= int(min_value)) or \
(max_value is not None and check_value <= int(max_value)):
logging.info("Attribute %s (%s) failed: not %s >= %s <= %s" % (attrid, name, min_value, check_value, max_value))
failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Critical, name, check_value)
else:
raise ValueError("Unknown attribute specification: %s" % db_attrs)
return failed_attributes

+ 35
- 35
src/smartcheck/convert.py View File

@@ -19,38 +19,38 @@ d = json.loads(s)
#pprint.pprint(d)

for group_name, device in sorted(d['Devices'].items(), key=lambda x: x[1]['Device'][0]):
print("")
if len(device['Device']) > 1:
print("- model:")
print('\n'.join(' - "%s"' % x for x in device['Device']))
else:
print('- model: "%s"' % device['Device'][0])
attribs = device['ID#']
if attribs:
thresholds = device['Threshs']
print(" attributes:")
for attribute_id, (field, description) in sorted(attribs.items(), key=lambda x: int(x[0])):
attrib_thresholds = thresholds.get(str(attribute_id), None)
if attrib_thresholds:
thresh1, thresh2 = tuple(attrib_thresholds)
if ':' in thresh1 and ':' in thresh2:
def fix_threshold(s):
if s.endswith(':'):
return ":%s" % s[:-1]
elif s.startswith(":"):
return "%s:" % s[1:]
else:
return s
print(" %s: # %s" % (int(attribute_id), description))
if field != 'RAW_VALUE':
print(' field: "%s"' % field)
print(' warn_threshold: "%s"' % fix_threshold(thresh1))
print(' crit_threshold: "%s"' % fix_threshold(thresh2))
else:
if 'WDC WD2000FYYZ-01UL1B0' in device['Device']:
thresh1 = 0
print(' %s: ["%s", %s, %s] # %s' % (int(attribute_id), field, thresh1, thresh2, description))
print("")
if len(device['Device']) > 1:
print("- model:")
print('\n'.join(' - "%s"' % x for x in device['Device']))
else:
print('- model: "%s"' % device['Device'][0])
attribs = device['ID#']
if attribs:
thresholds = device['Threshs']
print(" attributes:")
for attribute_id, (field, description) in sorted(attribs.items(), key=lambda x: int(x[0])):
attrib_thresholds = thresholds.get(str(attribute_id), None)
if attrib_thresholds:
thresh1, thresh2 = tuple(attrib_thresholds)
if ':' in thresh1 and ':' in thresh2:
def fix_threshold(s):
if s.endswith(':'):
return ":%s" % s[:-1]
elif s.startswith(":"):
return "%s:" % s[1:]
else:
return s
print(" %s: # %s" % (int(attribute_id), description))
if field != 'RAW_VALUE':
print(' field: "%s"' % field)
print(' warn_threshold: "%s"' % fix_threshold(thresh1))
print(' crit_threshold: "%s"' % fix_threshold(thresh2))
else:
if 'WDC WD2000FYYZ-01UL1B0' in device['Device']:
thresh1 = 0
print(' %s: ["%s", %s, %s] # %s' % (int(attribute_id), field, thresh1, thresh2, description))

+ 77
- 77
src/smartcheck/main.py View File

@@ -10,101 +10,101 @@ from smartcheck.check import SMARTCheck, AttributeWarning, DEFAULT_DISKS_FILE


def execute_smartctl(drive, interface=None, sudo=None, smartctl_path=None, smartctl_args=''):
command_line = "%s %s %s %s -a %s" % (
"sudo" if sudo else '',
smartctl_path,
'-d %s' % interface if interface else '',
smartctl_args,
drive
)
logging.debug("Executing smartctl command: %s" % command_line)
cmd = shlex.split(command_line)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={'LC_ALL': 'C'})
output = process.communicate()[0]
# TODO: See if we can get hints from the smartctl exit codes:
# https://www.freebsd.org/cgi/man.cgi?query=smartctl&manpath=FreeBSD+9.0-RELEASE+and+Ports&format=html#RETURN_VALUES
# at least we should not handle if the drive is in low-power mode (spindown)
# if process.returncode:
# raise Exception("smartctl failed with status code %s" % process.returncode)
return output
command_line = "%s %s %s %s -a %s" % (
"sudo" if sudo else '',
smartctl_path,
'-d %s' % interface if interface else '',
smartctl_args,
drive
)
logging.debug("Executing smartctl command: %s" % command_line)
cmd = shlex.split(command_line)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env={'LC_ALL': 'C'})
output = process.communicate()[0]
# TODO: See if we can get hints from the smartctl exit codes:
# https://www.freebsd.org/cgi/man.cgi?query=smartctl&manpath=FreeBSD+9.0-RELEASE+and+Ports&format=html#RETURN_VALUES
# at least we should not handle if the drive is in low-power mode (spindown)
# if process.returncode:
# raise Exception("smartctl failed with status code %s" % process.returncode)
return output

if __name__ == "__main__":
parser = ArgumentParser()
parser = ArgumentParser()

parser.add_argument('--disks-file', default=DEFAULT_DISKS_FILE)
parser.add_argument('--disks-file', default=DEFAULT_DISKS_FILE)

parser.add_argument('-s', '--sudo', help="Use sudo to execute smartctl", action='store_true', default=False)
parser.add_argument('-s', '--sudo', help="Use sudo to execute smartctl", action='store_true', default=False)

parser.add_argument('--smartctl-path', default="/usr/sbin/smartctl", help='Path to smartctl (default: %(default)s)')
parser.add_argument('-a', '--smartctl-args', default='-n standby', help="Other arguments passed to smartctl (default: %(default)s)")
parser.add_argument('--smartctl-path', default="/usr/sbin/smartctl", help='Path to smartctl (default: %(default)s)')
parser.add_argument('-a', '--smartctl-args', default='-n standby', help="Other arguments passed to smartctl (default: %(default)s)")

parser.add_argument('-i', '--interface', help="The smartctl interface specification (passed to smartctl's -d parameter")
parser.add_argument('drive', type=str, nargs='?', help="The device as passed to smartctl's positional argument")
parser.add_argument('-f', '--file', help="Use S.M.A.R.T. report from file instead of calling smartctl (Use - to read from stdin)")
parser.add_argument('-i', '--interface', help="The smartctl interface specification (passed to smartctl's -d parameter")
parser.add_argument('drive', type=str, nargs='?', help="The device as passed to smartctl's positional argument")
parser.add_argument('-f', '--file', help="Use S.M.A.R.T. report from file instead of calling smartctl (Use - to read from stdin)")

parser.add_argument('-x', '--exclude-notices', help='Do not report NOTICE warnings (default: %(default)s)', action='store_true', default=False)
parser.add_argument('--ignore-attributes', help='Ignore this S.M.A.R.T. attributes (id or name)', nargs='*')
parser.add_argument('-v', '--verbose', help='Verbose messages', action='store_true', default=False)
parser.add_argument('--debug', help="Print debug messages", action="store_true", default=False)
parser.add_argument('-x', '--exclude-notices', help='Do not report NOTICE warnings (default: %(default)s)', action='store_true', default=False)
parser.add_argument('--ignore-attributes', help='Ignore this S.M.A.R.T. attributes (id or name)', nargs='*')
parser.add_argument('-v', '--verbose', help='Verbose messages', action='store_true', default=False)
parser.add_argument('--debug', help="Print debug messages", action="store_true", default=False)

args = parser.parse_args()
args = parser.parse_args()

if args.file and any([args.interface, args.drive]):
parser.error('-f/--file cannot be used with a device and/or -i/--interface')
if args.file and any([args.interface, args.drive]):
parser.error('-f/--file cannot be used with a device and/or -i/--interface')

if args.debug:
logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s')
if args.debug:
logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s')

exit_code = 0
msg = ""
try:
stream = None
if args.file:
if args.file == '-':
stream = sys.stdin
else:
stream = open(args.file, 'r')
else:
stream = execute_smartctl(args.drive, args.interface, args.sudo, args.smartctl_path, args.smartctl_args)
exit_code = 0
msg = ""
try:
stream = None
if args.file:
if args.file == '-':
stream = sys.stdin
else:
stream = open(args.file, 'r')
else:
stream = execute_smartctl(args.drive, args.interface, args.sudo, args.smartctl_path, args.smartctl_args)

check = SMARTCheck(stream, args.disks_file)
check = SMARTCheck(stream, args.disks_file)

if check.data_parsed:
attribute_errors = check.check_attributes()
if check.data_parsed:
attribute_errors = check.check_attributes()

if args.exclude_notices:
for k in [x for x, y in attribute_errors.items() if y.level == AttributeWarning.Notice]:
del attribute_errors[k]
if args.exclude_notices:
for k in [x for x, y in attribute_errors.items() if y.level == AttributeWarning.Notice]:
del attribute_errors[k]

if attribute_errors:
msg = ', '.join([ae.long_message if args.verbose else ae.short_message for ae in attribute_errors.values()])
if attribute_errors:
msg = ', '.join([ae.long_message if args.verbose else ae.short_message for ae in attribute_errors.values()])

if any((ae.level == AttributeWarning.Warning for ae in attribute_errors.values())):
exit_code = 1
if any((ae.level == AttributeWarning.Critical for ae in attribute_errors.values())):
exit_code = 2
if any((ae.level == AttributeWarning.Warning for ae in attribute_errors.values())):
exit_code = 1
if any((ae.level == AttributeWarning.Critical for ae in attribute_errors.values())):
exit_code = 2

if not check.check_tests():
msg = (msg.strip() + '; S.M.A.R.T. self test reported an error').lstrip(';').strip()
exit_code = 2
if not check.check_tests():
msg = (msg.strip() + '; S.M.A.R.T. self test reported an error').lstrip(';').strip()
exit_code = 2

if check.ata_error_count:
msg = (msg.strip() + '; %s ATA errors found' % check.ata_error_count).lstrip(';').strip()
exit_code = 2
if check.ata_error_count:
msg = (msg.strip() + '; %s ATA errors found' % check.ata_error_count).lstrip(';').strip()
exit_code = 2

if not exit_code:
msg = "S.M.A.R.T. data OK"
if not exit_code:
msg = "S.M.A.R.T. data OK"

msg = "%s: %s" % (check.device_model, msg)
else:
msg = "Could not read S.M.A.R.T. data (executed as root?)"
exit_code = 3
except Exception as ex:
msg = "Plugin failed: %s" % ex
if args.debug:
logging.exception("Plugin failed")
exit_code = 3
msg = "%s: %s" % (check.device_model, msg)
else:
msg = "Could not read S.M.A.R.T. data (executed as root?)"
exit_code = 3
except Exception as ex:
msg = "Plugin failed: %s" % ex
if args.debug:
logging.exception("Plugin failed")
exit_code = 3

print(msg)
sys.exit(exit_code)
print(msg)
sys.exit(exit_code)

+ 141
- 141
tests/check.py View File

@@ -8,88 +8,88 @@ db_path = os.path.join(samples_path, '../../src/smartcheck/disks.yaml')

class CheckTest(unittest.TestCase):

def test_check_broken1(self):
with open(os.path.join(samples_path, 'seagate-barracuda-broken1.txt')) as f:
check = SMARTCheck(f)
self.assertFalse(check.check_tests())
self.assertEqual(check.ata_error_count, 8)
self.assertFalse(check.check())
def test_check_broken2(self):
with open(os.path.join(samples_path, 'seagate-barracuda-broken2.txt')) as f:
check = SMARTCheck(f)
self.assertFalse(check.check_tests())
self.assertEqual(check.ata_error_count, 52)
self.assertFalse(check.check())
def test_check_broken3(self):
with open(os.path.join(samples_path, 'WDC-WD1000FYPS-01ZKB0.txt')) as f:
check = SMARTCheck(f)
self.assertTrue(check.check_tests()) # no test ran
self.assertEqual(check.ata_error_count, 32)
self.assertFalse(check.check())
def test_smart_attributes_not_found(self):
with open(os.path.join(samples_path, 'ST2000NM0033-9ZM175.txt')) as f:
check = SMARTCheck(f, db_path)
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {}) # Attributes not found in disks.json
self.assertEqual(check.ata_error_count, 0)
self.assertTrue(check.check())
def test_smart_attributes_nothing_wrong(self):
with open(os.path.join(samples_path, 'WDC-WD2000FYYZ-01UL1B1.txt')) as f:
check = SMARTCheck(f, db_path)
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {})
self.assertEqual(check.ata_error_count, 0)
self.assertTrue(check.check())
def test_smart_attributes_min_max(self):
# from list
with open(os.path.join(samples_path, 'ST2000NM0033-9ZM175.txt')) as f:
check = SMARTCheck(f, os.path.join(samples_path, 'disks-min-max.yaml'))
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Critical, 'Power_On_Hours', 16998)
})
self.assertEqual(check.ata_error_count, 0)
self.assertFalse(check.check())
# from dict
with open(os.path.join(samples_path, 'ST2000NM0033-9ZM175.txt')) as f:
check = SMARTCheck(f, os.path.join(samples_path, 'disks-min-or-max.yaml'))
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Critical, 'Power_On_Hours', 16998),
(194, 'Temperature_Celsius'): AttributeWarning(AttributeWarning.Critical, 'Temperature_Celsius', 30)
})
self.assertEqual(check.ata_error_count, 0)
self.assertFalse(check.check())
def test_smart_attributes_thresholds_min(self):
for sample_file, expected_attributes in [
# only warning
('disks-thresholds.yaml', {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Warning, 'Power_On_Hours', 15360)
}),
# warning and critical - critical wins
('disks-thresholds-warn-and-crit.yaml', {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Critical, 'Power_On_Hours', 15360)
}),
# warning threshold with range
('disks-thresholds-range.yaml', {
(4, 'Start_Stop_Count'): AttributeWarning(AttributeWarning.Warning, 'Start_Stop_Count', 2)
})
]:
with open(os.path.join(samples_path, 'WDC-WD2000FYYZ-01UL1B1.txt')) as f:
check = SMARTCheck(f, os.path.join(samples_path, sample_file))
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), expected_attributes)
self.assertFalse(check.check())
def test_generic_attributes(self):
base = """=== START OF INFORMATION SECTION ===
def test_check_broken1(self):
with open(os.path.join(samples_path, 'seagate-barracuda-broken1.txt')) as f:
check = SMARTCheck(f)
self.assertFalse(check.check_tests())
self.assertEqual(check.ata_error_count, 8)
self.assertFalse(check.check())
def test_check_broken2(self):
with open(os.path.join(samples_path, 'seagate-barracuda-broken2.txt')) as f:
check = SMARTCheck(f)
self.assertFalse(check.check_tests())
self.assertEqual(check.ata_error_count, 52)
self.assertFalse(check.check())
def test_check_broken3(self):
with open(os.path.join(samples_path, 'WDC-WD1000FYPS-01ZKB0.txt')) as f:
check = SMARTCheck(f)
self.assertTrue(check.check_tests()) # no test ran
self.assertEqual(check.ata_error_count, 32)
self.assertFalse(check.check())
def test_smart_attributes_not_found(self):
with open(os.path.join(samples_path, 'ST2000NM0033-9ZM175.txt')) as f:
check = SMARTCheck(f, db_path)
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {}) # Attributes not found in disks.json
self.assertEqual(check.ata_error_count, 0)
self.assertTrue(check.check())
def test_smart_attributes_nothing_wrong(self):
with open(os.path.join(samples_path, 'WDC-WD2000FYYZ-01UL1B1.txt')) as f:
check = SMARTCheck(f, db_path)
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {})
self.assertEqual(check.ata_error_count, 0)
self.assertTrue(check.check())
def test_smart_attributes_min_max(self):
# from list
with open(os.path.join(samples_path, 'ST2000NM0033-9ZM175.txt')) as f:
check = SMARTCheck(f, os.path.join(samples_path, 'disks-min-max.yaml'))
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Critical, 'Power_On_Hours', 16998)
})
self.assertEqual(check.ata_error_count, 0)
self.assertFalse(check.check())
# from dict
with open(os.path.join(samples_path, 'ST2000NM0033-9ZM175.txt')) as f:
check = SMARTCheck(f, os.path.join(samples_path, 'disks-min-or-max.yaml'))
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Critical, 'Power_On_Hours', 16998),
(194, 'Temperature_Celsius'): AttributeWarning(AttributeWarning.Critical, 'Temperature_Celsius', 30)
})
self.assertEqual(check.ata_error_count, 0)
self.assertFalse(check.check())
def test_smart_attributes_thresholds_min(self):
for sample_file, expected_attributes in [
# only warning
('disks-thresholds.yaml', {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Warning, 'Power_On_Hours', 15360)
}),
# warning and critical - critical wins
('disks-thresholds-warn-and-crit.yaml', {
(9, 'Power_On_Hours'): AttributeWarning(AttributeWarning.Critical, 'Power_On_Hours', 15360)
}),
# warning threshold with range
('disks-thresholds-range.yaml', {
(4, 'Start_Stop_Count'): AttributeWarning(AttributeWarning.Warning, 'Start_Stop_Count', 2)
})
]:
with open(os.path.join(samples_path, 'WDC-WD2000FYYZ-01UL1B1.txt')) as f:
check = SMARTCheck(f, os.path.join(samples_path, sample_file))
self.assertTrue(check.check_tests())
self.assertDictEqual(check.check_attributes(), expected_attributes)
self.assertFalse(check.check())
def test_generic_attributes(self):
base = """=== START OF INFORMATION SECTION ===
Model Family: Seagate Barracuda 7200.14 (AF)
Device Model: ST3000DM001-1CH167
Serial Number: Z1F220RJ
@@ -99,31 +99,31 @@ SMART Attributes Data Structure revision number: 1
Vendor Specific SMART Attributes with Thresholds:
ID# ATTRIBUTE_NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW_VALUE
"""
for attr_id, attr_name in [
('5', "Reallocated_Sector_Ct"),
('197', "current_pending_sector"),
('197', "current_pending_sector_count"),
('197', "total_pending_sectors"),
('0', 'spin_up_retry_count'),
('0', 'soft_read_error_rate'),
('0', 'Reallocation_Event_Count'),
('0', 'ssd_life_left'),
]:
s = base + "%s %s 0x000f 001 000 000 Pre-fail Always - 1" % (
attr_id.rjust(3), attr_name.ljust(23)
)
check = SMARTCheck(s, db_path)
failed_attributes = check.check_generic_attributes()
assert len(failed_attributes) == 1
(failed_id, failed_name), warning = list(failed_attributes.items())[0]
assert int(failed_id) == int(attr_id)
assert warning.level == AttributeWarning.Notice
assert warning.value == '1'
self.assertTrue(check.check_tests())
self.assertFalse(check.check())
def test_generic_temperature_attribute(self):
base = """=== START OF INFORMATION SECTION ===
for attr_id, attr_name in [
('5', "Reallocated_Sector_Ct"),
('197', "current_pending_sector"),
('197', "current_pending_sector_count"),
('197', "total_pending_sectors"),
('0', 'spin_up_retry_count'),
('0', 'soft_read_error_rate'),
('0', 'Reallocation_Event_Count'),
('0', 'ssd_life_left'),
]:
s = base + "%s %s 0x000f 001 000 000 Pre-fail Always - 1" % (
attr_id.rjust(3), attr_name.ljust(23)
)
check = SMARTCheck(s, db_path)
failed_attributes = check.check_generic_attributes()
assert len(failed_attributes) == 1
(failed_id, failed_name), warning = list(failed_attributes.items())[0]
assert int(failed_id) == int(attr_id)
assert warning.level == AttributeWarning.Notice
assert warning.value == '1'
self.assertTrue(check.check_tests())
self.assertFalse(check.check())
def test_generic_temperature_attribute(self):
base = """=== START OF INFORMATION SECTION ===
Model Family: Seagate Barracuda 7200.14 (AF)
Device Model: ST3000DM001-1CH167
Serial Number: Z1F220RJ
@@ -133,37 +133,37 @@ SMART Attributes Data Structure revision number: 1
Vendor Specific SMART Attributes with Thresholds:
ID# ATTRIBUTE_NAME FLAG VALUE WORST THRESH TYPE UPDATED WHEN_FAILED RAW_VALUE
"""
for attr_id, attr_name, attr_value, notice_expected in [
('0', 'temperature_celsius', 1, False), # too low
('0', 'temperature_celsius', 51, True), # within range
('0', 'temperature_celsius', 130, False), # out of range
('0', 'temperature_celsius_x10', 1, False), # too low
('0', 'temperature_celsius_x10', 501, True), # within range
]:
s = base + "%s %s 0x000f %s 000 000 Pre-fail Always - %s" % (
attr_id.rjust(3), attr_name.ljust(23), str(attr_value).zfill(3), attr_value
)
check = SMARTCheck(s, db_path)
failed_attributes = check.check_generic_attributes()
if notice_expected:
assert len(failed_attributes) == 1
(failed_id, failed_name), warning = list(failed_attributes.items())[0]
assert int(failed_id) == int(attr_id)
assert warning.level == AttributeWarning.Notice
assert warning.value == attr_value
self.assertFalse(check.check())
else:
assert len(failed_attributes) == 0
self.assertTrue(check.check_tests())
def test_ignore_attributes(self):
with open(os.path.join(samples_path, 'seagate-barracuda-broken1.txt')) as f:
check = SMARTCheck(f)
for ignore_value in (198, '198', 'Offline_Uncorrectable'):
failed_attributes = check.check_attributes(ignore_attributes=[ignore_value])
assert len(failed_attributes) == 1
(failed_id, failed_name), failed_attribute = list(failed_attributes.items())[0]
assert failed_id == 197
assert failed_name == "Current_Pending_Sector"
assert int(failed_attribute.value) == 24
for attr_id, attr_name, attr_value, notice_expected in [
('0', 'temperature_celsius', 1, False), # too low
('0', 'temperature_celsius', 51, True), # within range
('0', 'temperature_celsius', 130, False), # out of range
('0', 'temperature_celsius_x10', 1, False), # too low
('0', 'temperature_celsius_x10', 501, True), # within range
]:
s = base + "%s %s 0x000f %s 000 000 Pre-fail Always - %s" % (
attr_id.rjust(3), attr_name.ljust(23), str(attr_value).zfill(3), attr_value
)
check = SMARTCheck(s, db_path)
failed_attributes = check.check_generic_attributes()
if notice_expected:
assert len(failed_attributes) == 1
(failed_id, failed_name), warning = list(failed_attributes.items())[0]
assert int(failed_id) == int(attr_id)
assert warning.level == AttributeWarning.Notice
assert warning.value == attr_value
self.assertFalse(check.check())
else:
assert len(failed_attributes) == 0
self.assertTrue(check.check_tests())
def test_ignore_attributes(self):
with open(os.path.join(samples_path, 'seagate-barracuda-broken1.txt')) as f:
check = SMARTCheck(f)
for ignore_value in (198, '198', 'Offline_Uncorrectable'):
failed_attributes = check.check_attributes(ignore_attributes=[ignore_value])
assert len(failed_attributes) == 1
(failed_id, failed_name), failed_attribute = list(failed_attributes.items())[0]
assert failed_id == 197
assert failed_name == "Current_Pending_Sector"
assert int(failed_attribute.value) == 24

+ 85
- 85
tests/parsing.py View File

@@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
try:
from StringIO import StringIO
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import StringIO
import os
import unittest
from smartcheck.check import SMARTCheck
@@ -12,98 +12,98 @@ samples_path = os.path.join(os.path.dirname(__file__), 'samples')

class InformationBlockParsingTest(unittest.TestCase):

def test_parsing(self):
for filename, expected_data in [
('seagate-barracuda-broken1.txt', {
'ata_version': 'ATA8-ACS T13/1699-D revision 4',
'device_model': 'ST3000DM001-1CH166',
'model_family': 'Seagate Barracuda 7200.14 (AF)',
'sata_version': 'SATA 3.0, 6.0 Gb/s (current: 6.0 Gb/s)',
'serial': 'Z1F220RJ'
}),
('seagate-barracuda-broken2.txt', {
'ata_version': 'ATA8-ACS T13/1699-D revision 4',
'device_model': 'ST3000DM001-1CH166',
'model_family': 'Seagate Barracuda 7200.14 (AF)',
'sata_version': 'SATA 3.0, 6.0 Gb/s (current: 6.0 Gb/s)',
'serial': 'Z1F23HW0'
}),
('areca-WD40EFRX.txt', {
'device_model': 'WD40EFRX-68WT0N0',
'serial': 'WD-WCC4E0664813'
}),
]:
def test_parsing(self):
for filename, expected_data in [
('seagate-barracuda-broken1.txt', {
'ata_version': 'ATA8-ACS T13/1699-D revision 4',
'device_model': 'ST3000DM001-1CH166',
'model_family': 'Seagate Barracuda 7200.14 (AF)',
'sata_version': 'SATA 3.0, 6.0 Gb/s (current: 6.0 Gb/s)',
'serial': 'Z1F220RJ'
}),
('seagate-barracuda-broken2.txt', {
'ata_version': 'ATA8-ACS T13/1699-D revision 4',
'device_model': 'ST3000DM001-1CH166',
'model_family': 'Seagate Barracuda 7200.14 (AF)',
'sata_version': 'SATA 3.0, 6.0 Gb/s (current: 6.0 Gb/s)',
'serial': 'Z1F23HW0'
}),
('areca-WD40EFRX.txt', {
'device_model': 'WD40EFRX-68WT0N0',
'serial': 'WD-WCC4E0664813'
}),
]:

with open(os.path.join(samples_path, filename)) as f:
check = SMARTCheck(f)
self.assertDictEqual(check.information, expected_data)
with open(os.path.join(samples_path, filename)) as f:
check = SMARTCheck(f)
self.assertDictEqual(check.information, expected_data)

def test_information_section_missing_empty(self):
check = SMARTCheck(StringIO(""))
self.assertDictEqual(check.information, {})
def test_information_section_missing_empty(self):
check = SMARTCheck(StringIO(""))
self.assertDictEqual(check.information, {})

def test_information_section_missing(self):
with open(os.path.join(samples_path, 'no-information-section.txt')) as f:
check = SMARTCheck(f)
self.assertDictEqual(check.information, {})
def test_information_section_missing(self):
with open(os.path.join(samples_path, 'no-information-section.txt')) as f:
check = SMARTCheck(f)
self.assertDictEqual(check.information, {})


class SMARTDataParsingTest(unittest.TestCase):
def test_parsing(self):
for filename, overall_health, attributes in [
('seagate-barracuda-broken1.txt', 'PASSED', [
('1', 'Raw_Read_Error_Rate', '0x000f', '103', '099', '006', 'Pre-fail', 'Always', '-', '5845168'),
('3', 'Spin_Up_Time', '0x0003', '095', '095', '000', 'Pre-fail', 'Always', '-', '0'),
('4', 'Start_Stop_Count', '0x0032', '100', '100', '020', 'Old_age', 'Always', '-', '7'),
('5', 'Reallocated_Sector_Ct', '0x0033', '100', '100', '010', 'Pre-fail', 'Always', '-', '0'),
('7', 'Seek_Error_Rate', '0x000f', '087', '055', '030', 'Pre-fail', 'Always', '-', '643382224'),
('9', 'Power_On_Hours', '0x0032', '074', '074', '000', 'Old_age', 'Always', '-', '23115'),
('10', 'Spin_Retry_Count', '0x0013', '100', '100', '097', 'Pre-fail', 'Always', '-', '0'),
('12', 'Power_Cycle_Count', '0x0032', '100', '100', '020', 'Old_age', 'Always', '-', '7'),
('183', 'Runtime_Bad_Block', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '0'),
('184', 'End-to-End_Error', '0x0032', '100', '100', '099', 'Old_age', 'Always', '-', '0'),
('187', 'Reported_Uncorrect', '0x0032', '092', '092', '000', 'Old_age', 'Always', '-', '8'),
('188', 'Command_Timeout', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '0 0 0'),
('189', 'High_Fly_Writes', '0x003a', '100', '100', '000', 'Old_age', 'Always', '-', '0'),
('190', 'Airflow_Temperature_Cel', '0x0022', '071', '064', '045', 'Old_age', 'Always', '-', '29 (Min/Max 24/32)'),
('191', 'G-Sense_Error_Rate', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '0'),
('192', 'Power-Off_Retract_Count', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '5'),
('193', 'Load_Cycle_Count', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '1574'),
('194', 'Temperature_Celsius', '0x0022', '029', '040', '000', 'Old_age', 'Always', '-', '29 (0 22 0 0 0)'),
('197', 'Current_Pending_Sector', '0x0012', '100', '100', '000', 'Old_age', 'Always', '-', '24'),
('198', 'Offline_Uncorrectable', '0x0010', '100', '100', '000', 'Old_age', 'Offline', '-', '24'),
('199', 'UDMA_CRC_Error_Count', '0x003e', '200', '200', '000', 'Old_age', 'Always', '-', '0'),
('240', 'Head_Flying_Hours', '0x0000', '100', '253', '000', 'Old_age', 'Offline', '-', '23044h+44m+28.078s'),
('241', 'Total_LBAs_Written', '0x0000', '100', '253', '000', 'Old_age', 'Offline', '-', '161370349869'),
('242', 'Total_LBAs_Read', '0x0000', '100', '253', '000', 'Old_age', 'Offline', '-', '78560290534'),
])
]:
with open(os.path.join(samples_path, filename)) as f:
check = SMARTCheck(f)
self.assertEqual(check.smart_data['overall_health_status'], overall_health)
self.assertEqual(check.smart_data['attributes'], attributes)
def test_parsing(self):
for filename, overall_health, attributes in [
('seagate-barracuda-broken1.txt', 'PASSED', [
('1', 'Raw_Read_Error_Rate', '0x000f', '103', '099', '006', 'Pre-fail', 'Always', '-', '5845168'),
('3', 'Spin_Up_Time', '0x0003', '095', '095', '000', 'Pre-fail', 'Always', '-', '0'),
('4', 'Start_Stop_Count', '0x0032', '100', '100', '020', 'Old_age', 'Always', '-', '7'),
('5', 'Reallocated_Sector_Ct', '0x0033', '100', '100', '010', 'Pre-fail', 'Always', '-', '0'),
('7', 'Seek_Error_Rate', '0x000f', '087', '055', '030', 'Pre-fail', 'Always', '-', '643382224'),
('9', 'Power_On_Hours', '0x0032', '074', '074', '000', 'Old_age', 'Always', '-', '23115'),
('10', 'Spin_Retry_Count', '0x0013', '100', '100', '097', 'Pre-fail', 'Always', '-', '0'),
('12', 'Power_Cycle_Count', '0x0032', '100', '100', '020', 'Old_age', 'Always', '-', '7'),
('183', 'Runtime_Bad_Block', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '0'),
('184', 'End-to-End_Error', '0x0032', '100', '100', '099', 'Old_age', 'Always', '-', '0'),
('187', 'Reported_Uncorrect', '0x0032', '092', '092', '000', 'Old_age', 'Always', '-', '8'),
('188', 'Command_Timeout', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '0 0 0'),
('189', 'High_Fly_Writes', '0x003a', '100', '100', '000', 'Old_age', 'Always', '-', '0'),
('190', 'Airflow_Temperature_Cel', '0x0022', '071', '064', '045', 'Old_age', 'Always', '-', '29 (Min/Max 24/32)'),
('191', 'G-Sense_Error_Rate', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '0'),
('192', 'Power-Off_Retract_Count', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '5'),
('193', 'Load_Cycle_Count', '0x0032', '100', '100', '000', 'Old_age', 'Always', '-', '1574'),
('194', 'Temperature_Celsius', '0x0022', '029', '040', '000', 'Old_age', 'Always', '-', '29 (0 22 0 0 0)'),
('197', 'Current_Pending_Sector', '0x0012', '100', '100', '000', 'Old_age', 'Always', '-', '24'),
('198', 'Offline_Uncorrectable', '0x0010', '100', '100', '000', 'Old_age', 'Offline', '-', '24'),
('199', 'UDMA_CRC_Error_Count', '0x003e', '200', '200', '000', 'Old_age', 'Always', '-', '0'),
('240', 'Head_Flying_Hours', '0x0000', '100', '253', '000', 'Old_age', 'Offline', '-', '23044h+44m+28.078s'),
('241', 'Total_LBAs_Written', '0x0000', '100', '253', '000', 'Old_age', 'Offline', '-', '161370349869'),
('242', 'Total_LBAs_Read', '0x0000', '100', '253', '000', 'Old_age', 'Offline', '-', '78560290534'),
])
]:
with open(os.path.join(samples_path, filename)) as f:
check = SMARTCheck(f)
self.assertEqual(check.smart_data['overall_health_status'], overall_health)
self.assertEqual(check.smart_data['attributes'], attributes)

def test_data_section_missing(self):
check = SMARTCheck(StringIO(""))
self.assertDictEqual(check.smart_data, {})
def test_data_section_missing(self):
check = SMARTCheck(StringIO(""))
self.assertDictEqual(check.smart_data, {})

def test_data_section_missing2(self):
with open(os.path.join(samples_path, 'no-data-section.txt')) as f:
check = SMARTCheck(f)
self.assertDictEqual(check.smart_data, {})
def test_data_section_missing2(self):
with open(os.path.join(samples_path, 'no-data-section.txt')) as f:
check = SMARTCheck(f)
self.assertDictEqual(check.smart_data, {})


class SelfTestParsingTest(unittest.TestCase):

def test_parsing(self):
for filename, tests in [
('seagate-barracuda-broken1.txt', [
('1', 'Extended offline', 'Completed: read failure', '80%', '23113', '1737376544'),
('2', 'Extended offline', 'Completed: read failure', '80%', '23000', '1737376544'),
('3', 'Extended offline', 'Interrupted (host reset)', '80%', '22998', '-'),
('4', 'Extended offline', 'Completed without error', '00%', '5', '-'),
])
]:
with open(os.path.join(samples_path, filename)) as f:
check = SMARTCheck(f)
self.assertEqual(check.self_tests['test_results'], tests)
def test_parsing(self):
for filename, tests in [
('seagate-barracuda-broken1.txt', [
('1', 'Extended offline', 'Completed: read failure', '80%', '23113', '1737376544'),
('2', 'Extended offline', 'Completed: read failure', '80%', '23000', '1737376544'),
('3', 'Extended offline', 'Interrupted (host reset)', '80%', '22998', '-'),
('4', 'Extended offline', 'Completed without error', '00%', '5', '-'),
])
]:
with open(os.path.join(samples_path, filename)) as f:
check = SMARTCheck(f)
self.assertEqual(check.self_tests['test_results'], tests)

Loading…
Cancel
Save