Skip to content
Snippets Groups Projects
Commit 4662d607 authored by Chris Hines's avatar Chris Hines
Browse files

make sure we will processes on TimeoutExpired ... also use start_new_session...

make sure we will processes on TimeoutExpired ... also use start_new_session which may help with killing processes in an uninteruptable state
parent 8086c2b6
No related branches found
No related tags found
2 merge requests!99make sure we will processes on TimeoutExpired ... also use start_new_session...,!98make sure we will processes on TimeoutExpired ... also use start_new_session...
Pipeline #89099 passed
...@@ -98,10 +98,14 @@ class Ssh: ...@@ -98,10 +98,14 @@ class Ssh:
@staticmethod @staticmethod
def check_ctrlsocket(sess, ctrlsocket): def check_ctrlsocket(sess, ctrlsocket):
cmd = ['ssh','-S',ctrlsocket,'-O','check','-l','noone','nohost'] cmd = ['ssh','-S',ctrlsocket,'-O','check','-l','noone','nohost']
check_p = None
try: try:
check_p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=None) check_p = subprocess.Popen(cmd,start_new_session=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=None)
(stderr,stdout) = check_p.communicate(timeout=Ssh.TIMEOUT) (stderr,stdout) = check_p.communicate(timeout=Ssh.TIMEOUT)
except subprocess.TimeoutExpired as e: except subprocess.TimeoutExpired as e:
if check_p is not None:
check_p.kill()
(stderr, stdout) = check_p.communicate()
raise e raise e
return check_p.returncode == 0 return check_p.returncode == 0
...@@ -115,7 +119,6 @@ class Ssh: ...@@ -115,7 +119,6 @@ class Ssh:
import stat import stat
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
logger.debug("enter get_ctrl_master_socket")
ctrlsocket = "/tmp/cm-{}-{}-{}".format(user,host,sess.sshsessid) ctrlsocket = "/tmp/cm-{}-{}-{}".format(user,host,sess.sshsessid)
sess.lock.acquire() sess.lock.acquire()
try: try:
...@@ -128,7 +131,7 @@ class Ssh: ...@@ -128,7 +131,7 @@ class Ssh:
try: try:
mode = os.stat(ctrlsocket).st_mode mode = os.stat(ctrlsocket).st_mode
logger.debug('removing stale control socket') logger.debug(f'removing stale control socket for {user}')
os.unlink(ctrlsocket) # This might occur if a container was restarted and the file wasn't removed os.unlink(ctrlsocket) # This might occur if a container was restarted and the file wasn't removed
# This is expected: It indicates there is not yet a control master for this user on this login node # This is expected: It indicates there is not yet a control master for this user on this login node
except FileNotFoundError as e: except FileNotFoundError as e:
...@@ -146,12 +149,12 @@ class Ssh: ...@@ -146,12 +149,12 @@ class Ssh:
logger.debug("Exception get_ctrl_master_socket") logger.debug("Exception get_ctrl_master_socket")
raise SshAgentException("No ssh-agent yet") raise SshAgentException("No ssh-agent yet")
env['SSH_AUTH_SOCK'] = sess.socket env['SSH_AUTH_SOCK'] = sess.socket
logger.debug("creating master socket") logger.debug(f'creating master socket for {user}')
logger.debug("ssh_auth_sock {}".format(sess.socket))
logger.debug(" ".join(sshcmd)) logger.debug(" ".join(sshcmd))
ctrl_p = subprocess.Popen(sshcmd, ctrl_p = subprocess.Popen(sshcmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=None, stdin=None,
start_new_session=True,
env=env) env=env)
(stderr,stdout) = ctrl_p.communicate() # The process should exit, while the mux persists (stderr,stdout) = ctrl_p.communicate() # The process should exit, while the mux persists
except Exception as e: except Exception as e:
...@@ -304,7 +307,6 @@ class Ssh: ...@@ -304,7 +307,6 @@ class Ssh:
import os import os
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
logger.debug("enter execute")
if cmd is None and stdin is None: if cmd is None and stdin is None:
return {'stdout': b'', 'stderr': b'No command given to execute'} return {'stdout': b'', 'stderr': b'No command given to execute'}
env = os.environ.copy() env = os.environ.copy()
...@@ -341,18 +343,17 @@ class Ssh: ...@@ -341,18 +343,17 @@ class Ssh:
sshcmd.extend([ sshcmd.extend([
'-o', "ProxyCommand={}".format(proxycmd), '-o', "ProxyCommand={}".format(proxycmd),
'-p', str(bastionsshport), '-l', user, bastion, cmd]) '-p', str(bastionsshport), '-l', user, bastion, cmd])
exec_p = subprocess.Popen(sshcmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.PIPE,env=env) exec_p = subprocess.Popen(sshcmd,start_new_session=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=subprocess.PIPE,env=env)
try: try:
if stdin is not None: if stdin is not None:
logger.debug('begining comms with timeout')
(stdout, stderr) = exec_p.communicate(stdin.encode(),timeout=Ssh.TIMEOUT) (stdout, stderr) = exec_p.communicate(stdin.encode(),timeout=Ssh.TIMEOUT)
else: else:
logger.debug('begining comms with timeout')
(stdout, stderr) = exec_p.communicate(timeout=Ssh.TIMEOUT) (stdout, stderr) = exec_p.communicate(timeout=Ssh.TIMEOUT)
except subprocess.TimeoutExpired as e: except subprocess.TimeoutExpired as e:
logger.debug('cmd timed out') logger.debug('cmd timed out')
logger.debug("exception execute") exec_p.kill()
(stdout,stderr) = exec_p.communicate()
raise e raise e
#raise SshExecException(message="A program timed out on the backend server") #raise SshExecException(message="A program timed out on the backend server")
if exec_p.returncode == 255: if exec_p.returncode == 255:
...@@ -393,7 +394,6 @@ class Ssh: ...@@ -393,7 +394,6 @@ class Ssh:
else: else:
msg = "The program {} on {} failed. The error message was {}".format(cmd,host,stderr.decode()) msg = "The program {} on {} failed. The error message was {}".format(cmd,host,stderr.decode())
raise SshExecException(message=msg) raise SshExecException(message=msg)
logger.debug("leaving execute")
# Because people often put stuff in their bashrc which casuses stderr to be non-empty, we will swallow errors in the bashrc without comment # Because people often put stuff in their bashrc which casuses stderr to be non-empty, we will swallow errors in the bashrc without comment
if b'bashrc' in stderr or b'locale' in stderr: if b'bashrc' in stderr or b'locale' in stderr:
return {'stdout':stdout,'stderr':b''} return {'stdout':stdout,'stderr':b''}
...@@ -414,7 +414,6 @@ class Ssh: ...@@ -414,7 +414,6 @@ class Ssh:
import os import os
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
logger.debug("enter tunnel")
env = os.environ.copy() env = os.environ.copy()
env['SSH_AUTH_SOCK'] = sess.socket env['SSH_AUTH_SOCK'] = sess.socket
if ":" in host: if ":" in host:
...@@ -452,9 +451,7 @@ class Ssh: ...@@ -452,9 +451,7 @@ class Ssh:
'-N', '-N',
'-p', sshport, '-l', user, batchhost]) '-p', sshport, '-l', user, batchhost])
logger.debug('starting tunnel with {}'.format(" ".join(sshcmd))) tunnel_p = subprocess.Popen(sshcmd, start_new_session=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
tunnel_p = subprocess.Popen(sshcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
logger.debug('tunnel created with pid {}'.format(tunnel_p.pid))
try: try:
if not Ssh.wait_for_tunnel(localport): if not Ssh.wait_for_tunnel(localport):
...@@ -476,18 +473,6 @@ class Ssh: ...@@ -476,18 +473,6 @@ class Ssh:
sess.port.update({authtok: localport}) sess.port.update({authtok: localport})
sess.pids.append(tunnel_p.pid) sess.pids.append(tunnel_p.pid)
sess.tunnels.append(tunnel_p) sess.tunnels.append(tunnel_p)
# try:
# #tunnel_p.wait(timeout=3)
# import time
# time.sleep(3)
# tunnel_p.poll()
# if tunnel_p.returncode != None:
# (stdout,stderr) = tunnel_p.communicate()
# logger.error('tunnel process exited {} {} {}'.format(tunnel_p.returncode, stdout, stderr))
# sess.tunnels.remove(tunnel_p)
# except subprocess.TimeoutExpired:
# pass
logger.debug("leaving tunnel")
return localport, [tunnel_p.pid] return localport, [tunnel_p.pid]
# @staticmethod # @staticmethod
...@@ -505,14 +490,11 @@ class Ssh: ...@@ -505,14 +490,11 @@ class Ssh:
while notopen: while notopen:
ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssock.setblocking(True) ssock.setblocking(True)
logger.debug('testing port {}'.format(localport))
try: try:
ssock.connect(('127.0.0.1', localport)) ssock.connect(('127.0.0.1', localport))
logger.debug('port is open')
notopen = False notopen = False
ssock.close() ssock.close()
except socket.error: except socket.error:
logger.debug('port is not open yet')
time.sleep(0.1) time.sleep(0.1)
ssock.close() ssock.close()
return True return True
......
...@@ -37,7 +37,7 @@ class SSHSession: ...@@ -37,7 +37,7 @@ class SSHSession:
if app.config['ENABLELAUNCH'] and 'SSH_AUTH_SOCK' in os.environ and os.environ['SSH_AUTH_SOCK']: if app.config['ENABLELAUNCH'] and 'SSH_AUTH_SOCK' in os.environ and os.environ['SSH_AUTH_SOCK']:
self.socket = os.environ['SSH_AUTH_SOCK'] self.socket = os.environ['SSH_AUTH_SOCK']
return return
p = subprocess.Popen([self.sshagent],stdout=subprocess.PIPE,stderr=subprocess.PIPE) p = subprocess.Popen([self.sshagent],start_new_session=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
(stdout,stderr) = p.communicate() (stdout,stderr) = p.communicate()
for l in stdout.decode().split(';'): for l in stdout.decode().split(';'):
if 'SSH_AUTH_SOCK=' in l: if 'SSH_AUTH_SOCK=' in l:
...@@ -187,13 +187,14 @@ class SSHSession: ...@@ -187,13 +187,14 @@ class SSHSession:
for pid in self.pids: for pid in self.pids:
logger.debug("killing pid {}".format(pid)) logger.debug("killing pid {}".format(pid))
try: try:
os.killpg(int(pid), signal.SIGTERM) # Sometimes this fails and I don't know why os.killpg(os.getpgid(int(pid)), signal.SIGTERM) # Sometimes this fails and I don't know why
try: try:
os.kill(int(pid), 0) # If the first kill worked, this will raise a ProcessLookupError os.kill(int(pid), 0) # If the first kill worked, this will raise a ProcessLookupError
time.sleep(2) time.sleep(2)
os.killpg(int(pid),signal.SIGKILL) os.killpg(os.getpgid(int(pid)),signal.SIGKILL)
logger.error('resorting to sigkill for pid {}'.format(pid)) logger.error('resorting to sigkill for pid {}'.format(pid))
except ProcessLookupError: except ProcessLookupError:
logger.debug(f'sigterm succesfull for {pid}')
pass pass
logger.debug("killed {}".format(pid)) logger.debug("killed {}".format(pid))
except ProcessLookupError as e: except ProcessLookupError as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment