Skip to content
Snippets Groups Projects
Commit 3cbc0b2b authored by Chris Hines's avatar Chris Hines
Browse files

Merge branch 'dev' of gitlab.erc.monash.edu.au:hpc-team/strudel2_backend into dev

parents 518b13b8 74446b2a
No related branches found
No related tags found
3 merge requests!77Revert "disable agressive kill and restart",!51Test,!50Dev
import os
from flask import Flask, request, session
from flask_restful import Api, Resource
#from flask_cors import CORS, cross_origin
from .tunnelstat import SSHSession
app = Flask(__name__)
......@@ -17,7 +16,8 @@ app.config['SESSION_COOKIE_SAMESITE'] = None
app.config['APPLICATION_ROOT'] = '/'
if 'FLASK_ENV' in os.environ and os.environ['FLASK_ENV'] == 'development':
DEV=True
app.config['SECRET_KEY'] = 'notverysecret'
#app.config['SECRET_KEY'] = 'notverysecret'
app.config['SECRET_KEY'] = os.urandom(12).hex()
app.config['DEBUG'] = True
else:
DEV=False
......
......@@ -180,7 +180,6 @@ class TunnelstatEP(Resource):
class ContactUs(Resource):
def post(self):
import tempfile
print(request.get_json())
data = request.get_json()
f = tempfile.NamedTemporaryFile(mode='w+b',dir=app.config['MESSAGES'],delete=False)
f.write(json.dumps(data).encode())
......
......@@ -106,101 +106,110 @@ class Ssh:
import logging
logger = logging.getLogger()
sess.lock.acquire()
ctrlsocket = "/tmp/cm-{}-{}".format(user,host)
ctrlsocket = "/tmp/cm-{}-{}-{}".format(user,host,sess.sshsessid)
try:
mode = os.stat(ctrlsocket).st_mode
# If the control process died (docker restarted, or otherwise exited) but the socket was not removed:
if mode is not None and ctrlsocket not in sess.ctrl_processes:
os.unlink(ctrlsocket)
mode = None
if mode is not None and sess.ctrl_processes[ctrlsocket].returncode is not None:
ctrl_p = sess.ctrl_processes[ctrlsocket]
logger.debug('The control socket exists, but the process has returned {}'.format(ctrl_p.returncode))
(stderr,stdout) = ctrl_p.communicate()
logger.debug('ctrl_p stderr {}'.format(stderr))
logger.debug('ctrl_p stdout {}'.format(stdout))
os.unlink(ctrlsocket)
mode = None
# This is expected: It indicates there is not yet a control master for this user on this login node
except FileNotFoundError as e:
mode = None
# This is a catchall for exceptions I've never seen. log them.
except Exception as e:
logger.error('attempting to access control master socket raised exception')
logger.error(e)
import traceback
logger.error(traceback.format_exc())
sess.lock.release()
raise(e)
if mode is None:
# If an existing control process is recorded, but we are starting a new one, try to delete the old one
if ctrlsocket in sess.ctrl_processes:
try:
(stderr,stdout) = sess.ctrl_processes[ctrlsocket].communicate()
logger.debug("control master terminated with stderr {}".format(stderr))
logger.debug("control master terminated with stdout {}".format(stdout))
os.killpg(int(sess.ctrl_processes[ctrlsocket].pid),signal.SIGKILL)
sess.ctrl_processes[ctrlsocket].communicate()
del sess.ctrl_processes[ctrlsocket]
except:
pass
# If the control process died (docker restarted, or otherwise exited) but the socket was not removed:
if mode is not None and ctrlsocket not in sess.ctrl_processes:
os.unlink(ctrlsocket)
mode = None
# I'm not sure I've ever observed this condition in practice
if mode is not None and ctrlsocket in sess.ctrl_processes and sess.ctrl_processes[ctrlsocket].returncode is not None:
ctrl_p = sess.ctrl_processes[ctrlsocket]
logger.debug('The control socket exists, but the process has returned {}'.format(ctrl_p.returncode))
(stderr,stdout) = ctrl_p.communicate()
logger.debug('ctrl_p stderr {}'.format(stderr))
logger.debug('ctrl_p stdout {}'.format(stdout))
os.unlink(ctrlsocket)
mode = None
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
"-S", ctrlsocket,
"-M", '-o', 'ControlPersist=60s',
'-p', sshport, '-N','-l', user, host])
env = os.environ.copy()
if sess.socket is None:
if mode is not None:
sess.lock.release()
logger.debug('leaving get_ctrl_master_socket {}'.format(sess.ctrl_processes[ctrlsocket].returncode))
return ctrlsocket
# If an existing control process is recorded, but the socket is gone, try to kill and restart
if mode is None and ctrlsocket in sess.ctrl_processes:
try:
logger.error('The socket disapeared. The pid is recoreded, lets kill and restart')
p = sess.ctrl_processes[ctrlsocket]
del sess.ctrl_processes[ctrlsocket]
os.killpg(int(p.pid),signal.SIGKILL)
(stderr,stdout) = p.communicate()
# We don't need os.unlink(ctrlsocket) because we got here with mode=None, indicating that we couldn't stat the ctrlsocket
except:
pass
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
"-S", ctrlsocket,
"-M", '-o', 'ControlPersist=60s',
'-p', sshport, '-N','-l', user, host])
env = os.environ.copy()
if sess.socket is None:
sess.lock.release()
raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket
logger.debug("creating master socket")
logger.debug(" ".join(sshcmd))
ctrl_p = subprocess.Popen(sshcmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=None,
env=env)
#logger.debug("master spawned, attempt communicate")
#stdout, stderr = ctrl_p.communicate()
#logger.debug('communicate on the control port complete')
logger.debug("spanwed ssh mux with pid {}".format(ctrl_p.pid))
#sess.pids.append(ctrl_p.pid)
if ctrlsocket in sess.ctrl_processes:
logger.debug('existing control process!!!')
old_ctrl_p = sess.ctrl_processes[ctrlsocket]
old_ctrl_p.poll()
if old_ctrl_p.returncode is not None:
logger.debug('old ctrl_p is still running?')
old_ctrl_p.kill()
(stderr,stdout) = old_ctrl_p.communicate()
logger.debug('{} {}'.format(stderr,stdout))
sess.ctrl_processes[ctrlsocket] = ctrl_p
notstarted = True
notdead = True
wait=0
while notstarted and notdead:
import time
ctrl_p.poll()
if ctrl_p.returncode != None:
notdead = False
(stdout,stderr) = ctrl_p.communicate()
logger.error('ctrl_p died {} {} {}'.format(ctrl_p.returncode,stdout,stderr))
sess.lock.release()
raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket
logger.debug("creating master socket")
logger.debug(" ".join(sshcmd))
ctrl_p = subprocess.Popen(sshcmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=None,
env=env)
#logger.debug("master spawned, attempt communicate")
#stdout, stderr = ctrl_p.communicate()
#logger.debug('communicate on the control port complete')
logger.debug("spanwed ssh mux with pid {}".format(ctrl_p.pid))
#sess.pids.append(ctrl_p.pid)
if ctrlsocket in sess.ctrl_processes:
logger.debug('existing control process!!!')
old_ctrl_p = sess.ctrl_processes[ctrlsocket]
old_ctrl_p.poll()
if old_ctrl_p.returncode is not None:
logger.debug('old ctrl_p is still running?')
old_ctrl_p.kill()
(stderr,stdout) = old_ctrl_p.communicate()
logger.debug('{} {}'.format(stderr,stdout))
sess.ctrl_processes[ctrlsocket] = ctrl_p
notstarted = True
notdead = True
wait=0
while notstarted and notdead:
import time
ctrl_p.poll()
if ctrl_p.returncode != None:
notdead = False
(stdout,stderr) = ctrl_p.communicate()
logger.error('ctrl_p died {} {} {}'.format(ctrl_p.returncode,stdout,stderr))
sess.lock.release()
raise SshCtrlException(stderr.decode())
try:
mode = os.stat(ctrlsocket).st_mode
logger.debug('control socket appeared, it must have started')
notstarted=False
except FileNotFoundError:
logger.debug('ctrl_p not started yet')
time.sleep(0.5)
wait=wait+1
if wait>60:
ctrl_p.kill()
(stdout,stderr) = ctrl_p.communicate()
logger.error('ctrl_p was killed due to timeout {} {} {}'.format(ctrl_p.returncode,stdout,stderr))
sess.lock.release()
raise SshCtrlException(stderr.decode())
logger.debug('exiting if section')
raise SshCtrlException(stderr.decode())
try:
mode = os.stat(ctrlsocket).st_mode
logger.debug('control socket appeared, it must have started')
notstarted=False
except FileNotFoundError:
logger.debug('ctrl_p not started yet')
time.sleep(0.5)
wait=wait+1
if wait>60:
ctrl_p.kill()
(stdout,stderr) = ctrl_p.communicate()
logger.error('ctrl_p was killed due to timeout {} {} {}'.format(ctrl_p.returncode,stdout,stderr))
sess.lock.release()
raise SshCtrlException(stderr.decode())
sess.lock.release()
logger.debug('leaving get_ctrl_master_socket {}'.format(sess.ctrl_processes[ctrlsocket].returncode))
......
......@@ -12,7 +12,7 @@ class SSHSession:
"""Interfaces for working with processes forked from flask
in particular, we fork processes for ssh-agent and ssh tunnels and execution
"""
def __init__(self,**kwargs):
def __init__(self,sshsessid,**kwargs):
self.last = datetime.datetime.now()
self.socket = None
self.token = None
......@@ -28,6 +28,7 @@ class SSHSession:
self.ctrl_processes = {}
self.lock = threading.Lock()
self.tunnels = []
self.sshsessid = sshsessid
def start_agent(self):
import subprocess
......@@ -237,7 +238,7 @@ class SSHSession:
import random
import string
from .. import sshsessions
from flask import session
from flask import session, request
sshsessid = session.get('sshsessid', None)
N = 8
while sshsessid is None:
......@@ -246,18 +247,18 @@ class SSHSession:
sshsessid = key
session['sshsessid'] = sshsessid
if sshsessid not in sshsessions:
sshsessions[sshsessid] = SSHSession()
sshsessions[sshsessid] = SSHSession(sshsessid)
session = sshsessions[sshsessid]
sshsession = sshsessions[sshsessid]
try:
SSHSession.test_sshsession(session)
SSHSession.test_sshsession(sshsession)
except SshAgentException:
session.kill()
sshsessions[sshsessid] = SSHSession()
session = sshsessions[sshsessid]
SSHSession.test_sshsession(session)
sshsession.kill()
sshsessions[sshsessid] = SSHSession(sshsessid)
sshsession = sshsessions[sshsessid]
SSHSession.test_sshsession(sshsession)
return session
return sshsession
@staticmethod
def remove_sshsession():
......
import os
import requests
import threading
# ---------------------------------------------------------------------------
# Test configuration. The commented-out assignments are alternative
# (remote dev) endpoints kept for reference.
# ---------------------------------------------------------------------------
#TESBACKEND = "https://strudel2-api-dev.cloud.cvl.org.au/tes"
# Base URL of the backend API; every "{}/...".format(TESBACKEND) request in
# this script is built from it.
TESBACKEND = "http://127.0.0.1:8080"
# Substituted for the {twsproxy} placeholder in the URL returned by get_url().
TWSPROXY = "http://127.0.0.1:8090"
#TESBACKEND = "https://strudel2-api-dev.cloud.cvl.org.au/tes"
#TWSPROXY = "https://strudel2-api-dev.cloud.cvl.org.au"
# Host name used to build the compute-site catalogue URL below and to resolve
# relative app-catalogue paths in get_apps().
FRONTEND = "strudel2-dev.cloud.cvl.org.au"
# (catalogue URL, name of the compute site to select from that catalogue).
COMPUTESITE = ("https://{}/assets/config/computesites.json".format(FRONTEND),
"M3")
# Name of the app to pick from the app catalogue, and the name of the entry in
# that app's 'instactions' list to exercise.
APPNAME = "Desktop"
ACTION = "Connect"
# Paths of the SSH private key and its certificate; both files are read and
# posted to the backend's /sshagent endpoint during setup.
KEY = os.path.expanduser("~/.ssh/ssossh-key")
CERT = os.path.expanduser("~/.ssh/ssossh-key-cert.pub")
# One shared HTTP session, used by every helper below (including from
# multiple threads in the stat loop).
s = requests.Session()
def get_apps(cs, user=None):
    """
    Return the app definitions supported by a compute site.

    A compute site can either provide a URL with its app definitions
    ('appCatalogUri') or a command to run that prints them
    ('appCatalogCmd'). The URI takes precedence when both are set.

    cs   -- compute site definition (a dict)
    user -- username passed through to stat() when the catalogue is
            produced by a command

    Returns the decoded JSON catalogue, or None when the site defines
    neither a catalogue URI nor a catalogue command.
    """
    catalog_uri = cs.get('appCatalogUri')
    if catalog_uri is not None:
        # Only a leading scheme marks an absolute URL; a substring test would
        # misclassify relative paths that merely contain "https" and would
        # mangle plain http:// URLs.
        if catalog_uri.startswith(('http://', 'https://')):
            path = catalog_uri
        else:
            # NOTE(review): lstrip('.\\') removes leading dots and
            # backslashes only, so "./x" becomes "/x" and the URL gets a
            # double slash -- confirm whether '/' should be stripped too.
            path = "https://{}/{}".format(FRONTEND,
                                          catalog_uri.lstrip('.\\'))
        return s.get(path).json()

    # .get() so that a site definition without the key is treated the same
    # as one where it is explicitly None, instead of raising KeyError.
    catalog_cmd = cs.get('appCatalogCmd')
    if catalog_cmd is not None:
        # Reuse the stat endpoint: run the catalogue command in place of the
        # site's normal status command, on a copy so cs is not modified.
        cmd = cs.copy()
        cmd['statcmd'] = catalog_cmd
        return stat(cmd, user)
    return None
