Please note that the CVS and issue trackers have moved to GitHub. These Trac pages are no longer kept up-to-date.

root/seattle/trunk/repy/apps/mapreduce/mappri.repy

Revision 1806, 16.7 KB (checked in by alpers, 10 years ago)

mapred - the whole pass works now, yayyy

  • Property svn:executable set to *
Line 
1# MapReduce for python/repy!
2#
3#
4# TODO list:
5#
6#
7
8#begin include timeout_socket.repy
9"""
10<Description>
11  A socket that guarantees the receipt of a message.   Raises SocketTimeoutError if it does not
12  receive any message before a given timeout.
13  If it actually receives the message, returns the message and continues.
14
15<Usage>
16  Text-replacable for Repy Sockets:
17    timeout_openconn(desthost, destport, localip=None, localport=None, timeout = 5)
18    timeout_waitforconn(localip, localport, function)
19
20  Object:
21    sockobj.send(data)
22    sockobj.recv(bytes)
23    sockobj.close()
24
25<Date>
26  Sun Mar  1 10:27:35 PST 2009
27
28<Example>
29  # hello world
30  include sockettimer.repy
31
32  def callback(ip, port, timeout_sockobj, commhandle, listenhandle):
33    hw_message = timeout_sockobj.recv(1047)
34
35    # cleanup
36    stopcomm(commhandle)
37    stopcomm(listenhandle)
38    timeout_sockobj.close()
39
40    print hw_message # => "hello world!"
41 
42  def server():
43    sockobj = timeout_waitforconn(getmyip(), 12345, callback)
44
45  def client():
46    sockobj = timeout_openconn(getmyip(), 12345)
47    sockobj.send("hello world!")
48
49  def main():
50    server()
51    client()
52    exitall()
53
54  if callfunc == 'initialize':
55    main()
56"""
57
class SocketTimeoutError(Exception):
  """
  <Purpose>
    Signals that a socket gave up waiting before a response arrived.
  """
  pass
60
def timeout_openconn(desthost, destport, localip=None, localport=None, timeout = 5):
  """
  <Purpose>
    Repy-openconn-shaped helper that returns a connected TimeoutSocket.

  <Args>
    desthost, destport - the peer to connect to.
    localip, localport - optional local address; it is bound only when both
                         values are truthy.
    timeout - stored on the TimeoutSocket before connecting.

  <Exception>
    Whatever TimeoutSocket.connect() raises.

  <Returns>
    The TimeoutSocket object on success.
  """

  conn = TimeoutSocket()
  conn.settimeout(timeout)

  have_local_address = localip and localport
  if have_local_address:
    conn.bind((localip, localport))

  conn.connect((desthost, destport))
  return conn
82
def timeout_waitforconn(localip, localport, function):
  """
  <Purpose>
    Repy-waitforconn-shaped helper built on TimeoutSocket.

  <Args>
    localip, localport - the local address to listen on.
    function - user callback; TimeoutSocket invokes it as
               function(ip, port, timeout_socket, commhandle, listenhandle).

  <Side Effects>
    Creates a TimeoutSocket and starts listening through it.

  <Returns>
    Whatever TimeoutSocket.listen() returns (the listener handle).
  """

  listener = TimeoutSocket()
  # the callback and the bind address are independent of each other
  listener.setcallback(function)
  listener.bind((localip, localport))
  return listener.listen()
