Changeset 3361

Show
Ignore:
Timestamp:
01/13/10 15:37:56 (10 years ago)
Author:
kimbrl
Message:

Addressed issues in #811 including error passing, advertising / lookups, and forwarder reconnection

Location:
seattle/trunk/NatForwardingService
Files:
8 modified

Legend:

Unmodified
Added
Removed
  • seattle/trunk/NatForwardingService/src/NAT_CONSTANTS.repy

    r3344 r3361  
    2424NAT_NO = 'NO'     # use for negitive confirmation 
    2525 
     26NAT_SERVER_NOT_FOUND = 'SERVERNOTFOUND' 
     27NAT_FORWARDER_BUSY = 'BUSY' 
     28NAT_SERVER_ERROR = 'SERVERERROR' 
    2629 
    2730 
     31#common advertise keys 
     32 
     33NAT_FORWARDER = 'seattle_NAT_FORWARDING_SERVICE_NODE' 
     34 
  • seattle/trunk/NatForwardingService/src/NatForwardingShim.repy

    r3344 r3361  
    3535include ShimStack.repy 
    3636include NAT_CONSTANTS.repy 
    37  
     37include advertise.repy 
     38include NullShim.repy 
     39 
     40 
     41 
     42 
     43 
     44class NatConnError(Exception): 
     45  # base class for problem with nat connections 
     46  pass 
     47 
     48class NatLookupError(NatConnError): 
     49  # unable to lookup a needed value 
     50  pass 
    3851 
    3952 
     
    4255# for diffent calls to waitforconn 
    4356class NatStateObj: 
    44   def __init__(self,sock,running,callback,port): 
     57  def __init__(self,sock,running,host,callback,port,forwarder_ip,forwarder_port): 
    4558    self.sock = sock 
    4659    self.running = running 
    4760    self.callback = callback 
     61    self.host = host 
    4862    self.port = port 
    49  
     63    self.adkey = host+':'+str(port) 
     64    self.adval = forwarder_ip +':'+str(forwarder_port) 
    5065 
    5166 
     
    5368class NatForwardingShim(): 
    5469   
     70  advertise_wait_time = 60 
    5571  name ='NatForwardingShim' 
    5672 
     73  # create a cache to store forwarder lookups 
     74  forwarder_cache = {'tuple_list':None,'lock':getlock()} 
     75 
     76  # cache clt lookups 
     77  clt_cache = {} 
     78  clt_cache_lock = getlock() 
     79 
     80  ######    CONSTRUCTOR 
    5781 
    5882  def __init__(self,next_shim=None,optional_args=None,control_sock=None): 
    59     #CONSTRUCTOR 
     83     
    6084 
    6185    self.shim_stack = ShimStack(next_shim) 
     
    6387    self.state_objs = {} 
    6488     
     89 
    6590    # hop_key and hop_port can be used to avoid doing a lookup with 
    6691    # openconn, primarily used for testing 
     
    79104     
    80105 
    81    
     106  #######  SERVER METHODS   
     107 
     108 
    82109 
    83110  def waitforconn(self,key,port,callback): 
     
    87114 
    88115    <Arguments> 
    89       key: They srv key to register with a nat forwarder 
     116      key: They srv key to register with a nat forwarder, its up to the user 
     117      of this library to come up with a unique value to use as key, and to  
     118      communicate this key to potential clients. 
    90119 
    91120      port: The port to be used 
     
    98127      Sock exceptions may occur in event of connection failure, etc 
    99128     
     129    <Side Effects> 
     130      1 thread is consumed to wait for connections over the control socket 
     131      1 thread is consumed to advertise this connection 
     132 
     133      every call to waitforconn will use 2 threads, until stopcomm is called 
     134 
    100135    """ 
    101      
     136 
    102137    if port in self.state_objs: 
    103138      # if this port alreayd has an active listener just replace the callback 
     
    108143 
    109144    # establish a control socket with a forwarder 
    110     control_sock = self.establish_control_sock(key,port) 
     145    (for_ip,for_port,control_sock) = self.establish_control_sock(key,port) 
    111146     
    112147    # create a new state object for this listener 
    113     state_obj = NatStateObj(control_sock,True,callback,port) 
     148    state_obj = NatStateObj(control_sock,True,key,callback,port,for_ip,for_port) 
     149 
    114150    self.state_objs[port] = state_obj 
     151 
     152    # advertise this connection so the host can be found by clients 
     153    settimer(0,self.advertise_host,[state_obj]) 
    115154 
    116155    # start a thread to listen for new requests 
     
    121160 
    122161 
    123      
    124162 
    125163  def establish_control_sock(self,key,port): 
     
    128166 
    129167    if self.hop_key is not None and self.hop_port is not None: 
     168      # use the manually configured forwarder if it exists 
    130169      control_sock = self.shim_stack.openconn(self.hop_key,int(self.hop_port))   
     170      for_ip = self.hop_key 
     171      for_port = int(self.hop_port) 
     172     
    131173    else: 
    132       raise Exception("lookup not implemented in establish control sock") 
     174      # use the lookup mechanism to find a forwarder 
     175       
     176      self.forwarder_cache['lock'].acquire()       
     177      connected = False 
     178      is_cache_fresh = False # if we have fresh cache 
     179       
     180      #check that we actually have a cache 
     181      if self.forwarder_cache['tuple_list'] is None: 
     182        self.forwarder_cache['tuple_list'] = self.forwarder_lookup() 
     183        is_cache_fresh = True 
     184 
     185      while not connected: 
     186        ex_str ='' 
     187        for (for_ip,for_port) in self.forwarder_cache['tuple_list']: 
     188          try: 
     189            control_sock = self.shim_stack.openconn(for_ip,for_port) 
     190          except Exception,e: 
     191            ex_str = ex_str+' '+str(e) 
     192          else: 
     193            connected = True 
     194            break 
     195     
     196 
     197        if not connected: 
     198          if is_cache_fresh:  
     199            self.forwarder_cache['lock'].release()  
     200            raise NatConnError("Unable to establish control socket: "+ex_str) 
     201          else: 
     202            self.forwarder_cache['tuple_list'] = self.forwarder_lookup() 
     203            is_cache_fresh = True 
     204 
     205    
     206      self.forwarder_cache['lock'].release() 
     207 
    133208 
    134209    #register the with the forwarder 
     
    144219      raise Exception, 'NAT node refused connection' 
    145220 
    146     return control_sock 
     221    return (for_ip,for_port,control_sock) 
    147222 
    148223 
     
    215290        
    216291        # respond to forwarder checks to see if this connection is still 
    217         # active 
     292        # active, if we've lost the control sock we will detect it here 
     293        # and can reconnect 
    218294        while request == NAT_CONNECTION_ALIVE: 
    219           request = session_recvmessage(state_obj.sock) 
    220           if request != NAT_CONNECTION_ALIVE: break 
    221           session_sendmessage(state_obj.sock,NAT_YES)   
     295          try: 
     296            request = session_recvmessage(state_obj.sock) 
     297            if request != NAT_CONNECTION_ALIVE: break 
     298            session_sendmessage(state_obj.sock,NAT_YES)   
     299          except: 
     300            # error over the control socket, establish a new one 
     301            if state_obj.running: 
     302              state_obj.sock.close() 
     303              (for_ip,for_port,state_obj.sock) = self.establish_control_sock( 
     304                                                state_obj.host,state_obj.port) 
     305              state_obj.adval=for_ip+':'+str(for_port) 
     306            else: 
     307              raise # if stop has been called don't re-establish 
    222308 
    223309        # take a request to make a new connection  
     
    245331 
    246332 
     333 
     334  ############ CLIENT METHODS 
     335 
     336 
    247337  def openconn(self,id,port,localip=None,localport=None,timeout=5): 
    248338    """ 
     
    259349      Exception if Forwarder rejects connection    
    260350 
     351    <Warning> TODO: Does not correctly adhere to timeout semantics 
     352 
    261353    """   
    262  
    263     # if a hop key and port have been specified, use em. 
     354     
    264355    if self.hop_key is not None and self.hop_port is not None: 
    265       base_sock = self.shim_stack.openconn(self.hop_key,self.hop_port,localip,localport) 
     356      # if a hop key and port have been specified, use em. 
     357      base_sock = self.shim_stack.openconn(self.hop_key,self.hop_port,localip,localport,timeout) 
     358      self.establish_client_server_conn(base_sock,id,port) 
     359      return base_sock 
     360 
    266361    else: 
    267       raise Exception("openconn lookups not yet implemented") 
    268       # TODO implement lookups 
    269  
     362      # lookup the host's forwarder, note that this is a list 
     363      # because old advertisements might still be in the DHT 
     364       
     365      host_key = id+':'+str(port) 
     366 
     367      self.clt_cache_lock.acquire() 
     368 
     369      if host_key in self.clt_cache: 
     370        fresh_cache = False 
     371      else: 
     372        try: 
     373          self.clt_cache[host_key]= self.lookup_host(id,port) 
     374          fresh_cache = True 
     375        except Exception,e: 
     376          self.clt_cache_lock.release() 
     377          raise NatLookupError(str(e)) 
     378 
     379       
     380      while True: 
     381        # this loop breaks when we get a connection or decide 
     382        # to fail; the loop is executed at most twice  
     383  
     384        exception_str = '' 
     385        for (forip,forport) in self.clt_cache[host_key]: 
     386          try: 
     387            base_sock = self.shim_stack.openconn(forip,forport,localip, 
     388                                               localport,timeout) 
     389            self.establish_client_server_conn(base_sock,id,port) 
     390          except Exception,e: 
     391            exception_str = exception_str+',  '+str(e) 
     392          else: 
     393            # established a connection, return the socket 
     394            self.clt_cache_lock.release() 
     395            return base_sock 
     396 
     397        # we tried all the forwarders and could not establish a connection 
     398        if not fresh_cache: 
     399          # freshen the cache and try again 
     400           try: 
     401             del self.clt_cache[host_key] 
     402             self.clt_cache[host_key]= self.lookup_host(id,port) 
     403             fresh_cache = True 
     404           except Exception,e: 
     405             self.clt_cache_lock.release() 
     406             raise NatLookupError(str(e)) 
     407        else: 
     408          # if we already had fresh cache then fail   
     409          del self.clt_cache[host_key] 
     410          self.clt_cache_lock.release() 
     411          raise NatConnError("Failed to get connection: "+exception_str) 
     412       
     413 
     414 
     415 
     416  def establish_client_server_conn(self,base_sock,id,port): 
     417    #used by opennconn to establish connection 
    270418 
    271419    # specify this is a client connection, and what server we want 
    272     base_sock.send(NAT_CLIENT) 
    273     session_sendmessage(base_sock,str(id))  
    274     session_sendmessage(base_sock,str(port)) 
    275    
     420    try: 
     421      base_sock.send(NAT_CLIENT) 
     422      session_sendmessage(base_sock,str(id))  
     423      session_sendmessage(base_sock,str(port)) 
     424    except Exception,e: 
     425      raise NatConnError("Error initializing socket connection: "+str(e))     
     426 
    276427    # see if the connection was established 
    277428    response =  session_recvmessage(base_sock) 
    278429    if response != NAT_YES: 
    279430      base_sock.close() 
    280       raise Exception, 'Failed to Connect to host' 
     431      if response == NAT_SERVER_NOT_FOUND: 
     432        raise NatConnError('The Host requested was not found at the forwarder') 
     433      elif response == NAT_FORWARDER_BUSY: 
     434        raise NatConnError('The Host requested has reach its client capacity') 
     435      elif response == NAT_SERVER_ERROR: 
     436        raise NatConnError('The Host requested suffered an unexpected error during connection') 
     437      else: 
     438        raise NatConnError("Unknown nat failure: "+response) 
    281439 
    282440    #if the connection is established we can return the socket 
     
    285443 
    286444 
     445  ####   ADVERTISE / LOOKUP METHODS 
     446 
     447 
     448  def forwarder_lookup(self): 
     449    # returns a list of forwarders in the form [(ip,port),...] 
     450    # ordered by the server load on the forwarders 
     451    #  
     452    # raises exception if no valid forwarder entries are found 
     453 
     454    try: 
     455      #TODO remove lookup type when DOR is fixed 
     456      raw_data = advertise_lookup(NAT_FORWARDER, 
     457                            lookuptype=['central','opendht']) 
     458    except: 
     459      raise NatLookupError("No Nat Forwarders were found") 
     460 
     461 
     462    # hash the forwarders based on the load they have     
     463    tuple_list_dict = {} 
     464    for item in raw_data: 
     465      try: 
     466        (ip,port,load) = item.split(':') 
     467        port = int(port) 
     468      except: 
     469        pass  # throw out invalid entries, todo log this? 
     470      else: 
     471        if load not in tuple_list_dict: 
     472          tuple_list_dict[load] = [] 
     473        tuple_list_dict[load].append((ip,port)) 
     474 
     475    if len(tuple_list_dict) < 1: 
     476      raise NatLookupError("No Valid entries were found for nat forwarders") 
     477 
     478 
     479    # drop all of the tuples into a list ordered by increasing  
     480    # forwarder load 
     481    ranked_tuple_list = [] 
     482    key_list =  tuple_list_dict.keys() 
     483    key_list.sort() 
     484    for key in key_list: 
     485      for tuple in tuple_list_dict[key]: 
     486        ranked_tuple_list.append(tuple) 
     487     
     488    return ranked_tuple_list 
     489 
     490 
     491 
     492 
     493 
     494 
     495  def advertise_host(self,state_obj): 
     496    while state_obj.running: 
     497       
     498      try: 
     499        advertise_announce(state_obj.adkey,state_obj.adval, 
     500                                self.advertise_wait_time*2) 
     501      except: 
     502        pass  #TODO can i provid some kind of indication that this happens? 
     503 
     504 
     505      # sleep in increments so that we stop this thread 
     506      # as soon as possible after stopcomm 
     507      slept = 0       
     508      while slept < self.advertise_wait_time: 
     509        if not state_obj.running: return 
     510        slept += 5 
     511        sleep(5) 
     512       
     513       
     514 
     515 
     516  def lookup_host(self,host,port): 
     517    # returns a list of tuples (forwarderip,forwarderport) 
     518    # TODO remove lookuptype when DOR advertise is fixed 
     519    raw_list = advertise_lookup(host+':'+str(port), 
     520                                 lookuptype=['central','opendht']) 
     521    if raw_list is None or len(raw_list) == 0: 
     522      raise NatLookupError('No lookup results for: '+host+':'+str(port)) 
     523 
     524    tuple_list = [] 
     525    for item in raw_list: 
     526      try: 
     527        (ip,port) = item.split(':') 
     528      except: 
     529        pass 
     530      else: tuple_list.append((ip,int(port))) 
     531 
     532    if len(tuple_list) == 0: 
     533      raise NatLookupError('No valid lookup results for: '+host+':'+str(port)) 
     534    else: 
     535      return tuple_list 
     536 
     537 
     538 
     539 
    287540 
    288541 
  • seattle/trunk/NatForwardingService/src/Nat_Forwarder.repy

    r3344 r3361  
    3131include session.repy 
    3232include NAT_CONSTANTS.repy 
     33include advertise.repy 
     34 
     35 
    3336 
    3437 
     
    4043SRV_DICT = {}  # keep track of registered servers 
    4144CLT_DICT = {} # HOLDS CLIENT SOCKETS 
     45 
     46CLT_SERVER_DICT = {} #maps clts to servers they want to connect to 
     47 
    4248 
    4349NEW_SERVER_LOCK = getlock() 
     
    161167        exchange_streams(clt_sock,sock,False) 
    162168     
     169      clt = key+','+port 
     170 
    163171      #del the clt dict entry when the connection is finished 
    164       del CLT_DICT[key+','+port] 
    165  
     172      del CLT_DICT[clt] 
     173       
     174      # decrement the number of clients 
     175      try: 
     176        server_state = SRV_DICT[CLT_SERVER_DICT[clt]]     
     177        server_state['lock'].acquire() 
     178        server_state['clts'] -= 1 
     179        server_state['lock'].release() 
     180      except: 
     181        pass # the server may have been deleted if it had an error 
     182       
     183      # remove the entry from the clt server dict 
     184      del CLT_SERVER_DICT[clt] 
    166185 
    167186  # unknown action 
     
    195214    srv_lock = SRV_DICT[server]['lock'] 
    196215  except: 
    197     print 'ERROR: requested server not found' 
    198     try: 
    199       session_sendmessage(clt_sock,NAT_NO) 
     216    print 'ERROR: clt: '+rip+':'+str(rport)+' requested unknown server:'+server 
     217    try: 
     218      session_sendmessage(clt_sock,NAT_SERVER_NOT_FOUND) 
    200219      clt_sock.close() 
    201220    except: pass #TODO log this? 
     
    205224  #add the clt to the clt dict 
    206225  CLT_DICT[rip+','+str(rport)] = clt_sock 
     226  CLT_SERVER_DICT[rip+','+str(rport)] = server 
    207227 
    208228  # begin the client/server handshake,  
     
    213233    print 'INFO: maxclients recieved for '+key+':'+port+' not allowing new client' 
    214234    try: 
    215       session_sendmessage(clt_sock,NAT_NO) 
     235      session_sendmessage(clt_sock,NAT_FORWARDER_BUSY) 
    216236    except: 
    217237      clt_sock.close() #TODO log this? 
     
    237257    safely_remove_server(server) 
    238258    try: 
    239       session_sendmessage(clt_sock,NAT_NO) 
     259      session_sendmessage(clt_sock,NAT_SERVER_ERROR) 
    240260    except: 
    241261      pass #todo log this? 
     
    252272    print 'ERROR: requested server did not make new connection' 
    253273    try: 
    254       session_sendmessage(clt_sock,NAT_NO) 
     274      session_sendmessage(clt_sock,NAT_SERVER_ERROR) 
    255275    except: 
    256276      pass #todo log this? 
     
    298318    try: 
    299319      msg = from_sock.recv(1024) 
    300       to_sock.send(msg) 
     320      sent = to_sock.send(msg) 
     321      if sent != len(msg): 
     322        # this is a subtle error that can occur and will violate 
     323        # semantics if not detected.  
     324 
     325        # TODO actually address the error and do the right thing 
     326        # for now i just detect the error and close the connection  
     327        # which does perserve semantics 
     328        raise Exception("ERROR IN EXCHANGE:recv: "+str(len(msg))+"bytes and sent"+str(sent)) 
    301329    except Exception,e: 
    302330      if do_print: print 'INFO: terminated a connection because, '+str(e) 
     
    350378 
    351379 
     380def forwarder_advertise_thread(ip,waitport): 
     381  # advertises the forwarder 
     382 
     383  base_value = ip+':'+str(waitport)+':' 
     384  while True: 
     385    value = base_value+str(len(SRV_DICT)) 
     386    try: 
     387      advertise_announce(NAT_FORWARDER,value,120) 
     388    except Exception,e: 
     389       print 'ADVERTISE ERROR: '+str(e) 
     390       sleep(10) 
     391    else: 
     392      sleep(60) 
     393 
    352394 
    353395if callfunc == 'initialize': 
    354396 
    355   if len(callargs) < 2: 
    356     print 'usage: ip wait_port ' 
     397  if len(callargs) == 1: 
     398    ip = getmyip() 
     399    wait_port = int(callargs[0]) 
     400  elif len(callargs) == 2: 
     401    ip = callargs[0] 
     402    wait_port = int(callargs[1]) 
     403  else: 
     404    print 'usage: [ip] wait_port ' 
    357405    exitall() 
    358406     
    359   ip = callargs[0] 
    360   wait_port = int(callargs[1]) 
     407   
    361408   
    362409  # we need these when servers are making new connections  
     
    366413  waitforconn(ip,wait_port,common_entry) 
    367414   
     415  forwarder_advertise_thread(ip,wait_port) 
  • seattle/trunk/NatForwardingService/src/NullShim.repy

    r3344 r3361  
    1313""" 
    1414 
    15  
     15include ShimStack.repy 
    1616 
    1717class NullShim(): 
  • seattle/trunk/NatForwardingService/tests/nat_client.repy

    r3344 r3361  
    1212if callfunc == 'initialize': 
    1313   
    14   serverkey = 'NAT$BLAHBLAHBLAH' 
     14  serverkey = 'BASICNATTESTSERVER' 
    1515   
    16   if len(callargs) != 2: 
    17     print 'usage: ip port' 
    18     exitall() 
     16   
     17  shim = ShimStack('(NatForwardingShim)(NullShim)') 
     18   
     19  # do it ten times 
     20  for j in range(10):  
     21    try: 
     22      sock = shim.openconn(serverkey,12347) 
     23    except Exception,e: 
     24      print 'ERROR, got exception: '+str(e) 
     25    else: 
     26      for i in range(10): 
     27        sock.send('a') 
     28        msg = sock.recv(1) 
     29        if msg != 'A': print 'ERROR: got '+msg 
     30      sock.close() 
     31      print 'completed pass: '+str(j) 
    1932 
    20   shim = ShimStack('(NatForwardingShim,'+callargs[0]+','+callargs[1]+')(NullShim)') 
    21   sock = shim.openconn(serverkey,12347) 
    22    
    23   for i in range(10): 
    24     sock.send(str(i)) 
    25     print sock.recv(10) 
    26  
    27   sock.close() 
  • seattle/trunk/NatForwardingService/tests/nat_server.repy

    r3344 r3361  
    1212def response(remote_ip,remote_port,sock,th,listenhandle): 
    1313  try: 
     14    count = 1 
    1415    while True: 
    15       msg = sock.recv(1024) 
    16       print msg 
    17       sock.send('got it') 
     16      sock.recv(1) 
     17      sock.send('A') 
     18      print 'sent response '+str(count) 
     19      count +=1 
    1820  except: 
     21     print 'closed a connection' 
    1922     sock.close() 
    2023 
     
    2326if callfunc == 'initialize': 
    2427   
    25   mykey = 'NAT$BLAHBLAHBLAH' 
     28  mykey = 'BASICNATTESTSERVER' 
    2629 
    27   if len(callargs) != 2: 
    28     print 'usage: ip port' 
    29     exitall() 
    3030 
    31   shim = ShimStack('(NatForwardingShim,'+callargs[0]+','+callargs[1]+')(NullShim)') 
     31  shim = ShimStack('(NatForwardingShim)(NullShim)') 
    3232 
    3333  shim.waitforconn(mykey,12347,response) 
  • seattle/trunk/NatForwardingService/tests/test_2_stopcomm.repy

    r3344 r3361  
    4646  try: 
    4747   sock = client_shim.openconn(serverkey,12347) 
    48   except: 
     48  except NatConnError: 
    4949    pass 
    5050  else:  
  • seattle/trunk/NatForwardingService/tests/test_9_maxcleints.repy

    r3344 r3361  
    1515def response(remote_ip,remote_port,sock,th,listenhandle): 
    1616  sock.send('A') 
    17   sock.close() 
     17   
    1818 
    1919 
     
    2121  try: 
    2222    sock = client_shim.openconn(serverkey,12347) 
    23   except: 
     23    mycontext['socks'].append(sock) 
     24  except NatConnError: 
    2425    LOCK.acquire() 
    2526    mycontext['failed_clients'] +=1 
     
    2930 
    3031  msg = sock.recv(1) 
    31   sock.close() 
    3232  if msg != 'A': 
    3333    raise Exception("got wrong message in client thread: "+msg) 
     
    3535  mycontext['threads_done'] = mycontext['threads_done'] +1  
    3636  LOCK.release() 
     37  return True 
    3738 
    3839if callfunc == 'initialize': 
     
    4142  mycontext['threads_done'] = 0 
    4243  mycontext['failed_clients'] = 0 
     44  mycontext['socks'] = [] 
    4345  ip = '127.0.0.1' 
    4446  port = 12345 
     
    6466    sleep(1) 
    6567 
     68  # close all the clients 
     69  for sock in mycontext['socks']: 
     70    sock.close() 
     71 
     72  if mycontext['failed_clients'] != 1: 
     73    raise Exception("Failed number of clients is wrong: "+str(mycontext['failed_clients'])) 
     74 
     75 
     76  # make sure a client can connect again 
     77  if not client_thread(client_shim): 
     78    raise Exception("Client cant connection after dropping some connections") 
     79 
     80 
    6681  # do the stopcomm 
    6782  server_shim.stopcomm(handle) 
    6883 
    69   if mycontext['failed_clients'] != 1: 
    70     raise Exception("Failed number of clients is wrong: "+str(mycontext['failed_clients'])) 
     84   
     85 
     86