Skip to content
Snippets Groups Projects 22.2 KiB
Newer Older
This module handles SSH Connections
import subprocess

class SshAgentException(Exception):
    def __init__(self,message=None):
    Raised if an SSH Agent isn't running (Normally the SPA client should invoke the agent
    before trying to execute anything or create any tunnels)
class SshValidationException(Exception):
    Raised if any parameters passed to SSH don't look valid,
    eg hostnames are not hostnames, ports are not numbers, usernames are
    not the correct format

class SshCtrlException(Exception):
    Raised if the ssh control socket is not valid.
    def __init__(self,message=None):
class SshExecException(Exception):
    Raised if the ssh can't run a remote program. Possibly a bug on the other end 
    def __init__(self,message=None):

Ubuntu's avatar
Ubuntu committed
class SftpException(Exception):
class SftpPermissionException(Exception):

    Ssh class can execute commands or create tunnels
    SSHOPTS = [ '-o', 'StrictHostKeyChecking=no',
                '-o', 'UserKnownHostsFile=/dev/null',
                '-o', 'IdentityFile=/dev/null',
                '-o', 'PreferredAuthentications=publickey',
                '-o', 'UpdateHostKeys=no',
                '-o', 'LogLevel=error',
Chris Hines's avatar
Chris Hines committed
#                '-o', 'ServerAliveInterval=30',
#                '-o', 'ServerAliveCountMax=4'
    SSHCMDWITHOPTS = ['ssh']
    SFTPCMDWITHOPTS = ['sftp']
        "validate that the port we're being asked to connect to is an integer"
            raise SshValidationException("port number {} was not an integer".format(port))

    def validate_username(user):
        """
        Ensure that the username conforms to posix standards.

        Some systems do not conform to this standard, and linux still works,
        but for the moment, let's assume they do conform.

        A valid name starts with a lowercase letter or underscore and is
        followed by either up to 31 characters from ``[a-z0-9_-]``, or up to
        30 characters from ``[a-z0-9_.-]`` and a trailing ``$``.

        Returns None when the name is valid.

        Raises:
            SshValidationException: if ``user`` does not match the pattern.
        """
        import re
        username_re = re.compile(r'^[a-z_]([a-z0-9_-]{0,31}|[a-z0-9_\.-]{0,30}\$)$')
        # fullmatch() rather than match(): with match() the final ``$`` anchor
        # also succeeds just before a trailing newline, so "name\n" would be
        # accepted as a valid username.
        if username_re.fullmatch(user) is None:
            raise SshValidationException("username {} is not valid according to " \
                                         "posix standards".format(user))

    def validate_hostname(host):
        """
        Ensure that the hostname at least conforms to naming standards.

        Only lowercase letters, digits, underscore, dot and hyphen are
        accepted, the name must be non-empty, and it must not start with a
        hyphen.

        Returns None when the name is valid.

        Raises:
            SshValidationException: if ``host`` is not acceptable.
        """
        # Imported locally, like the other helpers in this file, so the block
        # does not depend on a module-level ``import re``.
        import re
        hostname_re = re.compile(r'^[a-z0-9_\.-]+$')
        # fullmatch() rather than match(): with match() the ``$`` anchor also
        # succeeds just before a trailing newline, so "host\n" would pass.
        # A leading '-' is rejected because the value ends up on an ssh
        # command line, where it would be parsed as an option.
        if hostname_re.fullmatch(host) is None or host.startswith('-'):
            raise SshValidationException("hostname {} is not valid".format(host))

    def validate_command(cmd):
        """So in theory, due to the format of the Popen command
        we can take any input as a command to execute on the login host, including
        ; and other forms of injection,
        and the worst that can happen is the user screws up their own account

        NOTE(review): no validation logic is visible for this function here;
        confirm whether a body is expected after this docstring.
        """

Chris Hines's avatar
Chris Hines committed
    def check_ctrlsocket(sess, ctrlsocket):
        """
        Ask ssh whether a control master is answering on ``ctrlsocket``.

        Runs ``ssh -S <ctrlsocket> -O check`` with placeholder user and host
        arguments and reports whether it exited with status 0.

        Args:
            sess: unused here; kept so the call signature is unchanged.
            ctrlsocket: path of the control socket to probe.

        Returns:
            True if the check command exited with status 0, otherwise False.

        Raises:
            subprocess.TimeoutExpired: if the check does not finish within
                Ssh.TIMEOUT seconds.
        """
        cmd = ['ssh', '-S', ctrlsocket, '-O', 'check', '-l', 'noone', 'nohost']
        # subprocess.run() kills and reaps the child before re-raising
        # TimeoutExpired; Popen.communicate(timeout=...) alone leaves the
        # process running. Only the exit status is used, so output is dropped.
        check_p = subprocess.run(cmd,
                                 stdin=None,
                                 stdout=subprocess.DEVNULL,
                                 stderr=subprocess.DEVNULL,
                                 timeout=Ssh.TIMEOUT)
        return check_p.returncode == 0

    def get_ctrl_master_socket(sess, host, user, sshport):
        """
        Returns the socket for the control master process
        Opens it if needed.

        The socket path is ``/tmp/cm-<user>-<host>-<sess.sshsessid>``. If a
        control master already answers on it, the path is returned as is.
        Otherwise any leftover socket file is removed and a new master is
        started with ``ssh -M -N`` and ``ControlPersist=60s``.

        Args:
            sess: session object; ``sess.sshsessid`` and ``sess.socket`` (the
                ssh-agent socket path) are read.
            host: host to open the master connection to.
            user: remote login name.
            sshport: ssh port of ``host``.

        Returns:
            The control socket path.

        Raises:
            SshAgentException: if ``sess.socket`` is None.
            SshCtrlException: if the ssh master command exits non-zero; the
                message is the command's stderr.
        """
        import os
        import logging
        logger = logging.getLogger()
        logger.debug("enter get_ctrl_master_socket")
        ctrlsocket = "/tmp/cm-{}-{}-{}".format(user, host, sess.sshsessid)
        if Ssh.check_ctrlsocket(sess, ctrlsocket):
            return ctrlsocket

        try:
            # This might occur if a container was restarted and the file wasn't removed
            os.unlink(ctrlsocket)
            logger.debug('removing stale control socket')
        # This is expected: It indicates there is not yet a control master for this user on this login node
        except FileNotFoundError:
            pass

        sshcmd = Ssh.SSHCMDWITHOPTS.copy()
        sshcmd.extend([
                  "-S", ctrlsocket,
                  "-M", '-o', 'ControlPersist=60s',
                  '-p', str(sshport), '-N', '-l', user, host])
        if sess.socket is None:
            logger.debug("Exception get_ctrl_master_socket")
            raise SshAgentException("No ssh-agent yet")
        env = os.environ.copy()
        env['SSH_AUTH_SOCK'] = sess.socket
        logger.debug("creating master socket")
        logger.debug("ssh_auth_sock {}".format(sess.socket))
        logger.debug(" ".join(sshcmd))
        ctrl_p = subprocess.Popen(sshcmd,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  env=env)
        # communicate() returns (stdout, stderr) in that order; unpacking it
        # the other way round would report stdout in the exception below.
        # The process should exit, while the mux persists
        (stdout, stderr) = ctrl_p.communicate()
        if ctrl_p.returncode == 0:
            return ctrlsocket
        raise SshCtrlException(stderr.decode(errors='replace'))
    def parse_sftp_output(output):
        """
        Parse the text printed by an sftp ``pwd`` + ``ls -al`` batch.

        Args:
            output: decoded stdout of the sftp process.

        Returns:
            A dict ``{'pwd': ..., 'files': [...]}``. ``pwd`` is the text after
            "Remote working directory: " (None if no such line was seen).
            ``files`` holds one dict per line that splits into nine
            whitespace-separated fields, with the keys ``mode``,
            ``hardlinks``, ``user``, ``group``, ``size``, ``mtime`` (ISO 8601
            string) and ``name``. All values are strings.
        """
        import dateutil.parser
        remotestr = "Remote working directory: "
        rv = []
        pwd = None
        for l in output.splitlines():
            # Handle the pwd line first so that a working directory whose
            # path contains spaces is never mistaken for a nine-field listing
            # entry. Slice from where the marker was found, not from column 0.
            marker_at = l.find(remotestr)
            if marker_at != -1:
                pwd = l[marker_at + len(remotestr):].rstrip()
                continue
            # maxsplit=8 keeps a file name containing spaces in one piece.
            fields = l.split(None, 8)
            if len(fields) != 9:
                continue
            mtime = dateutil.parser.parse(
                "{} {} {}".format(fields[5], fields[6], fields[7]))
            # Record the entry; without this the listing is always empty.
            rv.append({
                'name': fields[8],
                'mode': fields[0],
                'hardlinks': fields[1],
                'user': fields[2],
                'group': fields[3],
                'size': fields[4],
                'mtime': mtime.isoformat(),
            })
        return ({'pwd': pwd, 'files': rv})

    def sftpmkdir(sess, host, user, path, name, sshport):
        """
        Use sftp to run mkdir.

        Starts sftp over the control master connection for ``user@host``,
        changes to ``path`` and creates the directory ``name`` there.

        Args:
            sess: session object; ``sess.socket`` is the ssh-agent socket.
            host: host to connect to.
            user: remote login name.
            path: directory to change into first; None means ".".
            name: name of the directory to create.
            sshport: ssh port of ``host``.

        Returns:
            None on success (sftp wrote nothing to stderr).

        Raises:
            SshAgentException: if ``sess.socket`` is None.
            SftpPermissionException: if stderr reports "Couldn't create
                directory: Failure" or "Permission denied".
            SftpException: for any other stderr output, including a failed
                change of directory.
        """
        import os
        import logging
        logger = logging.getLogger()
        if path is None:
            path = "."
        if sess.socket is None:
            raise SshAgentException("No ssh-agent yet")
        env = os.environ.copy()
        env['SSH_AUTH_SOCK'] = sess.socket
        ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
        sftpcmd = Ssh.SFTPCMDWITHOPTS.copy()
        sftpcmd.extend(['-P', sshport,
                        '-o', 'ControlPath={}'.format(ctrlsocket),
                        '{}@{}'.format(user, host)])
        exec_p = subprocess.Popen(sftpcmd,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  stdin=subprocess.PIPE, env=env)

        # The sftp commands are fed on stdin; kept in a separate name from
        # the argv list above.
        batch = "cd {}\n mkdir \"{}\"\n".format(path, name)

        (stdout, stderr) = exec_p.communicate(batch.encode())
        if not stderr:
            return

        # Decode once, tolerating bytes that are not valid UTF-8, so that
        # classifying the error cannot itself raise UnicodeDecodeError.
        errtext = stderr.decode(errors='replace')
        logger.error('sftp failure')
        logger.error(errtext)
        if 'Couldn\'t create directory: Failure' in errtext:
            raise SftpPermissionException()
        if 'Couldn\'t canonicalize: No such file or directory' in errtext:
            logger.error('can\'t change to that directory')
            raise SftpException()
        if 'Permission denied' in errtext:
            logger.error('can\'t change to that directory')
            raise SftpPermissionException()
        raise SftpException()
Ubuntu's avatar
Ubuntu committed
    def sftpls(sess, host, user, sshport, path=".",changepath="."):
        """
        Use sftp to run an ls on the given path.
        Return the directory listing (as a list) and the cwd

        sftp is sent ``cd <path>``, ``cd <changepath>``, ``pwd`` and
        ``ls -al``. If a change of directory fails and ``changepath`` was
        something other than ".", the listing is retried once with
        ``changepath='.'``.

        Args:
            sess: session object; ``sess.socket`` is the ssh-agent socket.
            host: host to connect to.
            user: remote login name.
            sshport: ssh port of ``host``.
            path: directory to change into first; None or "" means ".".
            changepath: second directory change, applied after ``path``;
                None or "" means ".".

        Returns:
            The dict produced by ``Ssh.parse_sftp_output``.

        Raises:
            SshAgentException: if ``sess.socket`` is None.
            SshCtrlException: if sftp wrote to stderr and no retry applies;
                the message is the stderr text.
        """
        import os
        import logging
        logger = logging.getLogger()
        if sess.socket is None:
            raise SshAgentException("No ssh-agent yet")
        env = os.environ.copy()
        env['SSH_AUTH_SOCK'] = sess.socket
        ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
        if not path:
            path = "."
        if not changepath:
            changepath = "."
        sftpcmd = Ssh.SFTPCMDWITHOPTS.copy()
        sftpcmd.extend(['-P', sshport,
                        '-o', 'ControlPath={}'.format(ctrlsocket),
                        '{}@{}'.format(user, host)])
        exec_p = subprocess.Popen(sftpcmd,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  stdin=subprocess.PIPE, env=env)

        batch = "cd {}\n cd {}\n pwd\nls -al\n".format(path, changepath)

        (stdout, stderr) = exec_p.communicate(batch.encode())
        if stderr:
            # Decode once, tolerating bytes that are not valid UTF-8.
            errtext = stderr.decode(errors='replace')
            logger.error('sftp failure')
            cd_failed = ('Couldn\'t canonicalize: No such file or directory' in errtext
                         or 'Permission denied' in errtext)
            # Retry only when there is still something to drop. If
            # changepath is already '.', the same command would fail the
            # same way and the retry would recurse without end.
            if cd_failed and changepath != '.':
                logger.error('can\'t change to that directory')
                return Ssh.sftpls(sess, host, user, sshport, path, changepath='.')
            logger.error(errtext)
            raise SshCtrlException(errtext)

        # A file name that is not valid UTF-8 should not make the whole
        # directory unlistable.
        dirlist = Ssh.parse_sftp_output(stdout.decode(errors='replace'))
        return dirlist

Ubuntu's avatar
Ubuntu committed
    def execute(sess,  host, user, cmd, bastion=None, stdin=None, sshport="22", bastionsshport="22"):
        execute the command cmd on the host via ssh
        # assume the environment is already setup with an
        # SSH_AUTH_SOCK that allows login
        import os
        import logging
        logger = logging.getLogger()
        if cmd is None and stdin is None:
            return {'stdout': b'', 'stderr': b'No command given to execute'}
        env = os.environ.copy()
        if sess.socket is None:
            raise SshAgentException("No ssh-agent yet")
        env['SSH_AUTH_SOCK'] = sess.socket
        if ":" in host:
            host,sshport = host.split(':')
        if bastion is not None and ":" in bastion:
            bastion,bastionsshport = bastion.split(':')
Ubuntu's avatar
Ubuntu committed
        if bastion == None:
            ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
            ctrlsocket = Ssh.get_ctrl_master_socket(sess, bastion, user, bastionsshport)
        if bastion == None:
            # we are executing this command on the login node, so no more magic is needed
            sshcmd = Ssh.SSHCMDWITHOPTS.copy()
Ubuntu's avatar
Ubuntu committed
                      '-S', ctrlsocket, '-p', sshport,
Ubuntu's avatar
Ubuntu committed
            # we are executing on a node (e.g. a compute/batch node) using a bastion (e.g. login node)
            # at the moment I'll assume the ssh port for the batch host is the same as the ssh port for the bastion/login host
            proxycmd = " ".join(Ssh.SSHCMDWITHOPTS)
            proxycmd = proxycmd + " {user}@{bastion} -W {host}:{sshport} -S {ctrlsocket}".format(
Ubuntu's avatar
Ubuntu committed
                user=user, host=host,
            sshcmd = Ssh.SSHCMDWITHOPTS.copy()
Ubuntu's avatar
Ubuntu committed
                      '-o', "ProxyCommand={}".format(proxycmd),
                      '-p', str(bastionsshport), '-l', user, bastion, cmd])
