A smart S.M.A.R.T. check
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

371 lines
14KB

  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import sys
  4. import yaml
  5. import re
  6. import os
# Path of the ``disks.yaml`` file that sits next to this module.
DEFAULT_DISKS_FILE=os.path.join(os.path.dirname(__file__), 'disks.yaml')

# Marker strings used to locate the individual sections of the report text.
INFORMATION_SECTION_START = '=== START OF INFORMATION SECTION ==='
DATA_SECTION_START = '=== START OF READ SMART DATA SECTION ==='
TESTS_SECTION_START = 'SMART Self-test log structure revision number'

# (result key, pattern) pairs; group 1 of each pattern is stored under the key
# when the pattern is found in the information section.
INFORMATION_RE = [
    ("model_family", re.compile('Model Family: (.*)', re.UNICODE)),
    ("device_model", re.compile("(?:Device Model|Product): (.*)", re.UNICODE)),
    ("serial", re.compile("Serial Number: (.*)", re.UNICODE | re.IGNORECASE)),
    ("firmware_version", re.compile("Firmware version: (.*)", re.UNICODE)),
    ("ata_version", re.compile("ATA Version is: (.*)", re.UNICODE)),
    ("sata_version", re.compile("SATA Version is: (.*)", re.UNICODE)),
]

# Same (result key, pattern) layout, applied to the data section.
DATA_RE = [
    ('overall_health_status', re.compile('SMART overall-health self-assessment test result: (.*)', re.UNICODE)),
]

# One row of the attribute table. The ten groups are unpacked by the checks as
# (id, name, flag, value, worst, thresh, type, updated, when_failed, raw_value);
# the last group takes the rest of the line, so raw_value may carry extra text.
DATA_ATTRIBUTES_RE = re.compile(r"\s*(\d+)\s+([\w\d_\-]+)\s+([0-9a-fx]+)\s+(\d+)\s+(\d+)\s+(\d+)\s+([\w\d_\-]+)\s+([\w\d]+)\s+([\w\d_\-]+)\s+([^\r\n]*)", re.UNICODE)