102
class TimeoutSocket:
  """
  <Purpose>
    Provide a socket object that looks like the usual Repy one, with a
    configurable timeout attached to it.

  <Side Effects>
    Uses a getlock() lock to arbitrate between a recv and its timeout timer.
    Uses waitforconn and openconn to simulate the socket.
  """

  ################
  # Constructors
  ################

  def __init__(self):
    """ Constructor for socket """
    # NOTE: only the timeout condition is locked; whether the rest of the
    # object needs a general lock is an open question from the original code.
    self.timeout_lock = getlock() # special lock for Timeout condition
    self.timeout = 5 # seconds to wait

    # user vars
    self.local_address = None # ip, port
    self.remote_address = None # ip, port
    self.callback = None # the user's function to call

    # repy socket vars
    self.sockobj = None #  the Repy socket
    self.commhandle = None # the current comm
    self.listencommhandle = None # the listener comm

  ################
  # Mutator methods
  #################

  def settimeout(self, value):
    """ Setter for timeout"""
    self.timeout = value

  def setcallback(self, function):
    """ Setter for callback function"""
    self.callback = function

  ####################
  # Public Methods
  ####################

  def bind(self, local_address = None):
    """
    <Purpose>
      Set local address

    <Args>
      Tuple of (ip, port) local.
    """
    self.local_address = local_address

  def listen(self):
    """
    <Purpose>
      Listen for peer

    <Side Effects>
      Calls Repy waitforconn()

    <Returns>
      The listener handle returned by waitforconn.
    """
    return self._waitforconn()

  def connect(self, remote_address):
    """
    <Purpose>
      Connect to peer.

    <Args>
      Tuple of (ip, port) remote.

    <Side Effects>
      Calls Repy openconn.
    """
    self.remote_address = remote_address
    self._openconn()

  def recv(self, maxLen): # timeout as optional arg ???
    """
    <Purpose>
      Receive up to maxLen bytes while a timer of self.timeout seconds is
      armed.

    <Arguments>
      maxLen - bytes to recv

    <Exception>
      Raises SocketTimeoutError if the timeout lock is unavailable when the
      recv finishes or fails.  Any other recv error is re-raised unchanged.

    <Side Effects>
      Arms (and on completion cancels) a Repy timer.  The socket is NOT
      closed on timeout: the timer callback is currently a no-op.

    <Returns>
      The message.
    """
    return self._recv_or_close(maxLen)

  def send(self, data):
    """
    <Purpose>
      Just like normal Repy socket.  Sends messages.

    <Arguments>
      data - the string message

    <Exception>
      Same as Repy socket.

    <Returns>
      Whatever the underlying socket's send() returns.
    """
    return self._send(data)

  def close(self):
    """
    <Purpose>
      Forget the addresses and callback, close the underlying Repy socket and
      stop the comm handles this object holds.

    <Side Effects>
      Calls close() on the Repy socket and stopcomm() on each handle.  Any of
      those that was never set (still None) is skipped, so closing a socket
      that never connected or never listened does not raise.
    """
    self.local_address = None # ip, port
    self.remote_address = None # ip, port
    self.callback = None # the user's function to call

    if self.sockobj is not None:
      self.sockobj.close()
      self.sockobj = None #  the Repy socket
    if self.commhandle is not None:
      stopcomm(self.commhandle)
      self.commhandle = None # the current comm
    if self.listencommhandle is not None:
      stopcomm(self.listencommhandle)
      self.listencommhandle = None # the listener comm


  ########################
  # Private
  #########################

  def _openconn(self):
    """Handle current state variables and call Repy openconn."""

    destip, destport = self.remote_address
    if self.local_address:
      srcip, srcport = self.local_address
    else:
      # None/None are the defaults of the openconn signature documented at
      # the top of this file; passing them explicitly lets the configured
      # timeout reach openconn even when no local address was bound.
      srcip, srcport = None, None
    self.sockobj = openconn(destip, destport, srcip, srcport, self.timeout)

  def _waitforconn(self):
    """Register self._callback with Repy waitforconn; return the handle."""
    localip, localport = self.local_address
    self.listencommhandle = waitforconn(localip, localport, self._callback)
    return self.listencommhandle

  def _callback(self, ip, port, sockobj, ch, lh):
    """
    Record the incoming connection on this object, then pass it on to the
    user callback with this TimeoutSocket in place of the raw socket.

    Raises Exception if a remote address has already been recorded.
    """
    self.sockobj = sockobj
    self.listencommhandle = lh # same as the 1st from wait for comm, right?
    self.commhandle = ch # should we care?

    print("sockettimeout|remote_address: %s" % (self.remote_address,))
    if not self.remote_address:
      self.remote_address = (ip, port)
    else:
      raise Exception("what! peer does not match?")

    self.callback(ip, port, self, ch, lh)

  def _send(self, data):
    """Send data"""
    return self.sockobj.send(data)

  def _recv(self, maxLen):
    """Recv data of length maxLen"""
    return self.sockobj.recv(maxLen)

  def _recv_or_close(self, amount):
    """Raise the Timeout Error if no receipt.  Keep track by timeout_lock."""
    timerhandle = settimer(self.timeout, self._clobbersocket, ())
    try:
      retdata = self._recv(amount)
    except Exception:
      # if it's not the timeout, reraise...
      if self.timeout_lock.acquire(False):
        # The lock was free, so this is a genuine socket error.  Stop the
        # timer and give the lock back: leaving it held would make every
        # later recv on this socket report a bogus SocketTimeoutError.
        canceltimer(timerhandle)
        self.timeout_lock.release()
        raise
      raise SocketTimeoutError

    # I acquired the lock, I should stop the timer because I succeeded...
    if self.timeout_lock.acquire(False):
      # even if this isn't in time, the lock prevents a race condition
      # this is merely an optimization to prevent the timer from ever firing...
      canceltimer(timerhandle)
      self.timeout_lock.release()
      return retdata
    else:
      raise SocketTimeoutError

  def _clobbersocket(self):
    """
    Timer callback armed by _recv_or_close.

    Deliberately does nothing (alpers): the socket is not closed on timeout
    because the timeout might be just an error.  It used to take
    timeout_lock without blocking and close the socket.
    """
    return