def stat(cs, user):
    """
    Ask the backend to run the site's status command (something like
    squeue) for the given user and return the decoded JSON reply.

    cs   -- compute site definition; 'statcmd' and 'host' are read
    user -- username the command is run for
    """
    import json

    # Copy the shared session's cookies into a plain dict and send that copy
    # explicitly with the request.
    cookie_copy = dict(s.cookies.items())

    # Every query value is JSON-encoded individually.
    query = {}
    query['statcmd'] = json.dumps(cs['statcmd'])
    query['host'] = json.dumps(cs['host'])
    query['username'] = json.dumps(user)

    endpoint = "{}/stat".format(TESBACKEND)
    response = s.get(endpoint, params=query, cookies=cookie_copy)
    return response.json()
def buildParams(cs, user, app, submitcmd="", cancelcmd=""):
    """
    Assemble the query parameters shared by the backend endpoints.

    Returns a dict with 'app' (passed through unchanged) plus two
    JSON-encoded strings: 'identity' (site host and username) and
    'interface' (submit and cancel commands).
    """
    import json

    interface_doc = dict(submitcmd=submitcmd, cancelcmd=cancelcmd)
    encoded_interface = json.dumps(interface_doc)

    identity_doc = dict(site=dict(host=cs['host']), username=user)
    encoded_identity = json.dumps(identity_doc)

    return dict(app=app, identity=encoded_identity, interface=encoded_interface)
