diff --git a/tes/__init__.py b/tes/__init__.py index 63d92c3a689b36e5b3f9321d297d08e06b9c1d90..ff6d3ba7ef30d73658f3facf0a058b8c628f20c4 100644 --- a/tes/__init__.py +++ b/tes/__init__.py @@ -1,7 +1,6 @@ import os from flask import Flask, request, session from flask_restful import Api, Resource -#from flask_cors import CORS, cross_origin from .tunnelstat import SSHSession app = Flask(__name__) @@ -17,7 +16,8 @@ app.config['SESSION_COOKIE_SAMESITE'] = None app.config['APPLICATION_ROOT'] = '/' if 'FLASK_ENV' in os.environ and os.environ['FLASK_ENV'] == 'development': DEV=True - app.config['SECRET_KEY'] = 'notverysecret' + #app.config['SECRET_KEY'] = 'notverysecret' + app.config['SECRET_KEY'] = os.urandom(12).hex() app.config['DEBUG'] = True else: DEV=False diff --git a/tes/apiendpoints.py b/tes/apiendpoints.py index e897a07985c89276f26e147b068f998ebf8984e8..3e419bf563640281c70eca0a85bfdea077170011 100644 --- a/tes/apiendpoints.py +++ b/tes/apiendpoints.py @@ -180,7 +180,6 @@ class TunnelstatEP(Resource): class ContactUs(Resource): def post(self): import tempfile - print(request.get_json()) data = request.get_json() f = tempfile.NamedTemporaryFile(mode='w+b',dir=app.config['MESSAGES'],delete=False) f.write(json.dumps(data).encode()) diff --git a/tes/sshwrapper/__init__.py b/tes/sshwrapper/__init__.py index ba596f5030dc398b606726c762c42dba0f09411c..999e26a6a2a649ce27b0d4815f04e8f79e435501 100644 --- a/tes/sshwrapper/__init__.py +++ b/tes/sshwrapper/__init__.py @@ -106,101 +106,110 @@ class Ssh: import logging logger = logging.getLogger() sess.lock.acquire() - ctrlsocket = "/tmp/cm-{}-{}".format(user,host) + ctrlsocket = "/tmp/cm-{}-{}-{}".format(user,host,sess.sshsessid) try: mode = os.stat(ctrlsocket).st_mode - # If the control process died (docker restarted, or otherwise exited) but the socket was not removed: - if mode is not None and ctrlsocket not in sess.ctrl_processes: - os.unlink(ctrlsocket) - mode = None - if mode is not None and sess.ctrl_processes[ctrlsocket].returncode is not None: - ctrl_p = sess.ctrl_processes[ctrlsocket] - logger.debug('The control socket exists, but the process has returned {}'.format(ctrl_p.returncode)) - (stderr,stdout) = ctrl_p.communicate() - logger.debug('ctrl_p stderr {}'.format(stderr)) - logger.debug('ctrl_p stdout {}'.format(stdout)) - os.unlink(ctrlsocket) - mode = None # This is expected: It indicates there is not yet a control master for this user on this login node except FileNotFoundError as e: mode = None + # This is a catchall for exceptions I've never seen. log them. except Exception as e: logger.error('attempting to access control master socket raised exception') logger.error(e) import traceback logger.error(traceback.format_exc()) + sess.lock.release() + raise(e) - if mode is None: - # I an existing control process is recoreded, but we are starting a new one, try to delete the old one - if ctrlsocket in sess.ctrl_processes: - try: - (stderr,stdout) = sess.ctrl_processes[ctrlsocket].communicate() - logger.debug("control master terminated with stderr {}".format(stderr)) - logger.debug("control master terminated with stdout {}".format(stdout)) - os.killpg(int(sess.ctrl_processes[ctrlsocket].pid),signal.SIGKILL) - sess.ctrl_processes[ctrlsocket].communicate() - del sess.ctrl_processes[ctrlsocket] - except: - pass + # If the control process died (docker restarted, or otherwise exited) but the socket was not removed: + if mode is not None and ctrlsocket not in sess.ctrl_processes: + os.unlink(ctrlsocket) + mode = None + # I'm not sure I've ever observed this condition in practice + if mode is not None and ctrlsocket in sess.ctrl_processes and sess.ctrl_processes[ctrlsocket].returncode is not None: + ctrl_p = sess.ctrl_processes[ctrlsocket] + logger.debug('The control socket exists, but the process has returned {}'.format(ctrl_p.returncode)) + (stderr,stdout) = ctrl_p.communicate() + logger.debug('ctrl_p stderr {}'.format(stderr)) + logger.debug('ctrl_p stdout {}'.format(stdout)) + os.unlink(ctrlsocket) + mode = None - sshcmd = Ssh.SSHCMDWITHOPTS.copy() - sshcmd.extend([ - "-S", ctrlsocket, - "-M", '-o', 'ControlPersist=60s', - '-p', sshport, '-N','-l', user, host]) - env = os.environ.copy() - if sess.socket is None: + if mode is not None: + sess.lock.release() + logger.debug('leaving get_ctrl_master_socket {}'.format(sess.ctrl_processes[ctrlsocket].returncode)) + return ctrlsocket + + # If an existing control process is recoreded, but the socket is gone, try to kill and restart + if mode is None and ctrlsocket in sess.ctrl_processes: + try: + logger.error('The socket disapeared. The pid is recoreded, lets kill and restart') + p = sess.ctrl_processes[ctrlsocket] + del sess.ctrl_processes[ctrlsocket] + os.killpg(int(p.pid),signal.SIGKILL) + (stderr,stdout) = p.communicate() + # we don't need os.unlink(ctrlsocket) be3cause we got here with mode=None indicatating that we couldn't stat the ctrlsocket + except: + pass + + sshcmd = Ssh.SSHCMDWITHOPTS.copy() + sshcmd.extend([ + "-S", ctrlsocket, + "-M", '-o', 'ControlPersist=60s', + '-p', sshport, '-N','-l', user, host]) + env = os.environ.copy() + if sess.socket is None: + sess.lock.release() + raise SshAgentException("No ssh-agent yet") + env['SSH_AUTH_SOCK'] = sess.socket + logger.debug("creating master socket") + logger.debug(" ".join(sshcmd)) + ctrl_p = subprocess.Popen(sshcmd, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + stdin=None, + env=env) + #logger.debug("master spawned, attempt communicate") + #stdout, stderr = ctrl_p.communicate() + #logger.debug('communicate on the control port complete') + logger.debug("spanwed ssh mux with pid {}".format(ctrl_p.pid)) + #sess.pids.append(ctrl_p.pid) + if ctrlsocket in sess.ctrl_processes: + logger.debug('existing control process!!!') + old_ctrl_p = sess.ctrl_processes[ctrlsocket] + old_ctrl_p.poll() + if old_ctrl_p.returncode is not None: + logger.debug('old ctrl_p is still running?') + old_ctrl_p.kill() + (stderr,stdout) = old_ctrl_p.communicate() + logger.debug('{} {}'.format(stderr,stdout)) + sess.ctrl_processes[ctrlsocket] = ctrl_p + notstarted = True + notdead = True + wait=0 + while notstarted and notdead: + import time + ctrl_p.poll() + if ctrl_p.returncode != None: + notdead = False + (stdout,stderr) = ctrl_p.communicate() + logger.error('ctrl_p died {} {} {}'.format(ctrl_p.returncode,stdout,stderr)) sess.lock.release() - raise SshAgentException("No ssh-agent yet") - env['SSH_AUTH_SOCK'] = sess.socket - logger.debug("creating master socket") - logger.debug(" ".join(sshcmd)) - ctrl_p = subprocess.Popen(sshcmd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - stdin=None, - env=env) - #logger.debug("master spawned, attempt communicate") - #stdout, stderr = ctrl_p.communicate() - #logger.debug('communicate on the control port complete') - logger.debug("spanwed ssh mux with pid {}".format(ctrl_p.pid)) - #sess.pids.append(ctrl_p.pid) - if ctrlsocket in sess.ctrl_processes: - logger.debug('existing control process!!!') - old_ctrl_p = sess.ctrl_processes[ctrlsocket] - old_ctrl_p.poll() - if old_ctrl_p.returncode is not None: - logger.debug('old ctrl_p is still running?') - old_ctrl_p.kill() - (stderr,stdout) = old_ctrl_p.communicate() - logger.debug('{} {}'.format(stderr,stdout)) - sess.ctrl_processes[ctrlsocket] = ctrl_p - notstarted = True - notdead = True - wait=0 - while notstarted and notdead: - import time - ctrl_p.poll() - if ctrl_p.returncode != None: - notdead = False - (stdout,stderr) = ctrl_p.communicate() - logger.error('ctrl_p died {} {} {}'.format(ctrl_p.returncode,stdout,stderr)) - sess.lock.release() - raise SshCtrlException(stderr.decode()) - try: - mode = os.stat(ctrlsocket).st_mode - logger.debug('control socket appeared, it must have started') - notstarted=False - except FileNotFoundError: - logger.debug('ctrl_p not started yet') - time.sleep(0.5) - wait=wait+1 - if wait>60: - ctrl_p.kill() - (stdout,stderr) = ctrl_p.communicate() - logger.error('ctrl_p was killed due to timeout {} {} {}'.format(ctrl_p.returncode,stdout,stderr)) - sess.lock.release() - raise SshCtrlException(stderr.decode()) - logger.debug('exiting if section') + + raise SshCtrlException(stderr.decode()) + try: + mode = os.stat(ctrlsocket).st_mode + logger.debug('control socket appeared, it must have started') + notstarted=False + except FileNotFoundError: + logger.debug('ctrl_p not started yet') + time.sleep(0.5) + wait=wait+1 + if wait>60: + ctrl_p.kill() + (stdout,stderr) = ctrl_p.communicate() + logger.error('ctrl_p was killed due to timeout {} {} {}'.format(ctrl_p.returncode,stdout,stderr)) + sess.lock.release() + raise SshCtrlException(stderr.decode()) sess.lock.release() logger.debug('leaving get_ctrl_master_socket {}'.format(sess.ctrl_processes[ctrlsocket].returncode)) diff --git a/tes/tunnelstat/__init__.py b/tes/tunnelstat/__init__.py index a2b63741fe455335154bee9cbbd95973723bc426..559e7dcfe6857f8613b7174d0ab46e752b3460bf 100644 --- a/tes/tunnelstat/__init__.py +++ b/tes/tunnelstat/__init__.py @@ -12,7 +12,7 @@ class SSHSession: """Interfaces for working with processes forked from flask in particular, we fork processes for ssh-agent and ssh tunnels and execution """ - def __init__(self,**kwargs): + def __init__(self,sshsessid,**kwargs): self.last = datetime.datetime.now() self.socket = None self.token = None @@ -28,6 +28,7 @@ class SSHSession: self.ctrl_processes = {} self.lock = threading.Lock() self.tunnels = [] + self.sshsessid = sshsessid def start_agent(self): import subprocess @@ -237,7 +238,7 @@ class SSHSession: import random import string from .. import sshsessions - from flask import session + from flask import session, request sshsessid = session.get('sshsessid', None) N = 8 while sshsessid is None: @@ -246,18 +247,18 @@ class SSHSession: sshsessid = key session['sshsessid'] = sshsessid if sshsessid not in sshsessions: - sshsessions[sshsessid] = SSHSession() + sshsessions[sshsessid] = SSHSession(sshsessid) - session = sshsessions[sshsessid] + sshsession = sshsessions[sshsessid] try: - SSHSession.test_sshsession(session) + SSHSession.test_sshsession(sshsession) except SshAgentException: - session.kill() - sshsessions[sshsessid] = SSHSession() - session = sshsessions[sshsessid] - SSHSession.test_sshsession(session) + sshsession.kill() + sshsessions[sshsessid] = SSHSession(sshsessid) + sshsession = sshsessions[sshsessid] + SSHSession.test_sshsession(sshsession) - return session + return sshsession @staticmethod def remove_sshsession(): diff --git a/test_backend.py b/test_backend.py new file mode 100644 index 0000000000000000000000000000000000000000..02bd3815d43c954404c3ea4e17470c84fb9e0c6c --- /dev/null +++ b/test_backend.py @@ -0,0 +1,258 @@ +import os +import requests +import threading + +#TESBACKEND = "https://strudel2-api-dev.cloud.cvl.org.au/tes" +TESBACKEND = "http://127.0.0.1:8080" +TWSPROXY = "http://127.0.0.1:8090" +#TESBACKEND = "https://strudel2-api-dev.cloud.cvl.org.au/tes" +#TWSPROXY = "https://strudel2-api-dev.cloud.cvl.org.au" +FRONTEND = "strudel2-dev.cloud.cvl.org.au" +COMPUTESITE = ("https://{}/assets/config/computesites.json".format(FRONTEND), + "M3") +APPNAME = "Desktop" +ACTION = "Connect" +KEY = os.path.expanduser("~/.ssh/ssossh-key") +CERT = os.path.expanduser("~/.ssh/ssossh-key-cert.pub") + + +s = requests.Session() + + +def get_apps(cs,user=None): + """ + Using the defintion of the compute site, find out what apps are supported + Note comptue sites can either provide a URL with their app definitions + or a command to run with their app definitions + """ + if 'appCatalogUri' in cs and cs['appCatalogUri'] is not None: + if "https" in cs['appCatalogUri']: + path = cs['appCatalogUri'] + else: + path = "https://{}/{}".format(FRONTEND, + cs['appCatalogUri'].lstrip('.\\')) + apps = s.get(path).json() + return apps + if cs['appCatalogCmd'] is not None: + cmd = cs.copy() + cmd['statcmd'] = cmd['appCatalogCmd'] + r = stat(cmd,user) + return r + #raise Exception("appCatalogCmd is not implemented in this test suite") + + +def stat(cs, user): + """ + Run something like squeue to determine what jobs the user has running + """ + import json + cookies = {} + for k,v in s.cookies.items(): + cookies[k] = v + params = {'statcmd': json.dumps(cs['statcmd']), + 'host': json.dumps(cs['host']), + 'username': json.dumps(user)} + r = s.get("{}/stat".format(TESBACKEND), params=params, cookies=cookies) + return r.json() + + +def buildParams(cs, user, app, submitcmd="", cancelcmd=""): + import json + interface = json.dumps({'submitcmd': submitcmd, 'cancelcmd': cancelcmd}) + identity = json.dumps({'site': {'host': cs['host']}, 'username': user}) + return {'app': app, 'identity': identity, 'interface': interface} + + +def get_instance(cs, user, job, app, action): + """ + Get parameters such as the port the application is running on and the + token or password to access the app + """ + import json + params = buildParams(cs, user, app) + params['cmd'] = json.dumps(action['paramscmd']) + r = s.get("{}/appinstance/{}/{}/{}/{}".format( + TESBACKEND, + user, + cs['host'], + job['batch_host'], + jobid), params=params) + inst = r.json() + return inst + + +def create_tunnel(cs, user, job, inst, action): + """ + Create an SSH tunnel the the port specified in the inst object + """ + s.post("{}/createtunnel/{}/{}/{}".format( + TESBACKEND, + user, + cs['host'], + job['batch_host']), + json=inst) + return + + +def get_url(job, inst, action): + """ + Figure out the correct URL templateing things like passwords and tokens + """ + import json + pseudoapp = {'client': {'redir': action['client']['redir'], + 'cmd': action['client']['cmd']}} + params = {'app': json.dumps(pseudoapp), 'appinst': json.dumps(inst)} + r = s.get("{}/appurl".format(TESBACKEND), params=params) + return r.text + + +def connect_app(url): + """ + See if we can connect to the application + We don't test the content, just that the return code is 200 + """ + from urllib.parse import urlparse, parse_qs + import json + p = urlparse(json.loads(url)) + r = s.get("{}://{}{}".format(p.scheme, p.netloc, p.path), + params=parse_qs(p.query)) + if not (r.status_code == 200): + raise Exception + print("Connection succesfull") + + +def start_app(cs, user, app): + """ + Perform job submission + The sbatch command here is OK for testing, but is not the real sbatch used + To get the Real sbatch command I would need a javascript engine in this + test pipeline + """ + sbatch = "sbatch -n 1 --time=00:01:00" + params = buildParams(cs, user, app, submitcmd=sbatch) + body = {'app': app} + r = s.post("{}/submit".format(TESBACKEND), params=params, json=body) + stdout = r.json() + # On our test system we should see + # "Submitted batch job NNNNNN" + jobid = int(stdout.split()[3]) + return jobid + + +def wait_app(cs, user, jobid): + """ + Wait for up to 5 minutes for the job to start + """ + import time + import datetime + interval = 5 + timeout = 300 # seconds + start = datetime.datetime.now() + while (datetime.datetime.now()-start).seconds < timeout: + jobs = stat(cs, user) + try: + job = list(filter(lambda x: "{}".format(x['jobid']) + == "{}".format(jobid), jobs))[0] + except IndexError: + raise Exception("Job disapeared from the queue") + if job['state'] == 'RUNNING': + return job + if job['state'] == 'Finished': + raise Exception("Job terminated before we could test it") + time.sleep(interval) + + +def kill_app(cs, user, jobid): + params = buildParams(cs, user, app, cancelcmd=cs['cancelcmd']) + s.delete("{}/cancel/{}".format(TESBACKEND, jobid), params=params) + + +""" +Translate the globals COMPUTESITE, APPNAME and ACTION +into their object representations +""" +sites = s.get(COMPUTESITE[0]) + +""" +Setup the authorisation +This assumes the files KEY and CERT are present contain a valid SSH cert pair +""" +r = s.get("{}/sshagent".format(TESBACKEND)) +cookies = {} +for k,v in s.cookies.items(): + cookies[k] = v + +with open(KEY, 'r') as f: + key = f.read() +with open(CERT, 'r') as f: + cert = f.read() +contents = {"key": key, "cert": cert} +r = s.post("{}/sshagent".format(TESBACKEND), json=contents, cookies = cookies) + +""" +Verify Authorization worked +If stat fails at the end of this block, there is little point continuting +""" +r = s.get("{}/sshagent".format(TESBACKEND), cookies = cookies) +auths = r.json() +foundacert = False +for a in auths: + if 'Signing CA' in a: + foundacert = True + # + # Just assume we want to use the + # first user the cert is valid for + # + user = a['Principals'][0] +if not foundacert: + raise Exception("Failed to setup a cert") + +cs = list(filter(lambda x: x['name'] == COMPUTESITE[1], sites.json()))[0] +while True: + nthreads=10 + threads = [] + for i in range(0,nthreads): + threads.append(threading.Thread(target=stat,args=(cs,user))) + threads[-1].start() + for t in threads: + t.join() + r = stat(cs,user) + print(r) +#qstat = stat(cs, user) +#print(qstat) +RUNAPP=False +if RUNAPP: + app = list(filter(lambda x: x['name'] == APPNAME, get_apps(cs,user)))[0] + action = list(filter(lambda x: x['name'] == ACTION, app['instactions']))[0] + """ + Perform the sequence of running the app etc. This is: + start_app (run sbatch get a jobid) + wait_app (wait for the task to transition from + pending to running, also returns the host its running on) + get_instance (get information such as the port the + app is running on and the token/password + for this app) + create_tunnel (create the SSH tunnel connection to the correct port) + get_url (template all parameters so we know what URL the app is served on) + connect_app (perform a basic request and verify a 200 OK result) + kill_app (perform scancel) + """ + jobid = start_app(cs, user, app) + try: + job = wait_app(cs, user, jobid) + inst = get_instance(cs, user, job, app, action) + if 'message' in inst: + kill_app(cs, user, jobid) + create_tunnel(cs, user, job, inst, action) + url = get_url(job, inst, action).format(twsproxy=TWSPROXY) + connect_app(url) + except Exception: + kill_app(cs, user, jobid) + exit(1) + kill_app(cs, user, jobid) + +print("deleting keys from agent") +""" +Logout +""" +s.delete("{}/sshagent".format(TESBACKEND))