305
306
307############################
308# Deprecated functions
309##############################
310
311# private function...
def sockettimeout_clobbersocket(sockobj,mylock):
  """
  Close sockobj to abort a pending operation, but only when mylock can be
  taken without blocking; otherwise do nothing.
  """
  got_lock = mylock.acquire(False)
  if not got_lock:
    return
  sockobj.close()
316
317# if it fails to finish within the timeout, I close the socket and raise a
318# SocketTimeout exception...
def sockettimeout_recv_or_close(sockobj, amount, timeout):
  """
  <Purpose>
    Deprecated free-function recv with a timeout: a timer closes the socket
    (via sockettimeout_clobbersocket) if the recv has not finished in time.

  <Arguments>
    sockobj - the socket to read from
    amount - passed to sockobj.recv()
    timeout - delay handed to settimer()

  <Exception>
    SocketTimeoutError if the timer won the race for the lock; any other
    recv error is re-raised unchanged.

  <Returns>
    The data returned by sockobj.recv().
  """
  # A lock I'll use for this attempt
  mylock = getlock()
  # The callback and the exception below previously used names that are not
  # defined anywhere in this file (clobbersocket, SocketTimeout).
  timerhandle = settimer(timeout, sockettimeout_clobbersocket, (sockobj, mylock))
  try:
    retdata = sockobj.recv(amount)
  except Exception:
    # if it's not the timeout, reraise...
    if mylock.acquire(False):
      # the timer can no longer take the lock, so it has nothing left to do
      canceltimer(timerhandle)
      raise
    raise SocketTimeoutError

  # I acquired the lock, I should stop the timer because I succeeded...
  if mylock.acquire(False):
    # even if this isn't in time, the lock prevents a race condition
    # this is merely an optimization to prevent the timer from ever firing...
    canceltimer(timerhandle)
    return retdata
  else:
    raise SocketTimeoutError
339
340
341#end include timeout_socket.repy
342
343
344# !! currently only works for a single mapper !!
345# i'll worry about splitting the data file up at a later time
def _split_lines_for_replicas(data_lines, replica_count):
    """Cut data_lines into exactly replica_count consecutive, balanced chunks."""
    base_size, leftover = divmod(len(data_lines), replica_count)

    chunks = []
    start = 0
    for index in range(replica_count):
        # the first `leftover` chunks each absorb one of the remainder lines
        size = base_size
        if index < leftover:
            size += 1
        chunks.append(data_lines[start:start + size])
        start += size
    return chunks


def initalize_replicas(data):
    """
    Connect to every replica in mycontext['scoreboard'] and send it the
    primary's address, the replica list and its share of the data.

    The data is split on newlines into one chunk per replica (in sorted
    replica order).  Every line is sent to exactly one replica; when there
    are fewer lines than replicas the trailing replicas get an empty chunk.
    NOTE(review): confirm the replica side accepts a "0*" (empty) payload.

    Arguments:
      data - the full contents of the data file, as one string.

    Side effects:
      Resets mycontext['peer_sockets'], stores each opened socket under
      mycontext['scoreboard'][replica]["socket"], and reads one byte back
      from each replica after sending.
    """
    # primary location string
    prim_str = format_message(mycontext['my_addr'])

    replicas = sorted(mycontext['scoreboard'].keys())

    # replica list to send: 4-digit count followed by the formatted addresses
    replica_str = pad_int_to_str(len(replicas), 4)
    replica_str += mycontext['replica_str']

    # split up the data, one framed message per replica
    line_chunks = _split_lines_for_replicas(data.split("\n"), len(replicas))
    payloads = [format_message("\n".join(chunk)) for chunk in line_chunks]

    # prepare socket array
    mycontext['peer_sockets'] = [""] * len(replicas)

    # send data to each replica
    for replica, payload in zip(replicas, payloads):
        addr_parts = replica.partition(":")
        replica_ip = addr_parts[0]
        replica_port = int(addr_parts[2])

        print("opening connection to %s" % replica)
        socketobj = timeout_openconn(replica_ip, replica_port, timeout=10)
        socketobj.send(prim_str)
        socketobj.send(replica_str)
        socketobj.send(payload)

        socketobj.settimeout(5)

        mycontext['scoreboard'][replica]["socket"] = socketobj

        print("receiving %s" % socketobj.recv(1))
