Please note that the CVS and issue trackers have moved to GitHub. These Trac pages are no longer kept up-to-date.

root/seattle/trunk/deploymentscripts/deploy_threading.py@5637

Revision 2810, 14.4 KB (checked in by konp, 10 years ago)

Fixed high cpu usage, added summary tool, a number of other fixes and improvements as well. Note: blackbox node keys have not been added yet.

Line 
1"""
2<Program Name>
3  deploy_threading.py
4
5<Started>
6  May 2009
7
8<Author>
9  n2k8000@u.washington.edu
10  Konstantin Pik
11
12<Purpose>
13  This file contains threading-related functions that are used by the
14  deployment script.  This file is not to be executed by itself, but is to
15  be used with the deploy_* modules.
16
17<Usage>
18  See deploy_main.py.
19 
20"""
21
22import deploy_logging
23import deploy_helper
24import parallelize_repy
25import thread
26import time
27import os
28
29
# the number of threads to launch max
threadsmax = 25

# Thread communication dictionary. Used for communication between threads;
# guard multi-step updates with thread_communications_lock.
thread_communications = {}

# lock for the dictionary
thread_communications_lock = thread.allocate_lock()

# Set that the module hasn't been initialized yet
thread_communications['init'] = False

# Default (empty) lists for entries that the helpers in this module read or
# append to. Without them has_unreachable_hosts(), node_was_reachable(),
# add_unreachable_host() and set_pid_timeout() raise KeyError when they run
# before some other code has created the entries.
thread_communications['unreachable_host'] = []
thread_communications['running_process_ids'] = []
41
42
43   
def start_thread(callfunc, arguments, max_threads):
  """
  <Purpose>
    Calls init() for this module (which starts the pid-timeout monitor
    thread) and then hands the work to parallelize_repy, which runs callfunc
    over the elements of arguments using at most max_threads threads.

    Intended to be called only from connect_and_do_work.

  <Arguments>
    callfunc:
      the function to call.
    arguments:
      the list of tuples where each tuple is (username, host) to connect to
    max_threads:
      the max # of threads to start.

  <Exceptions>
    Anything raised by parallelize_repy.parallelize_initfunction propagates
    to the caller.

  <Side Effects>
    Starts the pid-timeout monitor thread and the worker threads, and sets
    the 'hosts_left', 'total_hosts' and 'func_handle' entries of
    thread_communications.

  <Returns>
    Boolean.

    True if the pid-timeout monitor was started.
    False if init() could not start it (the workers are launched anyway).
  """
  # start the pid-timeout monitor first
  monitor_started = init()

  # Record the host bookkeeping BEFORE launching the workers: the worker
  # function presumably calls subtract_host_left() /
  # remove_host_from_hosts_left(), which read 'hosts_left', as soon as it
  # starts. list() also guarantees a mutable copy if a tuple was passed in.
  thread_communications['hosts_left'] = list(arguments)
  thread_communications['total_hosts'] = len(arguments)

  func_handle = parallelize_repy.parallelize_initfunction(arguments, callfunc,
      max_threads)
  thread_communications['func_handle'] = func_handle

  return monitor_started
77
78
79
def has_unreachable_hosts():
  """
  <Purpose>
    Checks to see if any host has been recorded in the list of unreachable
    hosts (see add_unreachable_host).

  <Arguments>
    None.

  <Exceptions>
    None.

  <Side Effects>
    None.

  <Returns>
    Boolean.

    True if at least one host was recorded as unreachable.
    False otherwise, including when the 'unreachable_host' list does not
    exist yet.
  """
  # a missing list and an empty list both mean no host was unreachable
  return len(thread_communications.get('unreachable_host', [])) > 0
104
105
def destroy():
  """
  <Purpose>
    Resets this module to its initial state, the opposite of init(). Raising
    the kill flag makes the pid_timeout loop exit the next time it checks
    the flag.

  <Arguments>
    None.

  <Exceptions>
    Any exception is caught and printed, and False is returned instead.

  <Side Effects>
    Sets 'threads_running' to 0, 'kill_flag' to True and 'init' to False in
    the thread_communications dictionary.

  <Returns>
    Boolean.

    True on success.
    False on failure.
  """
  try:
    # no worker threads are considered to be running any more
    thread_communications['threads_running'] = 0

    # raise the kill flag so the pid_timeout thread stops looping
    thread_communications['kill_flag'] = True

    # mark the module as no longer initialized
    thread_communications['init'] = False
  except Exception as e:
    print(e)
    return False
  return True