Chris Hines's avatar
Chris Hines committed
        logger.debug('in execute, opening process {}'.format(sshcmd))
Ubuntu's avatar
Ubuntu committed
        exec_p = subprocess.Popen(sshcmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.PIPE,env=env)
Chris Hines's avatar
Chris Hines committed
        logger.debug('in execute pid {}'.format(
Chris Hines's avatar
Chris Hines committed
            if stdin is not None:
Chris Hines's avatar
Chris Hines committed
                logger.debug('begining comms with timeout')
                (stdout, stderr) = exec_p.communicate(stdin.encode(),timeout=Ssh.TIMEOUT)
Chris Hines's avatar
Chris Hines committed
Chris Hines's avatar
Chris Hines committed
                logger.debug('begining comms with timeout')
                (stdout, stderr) = exec_p.communicate(timeout=Ssh.TIMEOUT)
Chris Hines's avatar
Chris Hines committed
        except subprocess.TimeoutExpired as e:
Chris Hines's avatar
Chris Hines committed
                logger.debug('cmd timed out')
                raise e
                #raise SshExecException(message="A program timed out on the backend server")
            if b'client_loop' in stderr:
                raise SshExecException(message="It looks like your job died for some reason. Please check the log files")
                logger.error('ssh execute returned error code 255, killing the session')
                raise SshCtrlException(message="Ssh was unable to login. It's likely you're credentials need to be renewed by loggin in again")
Chris Hines's avatar
Chris Hines committed
# this error condition should now be handled in  get_control_socket
#        if exec_p.returncode != 0 or b'Control socket connect' in stderr:
#            try:
#                ctrl_p = sess.ctrl_processes[ctrlsocket]
#            except KeyError as e:
#                # If the process is restarted and the agents all get restarted, but the control socket /tmp/cm-* is not removed
#                # This can happen
#                import os
#                try:
#                    os.unlink(ctrlsocket)
#                except FileNotFoundError as e:
#                    pass
#                raise SshCtrlException(message="ssh execute failed, there was a problem with the control socket. Did the docker backend get restartd?")
#            logger.error('ran command {}'.format(" ".join(sshcmd)))
#            logger.error('got return code {} stderr {} stdout {}'.format(exec_p.returncode,stderr,stdout))
#            logger.debug("exception execute")
#            raise SshExecException(message="{}".format(stderr.decode()))
            #raise SshCtrlException(message="ssh execute failed, killing off the agent. Log in again")
        # If the return code is non-zero, its likely something went wrong logging in. We should perhaps consider killing the ssh session, or at least the ctrl socket processes
        # And allowing the client to retry them
        # I haven't decided yet whether we should kill the whole agent or just the ctrl process
        if exec_p.returncode != 0:
            if stderr  == b'':
                msg = "The program {} on {} failed".format(cmd,host)
                msg = "The program {} on {} failed. The error message was {}".format(cmd,host,stderr.decode())
            raise SshExecException(message=msg)
        # Because people often put stuff in their bashrc which causes stderr to be non-empty, we will swallow errors in the bashrc without comment
        if b'bashrc' in stderr:
            return {'stdout':stdout,'stderr':b''}
            return {'stdout':stdout, 'stderr':stderr}
    def tunnel(sess, port, batchhost, user, host, internalfirewall = True, localbind = True, authtok = None, sshport="22"):
        the ProxyCommand is used if the server we run on the batch host is only
        addressable on localhost
        e.g. jupyter in its default config runs on localhost:8888 so a web
        browser on the login node can not connect to it
        If the server port is directly accessible from the login host (eg the
        ssh server port is directly accessible) or the batchhost IS the login host
        (i.e. batchhost = localhost) we can use -O forward -S <control socket>
        import os
        import logging
        logger = logging.getLogger()
        env = os.environ.copy()
        env['SSH_AUTH_SOCK'] = sess.socket
        if ":" in host:
            host,sshport = host.split(':')
        ctrlsocket = Ssh.get_ctrl_master_socket(sess, host, user, sshport)
        localport = Ssh.get_free_port()

        if port == 22 or (not internalfirewall and not localbind):
            # Can we use the existing control master connection and just add a
            # port forward to the batch node
            sshcmd = Ssh.SSHCMDWITHOPTS.copy()
Chris Hines's avatar
Chris Hines committed
                '-L', '{localport}:{batchhost}:{port}'.
                      format(port=port, localport=localport, batchhost=batchhost),
Chris Hines's avatar
Chris Hines committed
                      '-O', 'forward', '-S', ctrlsocket,
            # Create an ssh tunnel to the batch node using a proxycommand.
            # The proxy command should utilise
            # the existing control master connection
            proxycmd = " ".join(Ssh.SSHCMDWITHOPTS) + " {user}@{host} -W {batchhost}:22 -S {ctrlsocket}".format(
                user=user, host=host,
            sshcmd = Ssh.SSHCMDWITHOPTS.copy()
Chris Hines's avatar
Chris Hines committed
                '-o', "ProxyCommand={}".format(proxycmd),
Chris Hines's avatar
Chris Hines committed
                      '-L', '{localport}:localhost:{port}'.
                      format(port=port, localport=localport),
Chris Hines's avatar
Chris Hines committed
Chris Hines's avatar
Chris Hines committed
        logger.debug('starting tunnel with {}'.format(" ".join(sshcmd)))
        tunnel_p = subprocess.Popen(sshcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
Chris Hines's avatar
Chris Hines committed
        logger.debug('tunnel created with pid {}'.format(
            if not Ssh.wait_for_tunnel(localport):
                logger.error('tunnel failed to open')
                (stdout,stderr) = tunnel_p.communicate(timeout=Ssh.TIMEOUT) 
                logger.error('Ssh.tunnel: stdout {}'.format(stdout.decode()))
                logger.error('Ssh.tunnel: stderr {}'.format(stderr.decode()))
                raise SshCtrlException(message="failed to open a tunnel")
        except SshCtrlException as e:
        except Exception as e:
            import traceback
            logger.error('exception waiting for tunnel')
            return None, []
        sess.port.update({authtok: localport})
Chris Hines's avatar
Chris Hines committed
#        try:
#            #tunnel_p.wait(timeout=3)
#            import time
#            time.sleep(3)
#            tunnel_p.poll()
#            if tunnel_p.returncode != None:
#                (stdout,stderr) = tunnel_p.communicate()
#                logger.error('tunnel process exited {} {} {}'.format(tunnel_p.returncode, stdout, stderr))
#                sess.tunnels.remove(tunnel_p)
#        except subprocess.TimeoutExpired:
#            pass
        return localport, []

    # @staticmethod
    # def addkey(sess, key, cert):
    #     ""
    #     pass

    def wait_for_tunnel(localport):
Chris Hines's avatar
Chris Hines committed
        notopen = True
        import socket
        import time
        import logging
        logger = logging.getLogger()
        while notopen:
            ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Chris Hines's avatar
Chris Hines committed
            logger.debug('testing port {}'.format(localport))
Chris Hines's avatar
Chris Hines committed
                ssock.connect(('', localport))
Chris Hines's avatar
Chris Hines committed
                logger.debug('port is open')
Chris Hines's avatar
Chris Hines committed
                notopen = False
            except socket.error:
Chris Hines's avatar
Chris Hines committed
                logger.debug('port is not open yet')
Chris Hines's avatar
Chris Hines committed
Chris Hines's avatar
Chris Hines committed
        return True

        In order to avoid a race condition, we wait for the tunnel to be established.
        If the tunnel hasn't been established before TIMEOUT, perhaps something has gone wrong
Chris Hines's avatar
Chris Hines committed
        import time
        import logging
        logger = logging.getLogger()
        ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        for retry in range(0,10):
                ssock.connect(('', localport))
                return True
            except Exception as e:
                logger.debug('socket exception {}'.format(e))
        return False
Chris Hines's avatar
Chris Hines committed

    def get_free_port():
        # Finds a port which the local server can listen on.
        import socket
        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        for testport in range(1025, 65500):
                serversocket.bind(('', testport))
                port = testport
                return port
            except OSError: