"""Threaded TCP proxy that authenticates a client by token and relays traffic.

A client connection is accepted elsewhere and handed to ``TWSProxy``. The
proxy looks for a ``twsproxyauth=<token>`` marker in the first bytes the
client sends, asks the token service at ``TES`` which local port belongs to
that token, connects to ``127.0.0.1:<port>`` and then copies bytes in both
directions until either side closes.
"""
import array
import logging
import re
import select
import socket
import threading

#TES = 'http://localhost:8080/'
TES = 'http://localhost:5000/'
# Maximum number of read attempts inittws makes while waiting for a token.
failthresh = 10


class TWSProxy(threading.Thread):
    """Thread that serves one client connection.

    ``csock`` is the accepted client socket; ``ssock`` is the socket to the
    local server, opened by ``inittws`` once the client has been verified.
    """

    TIMEOUT = 10        # seconds sockcopy waits in select() for data
    AUTHTIMEOUT = 60
    MAXBUFF = 8192      # bytes requested per recv()
    MAXHEADERS = 8192   # initial size of the header buffer

    # NOTE(review): inside a character class ``|`` and ``$`` are literals, so
    # the token must be followed by a non-word byte and is NOT matched at the
    # very end of the buffer. That also prevents matching a token that has
    # only been partially received, so the pattern is kept byte-identical.
    _AUTHTOK_RE = re.compile(rb'twsproxyauth=(?P<authtok>\w+)[\W|$]')

    def __init__(self, socket):
        """Store the accepted client socket; the server socket is opened later."""
        super().__init__()
        self.csock = socket
        self.ssock = None
        self.authtok = None
        self._timeout = 10

    def inittws(self, closed):
        """Read from the client socket until one of

        1) We can verify the request based on headers
        2) We hit a timeout (i.e. no more headers will be sent and the
           client is waiting for a response)
        3) The client closes its side, or ``failthresh`` reads have been made

        If we can verify, open a new socket to the server, store it in
        ``self.ssock`` and replay the bytes read so far to it. Otherwise the
        client socket is closed and ``closed`` (a ``threading.Event``) is set.

        Raises whatever ``verifyauth`` or ``socket.connect`` raise, except
        ``ConnectionRefusedError`` which is handled by closing both sockets
        and setting ``closed``.
        """
        bytessofar = 0
        header = bytearray(TWSProxy.MAXHEADERS)
        port = None
        initcount = 0
        keepreading = True
        while keepreading:
            initcount += 1
            readable, _, _ = select.select([self.csock], [], [], 5)
            if readable:
                partial = self.csock.recv(TWSProxy.MAXBUFF)
                if not partial:
                    # EOF: the client will never send more, so re-checking
                    # the same bytes again and again is pointless.
                    break
                header[bytessofar:bytessofar + len(partial)] = partial
                bytessofar += len(partial)
                port = TWSProxy.verifyauth(header[0:bytessofar])
                if port is not None:
                    keepreading = False
            else:
                # Timed out waiting for more data: make one last attempt
                # with what has been received and stop either way.
                port = TWSProxy.verifyauth(header[0:bytessofar])
                keepreading = False
            if initcount > failthresh:
                keepreading = False

        if port is None:
            self.csock.close()
            closed.set()
            return

        self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ssock.setblocking(True)
        try:
            self.ssock.connect(('127.0.0.1', port))
            if bytessofar > 0:
                # Replay everything consumed during authentication.
                TWSProxy.reliablesend(self.ssock, header[0:bytessofar], bytessofar)
        except ConnectionRefusedError:
            self.ssock.close()
            self.csock.close()
            closed.set()

    def run(self):
        """Authenticate the client, relay traffic both ways, then clean up.

        Both sockets are closed on every exit path, including when
        authentication or the server connection raises.
        """
        initshutdown = threading.Event()
        try:
            self.inittws(initshutdown)
            if not initshutdown.is_set():
                # TWSProxy.twosocks(self.csock,self.ssock,initshutdown)
                t1 = threading.Thread(target=TWSProxy.sockcopy,
                                      args=(self.ssock, self.csock, initshutdown),
                                      name='s2c')
                t2 = threading.Thread(target=TWSProxy.sockcopy,
                                      args=(self.csock, self.ssock, initshutdown),
                                      name='c2s')
                t1.start()
                t2.start()
                t1.join()
                t2.join()
        finally:
            if self.ssock is not None:
                self.ssock.close()
            if self.csock is not None:
                self.csock.close()

    @staticmethod
    def verifyauth(header):
        """Return the port registered for the auth token found in ``header``.

        ``header`` is a bytes-like object holding the bytes received so far.
        Returns ``None`` when no token is present; otherwise returns the
        decoded JSON body of ``GET <TES>tunnelstat/<token>``.

        Raises ``Exception`` when the token service cannot be queried or its
        reply is not valid JSON.
        """
        m = TWSProxy._AUTHTOK_RE.search(header)
        if m is None:
            return None
        # Imported lazily so traffic without a token never needs `requests`.
        import requests
        authtok = m.group('authtok')
        url = TES + 'tunnelstat/' + authtok.decode()
        try:
            # NOTE(review): no timeout is passed, so a stalled token service
            # blocks this thread indefinitely - confirm whether AUTHTIMEOUT
            # was meant to be used here.
            with requests.Session() as s:
                port = s.get(url).json()
        except Exception as err:
            raise Exception('unable to get a port number for the authtok') from err
        return port
