Index: src/zope/sendmail/delivery.py =================================================================== --- src/zope/sendmail/delivery.py (revision 103922) +++ src/zope/sendmail/delivery.py (working copy) @@ -212,12 +212,14 @@ """ log = logging.getLogger("QueueProcessorThread") - __stopped = False + _stopped = False interval = 3.0 # process queue every X second def __init__(self, interval=3.0): threading.Thread.__init__(self) self.interval = interval + self._lock = threading.Lock() + self.setDaemon(True) def setMaildir(self, maildir): """Set the maildir. @@ -260,10 +262,10 @@ def run(self, forever=True): atexit.register(self.stop) - while not self.__stopped: + while not self._stopped: for filename in self.maildir: # if we are asked to stop while sending messages, do so - if self.__stopped: + if self._stopped: break fromaddr = '' @@ -349,32 +351,48 @@ message = file.read() file.close() fromaddr, toaddrs, message = self._parseMessage(message) + # The next block is the only one that is sensitive to + # interruptions. Everywhere else, if this daemon thread + # stops, we should be able to correctly handle a restart. + # In this block, if we send the message, but we are + # stopped before we unlink the file, we will resend the + # message when we are restarted. We limit the likelihood + # of this somewhat by using a lock to link the two + # operations. When the process gets an interrupt, it + # will call the atexit that we registered (``stop`` + # below). This will try to get the same lock before it + # lets go. Because this can cause the daemon thread to + # continue (that is, to not act like a daemon thread), we + # still use the _stopped flag to communicate. + self._lock.acquire() try: - self.mailer.send(fromaddr, toaddrs, message) - except smtplib.SMTPResponseException, e: - if 500 <= e.smtp_code <= 599: - # permanent error, ditch the message - self.log.error( - "Discarding email from %s to %s due to" - " a permanent error: %s", - fromaddr, ", ".join(toaddrs), str(e)) - #os.link(filename, rejected_filename) - _os_link(filename, rejected_filename) - else: - # Log an error and retry later - raise + try: + self.mailer.send(fromaddr, toaddrs, message) + except smtplib.SMTPResponseException, e: + if 500 <= e.smtp_code <= 599: + # permanent error, ditch the message + self.log.error( + "Discarding email from %s to %s due to" + " a permanent error: %s", + fromaddr, ", ".join(toaddrs), str(e)) + #os.link(filename, rejected_filename) + _os_link(filename, rejected_filename) + else: + # Log an error and retry later + raise + try: + os.unlink(filename) + except OSError, e: + if e.errno == 2: # file does not exist + # someone else unlinked the file; oh well + pass + else: + # something bad happend, log it + raise + finally: + self._lock.release() try: - os.unlink(filename) - except OSError, e: - if e.errno == 2: # file does not exist - # someone else unlinked the file; oh well - pass - else: - # something bad happend, log it - raise - - try: os.unlink(tmp_filename) except OSError, e: if e.errno == 2: # file does not exist @@ -407,4 +425,6 @@ break def stop(self): - self.__stopped = True + self._stopped = True + self._lock.acquire() + self._lock.release()