Please note that the CVS and issue trackers have moved to GitHub. These Trac pages are no longer kept up-to-date.

root/seattle/trunk/repy/emulcomm.py@5640

Revision 5640, 57.2 KB (checked in by albert, 7 years ago)

Fix for #1039, backported to RepyV1

Line 
1"""
2   Author: Justin Cappos
3
4   Start Date: 27 June 2008
5
6   Description:
7
8   This is a collection of communications routines that provide a programmer
9   with a reasonable environment.   This is used by repy.py to provide a
10   highly restricted (but usable) environment.
11   
12   Note that this module can also be used to place restrictions on the
13   interfaces and IP addresses that are used by Seattle.   The order in which
14   the interfaces are specified is their preference.   
15
16   One odd piece of behavior is that if an IP address changes, you can't use
17   it until calling getmyip.   It's not clear how you would know about this
18   occurrence, but it should be noted.
19"""
20
21import restrictions
22import socket
23
24# Armon: Used to check if a socket is ready
25import select
26
27# socket uses getattr and setattr.   We need to make these available to it...
28socket.getattr = getattr
29socket.setattr = setattr
30
31
32# needed to set threads for recvmess and waitforconn
33import threading
34threading.hasattr = hasattr    # Fix for #1039
35
36# to force destruction of old sockets
37import gc
38
39# So I can exit all threads when an error occurs or do select
40import harshexit
41
42# Needed for finding out info about sockets, available interfaces, etc
43import nonportable
44
45# So I can print a clean traceback when an error happens
46import tracebackrepy
47
48# accounting
49import nanny
50
51# give me uniqueIDs for the comminfo table
52import idhelper
53
54# for sleep
55import time 
56
57# Armon: Used for decoding the error messages
58import errno
59
60# Armon: Used for getting the constant IP values for resolving our external IP
61import repy_constants 
62
63# The architecture is that I have a thread which "polls" all of the sockets
64# that are being listened on using select.  If a connection
65# oriented socket has a connection pending, or a message-based socket has a
66# message pending, and there are enough events it calls the appropriate
67# function.
68
69
70
71
72
# Table of communications structures, keyed by commhandle.  Each value is a
# dict describing one socket.  Example entries:
# {'type':'UDP','localip':ip, 'localport':port,'function':func,'socket':s, 'outgoing':True, 'closing_lock':lockobj}
# {'type':'TCP','remotehost':None, 'remoteport':None,'localip':None,'localport':None, 'socket':s, 'function':func, 'outgoing':False, 'closing_lock':lockobj}

comminfo = {}

# If we have a preference for an IP/Interface this flag is set to True
user_ip_interface_preferences = False

# Do we allow non-specified IPs
allow_nonspecified_ips = True

# Armon: Specifies the list of allowed IPs and interfaces in order of their preference
# The basic structure is a list of tuples (is_ip, value): is_ip is True if value
# is an IP address, False if value is an interface name
user_specified_ip_interface_list = []

# This list caches the allowed IPs
# It is updated at the launch of repy, or by calls to getmyip and update_ip_cache
# NOTE: The loopback address 127.0.0.1 is always permitted. update_ip_cache will always add this
# if it is not specified explicitly by the user
allowediplist = []
cachelock = threading.Lock()  # This allows only a single simultaneous cache update
95
96
97# Determines if a specified IP address is allowed in the context of user settings
def ip_is_allowed(ip):
  """
  <Purpose>
    Decide whether an IP may be used under the current user settings,
    consulting the cached list of allowed IPs when a restriction applies.

  <Arguments>
    ip: The IP address to look up.

  <Returns>
    True if the IP is allowed, False otherwise.
  """
  global allowediplist
  global user_ip_interface_preferences
  global allow_nonspecified_ips

  # The cache only matters when the user stated a preference AND
  # non-specified IPs are not permitted.  In every other case anything goes.
  restricted = user_ip_interface_preferences and not allow_nonspecified_ips
  if restricted:
    return (ip in allowediplist)

  return True
120
121
122# Only appends the elem to lst if the elem is unique
def unique_append(lst, elem):
  """
  <Purpose>
    Append elem to lst in place, unless an equal element is already present.

  <Returns>
    None
  """
  if elem in lst:
    return
  lst.append(elem)
126     
127# This function updates the allowed IP cache
128# It iterates through all possible IP's and stores ones which are bindable as part of the allowediplist
def update_ip_cache():
  """
  <Purpose>
    Rebuild the cache of allowed IPs (allowediplist) from the user specified
    IP / interface list, keeping only the addresses that can be bound.

  <Arguments>
    None

  <Side Effects>
    Replaces the global allowediplist.  This is a no-op when no IP/interface
    preference is set.  cachelock is held while the cache is rebuilt.

  <Returns>
    None
  """
  global allowediplist
  global user_ip_interface_preferences
  global user_specified_ip_interface_list
  global allow_nonspecified_ips

  # Without a preference there is nothing to cache
  if not user_ip_interface_preferences:
    return

  # Only one thread may rebuild the cache at a time
  cachelock.acquire()

  # The lock is released no matter what happens below
  try:
    # Step 1: expand the preference list into candidate IPs, keeping the
    # user's order of preference
    candidates = []
    for (is_ip_addr, value) in user_specified_ip_interface_list:
      if is_ip_addr:
        # A plain IP is taken as-is
        unique_append(candidates, value)
        continue

      # Otherwise value names an interface; collect the IPs assigned to it
      try:
        for nic_ip in nonportable.os_api.get_interface_ip_addresses(value):
          unique_append(candidates, nic_ip)
      except:
        # The NIC may not exist; ignore it
        pass

    # Step 2: keep only the candidates we can actually bind to
    usable = []
    for candidate in candidates:
      probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      try:
        probe.bind((candidate, 0))
        usable.append(candidate)   # bind worked, this IP is good
      except:
        pass                       # not a good IP, skip it
      finally:
        probe.close()

    # Loopback is always permitted
    unique_append(usable, "127.0.0.1")

    # Publish the new cache
    allowediplist = usable

  finally:
    cachelock.release()
188 
189########################### General Purpose socket functions #################
190
def is_already_connected_exception(exceptionobj):
  """
  <Purpose>
    Determines if a given exception indicates that the socket
    is already connected.

  <Arguments>
    exceptionobj: An exception object from a network call.

  <Returns>
    True if the exception's type is exactly socket.error and its error
    number maps to EISCONN / WSAEISCONN.  False otherwise, including when
    the exception carries no error number at all.
  """
  # Only continue if the type is exactly socket.error
  if type(exceptionobj) is not socket.error:
    return False

  # The error number is the first constructor argument.  Read it through
  # .args instead of indexing the exception object, and tolerate an
  # exception that was raised without any arguments.
  if not exceptionobj.args:
    return False
  errnum = exceptionobj.args[0]

  # Convert the errno to an error string name
  try:
    errname = errno.errorcode[errnum]
  except (KeyError, TypeError):
    # Unknown error number, or the first argument is not an error number
    errname = None

  # Return if the error name is in our white list
  return errname in ("EISCONN", "WSAEISCONN")
225
226
def is_recoverable_network_exception(exceptionobj):
  """
  <Purpose>
    Determines if a given network exception is recoverable or fatal.

  <Arguments>
    exceptionobj: An exception object from a network call.

  <Returns>
    True if potentially recoverable, False if fatal.  socket.timeout is
    always recoverable.  Any exception whose type is not exactly
    socket.error or select.error, or which carries no error number, is
    treated as fatal.
  """
  exception_type = type(exceptionobj)

  # socket.timeout is recoverable always
  if exception_type == socket.timeout:
    return True

  # Only continue if the type is socket.error or select.error
  if exception_type != socket.error and exception_type != select.error:
    return False

  # The error number is the first constructor argument.  Read it through
  # .args instead of indexing the exception object, and tolerate an
  # exception that was raised without any arguments.
  if not exceptionobj.args:
    return False
  errnum = exceptionobj.args[0]

  # Names of the error numbers that are considered recoverable
  recoverable_errors = ("EINTR", "EAGAIN", "EBUSY", "EWOULDBLOCK", "ETIMEDOUT",
                        "ERESTART", "WSAEINTR", "WSAEWOULDBLOCK", "WSAETIMEDOUT",
                        "EALREADY", "WSAEALREADY", "EINPROGRESS", "WSAEINPROGRESS")

  # Convert the errno to an error string name
  try:
    errname = errno.errorcode[errnum]
  except (KeyError, TypeError):
    # Unknown error number, or the first argument is not an error number
    errname = None

  # Return if the error name is in our white list
  return errname in recoverable_errors
266
267
268# Determines based on exception if the connection has been terminated
def is_terminated_connection_exception(exceptionobj):
  """
  <Purpose>
    Determines if the exception indicates that the connection is terminated.

  <Arguments>
    exceptionobj: An exception object from a network call.

  <Returns>
    True if the connection is terminated, False otherwise.
    False means we could not determine with certainty if the socket is closed
    (this includes exceptions that carry no error number).
  """
  exception_type = type(exceptionobj)

  # We only want to continue if it is socket.error or select.error
  if exception_type != socket.error and exception_type != select.error:
    return False

  # The error number is the first constructor argument.  Read it through
  # .args instead of indexing the exception object, and tolerate an
  # exception that was raised without any arguments.
  if not exceptionobj.args:
    return False
  errnum = exceptionobj.args[0]

  # Names of the error numbers which indicate the connection is closed
  connection_closed_errors = ("EPIPE", "EBADF", "EBADR", "ENOLINK", "EBADFD",
                              "ENETRESET", "ECONNRESET", "WSAEBADF",
                              "WSAENOTSOCK", "WSAECONNRESET")

  # Convert the errnum to an error string
  try:
    errname = errno.errorcode[errnum]
  except (KeyError, TypeError):
    # The error number is not defined, or is not an error number at all
    errname = None

  # Return whether the errname is in our pre-defined list
  return errname in connection_closed_errors
304
305
306# Armon: This is used for semantics, to determine if we have a valid IP.
def is_valid_ip_address(ipaddr):
  """
  <Purpose>
    Determines if ipaddr is a valid dotted-quad IP address.
    Address 0.0.0.0 is considered valid.

  <Arguments>
    ipaddr: String to check for validity. (It will check that this is a string).

  <Returns>
    True if a valid IP, False otherwise.
  """
  # Argument must be of the string type
  if type(ipaddr) is not str:
    return False

  # A valid IP should have 4 segments, explode on the period
  parts = ipaddr.split(".")
  if len(parts) != 4:
    return False

  # Check that each segment is a number between 0 and 255 inclusively.
  for part in parts:
    # Each segment is 1 to 3 characters long
    if len(part) < 1 or len(part) > 3:
      return False

    # Only plain decimal digits are allowed.  int() on its own would also
    # accept a sign or surrounding whitespace ("+4", "-0", " 4"), which
    # are not part of a valid address.
    for char in part:
      if char not in "0123456789":
        return False

    # Digits only, so the value cannot be negative
    if int(part) > 255:
      return False

  # At this point, assume the IP is valid
  return True
349
350# Armon: This is used for semantics, to determine if the given port is valid
def is_valid_network_port(port, allowzero=False):
  """
  <Purpose>
    Determines if a given network port is valid.

  <Arguments>
    port: A numeric type (this will be checked) port number.
    allowzero: Allows 0 as a valid port if true

  <Returns>
    True if valid, False otherwise.
  """
  # Accept int, and long where the interpreter has a separate long type.
  # Looking the name up defensively keeps the check usable on interpreters
  # that only have int.
  try:
    integer_types = (int, long)
  except NameError:
    integer_types = (int,)

  # Exact type match (not isinstance), so subclasses such as bool are
  # rejected just like any other non-integer.
  if type(port) not in integer_types:
    return False

  return ((allowzero and port == 0) or (port >= 1 and port <= 65535))
368
369
# Constant prefix for comm handles.  generate_commhandle() prepends it to
# every handle and is_valid_commhandle() checks for it.
COMM_PREFIX = "_COMMH:"
372
373# Makes commhandles for networking functions
def generate_commhandle():
  """
  <Purpose>
    Builds a new string commhandle for identifying a socket.  The handle is
    a fresh id from idhelper with COMM_PREFIX in front, so that handles can
    later be recognised by their prefix.

  <Returns>
    A string handle.
  """
  return COMM_PREFIX + idhelper.getuniqueid()
388
389
390# Helps determine if a commhandle is valid
def is_valid_commhandle(commhandle):
  """
  <Purpose>
    Determines if the given commhandle is potentially valid.
    This is not a guarantee of validity, e.g. the commhandle may not
    exist in the comminfo table.

  <Arguments>
    commhandle:
      The handle to be checked for validity

  <Returns>
    True if the handle is a string starting with COMM_PREFIX, False otherwise.
  """
  looks_valid = False

  # Handles are always strings.  For a string, only the prefix is checked,
  # so we do not rely on the format of idhelper.getuniqueid()
  if type(commhandle) == str:
    looks_valid = commhandle.startswith(COMM_PREFIX)

  return looks_valid
412
413
414########################### SocketSelector functions #########################
415
416
417
# used to lock the methods that check to see if the thread is running
# (check_selector and should_selector_exit both take it before touching
# selectorstarted)
selectorlock = threading.Lock()

# is the selector thread started...
selectorstarted = False
423
424
425#### helper functions
426
427# return the table entry for this socketobject
def find_socket_entry(socketobject):
  """
  <Purpose>
    Look up the comminfo entry that holds the given socket object.

  <Arguments>
    socketobject: The socket to search for (compared by identity).

  <Exceptions>
    KeyError if no entry holds this socket.

  <Returns>
    A tuple (table entry, commhandle).
  """
  for handle in comminfo.keys():
    entry = comminfo[handle]
    if entry['socket'] is socketobject:
      return entry, handle

  raise KeyError("Can't find commhandle")
433
434
435
436
437# wait until there is a free event
def wait_for_event(eventname):
  """
  <Purpose>
    Block until nanny accepts eventname as a new 'events' item, retrying
    every tenth of a second while the request is refused.

  <Arguments>
    eventname: The name of the event to register.

  <Returns>
    None
  """
  registered = False
  while not registered:
    try:
      nanny.tattle_add_item('events',eventname)
    except Exception:
      # They must be over their event limit.   Sleep and check again later
      time.sleep(.1)
    else:
      registered = True
446
447
448
def should_selector_exit():
  """
  <Purpose>
    Decide whether the SocketSelector thread should stop: it should when no
    entry in comminfo is a listening (non-outgoing) one.

  <Side Effects>
    When the answer is True, selectorstarted is cleared and the
    "SocketSelector" event is returned to nanny.

  <Returns>
    True if the selector should exit.  False if it should keep running, or
    if selectorlock could not be taken right now.
  """
  global selectorstarted

  # False means "nonblocking".  If somebody else holds the lock we simply
  # keep running and check again on the next pass.
  if not selectorlock.acquire(False):
    return False

  # selectorstarted should *always* be True when we get here.  This is a
  # test for bugs in this module.
  if not selectorstarted:
    # This will cause the program to exit and log things if logging is
    # enabled. -Brent
    tracebackrepy.handle_internalerror("SocketSelector is started when" +
        ' selectorstarted is False', 39)

  # Is anything still listening?
  still_listening = False
  for comm in comminfo.values():
    if not comm['outgoing']:
      still_listening = True
      break

  if still_listening:
    # I'm listening and waiting so all is well, I should continue
    selectorlock.release()
    return False

  # there is no listening function so I should exit...
  selectorstarted = False
  nanny.tattle_remove_item('events',"SocketSelector")
  selectorlock.release()
  return True
479   
480
481
482
483
484# This function starts a thread to handle an entry with a readable socket in
485# the comminfo table
def start_event(entry, handle,eventhandle):
  """
  <Purpose>
    Handle one readable listening socket: read the datagram (UDP) or accept
    the connection (TCP) and hand the result to the user's callback in a new
    EventDeliverer thread.

  <Arguments>
    entry: The comminfo table entry of the readable socket.
    handle: The commhandle of that entry.
    eventhandle: The already registered 'events' item for this event.

  <Side Effects>
    On a socket error, or when an empty datagram is read, the event is
    given back to nanny and nothing is delivered.  For TCP a new comminfo
    entry is added for the accepted socket.

  <Returns>
    None
  """
  if entry['type'] == 'UDP':
    # some sort of socket error, I'll assume they closed the socket or it's
    # not important
    try:
      # NOTE: 64K is the max UDP dgram size.   Let's read it all
      data, addr = entry['socket'].recvfrom(65535)
    except socket.error:
      # they closed in the meantime?
      nanny.tattle_remove_item('events',eventhandle)
      return

    # wait if we're over the limit
    if data:
      # NOTE: both branches charge 'looprecv' on purpose, see the comment below
      if is_loopback(entry['localip']):
        nanny.tattle_quantity('looprecv',len(data))
      else:
       # We will charge looprecv for UDP from the net, (see #887 for details)
        nanny.tattle_quantity('looprecv',len(data))
    else:
      # no data...   Let's stop this...
      nanny.tattle_remove_item('events',eventhandle)
      return

     
    # The callback receives (remote ip, remote port, message, commhandle)
    try:
      EventDeliverer(entry['function'],(addr[0], addr[1], data, handle), eventhandle).start()
    except Exception, e:
      # This is an internal error I think...
      # This will cause the program to exit and log things if logging is
      # enabled. -Brent
      tracebackrepy.handle_internalerror("Can't start UDP EventDeliverer '" + str(e)+"'", 29)



  # or it's a TCP accept event...
  elif entry['type'] == 'TCP':
    try:
      realsocket, addr = entry['socket'].accept()
    except socket.error:
      # they closed in the meantime?
      nanny.tattle_remove_item('events',eventhandle)
      return
   
    # put this handle in the table
    newhandle = generate_commhandle()
    comminfo[newhandle] = {'type':'TCP','remotehost':addr[0], 'remoteport':addr[1],'localip':entry['localip'],'localport':entry['localport'],'socket':realsocket,'outgoing':True, 'closing_lock':threading.Lock()}
    # I don't think it makes sense to count this as an outgoing socket, does
    # it?

    # Armon: Create the emulated socket after the table entry
    safesocket = emulated_socket(newhandle)

    # The callback receives (remote ip, remote port, socket-like object,
    # commhandle of the new connection, commhandle of the listener)
    try:
      EventDeliverer(entry['function'],(addr[0], addr[1], safesocket, newhandle, handle),eventhandle).start()
    except Exception, e:
      # This is an internal error I think...
      # This will cause the program to exit and log things if logging is
      # enabled. -Brent
      tracebackrepy.handle_internalerror("Can't start TCP EventDeliverer '"+str(e)+"'", 23)


  else:
    # Should never get here
    # This will cause the program to exit and log things if logging is
    # enabled. -Brent
    tracebackrepy.handle_internalerror("In start event, Unknown entry type '"+entry['type']+"'", 51)
553
554
555
# Armon: What is the maximum number of samples to perform per second?
# This is to prevent excessive sampling if there is a bad socket and
# select() returns before timing out
MAX_SAMPLES_PER_SEC = 10
# Minimum spacing, in seconds, that SocketSelector.run() enforces after a
# sample that found no ready sockets
TIME_BETWEEN_SAMPLES = 1.0 / MAX_SAMPLES_PER_SEC
561
562# Check for sockets using select and fire up user event threads as needed.
563#
564# This class holds nearly all of the complexity in this module.   It's
565# basically just a loop that gets pending sockets (using select) and then
566# fires up events that call user provided functions
class SocketSelector(threading.Thread):
  """
  Thread that polls the listening sockets in comminfo (using select) and
  starts an event for every socket that has something pending.
  """

  def __init__(self):
    threading.Thread.__init__(self, name="SocketSelector")


  def get_acceptable_sockets(self):
    """
    <Purpose>
      Find the listening sockets that are ready to be serviced.

    <Returns>
      A list of the non-outgoing sockets in comminfo that select() reports
      as readable or as having an exceptional condition.  select() is given
      a 0.5 second timeout.  An empty list if nothing is listening.
    """
    # the socket objects we might have a pending request on
    listening = [comm['socket'] for comm in comminfo.values()
                 if not comm['outgoing']]

    # nothing to request.   The caller loops back around and checks if all
    # sockets have been closed
    if listening == []:
      return []

    try:
      (ready, unused_writable, errored) = select.select(listening,[],listening,0.5)

      # Sockets with exceptions are reported as ready as well
      for sock in errored:
        if sock not in ready:
          ready.append(sock)

      return ready

    # There was probably an exception on the socket level, check individually
    except:
      survivors = []

      for candidate in listening:
        try:
          (accept_will_block, write_will_block) = socket_state(candidate, "r")
          if not accept_will_block:
            survivors.append(candidate)

        # Ignore errors, probably the socket is closed.
        except:
          pass

      return survivors



  def run(self):
    """
    Main loop: sample the listening sockets and start an event for each
    ready one, until should_selector_exit() says to stop.
    """
    # Time of the last sample that found no ready sockets
    last_idle_sample = 0

    # I'll stop myself only when there are no active threads to monitor
    while not should_selector_exit():

      # If the last sample with 0 ready sockets was less than
      # TIME_BETWEEN_SAMPLES seconds ago, sleep a while.  This is to prevent
      # a tight loop from consuming CPU time doing nothing.
      now = nonportable.getruntime()
      elapsed = now - last_idle_sample
      if elapsed < TIME_BETWEEN_SAMPLES:
        time.sleep(TIME_BETWEEN_SAMPLES - elapsed)

      ready = self.get_acceptable_sockets()

      # If there is nothing to do, potentially delay the next sample
      if len(ready) == 0:
        last_idle_sample = now

      # go through the pending sockets, grab an event and then start a
      # thread to handle the connection
      for sock in ready:
        try:
          entry, handle = find_socket_entry(sock)
        except KeyError:
          # let's skip this one, it's likely it was closed in the interim
          continue

        # get the event...   this loops until there is a free event
        eventhandle = idhelper.getuniqueid()
        wait_for_event(eventhandle)

        # wait if already oversubscribed
        if is_loopback(entry['localip']):
          resource = 'looprecv'
        else:
          resource = 'netrecv'
        nanny.tattle_quantity(resource,0)

        # Now I can start a thread to run the user's code...
        start_event(entry,handle,eventhandle)
670     
671
672
673
674
675
676
677
678# this gives an actual event to the user's code
class EventDeliverer(threading.Thread):
  """
  Thread that gives one event to the user's code: it calls the callback
  with the stored arguments and returns the event to nanny afterwards.
  """
  # Defaults; every instance overwrites these in __init__
  func = None      # the user's callback
  args = None      # tuple of arguments for the callback
  eventid = None   # the 'events' item to release when the callback is done

  def __init__(self, f, a,e):
    (self.func, self.args, self.eventid) = (f, a, e)

    # Initialize with a custom and unique thread name
    unique_name = idhelper.get_new_thread_name(COMM_PREFIX)
    threading.Thread.__init__(self,name=unique_name)

  def run(self):
    try:
      try:
        self.func(*(self.args))
      except:
        # we probably should exit if they raise an exception in a thread...
        tracebackrepy.handle_exception()
        harshexit.harshexit(14)
    finally:
      # our event is going away...
      nanny.tattle_remove_item('events',self.eventid)