def get_instance(cs, user, job, app, action):
    """
    Get parameters such as the port the application is running on and the
    token or password to access the app.

    cs     -- compute site definition; 'host' is read
    user   -- username owning the job
    job    -- job record as returned by wait_app(); 'batch_host' and
              'jobid' are read
    app    -- app definition, forwarded via buildParams()
    action -- instance action; its 'paramscmd' is sent as the 'cmd' parameter

    Returns the decoded JSON reply from the backend.
    """
    import json

    params = buildParams(cs, user, app)
    params['cmd'] = json.dumps(action['paramscmd'])
    # Take the job id from the job record itself. The previous code read a
    # bare `jobid` name that is not a parameter here, so it only worked when
    # a module-level variable of that name happened to exist.
    url = "{}/appinstance/{}/{}/{}/{}".format(
        TESBACKEND,
        user,
        cs['host'],
        job['batch_host'],
        job['jobid'])
    r = s.get(url, params=params)
    return r.json()
def create_tunnel(cs, user, job, inst, action):
    """
    Ask the backend to create an SSH tunnel for the app instance.

    The instance description `inst` is posted as the JSON body; the user,
    the site host and the job's batch host form the URL path. `action` is
    accepted but not used. Returns None.
    """
    endpoint = "{}/createtunnel/{}/{}/{}".format(
        TESBACKEND, user, cs['host'], job['batch_host'])
    s.post(endpoint, json=inst)
    return None
