Newer
Older
"""
This module handles SSH Connections
"""
import subprocess
Chris Hines
committed
class SshAgentException(Exception):
    """
    Raised if an SSH agent isn't running. (Normally the SPA client should
    invoke the agent before trying to execute anything or create any tunnels.)
    """

    def __init__(self, message=None):
        # message is optional so the exception can be raised bare.
        super().__init__(message)
class SshValidationException(Exception):
    """
    Signals that a parameter destined for ssh failed validation: for
    example a hostname that is not a hostname, a port that is not a number,
    or a username that is not in the expected format.
    """
class SshCtrlException(Exception):
    """
    Raised if the ssh control socket is not valid.
    """

    def __init__(self, message=None):
        # message is optional so the exception can be raised bare.
        super().__init__(message)
class SshExecException(Exception):
    """
    Signals that ssh could not run a program on the remote side; the cause
    may well be a bug at the other end.
    """

    def __init__(self, message=None):
        super().__init__(message)
class SftpException(Exception):
    """
    Generic failure of an sftp operation.
    """
class SftpPermissionException(Exception):
    """
    Error type for sftp permission failures.
    NOTE(review): nothing in the visible code raises it - confirm usage.
    """
"""
The Ssh class can execute remote commands or create tunnels
"""
# Timeout handed to Popen.communicate() for ssh child processes (seconds).
TIMEOUT = 30

# Options shared by every ssh and sftp invocation: host keys are neither
# checked nor recorded, no identity file is read, and only public-key
# authentication is attempted.
SSHOPTS = [
    '-o', 'StrictHostKeyChecking=no',
    '-o', 'UserKnownHostsFile=/dev/null',
    '-o', 'IdentityFile=/dev/null',
    '-o', 'PreferredAuthentications=publickey',
    '-o', 'UpdateHostKeys=no',
    '-o', 'LogLevel=error',
    # '-o', 'ServerAliveInterval=30',
    # '-o', 'ServerAliveCountMax=4'
]

# Ready-made argv prefixes; callers .copy() these before extending them.
SSHCMDWITHOPTS = ['ssh']
SSHCMDWITHOPTS.extend(SSHOPTS)
SFTPCMDWITHOPTS = ['sftp']
SFTPCMDWITHOPTS.extend(SSHOPTS)
@staticmethod
def validate_port(port):
    """
    Validate that the port we're being asked to connect to is an integer.

    Returns None when ``int(port)`` succeeds; the converted value is not
    returned. Raises SshValidationException otherwise.
    """
    try:
        int(port)
    except (TypeError, ValueError, OverflowError) as err:
        # Only conversion failures are translated; KeyboardInterrupt and
        # SystemExit are no longer swallowed by a bare except.
        raise SshValidationException("port number {} was not an integer".format(port)) from err
@staticmethod
def validate_username(user):
    """
    Ensure that the username conforms to posix standards.
    Some systems do not conform to this standard, and linux still works,
    but for the moment, let's assume they do conform.

    Returns None for an acceptable name and raises SshValidationException
    otherwise (including for values that are not strings).
    """
    import re
    # A lowercase letter or underscore, then either up to 31 name characters
    # or up to 30 name characters followed by a literal '$'.
    username_re = re.compile(r'[a-z_]([a-z0-9_-]{0,31}|[a-z0-9_\.-]{0,30}\$)')
    # fullmatch, not match with '$': '$' also matches just before a trailing
    # newline, which would let "name\n" through.
    if not isinstance(user, str) or not username_re.fullmatch(user):
        raise SshValidationException("username {} is not valid according to " \
                                     "posix standards".format(user))
@staticmethod
def validate_hostname(host):
    """
    Ensure that the hostname at least conforms to naming standards.

    Accepts only letters, digits, '_', '.' and '-', and refuses names that
    begin with '-'. Returns None for an acceptable name and raises
    SshValidationException otherwise (including for non-string values).
    """
    import re
    hostname_re = re.compile(r'[a-zA-Z0-9_\.-]+')
    # The host ends up as a bare argv element of the ssh/sftp command line,
    # where a leading '-' would be read as an option rather than a host.
    # fullmatch (instead of match with '$') also rejects a trailing newline.
    if (not isinstance(host, str)
            or host.startswith('-')
            or not hostname_re.fullmatch(host)):
        raise SshValidationException("hostname {} is not valid".format(host))
