Please note that the CVS and issue trackers have moved to GitHub. These Trac pages are no longer kept up-to-date.

root/seattle/trunk/repy/emulcomm.py@5617

Revision 5617, 57.2 KB (checked in by justinc, 7 years ago)

comment explaining multiple interface behavior

Line 
1"""
2   Author: Justin Cappos
3
4   Start Date: 27 June 2008
5
6   Description:
7
8   This is a collection of communications routines that provide a programmer
9   with a reasonable environment.   This is used by repy.py to provide a
10   highly restricted (but usable) environment.
11   
12   Note that this module can also be used to place restrictions on the
13   interfaces and IP addresses that are used by Seattle.   The order in which
14   the interfaces are specified is their preference.   
15
16   One odd piece of behavior is that if an IP address changes, you can't use
17   it until calling getmyip.   It's not clear how you would know about this
18   occurrence, but it should be noted.
19"""
20
21import restrictions
22import socket
23
24# Armon: Used to check if a socket is ready
25import select
26
27# socket uses getattr and setattr.   We need to make these available to it...
28socket.getattr = getattr
29socket.setattr = setattr
30
31
32# needed to set threads for recvmess and waitforconn
33import threading
34
35# to force destruction of old sockets
36import gc
37
38# So I can exit all threads when an error occurs or do select
39import harshexit
40
41# Needed for finding out info about sockets, available interfaces, etc
42import nonportable
43
44# So I can print a clean traceback when an error happens
45import tracebackrepy
46
47# accounting
48import nanny
49
50# give me uniqueIDs for the comminfo table
51import idhelper
52
53# for sleep
54import time 
55
56# Armon: Used for decoding the error messages
57import errno
58
59# Armon: Used for getting the constant IP values for resolving our external IP
60import repy_constants 
61
62# The architecture is that I have a thread which "polls" all of the sockets
63# that are being listened on using select.  If a connection
64# oriented socket has a connection pending, or a message-based socket has a
65# message pending, and there are enough events it calls the appropriate
66# function.
67
68
69
70
71
# Table of communications structures, keyed by commhandle.
# Example entries:
# {'type':'UDP','localip':ip, 'localport':port,'function':func,'socket':s, 'outgoing':True, 'closing_lock':lockobj}
# {'type':'TCP','remotehost':None, 'remoteport':None,'localip':None,'localport':None, 'socket':s, 'function':func, 'outgoing':False, 'closing_lock':lockobj}
# Entries whose 'outgoing' value is False are the ones the SocketSelector
# polls; 'closing_lock' serializes cleanup of a single entry.

comminfo = {}

# If we have a preference for an IP/Interface this flag is set to True
user_ip_interface_preferences = False

# Do we allow non-specified IPs
allow_nonspecified_ips = True

# Armon: Specifies the list of allowed IPs and interfaces in order of their preference
# The basic structure is a list of tuples (is_ip, value): is_ip is True if value is an IP,
# False if value is an interface name
user_specified_ip_interface_list = []

# This list caches the allowed IP's
# It is updated at the launch of repy, or by calls to getmyip and update_ip_cache
# NOTE: The loopback address 127.0.0.1 is always permitted. update_ip_cache will always add this
# if it is not specified explicitly by the user
allowediplist = []
cachelock = threading.Lock()  # Held by update_ip_cache so only one cache update runs at a time
94
95
# Decides whether an IP address may be used, given the user's settings
def ip_is_allowed(ip):
  """
  <Purpose>
    Checks a given IP against the cached list of allowed IP's.

  <Arguments>
    ip: The IP address to look up.

  <Returns>
    True, if the IP may be used. False, otherwise.
  """
  # The allowed list only matters when the user stated a preference
  # and IPs outside of that preference are not permitted
  restricted = user_ip_interface_preferences and not allow_nonspecified_ips
  if not restricted:
    return True

  # Restricted, so the IP must be in the cache of allowed IP's
  return ip in allowediplist
119
120
# Adds elem to the end of lst, unless lst already contains it
def unique_append(lst, elem):
  if elem in lst:
    return
  lst.append(elem)
125     
# Rebuilds the allowed IP cache
# Every IP named by the user (directly or through an interface) is tried, and
# the ones that can be bound are stored as the new allowediplist
def update_ip_cache():
  global allowediplist

  # Without a preference there is nothing to cache
  if not user_ip_interface_preferences:
    return

  # Only one thread may rebuild the cache at a time
  cachelock.acquire()

  # The lock is released no matter what happens below
  try:
    # Candidate IP's, in preference order and without duplicates
    candidates = []

    for (is_ip_addr, value) in user_specified_ip_interface_list:
      if is_ip_addr:
        # A plain IP is taken as given
        unique_append(candidates, value)
        continue

      # Otherwise value names an interface, expand it into its IP's
      try:
        for nic_ip in nonportable.os_api.get_interface_ip_addresses(value):
          unique_append(candidates, nic_ip)
      except:
        # The NIC may not exist, skip it
        pass

    # Keep only the candidates that we are able to bind to
    bindable = []
    for candidate in candidates:
      probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      try:
        probe.bind((candidate, 0))
      except:
        # Not a usable IP, skip it
        pass
      else:
        bindable.append(candidate)
      finally:
        probe.close()

    # Loopback is always permitted
    unique_append(bindable, "127.0.0.1")

    # Publish the new cache
    allowediplist = bindable

  finally:
    cachelock.release()
187 
188########################### General Purpose socket functions #################
189
def is_already_connected_exception(exceptionobj):
  """
  <Purpose>
    Determines if an exception from a network call indicates that the
    socket is already connected.

  <Arguments>
    exceptionobj: An exception object from a network call.

  <Returns>
    True if the exception is exactly a socket.error whose error number
    maps to EISCONN / WSAEISCONN. False otherwise, including when the
    exception carries no error number at all.
  """
  # Only continue if the type is exactly socket.error (subclasses are
  # not treated as "already connected")
  if type(exceptionobj) is not socket.error:
    return False

  # A socket.error raised without arguments has no error number to inspect.
  # Indexing it blindly would raise an IndexError out of this helper.
  if not exceptionobj.args:
    return False
  errnum = exceptionobj.args[0]

  # Error names meaning we are connected
  connected_errors = ["EISCONN", "WSAEISCONN"]

  # Convert the errno to an error string name
  try:
    errname = errno.errorcode[errnum]
  except Exception:
    # The error is unknown for some reason (or is not an error number)
    errname = None

  # Report whether the error name is in our white list
  return (errname in connected_errors)
224
225
def is_recoverable_network_exception(exceptionobj):
  """
  <Purpose>
    Determines if an exception from a network call is recoverable or fatal.

  <Arguments>
    exceptionobj: An exception object from a network call.

  <Returns>
    True if potentially recoverable, False if fatal. An exception that
    carries no error number is reported as fatal.
  """
  exception_type = type(exceptionobj)

  # socket.timeout is recoverable always
  if exception_type == socket.timeout:
    return True

  # Only continue if the type is socket.error or select.error
  if exception_type != socket.error and exception_type != select.error:
    return False

  # An exception raised without arguments has no error number to inspect.
  # Indexing it blindly would raise an IndexError out of this helper.
  if not exceptionobj.args:
    return False
  errnum = exceptionobj.args[0]

  # Error names that are considered recoverable
  recoverable_errors = ["EINTR","EAGAIN","EBUSY","EWOULDBLOCK","ETIMEDOUT","ERESTART",
                        "WSAEINTR","WSAEWOULDBLOCK","WSAETIMEDOUT","EALREADY","WSAEALREADY",
                        "EINPROGRESS","WSAEINPROGRESS"]

  # Convert the errno to an error string name
  try:
    errname = errno.errorcode[errnum]
  except Exception:
    # The error is unknown for some reason (or is not an error number)
    errname = None

  # Report whether the error name is in our white list
  return (errname in recoverable_errors)
265
266
# Determines, based on an exception, if the connection has been terminated
def is_terminated_connection_exception(exceptionobj):
  """
  <Purpose>
    Determines if the exception indicates that the connection is terminated.

  <Arguments>
    exceptionobj: An exception object from a network call.

  <Returns>
    True if the connection is terminated, False otherwise.
    False means we could not determine with certainty if the socket is
    closed. This includes exceptions that carry no error number.
  """
  exception_type = type(exceptionobj)

  # We only want to continue if it is socket.error or select.error
  if exception_type != socket.error and exception_type != select.error:
    return False

  # An exception raised without arguments has no error number to inspect.
  # Indexing it blindly would raise an IndexError out of this helper.
  if not exceptionobj.args:
    return False
  errnum = exceptionobj.args[0]

  # Error names which indicate that the connection is closed
  connection_closed_errors = ["EPIPE","EBADF","EBADR","ENOLINK","EBADFD","ENETRESET",
                              "ECONNRESET","WSAEBADF","WSAENOTSOCK","WSAECONNRESET",]

  # Convert the errnum to an error string
  try:
    errname = errno.errorcode[errnum]
  except Exception:
    # The error number is not defined (or is not an error number)
    errname = None

  # Report whether the errname is in our pre-defined list
  return (errname in connection_closed_errors)
303
304
# Armon: This is used for semantics, to determine if we have a valid IP.
def is_valid_ip_address(ipaddr):
  """
  <Purpose>
    Determines if ipaddr is a valid dotted-quad IP address.
    Address 0.0.0.0 is considered valid.

  <Arguments>
    ipaddr: String to check for validity. (It will check that this is a string).

  <Returns>
    True if a valid IP, False otherwise.
  """
  # Argument must be of the string type
  if type(ipaddr) != str:
    return False

  # A valid IP has exactly 4 segments separated by periods
  parts = ipaddr.split(".")
  if len(parts) != 4:
    return False

  # Each segment must be a number between 0 and 255 inclusively
  for part in parts:
    # A segment has between 1 and 3 characters
    if len(part) < 1 or len(part) > 3:
      return False

    # Only plain decimal digits are allowed. int() on its own also accepts
    # signs and surrounding whitespace, so "+1", "-0" or " 7" must be
    # rejected before converting.
    for char in part:
      if char not in "0123456789":
        return False

    # Digits only, so the value cannot be negative
    if int(part) > 255:
      return False

  # Every segment checked out
  return True
348
# Armon: This is used for semantics, to determine if the given port is valid
def is_valid_network_port(port, allowzero=False):
  """
  <Purpose>
    Checks that a value can be used as a network port.

  <Arguments>
    port: The port number. Its type is checked, it must be an int or a long.
    allowzero: If true, 0 is accepted as a port as well.

  <Returns>
    True if valid, False otherwise.
  """
  # Anything that is not exactly an int or a long is not a port
  port_type = type(port)
  if port_type != int and port_type != long:
    return False

  # The regular port range
  if port >= 1 and port <= 65535:
    return True

  # Outside the range, only 0 can be valid and only when requested
  return bool(allowzero) and port == 0
367
368
# Every comm handle starts with this constant prefix.
COMM_PREFIX = "_COMMH:"

# Builds the commhandles used by the networking functions
def generate_commhandle():
  """
  <Purpose>
    Creates a string commhandle that uniquely identifies a socket.
    The constant prefix allows a "pseudo" verification of handles.

  <Returns>
    A string handle.
  """
  # idhelper supplies the unique part, the prefix marks it as a commhandle
  return COMM_PREFIX + idhelper.getuniqueid()
387
388
# Tells whether a value could be a commhandle
def is_valid_commhandle(commhandle):
  """
  <Purpose>
    Checks that the given commhandle is potentially valid.
    This is not a guarantee of validity, e.g. the commhandle may not
    exist.

  <Arguments>
    commhandle:
      The handle to be checked for validity

  <Returns>
    True if the handle is valid, False otherwise.
  """
  # Only strings that carry the commhandle prefix qualify. Checking just
  # the prefix keeps us independent of the format of idhelper.getuniqueid()
  if type(commhandle) == str:
    return commhandle.startswith(COMM_PREFIX)

  return False
411
412
413########################### SocketSelector functions #########################
414
415
416
# Guards selectorstarted: held while check_selector / should_selector_exit
# inspect or change whether the selector thread is running
selectorlock = threading.Lock()

# True once check_selector has started a SocketSelector thread; set back to
# False by should_selector_exit when the thread decides to stop
selectorstarted = False
422
423
424#### helper functions
425
# Looks up the comminfo entry that holds this socketobject.
# Returns (entry, commhandle), raises KeyError if no entry uses the socket
def find_socket_entry(socketobject):
  for commhandle in comminfo.keys():
    entry = comminfo[commhandle]
    if entry['socket'] is socketobject:
      return entry, commhandle

  raise KeyError("Can't find commhandle")
432
433
434
435
# Blocks until an event slot could be claimed for eventname
def wait_for_event(eventname):
  claimed = False
  while not claimed:
    try:
      nanny.tattle_add_item('events',eventname)
      claimed = True
    except Exception:
      # Presumably over the event limit. Sleep and try again later
      time.sleep(.1)
445
446
447
def should_selector_exit():
  """
  <Purpose>
    Decides whether the SocketSelector thread should stop, which is the
    case when no entry in comminfo is listening anymore.

  <Side Effects>
    When the answer is True, selectorstarted is set to False and the
    "SocketSelector" event is returned to the nanny.

  <Returns>
    True if the selector should exit. False if it should keep running,
    or if the selector lock is currently held by someone else.
  """
  global selectorstarted

  # False means "nonblocking". If the lock is taken, just keep running
  # and check again on the next pass.
  if not selectorlock.acquire(False):
    return False

  # Release the lock on every path, including exceptions. Otherwise a
  # failure in here would leave the lock held and block check_selector
  # forever.
  try:
    # selectorstarted should *always* be True when this function is
    # entered. This is a test for bugs in this module.
    if not selectorstarted:
      # This will cause the program to exit and log things if logging is
      # enabled. -Brent
      tracebackrepy.handle_internalerror("SocketSelector is started when" +
          ' selectorstarted is False', 39)

    for comm in comminfo.values():
      # Something is listening and waiting, so all is well
      if not comm['outgoing']:
        return False

    # There is no listening function left, so the selector should exit
    selectorstarted = False
    nanny.tattle_remove_item('events',"SocketSelector")
    return True

  finally:
    selectorlock.release()
478   
479
480
481
482
483# This function starts a thread to handle an entry with a readable socket in
484# the comminfo table
485def start_event(entry, handle,eventhandle):
486  if entry['type'] == 'UDP':
487    # some sort of socket error, I'll assume they closed the socket or it's
488    # not important
489    try:
490      # NOTE: 64K is the max UDP dgram size.   Let's read it all
491      data, addr = entry['socket'].recvfrom(65535)
492    except socket.error:
493      # they closed in the meantime?
494      nanny.tattle_remove_item('events',eventhandle)
495      return
496
497    # wait if we're over the limit
498    if data:
499      if is_loopback(entry['localip']):
500        nanny.tattle_quantity('looprecv',len(data))
501      else:
502       # We will charge looprecv for UDP from the net, (see #887 for details)
503        nanny.tattle_quantity('looprecv',len(data))
504    else:
505      # no data...   Let's stop this...
506      nanny.tattle_remove_item('events',eventhandle)
507      return
508
509     
510    try:
511      EventDeliverer(entry['function'],(addr[0], addr[1], data, handle), eventhandle).start()
512    except Exception, e:
513      # This is an internal error I think...
514      # This will cause the program to exit and log things if logging is
515      # enabled. -Brent
516      tracebackrepy.handle_internalerror("Can't start UDP EventDeliverer '" + str(e)+"'", 29)
517
518
519
520  # or it's a TCP accept event...
521  elif entry['type'] == 'TCP':
522    try:
523      realsocket, addr = entry['socket'].accept()
524    except socket.error:
525      # they closed in the meantime?
526      nanny.tattle_remove_item('events',eventhandle)
527      return
528   
529    # put this handle in the table
530    newhandle = generate_commhandle()
531    comminfo[newhandle] = {'type':'TCP','remotehost':addr[0], 'remoteport':addr[1],'localip':entry['localip'],'localport':entry['localport'],'socket':realsocket,'outgoing':True, 'closing_lock':threading.Lock()}
532    # I don't think it makes sense to count this as an outgoing socket, does
533    # it?
534
535    # Armon: Create the emulated socket after the table entry
536    safesocket = emulated_socket(newhandle)
537
538    try:
539      EventDeliverer(entry['function'],(addr[0], addr[1], safesocket, newhandle, handle),eventhandle).start()
540    except Exception, e:
541      # This is an internal error I think...
542      # This will cause the program to exit and log things if logging is
543      # enabled. -Brent
544      tracebackrepy.handle_internalerror("Can't start TCP EventDeliverer '"+str(e)+"'", 23)
545
546
547  else:
548    # Should never get here
549    # This will cause the program to exit and log things if logging is
550    # enabled. -Brent
551    tracebackrepy.handle_internalerror("In start event, Unknown entry type '"+entry['type']+"'", 51)
552
553
554
# Armon: Upper bound on the number of samples per second.
# This prevents excessive sampling if there is a bad socket and
# select() returns before timing out
MAX_SAMPLES_PER_SEC = 10
TIME_BETWEEN_SAMPLES = 1.0 / MAX_SAMPLES_PER_SEC

# Polls the listening sockets with select and fires up user event threads
# as needed.
#
# This class holds nearly all of the complexity in this module.   It is
# basically a loop that collects the pending sockets and then starts events
# that call the user provided functions
class SocketSelector(threading.Thread):

  def __init__(self):
    threading.Thread.__init__(self, name="SocketSelector")


  # Returns the list of listening sockets that are ready to be handled
  # (readable, or flagged with an exception by select)
  def get_acceptable_sockets(self):
    # The sockets that might have a pending request are the non-outgoing ones
    listening = [comm['socket'] for comm in comminfo.values() if not comm['outgoing']]

    # Nothing to poll. The caller loops back around and checks whether all
    # sockets have been closed
    if listening == []:
      return []

    try:
      (acceptable, not_applic, has_excp) = select.select(listening,[],listening,0.5)

      # Sockets with exceptions are reported as ready as well
      for sock in has_excp:
        if sock not in acceptable:
          acceptable.append(sock)

      return acceptable

    # select failed, probably because of one bad socket. Check individually
    except:
      readylist = []

      for sockobj in listening:
        try:
          (accept_will_block, write_will_block) = socket_state(sockobj, "r")
          if not accept_will_block:
            readylist.append(sockobj)

        # Ignore errors, probably the socket is closed.
        except:
          pass

      return readylist



  def run(self):
    # The time (taken at the start of a pass) of the last pass that found
    # no ready sockets
    last_sample = 0

    while True:

      # Stop only when there is nothing left to monitor
      if should_selector_exit():
        return

      # If the last pass without ready sockets started less than
      # TIME_BETWEEN_SAMPLES seconds ago, sleep for the remainder. This
      # prevents a tight loop from consuming CPU time doing nothing.
      current_time = nonportable.getruntime()
      elapsed = current_time - last_sample
      if elapsed < TIME_BETWEEN_SAMPLES:
        time.sleep(TIME_BETWEEN_SAMPLES - elapsed)

      readylist = self.get_acceptable_sockets()

      # Nothing to do, so the next sample is potentially delayed
      if not readylist:
        last_sample = current_time

      # For every pending socket, grab an event and then start a thread
      # that handles it
      for readysocket in readylist:
        try:
          commtableentry,commhandle = find_socket_entry(readysocket)
        except KeyError:
          # Skip this one, it was likely closed in the interim
          continue

        # Loops until there is a free event
        eventhandle = idhelper.getuniqueid()
        wait_for_event(eventhandle)

        # Wait if already oversubscribed
        if is_loopback(commtableentry['localip']):
          resource = 'looprecv'
        else:
          resource = 'netrecv'
        nanny.tattle_quantity(resource,0)

        # Now a thread can be started to run the user's code...
        start_event(commtableentry,commhandle,eventhandle)
669     
670
671
672
673
674
675
676
# Thread that hands a single event to the user's code
class EventDeliverer(threading.Thread):
  # The user's function, its argument tuple and the id of the event
  # this thread holds
  func = None
  args = None
  eventid = None

  def __init__(self, f, a,e):
    self.func, self.args, self.eventid = f, a, e

    # Every deliverer gets its own unique thread name
    threadname = idhelper.get_new_thread_name(COMM_PREFIX)
    threading.Thread.__init__(self,name=threadname)

  def run(self):
    try:
      try:
        self.func(*self.args)
      except:
        # An exception in the user's thread ends the whole program
        tracebackrepy.handle_exception()
        harshexit.harshexit(14)
    finally:
      # The event is given back no matter how the function ended
      nanny.tattle_remove_item('events',self.eventid)
702     
703
704
705
706
707       
708#### used by other threads to interact with the SocketSelector...
709
710
# private.   Check if the SocketSelector is running and start it if it isn't
def check_selector():
  """
  <Purpose>
    Makes sure that a SocketSelector thread is running. The thread is
    started if it has not been started yet.

  <Side Effects>
    May claim the "SocketSelector" event (blocking until one is free),
    set selectorstarted and start the selector thread. Reports an internal
    error if the selector is marked as started but no thread with the name
    "SocketSelector" exists.

  <Returns>
    None.
  """
  global selectorstarted

  selectorlock.acquire()

  # Release the lock on every path. Previously it stayed held when the
  # selector could not be found or when starting it raised, which blocked
  # every later caller.
  try:
    # If the selector was not started yet, then start it...
    if not selectorstarted:
      # wait until there is a free event...
      wait_for_event("SocketSelector")
      selectorstarted = True
      SocketSelector().start()

    # verify a thread with the name "SocketSelector" is running
    for threadobj in threading.enumerate():
      if threadobj.getName() == "SocketSelector":
        # all is well
        return

    # this is bad.   The socketselector went away...
    # This will cause the program to exit and log things if logging is
    # enabled. -Brent
    tracebackrepy.handle_internalerror("SocketSelector died", 59)

  finally:
    selectorlock.release()