140
141
def init():
  """
  <Purpose>
    Initializes the shared state to its default values and starts the
    pid_timeout thread, which kills started processes that have timed out.

  <Arguments>
    None.

  <Exceptions>
    None. A failure to start the monitor thread is logged and reported
    through the return value.

  <Side Effects>
    Sets 'threads_running' to 0 and 'kill_flag' to False, makes sure
    'running_process_ids' exists, starts the pid_timeout thread and sets
    'init' to whether that thread was started.

  <Returns>
    Boolean.

    True on success.
    False if the monitor thread could not be started.
  """
  # initialize, keep track of how many threads are running
  thread_communications['threads_running'] = 0

  # clear the kill flag so the pid_timeout loop keeps running
  thread_communications['kill_flag'] = False

  # Make sure the list of monitored PIDs exists before any worker can call
  # set_pid_timeout(); otherwise there is a window, before the monitor thread
  # gets scheduled, in which the key is missing.
  thread_communications.setdefault('running_process_ids', [])

  try:
    thread.start_new_thread(pid_timeout, ())
  except Exception:
    deploy_logging.logerror("Trouble starting pid thread monitor")
    # the monitor is not running, so the module is not initialized
    thread_communications['init'] = False
    return False

  # only claim to be initialized once the monitor thread has been started
  thread_communications['init'] = True
  return True
182
183
184
def node_was_reachable(node):
  """
  <Purpose>
    Checks to see if the node was reachable, or if it's in the unreachable
    pile.

  <Arguments>
    node:
      A (username, hostname) tuple, or a list with a single element which is
      such a tuple.

  <Exceptions>
    None.

  <Side Effects>
    None.

  <Returns>
    Boolean.

    True if the node was reachable (it is not in the unreachable list, or
    that list does not exist yet).
    False if the node was unreachable.
  """
  # The documented contract allows a single-element list wrapping the tuple.
  # Unwrap it: a list never equals the stored tuples, and it is unhashable,
  # which made the previous set-based lookup raise TypeError.
  if isinstance(node, list) and len(node) == 1:
    node = node[0]

  # a plain membership test on the list avoids rebuilding a set on each call
  return node not in thread_communications.get('unreachable_host', [])
211
212
213
def add_instructional_node(node):
  """
  <Purpose>
    Adds an instructional machine to the list of instructional machines,
    creating the list first if necessary.

  <Arguments>
    node:
      A list with a single element which is a tuple containing
      (username, hostname).

  <Exceptions>
    None.

  <Side Effects>
    Appends to the 'instructional_machines' list in the global dictionary.

  <Returns>
    None.
  """
  # setdefault creates the list on first use and returns the existing list
  # afterwards; this replaces the separate membership check on .keys()
  thread_communications.setdefault('instructional_machines', []).append(node)
242
243
def subtract_host_left(hosts, sub_counter=True):
  """
  <Purpose>
    Subtracts the finished hosts from the list of hosts that are still
    running.

  <Arguments>
    hosts:
      List of tuples of form (username, hostname) that have finished.
    sub_counter:
      If True (the default), also decrement the running threads counter.

  <Exceptions>
    KeyError if 'hosts_left' has not been set up yet; the dictionary lock
    is released before the exception propagates.

  <Side Effects>
    Replaces the 'hosts_left' list in the global dictionary (the new list is
    built with set subtraction, so it is unordered and duplicate-free) and
    optionally decrements 'threads_running'.

  <Returns>
    None.
  """
  threading_lock()
  try:
    thread_communications['hosts_left'] = \
        list(set(thread_communications['hosts_left']) - set(hosts))
  finally:
    # Always release: an error above used to leave the lock held, which
    # blocks every other thread that touches the dictionary.
    threading_unlock()

  # done outside the locked region because it takes the lock itself
  if sub_counter:
    threading_lock_and_sub()
275
276
277
def add_unreachable_host(host_tuple):
  """
  <Purpose>
    Adds a host to the list of unreachable hosts in the dictionary, creating
    the list first if necessary.

  <Arguments>
    host_tuple:
      A (username, host) tuple.

  <Exceptions>
    None.

  <Side Effects>
    Appends to the 'unreachable_host' list in the global dictionary.

  <Returns>
    None.
  """
  # setdefault: the list may not have been created yet
  thread_communications.setdefault('unreachable_host', []).append(host_tuple)
298
299
300
def threading_currentlyrunning():
  """
  <Purpose>
    Reports whether the parallelized function started by start_thread() is
    still running, and prints a one-line progress summary (aborted,
    exceptioned and finished counts out of the total number of hosts).

  <Arguments>
    None.

  <Exceptions>
    None.

  <Side Effects>
    Prints the progress summary.

  <Returns>
    Boolean. True while the function is still running, False once
    parallelize_repy reports it as finished.
  """
  handle = thread_communications['func_handle']

  # ask whether the work is done, then fetch the results gathered so far
  finished = parallelize_repy.parallelize_isfunctionfinished(handle)
  results = parallelize_repy.parallelize_getresults(handle)
  total = thread_communications['total_hosts']

  summary = '%s aborted, %s exceptioned, %s finished of %s total.' % (
      len(results['aborted']), len(results['exception']),
      len(results['returned']), total)
  print(summary)

  return not finished
329
330
331
def threading_lock():
  """
  <Purpose>
    Acquires thread_communications_lock, the lock guarding the shared
    thread_communications dictionary. Blocks until the lock is free.

  <Arguments>
    None.

  <Exceptions>
    None.

  <Side Effects>
    The calling thread holds the dictionary lock afterwards.

  <Returns>
    None.
  """
  thread_communications_lock.acquire()
352
353
354
def threading_unlock():
  """
  <Purpose>
    Releases thread_communications_lock, the lock guarding the shared
    thread_communications dictionary.

  <Arguments>
    None.

  <Exceptions>
    Whatever the lock raises when it is released while not held.

  <Side Effects>
    The dictionary lock becomes available to other threads.

  <Returns>
    None.
  """
  # the lock name is only read here, so no `global` statement is required
  thread_communications_lock.release()
375
376
377
def threading_lock_and_add():
  """
  <Purpose>
    Increments the running-threads counter by 1 while holding the
    dictionary lock.

  <Arguments>
    None.

  <Exceptions>
    KeyError if 'threads_running' has not been set (init() sets it); the
    lock is released before the exception propagates.

  <Side Effects>
    Increments 'threads_running' in the global dictionary.

  <Returns>
    None.
  """
  threading_lock()
  try:
    thread_communications['threads_running'] += 1
  finally:
    # release even on error so other threads cannot deadlock on the lock
    threading_unlock()
403
404
405
def threading_lock_and_sub():
  """
  <Purpose>
    Decrements the running-threads counter by 1 while holding the
    dictionary lock.

  <Arguments>
    None.

  <Exceptions>
    KeyError if 'threads_running' has not been set (init() sets it); the
    lock is released before the exception propagates.

  <Side Effects>
    Decrements 'threads_running' in the global dictionary.

  <Returns>
    None.
  """
  threading_lock()
  try:
    thread_communications['threads_running'] -= 1
  finally:
    # release even on error so other threads cannot deadlock on the lock
    threading_unlock()
431
432 
433 
def remove_host_from_hosts_left(user, remote_host):
  """
  <Purpose>
    Removes (user, remote_host) from the hosts_left list, which keeps track
    of the hosts that are left to process, and decrements the running-thread
    counter if the host was actually removed.

  <Arguments>
    user:
      the username used for remote_host
    remote_host:
      the remote host to remove

  <Exceptions>
    None. A host that is not in the list (ValueError) is silently ignored;
    any other error is printed and logged.

  <Side Effects>
    Modifies 'hosts_left' and, when a host was removed, 'threads_running'
    in the global dictionary.

  <Returns>
    None.
  """
  removed = False
  error = None

  # Hold the dictionary lock: subtract_host_left() rebinds 'hosts_left'
  # under this lock, so an unlocked remove() could act on a stale list and
  # be lost.
  threading_lock()
  try:
    try:
      thread_communications['hosts_left'].remove((user, remote_host))
      removed = True
    except ValueError:
      # host is already removed, keep going
      pass
    except Exception as e:
      error = e
  finally:
    threading_unlock()

  # Log and update the counter only after releasing the lock;
  # threading_lock_and_sub() acquires the (non-reentrant) lock itself.
  if error is not None:
    print(error)
    deploy_logging.logerror("Error in remove_host_from_hosts_left: "+str(error))
  elif removed:
    # no error, decrease the running thread count
    threading_lock_and_sub()
465
466 
def pid_timeout():
  """
  <Purpose>
    Watchdog loop, intended to be started once on its own thread (init()
    does this). Until the 'kill_flag' entry is set, it wakes up every 5
    seconds and kills every registered process whose expire time has passed.

    Processes are registered with set_pid_timeout as tuples of
    (pid, expire_time, remote_host, user).

  <Arguments>
    None.

  <Exceptions>
    OSError from os.kill (the process most likely already exited) is logged
    and the host is removed from hosts_left. Any other exception while
    handling one process is logged as unexpected.

  <Side Effects>
    Sends signal 9 to expired processes, logs forced kills, removes the
    associated hosts from hosts_left and drops handled entries from
    'running_process_ids'.

  <Returns>
    None.
  """
  # Make sure the list exists, but do NOT reset it: set_pid_timeout() may
  # already have registered processes before this thread got scheduled, and
  # wiping the list would mean those processes are never killed.
  thread_communications.setdefault('running_process_ids', [])

  # loop until the kill flag is raised (see destroy())
  while not thread_communications['kill_flag']:
    # sleep and wake up every couple of seconds
    time.sleep(5)

    # entries that were dealt with during this pass
    killed_pids = []

    # Walk a snapshot taken under the lock so concurrent set_pid_timeout()
    # calls cannot change the list while we iterate it.
    threading_lock()
    try:
      monitored = list(thread_communications['running_process_ids'])
    finally:
      threading_unlock()

    for each_process in monitored:
      # each entry is (pid, expiretime, hostname, username); see set_pid_timeout
      process_to_kill, expire_time, remote_host, user = each_process

      # not expired yet; look again on the next pass
      if expire_time > time.time():
        continue

      proc_path = '/proc/'+str(process_to_kill)
      try:
        if os.path.exists(proc_path):
          os.kill(process_to_kill, 9)
          killed_pids.append(each_process)
          # Give the process a second to die. If it is still there, try
          # os.kill once more, then fall back to the shell's kill -9.
          time.sleep(1)
          if os.path.exists(proc_path):
            os.kill(process_to_kill, 9)
            time.sleep(1)
            if os.path.exists(proc_path):
              deploy_helper.shellexec2('kill -9 '+str(process_to_kill))
              time.sleep(1)
          if remote_host:
            deploy_logging.logerror("Forced kill of PID "+str(process_to_kill)+" due to timeout! The host"+\
                " on this thread is "+remote_host)
          else:
            deploy_logging.logerror("Forced kill of PID "+str(process_to_kill)+" due to timeout!")
          # subtract from our running thread count and remove the host
          subtract_host_left([(user, remote_host)])
        else:
          # The process is already dead: drop it from the monitored list and
          # from hosts_left, but don't decrement the running thread count.
          killed_pids.append(each_process)
          subtract_host_left([(user, remote_host)], False)
      except OSError:
        # os.kill found no such pid; the process has most likely already
        # terminated. str() is needed because remote_host may be None or
        # empty (see the `if remote_host` check above). Concatenating None
        # here raised TypeError, which terminated this monitor thread.
        deploy_logging.logerror("Process "+str(process_to_kill)+" ("+str(remote_host)+") is already done.")
        subtract_host_left([(user, remote_host)], False)
      except Exception as e:
        deploy_logging.logerror("Unexpected error in pid_timeout thread "+\
            "while killing a child process: "+str(e))

    if killed_pids:
      # remove the handled entries from the shared list
      threading_lock()
      try:
        thread_communications['running_process_ids'] = \
            list(set(thread_communications['running_process_ids']) - set(killed_pids))
      finally:
        threading_unlock()
554       
def monitor_timeout(pid, sleep_time, remote_host, user):
  """
  <Purpose>
    Registers pid with the pid_timeout watchdog without blocking the caller.
    set_pid_timeout is run on a newly started thread, with a timeout of
    twice int(sleep_time) seconds.

  <Arguments>
    pid:
      the process ID to watch
    sleep_time:
      base timeout in seconds; converted with int() and doubled
    remote_host:
      the remote host on the other side of the process
    user:
      the username used for remote_host

  <Exceptions>
    Whatever int(sleep_time) or thread.start_new_thread raise.

  <Side Effects>
    Starts a thread that calls set_pid_timeout.

  <Returns>
    None.
  """
  doubled_timeout = int(sleep_time) * 2
  thread_args = (pid, doubled_timeout, remote_host, user)
  thread.start_new_thread(set_pid_timeout, thread_args)
563             
564
def set_pid_timeout(process_to_kill, sleep_time, remote_host, user):
  """
  <Purpose>
    Registers a process with the pid_timeout watchdog thread, which kills
    it once sleep_time seconds have passed.

  <Arguments>
    process_to_kill:
      the process ID to kill
    sleep_time:
      sleep time in seconds
    remote_host:
      The remote host that is on the other 'side' of this connection
      (as all process are ssh/scp)
    user:
      the username to log in as

  <Exceptions>
    None.

  <Side Effects>
    Appends (pid, expire_time, remote_host, user) to the
    'running_process_ids' list in the global dictionary, creating the list
    if needed.

  <Returns>
    None.
  """
  # Add this pid to the list of running PIDs. Sometimes the program exits and
  # forgets about this process, so it keeps running; registering it ensures
  # the thread-monitor thread kills it.
  threading_lock()
  try:
    tuple_to_add = (process_to_kill, time.time()+sleep_time, remote_host, user)
    # setdefault: this can run before the pid_timeout thread has created the
    # list. The old KeyError here left the lock held forever.
    thread_communications.setdefault('running_process_ids', []).append(tuple_to_add)
  finally:
    threading_unlock()
Note: See TracBrowser for help on using the browser.