"""Authenticating TCP proxy thread used by the TWS tunnel server.

A ``TWSProxy`` reads the first bytes sent by a client, looks for a
``twsproxyauth=<token>`` marker, asks the tunnel service at ``TES`` which local
port belongs to that token and then copies bytes between the client and
``127.0.0.1:<port>``.
"""
import array
import logging
import re
import select
import socket
import threading
import traceback

# Base URL of the service that maps an auth token to a local port.
TES = 'http://localhost:8080/'
# Number of extra read attempts inittws makes before giving up on a client.
failthresh = 10

# NOTE: ``[\W|$]`` is a character class, so the token must be FOLLOWED by a
# non-word byte; a token sitting at the very end of the buffer does not match.
# The pattern value is unchanged from the original (only made a raw literal).
_AUTH_TOKEN_RE = re.compile(rb'twsproxyauth=(?P<authtok>\w+)[\W|$]')


class TWSProxy(threading.Thread):
    """Thread that authenticates one client socket and proxies it to a local port."""

    TIMEOUT = 10          # seconds each select() waits in the copy loops
    AUTHTIMEOUT = 60      # seconds allowed for the token lookup request
    MAXBUFF = 8192        # bytes requested per recv()
    MAXHEADERS = 8192

    def __init__(self, socket):
        """Store the accepted client socket; the server socket is opened later."""
        super().__init__()
        self.csock = socket
        self.ssock = None
        self.authtok = None
        self._timeout = 10

    def _reject(self, closed):
        """Close whatever sockets are open, flag *closed* and return the failure tuple."""
        if self.ssock is not None:
            self.ssock.close()
        self.csock.close()
        closed.set()
        return (None, None)

    def inittws(self, closed):
        """Read from the client socket until the request can be verified.

        Reading stops when one of the following happens:

        1) the bytes received so far contain a token that resolves to a port,
        2) a 5 second select() timeout expires (the client is presumably
           waiting for a response),
        3) the client closes the connection,
        4) more than ``failthresh`` read attempts were made.

        On success a socket to ``127.0.0.1:<port>`` is opened in ``self.ssock``
        and ``(header, bytessofar)`` is returned, where ``header`` holds the
        bytes already consumed from the client.  On any failure both sockets
        are closed, *closed* is set and ``(None, None)`` is returned.
        """
        logger = logging.getLogger()
        header = bytearray()
        port = None
        attempts = 0
        try:
            while port is None and attempts <= failthresh:
                attempts += 1
                readable, _, _ = select.select([self.csock], [], [], 5)
                if not readable:
                    # Timed out: make one last attempt with what we have.
                    port = TWSProxy.verifyauth(header)
                    break
                partial = self.csock.recv(TWSProxy.MAXBUFF)
                if not partial:
                    # Client closed before authenticating; nothing more will come.
                    break
                header += partial
                port = TWSProxy.verifyauth(header)
        except Exception:
            # A failed lookup or a socket error must not leak the client socket.
            logger.exception('unable to authenticate client %s', self.csock)
            return self._reject(closed)

        if port is None:
            return self._reject(closed)

        self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ssock.setblocking(True)
        try:
            self.ssock.connect(('127.0.0.1', port))
        except OSError:
            # Covers ConnectionRefusedError as well as timeouts/unreachable errors.
            logger.debug('unable to connect to local port {}'.format(port))
            return self._reject(closed)
        return (header, len(header))

    def run(self):
        """Authenticate the client, replay the consumed bytes, then proxy both ways."""
        initshutdown = threading.Event()
        try:
            (header, bytessofar) = self.inittws(initshutdown)
            if not initshutdown.is_set():
                # The bytes read during authentication belong to the server.
                TWSProxy.reliablesend(self.ssock, header[0:bytessofar], bytessofar)
                t1 = threading.Thread(target=TWSProxy.sockcopy,
                                      args=(self.ssock, self.csock, initshutdown),
                                      name='s2c')
                t2 = threading.Thread(target=TWSProxy.sockcopy,
                                      args=(self.csock, self.ssock, initshutdown),
                                      name='c2s')
                t1.start()
                t2.start()
                t1.join()
                t2.join()
        finally:
            # Always release both sockets, even if a step above raised.
            if self.ssock is not None:
                self.ssock.close()
            if self.csock is not None:
                self.csock.close()

    @staticmethod
    def verifyauth(header):
        """Return the port registered for the auth token found in *header*.

        *header* is a bytes-like object.  Returns ``None`` when no token is
        present; otherwise returns whatever JSON value the tunnel service
        sends back for ``TES + 'tunnelstat/<token>'``.

        Raises:
            Exception: when the lookup request or its JSON decoding fails.
        """
        m = _AUTH_TOKEN_RE.search(header)
        if m is None:
            return None
        # Imported lazily so requests is only needed once a token is seen.
        import requests
        authtok = m.group('authtok')
        url = TES + 'tunnelstat/' + authtok.decode()
        response = None
        try:
            with requests.Session() as s:
                response = s.get(url, timeout=TWSProxy.AUTHTIMEOUT)
                port = response.json()
        except Exception as err:
            # response is still None when the request itself failed.
            detail = response.text if response is not None else repr(err)
            raise Exception(
                'unable to get a port number for the authtok {}'.format(detail)
            ) from err
        return port

    @staticmethod
    def reliablesend(socket, buff, msglength):
        """Send exactly the first *msglength* bytes of *buff* on *socket*.

        Raises:
            RuntimeError: when send() reports that zero bytes were written.
        """
        totalsent = 0
        with memoryview(buff) as whole:
            view = whole[:msglength]
            while totalsent < msglength:
                sent = socket.send(view[totalsent:])
                if sent == 0:
                    raise RuntimeError("socket connection broken")
                totalsent = totalsent + sent

    @staticmethod
    def twosocks(client, server, initshutdown):
        """Single-threaded copy loop between *client* and *server*.

        Runs until both directions have ended, then forwards any data the
        server still delivers.  *initshutdown* is accepted for signature
        compatibility and is not used.
        """
        shuttype = socket.SHUT_RD
        clientopen = True
        serveropen = True
        while serveropen or clientopen:
            # Only wait on sides that are still open; a finished side would
            # otherwise be reported readable forever.
            watched = []
            if clientopen:
                watched.append(client)
            if serveropen:
                watched.append(server)
            r, _, _ = select.select(watched, [], [], TWSProxy.TIMEOUT)
            if client in r:
                try:
                    buff = client.recv(TWSProxy.MAXBUFF)
                    if buff:
                        TWSProxy.reliablesend(server, buff, len(buff))
                    else:
                        clientopen = False
                        server.shutdown(shuttype)
                except (OSError, RuntimeError):
                    clientopen = False
            if server in r:
                try:
                    buff = server.recv(TWSProxy.MAXBUFF)
                    if buff:
                        TWSProxy.reliablesend(client, buff, len(buff))
                    else:
                        client.shutdown(shuttype)
                        serveropen = False
                except (OSError, RuntimeError):
                    serveropen = False

        # If the client has finished sending, and we've finished transmitting
        # to the server, the server may still have some data for the client.
        while True:
            try:
                readable, _, _ = select.select([server], [], [], TWSProxy.TIMEOUT)
                if not readable:
                    continue
                buff = server.recv(TWSProxy.MAXBUFF)
            except (OSError, ValueError):
                # Socket already closed or broken: nothing left to drain.
                break
            if not buff:
                break
            try:
                TWSProxy.reliablesend(client, buff, len(buff))
            except BrokenPipeError:
                pass

    @staticmethod
    def sockcopy(src, dest, initshutdown):
        """Copy bytes from *src* to *dest* until *src* ends or an error occurs.

        When the loop finishes, the read side of *dest* is shut down and
        *initshutdown* is set, whatever the reason for finishing.

        NOTE(review): shutting down reads on *dest* presumably ends the copier
        running in the opposite direction too, so a peer that half-closes and
        then waits for more data may see a truncated reply -- confirm this is
        the intended teardown.
        """
        logger = logging.getLogger()
        logger.debug('begining sock copy {} {}'.format(src, dest))
        try:
            while True:
                readable, _, _ = select.select([src], [], [], TWSProxy.TIMEOUT)
                if not readable:
                    continue
                try:
                    buff = src.recv(TWSProxy.MAXBUFF)
                except ConnectionResetError:
                    logger.debug('connection reset closing {} {}'.format(src, dest))
                    break
                except Exception:
                    logger.error(traceback.format_exc())
                    logger.debug('Exception in sockcopy {} {}'.format(src, dest))
                    break
                if buff:
                    TWSProxy.reliablesend(dest, buff, len(buff))
                elif getattr(src, 'type', socket.SOCK_STREAM) == socket.SOCK_STREAM:
                    # recv(2): a zero-length read on a stream socket means the
                    # peer performed an orderly shutdown, so stop copying.
                    logger.debug('end of stream {} {}'.format(src, dest))
                    break
                else:
                    # Datagram sockets permit zero-length datagrams; forward it.
                    logger.debug('msglength 0, sending 0 {} {}'.format(src, dest))
                    dest.send(buff)
        except (OSError, RuntimeError, ValueError):
            # Sending failed, or select() was handed an already-closed socket.
            logger.error(traceback.format_exc())
            logger.debug('Exception in sockcopy {} {}'.format(src, dest))
        finally:
            logger.debug('ending sock copy {} {}'.format(src, dest))
            try:
                dest.shutdown(socket.SHUT_RD)
            except OSError:
                # dest may already be closed or disconnected.
                pass
            initshutdown.set()


def main():
    """Configure logging and run a TWSServer on the port given in ``sys.argv[1]``."""
    from . import server
    import sys
    logging.basicConfig(filename="/var/log/tws.log",
                        format="%(asctime)s %(levelname)s:%(process)s: %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.debug("starting TWS proxy")
    port = int(sys.argv[1])
    tws_server = server.TWSServer(port, 5)
    logger.debug("initialised server object")
    tws_server.run()