735
736
737
738
# Finds the table entry with this type of socket, local ip and local port.
# Returns (entry, commhandle), or (None, None) if there is no such entry
def find_tip_entry(socktype, ip, port):
  for commhandle in comminfo.keys():
    entry = comminfo[commhandle]
    if (entry['type'] == socktype
        and entry['localip'] == ip
        and entry['localport'] == port):
      return entry, commhandle

  return (None,None)
745
746
747
# Finds the commhandle that matches TIPO: type, ip, port, outgoing.
# Returns None if no entry matches
def find_tipo_commhandle(socktype, ip, port, outgoing):
  for commhandle in comminfo.keys():
    entry = comminfo[commhandle]
    if (entry['type'] == socktype
        and entry['localip'] == ip
        and entry['localport'] == port
        and entry['outgoing'] == outgoing):
      return commhandle

  return None
754
755
# Finds the commhandle of an outgoing TCP entry, given its local ip, local
# port, remote ip and remote port. Returns None if no entry matches
def find_outgoing_tcp_commhandle(localip, localport, remoteip, remoteport):
  for commhandle in comminfo.keys():
    entry = comminfo[commhandle]
    # The type is tested first, the remote keys are only read for TCP entries
    if (entry['type'] == "TCP"
        and entry['localip'] == localip
        and entry['localport'] == localport
        and entry['remotehost'] == remoteip
        and entry['remoteport'] == remoteport
        and entry['outgoing'] == True):
      return commhandle

  return None
764
765
766
767
768
769
770######################### Simple Public Functions ##########################
771
772
773
# Public interface
def gethostbyname_ex(name):
  """
   <Purpose>
      Looks up information about a hostname by calling
      socket.gethostbyname_ex()

   <Arguments>
      name:
         The host name to look up

   <Exceptions>
      As from socket.gethostbyname_ex()

   <Side Effects>
      Charges 4096 bytes to both netsend and netrecv.

   <Returns>
      A tuple containing (hostname, aliaslist, ipaddrlist).   See the
      python docs for socket.gethostbyname_ex()
  """

  restrictions.assertisallowed('gethostbyname_ex',name)

  # A look up is charged 4K in each direction. The right number is not
  # known, but something should be charged. The net resources are always
  # the ones that get charged...
  for resource in ('netsend', 'netrecv'):
    nanny.tattle_quantity(resource,4096)

  return socket.gethostbyname_ex(name)
802
803
804
# Public interface
def getmyip():
  """
   <Purpose>
      Provides the IP of this computer.   If the user specified IP /
      interface preferences, this is the first allowed IP.   Otherwise it
      is the local IP that is used to reach one of the stable public IPs.

   <Arguments>
      None

   <Exceptions>
      Exception, if no usable local IP could be detected.

   <Side Effects>
      Refreshes the allowed IP cache if a preference is set.

   <Returns>
      The IP address as a string
  """

  restrictions.assertisallowed('getmyip')
  # I got some of this from: http://groups.google.com/group/comp.lang.python/browse_thread/thread/d931cdc326d7032b?hl=en

  # With a preference set, refresh the cache and hand out the first allowed
  # IP. The cache always holds at least 1 element (loopback)
  if user_ip_interface_preferences:
    update_ip_cache()
    return allowediplist[0]

  # It's possible on some platforms (Windows Mobile) that the IP will be
  # 0.0.0.0 even when I have a public IP and the external IP is up. However, if
  # I get a real connection with SOCK_STREAM, then I should get the real
  # answer.
  for conn_type in [socket.SOCK_DGRAM, socket.SOCK_STREAM]:

    # Try each stable IP
    for ip_addr in repy_constants.STABLE_PUBLIC_IPS:
      try:
        # Port 80 is used since some platforms panic when given 0 (FreeBSD)
        myip = get_localIP_to_remoteIP(conn_type, ip_addr, 80)
      except (socket.error, socket.timeout):
        # Networking errors are ignored, the remaining connection types
        # and IP addresses still get their chance. If everything fails
        # an exception is raised below.
        continue

      # Hand out the address right away if it is a good one
      if myip not in (None, '', "0.0.0.0"):
        return myip

  # Nothing was returned, so every attempt failed.
  # We must not be connected to the internet
  raise Exception("Cannot detect a connection to the Internet.")
865
866
867
def get_localIP_to_remoteIP(connection_type, external_ip, external_port=80):
  """
  <Purpose>
    Finds out which local ip is used for an outbound connection to an
    external ip.

  <Arguments>
    connection_type:
      The type of connection to attempt. See socket.socket().

    external_ip:
      The external IP to attempt to connect to.

    external_port:
      The port on the remote host to attempt to connect to.

  <Exceptions>
    As with socket.socket(), socketobj.connect(), etc.

  <Returns>
    The locally assigned IP for the connection.
  """
  probe = socket.socket(socket.AF_INET, connection_type)

  # The socket is closed no matter how the attempt ends
  try:
    probe.connect((external_ip, external_port))

    # The local end of the connection is (ip, port), the ip is the answer
    return probe.getsockname()[0]
  finally:
    probe.close()
903
904
905
906
907###################### Shared message / connection items ###################
908
909
# Tells whether an IP is a loopback IP (127.x.y.z) or not.   This is needed
# for accounting
def is_loopback(host):
  if not host.startswith('127.'):
    return False

  octets = host.split('.')
  if len(octets) != 4:
    return False

  for octet in octets:
    # Every segment must be a non-empty run of decimal digits...
    if not octet or not all(char in '0123456789' for char in octet):
      return False

    # ...with a value of at most 255 (digits only, so never negative)
    if int(octet) > 255:
      return False

  return True
930
931
932
933
934
935
936
937
938
# Public interface !!!
def stopcomm(commhandle):
  """
   <Purpose>
      Stops the handling of events for a commhandle.   This works for both
      message and connection based event handlers.

   <Arguments>
      commhandle:
         A commhandle as returned by recvmess or waitforconn.

   <Exceptions>
      Exception, if commhandle is not a valid commhandle.

   <Side Effects>
      This has an undefined effect on a socket-like object if it is currently
      in use.

   <Returns>
      Returns True if commhandle was successfully closed, False if the handle
      cannot be closed (i.e. it was already closed).
  """
  # Armon: An invalid handle needs to be answered with an exception.
  if not is_valid_commhandle(commhandle):
    raise Exception("Invalid commhandle specified!")

  if commhandle in comminfo:
    restrictions.assertisallowed('stopcomm',comminfo[commhandle])

    # cleanup blocks, so getting past it means the handle is closed
    cleanup(commhandle)
    return True

  # Armon: The handle does not exist (it was already cleaned up),
  # so there is nothing to close
  return False
978
979
980
# Armon: The interval at which the availability of the socket is checked
RETRY_INTERVAL = 0.2 # In seconds