# One row of the self-test log (lines starting with '#'). The third group is
# the text that check_tests() compares against its list of acceptable results.
TEST_RESULT_RE = re.compile(r"#\s*(\d+)\s+(.*?)\s{2,}(.*?)\s{2,}\s+([\d%]+)\s+(\d+)\s+(\d+|-)", re.UNICODE)
  24. def toint(s, default=0):
  25. try:
  26. return int(s)
  27. except ValueError:
  28. return default
  29. class AttributeWarning(object):
  30. Notice = 'NOTICE'
  31. Warning = 'WARNING'
  32. Critical = 'CRITICAL'
  33. def __init__(self, level=None, attribute_name=None, value=None, description=None):
  34. self.level = level
  35. self.field = attribute_name
  36. self.value = value
  37. self.description = (description or '').strip()
  38. @property
  39. def short_message(self):
  40. return "%s: %s=%s" % (self.level or '?', self.field, self.value)
  41. @property
  42. def long_message(self):
  43. s = self.short_message
  44. if self.description:
  45. s += ": %s" % self.description
  46. return s
  47. def __str__(self):
  48. return self.short_message
  49. def __repr__(self):
  50. return self.short_message
  51. def __eq__(self, other):
  52. return isinstance(other, AttributeWarning) and \
  53. self.level is not None and self.level == other.level and \
  54. self.field is not None and self.field == other.field and \
  55. self.value is not None and self.value == other.value
  56. class SMARTCheck(object):
  57. def __init__(self, file_or_string, db_path=None):
  58. if hasattr(file_or_string, 'read'):
  59. self.raw = file_or_string.read()
  60. elif isinstance(file_or_string, str) or (sys.version_info[0] == 2 and isinstance(file_or_string, unicode)):
  61. self.raw = file_or_string
  62. elif isinstance(file_or_string, bytes):
  63. self.raw = file_or_string.decode('UTF-8')
  64. else:
  65. raise Exception("Unknown type: %s" % type(file_or_string))
  66. self.parsed_sections = None
  67. self.db_path = db_path
  68. self._database = None
  69. @property
  70. def information(self):
  71. return self.parsed.get('information', {})
  72. @property
  73. def smart_data(self):
  74. return self.parsed.get('data', {})
  75. @property
  76. def self_tests(self):
  77. return self.parsed.get('self_tests', {})
  78. @property
  79. def parsed(self):
  80. if not self.parsed_sections:
  81. self.parsed_sections = self.parse()
  82. return self.parsed_sections
  83. @property
  84. def database(self):
  85. if self._database is None:
  86. if self.db_path:
  87. with open(self.db_path) as f:
  88. self._database = yaml.load(f) or {}
  89. else:
  90. self._database = []
  91. return self._database
  92. @property
  93. def device_model(self):
  94. return self.information['device_model']
  95. def exists_in_database(self):
  96. return self.get_attributes_from_database(self.device_model) is not None
  97. def get_attributes_from_database(self, device_model):
  98. for dev in self.database:
  99. device_regexprs = dev['model'] if isinstance(dev['model'], list) else [dev['model']]
  100. if any(re.match(r, device_model, re.IGNORECASE) for r in device_regexprs):
  101. logging.debug("Device exists in database (one of %s matches %s)" % (device_regexprs, self.device_model))
  102. return dev['attributes']
  103. logging.debug("Device does not exist in database")
  104. return None
  105. def parse(self):
  106. return {
  107. 'information': self.parse_information_section(self.raw),
  108. 'data': self.parse_data_section(self.raw),
  109. 'self_tests': self.parse_tests_section(self.raw),
  110. }
  111. @property
  112. def data_parsed(self):
  113. return 'attributes' in self.smart_data
  114. def parse_information_section(self, s):
  115. if INFORMATION_SECTION_START not in s:
  116. return {}
  117. start = s.index(INFORMATION_SECTION_START)
  118. if DATA_SECTION_START not in s:
  119. end = len(s)
  120. else:
  121. end = s.index(DATA_SECTION_START)
  122. information_text = s[start:end]
  123. d = {}
  124. for k, regex in INFORMATION_RE:
  125. m = regex.search(information_text)
  126. if m:
  127. d[k] = m.group(1).strip() if m.group(1) else ''
  128. return d
  129. def parse_data_section(self, s):
  130. if DATA_SECTION_START not in s:
  131. logging.info("No data section found")
  132. return {}
  133. start = s.index(DATA_SECTION_START)
  134. data_text = s[start:]
  135. d = {}
  136. for k, regex in DATA_RE:
  137. m = regex.search(data_text)
  138. if m:
  139. d[k] = m.group(1).strip() if m.group(1) else ''
  140. d['attributes'] = sorted(DATA_ATTRIBUTES_RE.findall(s), key=lambda t: int(t[0]))
  141. return d
  142. def parse_tests_section(self, s):
  143. if TESTS_SECTION_START not in s:
  144. return {
  145. 'test_results': []
  146. }
  147. start = s.index(TESTS_SECTION_START)
  148. end = re.search(r'(\r\n\r\n|\n\n|\r\r)', s[start+1:], re.MULTILINE)
  149. end = start + end.end(0) if end else len(s)
  150. tests_text = s[start:end]
  151. return {
  152. 'test_results': TEST_RESULT_RE.findall(tests_text)
  153. }
  154. def check(self, ignore_attributes=None):
  155. return len(self.check_attributes(ignore_attributes or [])) == 0 and self.check_tests()
  156. def check_tests(self):
  157. ok_test_results = [
  158. 'Completed without error',
  159. 'Interrupted (host reset)', # reboot during self test
  160. 'Aborted by host'
  161. ]
  162. return not any([x[2] not in ok_test_results for x in self.self_tests['test_results']])
  163. def check_attributes(self, ignore_attributes=None):
  164. failed_attributes = self.check_generic_attributes()
  165. if self.exists_in_database():
  166. failed_attributes.update(self.check_device_attributes())
  167. # remove every AttributeWarning from failed_attributes based on ignore_attributes
  168. for attr_id_or_name in ignore_attributes or []:
  169. del_keys = []
  170. if isinstance(attr_id_or_name, int) or attr_id_or_name.isdigit():
  171. del_keys = [k for k in failed_attributes.keys() if k[0] == int(attr_id_or_name)]
  172. else:
  173. del_keys = [k for k in failed_attributes.keys() if k[1] == attr_id_or_name]
  174. for x in del_keys:
  175. del failed_attributes[x]
  176. return failed_attributes
  177. def check_generic_attributes(self):
  178. failed_attributes = {}
  179. for attrid, name, flag, value, worst, thresh, attr_type, updated, when_failed, raw_value in self.smart_data['attributes']:
  180. logging.debug("Attribute %s (%s): value=%s, raw value=%s" % (attrid, name, value, raw_value))
  181. attrid = int(attrid)
  182. attr_name = (name or '').lower()
  183. int_value = toint(value)
  184. int_raw_value = toint(raw_value)
  185. int_thresh = toint(thresh)
  186. # these tests are take from gsmartcontrol (storage_property_descr.cpp) and check for known pre-fail attributes
  187. if attr_name in ('reallocated_sector_count', 'reallocated_sector_ct') and int_raw_value > 0:
  188. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  189. name,
  190. raw_value,
  191. "The drive has a non-zero Raw value, but there is no SMART warning yet. " +
  192. "This could be an indication of future failures and/or potential data loss in bad sectors.")
  193. elif attr_name == 'spin_up_retry_count' and int_raw_value > 0:
  194. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  195. name,
  196. raw_value,
  197. "The drive has a non-zero Raw value, but there is no SMART warning yet. " +
  198. "Your drive may have problems spinning up, which could lead to a complete mechanical failure.")
  199. elif attr_name == "soft_read_error_rate" and int_raw_value > 0:
  200. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  201. name,
  202. raw_value,
  203. "The drive has a non-zero Raw value, but there is no SMART warning yet. " +
  204. "This could be an indication of future failures and/or potential data loss in bad sectors.")
  205. elif attr_name in ("temperature_celsius", "temperature_celsius_x10"):
  206. if 50 <= int_raw_value <= 120:
  207. # Temperature (for some it may be 10xTemp, so limit the upper bound.)
  208. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  209. name,
  210. int_raw_value,
  211. "The temperature of the drive is higher than 50 degrees Celsius. " +
  212. "This may shorten its lifespan and cause damage under severe load.")
  213. elif int_raw_value > 500:
  214. # Temperature (for some it may be 10xTemp, so limit the upper bound.)
  215. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  216. name,
  217. int_raw_value,
  218. "The temperature of the drive is higher than 50 degrees Celsius. " +
  219. "This may shorten its lifespan and cause damage under severe load.")
  220. elif attr_name == "reallocation_event_count" and int_raw_value > 0:
  221. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  222. name,
  223. raw_value,
  224. "The drive has a non-zero Raw value, but there is no SMART warning yet. " +
  225. "This could be an indication of future failures and/or potential data loss in bad sectors.")
  226. elif attr_name in ("current_pending_sector", "current_pending_sector_count", "total_pending_sectors") and int_raw_value > 0:
  227. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  228. name,
  229. raw_value,
  230. "The drive has a non-zero Raw value, but there is no SMART warning yet. " +
  231. "This could be an indication of future failures and/or potential data loss in bad sectors.")
  232. elif attr_name in ("offline_uncorrectable", "total_offline_uncorrectable") and int_raw_value > 0:
  233. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  234. name,
  235. raw_value,
  236. "The drive has a non-zero Raw value, but there is no SMART warning yet. " +
  237. "This could be an indication of future failures and/or potential data loss in bad sectors.")
  238. elif attr_name == "ssd_life_left" and int_value < 50:
  239. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Notice,
  240. name,
  241. raw_value,
  242. "The drive has less than half of its life left.")
  243. else:
  244. # execute a generic check for value < threshold
  245. if int_value and int_thresh:
  246. if int_value < int_thresh:
  247. failed_attributes[(attrid, name)] = AttributeWarning(
  248. AttributeWarning.Warning if attr_type == 'Pre-fail' else AttributeWarning.Notice,
  249. name,
  250. raw_value,
  251. "Attribute value dropped below threshold of %s" % int_thresh)
  252. logging.debug("Failed generic attributes: %s" % (failed_attributes, ))
  253. return failed_attributes
  254. def check_device_attributes(self):
  255. device_model = self.device_model
  256. device_db_attributes = self.get_attributes_from_database(device_model)
  257. threshold_from = re.compile('^(\d+):$')
  258. threshold_to = re.compile('^:(\d+)$')
  259. threshold_from_to = re.compile('^(\d+):(\d+)$')
  260. failed_attributes = {}
  261. for attrid, name, flag, value, worst, tresh, type, updated, when_failed, raw_value in self.smart_data['attributes']:
  262. attrid = int(attrid)
  263. if attrid in device_db_attributes:
  264. db_attrs = device_db_attributes[attrid]
  265. if isinstance(db_attrs, list):
  266. value_field, min_value, max_value = tuple(device_db_attributes[int(attrid)])
  267. check_value = value if value_field == "VALUE" else raw_value
  268. check_value = int(check_value or -1)
  269. if not (int(min_value) <= check_value <= int(max_value)):
  270. logging.info("Attribute %s (%s) failed: not %s <= %s <= %s" % (attrid, name, min_value, check_value, max_value))
  271. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Critical, name, check_value)
  272. elif isinstance(db_attrs, dict):
  273. value_field = db_attrs.get('field', 'RAW_VALUE')
  274. check_value = value if value_field == "VALUE" else raw_value
  275. check_value = int(check_value or -1)
  276. min_value = db_attrs.get('min', None)
  277. max_value = db_attrs.get('max', None)
  278. if min_value is None and max_value is None:
  279. for failure_type, threshold_key in [('WARNING', 'warn_threshold'), ('CRITICAL', 'crit_threshold')]:
  280. if threshold_key not in db_attrs:
  281. continue
  282. v = db_attrs.get(threshold_key)
  283. from_m = threshold_from.match(v)
  284. to_m = threshold_to.match(v)
  285. from_to_m = threshold_from_to.match(v)
  286. if (from_m and check_value >= int(from_m.group(1))) or \
  287. (to_m and check_value <= int(to_m.group(1))) or \
  288. (from_to_m and (int(from_to_m.group(1)) <= check_value <= int(from_to_m.group(2)))):
  289. logging.info("Attribute %s (%s) failed with %s: not within treshold %s" % (attrid, name, failure_type, v))
  290. failed_attributes[(attrid, name)] = AttributeWarning(failure_type, name, check_value)
  291. else:
  292. if (min_value is not None and check_value >= int(min_value)) or \
  293. (max_value is not None and check_value <= int(max_value)):
  294. logging.info("Attribute %s (%s) failed: not %s >= %s <= %s" % (attrid, name, min_value, check_value, max_value))
  295. failed_attributes[(attrid, name)] = AttributeWarning(AttributeWarning.Critical, name, check_value)
  296. else:
  297. raise ValueError("Unknown attribute specification: %s" % db_attrs)
  298. return failed_attributes