407
408
409# only worried about active replicas now, add support for inactive peers later
def construct_scoreboard(replica_list):
    """
    Reset mycontext['scoreboard'] and add one fresh entry per replica.

    Each entry starts with no socket (""), active=1, state "Initialized"
    and a heartbeat failure count of 0.
    """
    board = mycontext['scoreboard'] = {}

    for replica in replica_list:
        board[replica] = {
            "socket": "",
            "active": 1,
            "state": "Initialized",
            "heartbeat_failed": 0,
        }
420
421
422def keep_scoreboard():
423   
424    active_replicas = mycontext['scoreboard'].keys()
425    active_replicas.sort()
426
427    mycontext['final_output'] = {}
428    finished_replicas = 0
429
430    scoreboard = mycontext['scoreboard']
431
432    print "active_replicas:", active_replicas
433
434    while finished_replicas < len(active_replicas):
435
436        for replica in active_replicas:
437
438            print
439            print "retrying, only got %d of %d data" % (finished_replicas, len(active_replicas))
440
441            print "scoreboard:", scoreboard
442            # if this replica says it's done, skip it
443            if scoreboard[replica]['state'] == "Done":
444                continue
445
446            print "current replica:", replica
447            send_message(scoreboard[replica]['socket'],"heartbeat")
448           
449            cur_state = ""
450            try:
451                print "recving:"
452                cur_state = recv_message(scoreboard[replica]['socket'],
453                                         timeout=3)
454            except SocketTimeoutError:
455                print "timed out!"
456                scoreboard[replica]['heartbeat_failed'] += 1
457                if scoreboard[replica]['heartbeat_failed'] >= 3:
458                    pass # add activating an inactive node here!
459            except Exception, e:
460                if "refused" not in str(e):
461                    raise
462            else:
463                print "recving state (%s for %s).." % (cur_state, replica)
464                scoreboard[replica]['state'] = cur_state
465                if scoreboard[replica]['heartbeat_failed'] != 0:
466                    scoreboard[replica]["heartbeat_failed"] -= 1
467
468                # data is available for pickup!
469                if cur_state == "ReducerDone":
470                    red_data = recv_message_dict(scoreboard[replica]['socket'],
471                                                 timeout=5)
472                    mycontext['final_output'].update(red_data)
473                    scoreboard[replica]['state'] = "Done"
474                    finished_replicas += 1
475                    print "received mapred data:", mycontext['final_output']
476                    continue
477
478            # give the replicas a bit of a break!
479            sleep(5)
480       
481           
482    # if we've reached this point, we've received all data
483    buf = ""
484    for key, value in mycontext['final_output'].iteritems():
485        buf += key + "\t" + value + "\n"
486                   
487    fileobj = open("final_data.dat", "w")
488    fileobj.write(buf)
489    fileobj.close()
490    print "finished data acquisition, job finished"
491
492
def send_message(socketobj, data):
    """Send data over socketobj framed as "<length>*<data>"."""
    framed = "*".join([str(len(data)), data])
    socketobj.send(framed)
496
497
def format_message(data):
    """Return data framed as "<length>*<data>", without sending it."""
    length_prefix = str(len(data))
    return "*".join([length_prefix, data])
500
501
def recv_message_dict(socketobj, initialread=2, timeout=None):
    """
    Receive one framed message and parse it into a dictionary.

    The payload is read line by line: a line seen while no key is pending
    becomes the pending key; an empty line clears the pending key; any other
    line is stored as the value of the pending key (a later value line
    overwrites an earlier one).

    Arguments are handed straight to recv_message().
    """
    payload = recv_message(socketobj, initialread, timeout)

    parsed = {}
    pending_key = ""
    for entry in payload.split("\n"):
        if pending_key == "":
            pending_key = entry
            continue
        if entry == "":
            pending_key = ""
            continue
        parsed[pending_key] = entry

    return parsed