# Private
# Closes the socket of a handle and removes its entry from comminfo
def cleanup(handle):
  # Armon: The entry's lock makes sure that only one thread does the
  # cleanup, all the others block until it is done
  try:
    closing_lock = comminfo[handle]['closing_lock']
  except KeyError:
    # A possible race condition, the socket has already been cleaned up.
    return

  closing_lock.acquire()

  # The lock is always released again
  try:
    # Another thread may have finished the cleanup while we were waiting
    if handle not in comminfo:
      return

    # Armon: Shutdown the socket for writing prior to close
    # to unblock any threads that are writing
    try:
      comminfo[handle]['socket'].shutdown(socket.SHUT_WR)
    except:
      pass

    try:
      comminfo[handle]['socket'].close()
    except:
      pass

    entry = comminfo[handle]

    # Give the socket back to the nanny
    if entry['outgoing']:
      nanny.tattle_remove_item('outsockets', handle)
    else:
      nanny.tattle_remove_item('insockets', handle)

      # Armon: Block while the listening socket is not yet cleaned up
      listen_ip = entry['localip']
      listen_port = entry['localport']
      is_tcp = (entry['type'] == 'TCP')

      # Loop until the socket no longer exists
      # BUG: There exists a potential race condition here. The problem is that
      # the socket may be cleaned up and then before we are able to check for it again
      # another process binds to the ip/port we are checking. This would cause us to detect
      # the socket from the other process and we would block indefinitely while that socket
      # is open.
      while nonportable.os_api.exists_listening_network_socket(listen_ip,listen_port, is_tcp):
        time.sleep(RETRY_INTERVAL)

    # The entry is deleted last, so that other stopcomm operations will block
    try: # Guard against a rare and poorly understood error. #1052
      del comminfo[handle]
    except KeyError:
      pass

  finally:
    closing_lock.release()