def get_url(job, inst, action):
    """
    Ask the backend for the URL the app is served on, with things like
    passwords and tokens templated in.

    Only the 'redir' and 'cmd' entries of action['client'] are sent, wrapped
    in a minimal app-shaped dict. `job` is accepted but not used.
    Returns the raw response text.
    """
    import json

    client = action['client']
    pseudoapp = {'client': {'redir': client['redir'], 'cmd': client['cmd']}}
    query = {
        'app': json.dumps(pseudoapp),
        'appinst': json.dumps(inst),
    }
    response = s.get("{}/appurl".format(TESBACKEND), params=query)
    return response.text
def connect_app(url):
    """
    Check that the application answers at the given URL.

    `url` is a JSON-encoded string. It is decoded and split so that the
    query string is passed to the request as parameters. The content is not
    inspected; any status other than 200 raises Exception.
    """
    import json
    from urllib.parse import parse_qs, urlparse

    target = urlparse(json.loads(url))
    base = "{}://{}{}".format(target.scheme, target.netloc, target.path)
    response = s.get(base, params=parse_qs(target.query))
    if response.status_code != 200:
        raise Exception
    print("Connection succesfull")
def start_app(cs, user, app):
    """
    Submit a job for the app and return its integer job id.

    The sbatch command used here is fine for testing but is not the real
    one; producing the real command would need a javascript engine in this
    test pipeline.
    """
    submit_cmd = "sbatch -n 1 --time=00:01:00"
    query = buildParams(cs, user, app, submitcmd=submit_cmd)
    reply = s.post("{}/submit".format(TESBACKEND),
                   params=query,
                   json={'app': app})
    output = reply.json()
    # On our test system the output is "Submitted batch job NNNNNN",
    # so the job id is the fourth whitespace-separated word.
    words = output.split()
    return int(words[3])
