diff --git a/swift/obj/server.py b/swift/obj/server.py index 658f207..97424cf 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -498,10 +498,14 @@ class ObjectController(BaseStorageServer): except ValueError as e: return HTTPBadRequest(body=str(e), request=request, content_type='text/plain') + # SSYNC will include Frag-Index header for subrequests to primary + # nodes; handoff nodes should 409 subrequests to over-write an + # existing data fragment until they offloaded the existing fragment + frag_index = request.headers.get('X-Backend-Ssync-Frag-Index') try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, frag_index=frag_index) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index b636a16..20150e0 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -169,11 +169,17 @@ class Receiver(object): self.request.environ['eventlet.minimum_write_chunk_size'] = 0 self.device, self.partition, self.policy = \ request_helpers.get_name_and_placement(self.request, 2, 2, False) + self.frag_index = self.node_index = None if 'X-Backend-Ssync-Frag-Index' in self.request.headers: self.frag_index = int( self.request.headers['X-Backend-Ssync-Frag-Index']) - else: - self.frag_index = None + if 'X-Backend-Ssync-Node-Index' in self.request.headers: + self.node_index = int( + self.request.headers['X-Backend-Ssync-Node-Index']) + if self.node_index != self.frag_index: + # a primary node should only recieve it's own fragments + raise swob.HTTPBadRequest('Frag-Index (%s) != Node-Index (%s)' + % (self.frag_index, self.node_index)) utils.validate_device_partition(self.device, self.partition) self.diskfile_mgr = self.app._diskfile_router[self.policy] if self.diskfile_mgr.mount_check and not constraints.check_mount( @@ -361,6 +367,9 @@ class Receiver(object): raise Exception('Invalid subrequest method %s' % method) subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy) subreq.headers['X-Backend-Replication'] = 'True' + if self.node_index is not None: + # primary node should not 409 if it has a non-primary fragment + subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index if replication_headers: subreq.headers['X-Backend-Replication-Headers'] = \ ' '.join(replication_headers) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 8e9202c..b996cd1 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -129,8 +129,15 @@ class Sender(object): self.connection.putheader('Transfer-Encoding', 'chunked') self.connection.putheader('X-Backend-Storage-Policy-Index', int(self.job['policy'])) - self.connection.putheader('X-Backend-Ssync-Frag-Index', - self.node['index']) + # a sync job must use the node's index for the frag_index of the + # rebuilt fragments instead of the frag_index from the job which + # will be rebuilding them + self.connection.putheader( + 'X-Backend-Ssync-Frag-Index', + self.node.get('index', self.job['frag_index'])) + # a revert job to a handoff will not have a node index + self.connection.putheader('X-Backend-Ssync-Node-Index', + self.node.get('index')) self.connection.endheaders() with exceptions.MessageTimeout( self.daemon.node_timeout, 'connect receive'):