1044
1045
1046
1047####################### Message sending #############################
1048
1049
1050
1051# Public interface!!!
1052def sendmess(desthost, destport, message,localip=None,localport = None):
1053  """
1054   <Purpose>
1055      Send a message to a host / port
1056
1057   <Arguments>
1058      desthost:
1059         The host to send a message to
1060      destport:
1061         The port to send the message to
1062      message:
1063         The message to send
1064      localhost (optional):
1065         The local IP to send the message from
1066      localport (optional):
1067         The local port to send the message from (0 for a random port)
1068
1069   <Exceptions>
1070      socket.error when communication errors happen
1071
1072   <Side Effects>
1073      None.
1074
1075   <Returns>
1076      The number of bytes sent on success
1077  """
1078  # Check that if either localip or local port is specified, that both are
1079  if (localip != None and localport == None) or (localport != None and localip == None):
1080    raise Exception("Localip and localport must be specified simultaneously.")
1081 
1082  # Assign the default value to localport if none given
1083  if localport == None:
1084    localport = 0
1085
1086  if not localip or localip == '0.0.0.0':
1087    localip = None
1088# JAC: removed since this breaks semantics
1089#  else:
1090#    if not is_valid_ip_address(localip):
1091#      raise Exception("Local IP address is invalid.")
1092
1093# JAC: removed since this breaks semantics
1094#  if not is_valid_ip_address(desthost):
1095#    raise Exception("Destination host IP address is invalid.")
1096 
1097  if not is_valid_network_port(destport):
1098    raise Exception("Destination port number must be an integer, between 1 and 65535.")
1099
1100  if not is_valid_network_port(localport, True):
1101    raise Exception("Local port number must be an integer, between 1 and 65535.")
1102
1103  restrictions.assertisallowed('sendmess', desthost, destport, message,localip,localport)
1104
1105  if localport:
1106    nanny.tattle_check('messport',localport)
1107
1108  # Armon: Check if the specified local ip is allowed
1109  # this check only makes sense if the localip is specified
1110  if localip and not ip_is_allowed(localip):
1111    raise Exception, "IP '"+str(localip)+"' is not allowed."
1112 
1113  # If there is a preference, but no localip, then get one
1114  elif user_ip_interface_preferences and not localip:
1115    # Use whatever getmyip returns
1116    localip = getmyip()
1117
1118  # this is used to track errors when trying to resend data
1119  firsterror = None
1120
1121  if localip and localport:
1122    # let's see if the socket already exists...
1123    commtableentry,commhandle = find_tip_entry('UDP',localip,localport)
1124  else:
1125    # no, we'll skip
1126    commhandle = None
1127
1128  # yes it does! let's use the existing socket
1129  if commhandle:
1130
1131    # block in case we're oversubscribed
1132    if is_loopback(desthost):
1133      nanny.tattle_quantity('loopsend',0)
1134    else:
1135      nanny.tattle_quantity('netsend',0)
1136
1137    # try to send using this socket
1138    try:
1139      bytessent =  commtableentry['socket'].sendto(message,(desthost,destport))
1140    except socket.error,e:
1141      # we're going to save this error in case we also get an error below.   
1142      # This is likely to be the error we actually want to raise
1143      firsterror = e
1144      # should I really fall through here?
1145    else:
1146      # send succeeded, let's wait and return
1147      if is_loopback(desthost):
1148        nanny.tattle_quantity('loopsend',bytessent)
1149      else:
1150        nanny.tattle_quantity('netsend',bytessent)
1151      return bytessent
1152 
1153
1154  # open a new socket
1155  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1156 
1157  # the send buffer must also be set or it will constrain UDP sendmess
1158  # size on Mac.
1159  s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 66000)
1160
1161  try:
1162    if localip:
1163      try:
1164        s.bind((localip,localport))
1165      except socket.error, e:
1166        if firsterror:
1167          raise Exception, firsterror
1168        raise Exception, e
1169
1170    # wait if already oversubscribed
1171    if is_loopback(desthost):
1172      nanny.tattle_quantity('loopsend',0)
1173    else:
1174      nanny.tattle_quantity('netsend',0)
1175
1176    bytessent =  s.sendto(message,(desthost,destport))
1177
1178    if is_loopback(desthost):
1179      nanny.tattle_quantity('loopsend',bytessent)
1180    else:
1181      nanny.tattle_quantity('netsend',bytessent)
1182
1183    return bytessent
1184
1185  finally:
1186    # close no matter what
1187    try:
1188      s.close()
1189    except:
1190      pass
1191
1192
1193
1194
1195
1196
1197# Public interface!!!
def recvmess(localip, localport, function):
  """
   <Purpose>
      Registers a function as an event handler for incoming messages

   <Arguments>
      localip:
         The local IP or hostname to register the handler on
      localport:
         The port to listen on
      function:
         The function that messages should be delivered to.   It should expect
         the following arguments: (remoteIP, remoteport, message, commhandle)

   <Exceptions>
      Exception if localip is empty or '0.0.0.0', if localport is rejected by
      is_valid_network_port, or if localip is not allowed.   Errors raised
      while creating or binding the socket propagate to the caller.

   <Side Effects>
      Registers an event handler.   If a UDP handler is already registered
      for this ip / port, its socket is kept and re-registered under a new
      commhandle with the new function.

   <Returns>
      The commhandle for this event handler.
  """
  if not localip or localip == '0.0.0.0':
    raise Exception("Must specify a local IP address")

  # JAC: localip is deliberately not checked with is_valid_ip_address, since
  # doing so breaks semantics.

  if not is_valid_network_port(localport):
    raise Exception("Local port number must be an integer, between 1 and 65535.")

  # Armon: The user function is deliberately not inspected, because an
  # argument-count check is incompatible with functions that take a variable
  # number of parameters, e.g. func1(*args).

  restrictions.assertisallowed('recvmess',localip,localport)

  nanny.tattle_check('messport',localport)

  # Armon: Check if the specified local ip is allowed
  if not ip_is_allowed(localip):
    # str() keeps the message construction from failing on a non-string ip
    raise Exception("IP '"+str(localip)+"' is not allowed.")

  # Armon: Generate the new handle since we need it
  # to replace the old handle if it exists
  handle = generate_commhandle()

  # check if I'm already listening on this port / ip
  # NOTE: I check as though there might be a socket open that is sending a
  # message.   This is nonsense since sendmess doesn't result in a socket
  # persisting.   This is done so that if sockets for sendmess are cached
  # later (as seems likely) the resulting code will not break.
  oldhandle = find_tipo_commhandle('UDP', localip, localport, False)
  if oldhandle:
    # if it was already there, update the function and re-register the very
    # same entry (and therefore the same socket) under the new handle
    entry = comminfo[oldhandle]
    entry['function'] = function
    comminfo[handle] = entry

    # Only drop the old table key.   Calling cleanup(oldhandle) here would
    # shut down and close the socket that the new handle now shares.
    try:
      del comminfo[oldhandle]
    except KeyError:
      pass

    # We need nanny to substitute the old handle with the new one
    nanny.tattle_remove_item('insockets',oldhandle)
    nanny.tattle_add_item('insockets',handle)

    # Return the new handle
    return handle

  # we'll need to add it, so add a socket...
  nanny.tattle_add_item('insockets',handle)

  # get the socket
  s = None
  try:
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((localip,localport))

    # set the receive buffer size to slightly more than 64K+e (see ticket #887)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 66000)
    # the send buffer must also be set or it will constrain UDP sendmess
    # size on Mac.   I set it here because this socket may be used for sending
    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 66000)

    nonportable.preparesocket(s)
  except:
    # s is still None when socket.socket() itself failed, so only close a
    # socket that was really created.   The accounting entry is released in
    # every case before the original error is re-raised.
    try:
      if s is not None:
        s.close()
    finally:
      nanny.tattle_remove_item('insockets',handle)
    raise

  # set up our table entry
  comminfo[handle] = {'type':'UDP','localip':localip, 'localport':localport,'function':function,'socket':s, 'outgoing':False, 'closing_lock':threading.Lock() }

  # start the selector if it's not running already
  check_selector()

  return handle
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325####################### Connection oriented #############################
1326
1327
1328
1329# Public interface!!!
1330def openconn(desthost, destport,localip=None, localport=None,timeout=None):
1331  """
1332   <Purpose>
1333      Opens a connection, returning a socket-like object
1334
1335   <Arguments>
1336      desthost:
1337         The host to open communcations with
1338      destport:
1339         The port to use for communication
1340      localip (optional):
1341         The local ip to use for the communication
1342      localport (optional):
1343         The local port to use for communication (0 for a random port)
1344      timeout (optional):
1345         The maximum amount of time to wait to connect
1346
1347   <Exceptions>
1348      As from socket.connect, etc.
1349
1350   <Side Effects>
1351      None.
1352
1353   <Returns>
1354      A socket-like object that can be used for communication.   Use send,
1355      recv, and close just like you would an actual socket object in python.
1356  """
1357
1358  # Set a default timeout of 5 seconds if none is specified.
1359  if timeout is None:
1360    timeout = 5.0
1361
1362  # Check that both localip and localport are given if either is specified
1363  if localip != None and localport == None or localport != None and localip == None:
1364    raise Exception("Localip and localport must be specified simultaneously.")
1365
1366  # Set the default value of localip
1367  if not localip or localip == '0.0.0.0':
1368    localip = None
1369#  else:
1370# JAC: removed since this breaks semantics
1371    # Check that the localip is valid if given
1372#    if not is_valid_ip_address(localip):
1373#      raise Exception("Local IP address is invalid.")
1374
1375  # Assign the default value of localport if none is given.
1376  if localport == None:
1377    localport = 0
1378 
1379# JAC: removed since this breaks semantics
1380  # Check the remote IP for validity
1381#  if not is_valid_ip_address(desthost):
1382#    raise Exception("Destination host IP address is invalid.")
1383
1384  if not is_valid_network_port(destport):
1385    raise Exception("Destination port number must be an integer, between 1 and 65535.")
1386
1387  # Allow the localport to be 0, which is the default.
1388  if not is_valid_network_port(localport, True):
1389    raise Exception("Local port number must be an integer, between 1 and 65535.")
1390
1391  # Check that the timeout is a number, greater than 0
1392  if not (type(timeout) == float or type(timeout) == int or type(timeout) == long) or timeout <= 0.0:
1393    raise Exception("Timeout parameter must be a numeric value greater than 0.")
1394
1395  # Armon: Check if the specified local ip is allowed
1396  # this check only makes sense if the localip is specified
1397  if localip and not ip_is_allowed(localip):
1398    raise Exception, "IP '"+str(localip)+"' is not allowed."
1399
1400  # If there is a preference, but no localip, then get one
1401  elif user_ip_interface_preferences and not localip:
1402    # Use whatever getmyip returns
1403    localip = getmyip()
1404
1405  restrictions.assertisallowed('openconn',desthost,destport,localip,localport)
1406 
1407  # Get our start time
1408  starttime = nonportable.getruntime()
1409
1410  # Armon: Check for any pre-existing sockets. If they are being closed, wait for them.
1411  # This will also serve to check if repy has a pre-existing socket open on this same tuple
1412  exists = True
1413  while exists and nonportable.getruntime() - starttime < timeout:
1414    # Update the status
1415    (exists, status) = nonportable.os_api.exists_outgoing_network_socket(localip,localport,desthost,destport)
1416    if exists:
1417      # Check the socket state
1418      if "ESTABLISH" in status or "CLOSE_WAIT" in status:
1419        # Check if the socket is from this repy vessel
1420        handle = find_outgoing_tcp_commhandle(localip, localport, desthost, destport)
1421       
1422        message = "Network socket is in use by an external process!"
1423        if handle != None:
1424          message = " Duplicate handle exists with name: "+str(handle)
1425       
1426        raise Exception, message
1427      else:
1428        # Wait for socket cleanup
1429        time.sleep(RETRY_INTERVAL)
1430  else:
1431    # Check if a socket exists still and we timed out
1432    if exists:
1433      raise Exception, "Timed out checking for socket cleanup!"
1434       
1435
1436  if localport:
1437    nanny.tattle_check('connport',localport)
1438
1439  handle = generate_commhandle()
1440
1441  # If allocation of an outsocket fails, we garbage collect and try again
1442  # -- this forces destruction of unreferenced objects, which is how we
1443  # free resources.
1444  try:
1445    nanny.tattle_add_item('outsockets',handle)
1446  except:
1447    gc.collect()
1448    nanny.tattle_add_item('outsockets',handle)
1449
1450 
1451  try:
1452    s = get_real_socket(localip,localport)
1453
1454    # prevent excessive TCP buffering (#895)
1455    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000)
1456    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 10000)
1457
1458 
1459    # add the socket to the comminfo table
1460    comminfo[handle] = {'type':'TCP','remotehost':None, 'remoteport':None,'localip':localip,'localport':localport,'socket':s, 'outgoing':True, 'closing_lock':threading.Lock()}
1461  except:
1462    # the socket wasn't passed to the user prog...
1463    nanny.tattle_remove_item('outsockets',handle)
1464    raise
1465
1466
1467  try:
1468    thissock = emulated_socket(handle)
1469    # We set a timeout before we connect.  This allows us to timeout slow
1470    # connections...
1471    oldtimeout = comminfo[handle]['socket'].gettimeout()
1472 
1473    # Set the new timeout
1474    comminfo[handle]['socket'].settimeout(timeout)
1475
1476    # Store exceptions until we exit the loop, default to timed out
1477    # in case we are given a very small timeout
1478    connect_exception = Exception("Connection timed out!")
1479
1480    # Ignore errors and retry if we have not yet reached the timeout
1481    while nonportable.getruntime() - starttime < timeout:
1482      try:
1483        comminfo[handle]['socket'].connect((desthost,destport))
1484        break
1485      except Exception,e:
1486        # Check if the socket is already connected (EISCONN or WSAEISCONN)
1487        if is_already_connected_exception(e):
1488          break
1489
1490        # Check if this is recoverable, only continue if it is
1491        elif not is_recoverable_network_exception(e):
1492          raise
1493
1494        else:
1495          # Store the exception
1496          connect_exception = e
1497
1498        # Sleep a bit, avoid excessive iterations of the loop
1499        time.sleep(0.2)
1500    else:
1501      # Raise any exception that was raised
1502      if connect_exception != None:
1503        raise connect_exception
1504
1505    comminfo[handle]['remotehost']=desthost
1506    comminfo[handle]['remoteport']=destport
1507 
1508  except:
1509    cleanup(handle)
1510    raise
1511  else:
1512    # and restore the old timeout...
1513    comminfo[handle]['socket'].settimeout(oldtimeout)
1514
1515  return thissock
1516
1517
1518
1519
1520# Public interface!!!
def waitforconn(localip, localport,function):
  """
   <Purpose>
      Waits for a connection to a port.   Calls function with a socket-like
      object if it succeeds.

   <Arguments>
      localip:
         The local IP to listen on
      localport:
         The local port to bind to
      function:
         The function to call.   It should take five arguments:
         (remoteip, remoteport, socketlikeobj, thiscommhandle, maincommhandle)
         If your function has an uncaught exception, the socket-like object it
         is using will be closed.

   <Exceptions>
      Exception if localip is empty or '0.0.0.0', if localport is rejected by
      is_valid_network_port, or if localip is not allowed.   Errors raised
      while creating, binding or listening on the socket propagate to the
      caller.

   <Side Effects>
      Starts an event handler that listens for connections.   If a TCP
      listener is already registered for this ip / port, its socket is kept
      and re-registered under a new commhandle with the new function.

   <Returns>
      A handle to the comm object.   This can be used to stop listening
  """
  if not localip or localip == '0.0.0.0':
    raise Exception("Must specify a local IP address")

  # JAC: localip is deliberately not checked with is_valid_ip_address, since
  # doing so breaks semantics.

  if not is_valid_network_port(localport):
    raise Exception("Local port number must be an integer, between 1 and 65535.")

  restrictions.assertisallowed('waitforconn',localip,localport)

  nanny.tattle_check('connport',localport)

  # Armon: Check if the specified local ip is allowed
  if not ip_is_allowed(localip):
    # str() keeps the message construction from failing on a non-string ip
    raise Exception("IP '"+str(localip)+"' is not allowed.")

  # Get the new handle first, because we need to replace
  # the oldhandle if it exists to match semantics
  handle = generate_commhandle()

  # check if I'm already listening on this port / ip
  oldhandle = find_tipo_commhandle('TCP', localip, localport, False)
  if oldhandle:
    # if it was already there, update the function and re-register the very
    # same entry (and therefore the same listening socket) under the new handle
    entry = comminfo[oldhandle]
    entry['function'] = function
    comminfo[handle] = entry

    # Only drop the old table key.   Calling cleanup(oldhandle) here would
    # shut down and close the listening socket that the new handle now shares.
    try:
      del comminfo[oldhandle]
    except KeyError:
      pass

    # Un "tattle" the old handle, re-add the new handle
    nanny.tattle_remove_item('insockets',oldhandle)
    nanny.tattle_add_item('insockets',handle)

    # Give the new handle
    return handle

  # we'll need to add it, so add a socket...
  nanny.tattle_add_item('insockets',handle)

  # get the socket
  mainsock = None
  try:
    mainsock = get_real_socket(localip,localport)

    # prevent excessive TCP buffering (#895)
    mainsock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000)
    mainsock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 10000)

    # NOTE: Should this be anything other than a hardcoded number?
    mainsock.listen(5)
    # set up our table entry
    comminfo[handle] = {'type':'TCP','remotehost':None, 'remoteport':None,'localip':localip,'localport':localport,'socket':mainsock, 'outgoing':False, 'function':function, 'closing_lock':threading.Lock()}
  except:
    # Close the socket if it was created, so that a failure after
    # get_real_socket does not leak it, and always give the accounting entry
    # back before re-raising.
    try:
      if mainsock is not None:
        mainsock.close()
    finally:
      nanny.tattle_remove_item('insockets',handle)
    raise

  # start the selector if it's not running already
  check_selector()

  return handle
