Discussion:
[Libstoragemgmt-devel] [PATCH 1/3] plugin_test.py: Add tests for masking/unmasking
Tony Asleson
2014-07-13 02:54:38 UTC
Permalink
Tests added for the following methods:

- volume_mask
- volume_unmask
- volumes_accessible_by_access_group
- access_groups_granted_to_volume

Signed-off-by: Tony Asleson <***@redhat.com>
---
test/plugin_test.py | 66 ++++++++++++++++++++++++++++++++++++++++----
test/webtest/test_results.py | 4 ++-
2 files changed, 63 insertions(+), 7 deletions(-)

diff --git a/test/plugin_test.py b/test/plugin_test.py
index fb07cc8..660d8f1 100755
--- a/test/plugin_test.py
+++ b/test/plugin_test.py
@@ -33,7 +33,8 @@ results = {}
stats = {}


-MIN_VOLUME_SIZE = 50
+MIN_POOL_SIZE = 2048
+MIN_OBJECT_SIZE = 128


def mb_in_bytes(mib):
@@ -238,6 +239,9 @@ class TestPlugin(unittest.TestCase):
URI = 'sim://'
PASSWORD = None

+ def _object_size(self, pool):
+ return mb_in_bytes(MIN_OBJECT_SIZE)
+
def setUp(self):
self.c = TestProxy(lsm.Client(TestPlugin.URI, TestPlugin.PASSWORD))

@@ -255,7 +259,7 @@ class TestPlugin(unittest.TestCase):
def _get_pool_by_usage(self, system_id, element_type):
for p in self.pool_by_sys_id[system_id]:
if p.element_type & element_type and \
- p.free_space > mb_in_bytes(250):
+ p.free_space > mb_in_bytes(MIN_POOL_SIZE):
return p
return None

@@ -315,7 +319,7 @@ class TestPlugin(unittest.TestCase):
lsm.Pool.ELEMENT_TYPE_VOLUME)

if p:
- vol_size = max(p.free_space / 10, mb_in_bytes(MIN_VOLUME_SIZE))
+ vol_size = self._object_size(p)

vol = self.c.volume_create(p, rs('volume'), vol_size,
lsm.Volume.PROVISION_DEFAULT)[1]
@@ -328,10 +332,9 @@ class TestPlugin(unittest.TestCase):
pools = self.pool_by_sys_id[system_id]

for p in pools:
- if p.free_space > mb_in_bytes(250) and \
+ if p.free_space > mb_in_bytes(MIN_POOL_SIZE) and \
p.element_type & lsm.Pool.ELEMENT_TYPE_FS:
- fs_size = max(p.free_space / 10,
- mb_in_bytes(MIN_VOLUME_SIZE))
+ fs_size = self._object_size(p)
fs = self.c.fs_create(p, rs('fs'), fs_size)[1]
self.assertTrue(self._fs_exists(fs.id))
return fs, p
@@ -568,6 +571,57 @@ class TestPlugin(unittest.TestCase):
self.assertTrue(p.physical_name is not None)
self.assertTrue(p.system_id is not None)

+ def _masking_state(self, cap, ag, vol, masked):
+ if supported(cap,
+ [lsm.Capabilities.
+ VOLUMES_ACCESSIBLE_BY_ACCESS_GROUP]):
+ vol_masked = \
+ self.c.volumes_accessible_by_access_group(ag)
+
+ match = [x for x in vol_masked if x.id == vol.id]
+
+ if masked:
+ self.assertTrue(len(match) == 1)
+ else:
+ self.assertTrue(len(match) == 0)
+
+ if supported(cap,
+ [lsm.Capabilities.
+ ACCESS_GROUPS_GRANTED_TO_VOLUME]):
+ ag_masked = \
+ self.c.access_groups_granted_to_volume(vol)
+
+ match = [x for x in ag_masked if x.id == ag.id]
+
+ if masked:
+ self.assertTrue(len(match) == 1)
+ else:
+ self.assertTrue(len(match) == 0)
+
+ def test_mask_unmask(self):
+ for s in self.systems:
+ cap = self.c.capabilities(s)
+
+ if supported(cap, [lsm.Capabilities.ACCESS_GROUPS,
+ lsm.Capabilities.VOLUME_MASK,
+ lsm.Capabilities.VOLUME_UNMASK,
+ lsm.Capabilities.VOLUME_CREATE,
+ lsm.Capabilities.VOLUME_DELETE]):
+
+ # Make sure we have an access group to test with; many
+ # SMI-S providers don't provide functionality to create them!
+ ag_list = self.c.access_groups('system_id', s.id)
+ if len(ag_list):
+ vol = self._volume_create(s.id)[0]
+ self.assertTrue(vol is not None)
+ ag = ag_list[0]
+
+ if vol is not None:
+ self.c.volume_mask(ag, vol)
+ self._masking_state(cap, ag, vol, True)
+ self.c.volume_unmask(ag, vol)
+ self._masking_state(cap, ag, vol, False)
+ self._volume_delete(vol)


def dump_results():
diff --git a/test/webtest/test_results.py b/test/webtest/test_results.py
index eb8348e..e52a205 100755
--- a/test/webtest/test_results.py
+++ b/test/webtest/test_results.py
@@ -113,7 +113,9 @@ def to_html(results):
methods = ['capabilities',
'systems', 'plugin_info', 'pools', 'job_status', 'job_free',
'volumes', 'volume_create', 'volume_delete', 'volume_resize',
- 'volume_replicate', 'disks', 'target_ports']
+ 'volume_replicate', 'disks', 'target_ports', 'volume_mask',
+ 'volume_unmask', 'access_groups_granted_to_volume',
+ 'volumes_accessible_by_access_group']

ch = []
row_data = []
--
1.8.2.1
Tony Asleson
2014-07-13 02:54:40 UTC
Permalink
Signed-off-by: Tony Asleson <***@redhat.com>
---
test/plugin_test.py | 130 +++++++++++++++++++++++++++++++++++++++++++
test/webtest/test_results.py | 4 +-
2 files changed, 133 insertions(+), 1 deletion(-)

diff --git a/test/plugin_test.py b/test/plugin_test.py
index 660d8f1..99e670e 100755
--- a/test/plugin_test.py
+++ b/test/plugin_test.py
@@ -623,6 +623,136 @@ class TestPlugin(unittest.TestCase):
self._masking_state(cap, ag, vol, False)
self._volume_delete(vol)

+ def _create_access_group(self, cap, s):
+ ag_created = None
+
+ # Without this information we would need to systematically go through
+ # different port types trying to create an access group, which we
+ # can do, but not until we need to.
+ if not supported(cap, [lsm.Capabilities.TARGET_PORTS]):
+ return None
+
+ tps = self.c.target_ports('system_id', s.id)
+ if len(tps):
+ tp = tps[0]
+
+ if tp.port_type == lsm.TargetPort.PORT_TYPE_FC:
+ ag_created = self.c.access_group_create(
+ rs('access_group'),
+ '500A0986994B8DC5',
+ lsm.AccessGroup.INIT_TYPE_WWPN, s.id)
+ if tp.port_type == lsm.TargetPort.PORT_TYPE_ISCSI:
+ ag_created = self.c.access_group_create(
+ rs('access_group'),
+ 'iqn.1994-05.com.domain:01.89bd01',
+ lsm.AccessGroup.INIT_TYPE_ISCSI_IQN, s.id)
+
+ self.assertTrue(ag_created is not None)
+
+ if ag_created is not None:
+ ag_list = self.c.access_groups()
+ match = [x for x in ag_list if x.id == ag_created.id]
+ self.assertTrue(len(match) == 1, "Newly created access group %s "
+ "not in the access group listing"
+ % (ag_created.name))
+
+ return ag_created
+
+ def _delete_access_group(self, ag):
+ self.c.access_group_delete(ag)
+ ag_list = self.c.access_groups()
+ match = [x for x in ag_list if x.id == ag.id]
+ self.assertTrue(len(match) == 0, "Expected access group that was "
+ "deleted to not show up in the "
+ "access group list!")
+
+ def _test_ag_create_delete(self, cap, s):
+ if supported(cap, [lsm.Capabilities.ACCESS_GROUPS,
+ lsm.Capabilities.ACCESS_GROUP_CREATE]):
+ ag = self._create_access_group(cap, s)
+ if ag is not None and \
+ supported(cap, [lsm.Capabilities.ACCESS_GROUP_DELETE]):
+ self._delete_access_group(ag)
+
+ def test_access_group_create_delete(self):
+ for s in self.systems:
+ cap = self.c.capabilities(s)
+ self._test_ag_create_delete(cap, s)
+
+ def test_access_group_list(self):
+ for s in self.systems:
+ cap = self.c.capabilities(s)
+
+ if supported(cap, [lsm.Capabilities.ACCESS_GROUPS]):
+ ag_list = self.c.access_groups('system_id', s.id)
+ if len(ag_list) == 0:
+ self._test_ag_create_delete(cap, s)
+ else:
+ self.assertTrue(len(ag_list) > 0,
+ "Need at least 1 access group for testing "
+ "and no support exists for creation of "
+ "access groups for this system")
+
+ def _ag_init_add(self, ag):
+ t = None
+ t_id = ''
+
+ if ag.init_type == lsm.AccessGroup.INIT_TYPE_ISCSI_IQN:
+ t_id = 'iqn.1994-05.com.domain:01.89bd02'
+ t = lsm.AccessGroup.INIT_TYPE_ISCSI_IQN
+ else:
+ # Otherwise we will try an FC WWPN
+ t_id = '500A0986994B8DC5'
+ t = lsm.AccessGroup.INIT_TYPE_WWPN
+
+ self.c.access_group_initiator_add(ag, t_id, t)
+
+ ag_after = self.c.access_groups('id', ag.id)[0]
+ match = [x for x in ag_after.init_ids if x == t_id]
+ self.assertTrue(len(match) == 1)
+ return t_id
+
+ def _ag_init_delete(self, ag, init_id):
+ self.c.access_group_initiator_delete(ag, init_id)
+ ag_after = self.c.access_groups('id', ag.id)[0]
+ match = [x for x in ag_after.init_ids if x == init_id]
+ self.assertTrue(len(match) == 0)
+
+ def test_access_group_initiator_add_delete(self):
+ usable_ag_types = [lsm.AccessGroup.INIT_TYPE_WWPN,
+ lsm.AccessGroup.INIT_TYPE_ISCSI_IQN]
+
+ for s in self.systems:
+ ag_to_delete = None
+
+ cap = self.c.capabilities(s)
+ if supported(cap, [lsm.Capabilities.ACCESS_GROUPS]):
+ ag_list = self.c.access_groups('system_id', s.id)
+
+ if len(ag_list) == 0 and \
+ supported(cap, [lsm.Capabilities.ACCESS_GROUP_CREATE,
+ lsm.Capabilities.ACCESS_GROUP_DELETE]):
+ ag_to_delete = self._create_access_group(cap, s)
+ ag_list = self.c.access_groups('system_id', s.id)
+
+ if len(ag_list):
+ # Try to find an access group that has a usable initiator
+ # type instead of unknown or other...
+ ag = ag_list[0]
+ for a_tmp in ag_list:
+ if a_tmp.init_type in usable_ag_types:
+ ag = a_tmp
+ break
+
+ if supported(cap, [lsm.Capabilities.
+ ACCESS_GROUP_ADD_INITIATOR]):
+ init_id = self._ag_init_add(ag)
+ if supported(cap, [lsm.Capabilities.
+ ACCESS_GROUP_DEL_INITIATOR]):
+ self._ag_init_delete(ag, init_id)
+
+ if ag_to_delete is not None:
+ self._delete_access_group(ag_to_delete)

