=== modified file 'memcache.py' --- memcache.py 2009-11-28 07:57:59 +0000 +++ memcache.py 2011-03-03 12:11:53 +0000 @@ -88,9 +88,15 @@ # after importing this module. SERVER_MAX_VALUE_LENGTH = 1024*1024 + class _Error(Exception): pass + +class _ConnectionDeadError(Exception): + pass + + try: # Only exists in Python 2.4+ from threading import local @@ -100,6 +106,10 @@ pass +_DEAD_RETRY = 30 # number of seconds before retrying a dead server. +_SOCKET_TIMEOUT = 3 # number of seconds before sockets timeout. + + class Client(local): """ Object representing a pool of memcache servers. @@ -145,7 +155,8 @@ def __init__(self, servers, debug=0, pickleProtocol=0, pickler=pickle.Pickler, unpickler=pickle.Unpickler, pload=None, pid=None, server_max_key_length=SERVER_MAX_KEY_LENGTH, - server_max_value_length=SERVER_MAX_VALUE_LENGTH): + server_max_value_length=SERVER_MAX_VALUE_LENGTH, + dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT): """ Create a new Client object with the given list of servers. @@ -159,9 +170,15 @@ Useful for cPickle since subclassing isn't allowed. @param pid: optional persistent_id function to call on pickle storing. Useful for cPickle since subclassing isn't allowed. + @param dead_retry: number of seconds before retrying a blacklisted + server. Default to 30 s. + @param socket_timeout: timeout in seconds for all calls to a server. Defaults + to 3 seconds. """ local.__init__(self) self.debug = debug + self.dead_retry = dead_retry + self.socket_timeout = socket_timeout self.set_servers(servers) self.stats = {} self.cas_ids = {} @@ -193,7 +210,9 @@ 2. Tuples of the form C{("host:port", weight)}, where C{weight} is an integer weight value. """ - self.servers = [_Host(s, self.debug) for s in servers] + self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry, + socket_timeout=self.socket_timeout) + for s in servers] self._init_buckets() def get_stats(self): @@ -514,7 +533,6 @@ ''' return self._set("set", key, val, time, min_compress_len) - def cas(self, key, val, time=0, min_compress_len=0): '''Sets a key to a given value in the memcache if it hasn't been altered since last fetched. (See L{gets}). @@ -720,28 +738,40 @@ if not server: return 0 - self._statlog(cmd) - - store_info = self._val_to_store_info(val, min_compress_len) - if not store_info: return(0) - - if cmd == 'cas': - if key not in self.cas_ids: - return self._set('set', key, val, time, min_compress_len) - fullcmd = "%s %s %d %d %d %d\r\n%s" % ( - cmd, key, store_info[0], time, store_info[1], - self.cas_ids[key], store_info[2]) - else: - fullcmd = "%s %s %d %d %d\r\n%s" % ( - cmd, key, store_info[0], time, store_info[1], store_info[2]) + def _unsafe_set(): + self._statlog(cmd) + + store_info = self._val_to_store_info(val, min_compress_len) + if not store_info: return(0) + + if cmd == 'cas': + if key not in self.cas_ids: + return self._set('set', key, val, time, min_compress_len) + fullcmd = "%s %s %d %d %d %d\r\n%s" % ( + cmd, key, store_info[0], time, store_info[1], + self.cas_ids[key], store_info[2]) + else: + fullcmd = "%s %s %d %d %d\r\n%s" % ( + cmd, key, store_info[0], time, store_info[1], store_info[2]) + + try: + server.send_cmd(fullcmd) + return(server.expect("STORED") == "STORED") + except socket.error, msg: + if isinstance(msg, tuple): msg = msg[1] + server.mark_dead(msg) + return 0 try: - server.send_cmd(fullcmd) - return(server.expect("STORED") == "STORED") - except socket.error, msg: - if isinstance(msg, tuple): msg = msg[1] - server.mark_dead(msg) - return 0 + return _unsafe_set() + except _ConnectionDeadError: + # retry once + try: + server._get_socket() + return _unsafe_set() + except (_ConnectionDeadError, socket.error), msg: + server.mark_dead(msg) + return 0 def _get(self, cmd, key): self.check_key(key) @@ -749,27 +779,41 @@ if not server: return None - self._statlog(cmd) + def _unsafe_get(): + self._statlog(cmd) + + try: + server.send_cmd("%s %s" % (cmd, key)) + rkey = flags = rlen = cas_id = None + + if cmd == 'gets': + rkey, flags, rlen, cas_id, = self._expect_cas_value(server) + if rkey: + self.cas_ids[rkey] = cas_id + else: + rkey, flags, rlen, = self._expectvalue(server) + + if not rkey: + return None + value = self._recv_value(server, flags, rlen) + server.expect("END") + except (_Error, socket.error), msg: + if isinstance(msg, tuple): msg = msg[1] + server.mark_dead(msg) + return None + + return value try: - server.send_cmd("%s %s" % (cmd, key)) - rkey = flags = rlen = cas_id = None - if cmd == 'gets': - rkey, flags, rlen, cas_id, = self._expect_cas_value(server) - if rkey: - self.cas_ids[rkey] = cas_id - else: - rkey, flags, rlen, = self._expectvalue(server) - - if not rkey: - return None - value = self._recv_value(server, flags, rlen) - server.expect("END") - except (_Error, socket.error), msg: - if isinstance(msg, tuple): msg = msg[1] - server.mark_dead(msg) + return _unsafe_get() + except _ConnectionDeadError: + # retry once + try: + server._get_socket() + return _unsafe_get() + except (_ConnectionDeadError, socket.error), msg: + server.mark_dead(msg) return None - return value def get(self, key): '''Retrieves a key from the memcache. @@ -945,10 +989,11 @@ class _Host(object): - _DEAD_RETRY = 30 # number of seconds before retrying a dead server. - _SOCKET_TIMEOUT = 3 # number of seconds before sockets timeout. - def __init__(self, host, debug=0): + def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY, + socket_timeout=_SOCKET_TIMEOUT): + self.dead_retry = dead_retry + self.socket_timeout = socket_timeout self.debug = debug if isinstance(host, tuple): host, self.weight = host @@ -996,7 +1041,7 @@ def mark_dead(self, reason): self.debuglog("MemCache: %s: %s. Marking dead." % (self, reason)) - self.deaduntil = time.time() + _Host._DEAD_RETRY + self.deaduntil = time.time() + self.dead_retry self.close_socket() def _get_socket(self): @@ -1005,7 +1050,7 @@ if self.socket: return self.socket s = socket.socket(self.family, socket.SOCK_STREAM) - if hasattr(s, 'settimeout'): s.settimeout(self._SOCKET_TIMEOUT) + if hasattr(s, 'settimeout'): s.settimeout(self.socket_timeout) try: s.connect(self.address) except socket.timeout, msg: @@ -1040,10 +1085,10 @@ break data = recv(4096) if not data: - self.mark_dead('Connection closed while reading from %s' - % repr(self)) - self.buffer = '' - return None + # connection close, let's kill it and raise + self.close_socket() + raise _ConnectionDeadError() + buf += data self.buffer = buf[index+2:] return buf[:index]