#        if m:
#            print('match verify!',m.group(0))
#            return 8888
#        else:
#            return None

    @staticmethod
    def reliablesend(socket, buff, msglength):
        """Send the first ``msglength`` bytes of ``buff``, retrying partial sends.

        Raises ``RuntimeError`` if ``send`` reports that zero bytes were sent.
        """
        totalsent = 0
        while totalsent < msglength:
            sent = socket.send(buff[totalsent:])
            if sent == 0:
                raise RuntimeError("socket connection broken")
            totalsent = totalsent + sent

#    @staticmethod
#    def twosocks(client,server,initshutdown):
#        import threading
#        logger=logging.getLogger()
#        closed = False
#        clientopen = True
#        serveropen = True
#        shuttype = socket.SHUT_RD
#        while serveropen or clientopen:
#            r,w,e = select.select([client,server],[],[],TWSProxy.TIMEOUT)
#            print(r)
#            if client in r:
#                try:
#                    buff = client.recv(TWSProxy.MAXBUFF)
#                    msglength = len(buff)
#                    if msglength > 0:
#                        TWSProxy.reliablesend(server,buff,msglength)
#                    else:
#                        print("client closed socket for reading")
#                        clientopen = False
#                        server.shutdown(shuttype)
#                except:
#                    clientopen = False
#            if server in r:
#                try:
#                    buff = server.recv(TWSProxy.MAXBUFF)
#                    msglength = len(buff)
#                    if msglength > 0:
#                        TWSProxy.reliablesend(client,buff,msglength)
#                    else:
#                        print("server sent zero bytes ... what does it mean?")
#                        client.shutdown(shuttype)
##                        print("server closed socket for reading")
#                        serveropen = False
#                except:
#                    serveropen = False
#        # If the client has finished sending, and we've finished transmitting to the server
#        # The server may still have some data to transmit to the client
#        closed = False
#        while not closed:
#            r,w,e = select.select([server],[],[],TWSProxy.TIMEOUT)
#            print("cleanup loop")
#            buff = server.recv(TWSProxy.MAXBUFF)
#            msglength = len(buff)
#            if msglength > 0:
#                try:
#                    TWSProxy.reliablesend(client,buff,msglength)
#                except BrokenPipeError as e:
#                    pass
#            else:
#                closed = True

    @staticmethod
    def sockcopy(src, dest, initshutdown):
        """Copy bytes from ``src`` to ``dest`` until ``src`` closes or goes idle.

        Stops when ``src`` reports EOF (then ``dest`` is shut down for reading
        and ``initshutdown`` is set), when ``src`` raises
        ``ConnectionResetError``, or after ``failthresh`` consecutive
        ``select`` timeouts of ``TWSProxy.TIMEOUT`` seconds each.
        """
        shuttype = socket.SHUT_RD
        closed = False
        failcount = 0
        failthresh = 10
        while not closed and failcount < failthresh:
            readable, _, _ = select.select([src], [], [], TWSProxy.TIMEOUT)
            if not readable:
                failcount += 1
                continue
            failcount = 0
            try:
                buff = src.recv(TWSProxy.MAXBUFF)
            except ConnectionResetError:
                # The peer is gone; end the loop. (This previously assigned
                # to a misspelled name, so the loop spun forever.)
                closed = True
                continue
            except Exception:
                import traceback
                print(traceback.format_exc())
                # closed = True
                continue
            if buff is None:
                continue
            msglength = len(buff)
            if msglength > 0:
                TWSProxy.reliablesend(dest, buff, msglength)
            else:
                # Zero bytes means src reached EOF.
                dest.shutdown(shuttype)
                initshutdown.set()
                closed = True
        # NOTE(review): the loop exits as soon as failcount == failthresh, so
        # this condition is never true and an idle timeout ends the copy
        # without shutting down dest. Left unchanged because making it
        # reachable would tear down connections where only one direction is
        # silent - confirm the intended behaviour.
        if failcount > failthresh:
            dest.shutdown(shuttype)
            initshutdown.set()


def main():
    """Configure file logging and run the TWS proxy server."""
    from . import server
    logging.basicConfig(filename="/var/log/tws.log",
                        format="%(asctime)s %(levelname)s:%(process)s: %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.debug("starting TWS proxy")
    server = server.TWSServer()
    logger.debug("initialised server object")
    server.run()