Discussion:
[Libstoragemgmt-devel] [PATCH] RFC: Proof of concept alert daemon and basic client
Tony Asleson
2014-02-28 03:38:59 UTC
This is a very basic solution that simply polls for array status changes.
The daemon implements a very basic dbus interface so that any language with
dbus support can use it. All configuration of the daemon occurs over the
dbus interface.

Two programs:

lsmalertd.py:

Process that runs and retrieves status from one or more arrays and
generates a dbus signal when a change is found. Please note that it only
reports changes in status: if an array is already degraded when it is added,
nothing is reported until its status changes. In addition, a
volume/pool/system being added or deleted does not create a notification
either. A notification is created only when something previously seen changes.
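
The reporting rule described above can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the daemon's code: `Item` and `status_changes` are hypothetical names, and the status strings are placeholders for whatever values the library reports.

```python
from collections import namedtuple

# Hypothetical stand-in for an lsm system/pool/volume object; only the
# two attributes the comparison needs are modelled.
Item = namedtuple('Item', ['id', 'status'])


def status_changes(previous, current):
    """Return (id, old_status, new_status) for items seen in both polls."""
    prev_by_id = dict((item.id, item) for item in previous)
    changes = []
    for item in current:
        old = prev_by_id.get(item.id)
        # Newly added items have no previous entry and are skipped;
        # deleted items never appear in `current`, so they are skipped too.
        if old is not None and old.status != item.status:
            changes.append((item.id, old.status, item.status))
    return changes


first_poll = [Item('vol-1', 'OK'), Item('vol-2', 'Degraded')]
second_poll = [Item('vol-1', 'Degraded'), Item('vol-3', 'OK')]
changes = status_changes(first_poll, second_poll)
```

In this sketch only `vol-1` is reported: `vol-2` disappeared and `vol-3` is new, so neither produces a change, and the very first poll of an array never reports anything because there is no earlier snapshot to compare against.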

lsmalert_client.py:

The client has the following command line help:

usage: lsmalert_client.py [-h] [-l] [-e <email address>] [-p] [-a <array URI>]
[-d <array URI>]

optional arguments:
-h, --help show this help message and exit
-l, --listen Listen for notifications
-e <email address>, --email <email address>
Email address for notifications
-p Desktop notification pop-up
-a <array URI>, --add <array URI>
Add an array to monitor
-d <array URI>, --delete <array URI>
Delete an array to monitor

You can dynamically add/remove arrays to monitor. To actually be of any use
at least one instance of this program needs to be run in -l (listen mode). When
you add/remove an array you will be prompted for a password.

Todo & discussion points:
- Currently using the session bus; change to the system bus
- How do we want to securely hold credentials for arrays for unattended
  startup?
- Signal handling in the daemon only works correctly if you send the signal
  to the parent process
- Implement SMI-S indications by adding HTTP web server support; this would
  save us from having to poll SMI-S providers that implement indications
- Anyone subscribed on the bus will get all changes, even for arrays they
  didn't sign up for; this is the only way to get one-to-many support
- We are only checking for status changes of systems/pools/volumes; nothing
  else is being monitored
- We should incorporate capability checking to figure out what we should be
  monitoring
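
Regarding the point that every subscriber sees every change: one possible workaround, sketched here purely as an assumption and not part of the patch, is for a listener to filter on the URI, which is the first field of the alert tuple. `make_uri_filter` is a hypothetical helper, and the URIs and numeric values in the sample tuples are placeholders.

```python
def make_uri_filter(wanted_uris, callback):
    """Wrap `callback` so it only sees alerts for the URIs in `wanted_uris`."""
    wanted = frozenset(wanted_uris)

    def on_notify(info):
        # The alert tuple starts with the array URI, followed by
        # error_code, error, system_id, component, component_id,
        # prev_state and current_state.
        if info[0] in wanted:
            callback(info)

    return on_notify


received = []
on_notify = make_uri_filter(['sim://'], received.append)

# Placeholder alerts: the codes and states below are illustrative only.
on_notify(('sim://', 0, '', 'sim-01', 'system', 'sim-01', 1, 2))
on_notify(('example://other-array', 0, '', 'sys-9', 'pool', 'p-1', 1, 2))
```

The wrapped function could then be handed to `bus.add_signal_receiver` in place of `handle_notify`. Filtering on the receiving side keeps the one-to-many broadcast intact, though every listener still receives the traffic for all arrays.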

Please take a look and see what you think.

Thanks!

-Tony

Signed-off-by: Tony Asleson <***@redhat.com>
---
tools/lsmalert/lsmalert_client.py | 179 +++++++++++++++++++
tools/lsmalert/lsmalertd.py | 365 ++++++++++++++++++++++++++++++++++++++
2 files changed, 544 insertions(+)
create mode 100755 tools/lsmalert/lsmalert_client.py
create mode 100755 tools/lsmalert/lsmalertd.py

diff --git a/tools/lsmalert/lsmalert_client.py b/tools/lsmalert/lsmalert_client.py
new file mode 100755
index 0000000..f3e8511
--- /dev/null
+++ b/tools/lsmalert/lsmalert_client.py
@@ -0,0 +1,179 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2014 Red Hat, Inc.
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Author: tasleson
+
+import gobject
+import dbus
+import dbus.mainloop.glib
+import lsm
+import argparse
+import dbus.exceptions
+import sys
+import getpass
+import signal
+import smtplib
+from email.mime.text import MIMEText
+import pynotify
+
+OBJ_PATH = '/net/sourceforge/libstoragemgmt/object'
+CON = 'net.sourceforge.libstoragemgmt'
+
+email_list = []
+desktop = False
+
+
+class Alert(object):
+    def __init__(self, uri, error_code, error, system_id, component,
+                 component_id, prev_state, current_state):
+        self.uri = uri
+        self.error_code = error_code
+        self.error = error
+        self.system_id = system_id
+        self.component = component
+        self.component_id = component_id
+        self.prev_state = prev_state
+        self.current_state = current_state
+
+    def __str__(self):
+
+        if self.error_code == lsm.ErrorNumber.OK:
+            if self.component == 'system':
+                return 'System %s transistioned from %s to %s' % \
+                    (self.system_id,
+                     lsm.data.System.status_to_str(self.prev_state),
+                     lsm.System.status_to_str(self.current_state))
+            elif self.component == 'pool':
+                return 'Pool %s on system %s, transistioned from %s to %s' % \
+                    (self.component_id, self.system_id,
+                     lsm.Pool.status_to_str(self.prev_state),
+                     lsm.Pool.status_to_str(self.current_state))
+            elif self.component == 'volume':
+                return 'Volume %s on system %s, ' \
+                    'transistioned from %s to %s' % \
+                    (self.component_id, self.system_id,
+                     lsm.Volume.status_to_str(self.prev_state),
+                     lsm.Volume.status_to_str(self.current_state))
+        else:
+            return 'System (%s) encountered an error during retrieval (%s:%s)'\
+                % (self.uri, self.error_code, self.error)
+
+
+def handle_notify(info):
+    message = str(Alert(*info))
+
+    print message
+
+    if len(email_list) > 0:
+        email(email_list, '***@donotreply.org', message, '')
+
+    if desktop:
+        desktop_popup(message)
+
+
+def handler(signum, frame):
+    loop.quit()
+
+
+def email(to_address, from_address, subject, body):
+    delimiter = ', '
+
+    email_msg = MIMEText(body)
+    email_msg['Subject'] = subject
+    email_msg['From'] = from_address
+    email_msg['To'] = delimiter.join(to_address)
+    smtp_server = smtplib.SMTP('localhost')
+    smtp_server.sendmail(from_address, to_address, email_msg.as_string())
+    smtp_server.quit()
+
+
+def desktop_popup(alert_msg):
+    pynotify.Notification("Array status change", alert_msg).show()
+
+
+if __name__ == '__main__':
+
+    # Install signal handlers
+    for s in [signal.SIGHUP, signal.SIGINT]:
+        try:
+            signal.signal(s, handler)
+        except RuntimeError:
+            pass
+
+    password = None
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-l", '--listen', help='Listen for notifications',
+                        dest='listen', action="store_true")
+    parser.add_argument('-e', '--email',
+                        metavar='<email address>', dest='email',
+                        action='append', default=[],
+                        help='Email address for notifications',
+                        type=str)
+    parser.add_argument('-p', help='Desktop notification pop-up',
+                        dest='desktop', action="store_true")
+    parser.add_argument('-a', '--add', help='Add an array to monitor',
+                        metavar="<array URI>", dest='add_array', type=str)
+    parser.add_argument('-d', '--delete', help='Delete an array to monitor',
+                        metavar="<array URI>", dest='del_array', type=str)
+
+    args = parser.parse_args()
+
+    if args.add_array is not None or args.del_array is not None:
+        password = getpass.getpass()
+
+    if args.listen is False and args.add_array is None and \
+            args.del_array is None:
+        print 'Please specify at least (-l, -a or -d)'
+        sys.exit(1)
+
+    try:
+        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+        bus = dbus.SessionBus()
+        remote_object = bus.get_object(CON, OBJ_PATH)
+
+        if args.add_array is not None:
+            param = [(args.add_array, password)]
+            result, ec, msg = remote_object.subscribe(param,
+                                                      dbus_interface=CON)
+            if not bool(result):
+                print "Error while adding: error code= %d, msg= %s" % \
+                    (ec, msg)
+
+        if args.del_array is not None:
+            param = [(args.del_array, password)]
+            remote_object.unsubscribe(param, dbus_interface=CON)
+
+        if args.listen:
+
+            if not args.desktop and len(args.email) == 0:
+                print "Please specify -e or -p with listen"
+                sys.exit(1)
+
+            email_list = args.email
+            desktop = args.desktop
+
+            bus.add_signal_receiver(handle_notify, dbus_interface=CON,
+                                    signal_name="notify")
+            loop = gobject.MainLoop()
+            loop.run()
+
+    except dbus.exceptions.DBusException as de:
+        print 'Error connecting to dbus service: %s' % de
+        sys.exit(1)
+
+    sys.exit(0)
diff --git a/tools/lsmalert/lsmalertd.py b/tools/lsmalert/lsmalertd.py
new file mode 100755
index 0000000..c8d6e90
--- /dev/null
+++ b/tools/lsmalert/lsmalertd.py
@@ -0,0 +1,365 @@
+#!/usr/bin/env python
+
+# Copyright (C) 2014 Red Hat, Inc.
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Author: tasleson
+
+from multiprocessing import Process, Queue, Lock, Value
+from Queue import Empty
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+import gobject
+import os
+import lsm
+import lsm.common
+import signal
+import sys
+
+
+TIME_OUT_SECS = 5
+TIME_OUT_GET_ITEM = 1
+OBJ_PATH = '/net/sourceforge/libstoragemgmt/object'
+CON = 'net.sourceforge.libstoragemgmt'
+
+# Lock used by pprint
+stdout_lock = Lock()
+
+# Shared state variable across all processes
+run = Value('i', 1)
+
+DEBUG = False
+
+
+def handler(signum, frame):
+    run.value = 0
+
+    pprint('Signal handler called with signal %d' % (signum))
+
+    # For this to work we have to have the parent process receive the signal
+    # TODO make this work if you signal a child process
+    loop.quit()
+
+
+##
+# Serializes access to stdout to prevent interleaved output
+# @param msg Message to output to stdout
+# @return None
+def pprint(msg):
+    if DEBUG:
+        stdout_lock.acquire()
+        print "%d - %s" % (os.getpid(), msg)
+        stdout_lock.release()
+
+
+def _build_comparison_data(left, right):
+    results = []
+
+    l_dict = dict((x.id, x) for x in left)
+    r_dict = dict((x.id, x) for x in right)
+
+    l_keys = l_dict.keys()
+    r_keys = r_dict.keys()
+
+    common = set(l_keys) & set(r_keys)
+
+    for c in common:
+        if l_dict[c].status != r_dict[c].status:
+            results.append((l_dict[c], r_dict[c]))
+
+    return results
+
+
+def _create_alert(uri, error_code, error, system_id, component, component_id,
+                  prev_state, current_state):
+    return (uri, error_code, error, system_id, component, component_id,
+            prev_state, current_state)
+
+
+class Status(object):
+    def __init__(self, uri, error_code, error, system, pools, volumes):
+        self.uri = uri
+        self.error_code = error_code
+        self.error = error
+        self.system = system
+        self.pools = pools
+        self.volumes = volumes
+
+    ##
+    # Compare system status.
+    # Notes: Something added/removed does not cause a status change, only
+    # a change in something that existed before and now causes a status change
+    # Returns an array of tuples.
+    def compare(self, other):
+        alerts = []
+
+        if self.system.status != other.system.status:
+            alerts.append(_create_alert(self.uri,
+                                        self.error_code,
+                                        self.error,
+                                        self.system.id,
+                                        'system',
+                                        self.system.id,
+                                        self.system.status,
+                                        other.system.status))
+
+        diff_pools = _build_comparison_data(self.pools, other.pools)
+        diff_volumes = _build_comparison_data(self.volumes, other.volumes)
+
+        for dp in diff_pools:
+            alerts.append(_create_alert(self.uri,
+                                        self.error_code,
+                                        self.error,
+                                        self.system.id,
+                                        'pool',
+                                        dp[0].id,
+                                        dp[0].status,
+                                        dp[1].status))
+
+        for v in diff_volumes:
+            alerts.append(_create_alert(self.uri,
+                                        self.error_code,
+                                        self.error,
+                                        self.system.id,
+                                        'volume',
+                                        v[0].id,
+                                        v[0].status,
+                                        v[1].status))
+
+        return alerts
+
+
+def _test_connection(uri, password):
+    rc = False
+    ec = lsm.common.ErrorNumber.OK
+    msg = ''
+    try:
+        c = lsm.Client(uri, password)
+        c.systems()
+        c.close()
+        rc = True
+    except lsm.common.LsmError as e:
+        ec = e.code
+        msg = e.msg
+    return (rc, ec, msg)
+
+
+class ArrayAlert(dbus.service.Object):
+    def __init__(self, conn, object_path=OBJ_PATH):
+        pprint('object_path %s' % (object_path))
+        dbus.service.Object.__init__(self, conn, object_path)
+        self.hosts_config_q = None
+
+    @dbus.service.signal(CON)
+    def notify(self, status):
+        pass
+
+    @dbus.service.method(CON, in_signature='a(sv)')
+    def subscribe(self, array):
+        for a in array:
+            result = _test_connection(*a)
+            if not result[0]:
+                return result
+
+        self.hosts_config_q.put(('add', array))
+        return True, lsm.common.ErrorNumber.OK, ''
+
+    @dbus.service.method(CON, in_signature='a(sv)')
+    def unsubscribe(self, array):
+        self.hosts_config_q.put(('remove', array))
+        return True
+
+    @dbus.service.method(CON)
+    def clear_all(self):
+        self.hosts_config_q.put(('clear', ()))
+
+
+def feed_hosts(hc_q, h_q):
+    hosts = {}
+    pprint('feed_hosts start')
+
+    while run.value != 0:
+        try:
+            (operation, host_list) = hc_q.get(True, TIME_OUT_SECS)
+
+            if operation == 'add':
+                for h in host_list:
+                    sig = ':'.join(h)
+                    if sig not in hosts:
+                        hosts[sig] = h
+            elif operation == 'remove':
+                for h in host_list:
+                    sig = ':'.join(h)
+                    if sig in hosts:
+                        del hosts[sig]
+            else:
+                hosts.clear()
+        except IOError:
+            pass
+        except Empty:
+            # Timer expired
+            # See if queue is empty, docs say empty(), size() are
+            # unreliable for queue so we try to get one and place it back if
+            # successful.
+            try:
+                host = h_q.get(False)
+                h_q.put(host)
+            except Empty:
+                # Host queue is empty
+                for v in hosts.values():
+                    pprint("Putting in an item to process %s" % (':'.join(v)))
+                    h_q.put(v)
+
+    pprint('feed_hosts exit')
+
+
+def _get_status(uri, password):
+    status = []
+
+    try:
+        c = lsm.Client(uri, password)
+
+        systems = c.systems()
+        pools = c.pools()
+        volumes = c.volumes()
+
+        for system in systems:
+            sys_pools = [x for x in pools if x.system_id == system.id]
+            sys_volumes = [x for x in volumes if x.system_id == system.id]
+            status.append(Status(uri, lsm.ErrorNumber.OK, '', system,
+                                 sys_pools, sys_volumes))
+
+        c.close()
+
+    except lsm.LsmError as le:
+        status.append(Status(uri, le.code, le.msg, [], [], []))
+
+    return status
+
+
+def retrieve_status_worker(h_q, r_q):
+    count = 0
+
+    pprint('process_status start')
+
+    while run.value != 0:
+        try:
+            host = h_q.get(True, TIME_OUT_GET_ITEM)
+            pprint("process_status: retrieved a host %s" % (':'.join(host)))
+
+            count += 1
+            # Fetch the status, report done and put a result in the
+            # out_queue
+            status = _get_status(*host)
+
+            # Make something change to test alerts
+            if count % 3 == 0:
+                for system_stat in status:
+                    if system_stat.system.status != lsm.data.System.STATUS_OK:
+                        pprint("Setting system status ok")
+                        system_stat.system.status = lsm.data.System.STATUS_OK
+                    else:
+                        pprint("Setting system status degraded")
+                        system_stat.system.status = \
+                            lsm.data.System.STATUS_DEGRADED
+
+            r_q.put(status)
+        except IOError:
+            pass
+        except Empty:
+            pass
+
+    pprint('Status getter exiting')
+
+
+def process_results(notifier, r_q):
+    """
+    Block waiting for results from arrays to be delivered to it via a queue.
+    For each status update, locate it in the hash and if the state has
+    changed, then deliver a message via dbus
+    """
+    systems = {}
+
+    pprint('process_results start')
+
+    while run.value != 0:
+        try:
+            status_update = r_q.get(True, TIME_OUT_GET_ITEM)
+
+            pprint("Got some status !")
+
+            for sys_update in status_update:
+
+                if sys_update.system.id in systems:
+                    alerts = systems[sys_update.system.id].compare(sys_update)
+
+                    for a in alerts:
+                        notifier.notify(a)
+
+                systems[sys_update.system.id] = sys_update
+        except IOError:
+            pass
+        except Empty:
+            pass
+
+    pprint('process_results exiting')
+
+
+if __name__ == '__main__':
+    hosts_config_q = Queue()
+    hosts_q = Queue()
+    results_q = Queue()
+    process_list = []
+
+    # Install signal handlers
+    for s in [signal.SIGHUP, signal.SIGINT]:
+        try:
+            signal.signal(s, handler)
+        except RuntimeError:
+            pass
+
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+    sys_bus = dbus.SessionBus()
+    name = dbus.service.BusName(CON, sys_bus)
+    obj = ArrayAlert(sys_bus)
+    obj.hosts_config_q = hosts_config_q
+    loop = gobject.MainLoop()
+    gobject.threads_init()
+
+    # Create processes
+    process_list.append(Process(target=process_results,
+                                args=(obj, results_q,)))
+
+    # Process for handling hosts_config
+    process_list.append(Process(target=feed_hosts, args=(hosts_config_q,
+                                                         hosts_q,)))
+    # Processes for handling the retrieval of status from arrays
+    process_list.extend([Process(target=retrieve_status_worker,
+                                 args=(hosts_q, results_q,)),
+                         Process(target=retrieve_status_worker,
+                                 args=(hosts_q, results_q,))])
+
+    for p in process_list:
+        p.daemon = True
+        p.start()
+
+    loop.run()
+
+    for p in process_list:
+        p.join()
+        pprint("PID(%d), exit value= %d" % (p.pid, p.exitcode))
+
+    sys.exit(0)
--
1.8.2.1
Gris Ge
2014-03-05 14:22:37 UTC
Post by Tony Asleson
- Using session bus, change to system bus
- How do we want to handle securely holding credentials for arrays for
unattended startup
It seems we need a config file.
Post by Tony Asleson
- Implement SMI-S indications via a http web server support, this would prevent
us from having to poll SMI-S providers that implement indications
I will take the investigation work for this and keep you posted.
Post by Tony Asleson
- We are only checking on status changes of systems/pools/volumes,
nothing else is being monitored
I added 'status_info' to the system/pool/volume classes, which indicates the
main reason for a fault (not supported by any plugin yet). That might be
provided in the notification message.

"free space threshold"(configurable) could be next thing to monitor.
Post by Tony Asleson
- We should incorporate capability checking to figure out what we should be
monitoring
Please take a look and see what you think.
I tried with 'sim://', got this issue:
====
$ lsmenv sim python ./lsmalert_client.py -a sim://
python "./lsmalert_client.py" -a "sim://"
Password:

real 0m1.040s
user 0m0.088s
sys 0m0.016s
$ lsmenv sim python ./lsmalert_client.py -l -p
python "./lsmalert_client.py" -l -p
System sim-01 transistioned from OK to Degraded

(lsmalert_client.py:24313): libnotify-WARNING **: you must call
notify_init() before showing
**
libnotify:ERROR:notification.c:568:notify_notification_show: code should
not be reached
/home/fge/bin/lsmenv: line 108: 24313 Aborted python
"./lsmalert_client.py" -l -p
====

I haven't had a chance to read through the code yet. Any quick ideas?
Post by Tony Asleson
Thanks!
-Tony
--
Gris Ge
Tony Asleson
2014-03-05 17:45:53 UTC
Post by Gris Ge
====
$ lsmenv sim python ./lsmalert_client.py -a sim://
python "./lsmalert_client.py" -a "sim://"
real 0m1.040s
user 0m0.088s
sys 0m0.016s
$ lsmenv sim python ./lsmalert_client.py -l -p
python "./lsmalert_client.py" -l -p
System sim-01 transistioned from OK to Degraded
(lsmalert_client.py:24313): libnotify-WARNING **: you must call
notify_init() before showing
Looks like I was missing a call to initialize the notification library.
I re-posted an updated patch, see if that works better. Either version
works without errors during my EL6 testing.
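
For readers following along, the kind of fix described (initializing the notification library once before the first pop-up) might look roughly like the sketch below. This is not the re-posted patch. `PopupNotifier` and `FakeBackend` are hypothetical names, and the sketch assumes the backend exposes `init(app_name)` returning a true value on success plus a `Notification(title, message)` object with `show()`, which is how pynotify is understood to behave.

```python
class PopupNotifier(object):
    """Initialize the notification backend lazily, exactly once."""

    def __init__(self, backend, app_name):
        self._backend = backend
        self._app_name = app_name
        self._ready = False

    def show(self, title, message):
        if not self._ready:
            # Assumption: init() returns a true value on success.
            if not self._backend.init(self._app_name):
                raise RuntimeError('notification backend failed to init')
            self._ready = True
        self._backend.Notification(title, message).show()


class FakeBackend(object):
    """Stand-in backend so the sketch runs without a desktop session."""

    def __init__(self):
        self.calls = []

    def init(self, app_name):
        self.calls.append(('init', app_name))
        return True

    def Notification(self, title, message):
        backend = self

        class _Note(object):
            def show(self):
                backend.calls.append(('show', title, message))

        return _Note()


fake = FakeBackend()
notifier = PopupNotifier(fake, 'lsmalert_client')
notifier.show('Array status change', 'first alert')
notifier.show('Array status change', 'second alert')
```

With the real library the first argument would presumably be the `pynotify` module itself. Initializing on first use rather than at import time means clients that only run with -e never touch the desktop notification service.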

Regards,
Tony