1612
1613
1614
1615
1616
1617# Private
1618def get_real_socket(localip=None, localport = None):
1619
1620  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1621
1622  # reuse the socket if it's "pseudo-availible"
1623  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1624
1625
1626  if localip and localport:
1627    try:
1628      s.bind((localip,localport))
1629    except socket.error, e:
1630      # don't leak sockets
1631      s.close()
1632      raise Exception, e
1633    except:
1634      # don't leak sockets
1635      s.close()
1636      raise
1637
1638  return s
1639
1640
1641# Checks if the given real socket would block
def socket_state(realsock, waitfor="rw", timeout=0.0):
  """
  <Purpose>
    Reports whether a recv() (or accept(), for a listening socket) and a
    send() on the given socket would block.

  <Arguments>
    realsock:
              A real socket.socket() object to check.

    waitfor:
              What to wait for: "r" for read only, "w" for write only, or
              "rw" for read or write.   The socket is only tested for the
              conditions named here; a condition that is not tested is
              reported as "will block" unless the socket is in an
              exceptional state.

    timeout:
              How long select() may wait for the socket to become ready.

  <Returns>
    A tuple, (read_will_block, write_will_block).   (False, False) is returned
    when select() reports the socket in its exceptional set.

  <Exceptions>
    As with select.select(). Probably best to wrap this with is_recoverable_network_exception
    and is_terminated_connection_exception. Throws an exception if waitfor is not in ["r","w","rw"]
  """
  if waitfor not in ("rw", "r", "w"):
    raise Exception("Illegal waitfor argument!")

  # The socket is always watched for exceptional conditions, and for
  # readability / writability only when asked to.
  watched = [realsock]
  read_set = []
  write_set = []
  if "r" in waitfor:
    read_set = watched
  if "w" in waitfor:
    write_set = watched

  ready_read, ready_write, in_error = select.select(read_set, write_set, watched, timeout)

  # A socket in the exceptional set is assumed to be both readable and writable
  if realsock in in_error:
    return (False, False)

  return (realsock not in ready_read, realsock not in ready_write)