def wait_app(cs, user, jobid, interval=5, timeout=300):
    """
    Wait for the job to start and return its record from the queue.

    cs       -- compute site definition, passed to stat()
    user     -- username owning the job
    jobid    -- id of the job to wait for (compared as a string)
    interval -- seconds to sleep between polls (default 5)
    timeout  -- maximum seconds to wait (default 300, i.e. 5 minutes)

    Returns the job dict once its 'state' is 'RUNNING'.

    Raises Exception if the job is no longer listed or has already
    finished, and TimeoutError if it has not started within `timeout`
    seconds. Previously a timeout fell through and returned None, which
    only surfaced later as an unrelated error in the caller.
    """
    import time

    wanted = "{}".format(jobid)
    # A monotonic deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        jobs = stat(cs, user)
        job = next(
            (j for j in jobs if "{}".format(j['jobid']) == wanted), None)
        if job is None:
            raise Exception("Job disapeared from the queue")
        if job['state'] == 'RUNNING':
            return job
        if job['state'] == 'Finished':
            raise Exception("Job terminated before we could test it")
        time.sleep(interval)
    raise TimeoutError(
        "Job {} did not start within {} seconds".format(jobid, timeout))
def kill_app(cs, user, jobid, app=None):
    """
    Cancel a job through the backend.

    cs    -- compute site definition; 'cancelcmd' is read
    user  -- username owning the job
    jobid -- id of the job to cancel
    app   -- app definition to send with the request. Optional for
             backward compatibility: when omitted, the module-level `app`
             variable is used if it exists, which is what this function
             implicitly relied on before.
    """
    if app is None:
        # Existing callers pass only (cs, user, jobid); fall back to the
        # global they depended on, but do not fail if it was never set, so
        # the cancel request is still attempted.
        app = globals().get('app')
    params = buildParams(cs, user, app, cancelcmd=cs['cancelcmd'])
    s.delete("{}/cancel/{}".format(TESBACKEND, jobid), params=params)