703     
704
705
706
707
708       
709#### used by other threads to interact with the SocketSelector...
710
711
712# private.   Check if the SocketSelector is running and start it if it isn't
def check_selector():
  """
  <Purpose>
    Private.  Make sure the SocketSelector thread is running, starting it
    if it has not been started yet.

  <Side Effects>
    May register the "SocketSelector" event and start the thread.  Reports
    an internal error if no thread named "SocketSelector" can be found.

  <Returns>
    None
  """
  global selectorstarted

  # acquire the lock.
  if selectorlock.acquire():
    # If I've not started, then start me...
    if not selectorstarted:
      # wait until there is a free event...
      wait_for_event("SocketSelector")
      selectorstarted = True
      SocketSelector().start()

    # verify a thread with the name "SocketSelector" is running
    selector_found = False
    for threadobj in threading.enumerate():
      if threadobj.getName() == "SocketSelector":
        selector_found = True
        break

    if selector_found:
      # all is well
      selectorlock.release()
      return

  # this is bad.   The socketselector went away...
  # This will cause the program to exit and log things if logging is
  # enabled. -Brent
  tracebackrepy.handle_internalerror("SocketSelector died", 59)
736
737
738
739
740# return the table entry for this type of socket, ip, port
def find_tip_entry(socktype, ip, port):
  """
  <Purpose>
    Find the comminfo entry matching a socket type, local ip and local port.

  <Returns>
    A tuple (table entry, commhandle) for the first match, or (None, None)
    if there is none.
  """
  for handle in comminfo.keys():
    entry = comminfo[handle]
    if (entry['type'] == socktype and entry['localip'] == ip
        and entry['localport'] == port):
      return entry, handle

  return (None,None)
746
747
748
749# Find a commhandle, given TIPO: type, ip, port, outgoing
def find_tipo_commhandle(socktype, ip, port, outgoing):
  """
  <Purpose>
    Find a commhandle given TIPO: type, local ip, local port and the
    outgoing flag.

  <Returns>
    The commhandle of the first matching entry, or None if there is none.
  """
  for handle in comminfo.keys():
    entry = comminfo[handle]
    if (entry['type'] == socktype and entry['localip'] == ip
        and entry['localport'] == port and entry['outgoing'] == outgoing):
      return handle

  return None
755
756
757# Find an outgoing TCP commhandle, given local ip, local port, remote ip, remote port,
def find_outgoing_tcp_commhandle(localip, localport, remoteip, remoteport):
  """
  <Purpose>
    Find an outgoing TCP commhandle given its local ip, local port,
    remote ip and remote port.

  <Returns>
    The commhandle of the first matching entry, or None if there is none.
  """
  for handle in comminfo.keys():
    entry = comminfo[handle]

    # The type is tested first: the remote keys are only looked at for
    # TCP entries
    if entry['type'] != "TCP":
      continue

    if (entry['localip'] == localip and entry['localport'] == localport
        and entry['remotehost'] == remoteip
        and entry['remoteport'] == remoteport
        and entry['outgoing'] == True):
      return handle

  return None
765
766
767
768
769
770
771######################### Simple Public Functions ##########################
772
773
774
775# Public interface
def gethostbyname_ex(name):
  """
   <Purpose>
      Provides information about a hostname.   Calls socket.gethostbyname_ex()

   <Arguments>
      name:
         The host name to get information about

   <Exceptions>
      As from socket.gethostbyname_ex()

   <Side Effects>
      Charges 4096 bytes to each of the 'netsend' and 'netrecv' resources.

   <Returns>
      A tuple containing (hostname, aliaslist, ipaddrlist).   See the
      python docs for socket.gethostbyname_ex()
  """

  restrictions.assertisallowed('gethostbyname_ex',name)

  # charge 4K for a look up in each direction...   The right number is not
  # known, but we should charge something.
  for resource in ('netsend', 'netrecv'):
    nanny.tattle_quantity(resource,4096)

  hostinfo = socket.gethostbyname_ex(name)
  return hostinfo
803
804
805
806# Public interface
def getmyip():
  """
   <Purpose>
      Provides the external IP of this computer.   Does some clever trickery.

   <Arguments>
      None

   <Exceptions>
      Exception if no usable local IP could be determined, which is taken to
      mean that there is no connection to the Internet.

   <Side Effects>
      If an IP / interface preference is set, the cache of allowed IPs is
      refreshed.

   <Returns>
      The localhost's IP address.   With an IP / interface preference set
      this is the first entry of the allowed IP cache.
  """

  restrictions.assertisallowed('getmyip')
  # I got some of this from: http://groups.google.com/group/comp.lang.python/browse_thread/thread/d931cdc326d7032b?hl=en

  # With a preference set, refresh the cache and answer from it.   There is
  # always at least 1 element (loopback)
  if user_ip_interface_preferences:
    update_ip_cache()
    return allowediplist[0]

  # It's possible on some platforms (Windows Mobile) that the IP will be
  # 0.0.0.0 even when I have a public IP and the external IP is up. However, if
  # I get a real connection with SOCK_STREAM, then I should get the real
  # answer.
  for conn_type in [socket.SOCK_DGRAM, socket.SOCK_STREAM]:

    # Try each stable IP
    for ip_addr in repy_constants.STABLE_PUBLIC_IPS:
      try:
        # Port 80 is used since some platforms panic when given 0 (FreeBSD)
        candidate = get_localIP_to_remoteIP(conn_type, ip_addr, 80)
      except (socket.error, socket.timeout):
        # Networking errors are ignored, we move on to the next stable IP /
        # connection type.   If everything fails we raise below.
        continue

      # Return immediately if the IP address is good
      if candidate not in (None, '', "0.0.0.0"):
        return candidate

  # Since we haven't returned yet, we must have failed.
  # Raise an exception, we must not be connected to the internet
  raise Exception("Cannot detect a connection to the Internet.")
866
867
868
def get_localIP_to_remoteIP(connection_type, external_ip, external_port=80):
  """
  <Purpose>
    Find out which local ip is used for an outbound connection to the
    given external ip.

  <Arguments>
    connection_type:
      The type of connection to attempt. See socket.socket().

    external_ip:
      The external IP to attempt to connect to.

    external_port:
      The port on the remote host to attempt to connect to.

  <Exceptions>
    As with socket.socket(), socketobj.connect(), etc.

  <Returns>
    The locally assigned IP for the connection.
  """
  probe = socket.socket(socket.AF_INET, connection_type)

  # The socket is closed whether or not the connect works
  try:
    probe.connect((external_ip, external_port))

    # The local end of the connection tells us which IP was used
    (local_ip, unused_port) = probe.getsockname()
    return local_ip
  finally:
    probe.close()
904
905
906
907
908###################### Shared message / connection items ###################
909
910
911# Used to decide if an IP is the loopback IP or not.   This is needed for
912# accounting
def is_loopback(host):
  """
  <Purpose>
    Decide if an IP is a loopback IP or not.   This is needed for
    accounting.

  <Arguments>
    host: The IP address string to check.

  <Returns>
    True if host is a dotted quad starting with '127.' whose four parts are
    all decimal numbers in the range 0-255, False otherwise.
  """
  if not host.startswith('127.'):
    return False

  octets = host.split('.')
  if len(octets) != 4:
    return False

  for octet in octets:
    # Anything but plain decimal digits disqualifies the address
    if [char for char in octet if char not in '0123456789']:
      return False

    # An empty part has no digits to reject, but is not a number either
    try:
      value = int(octet)
    except ValueError:
      return False

    if value > 255 or value < 0:
      return False

  return True
931
932
933
934
935
936
937
938
939
940# Public interface !!!
def stopcomm(commhandle):
  """
   <Purpose>
      Stop handling events for a commhandle.   This works for both message and
      connection based event handlers.

   <Arguments>
      commhandle:
         A commhandle as returned by recvmess or waitforconn.

   <Exceptions>
      Exception if commhandle is not a validly formed commhandle.

   <Side Effects>
      This has an undefined effect on a socket-like object if it is currently
      in use.

   <Returns>
      Returns True if commhandle was successfully closed, False if the handle
      cannot be closed (i.e. it was already closed).
  """
  # Armon: a malformed handle is an error, not just a "False"
  if not is_valid_commhandle(commhandle):
    raise Exception("Invalid commhandle specified!")

  if commhandle in comminfo:
    restrictions.assertisallowed('stopcomm',comminfo[commhandle])

    # cleanup blocks, so once it returns the handle has been closed
    cleanup(commhandle)
    return True

  # The handle is well formed but no longer in the table: it has already
  # been cleaned up
  return False
979
980
981
# Armon: How frequently should we check for the availability of the socket?
# cleanup() sleeps this long between checks while it waits for a closed
# listening socket to disappear.
RETRY_INTERVAL = 0.2 # In seconds
984
985# Private
def cleanup(handle):
  """
  <Purpose>
    Private.  Close the socket behind a commhandle, return its resource to
    nanny and remove the entry from comminfo.

  <Arguments>
    handle: The commhandle to clean up.

  <Side Effects>
    Holds the entry's 'closing_lock' while working.  For a listening
    (non-outgoing) socket this blocks until the OS no longer reports a
    listening socket on the entry's local ip / port.

  <Returns>
    None.  Returns right away if the handle is not in comminfo.
  """
  # Armon: lock the cleanup so that only one thread will do the cleanup, but
  # all the others will block as well
  try:
    handle_lock = comminfo[handle]['closing_lock']
  except KeyError:
    # Handle a possible race condition, the socket has already been cleaned up.
    return
 
  # Acquire the lock       
  handle_lock.acquire()

  # if it's in the table then remove the entry and tattle...
  try:
    # Checked again under the lock: another thread may have finished the
    # cleanup while we were waiting for it
    if handle in comminfo:
      # Armon: Shutdown the socket for writing prior to close
      # to unblock any threads that are writing
      try:
        comminfo[handle]['socket'].shutdown(socket.SHUT_WR)
      except:
        # Best effort only, errors from shutdown are ignored
        pass

      try:
        comminfo[handle]['socket'].close()
      except:
        # Best effort only, errors from close are ignored
        pass
     
      info = comminfo[handle]  # Store the info

      if info['outgoing']:
        nanny.tattle_remove_item('outsockets', handle)
      else:
        nanny.tattle_remove_item('insockets', handle)
   
        # Armon: Block while the socket is not yet cleaned up
        # Get the socket info
        ip = info['localip']
        port = info['localport']
        socketType = info['type']
        tcp = (socketType == 'TCP') # Check if this is a TCP typed connection
   
        # Loop until the socket no longer exists
        # BUG: There exists a potential race condition here. The problem is that
        # the socket may be cleaned up and then before we are able to check for it again
        # another process binds to the ip/port we are checking. This would cause us to detect
        # the socket from the other process and we would block indefinitely while that socket
        # is open.
        while nonportable.os_api.exists_listening_network_socket(ip,port, tcp):
          time.sleep(RETRY_INTERVAL)
     
      # Delete the entry last, so that other stopcomm operations will block
      try: # Guard against a rare and poorly understood error. #1052
        del comminfo[handle]
      except KeyError:
        pass

  finally:
    # Always release the lock
    handle_lock.release()
