diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 1962d66..393866b 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -27,11 +27,11 @@ from swift.common.bufferedhttp import http_connect
 from swift.common.exceptions import ConnectionTimeout
 from swift.common.ring import Ring
 from swift.common.utils import get_logger, renamer, write_pickle, \
-    dump_recon_cache, config_true_value, ismount
+    dump_recon_cache, config_true_value, ismount, Timestamp
 from swift.common.daemon import Daemon
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.storage_policy import split_policy_string, PolicyError
-from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
+from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE, DiskFileManager
 from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
 
 
@@ -55,6 +55,7 @@ class ObjectUpdater(Daemon):
         self.recon_cache_path = conf.get('recon_cache_path',
                                          '/var/cache/swift')
         self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
+        self._diskfile_mgr = DiskFileManager(conf, self.logger)
 
     def _listdir(self, path):
         try:
@@ -175,6 +176,7 @@ class ObjectUpdater(Daemon):
                         continue
                     try:
                         obj_hash, timestamp = update.split('-')
+                        timestamp = Timestamp(timestamp).internal
                     except ValueError:
                         self.logger.increment('errors')
                         self.logger.error(
@@ -182,7 +184,13 @@ class ObjectUpdater(Daemon):
                               'name %s')
                             % (update_path))
                         continue
-                    if obj_hash == last_obj_hash:
+                    reclaim_date = time.time() - self._diskfile_mgr.reclaim_age
+                    reclaim_date = Timestamp(reclaim_date).internal
+                    if obj_hash == last_obj_hash or timestamp <= reclaim_date:
+                        if timestamp <= reclaim_date:
+                            self.logger.info(
+                                _('Unlinking old async pending file %s')
+                                % (update_path))
                         self.logger.increment("unlinks")
                         os.unlink(update_path)
                     else:
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 2b02824..3825639 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -36,7 +36,7 @@ from swift.common.ring import RingData
 from swift.common import utils
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
-    write_pickle
+    write_pickle, Timestamp
 from test.unit import debug_logger, patch_policies, mocked_http_conn
 from swift.common.storage_policy import StoragePolicy, POLICIES
 
@@ -179,7 +179,8 @@ class TestObjectUpdater(unittest.TestCase):
                 'swift_dir': self.testdir,
                 'interval': '1',
                 'concurrency': '1',
-                'node_timeout': '5'})
+                'node_timeout': '5',
+                'reclaim_age': 604800})
             cu.logger = mock_logger = mock.MagicMock()
             cu.object_sweep(self.sda1)
             self.assertEqual(mock_logger.warning.call_count, warn)
@@ -199,12 +200,15 @@ class TestObjectUpdater(unittest.TestCase):
             for not_dir in not_dirs:
                 os.unlink(not_dir)
 
-        # first check with valid policies
-        for pol in POLICIES:
-            check_with_idx(str(pol.idx), 0, should_skip=False)
-        # now check with a bogus async dir policy and make sure we get
-        # a warning indicating that the '99' policy isn't valid
-        check_with_idx('99', 1, should_skip=True)
+        # Now that we remove older asyncs, we need a time value that will
+        # consistently encompass all object timestamps.
+        with mock.patch('time.time', return_value=604800):
+            # first check with valid policies
+            for pol in POLICIES:
+                check_with_idx(str(pol.idx), 0, should_skip=False)
+            # now check with a bogus async dir policy and make sure we get
+            # a warning indicating that the '99' policy isn't valid
+            check_with_idx('99', 1, should_skip=True)
 
     @mock.patch.object(object_updater, 'ismount')
     def test_run_once_with_disk_unmounted(self, mock_ismount):
@@ -484,6 +488,74 @@ class TestObjectUpdater(unittest.TestCase):
         self.assertEqual(daemon.logger.get_increment_counts(),
                          {'successes': 1, 'unlinks': 1, 'async_pendings': 1})
 
+    def test_unlink_old_async_updates(self):
+        policy = random.choice(list(POLICIES))
+        # setup updater
+        conf = {
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+            'reclaim_age': 100,
+        }
+        async_dir = os.path.join(self.sda1, get_async_dir(policy))
+        os.mkdir(async_dir)
+        dfmanager = DiskFileManager(conf, self.logger)
+        mock_time = time()
+
+        class MockObjectUpdater(object_updater.ObjectUpdater):
+            processed = False
+
+            def process_object_update(self, update_path, device, policy):
+                self.processed = True
+                os.unlink(update_path)
+
+        def test_async_pending(op, timestamp, processed_expected):
+            ou = MockObjectUpdater(conf)
+
+            # write an async
+            account, container, obj = 'a', 'c', 'o'
+            headers_out = HeaderKeyDict({
+                'x-size': 0,
+                'x-content-type': 'text/plain',
+                'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
+                'x-timestamp': timestamp,
+                'X-Backend-Storage-Policy-Index': int(policy),
+            })
+            data = {'op': op, 'account': account, 'container': container,
+                    'obj': obj, 'headers': headers_out}
+            dfmanager.pickle_async_update(self.sda1, account, container, obj,
+                                          data, timestamp, policy)
+
+            ou.logger = mock_logger = mock.MagicMock(spec=self.logger)
+            with mock.patch('time.time', return_value=mock_time):
+                ou.object_sweep(self.sda1)
+
+            if processed_expected:
+                self.assertFalse(mock_logger.increment.called)
+                self.assertTrue(ou.processed)
+            else:
+                self.assertIsNone(
+                    mock_logger.increment.assert_called_once_with('unlinks'))
+                self.assertFalse(ou.processed)
+
+                expected_str = 'Unlinking old async pending file %s/%s' \
+                    % (self.sda1, get_async_dir(policy))
+                expected_str += \
+                    '/a83/06fbf0b514e5199dfc4e00f42eb5ea83-%s' \
+                    % Timestamp(timestamp).internal
+                self.assertIn(expected_str, mock_logger.info.call_args[0])
+
+        checks = (
+            # timestamp, expected to be processed (not just unlinked)
+            (mock_time - dfmanager.reclaim_age + 1, True),
+            (mock_time - dfmanager.reclaim_age + 5, True),
+            (mock_time - dfmanager.reclaim_age, False),
+            (mock_time - dfmanager.reclaim_age - 1, False),
+            (mock_time - dfmanager.reclaim_age - 5, False),
+        )
+        for op in 'PUT', 'DELETE':
+            for ts, unlink_expected in checks:
+                test_async_pending(op, ts, unlink_expected)
 
 if __name__ == '__main__':
     unittest.main()
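For reviewers who want to poke at the new cutoff logic outside the daemon, the sketch below (not part of the patch, assuming a Swift checkout on PYTHONPATH) mirrors the comparison added to ObjectUpdater.object_sweep: both the async_pending file's timestamp and the reclaim cutoff are normalized with Timestamp(...).internal, whose fixed-width, zero-padded string form makes lexicographic comparison equivalent to numeric comparison. The reclaim_age value and the sample offsets are illustrative only; reclaim_age here repeats the one-week figure the tests above pass in.

    # Standalone sketch of the reclaim comparison used in object_sweep.
    # Only Timestamp comes from the Swift tree; the other names and values
    # here are illustrative, not taken from the patch.
    import time

    from swift.common.utils import Timestamp

    reclaim_age = 604800  # seconds; same one-week value as in the tests
    now = time.time()
    reclaim_date = Timestamp(now - reclaim_age).internal

    for offset in (5, 1, 0, -1, -5):
        # async_pending filenames end in Timestamp(x).internal, so the same
        # normalization is applied to both sides of the comparison
        timestamp = Timestamp(now - reclaim_age + offset).internal
        if timestamp <= reclaim_date:
            print('%s -> unlink (at or past reclaim_age)' % timestamp)
        else:
            print('%s -> process update' % timestamp)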