"""
Translate the globals COMPUTESITE, APPNAME and ACTION
into their object representations
"""
# NOTE(review): the indentation of this script is not visible in the view
# under review, so where the `for`, `while True:`, `if` and `try` bodies end
# cannot be determined from here -- confirm the nesting against the real file.
#
# Fetch the compute-site catalogue; the response is decoded with sites.json()
# further down, when the site named COMPUTESITE[1] is selected.
sites = s.get(COMPUTESITE[0])
"""
Setup the authorisation
This assumes the files KEY and CERT are present contain a valid SSH cert pair
"""
# First contact with /sshagent. The response itself is not used; afterwards
# the session's cookies are copied into a plain dict that is sent explicitly
# with the following /sshagent requests.
# NOTE(review): presumably this call exists to obtain a session cookie -- confirm.
r = s.get("{}/sshagent".format(TESBACKEND))
cookies = {}
for k,v in s.cookies.items():
cookies[k] = v
# Read the private key and certificate and upload both as one JSON document.
with open(KEY, 'r') as f:
key = f.read()
with open(CERT, 'r') as f:
cert = f.read()
contents = {"key": key, "cert": cert}
r = s.post("{}/sshagent".format(TESBACKEND), json=contents, cookies = cookies)
"""
Verify Authorization worked
If stat fails at the end of this block, there is little point continuting
"""
# List what the agent holds and look for an entry that has a 'Signing CA'
# key. There is no break, so if several entries match, `user` ends up as the
# first principal of the last matching entry.
r = s.get("{}/sshagent".format(TESBACKEND), cookies = cookies)
auths = r.json()
foundacert = False
for a in auths:
if 'Signing CA' in a:
foundacert = True
#
# Just assume we want to use the
# first user the cert is valid for
#
user = a['Principals'][0]
if not foundacert:
raise Exception("Failed to setup a cert")
# Select the compute site by name; [0] raises IndexError if none matches.
cs = list(filter(lambda x: x['name'] == COMPUTESITE[1], sites.json()))[0]
# Concurrency exercise: start 10 threads that each call stat(cs, user), wait
# for all of them, then call stat once more from this thread and print the
# result.
# NOTE(review): no break is visible for this `while True:`; if the statements
# after print(r) are outside the loop they are unreachable -- confirm.
while True:
nthreads=10
threads = []
for i in range(0,nthreads):
threads.append(threading.Thread(target=stat,args=(cs,user)))
threads[-1].start()
for t in threads:
t.join()
r = stat(cs,user)
print(r)
#qstat = stat(cs, user)
#print(qstat)
# RUNAPP is hard-coded to False, so the app run guarded by `if RUNAPP:` is
# skipped as written.
RUNAPP=False
if RUNAPP:
# Pick the app named APPNAME and, within it, the instance action named ACTION;
# each [0] raises IndexError if nothing matches.
app = list(filter(lambda x: x['name'] == APPNAME, get_apps(cs,user)))[0]
action = list(filter(lambda x: x['name'] == ACTION, app['instactions']))[0]
"""
Perform the sequence of running the app etc. This is:
start_app (run sbatch get a jobid)
wait_app (wait for the task to transition from
pending to running, also returns the host its running on)
get_instance (get information such as the port the
app is running on and the token/password
for this app)
create_tunnel (create the SSH tunnel connection to the correct port)
get_url (template all parameters so we know what URL the app is served on)
connect_app (perform a basic request and verify a 200 OK result)
kill_app (perform scancel)
"""
jobid = start_app(cs, user, app)
try:
job = wait_app(cs, user, jobid)
inst = get_instance(cs, user, job, app, action)
# NOTE(review): when the instance reply contains 'message' the job is
# cancelled, but no return/raise is visible afterwards, so the flow appears to
# continue into create_tunnel -- confirm this is intended.
if 'message' in inst:
kill_app(cs, user, jobid)
create_tunnel(cs, user, job, inst, action)
# The URL returned by the backend carries a {twsproxy} placeholder that is
# filled in here.
url = get_url(job, inst, action).format(twsproxy=TWSPROXY)
connect_app(url)
# Any failure in the sequence: cancel the job and exit with status 1.
except Exception:
kill_app(cs, user, jobid)
exit(1)
kill_app(cs, user, jobid)
print("deleting keys from agent")
"""
Logout
"""
# Ask the backend to drop the keys held for this session.
s.delete("{}/sshagent".format(TESBACKEND))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment