"""
Test driver for the Strudel2 TES backend.

Registers an SSH cert pair with the backend's ssh agent, then (when
STRESSTEST is set) repeatedly fires concurrent ``/stat`` requests, and/or
(when RUNAPP is set) runs an application through the full
submit -> wait -> connect -> cancel cycle.
"""
import json
import os
import sys
import threading
import time
import traceback
from urllib.parse import parse_qs, urlparse

import requests

#TESBACKEND = "https://strudel2-api-dev.cloud.cvl.org.au/tes"
TESBACKEND = "http://127.0.0.1:8080"
TWSPROXY = "http://127.0.0.1:8090"
#TESBACKEND = "https://strudel2-api-dev.cloud.cvl.org.au/tes"
#TWSPROXY = "https://strudel2-api-dev.cloud.cvl.org.au"
FRONTEND = "strudel2-dev.cloud.cvl.org.au"
COMPUTESITE = ("https://{}/assets/config/computesites.json".format(FRONTEND), "M3")
APPNAME = "Desktop"
ACTION = "Connect"
KEY = os.path.expanduser("~/.ssh/ssossh-key")
CERT = os.path.expanduser("~/.ssh/ssossh-key-cert.pub")

# When True, loop forever issuing NTHREADS concurrent /stat requests per
# round. Note the loop never exits, so the RUNAPP section is not reached.
STRESSTEST = True
NTHREADS = 10
# When True, run the full submit/connect/cancel sequence for APPNAME/ACTION.
RUNAPP = False

s = requests.Session()


def _cookie_dict():
    """Return a plain dict copy of the session's current cookies."""
    return dict(s.cookies.items())


def get_apps(cs, user=None):
    """
    Using the definition of the compute site, find out what apps are supported.

    Compute sites can either provide a URL with their app definitions
    (``appCatalogUri``) or a command to run that lists them (``appCatalogCmd``).

    :param cs: compute site definition (an entry from computesites.json)
    :param user: remote username; only used for ``appCatalogCmd`` sites
    :returns: the decoded app catalogue, or None if the site defines neither
    """
    uri = cs.get('appCatalogUri')
    if uri is not None:
        if uri.startswith(('http://', 'https://')):
            path = uri
        else:
            # Relative paths (e.g. "./assets/...") are served by the frontend.
            # Strip the leading "./" so the URL does not contain "//".
            path = "https://{}/{}".format(FRONTEND, uri.lstrip('./\\'))
        return s.get(path).json()
    if cs.get('appCatalogCmd') is not None:
        # Reuse the /stat endpoint to run the catalogue command remotely.
        cmd = cs.copy()
        cmd['statcmd'] = cmd['appCatalogCmd']
        return stat(cmd, user)
    return None


def stat(cs, user):
    """
    Run the site's ``statcmd`` (something like squeue) through the backend
    to determine what jobs the user has running.

    :param cs: compute site definition; must contain 'statcmd' and 'host'
    :param user: remote username
    :returns: the decoded JSON body of the ``/stat`` response
    """
    params = {'statcmd': json.dumps(cs['statcmd']),
              'host': json.dumps(cs['host']),
              'username': json.dumps(user)}
    r = s.get("{}/stat".format(TESBACKEND), params=params, cookies=_cookie_dict())
    return r.json()


def buildParams(cs, user, app, submitcmd="", cancelcmd=""):
    """
    Build the common query parameters (app, identity, interface) that the
    submit/appinstance/cancel endpoints take.
    """
    interface = json.dumps({'submitcmd': submitcmd, 'cancelcmd': cancelcmd})
    identity = json.dumps({'site': {'host': cs['host']}, 'username': user})
    return {'app': app, 'identity': identity, 'interface': interface}


def get_instance(cs, user, job, app, action):
    """
    Get parameters such as the port the application is running on and the
    token or password to access the app.

    :param job: job record as returned by wait_app (uses 'jobid', 'batch_host')
    :param action: app action definition (uses 'paramscmd')
    :returns: the decoded JSON instance description
    """
    params = buildParams(cs, user, app)
    params['cmd'] = json.dumps(action['paramscmd'])
    # Use the job's own id rather than relying on a module-level global.
    r = s.get("{}/appinstance/{}/{}/{}/{}".format(
        TESBACKEND, user, cs['host'], job['batch_host'], job['jobid']),
        params=params)
    return r.json()


def create_tunnel(cs, user, job, inst, action):
    """Create an SSH tunnel to the port specified in the inst object."""
    s.post("{}/createtunnel/{}/{}/{}".format(
        TESBACKEND, user, cs['host'], job['batch_host']), json=inst)


def get_url(job, inst, action):
    """
    Ask the backend to template the app URL, filling in things like
    passwords and tokens from ``inst``.

    :returns: the raw response text (a JSON-encoded string)
    """
    pseudoapp = {'client': {'redir': action['client']['redir'],
                            'cmd': action['client']['cmd']}}
    params = {'app': json.dumps(pseudoapp), 'appinst': json.dumps(inst)}
    r = s.get("{}/appurl".format(TESBACKEND), params=params)
    return r.text