def dump_results():
"""
diff --git a/test/webtest/test_results.py b/test/webtest/test_results.py
index e52a205..64f7d2e 100755
--- a/test/webtest/test_results.py
+++ b/test/webtest/test_results.py
@@ -115,7 +115,9 @@ def to_html(results):
'volumes', 'volume_create', 'volume_delete', 'volume_resize',
'volume_replicate', 'disks', 'target_ports', 'volume_mask',
'volume_unmask', 'access_groups_granted_to_volume',
- 'volumes_accessible_by_access_group']
+ 'volumes_accessible_by_access_group', 'access_group_create',
+ 'access_group_delete', 'access_group_initiator_add',
+ 'access_group_initiator_delete']

ch = []
row_data = []
--
1.8.2.1
Tony Asleson
2014-07-13 02:54:39 UTC
Permalink
Signed-off-by: Tony Asleson <***@redhat.com>
---
python_binding/lsm/_client.py | 10 ++++++++++
1 file changed, 10 insertions(+)

diff --git a/python_binding/lsm/_client.py b/python_binding/lsm/_client.py
index d22bb24..b975540 100644
--- a/python_binding/lsm/_client.py
+++ b/python_binding/lsm/_client.py
@@ -1044,8 +1044,18 @@ class Client(INetworkAttachedStorage):
"""
return self._tp.rpc('export_remove', _del_self(locals()))

+ ## Returns a list of target ports
+ # @param self The this pointer
+ # @param search_key The key to search against
+ # @param search_value The value to search for
+ # @param flags Reserved for future use, must be zero
+ # @returns List of target ports, else raises LsmError
@_return_requires([TargetPort])
def target_ports(self, search_key=None, search_value=None, flags=0):
+ """
+ Returns a list of target ports
+ """
+ _check_search_key(search_key, TargetPort.SUPPORTED_SEARCH_KEYS)
return self._tp.rpc('target_ports', _del_self(locals()))
--
1.8.2.1
Loading...