#!/usr/bin/env python2 # Copyright (C) 2018 Red Hat, Inc. # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; If not, see . # # Author: Gris Ge import unittest import os import lsm from lsm import Volume, LocalDisk, Pool, System, LsmError, ErrorNumber, Disk from lsm import Capabilities as Cap def supported(cap, cap_list): for c in cap_list: if not cap.supported(c): return False return True def warn(vol, blk_paths, msg): print("\n[WARN]: Volume '%s(%s)', Disk '%s': %s" % (vol.name, vol.id, " ".join(blk_paths), msg)) class LocalVol(object): def __init__(self, vol, blk_paths, sys, cap): self.vol = vol self.blk_paths = blk_paths self.sys = sys self.cap = cap class Failure(object): OUTPUT_FORMAT = """ VOL_ID: "{VOL_ID}" VOL_NAME: "{VOL_NAME}" SYS_ID: "{SYS_ID}" SYS_NAME: "{SYS_NAME}" BLK_PATHS: "{BLK_PATHS}" ISSUE: "{ISSUE}" SUGGEST: "{SUGGEST}" """ def __init__(self, blk_paths, vol, sys, msg, suggest): self.blk_paths = blk_paths self.vol = vol self.sys = sys self.msg = msg self.suggest = suggest def gen_fail_msg(self): return Failure.OUTPUT_FORMAT.format( **{ "VOL_ID": self.vol.id, "VOL_NAME": self.vol.name, "SYS_ID": self.sys.id, "SYS_NAME": self.sys.name, "BLK_PATHS": " ".join(self.blk_paths), "ISSUE": self.msg, "SUGGEST": self.suggest, }) class SanityCheck(unittest.TestCase): def setUp(self): uri = None if os.getenv('LSMCLI_URI') is not None: uri = os.getenv('LSMCLI_URI') password = os.getenv('LSMCLI_PASSWORD') if uri is None: uri = "local://" password = None self.c = lsm.Client(uri, password) self.syss = self.c.systems() sys_hash = {} cap_hash = {} self.local_vols = [] for sys in self.syss: sys_hash[sys.id] = sys cap_hash[sys.id] = self.c.capabilities(sys) for vol in self.c.volumes(): blk_paths = LocalDisk.vpd83_search(vol.vpd83) if blk_paths: self.local_vols.append(LocalVol(vol, blk_paths, sys_hash[vol.system_id], cap_hash[vol.system_id])) print("\nDisk path of volume '%s(%s)': %s" % (vol.name, vol.id, " ".join(blk_paths))) if not self.local_vols: self.skipTest("No local disk is managed by libstoragemgmt") def tearDown(self): self.c.close() def _skip_current_test(self, messsage): """ If skipTest is supported(new in python 2.7), skip this test with provided message. Silently return if not supported. """ if hasattr(unittest.TestCase, 'skipTest') is True: self.skipTest(messsage) return def _check_fail(self, fails): spliter = '-' * 72 + "\n" output = spliter.join(list(f.gen_fail_msg() for f in fails)) self.assertTrue(len(fails) == 0, "\n%s%s" % (spliter, output)) def test_volume_cache(self): fails = [] flag_has_pass = False for lv in self.local_vols: flag_pass = True sys = lv.sys cap = lv.cap vol = lv.vol blk_paths = lv.blk_paths if not supported(cap, [Cap.VOLUMES, Cap.VOLUME_CACHE_INFO]): warn(vol, blk_paths, "Capabilities VOLUMES and " "VOLUME_CACHE_INFO are not supported") continue cache_info = self.c.volume_cache_info(vol) write_cache_policy = cache_info[0] phy_disk_cache = cache_info[4] if phy_disk_cache == Volume.PHYSICAL_DISK_CACHE_UNKNOWN: warn(vol, blk_paths, "Unknown physical disk cache") continue if phy_disk_cache == Volume.PHYSICAL_DISK_CACHE_USE_DISK_SETTING: flag_pass = False fails.append( Failure(blk_paths, vol, sys, "Physical disk cache of volume is determined " "by the disk vendor which is not suggested, " "data loss might occurred on sudden power loss.", "lsmcli vpdcu --vol %s --policy DISABLE" % vol.id)) elif phy_disk_cache == Volume.PHYSICAL_DISK_CACHE_ENABLED: flag_pass = False fails.append( Failure(blk_paths, vol, sys, "Physical disk cache of volume is enabled which " "might(mostly) not be protected by " "battery/capacitor, data loss might occurred on " "sudden power loss.", "lsmcli vpdcu --vol %s --policy DISABLE" % vol.id)) if write_cache_policy == Volume.WRITE_CACHE_POLICY_UNKNOWN: warn(vol, blk_paths, "Unknown write cache policy") continue if write_cache_policy == Volume.WRITE_CACHE_POLICY_WRITE_BACK: flag_pass = False fails.append( Failure(blk_paths, vol, sys, "Write cache of volume is always enabled " "regardless of battery/capacitor status, " "data loss might occurred on sudden power loss.", "lsmcli vwcpu --vol %s --policy AUTO" % vol.id)) if flag_pass: flag_has_pass = True self._check_fail(fails) if not flag_has_pass: self._skip_current_test("No local disks are capable for this test") def test_pool_status(self): fails = [] flag_has_pass = False pool_hash = {} for pool in self.c.pools(): pool_hash[pool.id] = pool pmi_hash = {} disk_hash = {} try: for disk in self.c.disks(): disk_hash[disk.id] = disk except LsmError as lsm_err: if lsm_err.code == ErrorNumber.NO_SUPPORT: pass else: raise for lv in self.local_vols: sys = lv.sys cap = lv.cap vol = lv.vol blk_paths = lv.blk_paths pool = pool_hash[vol.pool_id] if pool.status == Pool.STATUS_UNKNOWN: warn(vol, blk_paths, "Unknown pool status %d %s" % (pool.status, pool.status_info)) if not pool.status & Pool.STATUS_OK: if supported(cap, [Cap.POOL_MEMBER_INFO]): # Check member failure if pool.id not in pmi_hash: pmi_hash[pool.id] = self.c.pool_member_info(pool) pmi = pmi_hash[pool.id] if pmi[1] == Pool.MEMBER_TYPE_DISK: for disk_id in pmi[2]: disk = disk_hash[disk_id] if not disk.status & Disk.STATUS_OK: fails.append( Failure(blk_paths, vol, sys, "Disk %s(%s) is not healthy: %d" % (disk.name, disk_id, disk.status), "Need investigation of output of" "`lsmcli ld`")) continue fails.append( Failure(blk_paths, vol, sys, "Pool is not healthy: %d '%s'" % ( pool.status, pool.status_info), "Need investigation of output of `lsmcli lp`")) continue flag_has_pass = True self._check_fail(fails) if not flag_has_pass: self._skip_current_test("No local disks are capable for this test") def test_system_status(self): fails = [] flag_has_pass = False for lv in self.local_vols: sys = lv.sys vol = lv.vol blk_paths = lv.blk_paths if sys.status == System.STATUS_UNKNOWN: warn(vol, blk_paths, "Unknown system status %d %s" % (sys.status, sys.status_info)) if not sys.status & System.STATUS_OK: fails.append( Failure(blk_paths, vol, sys, "System is not healthy: %d '%s'" % ( sys.status, sys.status_info), "Need investigation of output of `lsmcli ls`")) continue flag_has_pass = True self._check_fail(fails) if not flag_has_pass: self._skip_current_test("No local disks are capable for this test") if __name__ == '__main__': unittest.main(verbosity=2)