def connect_app(url):
    """
    See if we can connect to the application.

    We don't test the content, just that the return code is 200.

    :param url: JSON-encoded URL string (as produced by get_url)
    :raises Exception: if the response status is not 200
    """
    p = urlparse(json.loads(url))
    base = "{}://{}{}".format(p.scheme, p.netloc, p.path)
    r = s.get(base, params=parse_qs(p.query))
    if r.status_code != 200:
        raise Exception("Connecting to {} returned HTTP {}".format(base, r.status_code))
    print("Connection succesfull")


def start_app(cs, user, app):
    """
    Perform job submission.

    The sbatch command here is OK for testing, but is not the real sbatch used.
    To get the real sbatch command I would need a javascript engine in this
    test pipeline.

    :returns: the integer job id parsed from the submit output
    """
    sbatch = "sbatch -n 1 --time=00:01:00"
    params = buildParams(cs, user, app, submitcmd=sbatch)
    body = {'app': app}
    r = s.post("{}/submit".format(TESBACKEND), params=params, json=body)
    stdout = r.json()
    # On our test system we should see
    # "Submitted batch job NNNNNN"
    return int(stdout.split()[3])


def wait_app(cs, user, jobid, timeout=300, interval=5):
    """
    Wait for up to ``timeout`` seconds for the job to start.

    :param timeout: maximum seconds to wait (default 5 minutes)
    :param interval: seconds between polls
    :returns: the job record once its state is 'RUNNING'
    :raises Exception: if the job leaves the queue, finishes, or does not
        start before the timeout
    """
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        jobs = stat(cs, user)
        # Compare as strings: jobid may be an int here and a str in the listing.
        matches = [x for x in jobs if "{}".format(x['jobid']) == "{}".format(jobid)]
        if not matches:
            raise Exception("Job disapeared from the queue")
        job = matches[0]
        if job['state'] == 'RUNNING':
            return job
        if job['state'] == 'Finished':
            raise Exception("Job terminated before we could test it")
        time.sleep(interval)
    raise Exception("Job {} did not start within {} seconds".format(jobid, timeout))


def kill_app(cs, user, jobid, app=None):
    """
    Cancel the job (e.g. scancel) through the backend.

    :param app: the app definition the job was started with; passed
        explicitly instead of being read from a module-level global.
        (requests omits query parameters whose value is None.)
    """
    params = buildParams(cs, user, app, cancelcmd=cs['cancelcmd'])
    s.delete("{}/cancel/{}".format(TESBACKEND, jobid), params=params)


# Translate the globals COMPUTESITE, APPNAME and ACTION into their object
# representations.
sites = s.get(COMPUTESITE[0])

# Setup the authorisation.
# This assumes the files KEY and CERT are present and contain a valid SSH
# cert pair.
r = s.get("{}/sshagent".format(TESBACKEND))
cookies = _cookie_dict()
with open(KEY, 'r') as f:
    key = f.read()
with open(CERT, 'r') as f:
    cert = f.read()
contents = {"key": key, "cert": cert}
r = s.post("{}/sshagent".format(TESBACKEND), json=contents, cookies=cookies)

# Verify authorization worked.
# If stat fails at the end of this block, there is little point continuing.
r = s.get("{}/sshagent".format(TESBACKEND), cookies=cookies)
auths = r.json()
foundacert = False
for a in auths:
    if 'Signing CA' in a:
        foundacert = True
        # Just assume we want to use the first user the cert is valid for.
        user = a['Principals'][0]
if not foundacert:
    raise Exception("Failed to setup a cert")

cs = [x for x in sites.json() if x['name'] == COMPUTESITE[1]][0]

while STRESSTEST:
    threads = []
    for _ in range(NTHREADS):
        t = threading.Thread(target=stat, args=(cs, user))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    r = stat(cs, user)
    print(r)

failed = False
if RUNAPP:
    app = [x for x in get_apps(cs, user) if x['name'] == APPNAME][0]
    action = [x for x in app['instactions'] if x['name'] == ACTION][0]
    # Perform the sequence of running the app etc. This is:
    #   start_app     (run sbatch, get a jobid)
    #   wait_app      (wait for pending -> running; also returns the host it runs on)
    #   get_instance  (port the app is running on, token/password for this app)
    #   create_tunnel (create the SSH tunnel connection to the correct port)
    #   get_url       (template all parameters so we know what URL the app is served on)
    #   connect_app   (perform a basic request and verify a 200 OK result)
    #   kill_app      (perform scancel; always runs, exactly once)
    jobid = start_app(cs, user, app)
    try:
        job = wait_app(cs, user, jobid)
        inst = get_instance(cs, user, job, app, action)
        if 'message' in inst:
            # An instance carrying 'message' is treated as a failure (presumably
            # the backend's error payload - confirm). Abort instead of
            # tunnelling to an instance that does not exist.
            raise Exception(inst['message'])
        create_tunnel(cs, user, job, inst, action)
        url = get_url(job, inst, action).format(twsproxy=TWSPROXY)
        connect_app(url)
    except Exception:
        traceback.print_exc()
        failed = True
    finally:
        kill_app(cs, user, jobid, app)

# Logout: always remove the keys from the agent, even when the app test failed.
print("deleting keys from agent")
s.delete("{}/sshagent".format(TESBACKEND))
if failed:
    sys.exit(1)