517
518
def recv_message(socketobj, initialread=2, timeout=None):
    """
    Receive one "<length>*<data>" framed message and return <data>.

    Arguments:
      socketobj   - object with recv(n); when timeout is given it must also
                    provide settimeout() (a TimeoutSocket).
      initialread - how many bytes to request per read while looking for
                    the "*" that ends the length prefix.
      timeout     - if truthy, set on the socket before reading; the reads
                    may then raise SocketTimeoutError.

    Returns:
      The payload string.  Reads are repeated until all <length> bytes have
      arrived, because a single recv may return fewer bytes than requested.

    Raises:
      ValueError if the length prefix is not an integer.
      Exception if a read returns nothing before the message is complete.

    NOTE: the header is read initialread bytes at a time, so bytes after the
    "*" may belong to the payload; they are kept.  If they exceed <length>
    they cannot be pushed back to the socket and stay in the returned data.
    """
    buf = ""

    # if timeout, we have a timeout_socket object; try recving, but can throw
    # a SocketTimeoutError here.
    if timeout:
        socketobj.settimeout(timeout)
        buf += socketobj.recv(1)

    # read until the length prefix is complete
    while "*" not in buf:
        chunk = socketobj.recv(initialread)
        if not chunk:
            raise Exception("connection closed before message header was complete")
        buf += chunk

    parts = buf.partition("*")
    data_len = int(parts[0])

    # keep reading until the whole payload has arrived
    pieces = [parts[2]]
    remaining = data_len - len(parts[2])
    while remaining > 0:
        chunk = socketobj.recv(remaining)
        if not chunk:
            raise Exception("connection closed before message body was complete")
        pieces.append(chunk)
        remaining -= len(chunk)

    return "".join(pieces)
540
541
542
def pad_int_to_str(i, len):
    """
    Return i as a zero-padded decimal string exactly len characters wide.

    Arguments:
      i   - the integer to format.
      len - the field width (the name shadows the builtin but is part of the
            existing interface).

    Raises:
      Exception if i does not fit in len characters; a minus sign counts as
      one character.
    """
    width = len
    # Compare against the limits instead of dividing: the check then does
    # not depend on integer-division semantics and also rejects negative
    # values too long for the field.
    if i >= 10 ** width or i <= -(10 ** (width - 1)):
        raise Exception("integer overflow; consider revising protocol")
    return ('%0' + str(width) + "d") % i
547
548
549
550
def get_results(ip, port, sockobj, thiscommhandle, listencommhandle):
    """
    Connection callback: receive one framed message from sockobj and write
    its payload to results.dat.

    The arguments follow the callback signature used with waitforconn in
    this file; only sockobj is used.
    """
    # parse actual data, write to file
    buf = recv_message(sockobj)
    result_file = open("results.dat", "w")
    try:
        result_file.write(buf)
    finally:
        # release the handle even when the write fails
        result_file.close()
558
559
def print_usage():
    """Print the command-line usage text, then call exit()."""
    usage_text = """Usage: mappri.repy [options] <datafile> <replica-ip>:<replica-port>...
    -n               Do not parse the data file with \\t splitting key and value
    <datafile>       The data file to use in the map-reduce pipeline
    <replica>:<port> A list of the replica addresses to use for the job
"""
    print(usage_text)
    exit()
567
568
# Entry point.  Arguments: [port] [-n] <datafile> <replica-ip>:<replica-port>...
if callfunc == 'initialize':
    mycontext['replica_list'] = {}

    if len(callargs) == 0:
        print("* No arguments provided.")
        print_usage()

    # An optional leading integer is this node's port, advertised together
    # with getmyip(); without it the address is 127.0.0.1 and the default port.
    argpointer = 0
    port = 12346
    try:
        port = int(callargs[0])
        ip = getmyip()
        argpointer += 1

        if len(callargs) == 1:
            print("* No arguments provided")
            print_usage()

    except ValueError:
        ip = '127.0.0.1'

    mycontext['my_addr'] = ip + ":" + str(port)

    if argpointer < len(callargs) and callargs[argpointer] == '-n':
        # The flag has no effect yet, but it must be consumed; otherwise
        # "-n" itself is opened as the data file below.
        argpointer += 1

    if argpointer >= len(callargs):
        print("* No data file provided.")
        print_usage()

    try:
        data_file = open(callargs[argpointer], "r")
        try:
            data = data_file.read()
        finally:
            # close the file even when the read fails
            data_file.close()
        argpointer += 1
    except IOError:
        print("* Error reading file (does it exist?)")
        print_usage()

    # everything after the data file is a replica address
    input_replicas = callargs[argpointer:]
    if len(input_replicas) == 0:
        print("* List of replicas not found.")
        print_usage()

    replica_list = []
    for replica in input_replicas:
        address = replica.partition(':')
        if not address[2]:
            print("* Port not found!")
            print_usage()
        replica_list.append(replica)

    construct_scoreboard(replica_list)

    # ensure that everything is sorted in the same way
    replica_list.sort()
    mycontext['replica_str'] = "".join(
        [format_message(replica) for replica in replica_list])

    initalize_replicas(data)

    # once the job has started, we need to listen for reporting back
    # by the replicas (right now, just handle the scoreboard!)

    keep_scoreboard()
Note: See TracBrowser for help on using the browser.