@staticmethod
def validate_command(cmd):
    """
    Deliberately accepts every command.

    The author's reasoning: because of the way the command is handed to
    Popen, any input can be taken as the command to execute on the login
    host - including ';' and other forms of injection - and the worst that
    can happen is that users damage their own account.
    """
    return None
@staticmethod
def check_ctrlsocket(sess, ctrlsocket):
    """
    Ask ssh whether a control master is answering on ``ctrlsocket``.

    Runs ``ssh -S <ctrlsocket> -O check`` in its own session. The login
    name and host on the command line ('noone', 'nohost') are presumably
    placeholders, since the check talks to the socket rather than a host.

    sess is accepted for signature compatibility and is not used.

    Returns True when the check command exits with status 0, else False.
    Raises subprocess.TimeoutExpired if the command does not finish within
    Ssh.TIMEOUT seconds; the child is killed and reaped before re-raising.
    """
    cmd = ['ssh', '-S', ctrlsocket, '-O', 'check', '-l', 'noone', 'nohost']
    check_p = subprocess.Popen(cmd, start_new_session=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               stdin=None)
    try:
        check_p.communicate(timeout=Ssh.TIMEOUT)
    except subprocess.TimeoutExpired:
        # Do not leave a hung ssh behind: kill it, drain its pipes, and let
        # the caller decide what to do about the stale socket.
        check_p.kill()
        check_p.communicate()
        raise
    return check_p.returncode == 0
@staticmethod
def get_ctrl_master_socket(sess, host, user, sshport):
"""
Returns the socket for the control master process
Opens it if needed.
"""
# NOTE(review): this listing has lost its indentation and appears to be
# missing statements (the sess.lock acquire that pairs with the release()
# calls below, and some except/else clauses) - confirm against the
# repository before changing any logic here.
import os
import stat
import logging
logger = logging.getLogger()
Chris Hines
committed
# The control socket path is derived from the user, the host and
# sess.sshsessid.
ctrlsocket = "/tmp/cm-{}-{}-{}".format(user,host,sess.sshsessid)
# If the existing socket passes the check, release sess.lock and reuse it.
try:
if Ssh.check_ctrlsocket(sess,ctrlsocket):
sess.lock.release()
return ctrlsocket
# The check timed out: stat the socket file, log, and unlink it as stale.
except subprocess.TimeoutExpired as e:
try:
mode = os.stat(ctrlsocket).st_mode
Chris Hines
committed
logger.debug(f'removing stale control socket for {user}')
os.unlink(ctrlsocket) # This might occur if a container was restarted and the file wasn't removed
# This is expected: It indicates there is not yet a control master for this user on this login node
# Start a new master: -M with ControlPersist=60s, -N (no remote command),
# listening on ctrlsocket.
try:
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
"-S", ctrlsocket,
"-M", '-o', 'ControlPersist=60s',
'-p', sshport, '-N','-l', user, host])
env = os.environ.copy()
# Without an agent socket on the session, release the lock and raise.
if sess.socket is None:
sess.lock.release()
logger.debug("Exception get_ctrl_master_socket")
raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket
Chris Hines
committed
logger.debug(f'creating master socket for {user}')
logger.debug(" ".join(sshcmd))
ctrl_p = subprocess.Popen(sshcmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=None,
Chris Hines
committed
start_new_session=True,
env=env)
# NOTE(review): communicate() returns (stdout, stderr); the two names on the
# left are swapped. Neither is read again in the visible code.
(stderr,stdout) = ctrl_p.communicate() # The process should exit, while the mux persists
# NOTE(review): as listed, the returncode check below sits directly under
# `except Exception` - the handler body is probably missing; confirm.
except Exception as e:
# Exit status 0 from the master command: release the lock and return the
# socket path.
if ctrl_p.returncode == 0:
sess.lock.release()
return ctrlsocket
else:
sess.lock.release()
# An agent that reports no identities is turned into SshCtrlException.
# NOTE(review): as listed, the `except Exception: pass` below would swallow
# that exception - confirm the real nesting.
try:
certs = sess.get_certs()
if b'The agent has no identities.\n' == certs:
raise SshCtrlException(certs.decode())
except Exception as e:
pass
@staticmethod
def parse_sftp_output(output):
    """
    Parse the text printed by sftp for ``pwd`` followed by ``ls -al``.

    output is the decoded stdout of the sftp process.

    A line containing "Remote working directory: " supplies the working
    directory (everything after the marker, right-stripped). Any other
    line with at least nine whitespace-separated tokens is treated as a
    long-listing row; the ninth field keeps the rest of the line, so file
    names may contain spaces.

    Returns {'pwd': <str or None>, 'files': [<row dict>, ...]} where each
    row dict has the keys name, mode, hardlinks, user, group, size and
    mtime (ISO 8601 text produced by dateutil).
    """
    remotestr = "Remote working directory: "
    rv = []
    pwd = None
    for l in output.splitlines():
        # Handle the pwd line first so that a directory name with many
        # spaces is never mistaken for a listing row. Slice from where the
        # marker was found rather than assuming it starts the line.
        marker = l.find(remotestr)
        if marker != -1:
            pwd = l[marker + len(remotestr):].rstrip()
            continue
        fields = l.split(None, 8)
        if len(fields) != 9:
            continue
        # Imported only when a listing row is actually present.
        import dateutil.parser
        mtime = dateutil.parser.parse(
            "{} {} {}".format(fields[5], fields[6], fields[7]))
        rv.append({
            'name': fields[8],
            'mode': fields[0],
            'hardlinks': fields[1],
            'user': fields[2],
            'group': fields[3],
            'size': fields[4],
            'mtime': mtime.isoformat(),
        })
    return ({'pwd': pwd, 'files': rv})