1695
1696
1697
1698
1699# Public.   We pass these to the users for communication purposes
1700class emulated_socket:
1701  # This is an index into the comminfo table...
1702
1703  commid = 0
1704
1705  def __init__(self, handle):
1706    self.commid = handle
1707
1708    # Armon: Get the real socket
1709    try:
1710      realsocket = comminfo[handle]['socket']
1711
1712    # Shouldn't happen because my caller should create the table entry first
1713    except KeyError:
1714      raise Exception, "Internal Error. No table entry for new socket!"
1715
1716    # Make the socket non-blocking
1717    realsocket.setblocking(0)
1718
1719    try:
1720      # Store the send buffer size.   We'll send less than this to avoid a bug
1721      comminfo[handle]['sendbuffersize'] = realsocket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
1722
1723    # Really shouldn't happen.   We just checked!
1724    except KeyError:
1725      raise Exception, "Internal Error. No table entry when looking up sendbuffersize for new socket!"
1726
1727
1728    return None 
1729
1730
1731
1732
1733  def close(self):
1734    """
1735      <Purpose>
1736        Closes a socket.   Pending remote recv() calls will return with the
1737        remaining information.   Local recv / send calls will fail after this.
1738
1739      <Arguments>
1740        None
1741
1742      <Exceptions>
1743        None
1744
1745      <Side Effects>
1746        Pending local recv calls will either return or have an exception.
1747
1748      <Returns>
1749        True if this is the first close call to this socket, False otherwise.
1750    """
1751    # prevent TOCTOU race with client changing the object's properties
1752    mycommid = self.commid
1753    restrictions.assertisallowed('socket.close')
1754   
1755    # Armon: Semantic update, return whatever stopcomm does.
1756    # This will result in socket.close() returning a True/False indicator
1757    return stopcomm(mycommid)
1758
1759
1760
1761  def recv(self,bytes):
1762    """
1763      <Purpose>
1764        Receives data from a socket.   It may receive fewer bytes than
1765        requested.   
1766
1767      <Arguments>
1768        bytes:
1769           The maximum number of bytes to read.   
1770
1771      <Exceptions>
1772        Exception if the socket is closed either locally or remotely.
1773
1774      <Side Effects>
1775        This call will block the thread until the other side calls send.
1776
1777      <Returns>
1778        The data received from the socket (as a string).   If '' is returned,
1779        the other side has closed the socket and no more data will arrive.
1780    """
1781    # prevent TOCTOU race with client changing the object's properties
1782    mycommid = self.commid
1783    restrictions.assertisallowed('socket.recv',bytes)
1784
1785    # I set this here so that I don't screw up accounting with a keyerror later
1786    try:
1787      this_is_loopback = is_loopback(comminfo[mycommid]['remotehost'])
1788    # they likely closed the connection
1789    except KeyError:
1790      raise Exception, "Socket closed"
1791
1792    # wait if already oversubscribed
1793    if this_is_loopback:
1794      nanny.tattle_quantity('looprecv',0)
1795    else:
1796      nanny.tattle_quantity('netrecv',0)
1797
1798    datarecvd = 0
1799    # loop until we recv the information (looping is needed for Windows)
1800    while True:
1801      try:
1802        # the timeout is needed so that if the socket is closed in another
1803        # thread, we notice it
1804        # BUG: What should the timeout be?   What is the right value?
1805        #comminfo[mycommid]['socket'].settimeout(0.2)
1806       
1807        # Armon: Get the real socket
1808        realsocket = comminfo[mycommid]['socket']
1809       
1810        # Check if the socket is ready for reading
1811        (read_will_block, write_will_block) = socket_state(realsocket, "r", 0.2)       
1812        if not read_will_block:
1813          datarecvd = realsocket.recv(bytes)
1814          break
1815
1816      # they likely closed the connection
1817      except KeyError:
1818        raise Exception, "Socket closed"
1819
1820      # Catch all other exceptions, check if they are recoverable
1821      except Exception, e:
1822        # Check if this error is recoverable
1823        if is_recoverable_network_exception(e):
1824          continue
1825
1826        # Otherwise, raise the exception
1827        else:
1828          # Check if this is a connection termination
1829          if is_terminated_connection_exception(e):
1830            raise Exception("Socket closed")
1831          else:
1832            raise
1833
1834    # Armon: Calculate the length of the data
1835    data_length = len(datarecvd)
1836   
1837    # Raise an exception if there was no data
1838    if data_length == 0:
1839      raise Exception("Socket closed")
1840
1841    # do accounting here...
1842    if this_is_loopback:
1843      nanny.tattle_quantity('looprecv',data_length)
1844    else:
1845      nanny.tattle_quantity('netrecv',data_length)
1846
1847    return datarecvd
1848
1849
1850
1851  def send(self,message):
1852    """
1853      <Purpose>
1854        Sends data on a socket.   It may send fewer bytes than requested.   
1855
1856      <Arguments>
1857        message:
1858          The string to send.
1859
1860      <Exceptions>
1861        Exception if the socket is closed either locally or remotely.
1862
1863      <Side Effects>
1864        This call may block the thread until the other side calls recv.
1865
1866      <Returns>
1867        The number of bytes sent.   Be sure not to assume this is always the
1868        complete amount!
1869    """
1870    # prevent TOCTOU race with client changing the object's properties
1871    mycommid = self.commid
1872    restrictions.assertisallowed('socket.send',message)
1873
1874    # I factor this out because we must do the accounting at the bottom of this
1875    # function and I want to make sure we account properly even if they close
1876    # the socket right after their data is sent
1877    try:
1878      this_is_loopback = is_loopback(comminfo[mycommid]['remotehost'])
1879    except KeyError:
1880      raise Exception, "Socket closed!"
1881
1882    # wait if already oversubscribed
1883    if this_is_loopback:
1884      nanny.tattle_quantity('loopsend',0)
1885    else:
1886      nanny.tattle_quantity('netsend',0)
1887
1888    try:
1889      # Trim the message size to be less than the sendbuffersize.
1890      # This is a fix for http://support.microsoft.com/kb/823764
1891      message = message[:comminfo[mycommid]['sendbuffersize']-1]
1892    except KeyError:
1893      raise Exception, "Socket closed!"
1894
1895    # loop until we send the information (looping is needed for Windows)
1896    while True:
1897      try:
1898        # Armon: Get the real socket
1899        realsocket = comminfo[mycommid]['socket']
1900       
1901        # Check if the socket is ready for writing, wait 0.2 seconds
1902        (read_will_block, write_will_block) = socket_state(realsocket, "w", 0.2)
1903        if not write_will_block:
1904          bytessent = realsocket.send(message)
1905          break
1906     
1907      except KeyError:
1908        raise Exception, "Socket closed"
1909
1910      except Exception,e:
1911        # Determine if the exception is fatal
1912        if is_recoverable_network_exception(e):
1913          continue
1914        else:
1915          # Check if this is a conn. term., and give a more specific exception.
1916          if is_terminated_connection_exception(e):
1917            raise Exception("Socket closed")
1918          else:
1919            raise
1920
1921    if this_is_loopback:
1922      nanny.tattle_quantity('loopsend',bytessent)
1923    else:
1924      nanny.tattle_quantity('netsend',bytessent)
1925
1926    return bytessent
1927
1928
1929  # Checks if socket read/write operations will block
1930  def willblock(self):
1931    """
1932    <Purpose>
1933      Determines if a socket would block if send() or recv() was called.
1934
1935    <Exceptions>
1936      Socket Closed if the socket has been closed.
1937
1938    <Returns>
1939      A tuple, (recv_will_block, send_will_block) both are boolean values.
1940
1941    """
1942
1943    try:
1944      # Get the real socket
1945      realsocket = comminfo[self.commid]['socket']
1946
1947      # Call into socket_state with no timout to return instantly
1948      return socket_state(realsocket)
1949   
1950    # The socket is closed or in the process of being closed...
1951    except KeyError:
1952      raise Exception, "Socket closed"
1953
1954    except Exception, e:
1955      # Determine if the socket is closed
1956      if is_terminated_connection_exception(e):
1957        raise Exception("Socket closed")
1958     
1959      # Otherwise raise whatever we have
1960      else:
1961        raise
1962
1963
1964
1965  def __del__(self):
1966    cleanup(self.commid)
1967
1968
1969
1970# End of emulated_socket class
Note: See TracBrowser for help on using the browser.