Newer
Older
import threading
import socket, array
import select
import logging
Chris Hines
committed
failthresh = 10
class TWSProxy(threading.Thread):
TIMEOUT = 10
AUTHTIMEOUT = 60
MAXBUFF = 8192
MAXHEADERS = 8192
def __init__(self,socket):
super().__init__()
self.csock = socket
self.ssock = None
self.authtok = None
self._timeout = 10
def inittws(self,closed):
""" Read from the client socket until one of
1) We can verify the request based on headers
2) We hit a timeout (i.e. no more headers will be sent and the client is waiting for a response)
If we can verify
open a new socket to the server and return this
"""
bytessofar = 0
header=bytearray(TWSProxy.MAXHEADERS)
keepreading = True
Chris Hines
committed
initcount=0
Chris Hines
committed
initcount=initcount+1
r,w,e = select.select([self.csock],[],[],AUTHTIMEOUT)
if len(r) > 0:
partial = self.csock.recv(TWSProxy.MAXBUFF)
header[bytessofar:bytessofar+len(partial)] = partial
bytessofar = bytessofar + len(partial)
logger.debug('inittws, checking headers')
port = TWSProxy.verifyauth(header[0:bytessofar])
if port is not None:
logger.debug('inittws, found auth token and got port {}'.format(port))
if port is None:
logger.debug('inittws, no authtok found in the first {} bytes'.format(bytessofar))
logger.debug('inittws, select returned with no more info, verifying headers for the last time')
port = TWSProxy.verifyauth(header[0:bytessofar])
keepreading = False
Chris Hines
committed
if initcount > failthresh:
logger.debug('inittws, checked headers enough times, got {} bytes with no success'.format(bytessofar))
Chris Hines
committed
keepreading = False
if port is not None:
self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ssock.setblocking(True)
try:
self.ssock.connect(('127.0.0.1',port))
if bytessofar > 0:
logger.debug('inittws: returning the initial {} bytes'.format(bytessofar))
return (header,bytessofar)
#TWSProxy.reliablesend(self.ssock,header[0:bytessofar],bytessofar)
logger.error('inittws, got my tokens, got my port attempted to send data and it all went pear shaped')
self.ssock.close()
self.csock.close()
closed.set()
logger.debug('inittws, unable to determine correct port, closing connection')
import logging
logger = logging.getLogger()
logger.debug('starting new thread listening on {}'.format(self.csock))
initshutdown = threading.Event()
initshutdown.clear()
(header, bytessofar) = self.inittws(initshutdown)
logger.debug('connecting {} to {}'.format(self.csock,self.ssock))
if initshutdown.isSet():
logger.debug('NOT connecting {} inittws did not connect us'.format(self.csock))
logger.debug('testing initshutdown')
logger.debug('sending on initial headers')
TWSProxy.reliablesend(self.ssock,header[0:bytessofar],bytessofar)
# TWSProxy.twosocks(self.csock,self.ssock,initshutdown)
t1 = threading.Thread(target=TWSProxy.sockcopy, args=(self.ssock, self.csock, initshutdown),name='s2c')
t2 = threading.Thread(target=TWSProxy.sockcopy, args=(self.csock, self.ssock, initshutdown),name='c2s')
t1.start()
t2.start()
t1.join()
t2.join()
logger.debug('transmission complete threads joined')
else:
logger.debug('not creating threads, shutdown is set')
if self.ssock is not None:
self.ssock.close()
if self.csock is not None:
self.csock.close()
@staticmethod
def verifyauth(header):
import re
import requests
token = b'twsproxyauth=(?P<authtok>\w+)[\W|$]'
m = re.search(token,header)
if m:
authtok = m.groupdict()['authtok']
logger.debug('authtok found {}'.format(authtok))
s = requests.Session()
url = TES+'tunnelstat/'+authtok.decode()
Chris Hines
committed
try:
logger.debug('verify auth querying url {}'.format(url))
Chris Hines
committed
r = s.get(url)
port = r.json()
if port is None:
logger.debug('authtok found but no tunnel to connect to {}'.format(authtok))
else:
logger.debug('authtok found port found {}'.format(port))
Chris Hines
committed
except:
logger.error('authtok found port found {}'.format(port))
raise Exception('unable to get a port number for the authtok {}'.format(r.text))
return port
return None
# if m:
# print('match verify!',m.group(0))
# return 8888
# else:
# return None
@staticmethod
def reliablesend(socket,buff,msglength):
totalsent = 0
while totalsent < msglength:
sent = socket.send(buff[totalsent:])
if sent == 0:
raise RuntimeError("socket connection broken")
totalsent = totalsent + sent
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@staticmethod
def twosocks(client,server,initshutdown):
import threading
logger=logging.getLogger()
closed = False
clientopen = True
serveropen = True
shuttype = socket.SHUT_RD
while serveropen or clientopen:
r,w,e = select.select([client,server],[],[],TWSProxy.TIMEOUT)
if client in r:
try:
buff = client.recv(TWSProxy.MAXBUFF)
msglength = len(buff)
if msglength > 0:
TWSProxy.reliablesend(server,buff,msglength)
else:
clientopen = False
server.shutdown(shuttype)
except:
clientopen = False
if server in r:
try:
buff = server.recv(TWSProxy.MAXBUFF)
msglength = len(buff)
if msglength > 0:
TWSProxy.reliablesend(client,buff,msglength)
else:
client.shutdown(shuttype)
# print("server closed socket for reading")
serveropen = False
except:
serveropen = False
# If the client has finished sending, and we've finished transmitting to the server
# The server may still have some data to transmit to the client
closed = False
while not closed:
r,w,e = select.select([server],[],[],TWSProxy.TIMEOUT)
buff = server.recv(TWSProxy.MAXBUFF)
msglength = len(buff)
if msglength > 0:
try:
TWSProxy.reliablesend(client,buff,msglength)
except BrokenPipeError as e:
pass
else:
closed = True
@staticmethod
def sockcopy(src,dest,initshutdown):
shuttype = socket.SHUT_RD
import threading
logger = logging.getLogger()
logger.debug('sock copy started')
closed = False
name = threading.current_thread().name
Chris Hines
committed
failcount=0
failthresh = 10
while not closed and failcount < failthresh:
r,w,e = select.select([src],[],[],TWSProxy.TIMEOUT)
if len(r) > 0:
Chris Hines
committed
failcount=0
buff = None
msglength = -1
try:
buff = src.recv(TWSProxy.MAXBUFF)
if buff is None:
continue
except ConnectionResetError as e:
close = True
continue
except Exception as e:
import traceback
logger.error(traceback.format_exc())
# closed = True
continue
msglength = len(buff)
if msglength > 0:
TWSProxy.reliablesend(dest,buff,msglength)
if msglength == 0:
dest.shutdown(shuttype)
initshutdown.set()
closed = True
Chris Hines
committed
else:
failcount=failcount+1
if failcount > failthresh:
dest.shutdown(shuttype)
initshutdown.set()
def main():
from . import server
import logging
Chris Hines
committed
logging.basicConfig(filename="/var/log/tws.log",format="%(asctime)s %(levelname)s:%(process)s: %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
Chris Hines
committed
logger.debug("starting TWS proxy")
port = int(sys.argv[1])
server = server.TWSServer(port,5)
Chris Hines
committed
logger.debug("initialised server object")