Skip to content
Snippets Groups Projects
Commit 0f0057e6 authored by Azul's avatar Azul
Browse files

move support classes into their own package

now the webapp_login test looks nice and clean. soledad next.
parent 73bdb9a9
Branches
Tags
No related merge requests found
def functions_for_system(under_test):
"""
returns a set of functions to use for nagios reporting:
>>> ok, warn, critical, unknown = functions_for_system("tested system")
each of them will print a nagios line with its argument and
return the exit code:
>>> warn("that looks strange")
1 tested system - WARNING - that looks strange
1
"""
def report_function(code):
return lambda message : report(under_test, code, message)
return map(report_function, [0,1,2,3])
def report(system, code, message):
codes = {0: 'OK', 1: 'WARNING', 2: 'CRITICAL', 3: 'UNKNOWN'}
print "%d %s - %s - %s" % \
(code, system, codes[code], message)
return code
if __name__ == "__main__":
import doctest
doctest.testmod()
import __main__ as main
import os
import sys
import nagios_report
def run(test):
"""
run takes a function and tries it out.
If it returns nothing or 0 everything is fine and run prints an OK message
with the function name.
>>> def this_works_fine(): return
>>> run(this_works_fine)
0 nagios_test.py - OK - this_works_fine
0
>>> def this_also_works_fine(): return 0
>>> run(this_also_works_fine)
0 nagios_test.py - OK - this_also_works_fine
0
If the function returns something else it will be printed as a warning.
>>> run(lambda : "this is a warning")
1 nagios_test.py - WARNING - this is a warning
1
Errors raised will result in a CRITICAL nagios string.
>>> def failure(): raise Exception("something went wrong")
>>> run(failure)
2 nagios_test.py - CRITICAL - something went wrong
2
"""
try:
name = os.path.basename(main.__file__)
except AttributeError:
name = sys.argv[0]
ok, warn, fail, unknown = nagios_report.functions_for_system(name)
try:
warning = test()
if warning and warning != 0:
code = warn(warning)
else:
code = ok(test.__name__)
except Exception as exc:
code = fail(exc.message or str(exc))
return code
if __name__ == "__main__":
import doctest
doctest.testmod()
import requests
import json
class Api():
def __init__(self, config, verify=True):
self.config = config.api
self.session = requests.session()
self.verify = verify
def api_url(self, path):
return self.api_root() + path
def api_root(self):
return "https://{domain}:{port}/{version}/".format(**self.config)
def get(self, path, **args):
response = self.session.get(self.api_url(path),
verify=self.verify,
**args)
return response.json()
def post(self, path, **args):
response = self.session.post(self.api_url(path),
verify=self.verify,
**args)
return response.json()
def put(self, path, **args):
response = self.session.put(self.api_url(path),
verify=self.verify,
**args)
return response.json()
import yaml
class Config():
def __init__(self, filename="/etc/leap/hiera.yaml"):
with open("/etc/leap/hiera.yaml", 'r') as stream:
config = yaml.load(stream)
self.user = config['webapp']['nagios_test_user']
if 'username' not in self.user:
raise Exception('nagios test user lacks username')
if 'password' not in self.user:
raise Exception('nagios test user lacks password')
self.api = config['api']
self.api['version'] = config['webapp']['api_version']
import srp._pysrp as srp
import binascii
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
len(x) % 2 == 0) else binascii.unhexlify('0' + x)
class User():
def __init__(self, config):
self.config = config.user
self.srp_user = srp.User(self.config['username'], self.config['password'], srp.SHA256, srp.NG_1024)
def login(self, api):
init=self.init_authentication(api)
if ('errors' in init):
raise Exception('test user not found')
auth=self.authenticate(api, init)
if ('errors' in auth):
raise Exception('srp password auth failed')
self.verify_server(auth)
if not self.is_authenticated():
raise Exception('user is not authenticated')
def init_authentication(self, api):
uname, A = self.srp_user.start_authentication()
params = {
'login': uname,
'A': binascii.hexlify(A)
}
return api.post('sessions', data=params)
def authenticate(self, api, init):
M = self.srp_user.process_challenge(
safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
auth = api.put('sessions/' + self.config["username"],
data={'client_auth': binascii.hexlify(M)})
return auth
def verify_server(self, auth):
self.srp_user.verify_session(safe_unhexlify(auth["M2"]))
def is_authenticated(self):
return self.srp_user.authenticated()
...@@ -2,96 +2,11 @@ ...@@ -2,96 +2,11 @@
# Test Authentication with the webapp API works. # Test Authentication with the webapp API works.
import requests
import json
import string import string
import random import random
import srp._pysrp as srp from support.api import Api
import binascii from support.config import Config
import yaml from support.user import User
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
len(x) % 2 == 0) else binascii.unhexlify('0' + x)
class Config():
def __init__(self, filename="/etc/leap/hiera.yaml"):
with open("/etc/leap/hiera.yaml", 'r') as stream:
config = yaml.load(stream)
self.user = config['webapp']['nagios_test_user']
if 'username' not in self.user:
raise Exception('nagios test user lacks username')
if 'password' not in self.user:
raise Exception('nagios test user lacks password')
self.api = config['api']
self.api['version'] = config['webapp']['api_version']
class Api():
def __init__(self, config, verify=True):
self.config = config.api
self.session = requests.session()
self.verify = verify
def api_url(self, path):
return self.api_root() + path
def api_root(self):
return "https://{domain}:{port}/{version}/".format(**self.config)
def get(self, path, **args):
response = self.session.get(self.api_url(path),
verify=self.verify,
**args)
return response.json()
def post(self, path, **args):
response = self.session.post(self.api_url(path),
verify=self.verify,
**args)
return response.json()
def put(self, path, **args):
response = self.session.put(self.api_url(path),
verify=self.verify,
**args)
return response.json()
class User():
def __init__(self, config):
self.config = config.user
self.srp_user = srp.User(self.config['username'], self.config['password'], srp.SHA256, srp.NG_1024)
def login(self, api):
init=self.init_authentication(api)
if ('errors' in init):
raise Exception('test user not found')
auth=self.authenticate(api, init)
if ('errors' in auth):
raise Exception('srp password auth failed')
self.verify_server(auth)
if not self.is_authenticated():
raise Exception('user is not authenticated')
def init_authentication(self, api):
uname, A = self.srp_user.start_authentication()
params = {
'login': uname,
'A': binascii.hexlify(A)
}
return api.post('sessions', data=params)
def authenticate(self, api, init):
M = self.srp_user.process_challenge(
safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
auth = api.put('sessions/' + self.config["username"],
data={'client_auth': binascii.hexlify(M)})
return auth
def verify_server(self, auth):
self.srp_user.verify_session(safe_unhexlify(auth["M2"]))
def is_authenticated(self):
return self.srp_user.authenticated()
def login_successfully(): def login_successfully():
config = Config() config = Config()
...@@ -100,6 +15,6 @@ def login_successfully(): ...@@ -100,6 +15,6 @@ def login_successfully():
user.login(api) user.login(api)
if __name__ == '__main__': if __name__ == '__main__':
import nagios_test from support import nagios_test
exit_code = nagios_test.run(login_successfully) exit_code = nagios_test.run(login_successfully)
exit(exit_code) exit(exit_code)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment