root/seattle/trunk/repy/apps/mapreduce/mapred.repy

Revision 1806, 24.4 KB (checked in by alpers, 10 years ago)

mapred - the whole pass works now, yayyy

  • Property svn:executable set to *

'''
TODO (cleanup notes):
- header at top of file : docstring
- header for each function (you already have the descriptions) : docstring
- more space between functions, 3-4 blank lines
'''

#begin include sockettimeout.repy
"""
<Description>
  A socket that guarantees the receipt of a message.  Raises SocketTimeoutError
  if it does not receive any message before a given timeout.
  If it actually receives the message, it returns the message and continues.

<Usage>
  Text-replaceable for Repy sockets:
    timeout_openconn(desthost, destport, localip=None, localport=None, timeout = 5)
    timeout_waitforconn(localip, localport, function)

  Object:
    sockobj.send(data)
    sockobj.recv(bytes)
    sockobj.close()

<Date>
  Sun Mar  1 10:27:35 PST 2009

<Example>
  # hello world
  include sockettimeout.repy

  def callback(ip, port, timeout_sockobj, commhandle, listenhandle):
    hw_message = timeout_sockobj.recv(1047)

    # cleanup
    stopcomm(commhandle)
    stopcomm(listenhandle)
    timeout_sockobj.close()

    print hw_message # => "hello world!"

  def server():
    sockobj = timeout_waitforconn(getmyip(), 12345, callback)

  def client():
    sockobj = timeout_openconn(getmyip(), 12345)
    sockobj.send("hello world!")

  def main():
    server()
    client()
    exitall()

  if callfunc == 'initialize':
    main()
"""

class SocketTimeoutError(Exception):
  """The socket timed out before receiving a response"""

def timeout_openconn(desthost, destport, localip=None, localport=None, timeout = 5):
  """
  <Purpose>
    Wrapper for the Repy-like socket interface

  <Args>
    Same as Repy openconn

  <Exception>
    Raises a timeout exception if the destination address doesn't respond.

  <Returns>
    Socket object on success.
  """

  tsock = TimeoutSocket()
  tsock.settimeout(timeout)
  if localip and localport:
    tsock.bind((localip, localport))
  tsock.connect((desthost, destport))
  return tsock

def timeout_waitforconn(localip, localport, function):
  """
  <Purpose>
    Wrapper for the Repy-like socket interface

  <Args>
    Same as Repy waitforconn

  <Side Effects>
    Sets up an event listener which calls function on incoming connections.

  <Returns>
    Handle to the listener.
  """

  tsock = TimeoutSocket()
  tsock.bind((localip, localport))
  tsock.setcallback(function)
  return tsock.listen()

class TimeoutSocket:
  """
  <Purpose>
    Provide a socket object like the usual Repy one.

  <Side Effects>
    Uses a getlock() to watch for a timeout.
    Uses waitforconn and openconn to simulate a socket.
  """

  ################
  # Constructors
  ################

  def __init__(self):
    """ Constructor for socket """
#    self.lock = getlock() # general lock BUG: Do we need to lock everything?
    self.timeout_lock = getlock() # special lock for Timeout condition
    self.timeout = 5 # seconds to wait

    # user vars
    self.local_address = None # ip, port
    self.remote_address = None # ip, port
    self.callback = None # the user's function to call

    # repy socket vars
    self.sockobj = None # the Repy socket
    self.commhandle = None # the current comm
    self.listencommhandle = None # the listener comm

  #################
  # Mutator methods
  #################

  def settimeout(self, value):
    """ Setter for timeout """
    self.timeout = value

  def setcallback(self, function):
    """ Setter for callback function """
    self.callback = function

  ####################
  # Public Methods
  ####################

  def bind(self, local_address = None):
    """
    <Purpose>
      Set the local address.

    <Args>
      Tuple of (ip, port) local.
    """
    self.local_address = local_address

  def listen(self):
    """
    <Purpose>
      Listen for a peer.

    <Side Effects>
      Calls Repy waitforconn()
    """
    return self._waitforconn()

  def connect(self, remote_address):
    """
    <Purpose>
      Connect to a peer.

    <Args>
      Tuple of (ip, port) remote.

    <Side Effects>
      Calls Repy openconn.
    """
    self.remote_address = remote_address
    self._openconn()

  def recv(self, maxLen): # timeout as optional arg ???
    """
    <Purpose>
      Receive a message.  If the recv does not finish within the timeout,
      raise a SocketTimeoutError.  That is, if no message arrives in time,
      we treat it as an error and raise it.

    <Arguments>
      maxLen - bytes to recv

    <Exception>
      Raises SocketTimeoutError if the recv times out
      without receiving a message.

    <Side Effects>
      Closes the connection if it times out.

    <Returns>
      The message.
    """
    return self._recv_or_close(maxLen)

  def send(self, data):
    """
    <Purpose>
      Just like a normal Repy socket.  Sends messages.

    <Arguments>
      data - the string message

    <Exception>
      Same as Repy socket.

    <Returns>
      The bytes sent.
    """
    return self._send(data)

  def close(self):
    self.local_address = None # ip, port
    self.remote_address = None # ip, port
    self.callback = None # the user's function to call

    self.sockobj.close()
    self.sockobj = None # the Repy socket
    # the comm handles are None for client sockets, so guard the stopcomm calls
    if self.commhandle:
      stopcomm(self.commhandle)
    self.commhandle = None # the current comm
    if self.listencommhandle:
      stopcomm(self.listencommhandle)
    self.listencommhandle = None # the listener comm


  ########################
  # Private
  ########################

  def _openconn(self):
    """Handle current state variables and call Repy openconn."""

    destip, destport = self.remote_address
    if self.local_address:
      srcip, srcport = self.local_address
      self.sockobj = openconn(destip, destport, srcip, srcport, self.timeout)
    else:
      # honor the timeout even when no local address was given
      self.sockobj = openconn(destip, destport, timeout=self.timeout)

  def _waitforconn(self):
    """Set up a listener via Repy waitforconn."""
    localip, localport = self.local_address
    self.listencommhandle = waitforconn(localip, localport, self._callback)
    return self.listencommhandle

  def _callback(self, ip, port, sockobj, ch, lh):
    """Pass on through to the user callback"""
    self.sockobj = sockobj
    self.listencommhandle = lh # same as the 1st from waitforconn, right?
    self.commhandle = ch # should we care?

    print "sockettimeout|remote_address:", self.remote_address
    if not self.remote_address:
      pass
      #raise Exception("what! peer does not match?")

    self.callback(ip, port, self, ch, lh)

  def _send(self, data):
    """Send data"""
    return self.sockobj.send(data)

  def _recv(self, maxLen):
    """Recv data of length maxLen"""
    return self.sockobj.recv(maxLen)

  def _recv_or_close(self, amount):
    """Raise SocketTimeoutError if nothing is received.  Keep track via timeout_lock."""
    timerhandle = settimer(self.timeout, self._clobbersocket, ())
    try:
      retdata = self._recv(amount)
    except Exception, e:
      # if it's not the timeout, reraise...
      if self.timeout_lock.acquire(False):
        raise
      raise SocketTimeoutError

    # I acquired the lock, I should stop the timer because I succeeded...
    if self.timeout_lock.acquire(False):
      # even if this isn't in time, the lock prevents a race condition
      # this is merely an optimization to prevent the timer from ever firing...
      canceltimer(timerhandle)
      self.timeout_lock.release() # Alper's bug 3/10/09
      return retdata
    else:
      raise SocketTimeoutError

  def _clobbersocket(self):
    """If I can acquire the lock without blocking, then close the socket to abort"""
    # alpers - don't close the socket if we timeout!  It might be just an error,
    # so the original abort logic is disabled:
    # if self.timeout_lock.acquire(False):
    #   self.close()
    return


############################
# Deprecated functions
############################

# private function...
def sockettimeout_clobbersocket(sockobj, mylock):
  # if I can acquire the lock without blocking, then close the socket to abort
  if mylock.acquire(False):
    sockobj.close()

# if it fails to finish within the timeout, I close the socket and raise a
# SocketTimeoutError exception...
def sockettimeout_recv_or_close(sockobj, amount, timeout):
  # A lock I'll use for this attempt
  mylock = getlock()
  timerhandle = settimer(timeout, sockettimeout_clobbersocket, (sockobj, mylock))
  try:
    retdata = sockobj.recv(amount)
  except Exception, e:
    # if it's not the timeout, reraise...
    if mylock.acquire(False):
      raise
    raise SocketTimeoutError

  # I acquired the lock, I should stop the timer because I succeeded...
  if mylock.acquire(False):
    # even if this isn't in time, the lock prevents a race condition
    # this is merely an optimization to prevent the timer from ever firing...
    canceltimer(timerhandle)
    return retdata
  else:
    raise SocketTimeoutError


#end include sockettimeout.repy

# MapReduce for python/repy!
#

def map_func(key, value):
    """Word-count mapper: emit a (word, 1) pair for every word in value."""
    toRet = []
    for word in value.split():
        output = (word, 1)
        toRet.append(output)
    sleep(0.1)
    return toRet
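
# A quick sketch (illustrative, not from the original file) of what map_func
# emits; input keys are ignored by this mapper:
#
#   map_func("0", "the cat saw the dog")
#   # => [("the", 1), ("cat", 1), ("saw", 1), ("the", 1), ("dog", 1)]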


# MapReduce for python/repy!
#

def reduce_func(key, values):
    """Word-count reducer: sum all the counts emitted for this key."""
    total = 0
    for value in values:
        total += int(value)

    return {key: total}
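
# Sketch of the reducer's contract (illustrative, not from the original
# file); values arrive as strings after a network hop, hence the int()
# conversion above:
#
#   reduce_func("the", ["1", "1"])   # => {"the": 2}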


#

def hash_func(data):
    """Partition hash: bucket a key by its first character."""
    return ord(data[0])/8
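
# Worked example (illustrative, not from the original file): with Python 2
# integer division, all keys starting with 'a'..'g' share a bucket:
#
#   hash_func("apple")   # ord('a') = 97,  97/8  => 12
#   hash_func("zebra")   # ord('z') = 122, 122/8 => 15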


def get_data(ip, port, socketobj, thiscommhandle, listencommhandle):
    """ Listens for connections on a well-defined port, imports data files """

    # first, learn where the primary is
    mycontext['primary'] = recv_message(socketobj)
    print "Primary init thread: got primary loc: ", mycontext['primary']

    # we need to know how many peers we have..
    mycontext['num_peers'] = int(socketobj.recv(4))
    print "Primary init thread: got num_peers: ", mycontext['num_peers']

    # get a list of all of our neighbors!
    mycontext['peers'] = []
    for i in range(mycontext['num_peers']):
        mycontext['peers'].append(recv_message(socketobj))

    # parse and save data file:
    buf = recv_message(socketobj)
    print "Primary init thread: got file data"
    dataobj = open("map_data.dat", "w")
    dataobj.write(buf)
    dataobj.close()

    # make sure each replica has the same order!
    # this line isn't needed --> mycontext['peers'].sort()
    print "Primary init thread: got my peers: ", mycontext['peers']

    # which peer am I?  save this for future reference..
    address_str = mycontext['myip'] + ":" + str(mycontext['myport']-1)
    mycontext['my_peer_index'] = mycontext['peers'].index(address_str)
    print "Primary init thread: my_peer_index is", mycontext['my_peer_index']

    # start the peer_sockets variable
    mycontext['peer_sockets'] = [""] * mycontext['num_peers']

    # save the primary socket for heartbeats
    mycontext['primary_socket'] = socketobj

    # try ACKing
    socketobj.send("i")

    # destroy the listen socket as we're done initializing
    mycontext['state'] = 'Initialized'
    stopcomm(listencommhandle)


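# Initialization protocol sketch (illustrative; inferred from get_data above,
# not a separate spec).  The primary is assumed to send, in order:
#
#   1. its own "ip:port" address          (length-prefixed, see recv_message)
#   2. the peer count as ASCII digits     (raw recv(4))
#   3. one "ip:port" string per peer      (length-prefixed)
#   4. the contents of the map input file (length-prefixed)
#
# and the replica answers with a one-byte ACK, "i".
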
# respond to any queries from the primary
def heartbeat_response():
    msg = ""
    while True:
        try:
            msg = recv_message(mycontext['primary_socket'], timeout=5)
        except SocketTimeoutError:
            print "Primary control thread: timed out waiting for heartbeat"
        else:
            if "control" in msg:
                pass  # change peers in here
            elif msg == "heartbeat":
                state = mycontext['state']
                print "Primary control thread: sent state %s to primary" % state
                send_message(mycontext['primary_socket'], state)

                if state == "ReducerDone":
                    print "Primary control thread: sending datadict:", mycontext['reduce_result']
                    send_message_dict(mycontext['primary_socket'],
                                      mycontext['reduce_result'])
                    mycontext['state'] = "Done"
                    break


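# Heartbeat exchange sketch (illustrative, not from the original file):
#
#   primary -> replica : "heartbeat"
#   replica -> primary : current state, e.g. "Mapping" or "ReducerWaiting"
#
# once the state hits "ReducerDone", the replica also ships its
# reduce_result dict back (via send_message_dict) and moves to "Done".
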
def init_replica_sockets():
    # we only want to make one socket for each link, so let's be smart about it
    # and only make new connections to equal or higher indexed peers..

    print "entered init_replica_sockets"

    # put a fake socket into mycontext['peer_sockets'] for self
    mycontext['peer_sockets'][mycontext['my_peer_index']] = "self"

    # if we're the last peer, just skip this.  we'll listen for all sockets.
    if mycontext['my_peer_index'] + 1 >= len(mycontext['peers']):
        print "Main thread: quit early, I'm last in list."
        return

    print "Main thread: continuing init_replica_sockets()"
    # + 1 is added to skip creating self-socket
    peer_subset = mycontext['peers'][mycontext['my_peer_index'] + 1:]
    for peer in peer_subset:
        addr_parts = peer.partition(":")
        peer_index = mycontext['peers'].index(peer)

        print "Main thread: attempting to connect to peer %d, (addr: %s:%d)" % (peer_index, addr_parts[0], int(addr_parts[2])+1)
        socketobj = timeout_openconn(addr_parts[0], int(addr_parts[2])+1,
                                     mycontext['myip'], 12347)
        socketobj.send("hello")
        socketobj.recv(1)

        mycontext['connection_lock'].acquire()
        mycontext['peer_sockets'][peer_index] = socketobj
        mycontext['peer_conn_left'] -= 1
        mycontext['connection_lock'].release()

        print "successfully communicated with", peer


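# Connection topology sketch (illustrative, not from the original file):
# each peer dials only higher-indexed peers and accepts connections from
# lower-indexed ones, so every pair shares exactly one socket.  With 3 peers:
#
#   peer 0 dials 1 and 2
#   peer 1 dials 2, accepts from 0
#   peer 2 dials nobody, accepts from 0 and 1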

# listens for incoming tcp connections from peers during initialization
def recv_init_replica_conn(ip, port, sockobj, thiscommhandle, listencommhandle):
    data = sockobj.recv(5)

    print "Peer init thread: receiving peer_init message from %s, %s" % (ip, port)

    if data != "hello":
        print "Received an unintelligible message from a peer %s:%d, %s" % (ip, port, data)
        exitall()
    else:
        sockobj.send("i")

        # save socket obj for later
        incoming_index = mycontext['peers'].index(ip+":12344")

        mycontext['connection_lock'].acquire()
        mycontext['peer_sockets'][incoming_index] = sockobj

        # decrement our connection counter
        mycontext['peer_conn_left'] -= 1

        print "Peer init thread: adding new socket to index " + str(incoming_index)
        print "Peer init thread: peer_sockets:", mycontext['peer_sockets']

        mycontext['connection_lock'].release()

        print "Peer init thread: successfully received init message from", ip+":"+str(port)


# listens for incoming map data (this is run asynchronously)
def recv_peer_map_data():
    peer_socket_recvd = []

    print "Peer datarecv thread: peer_sockets:", mycontext['peer_sockets']

    while len(peer_socket_recvd) != mycontext['num_peers']:

        # cycle through all the sockets, trying to receive
        for socket in mycontext['peer_sockets']:

            print "Peer datarecv thread: trying to receive on socket", socket
            print "Peer datarecv thread: received from", peer_socket_recvd
            print "Wanted %d receptions, only got %d" % (len(mycontext['peer_sockets']), len(peer_socket_recvd))

            # pass over the socket if we've received data already
            if socket in peer_socket_recvd:
                continue

            # pass over our own socket
            if socket == "self":
                continue

            # count the self socket once partition() has stored our own data
            if socket == "selfrecvd":
                peer_socket_recvd.append(socket)
                continue

            # try to receive
            recv_data = {}
            try:
                print "Peer datarecv thread: recving on socket", socket
                recv_data = recv_message_dict(socket, timeout=5)
            except SocketTimeoutError:
                print "Peer datarecv thread: timed out on socket", socket, "... continuing"
                continue
            else:
                mycontext['connection_lock'].acquire()
                mycontext['reduce_data'].update(recv_data)
                print "Peer datarecv thread: received data"
                peer_socket_recvd.append(socket)
                mycontext['connection_lock'].release()

    print "Peer datarecv thread: finished recving data"
    mycontext['state'] = "ReducerWaiting"


# Assumptions to make this simpler:
# - all this data fits in memory (<2 GB) in the variable map_result
# - data is stored in the files/string as "(key)(\t)(value)"
def do_map():
    mycontext['state'] = "Mapping"

    data = open("map_data.dat", "r")

    map_result = []
    for line in data:
        line_parts = line.partition('\t')
        # input lines have the form "<key>\t<value>";
        # map_func takes key and value as two separate arguments
        map_result.extend(map_func(line_parts[0], line_parts[2]))
#        print "map_result" , map_result
    map_result.sort()
    return map_result


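# Input format sketch (illustrative; it just follows the "<key>\t<value>"
# assumption above).  A map_data.dat holding the single line
#
#   0\tthe cat saw the cat
#
# would yield, after the sort in do_map:
#
#   [("cat", 1), ("cat", 1), ("saw", 1), ("the", 1), ("the", 1)]
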
# the user must define their own partition function (hash_func)
def partition(map_result):
    mycontext['state'] = "Partitioning"

    key_value = {}

    # generate a key-value dict
    # ! could use simple dict([]) here, but duplicate keys get overwritten
    for kv_pair in map_result:
        if kv_pair[0] in key_value:
            key_value[kv_pair[0]].append(kv_pair[1])
        else:
            key_value[kv_pair[0]] = [kv_pair[1]]

    print "map_result" , map_result
    print
    print "kv_pair", kv_pair
    print
    print "key_value", key_value
    print

    # try to partition objects based on their hash
    partition_hash = {}
    for key, values in key_value.iteritems():
        hashcode = hash_func(key)

        # the following is always key in index 0, values following
        # this is so we don't lose track of the key if the hash_func is not
        # totally unique/reversible
        values.insert(0,key)
        if hashcode in partition_hash:
            partition_hash[hashcode].append(values)
        else:
            partition_hash[hashcode] = [values]

    # we should have a grouping of elements by hash now
    print "partition hash:", partition_hash
    print

    # attempt to partition elements based on the number of replicas,
    #   first, get all the hash values and sort them
    hash_list = partition_hash.keys()
    hash_list.sort()

    # initialize the peer_data list
    peer_data = []
    for index in range(mycontext['num_peers']):
        peer_data.append({})


    # go through each hash, and assign it to a replica
    for hashcode in hash_list:
        cur_replica = hashcode % mycontext['num_peers']

        for kv_pairs in partition_hash[hashcode]:
            print "kv_pairs", kv_pairs
            kv = {kv_pairs[0]: kv_pairs[1:]}
            peer_data[cur_replica].update(kv)


    # print out the peer data, just for my reference
    index = 0
    print "peer_data:"
    for peer in peer_data:
        print index, "->", peer
        index += 1

    # send data to replicas, each replica should have a dictionary of kv pairs
    for peer_index in range(mycontext['num_peers']):
        if peer_index == mycontext['my_peer_index']:
            print "Main thread: sending data to self"
            mycontext['connection_lock'].acquire()
            mycontext['reduce_data'].update(peer_data[peer_index])
            mycontext['peer_sockets'][mycontext['my_peer_index']] = "selfrecvd"
            mycontext['connection_lock'].release()
        else:
            print "Main thread: sending data to peer %d..." % peer_index
            mycontext['connection_lock'].acquire()
            socketobj = mycontext['peer_sockets'][peer_index]
            send_message_dict(socketobj, peer_data[peer_index])
            mycontext['connection_lock'].release()


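# Partitioning walk-through (illustrative, not from the original file):
# hash_func("the") = ord('t')/8 = 116/8 => 14, and with num_peers = 2 that
# bucket goes to replica 14 % 2 = 0.  All keys in a bucket travel together,
# so each replica ends up holding a dict like {"the": [1, 1], "cat": [1, 1]}
# (values arrive as strings, e.g. ["1", "1"], after a network hop), ready
# for reduce_func.
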
def do_reduce():
    #data = open("reduce_data.dat", "r")
    data = mycontext['reduce_data']

    print "reduce_data", mycontext['reduce_data']

    reduce_result = {}
    results_list = []


    for key, values in data.iteritems():
        print "reducing..."
        returned_dict = reduce_func(key, values)
        results_list.append(returned_dict)

    for single_dict in results_list:
        for key, value in single_dict.iteritems():
            if key in reduce_result:
                # values may be ints (e.g. word counts), so coerce to str
                # before concatenating
                reduce_result[key] = str(reduce_result[key]) + " " + str(value)
            else:
                reduce_result[key] = value

    print "Reducing: reduce_result:", reduce_result

    mycontext['reduce_result'] = reduce_result


# TODO...
def report_results(map_results):
    pass


def send_message(socketobj, data):
    data = str(len(data)) + "*" + data
    # send() may transmit fewer bytes than requested, so loop until the
    # whole framed message is out
    while data:
        sent = socketobj.send(data)
        data = data[sent:]


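# Wire format sketch (illustrative, not from the original file): messages
# are framed as "<decimal length>*<payload>", so
#
#   send_message(sock, "hello")    # puts "5*hello" on the wire
#
# recv_message below reads up to the "*" to learn the length, then reads
# exactly that many payload bytes.
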
def send_message_dict(socketobj, data_dict):
    buf = ""
    for key,values in data_dict.iteritems():
        buf += str(key) + "\n"
        if isinstance(values, list):
            for value in values:
                buf += str(value) + "\n"
        else:
            buf += str(values) + "\n"

        # after last value, add another return to close key
        buf += "\n"

    send_message(socketobj, buf)


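# Dict serialization sketch (illustrative, not from the original file):
# each key is followed by one value per line plus a blank-line terminator:
#
#   send_message_dict(sock, {"cat": [1, 2]})   # frames "cat\n1\n2\n\n"
#
# recv_message_dict parses that back into {"cat": ["1", "2"]} -- note that
# values come back as strings.
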
def recv_message_dict(socketobj, initialread=2, timeout=None):
    serialized_dict = recv_message(socketobj, initialread, timeout)

    data_dict = {}

    cur_key = ""
    for line in serialized_dict.split("\n"):
        if cur_key == "":
            # skip padding between entries (and any trailing newlines),
            # which would otherwise create a spurious empty key
            if line == "":
                continue
            cur_key = line
            data_dict[cur_key] = []
        elif line == "":
            cur_key = ""
        else:
            data_dict[cur_key].append(line)

    return data_dict


def recv_message(socketobj, initialread=2, timeout=None):
    buf = ""
    found = False

#    print "recv_message: trying to recv from " + socketobj.remote_address

    # if timeout is set, we have a timeout_socket object; try recving, but
    # this can throw a SocketTimeoutError here.
    if timeout:
        print "timeout was set to", timeout, "- entered timeout recv section"
        socketobj.settimeout(timeout)
        buf += socketobj.recv(1)

    while not found:
        buf += socketobj.recv(initialread)
        if "*" in buf:
            found = True

    parts = buf.partition("*")
    data_len = int(parts[0])

    data = parts[2]
    # recv may return fewer bytes than requested, so loop until the whole
    # payload has arrived
    while len(data) < data_len:
        data += socketobj.recv(data_len - len(data))
    return data


if callfunc == 'initialize':
    mycontext['num_mappers'] = 1
    mycontext['num_reducers'] = 1
    mycontext['state'] = 'Ready'

    if len(callargs) > 1:
        raise Exception("too many args")
    elif len(callargs) == 1:
        port = int(callargs[0])
        ip = getmyip()
    else:
        port = 12345
        ip = '127.0.0.1'

    # save my ip/port for future reference
    mycontext['myip'] = ip
    mycontext['myport'] = port

    # wait for primary initialization data (the init listener sits on port-1)
    print "Main thread: waiting for connection on %s:%d" % (ip, port-1)
    listencommhandle = timeout_waitforconn(ip, port-1, get_data)

    # block until we've been initialized
    while mycontext['state'] == 'Ready':
        sleep(.1)

    # maintain connection with primary, keep scoreboard updated
    # this is the main channel of communication with the primary
    settimer(0, heartbeat_response, ())

    # try to open connections to all hosts
    mycontext['connection_lock'] = getlock()
    mycontext['peer_conn_left'] = mycontext['num_peers']

    # listen for connections from peers
    listen_replica_init = timeout_waitforconn(ip, port, recv_init_replica_conn)
    print "Main thread: listening on %s:%d for peer connections" % (ip, port)

    # stagger startup to slow the socket setup down; we're working over a
    # WAN, so it's tricky
    default_timeout = 4 * mycontext['num_peers']
    lag = 4 * mycontext['my_peer_index']
    sleeptime = default_timeout - lag

    print "Main thread: sleeping for %d seconds to allow listeners to catch up" % sleeptime
    sleep(sleeptime)
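
    # Staggering arithmetic, worked out (illustrative, not from the original
    # file): with 3 peers, peer 0 sleeps 4*3 - 4*0 = 12s, peer 1 sleeps 8s,
    # and peer 2 sleeps 4s, so outbound dialing in init_replica_sockets()
    # starts one peer at a time rather than all at once.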

    # open connections to peers
    init_replica_sockets()

    # our own "self" slot is never decremented, so one entry always remains
    while mycontext['peer_conn_left'] > 1:
        print "Main thread: sleeping for peer_conn_left (%d)" % mycontext['peer_conn_left']
        sleep(0.1)

    # destroy the listener
    print "Main thread: destroying peer-peer initialization listener"
    stopcomm(listen_replica_init)
    mycontext['state'] = 'Connected'

    # start listening for map data from peers
    # !! CAN'T DO IT LIKE THIS: use the sockettimeout thing?
    # listencommhandle = waitforconn(ip, port, recv_peer_map_data)

    # this is the better way to do it... sockettimeout.repy
    mycontext['reduce_data'] = {}
    recv_handle = settimer(0, recv_peer_map_data, ())

    # start mapping, synchronous call
    map_result = do_map()

    # send map results to all reducers, split as necessary
    partition(map_result)

    # block until all partition data is acquired
    while mycontext['state'] != 'ReducerWaiting':
        print "Main thread: sleeping for state ReducerWaiting (state: %s)" % mycontext['state']
        sleep(0.5)


    # start reducing, synchronous call
    do_reduce()
    mycontext['state'] = "ReducerDone"

    # wait for a primary heartbeat to come in, then return data, change state
    # and terminate
    while mycontext['state'] != "Done":
        print "Main thread: sleeping for state Done (state: %s)" % mycontext['state']
        sleep(0.5)