1045
1046
1047
1048####################### Message sending #############################
1049
1050
1051
1052# Public interface!!!
1053def sendmess(desthost, destport, message,localip=None,localport = None):
1054  """
1055   <Purpose>
1056      Send a message to a host / port
1057
1058   <Arguments>
1059      desthost:
1060         The host to send a message to
1061      destport:
1062         The port to send the message to
1063      message:
1064         The message to send
1065      localhost (optional):
1066         The local IP to send the message from
1067      localport (optional):
1068         The local port to send the message from (0 for a random port)
1069
1070   <Exceptions>
1071      socket.error when communication errors happen
1072
1073   <Side Effects>
1074      None.
1075
1076   <Returns>
1077      The number of bytes sent on success
1078  """
1079  # Check that if either localip or local port is specified, that both are
1080  if (localip != None and localport == None) or (localport != None and localip == None):
1081    raise Exception("Localip and localport must be specified simultaneously.")
1082 
1083  # Assign the default value to localport if none given
1084  if localport == None:
1085    localport = 0
1086
1087  if not localip or localip == '0.0.0.0':
1088    localip = None
1089# JAC: removed since this breaks semantics
1090#  else:
1091#    if not is_valid_ip_address(localip):
1092#      raise Exception("Local IP address is invalid.")
1093
1094# JAC: removed since this breaks semantics
1095#  if not is_valid_ip_address(desthost):
1096#    raise Exception("Destination host IP address is invalid.")
1097 
1098  if not is_valid_network_port(destport):
1099    raise Exception("Destination port number must be an integer, between 1 and 65535.")
1100
1101  if not is_valid_network_port(localport, True):
1102    raise Exception("Local port number must be an integer, between 1 and 65535.")
1103
1104  restrictions.assertisallowed('sendmess', desthost, destport, message,localip,localport)
1105
1106  if localport:
1107    nanny.tattle_check('messport',localport)
1108
1109  # Armon: Check if the specified local ip is allowed
1110  # this check only makes sense if the localip is specified
1111  if localip and not ip_is_allowed(localip):
1112    raise Exception, "IP '"+str(localip)+"' is not allowed."
1113 
1114  # If there is a preference, but no localip, then get one
1115  elif user_ip_interface_preferences and not localip:
1116    # Use whatever getmyip returns
1117    localip = getmyip()
1118
1119  # this is used to track errors when trying to resend data
1120  firsterror = None
1121
1122  if localip and localport:
1123    # let's see if the socket already exists...
1124    commtableentry,commhandle = find_tip_entry('UDP',localip,localport)
1125  else:
1126    # no, we'll skip
1127    commhandle = None
1128
1129  # yes it does! let's use the existing socket
1130  if commhandle:
1131
1132    # block in case we're oversubscribed
1133    if is_loopback(desthost):
1134      nanny.tattle_quantity('loopsend',0)
1135    else:
1136      nanny.tattle_quantity('netsend',0)
1137
1138    # try to send using this socket
1139    try:
1140      bytessent =  commtableentry['socket'].sendto(message,(desthost,destport))
1141    except socket.error,e:
1142      # we're going to save this error in case we also get an error below.   
1143      # This is likely to be the error we actually want to raise
1144      firsterror = e
1145      # should I really fall through here?
1146    else:
1147      # send succeeded, let's wait and return
1148      if is_loopback(desthost):
1149        nanny.tattle_quantity('loopsend',bytessent)
1150      else:
1151        nanny.tattle_quantity('netsend',bytessent)
1152      return bytessent
1153 
1154
1155  # open a new socket
1156  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1157 
1158  # the send buffer must also be set or it will constrain UDP sendmess
1159  # size on Mac.
1160  s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 66000)
1161
1162  try:
1163    if localip:
1164      try:
1165        s.bind((localip,localport))
1166      except socket.error, e:
1167        if firsterror:
1168          raise Exception, firsterror
1169        raise Exception, e
1170
1171    # wait if already oversubscribed
1172    if is_loopback(desthost):
1173      nanny.tattle_quantity('loopsend',0)
1174    else:
1175      nanny.tattle_quantity('netsend',0)
1176
1177    bytessent =  s.sendto(message,(desthost,destport))
1178
1179    if is_loopback(desthost):
1180      nanny.tattle_quantity('loopsend',bytessent)
1181    else:
1182      nanny.tattle_quantity('netsend',bytessent)
1183
1184    return bytessent
1185
1186  finally:
1187    # close no matter what
1188    try:
1189      s.close()
1190    except:
1191      pass
1192
1193
1194
1195
1196
1197
1198# Public interface!!!
def recvmess(localip, localport, function):
  """
   <Purpose>
      Registers a function as an event handler for incoming messages

   <Arguments>
      localip:
         The local IP or hostname to register the handler on
      localport:
         The port to listen on
      function:
         The function that messages should be delivered to.   It should expect
         the following arguments: (remoteIP, remoteport, message, commhandle)

   <Exceptions>
      Exception if localip is empty or '0.0.0.0', if localport is rejected by
      is_valid_network_port, or if localip is not an allowed IP.
      Errors raised while creating / binding the socket are propagated.

   <Side Effects>
      Registers an event handler.   If a handler is already registered for
      this IP / port, its socket is kept and re-registered under a new
      commhandle with the new function.

   <Returns>
      The commhandle for this event handler.
  """
  if not localip or localip == '0.0.0.0':
    raise Exception("Must specify a local IP address")

  # JAC: localip is not validated with is_valid_ip_address; that check was
  # removed since it breaks semantics.

  if not is_valid_network_port(localport):
    raise Exception("Local port number must be an integer, between 1 and 65535.")

  # Armon: The handler's argument count is deliberately not checked, since
  # such a check is incompatible with functions that take a variable number
  # of parameters. e.g. func1(*args)

  restrictions.assertisallowed('recvmess',localip,localport)

  nanny.tattle_check('messport',localport)

  # Armon: Check if the specified local ip is allowed
  if not ip_is_allowed(localip):
    raise Exception("IP '"+str(localip)+"' is not allowed.")

  # Armon: Generate the new handle since we need it
  # to replace the old handle if it exists
  handle = generate_commhandle()

  # check if I'm already listening on this port / ip
  # NOTE: I check as though there might be a socket open that is sending a
  # message.   This is nonsense since sendmess doesn't result in a socket
  # persisting.   This is done so that if sockets for sendmess are cached
  # later (as seems likely) the resulting code will not break.
  oldhandle = find_tipo_commhandle('UDP', localip, localport, False)
  if oldhandle:
    # if it was already there, update the function...
    entry = comminfo[oldhandle]
    entry['function'] = function

    # Armon: ...and register the very same entry under the new handle
    comminfo[handle] = entry

    # Remove only the old table key.   The entry (and its socket) is now
    # shared with the new handle, so calling cleanup(oldhandle) here would
    # shut down and close the socket the new handle relies on.
    try:
      del comminfo[oldhandle]
    except KeyError:
      pass

    # We need nanny to substitute the old handle with the new one
    nanny.tattle_remove_item('insockets',oldhandle)
    nanny.tattle_add_item('insockets',handle)

    # Return the new handle
    return handle

  # we'll need to add it, so add a socket...
  nanny.tattle_add_item('insockets',handle)

  # get the socket
  # s stays None if socket.socket() itself fails, so the error handler never
  # touches an unbound name.
  s = None
  try:
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((localip,localport))

    # set the receive buffer size to slightly more than 64K+e (see ticket #887)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 66000)
    # the send buffer must also be set or it will constrain UDP sendmess
    # size on Mac.   I set it here because this socket may be used for sending
    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 66000)

    nonportable.preparesocket(s)
  except:
    if s is not None:
      try:
        s.close()
      except:
        pass
    nanny.tattle_remove_item('insockets',handle)
    raise

  # set up our table entry
  comminfo[handle] = {'type':'UDP','localip':localip, 'localport':localport,'function':function,'socket':s, 'outgoing':False, 'closing_lock':threading.Lock() }

  # start the selector if it's not running already
  check_selector()

  return handle
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326####################### Connection oriented #############################
1327
1328
1329
1330# Public interface!!!
1331def openconn(desthost, destport,localip=None, localport=None,timeout=None):
1332  """
1333   <Purpose>
1334      Opens a connection, returning a socket-like object
1335
1336   <Arguments>
1337      desthost:
1338         The host to open communcations with
1339      destport:
1340         The port to use for communication
1341      localip (optional):
1342         The local ip to use for the communication
1343      localport (optional):
1344         The local port to use for communication (0 for a random port)
1345      timeout (optional):
1346         The maximum amount of time to wait to connect
1347
1348   <Exceptions>
1349      As from socket.connect, etc.
1350
1351   <Side Effects>
1352      None.
1353
1354   <Returns>
1355      A socket-like object that can be used for communication.   Use send,
1356      recv, and close just like you would an actual socket object in python.
1357  """
1358
1359  # Set a default timeout of 5 seconds if none is specified.
1360  if timeout is None:
1361    timeout = 5.0
1362
1363  # Check that both localip and localport are given if either is specified
1364  if localip != None and localport == None or localport != None and localip == None:
1365    raise Exception("Localip and localport must be specified simultaneously.")
1366
1367  # Set the default value of localip
1368  if not localip or localip == '0.0.0.0':
1369    localip = None
1370#  else:
1371# JAC: removed since this breaks semantics
1372    # Check that the localip is valid if given
1373#    if not is_valid_ip_address(localip):
1374#      raise Exception("Local IP address is invalid.")
1375
1376  # Assign the default value of localport if none is given.
1377  if localport == None:
1378    localport = 0
1379 
1380# JAC: removed since this breaks semantics
1381  # Check the remote IP for validity
1382#  if not is_valid_ip_address(desthost):
1383#    raise Exception("Destination host IP address is invalid.")
1384
1385  if not is_valid_network_port(destport):
1386    raise Exception("Destination port number must be an integer, between 1 and 65535.")
1387
1388  # Allow the localport to be 0, which is the default.
1389  if not is_valid_network_port(localport, True):
1390    raise Exception("Local port number must be an integer, between 1 and 65535.")
1391
1392  # Check that the timeout is a number, greater than 0
1393  if not (type(timeout) == float or type(timeout) == int or type(timeout) == long) or timeout <= 0.0:
1394    raise Exception("Timeout parameter must be a numeric value greater than 0.")
1395
1396  # Armon: Check if the specified local ip is allowed
1397  # this check only makes sense if the localip is specified
1398  if localip and not ip_is_allowed(localip):
1399    raise Exception, "IP '"+str(localip)+"' is not allowed."
1400
1401  # If there is a preference, but no localip, then get one
1402  elif user_ip_interface_preferences and not localip:
1403    # Use whatever getmyip returns
1404    localip = getmyip()
1405
1406  restrictions.assertisallowed('openconn',desthost,destport,localip,localport)
1407 
1408  # Get our start time
1409  starttime = nonportable.getruntime()
1410
1411  # Armon: Check for any pre-existing sockets. If they are being closed, wait for them.
1412  # This will also serve to check if repy has a pre-existing socket open on this same tuple
1413  exists = True
1414  while exists and nonportable.getruntime() - starttime < timeout:
1415    # Update the status
1416    (exists, status) = nonportable.os_api.exists_outgoing_network_socket(localip,localport,desthost,destport)
1417    if exists:
1418      # Check the socket state
1419      if "ESTABLISH" in status or "CLOSE_WAIT" in status:
1420        # Check if the socket is from this repy vessel
1421        handle = find_outgoing_tcp_commhandle(localip, localport, desthost, destport)
1422       
1423        message = "Network socket is in use by an external process!"
1424        if handle != None:
1425          message = " Duplicate handle exists with name: "+str(handle)
1426       
1427        raise Exception, message
1428      else:
1429        # Wait for socket cleanup
1430        time.sleep(RETRY_INTERVAL)
1431  else:
1432    # Check if a socket exists still and we timed out
1433    if exists:
1434      raise Exception, "Timed out checking for socket cleanup!"
1435       
1436
1437  if localport:
1438    nanny.tattle_check('connport',localport)
1439
1440  handle = generate_commhandle()
1441
1442  # If allocation of an outsocket fails, we garbage collect and try again
1443  # -- this forces destruction of unreferenced objects, which is how we
1444  # free resources.
1445  try:
1446    nanny.tattle_add_item('outsockets',handle)
1447  except:
1448    gc.collect()
1449    nanny.tattle_add_item('outsockets',handle)
1450
1451 
1452  try:
1453    s = get_real_socket(localip,localport)
1454
1455    # prevent excessive TCP buffering (#895)
1456    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000)
1457    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 10000)
1458
1459 
1460    # add the socket to the comminfo table
1461    comminfo[handle] = {'type':'TCP','remotehost':None, 'remoteport':None,'localip':localip,'localport':localport,'socket':s, 'outgoing':True, 'closing_lock':threading.Lock()}
1462  except:
1463    # the socket wasn't passed to the user prog...
1464    nanny.tattle_remove_item('outsockets',handle)
1465    raise
1466
1467
1468  try:
1469    thissock = emulated_socket(handle)
1470    # We set a timeout before we connect.  This allows us to timeout slow
1471    # connections...
1472    oldtimeout = comminfo[handle]['socket'].gettimeout()
1473 
1474    # Set the new timeout
1475    comminfo[handle]['socket'].settimeout(timeout)
1476
1477    # Store exceptions until we exit the loop, default to timed out
1478    # in case we are given a very small timeout
1479    connect_exception = Exception("Connection timed out!")
1480
1481    # Ignore errors and retry if we have not yet reached the timeout
1482    while nonportable.getruntime() - starttime < timeout:
1483      try:
1484        comminfo[handle]['socket'].connect((desthost,destport))
1485        break
1486      except Exception,e:
1487        # Check if the socket is already connected (EISCONN or WSAEISCONN)
1488        if is_already_connected_exception(e):
1489          break
1490
1491        # Check if this is recoverable, only continue if it is
1492        elif not is_recoverable_network_exception(e):
1493          raise
1494
1495        else:
1496          # Store the exception
1497          connect_exception = e
1498
1499        # Sleep a bit, avoid excessive iterations of the loop
1500        time.sleep(0.2)
1501    else:
1502      # Raise any exception that was raised
1503      if connect_exception != None:
1504        raise connect_exception
1505
1506    comminfo[handle]['remotehost']=desthost
1507    comminfo[handle]['remoteport']=destport
1508 
1509  except:
1510    cleanup(handle)
1511    raise
1512  else:
1513    # and restore the old timeout...
1514    comminfo[handle]['socket'].settimeout(oldtimeout)
1515
1516  return thissock
1517
1518
1519
1520
1521# Public interface!!!
def waitforconn(localip, localport,function):
  """
   <Purpose>
      Waits for a connection to a port.   Calls function with a socket-like
      object if it succeeds.

   <Arguments>
      localip:
         The local IP to listen on
      localport:
         The local port to bind to
      function:
         The function to call.   It should take five arguments:
         (remoteip, remoteport, socketlikeobj, thiscommhandle, maincommhandle)
         If your function has an uncaught exception, the socket-like object it
         is using will be closed.

   <Exceptions>
      Exception if localip is empty or '0.0.0.0', if localport is rejected by
      is_valid_network_port, or if localip is not an allowed IP.
      Errors raised while creating / binding / listening on the socket are
      propagated.

   <Side Effects>
      Starts an event handler that listens for connections.   If a handler is
      already registered for this IP / port, its listening socket is kept and
      re-registered under a new commhandle with the new function.

   <Returns>
      A handle to the comm object.   This can be used to stop listening
  """
  if not localip or localip == '0.0.0.0':
    raise Exception("Must specify a local IP address")

  # JAC: localip is not validated with is_valid_ip_address; that check was
  # removed since it breaks semantics.

  if not is_valid_network_port(localport):
    raise Exception("Local port number must be an integer, between 1 and 65535.")

  restrictions.assertisallowed('waitforconn',localip,localport)

  nanny.tattle_check('connport',localport)

  # Armon: Check if the specified local ip is allowed
  if not ip_is_allowed(localip):
    raise Exception("IP '"+str(localip)+"' is not allowed.")

  # Get the new handle first, because we need to replace
  # the oldhandle if it exists to match semantics
  handle = generate_commhandle()

  # check if I'm already listening on this port / ip
  oldhandle = find_tipo_commhandle('TCP', localip, localport, False)
  if oldhandle:
    # if it was already there, update the function...
    entry = comminfo[oldhandle]
    entry['function'] = function

    # Armon: ...and register the very same entry under the new handle
    comminfo[handle] = entry

    # Remove only the old table key.   The entry (and its listening socket)
    # is now shared with the new handle, so calling cleanup(oldhandle) here
    # would shut down and close the socket the new handle relies on.
    try:
      del comminfo[oldhandle]
    except KeyError:
      pass

    # Un "tattle" the old handle, re-add the new handle
    nanny.tattle_remove_item('insockets',oldhandle)
    nanny.tattle_add_item('insockets',handle)

    # Give the new handle
    return handle

  # we'll need to add it, so add a socket...
  nanny.tattle_add_item('insockets',handle)

  # get the socket
  # mainsock stays None until get_real_socket succeeds, so the error handler
  # knows whether there is a socket that must be closed.
  mainsock = None
  try:
    mainsock = get_real_socket(localip,localport)

    # prevent excessive TCP buffering (#895)
    mainsock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000)
    mainsock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 10000)

    # NOTE: Should this be anything other than a hardcoded number?
    mainsock.listen(5)
    # set up our table entry
    comminfo[handle] = {'type':'TCP','remotehost':None, 'remoteport':None,'localip':localip,'localport':localport,'socket':mainsock, 'outgoing':False, 'function':function, 'closing_lock':threading.Lock()}
  except:
    # The socket never made it into comminfo, so close it here.
    if mainsock is not None:
      try:
        mainsock.close()
      except:
        pass
    nanny.tattle_remove_item('insockets',handle)
    raise


  # start the selector if it's not running already
  check_selector()

  return handle
1613
1614
1615
1616
1617
1618# Private
1619def get_real_socket(localip=None, localport = None):
1620
1621  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1622
1623  # reuse the socket if it's "pseudo-availible"
1624  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1625
1626
1627  if localip and localport:
1628    try:
1629      s.bind((localip,localport))
1630    except socket.error, e:
1631      # don't leak sockets
1632      s.close()
1633      raise Exception, e
1634    except:
1635      # don't leak sockets
1636      s.close()
1637      raise
1638
1639  return s
1640
1641
1642# Checks if the given real socket would block
def socket_state(realsock, waitfor="rw", timeout=0.0):
  """
  <Purpose>
    Reports whether a send() or recv() on the given socket would block.
    For a listening socket, read_will_block stands for accept_will_block.

  <Arguments>
    realsock:
              A real socket.socket() object to check.

    waitfor:
              What to wait for: "r" (read only), "w" (write only) or "rw"
              (read or write).   With "r" the socket is not checked for
              writing, so write_will_block is reported as True; with "w" the
              same holds for read_will_block.

    timeout:
              How long select() may wait for the socket to become ready.
              The default of 0.0 returns immediately.

  <Returns>
    A tuple, (read_will_block, write_will_block).   If select() reports the
    socket in its exceptional set, (False, False) is returned.

  <Exceptions>
    As with select.select(). Probably best to wrap this with is_recoverable_network_exception
    and is_terminated_connection_exception. Throws an exception if waitfor is not in ["r","w","rw"]
  """
  # Reject anything but the three supported specifiers
  if waitfor not in ("rw", "r", "w"):
    raise Exception("Illegal waitfor argument!")

  # select() wants lists; the same one-element list serves every role
  watched = [realsock]

  if "r" in waitfor:
    read_candidates = watched
  else:
    read_candidates = []

  if "w" in waitfor:
    write_candidates = watched
  else:
    write_candidates = []

  # Call select(); the socket is always watched for exceptional conditions
  (readable, writeable, errored) = select.select(read_candidates, write_candidates, watched, timeout)

  # If the socket is in the exception list, then assume it is both read and writable
  if realsock in errored:
    return (False, False)

  # Otherwise an operation blocks exactly when select() did not list the socket
  read_will_block = realsock not in readable
  write_will_block = realsock not in writeable
  return (read_will_block, write_will_block)
