Changeset 3341

Show
Ignore:
Timestamp:
01/10/10 20:12:58 (10 years ago)
Author:
armon
Message:

Added IPC for repy on *NIX systems. We now transmit diskused and stop time info. Cleanup of Windows CPU throttling code, and storage of stoptimes. Added the private implementation of get_resources. NOTE: This is NOT a client facing API yet.

Location:
seattle/trunk/repy
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • seattle/trunk/repy/nonportable.py

    r3232 r3341  
    5151# This gives us our restrictions information 
    5252import nanny_resource_limits 
     53 
     54# This is used for IPC 
     55import marshal 
    5356 
    5457# This will fail on non-windows systems 
     
    259262  # Return the new elapsedtime 
    260263  return elapsedtime 
    261    
     264  
     265 
     266# This lock is used to serialize calls to get_resouces 
     267get_resources_lock = threading.Lock() 
     268 
     269# These are the resources we expose in get_resources 
     270exposed_resources = set(["cpu","memory","diskused","events", 
     271                     "filewrite","fileread","filesopened", 
     272                     "insockets","outsockets","netsend", 
     273                     "netrecv","loopsend","looprecv", 
     274                     "lograte","random","messport","connport"]) 
     275             
     276# These are the resources that we don't flatten using 
     277# len() for the usage. For example, instead of given the 
     278# set of thread's, we flatten this into N number of threads. 
     279flatten_exempt_resources = set(["connport","messport"]) 
     280 
     281# Cache the disk used from the external process 
     282cached_disk_used = 0L 
     283 
     284# This array holds the times that repy was stopped. 
     285# It is an array of tuples, of the form (time, amount) 
     286# where time is when repy was stopped (from getruntime()) and amount 
     287# is the stop time in seconds. The last process_stopped_max_entries are retained 
     288process_stopped_timeline = [] 
     289process_stopped_max_entries = 100 
     290 
     291# Method to expose resource limits and usage 
     292def get_resources(): 
     293  """ 
     294  <Purpose> 
     295    Returns the resouce utilization limits as well 
     296    as the current resource utilization. 
     297 
     298  <Arguments> 
     299    None. 
     300 
     301  <Returns> 
     302    A tuple of dictionaries and an array (limits, usage, stoptimes). 
     303 
     304    Limits is the dictionary which maps the resouce name 
     305    to its maximum limit. 
     306 
     307    Usage is the dictionary which maps the resource name 
     308    to its current usage. 
     309 
     310    Stoptimes is an array of tuples with the times which the Repy proces 
     311    was stopped and for how long, due to CPU over-use. 
     312    Each entry in the array is a tuple (TOS, Sleep Time) where TOS is the 
     313    time of stop (respective to getruntime()) and Sleep Time is how long the 
     314    repy process was suspended. 
     315 
     316    The stop times array holds a fixed number of the last stop times. 
     317    Currently, it holds the last 100 stop times. 
     318  """ 
     319  # Acquire the lock 
     320  get_resources_lock.acquire() 
     321 
     322  # Construct the dictionaries as copies from nanny 
     323  limits = nanny_resource_limits.resource_restriction_table.copy() 
     324  usage = nanny_resource_limits.resource_consumption_table.copy() 
     325 
     326  # These are the type we need to copy or flatten 
     327  check_types = set([list,dict,set]) 
     328 
     329  # Check the limits dictionary for bad keys 
     330  for resource in limits.keys(): 
     331    # Remove any resources we should not expose 
     332    if resource not in exposed_resources: 
     333      del limits[resource] 
     334 
     335    # Check the type 
     336    if type(limits[resource]) in check_types: 
     337      # Copy the data structure 
     338      limits[resource] = limits[resource].copy() 
     339 
     340  # Check the usage dictionary 
     341  for resource in usage.keys(): 
     342    # Remove any resources that are not exposed 
     343    if resource not in exposed_resources: 
     344      del usage[resource] 
     345 
     346    # Check the type, copy any data structures 
     347    # Flatten any structures using len() other than 
     348    # "connport" and "messport" 
     349    if type(usage[resource]) in check_types: 
     350      # Check if they are exempt from flattening, store a shallow copy 
     351      if resource in flatten_exempt_resources: 
     352        usage[resource] = usage[resource].copy() 
     353 
     354      # Store the size of the data set 
     355      else: 
     356        usage[resource] = len(usage[resource]) 
     357     
     358 
     359 
     360  # Calculate all the usage's 
     361  pid = os.getpid() 
     362 
     363  # Get CPU and memory, this is thread specific 
     364  if ostype in ["Linux", "Darwin"]: 
     365     
     366    # Get CPU first, then memory 
     367    usage["cpu"] = os_api.get_process_cpu_time(pid) 
     368 
     369    # This uses the cached PID data from the CPU check 
     370    usage["memory"] = os_api.get_process_rss() 
     371 
     372    # Get the thread specific CPU usage 
     373    usage["threadcpu"] = os_api.get_current_thread_cpu_time()  
     374 
     375 
     376  # Windows Specific versions 
     377  elif ostype in ["Windows","WindowsCE"]: 
     378     
     379    # Get the CPU time 
     380    usage["cpu"] = windows_api.get_process_cpu_time(pid) 
     381 
     382    # Get the memory, use the resident set size 
     383    usage["memory"] = windows_api.process_memory_info(pid)['WorkingSetSize']  
     384 
     385    # Get thread-level CPU  
     386    usage["threadcpu"] = windows_api.get_current_thread_cpu_time() 
     387 
     388  # Unknown OS 
     389  else: 
     390    raise EnvironmentError("Unsupported Platform!") 
     391 
     392  # Use the cached disk used amount 
     393  usage["diskused"] = cached_disk_used 
     394 
     395  # Release the lock 
     396  get_resources_lock.release() 
     397 
     398  # Copy the stop times 
     399  stoptimes = process_stopped_timeline[:] 
     400 
     401  # Return the dictionaries and the stoptimes 
     402  return (limits,usage,stoptimes) 
     403 
     404 
    262405###################     Windows specific functions   ####################### 
    263406 
     
    326469  # get use information and time... 
    327470  now = getruntime() 
    328   usedata = windows_api.process_times(pid) 
    329  
    330   # Add kernel and user time together...   It's in units of 100ns so divide 
    331   # by 10,000,000 
    332   usertime = (usedata['KernelTime'] + usedata['UserTime'] ) / 10000000.0 
     471 
     472  # Get the total cpu time 
     473  usertime = windows_api.get_process_cpu_time(pid) 
     474 
    333475  useinfo = [usertime, now] 
    334476 
     
    358500  stoptime = nanny_resource_limits.calculate_cpu_sleep_interval(cpulim, percentused,elapsedtime) 
    359501 
    360   # Call new api to suspend/resume process and sleep for specified time 
    361   if windows_api.timeout_process(pid, stoptime): 
    362     # Return how long we slept so parent knows whether it should sleep 
    363     return stoptime 
     502  if stoptime > 0.0: 
     503    # Try to timeout the process 
     504    if windows_api.timeout_process(pid, stoptime): 
     505      # Log the stoptime 
     506      process_stopped_timeline.append((now, stoptime)) 
     507 
     508      # Drop the first element if the length is greater than the maximum entries 
     509      if len(process_stopped_timeline) > process_stopped_max_entries: 
     510        process_stopped_timeline.pop(0) 
     511 
     512      # Return how long we slept so parent knows whether it should sleep 
     513      return stoptime 
     514   
     515    else: 
     516      # Process must have been making system call, try again next time 
     517      return -1 
     518   
     519  # If the stop time is 0, then avoid calling timeout_process 
    364520  else: 
    365     # Process must have been making system call, try again next time 
    366     return -1 
     521    return 0.0 
    367522     
    368523             
     
    416571 
    417572##############     *nix specific functions (may include Mac)  ############### 
    418                  
     573 
     574# This method handles messages on the "diskused" channel from 
     575# the external process. When the external process measures disk used, 
     576# it is piped in and cached for calls to getresources. 
     577def IPC_handle_diskused(bytes): 
     578  cached_disk_used = bytes 
     579 
     580 
     581# This method handles meessages on the "repystopped" channel from 
     582# the external process. When the external process stops repy, it sends 
     583# a tuple with (TOS, amount) where TOS is time of stop (getruntime()) and 
     584# amount is the amount of time execution was suspended. 
     585def IPC_handle_stoptime(info): 
     586  # Push this onto the timeline 
     587  process_stopped_timeline.append(info) 
     588 
     589  # Drop the first element if the length is greater than the max 
     590  if len(process_stopped_timeline) > process_stopped_max_entries: 
     591    process_stopped_timeline.pop(0) 
     592 
     593 
    419594# Use a special class of exception for when 
    420595# resource limits are exceeded 
     
    423598 
    424599 
    425 # Armon: A simple thread to check for the parent process 
    426 # and exit repy if the parent terminates 
     600# Armon: Method to write a message to the pipe, used for IPC. 
     601# This allows the pipe to be multiplexed by sending simple dictionaries 
     602def write_message_to_pipe(writehandle, channel, data): 
     603  """ 
     604  <Purpose> 
     605    Writes a message to the pipe 
     606 
     607  <Arguments> 
     608    writehandle: 
     609        A handle to a pipe which can be written to. 
     610 
     611    channel: 
     612        The channel used to describe the data. Used for multiplexing. 
     613 
     614    data: 
     615        The data to send. 
     616 
     617  <Exceptions> 
     618    As with os.write() 
     619    EnvironmentError will be thrown if os.write() sends 0 bytes, indicating the 
     620    pipe is broken. 
     621  """ 
     622  # Construct the dictionary 
     623  mesg_dict = {"ch":channel,"d":data} 
     624 
     625  # Convert to a string 
     626  mesg_dict_str = marshal.dumps(mesg_dict) 
     627 
     628  # Make a full string 
     629  mesg = str(len(mesg_dict_str)) + ":" + mesg_dict_str 
     630 
     631  # Send this 
     632  index = 0 
     633  while index < len(mesg): 
     634    bytes = os.write(writehandle, mesg[index:]) 
     635    if bytes == 0: 
     636      raise EnvironmentError, "Write send 0 bytes! Pipe broken!" 
     637    index += bytes 
     638 
     639 
     640# Armon: Method to read a message from the pipe, used for IPC. 
     641# This allows the pipe to be multiplexed by sending simple dictionaries 
     642def read_message_from_pipe(readhandle): 
     643  """ 
     644  <Purpose> 
     645    Reads a message from a pipe. 
     646 
     647  <Arguments> 
     648    readhandle: 
     649        A handle to a pipe which can be read from 
     650 
     651  <Exceptions> 
     652    As with os.read(). 
     653    EnvironmentError will be thrown if os.read() returns a 0-length string, indicating 
     654    the pipe is broken. 
     655 
     656  <Returns> 
     657    A tuple (Channel, Data) where Channel is used to multiplex the pipe. 
     658  """ 
     659  # Read until we get to a colon 
     660  data = "" 
     661  index = 0 
     662 
     663  # Loop until we get a message 
     664  while True: 
     665 
     666    # Read in data if the buffer is empty 
     667    if index >= len(data): 
     668      # Read 8 bytes at a time 
     669      mesg = os.read(readhandle,8) 
     670      if len(mesg) == 0: 
     671        raise EnvironmentError, "Read returned emtpy string! Pipe broken!" 
     672      data += mesg 
     673 
     674    # Increment the index while there is data and we have not found a colon 
     675    while index < len(data) and data[index] != ":": 
     676      index += 1 
     677 
     678    # Check if we've found a colon 
     679    if len(data) > index and data[index] == ":": 
     680      # Get the message length 
     681      mesg_length = int(data[:index]) 
     682 
     683      # Determine how much more data we need 
     684      more_data = mesg_length - len(data) + index + 1 
     685 
     686      # Read in the rest of the message 
     687      while more_data > 0: 
     688        mesg = os.read(readhandle, more_data) 
     689        if len(mesg) == 0: 
     690          raise EnvironmentError, "Read returned emtpy string! Pipe broken!" 
     691        data += mesg 
     692        more_data -= len(mesg) 
     693 
     694      # Done, convert the message to a dict 
     695      whole_mesg = data[index+1:] 
     696      mesg_dict = marshal.loads(whole_mesg) 
     697 
     698      # Return a tuple (Channel, Data) 
     699      return (mesg_dict["ch"],mesg_dict["d"]) 
     700 
     701 
     702 
     703# This dictionary defines the functions that handle messages 
     704# on each channel. E.g. when a message arrives on the "repystopped" channel, 
     705# the IPC_handle_stoptime function should be invoked to handle it. 
     706IPC_HANDLER_FUNCTIONS = {"repystopped":IPC_handle_stoptime, 
     707                         "diskused":IPC_handle_diskused } 
     708 
     709 
     710# This thread checks that the parent process is alive and invokes 
     711# delegate methods when messages arrive on the pipe. 
    427712class parent_process_checker(threading.Thread): 
    428713  def __init__(self, readhandle): 
     
    441726 
    442727  def run(self): 
    443     # Attempt to read 8 bytes from the pipe, this should block until we end execution 
    444     try: 
    445       mesg = os.read(self.readhandle,8) 
    446     except: 
    447       # It is possible we got an interrupted system call (on FreeBSD) when the parent is killed 
    448       mesg = "" 
     728    # Run forever 
     729    while True: 
     730      # Read a message 
     731      try: 
     732        mesg = read_message_from_pipe(self.readhandle) 
     733      except Exception, e: 
     734        break 
     735 
     736      # Check for a handler function 
     737      if mesg[0] in IPC_HANDLER_FUNCTIONS: 
     738        # Invoke the handler function with the data 
     739        handler = IPC_HANDLER_FUNCTIONS[mesg[0]] 
     740        handler(mesg[1]) 
     741 
     742      # Print a message if there is a message on an unknown channel 
     743      else: 
     744        print "[WARN] Message on unknown channel from parent process:", mesg[0] 
     745 
     746 
     747    ### We only leave the loop on a fatal error, so we need to exit now 
    449748 
    450749    # Write out status information, our parent would do this, but its dead. 
    451750    statusstorage.write_status("Terminated")   
    452      
    453     # Check the message. If it is the empty string the pipe was closed,  
    454     # if there is any data, this is unexpected and is also an error. 
    455     if mesg == "": 
    456       print >> sys.stderr, "Monitor process died! Terminating!" 
    457       harshexit.harshexit(70) 
    458     else: 
    459       print >> sys.stderr, "Unexpectedly received data! Terminating!" 
    460       harshexit.harshexit(71) 
     751    print >> sys.stderr, "Monitor process died! Terminating!" 
     752    harshexit.harshexit(70) 
    461753 
    462754 
     
    521813     
    522814    # Launch the resource monitor, if it fails determine why and restart if necessary 
    523     resource_monitor(childpid) 
     815    resource_monitor(childpid, writehandle) 
    524816     
    525817  except ResourceException, exp: 
     
    544836      raise 
    545837   
    546 def resource_monitor(childpid): 
     838def resource_monitor(childpid, pipe_handle): 
    547839  """ 
    548840  <Purpose> 
     
    553845    childpid: 
    554846      The child pid, e.g. the pid of repy 
     847 
     848    pipe_handle: 
     849      A handle to the pipe to the repy process. Allows sending resource use information. 
    555850  """ 
    556851  # Get our pid 
     
    559854  # Calculate how often disk should be checked 
    560855  disk_interval = int(repy_constants.RESOURCE_POLLING_FREQ_LINUX / repy_constants.CPU_POLLING_FREQ_LINUX) 
    561   current_interval = 0 # What cycle are we on   
     856  current_interval = -1 # What cycle are we on   
    562857   
    563858  # Store time of the last interval 
     
    611906      # Save the resume time 
    612907      resume_time = getruntime() 
     908 
     909      # Send this information as a tuple containing the time repy was stopped and 
     910      # for how long it was stopped 
     911      write_message_to_pipe(pipe_handle, "repystopped", (currenttime, stoptime)) 
    613912       
    614913     
     
    641940      if diskused > nanny_resource_limits.resource_limit("diskused"): 
    642941        raise ResourceException, "Disk use '"+str(diskused)+"' over limit '"+str(nanny_resource_limits.resource_limit("diskused"))+"'." 
     942 
     943      # Send the disk usage information, raw bytes used 
     944      write_message_to_pipe(pipe_handle, "diskused", diskused) 
    643945     
    644946    ########### End Check Disk ########### 
  • seattle/trunk/repy/repy.py

    r3263 r3341  
    7373import tracebackrepy 
    7474 
    75  
     75import nonportable 
    7676 
    7777# This block allows or denies different actions in the safe module.   I'm  
     
    152152  usercontext["_context"] = usercontext 
    153153 
     154  # BAD:REMOVE 
     155  usercontext["getresources"] = nonportable.get_resources 
    154156       
    155157  # grab the user code from the file