@staticmethod
def sftpmkdir(sess, host, user, path, name, sshport):
"""
Use sftp to run mkdir.
"""
# NOTE(review): indentation was lost in this listing, so the nesting of the
# error-handling branches near the end cannot be read off the text -
# confirm against the repository before changing logic.
import os
import logging
logger = logging.getLogger()
env = os.environ.copy()
# With no path given, the directory is created relative to ".".
if path is None:
path = "."
# sftp authenticates through the session's agent socket.
if sess.socket is None:
raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket
#Ssh.validate_username(user)
Ssh.validate_hostname(host)
# The control socket path is passed to sftp as its ControlPath below.
ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
Chris Hines
committed
sftpcmd = Ssh.SFTPCMDWITHOPTS.copy()
sftpcmd.extend(['-P', sshport,
'-o', 'ControlPath={}'.format(ctrlsocket),
'{}@{}'.format(user, host)])
exec_p = subprocess.Popen(sftpcmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=subprocess.PIPE, env=env)
# sftpcmd is rebound here from the argv list to the command text fed to sftp
# on stdin.
# NOTE(review): path is interpolated unquoted and name is wrapped in double
# quotes without escaping, so spaces in path or quotes in name will change
# what sftp parses.
sftpcmd="cd {}\n mkdir \"{}\"\n".format(path,name)
(stdout, stderr) = exec_p.communicate(sftpcmd.encode())
# Any output on stderr is logged. "Couldn't create directory: Failure"
# returns quietly; SftpException is raised further down.
if stderr is not None:
if not (stderr == b''):
logger.error('sftp failure')
logger.error(stdout.decode())
logger.error(stderr.decode())
if ('Couldn\'t create directory: Failure' in stderr.decode()):
return
if ('Couldn\'t canonicalize: No such file or directory' in stderr.decode()):
logger.error('can\'t change to that directory')
if ('Permission denied' in stderr.decode()):
logger.error('can\'t change to that directory')
logger.error(stdout.decode())
logger.error(stderr.decode())
raise SftpException()
return
@staticmethod
# NOTE(review): the `def` line for this method is missing from the listing.
# The recursive calls below show the name (Ssh.sftpls) and the argument
# order sess, host, user, sshport, path, changepath; defaults are unknown.
"""
Use sftp to run an ls on the given path.
Return the directory listing (as a list) and the cwd
"""
import os
import logging
logger = logging.getLogger()
env = os.environ.copy()
# sftp authenticates through the session's agent socket.
if sess.socket is None:
raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket
#Ssh.validate_username(user)
Ssh.validate_hostname(host)
ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
# Missing or empty path components fall back to ".".
if (path is None or path == ""):
path="."
if (changepath is None or changepath == ""):
changepath="."
Chris Hines
committed
sftpcmd = Ssh.SFTPCMDWITHOPTS.copy()
sftpcmd.extend([ '-P', sshport,
'-o', 'ControlPath={}'.format(ctrlsocket),
'{}@{}'.format(user, host)])
exec_p = subprocess.Popen(sftpcmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=subprocess.PIPE, env=env)
# sftpcmd is rebound to the command text fed to sftp on stdin: cd into path,
# then into changepath, print the working directory, and list it.
sftpcmd="cd {}\n cd {}\n pwd\nls -al\n".format(path,changepath)
(stdout, stderr) = exec_p.communicate(sftpcmd.encode())
if stderr is not None:
if not (stderr == b''):
logger.error('sftp failure')
# When the directory change fails, list `path` itself instead.
# NOTE(review): if the retry hits the same error (for example because
# `path` itself is invalid), the call repeats with identical arguments;
# no guard against unbounded recursion is visible.
if ('Couldn\'t canonicalize: No such file or directory' in stderr.decode()):
logger.error('can\'t change to that directory')
return Ssh.sftpls(sess,host,user,sshport, path,changepath='.')
if ('Permission denied' in stderr.decode()):
logger.error('can\'t change to that directory')
return Ssh.sftpls(sess,host,user,sshport, path,changepath='.')
# Any other stderr output is logged and raised as SshCtrlException.
logger.error(stderr.decode())
logger.error(('Permission denied' in stderr.decode()))
logger.error('Couldn\'t canonicalize: No such file or directory' in stderr.decode())
raise SshCtrlException(stderr.decode())
# Returns the dict built by parse_sftp_output: {'pwd': ..., 'files': [...]}.
dirlist = Ssh.parse_sftp_output(stdout.decode())
return dirlist
def execute(sess, host, user, cmd, bastion=None, stdin=None, sshport="22", bastionsshport="22"):
"""
execute the command cmd on the host via ssh
# assume the environment is already setup with an
# SSH_AUTH_SOCK that allows login
"""
# NOTE(review): this listing has lost its indentation and is missing
# statements (the try/except around communicate(), and parts of the ssh
# argument lists) - confirm against the repository before changing logic.
# Returns a dict with the keys 'stdout' and 'stderr' (bytes).
import logging
logger = logging.getLogger()
# With neither a command nor stdin there is nothing to run.
if cmd is None and stdin is None:
return {'stdout': b'', 'stderr': b'No command given to execute'}
# NOTE(review): no `import os` is visible in this function or at the top of
# the listing - confirm that os is in scope.
env = os.environ.copy()
if sess.socket is None:
raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket
# A "host:port" value overrides the port argument. split(':') raises
# ValueError when the value holds more than one colon.
if ":" in host:
host,sshport = host.split(':')
if bastion is not None and ":" in bastion:
bastion,bastionsshport = bastion.split(':')
#Ssh.validate_username(user)
Ssh.validate_hostname(host)
Ssh.validate_command(cmd)
# The control master is opened to the bastion when one is given, otherwise
# to the host itself.
if bastion == None:
ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
else:
ctrlsocket = Ssh.get_ctrl_master_socket(sess, bastion, user, bastionsshport)
if bastion == None:
# we are executing this command on the login node, so no more magic is needed
Chris Hines
committed
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
Chris Hines
committed
'-l', user, host, cmd])
else:
# we are executing on a node (e.g. a compute/batch node) using a bastion (e.g. login node)
# at the moment I'll assume the ssh port for the batch host is the same as the ssh port for the bastion/login host
Chris Hines
committed
# NOTE(review): proxycmd is built here but not referenced again in the
# visible lines; the arguments that use it are presumably among the missing
# ones.
proxycmd = " ".join(Ssh.SSHCMDWITHOPTS)
proxycmd = proxycmd + " {user}@{bastion} -W {host}:{sshport} -S {ctrlsocket}".format(
user=user, host=host,
ctrlsocket=ctrlsocket,
sshport=sshport,
bastion=bastion)
Chris Hines
committed
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
Chris Hines
committed
'-p', str(bastionsshport), '-l', user, bastion, cmd])
Chris Hines
committed
exec_p = subprocess.Popen(sshcmd,start_new_session=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.PIPE,env=env)
Chris Hines
committed
# NOTE(review): two communicate() calls follow; presumably one feeds stdin
# when it is given and the other runs without input, both inside a try whose
# handler kills the child and re-raises. The branch and try/except lines are
# not in this listing.
(stdout, stderr) = exec_p.communicate(stdin.encode(),timeout=Ssh.TIMEOUT)
Chris Hines
committed
(stdout, stderr) = exec_p.communicate(timeout=Ssh.TIMEOUT)
Chris Hines
committed
exec_p.kill()
(stdout,stderr) = exec_p.communicate()
raise e
#raise SshExecException(message="A program timed out on the backend server")
Chris Hines
committed
# Exit status 255 is treated specially: 'client_loop' in stderr raises
# SshExecException, anything else raises SshCtrlException.
if exec_p.returncode == 255:
Chris Hines
committed
logger.error(stderr)
Chris Hines
committed
if b'client_loop' in stderr:
raise SshExecException(message="It looks like your job died for some reason. Please check the log files")
else:
logger.error('ssh execute returned error code 255, killing the session')
Chris Hines
committed
#sess.kill()
Chris Hines
committed
raise SshCtrlException(message="Ssh was unable to login. It's likely you're credentials need to be renewed by loggin in again")
# this error condition should now be handled in get_control_socket
# if exec_p.returncode != 0 or b'Control socket connect' in stderr:
# try:
# ctrl_p = sess.ctrl_processes[ctrlsocket]
# except KeyError as e:
# # If the process is restarted and the agents all get restarted, but the control socket /tmp/cm-* is not removed
# # This can happen
# import os
# try:
# os.unlink(ctrlsocket)
# except FileNotFoundError as e:
# pass
# raise SshCtrlException(message="ssh execute failed, there was a problem with the control socket. Did the docker backend get restartd?")
Chris Hines
committed
# logger.error('ran command {}'.format(" ".join(sshcmd)))
# logger.error('got return code {} stderr {} stdout {}'.format(exec_p.returncode,stderr,stdout))
# logger.debug("exception execute")
# raise SshExecException(message="{}".format(stderr.decode()))
#sess.kill()
#raise SshCtrlException(message="ssh execute failed, killing off the agent. Log in again")
# If the return code is non-zero, its likely something went wrong logging in. We should perhaps consider killing the ssh session, or at least the ctrl socket processes
# And allowing the client to retry them
# I haven't decided yet whether we should kill the whole agent or just the ctrl process
# NOTE(review): msg is assembled below but not used in the visible lines.
if stderr == b'':
msg = "The program {} on {} failed".format(cmd,host)
else:
msg = "The program {} on {} failed. The error message was {}".format(cmd,host,stderr.decode())
# Because people often put stuff in their bashrc which casuses stderr to be non-empty, we will swallow errors in the bashrc without comment
Chris Hines
committed
# stderr mentioning 'bashrc' or 'locale' is replaced by empty bytes in the
# result; otherwise stderr is returned unchanged.
if b'bashrc' in stderr or b'locale' in stderr:
return {'stdout':stdout,'stderr':b''}
else:
return {'stdout':stdout, 'stderr':stderr}
def tunnel(sess, port, batchhost, user, host, internalfirewall = True, localbind = True, authtok = None, sshport="22"):
# NOTE(review): the prose lines that follow read like docstring text whose
# triple-quote delimiters were lost in this listing; indentation and some
# statements are missing as well - confirm against the repository.
# Returns (localport, [pid of the ssh process]) on success and (None, []) if
# waiting for the tunnel raised an unexpected exception; raises
# SshCtrlException when the tunnel does not open.
the ProxyCommand is used if the server we run on the batch host is only
addressable on localhost
e.g. jupyter in its default config runs on localhost:8888 so a web
browser on the login node can not connect to it
If the server port is directly accessible from the login host (eg the
ssh server port is directly accessible) or the batchhost IS the login host
(i.e. batchhost = localhost) we can use -O forward -S <control socket>
import logging
logger = logging.getLogger()
# NOTE(review): no `import os` is visible in this function or at the top of
# the listing - confirm that os is in scope.
env = os.environ.copy()
env['SSH_AUTH_SOCK'] = sess.socket
# A "host:port" value overrides the sshport argument.
if ":" in host:
host,sshport = host.split(':')
Ssh.validate_port(port)
Ssh.validate_hostname(batchhost)
#Ssh.validate_username(user)
Ssh.validate_hostname(host)
ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
localport = Ssh.get_free_port()
# NOTE(review): `port == 22` is an integer comparison; validate_port also
# accepts numeric strings, for which this test would be False - confirm the
# type callers pass.
if port == 22 or (not internalfirewall and not localbind):
# Can we use the existing control master connection and just add a
# port forward to the batch node
Chris Hines
committed
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
format(port=port, localport=localport, batchhost=batchhost),
'-O', 'forward', '-S', ctrlsocket,
Chris Hines
committed
'-p', sshport, '-l', user, host])
else:
# Create an ssh tunnel to the batch node using a proxycommand.
# The proxy command should utilise
# the existing control master connection
Chris Hines
committed
proxycmd = " ".join(Ssh.SSHCMDWITHOPTS) + " {user}@{host} -W {batchhost}:22 -S {ctrlsocket}".format(
user=user, host=host,
ctrlsocket=ctrlsocket,
batchhost=batchhost)
Chris Hines
committed
sshcmd = Ssh.SSHCMDWITHOPTS.copy()
sshcmd.extend([
format(port=port, localport=localport),
Chris Hines
committed
'-p', sshport, '-l', user, batchhost])
Chris Hines
committed
# NOTE(review): in both branches above, the string that the `format(...)`
# call belongs to (presumably the port-forward specification) is missing
# from the listing, and proxycmd is not referenced in the visible lines.
tunnel_p = subprocess.Popen(sshcmd, start_new_session=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
# Wait until the local end accepts connections; on failure collect and log
# the ssh output before raising.
try:
if not Ssh.wait_for_tunnel(localport):
logger.error('tunnel failed to open')
Chris Hines
committed
(stdout,stderr) = tunnel_p.communicate(timeout=Ssh.TIMEOUT)
logger.error('Ssh.tunnel: stdout {}'.format(stdout.decode()))
logger.error('Ssh.tunnel: stderr {}'.format(stderr.decode()))
raise SshCtrlException(message="failed to open a tunnel")
except SshCtrlException as e:
raise(e)
except Exception as e:
import traceback
logger.error('exception waiting for tunnel')
logger.error(e)
logger.error(traceback.format_exc())
return None, []
# Record the local port on the session, keyed by authtok.
if sess.port is None:
sess.port = {}
sess.port.update({authtok: localport})
return localport, [tunnel_p.pid]
# @staticmethod
# def addkey(sess, key, cert):
# ""
# pass
@staticmethod
def wait_for_tunnel(localport, timeout=None, retries=10, delay=1):
    """
    In order to avoid a race condition, we wait for the tunnel to be
    established. If the tunnel hasn't been established once the retries
    are used up, perhaps something has gone wrong.

    localport: port on 127.0.0.1 that the tunnel should be listening on.
    timeout:   per-attempt connect timeout in seconds; defaults to
               Ssh.TIMEOUT when None.
    retries:   number of connection attempts.
    delay:     seconds to sleep between attempts.

    Returns True as soon as a connection succeeds, False if every attempt
    fails.
    """
    import logging
    import socket
    import time
    logger = logging.getLogger()
    if timeout is None:
        timeout = Ssh.TIMEOUT
    for attempt in range(retries):
        try:
            # A fresh socket per attempt: a socket that has been closed
            # after a failed connect cannot be connected again, so reusing
            # one would make every retry after the first fail immediately.
            with socket.create_connection(('127.0.0.1', localport), timeout=timeout):
                return True
        except OSError as e:
            logger.debug('socket exception {}'.format(e))
            if attempt + 1 < retries:
                time.sleep(delay)
    return False
@staticmethod
def get_free_port():
    """
    Finds a port which the local server can listen on.

    Tries to bind 127.0.0.1 on each port from 1025 up to 65499 in turn and
    returns the first one that binds. The probing socket is closed before
    returning, so the port is only known to have been free at that moment.

    Returns the port number, or None if no port in the range could be
    bound.
    """
    import socket
    # The context manager closes the socket on every path, including the
    # one where no port in the range is available.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serversocket:
        for testport in range(1025, 65500):
            try:
                serversocket.bind(('127.0.0.1', testport))
            except OSError:
                continue
            return testport
    return None