1696
1697
1698
1699
1700# Public.   We pass these to the users for communication purposes
1701class emulated_socket:
1702  # This is an index into the comminfo table...
1703
1704  commid = 0
1705
1706  def __init__(self, handle):
1707    self.commid = handle
1708
1709    # Armon: Get the real socket
1710    try:
1711      realsocket = comminfo[handle]['socket']
1712
1713    # Shouldn't happen because my caller should create the table entry first
1714    except KeyError:
1715      raise Exception, "Internal Error. No table entry for new socket!"
1716
1717    # Make the socket non-blocking
1718    realsocket.setblocking(0)
1719
1720    try:
1721      # Store the send buffer size.   We'll send less than this to avoid a bug
1722      comminfo[handle]['sendbuffersize'] = realsocket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
1723
1724    # Really shouldn't happen.   We just checked!
1725    except KeyError:
1726      raise Exception, "Internal Error. No table entry when looking up sendbuffersize for new socket!"
1727
1728
1729    return None 
1730
1731
1732
1733
1734  def close(self):
1735    """
1736      <Purpose>
1737        Closes a socket.   Pending remote recv() calls will return with the
1738        remaining information.   Local recv / send calls will fail after this.
1739
1740      <Arguments>
1741        None
1742
1743      <Exceptions>
1744        None
1745
1746      <Side Effects>
1747        Pending local recv calls will either return or have an exception.
1748
1749      <Returns>
1750        True if this is the first close call to this socket, False otherwise.
1751    """
1752    # prevent TOCTOU race with client changing the object's properties
1753    mycommid = self.commid
1754    restrictions.assertisallowed('socket.close')
1755   
1756    # Armon: Semantic update, return whatever stopcomm does.
1757    # This will result in socket.close() returning a True/False indicator
1758    return stopcomm(mycommid)
1759
1760
1761
1762  def recv(self,bytes):
1763    """
1764      <Purpose>
1765        Receives data from a socket.   It may receive fewer bytes than
1766        requested.   
1767
1768      <Arguments>
1769        bytes:
1770           The maximum number of bytes to read.   
1771
1772      <Exceptions>
1773        Exception if the socket is closed either locally or remotely.
1774
1775      <Side Effects>
1776        This call will block the thread until the other side calls send.
1777
1778      <Returns>
1779        The data received from the socket (as a string).   If '' is returned,
1780        the other side has closed the socket and no more data will arrive.
1781    """
1782    # prevent TOCTOU race with client changing the object's properties
1783    mycommid = self.commid
1784    restrictions.assertisallowed('socket.recv',bytes)
1785
1786    # I set this here so that I don't screw up accounting with a keyerror later
1787    try:
1788      this_is_loopback = is_loopback(comminfo[mycommid]['remotehost'])
1789    # they likely closed the connection
1790    except KeyError:
1791      raise Exception, "Socket closed"
1792
1793    # wait if already oversubscribed
1794    if this_is_loopback:
1795      nanny.tattle_quantity('looprecv',0)
1796    else:
1797      nanny.tattle_quantity('netrecv',0)
1798
1799    datarecvd = 0
1800    # loop until we recv the information (looping is needed for Windows)
1801    while True:
1802      try:
1803        # the timeout is needed so that if the socket is closed in another
1804        # thread, we notice it
1805        # BUG: What should the timeout be?   What is the right value?
1806        #comminfo[mycommid]['socket'].settimeout(0.2)
1807       
1808        # Armon: Get the real socket
1809        realsocket = comminfo[mycommid]['socket']
1810       
1811        # Check if the socket is ready for reading
1812        (read_will_block, write_will_block) = socket_state(realsocket, "r", 0.2)       
1813        if not read_will_block:
1814          datarecvd = realsocket.recv(bytes)
1815          break
1816
1817      # they likely closed the connection
1818      except KeyError:
1819        raise Exception, "Socket closed"
1820
1821      # Catch all other exceptions, check if they are recoverable
1822      except Exception, e:
1823        # Check if this error is recoverable
1824        if is_recoverable_network_exception(e):
1825          continue
1826
1827        # Otherwise, raise the exception
1828        else:
1829          # Check if this is a connection termination
1830          if is_terminated_connection_exception(e):
1831            raise Exception("Socket closed")
1832          else:
1833            raise
1834
1835    # Armon: Calculate the length of the data
1836    data_length = len(datarecvd)
1837   
1838    # Raise an exception if there was no data
1839    if data_length == 0:
1840      raise Exception("Socket closed")
1841
1842    # do accounting here...
1843    if this_is_loopback:
1844      nanny.tattle_quantity('looprecv',data_length)
1845    else:
1846      nanny.tattle_quantity('netrecv',data_length)
1847
1848    return datarecvd
1849
1850
1851
1852  def send(self,message):
1853    """
1854      <Purpose>
1855        Sends data on a socket.   It may send fewer bytes than requested.   
1856
1857      <Arguments>
1858        message:
1859          The string to send.
1860
1861      <Exceptions>
1862        Exception if the socket is closed either locally or remotely.
1863
1864      <Side Effects>
1865        This call may block the thread until the other side calls recv.
1866
1867      <Returns>
1868        The number of bytes sent.   Be sure not to assume this is always the
1869        complete amount!
1870    """
1871    # prevent TOCTOU race with client changing the object's properties
1872    mycommid = self.commid
1873    restrictions.assertisallowed('socket.send',message)
1874
1875    # I factor this out because we must do the accounting at the bottom of this
1876    # function and I want to make sure we account properly even if they close
1877    # the socket right after their data is sent
1878    try:
1879      this_is_loopback = is_loopback(comminfo[mycommid]['remotehost'])
1880    except KeyError:
1881      raise Exception, "Socket closed!"
1882
1883    # wait if already oversubscribed
1884    if this_is_loopback:
1885      nanny.tattle_quantity('loopsend',0)
1886    else:
1887      nanny.tattle_quantity('netsend',0)
1888
1889    try:
1890      # Trim the message size to be less than the sendbuffersize.
1891      # This is a fix for http://support.microsoft.com/kb/823764
1892      message = message[:comminfo[mycommid]['sendbuffersize']-1]
1893    except KeyError:
1894      raise Exception, "Socket closed!"
1895
1896    # loop until we send the information (looping is needed for Windows)
1897    while True:
1898      try:
1899        # Armon: Get the real socket
1900        realsocket = comminfo[mycommid]['socket']
1901       
1902        # Check if the socket is ready for writing, wait 0.2 seconds
1903        (read_will_block, write_will_block) = socket_state(realsocket, "w", 0.2)
1904        if not write_will_block:
1905          bytessent = realsocket.send(message)
1906          break
1907     
1908      except KeyError:
1909        raise Exception, "Socket closed"
1910
1911      except Exception,e:
1912        # Determine if the exception is fatal
1913        if is_recoverable_network_exception(e):
1914          continue
1915        else:
1916          # Check if this is a conn. term., and give a more specific exception.
1917          if is_terminated_connection_exception(e):
1918            raise Exception("Socket closed")
1919          else:
1920            raise
1921
1922    if this_is_loopback:
1923      nanny.tattle_quantity('loopsend',bytessent)
1924    else:
1925      nanny.tattle_quantity('netsend',bytessent)
1926
1927    return bytessent
1928
1929
1930  # Checks if socket read/write operations will block
1931  def willblock(self):
1932    """
1933    <Purpose>
1934      Determines if a socket would block if send() or recv() was called.
1935
1936    <Exceptions>
1937      Socket Closed if the socket has been closed.
1938
1939    <Returns>
1940      A tuple, (recv_will_block, send_will_block) both are boolean values.
1941
1942    """
1943
1944    try:
1945      # Get the real socket
1946      realsocket = comminfo[self.commid]['socket']
1947
1948      # Call into socket_state with no timout to return instantly
1949      return socket_state(realsocket)
1950   
1951    # The socket is closed or in the process of being closed...
1952    except KeyError:
1953      raise Exception, "Socket closed"
1954
1955    except Exception, e:
1956      # Determine if the socket is closed
1957      if is_terminated_connection_exception(e):
1958        raise Exception("Socket closed")
1959     
1960      # Otherwise raise whatever we have
1961      else:
1962        raise
1963
1964
1965
1966  def __del__(self):
1967    cleanup(self.commid)
1968
1969
1970
1971# End of emulated_socket class
Note: See TracBrowser for help on using the browser.