Skip to content
Snippets Groups Projects
Commit 8078cf0f authored by azul's avatar azul
Browse files

Merge pull request #147 from azul/test/nagios-test-signup

Test/nagios test signup
parents 3513ad74 d639e0a4
Branches
Tags 0.5.1-rc
No related merge requests found
......@@ -22,6 +22,7 @@ bin
*/Gemfile.lock
test/dummy/log/*
test/dummy/tmp/*
*.pyc
# ignore all deploy specific configuration
config/couchdb.yml
......
......@@ -7,12 +7,12 @@
import tempfile
import requests
import os
import srp._pysrp as srp
import shutil
import u1db
import webapp_login
from support.api import Api
from support.config import Config
from support.user import User
from u1db.remote.http_target import HTTPSyncTarget
......@@ -32,14 +32,6 @@ HTTPSyncTarget.set_token_credentials = set_token_credentials
HTTPSyncTarget._sign_request = _sign_request
def fail(reason):
print '2 soledad_sync - CRITICAL - ' + reason
exit(2)
# monkey patch webapp_login's fail function to report as soledad
webapp_login.fail = fail
# The following function could fetch all info needed to sync using soledad.
# Despite that, we won't use all that info because we are instead faking a
# Soledad sync by using U1DB slightly modified syncing capabilities. Part of
......@@ -47,58 +39,39 @@ webapp_login.fail = fail
# to actually use the Soledad client in the future.
def get_soledad_info(config, tempdir):
# get login and get user info
user = config['user']
api = config['api']
usr = srp.User( user['username'], user['password'], srp.SHA256, srp.NG_1024 )
try:
auth = webapp_login.parse(webapp_login.authenticate(api, usr))
except requests.exceptions.ConnectionError:
fail('no connection to server')
# get soledad server url
service_url = 'https://%s:%d/%d/config/soledad-service.json' % \
(api['domain'], api['port'], api['version'])
soledad_hosts = requests.get(service_url).json['hosts']
host = soledad_hosts.keys()[0]
server_url = 'https://%s:%d/user-%s' % \
(soledad_hosts[host]['hostname'], soledad_hosts[host]['port'],
auth['id'])
# get provider ca certificate
#ca_cert = requests.get('https://127.0.0.1/ca.crt', verify=False).text
#cert_file = os.path.join(tempdir, 'ca.crt')
cert_file = None # not used for now
#with open(cert_file, 'w') as f:
# f.write(ca_cert)
return auth['id'], user['password'], server_url, cert_file, auth['token']
def run_tests():
tempdir = tempfile.mkdtemp()
uuid, password, server_url, cert_file, token = \
get_soledad_info(webapp_login.read_config(), tempdir)
exc = None
try:
# in the future, we can replace the following by an actual Soledad
# client sync, if needed
db = u1db.open(os.path.join(tempdir, '%s.db' % uuid), True)
creds = {'token': {'uuid': uuid, 'token': token}}
db.sync(server_url, creds=creds, autocreate=False)
except Exception as e:
exc = e
shutil.rmtree(tempdir)
exit(report(exc))
def report(exc):
if exc is None:
print '0 soledad_sync - OK - can sync soledad fine'
return 0
if isinstance(exc, u1db.errors.U1DBError):
print '2 soledad_sync - CRITICAL - ' + exc.message
else:
print '2 soledad_sync - CRITICAL - ' + str(exc)
return 2
# get login and get user info
user = User(config)
api = Api(config, verify=False)
auth = user.login(api)
# get soledad server url
soledad_hosts = api.get('config/soledad-service.json')['hosts']
host = soledad_hosts.keys()[0]
server_url = 'https://%s:%d/user-%s' % \
(soledad_hosts[host]['hostname'], soledad_hosts[host]['port'],
auth['id'])
# get provider ca certificate
#ca_cert = requests.get('https://127.0.0.1/ca.crt', verify=False).text
#cert_file = os.path.join(tempdir, 'ca.crt')
cert_file = None # not used for now
#with open(cert_file, 'w') as f:
# f.write(ca_cert)
return auth['id'], server_url, cert_file, auth['token']
def can_sync_soledad_fine():
tempdir = tempfile.mkdtemp()
try:
uuid, server_url, cert_file, token = \
get_soledad_info(Config(), tempdir)
# in the future, we can replace the following by an actual Soledad
# client sync, if needed
db = u1db.open(os.path.join(tempdir, '%s.db' % uuid), True)
creds = {'token': {'uuid': uuid, 'token': token}}
db.sync(server_url, creds=creds, autocreate=False)
finally:
shutil.rmtree(tempdir)
if __name__ == '__main__':
run_tests()
from support import nagios_test
exit_code = nagios_test.run(can_sync_soledad_fine)
exit(exit_code)
import requests
import json
class Api():
def __init__(self, config, verify=True):
self.config = config.api
self.session = requests.session()
self.verify = verify
def api_url(self, path):
return self.api_root() + path
def api_root(self):
return "https://{domain}:{port}/{version}/".format(**self.config)
def get(self, path, **args):
response = self.session.get(self.api_url(path),
verify=self.verify,
**args)
return self.parse_json(response)
def post(self, path, **args):
response = self.session.post(self.api_url(path),
verify=self.verify,
**args)
return self.parse_json(response)
def put(self, path, **args):
response = self.session.put(self.api_url(path),
verify=self.verify,
**args)
return self.parse_json(response)
def parse_json(self, response):
try:
return response.json()
except TypeError:
return response.json # older versions of requests
import yaml
class Config():
def __init__(self, filename="/etc/leap/hiera.yaml"):
with open("/etc/leap/hiera.yaml", 'r') as stream:
config = yaml.load(stream)
self.user = config['webapp']['nagios_test_user']
if 'username' not in self.user:
raise Exception('nagios test user lacks username')
if 'password' not in self.user:
raise Exception('nagios test user lacks password')
self.api = config['api']
self.api['version'] = config['webapp']['api_version']
def functions_for_system(under_test):
"""
returns a set of functions to use for nagios reporting:
>>> ok, warn, critical, unknown = functions_for_system("tested system")
each of them will print a nagios line with its argument and
return the exit code:
>>> warn("that looks strange")
1 tested system - WARNING - that looks strange
1
"""
def report_function(code):
return lambda message : report(under_test, code, message)
return map(report_function, [0,1,2,3])
def report(system, code, message):
codes = {0: 'OK', 1: 'WARNING', 2: 'CRITICAL', 3: 'UNKNOWN'}
print "%d %s - %s - %s" % \
(code, system, codes[code], message)
return code
if __name__ == "__main__":
import doctest
doctest.testmod()
import __main__ as main
import os
import sys
import nagios_report
def run(test):
"""
run takes a function and tries it out.
If it returns nothing or 0 everything is fine and run prints an OK message
with the function name.
>>> def this_works_fine(): return
>>> run(this_works_fine)
0 nagios_test.py - OK - this_works_fine
0
>>> def this_also_works_fine(): return 0
>>> run(this_also_works_fine)
0 nagios_test.py - OK - this_also_works_fine
0
If the function returns something else it will be printed as a warning.
>>> run(lambda : "this is a warning")
1 nagios_test.py - WARNING - this is a warning
1
Errors raised will result in a CRITICAL nagios string.
>>> def failure(): raise Exception("something went wrong")
>>> run(failure)
2 nagios_test.py - CRITICAL - something went wrong
2
"""
try:
name = os.path.basename(main.__file__)
except AttributeError:
name = sys.argv[0]
ok, warn, fail, unknown = nagios_report.functions_for_system(name)
try:
warning = test()
if warning and warning != 0:
code = warn(warning)
else:
code = ok(test.__name__)
except Exception as exc:
code = fail(exc.message or str(exc))
return code
if __name__ == "__main__":
import doctest
doctest.testmod()
import srp._pysrp as srp
import binascii
import string
import random
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
len(x) % 2 == 0) else binascii.unhexlify('0' + x)
# let's have some random name and password
def id_generator(size=6, chars=string.ascii_lowercase + string.digits):
return ''.join(random.choice(chars) for x in range(size))
class User():
def __init__(self, config = None):
if config and config.user:
self.username = config.user["username"]
self.password = config.user["password"]
else:
self.username = 'test_' + id_generator()
self.password = id_generator() + id_generator()
self.srp_user = srp.User(self.username, self.password, srp.SHA256, srp.NG_1024)
def signup(self, api):
salt, vkey = srp.create_salted_verification_key( self.username, self.password, srp.SHA256, srp.NG_1024 )
user_params = {
'user[login]': self.username,
'user[password_verifier]': binascii.hexlify(vkey),
'user[password_salt]': binascii.hexlify(salt)
}
return api.post('users.json', data = user_params)
def login(self, api):
init=self.init_authentication(api)
if ('errors' in init):
raise Exception('test user not found')
auth=self.authenticate(api, init)
if ('errors' in auth):
raise Exception('srp password auth failed')
self.verify_server(auth)
if not self.is_authenticated():
raise Exception('user is not authenticated')
return auth
def init_authentication(self, api):
uname, A = self.srp_user.start_authentication()
params = {
'login': uname,
'A': binascii.hexlify(A)
}
return api.post('sessions', data=params)
def authenticate(self, api, init):
M = self.srp_user.process_challenge(
safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
auth = api.put('sessions/' + self.username,
data={'client_auth': binascii.hexlify(M)})
return auth
def verify_server(self, auth):
self.srp_user.verify_session(safe_unhexlify(auth["M2"]))
def is_authenticated(self):
return self.srp_user.authenticated()
......@@ -2,85 +2,17 @@
# Test Authentication with the webapp API works.
import requests
import json
import string
import random
import srp._pysrp as srp
import binascii
import yaml
from support.api import Api
from support.config import Config
from support.user import User
safe_unhexlify = lambda x: binascii.unhexlify(x) if (
len(x) % 2 == 0) else binascii.unhexlify('0' + x)
def read_config():
with open("/etc/leap/hiera.yaml", 'r') as stream:
config = yaml.load(stream)
user = config['webapp']['nagios_test_user']
if 'username' not in user:
fail('nagios test user lacks username')
if 'password' not in user:
fail('nagios test user lacks password')
api = config['api']
api['version'] = config['webapp']['api_version']
return {'api': api, 'user': user}
def run_tests(config):
user = config['user']
api = config['api']
usr = srp.User(user['username'], user['password'], srp.SHA256, srp.NG_1024)
try:
auth = parse(authenticate(api, usr))
except requests.exceptions.ConnectionError:
fail('no connection to server')
exit(report(auth, usr))
# parse the server responses
def parse(response):
request = response.request
try:
return json.loads(response.text)
except ValueError:
return None
def authenticate(api, usr):
api_url = "https://{domain}:{port}/{version}".format(**api)
session = requests.session()
uname, A = usr.start_authentication()
params = {
'login': uname,
'A': binascii.hexlify(A)
}
init = parse(
session.post(api_url + '/sessions', data=params, verify=False))
if ('errors' in init):
fail('test user not found')
M = usr.process_challenge(
safe_unhexlify(init['salt']), safe_unhexlify(init['B']))
return session.put(api_url + '/sessions/' + uname, verify=False,
data={'client_auth': binascii.hexlify(M)})
def report(auth, usr):
if ('errors' in auth):
fail('srp password auth failed')
usr.verify_session(safe_unhexlify(auth["M2"]))
if usr.authenticated():
print '0 webapp_login - OK - can login to webapp fine'
return 0
print '1 webapp_login - WARNING - failed to verify webapp server'
return 1
def fail(reason):
print '2 webapp_login - CRITICAL - ' + reason
exit(2)
def login_successfully():
config = Config()
user = User(config)
api = Api(config, verify=False)
user.login(api)
if __name__ == '__main__':
run_tests(read_config())
from support import nagios_test
exit_code = nagios_test.run(login_successfully)
exit(exit_code)
#!/usr/bin/env python
# Test Signup and Login with the webapp API works.
from support.api import Api
from support.config import Config
from support.user import User
def signup_successfully():
config = Config()
user = User()
api = Api(config, verify=False)
user.signup(api)
user.login(api)
if __name__ == '__main__':
from support import nagios_test
exit_code = nagios_test.run(signup_successfully)
exit(exit_code)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment