The links on this page point to the source files of xapi
v25.11.0.
Xapi directly communicates only with the SMAPIv2 layer. There are no
plugins directly implementing the SMAPIv2 interface, but the plugins in
other layers are accessed through it:
---
theme: ''
---
graph TD
A[xapi] --> B[SMAPIv2 interface]
B --> C[SMAPIv2 <-> SMAPIv1 state machine: storage_smapiv1_wrapper.ml]
C --> G[SMAPIv2 <-> SMAPIv1 translation: storage_smapiv1.ml]
B --> D[SMAPIv2 <-> SMAPIv3 translation: xapi-storage-script]
G --> E[SMAPIv1 plugins]
D --> F[SMAPIv3 plugins]
SMAPIv1
These are the files related to SMAPIv1 in xen-api/ocaml/xapi/:
sm.ml: OCaml "bindings" for the SMAPIv1 Python "drivers" (SM)
sm_exec.ml:
support for implementing the above "bindings". The
parameters are converted to XML-RPC, passed to the relevant Python
script ("driver"), and then the standard output of the program is
parsed as an XML-RPC response (we use
xen-api-libs-transitional/http-svr/xMLRPC.ml for parsing XML-RPC).
When adding new functionality, we can modify the call type to add parameters,
but when we don't add any common ones, we should just pass the new
parameters in the args record. (A rough sketch of this flow is shown after this list.)
smint.ml: Contains types, exceptions, … for the SMAPIv1 OCaml
interface
storage_smapiv1_wrapper.ml: A state machine for SMAPIv1 operations. It computes
the required actions to reach the desired state from the current state.
storage_smapiv1.ml: Contains the actual translation of SMAPIv2 calls to SMAPIv1
calls, by calling the bindings provided in sm.ml.
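As a rough sketch of the sm_exec.ml flow mentioned above (illustrative only: run_script is an invented helper, the driver path is hypothetical, and the XMLRPC.To/From functions are assumed to be the xMLRPC.ml serialisers):
(* Hedged sketch: serialise the call to XML-RPC, hand it to the SM "driver"
   script, and parse the script's standard output as an XML-RPC response. *)
let run_script path request =
  (* invented stand-in for the real process-spawning code in sm_exec.ml *)
  let ic = Unix.open_process_in (Filename.quote path ^ " " ^ Filename.quote request) in
  let output = In_channel.input_all ic in
  ignore (Unix.close_process_in ic);
  output

let exec_driver ~driver ~cmd ~xmlrpc_args =
  let request = Xml.to_string (XMLRPC.To.methodCall cmd xmlrpc_args) in
  XMLRPC.From.methodResponse (Xml.parse_string (run_script driver request))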
SMAPIv2
These are the files related to SMAPIv2, which need to be modified to
implement new calls:
xcp-idl/storage/storage_skeleton.ml:
A stub SMAPIv2 storage server implementation that matches the
SMAPIv2 storage server interface (this is verified by
storage_skeleton_test.ml),
each of its functions just raises a Storage_interface.Unimplemented
error. This skeleton is used to automatically fill in the unimplemented
methods of the storage servers below to satisfy the interface (a toy example of this pattern is shown after this list).
xen-api/ocaml/xapi/storage_smapiv1.ml:
A SMAPIv2 server that does SMAPIv2 -> SMAPIv1 translation.
It passes the XML-RPC requests as the first command-line argument to the
corresponding Python script, which returns an XML-RPC response on standard
output.
xen-api/ocaml/xapi/storage_smapiv1_wrapper.ml:
The Wrapper
module wraps a SMAPIv2 server (Server_impl) and takes care of
locking and datapaths (in case of multiple connections (=datapaths)
from VMs to the same VDI, it will use the superstate computed by the
Vdi_automaton
in xcp-idl). It also implements some functionality, like the DP
module, that is not implemented in lower layers.
xen-api/ocaml/xapi/storage_mux.ml:
A SMAPIv2 server, which multiplexes between other servers. A
different SMAPIv2 server can be registered for each SR. Then it
forwards the calls for each SR to the "storage plugin" registered
for that SR.
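The following toy example (not the real Storage_skeleton code, and with much simpler signatures) illustrates the pattern described above for storage_skeleton.ml: include the skeleton first, then override only the calls that are actually implemented.
module type SERVER = sig
  val attach : sr:string -> vdi:string -> string
  val detach : sr:string -> vdi:string -> unit
end

module Skeleton = struct
  exception Unimplemented of string
  let attach ~sr:_ ~vdi:_ = raise (Unimplemented "VDI.attach")
  let detach ~sr:_ ~vdi:_ = raise (Unimplemented "VDI.detach")
end

(* A concrete server implements what it can and inherits the rest,
   so it still satisfies the full interface. *)
module My_server : SERVER = struct
  include Skeleton
  let attach ~sr ~vdi = Printf.sprintf "/dev/sm/backend/%s/%s" sr vdi
end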
How SMAPIv2 works:
We use message-switch under the hood for RPC communication between
xapi-idl components. The
main Storage_mux.Server (basically Storage_impl.Wrapper(Mux)) is
registered to
listen
on the "org.xen.xapi.storage" queue during xapi's
startup,
and this is the main entry point for incoming SMAPIv2 function calls.
Storage_mux does not really multiplex between different plugins right
now: earlier during xapi's
startup,
the same SMAPIv1 storage server module is
registered
on the various "org.xen.xapi.storage.<sr type>" queues for each
supported SR type. (This will change with SMAPIv3, which is accessed via
a SMAPIv2 plugin outside of xapi that translates between SMAPIv2 and
SMAPIv3.) Then, in
Storage_access.create_sr,
which is called
during SR.create,
and also
during PBD.plug,
the relevant "org.xen.xapi.storage.<sr type>" queue needed for that
PBD is registered with Storage_mux in
Storage_access.bind
for the SR of that PBD. So basically what happens is that xapi registers itself as a SMAPIv2
server, and forwards incoming function calls to itself through
message-switch, using its Storage_mux module. These calls are
forwarded to xapi's SMAPIv1 module doing SMAPIv2 -> SMAPIv1
translation.
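Conceptually, the multiplexing part boils down to a lookup table from SR to the registered plugin (queue), as in this simplified sketch (not the real Storage_mux code; the actual re-issue of the call over message-switch is elided):
type sr = string
type queue_name = string

(* each SR is bound to the message-switch queue of its storage plugin *)
let plugins : (sr, queue_name) Hashtbl.t = Hashtbl.create 16

let register ~sr ~queue_name = Hashtbl.replace plugins sr queue_name

(* forwarding an incoming SMAPIv2 call starts by finding the queue for its SR *)
let queue_of_sr sr =
  match Hashtbl.find_opt plugins sr with
  | Some q -> q
  | None -> failwith (Printf.sprintf "No storage plugin for SR %s" sr)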
Registration of the various storage servers
---
theme: ''
---
sequenceDiagram
participant q as message-switch
participant v1 as Storage_smapiv1.SMAPIv1
participant svr as Storage_mux.Server
Note over q, svr: xapi startup, "Starting SMAPIv1 proxies"
q ->> v1:org.xen.xapi.storage.sr_type_1
q ->> v1:org.xen.xapi.storage.sr_type_2
q ->> v1:org.xen.xapi.storage.sr_type_3
Note over q, svr: xapi startup, "Starting SM service"
q ->> svr:org.xen.xapi.storage
Note over q, svr: SR.create, PBD.plug
svr ->> q:org.xen.xapi.storage.sr_type_2
What happens when a SMAPIv2 "function" is called
---
theme: ''
---
graph TD
call[SMAPIv2 call] --VDI.attach2--> org.xen.xapi.storage
subgraph message-switch
org.xen.xapi.storage
org.xen.xapi.storage.SR_type_x
end
org.xen.xapi.storage --VDI.attach2--> Storage_smapiv1_wrapper.Wrapper
subgraph xapi
subgraph Storage_mux.server
Storage_smapiv1_wrapper.Wrapper --> Storage_mux.mux
end
Storage_smapiv1.SMAPIv1
end
Storage_mux.mux --VDI.attach2--> org.xen.xapi.storage.SR_type_x
org.xen.xapi.storage.SR_type_x --VDI.attach2--> Storage_smapiv1.SMAPIv1
subgraph SMAPIv1
driver_x[SMAPIv1 driver for SR_type_x]
end
Storage_smapiv1.SMAPIv1 --vdi_attach--> driver_x
Interface Changes, Backward Compatibility, & SXM
During SXM, xapi calls SMAPIv2 functions on a remote xapi. Therefore it
is important to keep backward compatibility for all the SMAPIv2 functions
that we call remotely (e.g. Remote.VDI.attach); otherwise SXM from an
older to a newer xapi will break.
Functionality implemented in SMAPIv2 layers
The layer between SMAPIv2 and SMAPIv1 is much fatter than the one between
SMAPIv2 and SMAPIv3. The latter does not do much, apart from simple
translation. However, the former has large portions of code in its intermediate
layers, in addition to the basic SMAPIv2 <-> SMAPIv1 translation in
storage_smapiv1.ml.
These are the two files in xapi that implement the SMAPIv2 storage interface,
from higher to lower level: storage_smapiv1_wrapper.ml and storage_mux.ml.
Functionality implemented by higher layers is not implemented by the layers below it.
Extra functionality in storage_smapiv1_wrapper.ml
storage_smapiv1_wrapper.ml also implements the UPDATES and TASK SMAPIv2 APIs. These are backed by the Updates, Task_server, and Scheduler modules from
xcp-idl, instantiated in xapi's Storage_task module. Migration code in
Storage_mux will interact with these to update task progress. There is also
an event loop in xapi that keeps calling UPDATES.get to keep the tasks in
xapi's database in sync with the storage manager's tasks.
Storage_smapiv1_wrapper.ml also implements the legacy VDI.attach call by simply
calling the newer VDI.attach2 call in the same module. In general, this is a
good place to implement a compatibility layer for deprecated functionality
removed from other layers, because this is the first module that intercepts a
SMAPIv2 call.
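Schematically, such a compatibility shim looks like the following (a hedged sketch with invented record types; the real VDI.attach/VDI.attach2 signatures are richer):
type backend = { implementation : string }   (* stands in for the attach2 result *)
type legacy = { params : string }            (* stands in for the old attach result *)

let attach2 ~dbg:_ ~dp:_ ~sr ~vdi ~read_write:_ =
  { implementation = Printf.sprintf "/dev/sm/backend/%s/%s" sr vdi }

(* the deprecated call simply delegates to its replacement and converts the result *)
let attach ~dbg ~dp ~sr ~vdi ~read_write =
  let b = attach2 ~dbg ~dp ~sr ~vdi ~read_write in
  { params = b.implementation }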
Extra functionality in storage_mux.ml
Storage_mux redirects all storage motion (SXM) calls to storage_migrate.ml,
which manages them. The main implementation
resides in the DATA and DATA.MIRROR modules. Migration code will use
the Storage_task module to run the operations and update the task's progress.
It also implements the Policy module from the SMAPIv2 interface.
SMAPIv3
SMAPIv3 has a slightly
different interface from SMAPIv2. The
xapi-storage-script
daemon is a SMAPIv2 plugin, separate from xapi, that does the SMAPIv2
<-> SMAPIv3 translation. It keeps the plugins registered with xapi-idl
(their message-switch queues) up to date as their files appear in or
disappear from the relevant directory.
Python bindings, used by the SM scripts that implement the SMAPIv3
interface.
These bindings are built by running "make" in the root of the
xapi-storage repository,
and appear in the _build/default/python/xapi/storage/api/v5
directory.
On a XenServer host, they are stored in the
/usr/lib/python3.6/site-packages/xapi/storage/api/v5/
directory.
SMAPIv3 Plugins
For SMAPIv3 we have
volume plugins to manipulate SRs and volumes (=VDIs) in them, and
datapath plugins for connecting to the volumes. Volume plugins tell us
which datapath plugins we can use with each volume, and what to pass to
the plugin. Both volume and datapath plugins implement some common
functionality: the SMAPIv3 plugin
interface.
How SMAPIv3 works:
The xapi-storage-script daemon detects volume and datapath plugins
stored in subdirectories of the
/usr/libexec/xapi-storage-script/volume/ and
/usr/libexec/xapi-storage-script/datapath/ directories, respectively.
When it finds a new datapath plugin, it adds the plugin to a lookup table and
uses it the next time that datapath is required. When it finds a new volume
plugin, it binds a new message-switch queue named after the plugin's
subdirectory to a new server instance that uses these volume scripts.
To invoke a SMAPIv3 method, it executes a program named
<Interface name>.<function name> in the plugin's directory, for
example
/usr/libexec/xapi-storage-script/volume/org.xen.xapi.storage.gfs2/SR.ls.
The inputs to each script can be passed as command-line arguments and
are type-checked using the generated Python bindings, and so are the
outputs. The URIs of the SRs that xapi-storage-script knows about are
stored in the /var/run/nonpersistent/xapi-storage-script/state.db
file; these URIs can be used on the command line when an sr argument is
expected.
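A rough sketch of this calling convention is given below. The real xapi-storage-script is written with Async and marshals typed JSON via the generated bindings; the helper here only illustrates the <Interface name>.<function name> naming and the command-line invocation.
let volume_dir = "/usr/libexec/xapi-storage-script/volume"

let call_volume_script ~plugin ~interface ~method_name ~args =
  (* e.g. /usr/libexec/xapi-storage-script/volume/org.xen.xapi.storage.gfs2/SR.ls *)
  let program = Filename.concat (Filename.concat volume_dir plugin) (interface ^ "." ^ method_name) in
  let cmd = String.concat " " (List.map Filename.quote (program :: args)) in
  let ic = Unix.open_process_in cmd in
  let output = In_channel.input_all ic in
  ignore (Unix.close_process_in ic);
  output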
Registration of the various SMAPIv3 plugins
---
theme: ''
---
sequenceDiagram
participant q as message-switch
participant v1 as (Storage_access.SMAPIv1)
participant svr as Storage_mux.Server
participant vol_dir as /../volume/
participant dp_dir as /../datapath/
participant script as xapi-storage-script
Note over script, vol_dir: xapi-storage-script startup
script ->> vol_dir: new subdir org.xen.xapi.storage.sr_type_4
q ->> script: org.xen.xapi.storage.sr_type_4
script ->> dp_dir: new subdir sr_type_4_dp
Note over q, svr: xapi startup, "Starting SMAPIv1 proxies"
q -->> v1:org.xen.xapi.storage.sr_type_1
q -->> v1:org.xen.xapi.storage.sr_type_2
q -->> v1:org.xen.xapi.storage.sr_type_3
Note over q, svr: xapi startup, "Starting SM service"
q ->> svr:org.xen.xapi.storage
Note over q, svr: SR.create, PBD.plug
svr ->> q:org.xen.xapi.storage.sr_type_4
What happens when a SMAPIv3 "function" is called
---
theme: ''
---
graph TD
call[SMAPIv2 call] --VDI.attach2--> org.xen.xapi.storage
subgraph message-switch
org.xen.xapi.storage
org.xen.xapi.storage.SR_type_x
end
org.xen.xapi.storage --VDI.attach2--> Storage_impl.Wrapper
subgraph xapi
subgraph Storage_mux.server
Storage_impl.Wrapper --> Storage_mux.mux
end
Storage_access.SMAPIv1
end
Storage_mux.mux --VDI.attach2--> org.xen.xapi.storage.SR_type_x
org.xen.xapi.storage.SR_type_x -."VDI.attach2".-> Storage_access.SMAPIv1
subgraph SMAPIv1
driver_x[SMAPIv1 driver for SR_type_x]
end
Storage_access.SMAPIv1 -.vdi_attach.-> driver_x
subgraph SMAPIv3
xapi-storage-script --Datapath.attach--> v3_dp_plugin_x
subgraph SMAPIv3 plugins
v3_vol_plugin_x[volume plugin for SR_type_x]
v3_dp_plugin_x[datapath plugin for SR_type_x]
end
end
org.xen.xapi.storage.SR_type_x --VDI.attach2-->xapi-storage-script
The file
xcp-idl/storage/storage_interface.ml
defines a number of SMAPIv2 errors; ultimately, all errors from the various
SMAPIv2 storage servers in xapi will be returned as one of these. Most of the
errors aren't converted into a specific exception in Storage_interface, but
are simply wrapped with Storage_interface.Backend_error.
The
Storage_utils.transform_storage_exn
function is used by the client code in xapi to translate the SMAPIv2
errors back into XenAPI errors; it unwraps the errors wrapped with
Storage_interface.Backend_error.
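The round trip can be pictured with this simplified sketch (the real code uses Storage_interface.Backend_error and Api_errors.Server_error, and transform_storage_exn handles more cases than shown here):
exception Backend_error of string * string list   (* stands in for Storage_interface.Backend_error *)
exception Server_error of string * string list    (* stands in for Api_errors.Server_error *)

(* storage side: any backend failure is wrapped into a generic SMAPIv2 error *)
let wrap_backend_exn f =
  try f () with e -> raise (Backend_error ("SR_BACKEND_FAILURE", [Printexc.to_string e]))

(* client side in xapi: unwrap it again into a XenAPI-level error *)
let transform_storage_exn f =
  try f () with Backend_error (code, params) -> raise (Server_error (code, params))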
Message Forwarding
In the message forwarding layer, we first check the validity of VDI
operations using mark_vdi and mark_sr. These check that the
operation is valid, using Xapi_vdi.check_operation_error in the case of
mark_vdi, which also inspects the current operations of the VDI.
Then, if the operation is valid, it is added to the VDI's current
operations, and update_allowed_operations is called. Then we forward
the VDI operation to a suitable host that has a PBD plugged for the
VDI's SR.
Checking that the SR is attached
For the VDI operations, we check at two different places whether the SR
is attached: first, at the Xapi level, in
Xapi_vdi.check_operation_error,
for the resize operation, and then, at the SMAPIv1 level, in
Sm.assert_pbd_is_plugged. Sm.assert_pbd_is_plugged performs the
same checks, plus it checks that the PBD is attached to the localhost,
unlike Xapi_vdi.check_operation_error. This behaviour is correct,
because Xapi_vdi.check_operation_error is called from the message
forwarding layer, which forwards the call to a host that has the SR
attached.
VDI Identifiers and Storage Motion
VDI "location": this is the VDI identifier used by the SM backend.
It is usually the UUID of the VDI, but for ISO SRs it is the name of
the ISO.
VDI "content_id": this is used for storage motion, to reduce the
amount of data copied. When we copy over a VDI, the content_id will
initially be the same. However, when we attach a VDI as read-write
and then detach it, we will blank its content_id (set it to a
random UUID), because we may have written to it, so the content
could be different.
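Reduced to its essence, the content_id bookkeeping described above looks like this (illustrative only; fresh_uuid stands in for a real UUID generator):
type vdi = { location : string; mutable content_id : string }

let fresh_uuid () = Printf.sprintf "%08x-%08x" (Random.bits ()) (Random.bits ())

let on_detach ~read_write vdi =
  if read_write then
    (* the disk may have been written to, so its content can no longer be
       assumed equal to that of any other VDI sharing the old content_id *)
    vdi.content_id <- fresh_uuid ()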
Storage migration
Overview
---
theme: ''
---
sequenceDiagram
participant local_tapdisk as local tapdisk
participant local_smapiv2 as local SMAPIv2
participant xapi
participant remote_xapi as remote xapi
participant remote_smapiv2 as remote SMAPIv2 (might redirect)
participant remote_tapdisk as remote tapdisk
Note over xapi: Sort VDIs increasingly by size and then age
loop VM's & snapshots' VDIs & suspend images
xapi->>remote_xapi: plug dest SR to dest host and pool master
alt VDI is not mirrored
Note over xapi: We don't mirror RO VDIs & VDIs of snapshots
xapi->>local_smapiv2: DATA.copy remote_sm_url
activate local_smapiv2
local_smapiv2-->>local_smapiv2: SR.scan
local_smapiv2-->>local_smapiv2: VDI.similar_content
local_smapiv2-->>remote_smapiv2: SR.scan
Note over local_smapiv2: Find nearest smaller remote VDI remote_base, if any
alt remote_base
local_smapiv2-->>remote_smapiv2: VDI.clone
local_smapiv2-->>remote_smapiv2: VDI.resize
else no remote_base
local_smapiv2-->>remote_smapiv2: VDI.create
end
Note over local_smapiv2: call copy'
activate local_smapiv2
local_smapiv2-->>remote_smapiv2: SR.list
local_smapiv2-->>remote_smapiv2: SR.scan
Note over local_smapiv2: create new datapaths remote_dp, base_dp, leaf_dp
Note over local_smapiv2: find local base_vdi with same content_id as dest, if any
local_smapiv2-->>remote_smapiv2: VDI.attach2 remote_dp dest
local_smapiv2-->>remote_smapiv2: VDI.activate remote_dp dest
opt base_vdi
local_smapiv2-->>local_smapiv2: VDI.attach2 base_dp base_vdi
local_smapiv2-->>local_smapiv2: VDI.activate base_dp base_vdi
end
local_smapiv2-->>local_smapiv2: VDI.attach2 leaf_dp vdi
local_smapiv2-->>local_smapiv2: VDI.activate leaf_dp vdi
local_smapiv2-->>remote_xapi: sparse_dd base_vdi vdi dest [NBD URI for dest & remote_dp]
Note over remote_xapi: HTTP handler verifies credentials
remote_xapi-->>remote_tapdisk: then passes connection to tapdisk's NBD server
local_smapiv2-->>local_smapiv2: VDI.deactivate leaf_dp vdi
local_smapiv2-->>local_smapiv2: VDI.detach leaf_dp vdi
opt base_vdi
local_smapiv2-->>local_smapiv2: VDI.deactivate base_dp base_vdi
local_smapiv2-->>local_smapiv2: VDI.detach base_dp base_vdi
end
local_smapiv2-->>remote_smapiv2: DP.destroy remote_dp
deactivate local_smapiv2
local_smapiv2-->>remote_smapiv2: VDI.snapshot remote_copy
local_smapiv2-->>remote_smapiv2: VDI.destroy remote_copy
local_smapiv2->>xapi: task(snapshot)
deactivate local_smapiv2
else VDI is mirrored
Note over xapi: We mirror RW VDIs of the VM
Note over xapi: create new datapath dp
xapi->>local_smapiv2: VDI.attach2 dp
xapi->>local_smapiv2: VDI.activate dp
xapi->>local_smapiv2: DATA.MIRROR.start dp remote_sm_url
activate local_smapiv2
Note over local_smapiv2: copy disk data & mirror local writes
local_smapiv2-->>local_smapiv2: SR.scan
local_smapiv2-->>local_smapiv2: VDI.similar_content
local_smapiv2-->>remote_smapiv2: DATA.MIRROR.receive_start similars
activate remote_smapiv2
remote_smapiv2-->>local_smapiv2: mirror_vdi,mirror_dp,copy_diffs_from,copy_diffs_to,dummy_vdi
deactivate remote_smapiv2
local_smapiv2-->>local_smapiv2: DP.attach_info dp
local_smapiv2-->>remote_xapi: connect to [NBD URI for mirror_vdi & mirror_dp]
Note over remote_xapi: HTTP handler verifies credentials
remote_xapi-->>remote_tapdisk: then passes connection to tapdisk's NBD server
local_smapiv2-->>local_tapdisk: pass socket & dp to tapdisk of dp
local_smapiv2-->>local_smapiv2: VDI.snapshot local_vdi [mirror:dp]
local_smapiv2-->>local_tapdisk: [Python] unpause disk, pass dp
local_tapdisk-->>remote_tapdisk: mirror new writes via NBD to socket
Note over local_smapiv2: call copy' snapshot copy_diffs_to
local_smapiv2-->>remote_smapiv2: VDI.compose copy_diffs_to mirror_vdi
local_smapiv2-->>remote_smapiv2: VDI.remove_from_sm_config mirror_vdi base_mirror
local_smapiv2-->>remote_smapiv2: VDI.destroy dummy_vdi
local_smapiv2-->>local_smapiv2: VDI.destroy snapshot
local_smapiv2->>xapi: task(mirror ID)
deactivate local_smapiv2
xapi->>local_smapiv2: DATA.MIRROR.stat
activate local_smapiv2
local_smapiv2->>xapi: dest_vdi
deactivate local_smapiv2
end
loop until task finished
xapi->>local_smapiv2: UPDATES.get
xapi->>local_smapiv2: TASK.stat
end
xapi->>local_smapiv2: TASK.stat
xapi->>local_smapiv2: TASK.destroy
end
opt for snapshot VDIs
xapi->>local_smapiv2: SR.update_snapshot_info_src remote_sm_url
activate local_smapiv2
local_smapiv2-->>remote_smapiv2: SR.update_snapshot_info_dest
deactivate local_smapiv2
end
Note over xapi: ...
Note over xapi: reserve resources for the new VM in dest host
loop all VDIs
opt VDI is mirrored
xapi->>local_smapiv2: DP.destroy dp
end
end
opt post_detach_hook
opt active local mirror
local_smapiv2-->>remote_smapiv2: DATA.MIRROR.receive_finalize [mirror ID]
Note over remote_smapiv2: destroy mirror dp
end
end
Note over xapi: memory image migration by xenopsd
Note over xapi: destroy the VM record
Receiving SXM
These are the remote calls in the above diagram sent from the source host to
the receiving end of storage motion:
Remote SMAPIv2 -> local SMAPIv2 RPC calls:
SR.list
SR.scan
SR.update_snapshot_info_dest
VDI.attach2
VDI.activate
VDI.snapshot
VDI.destroy
For copying:
For copying from base:
VDI.clone
VDI.resize
For copying without base:
VDI.create
For mirroring:
DATA.MIRROR.receive_start
VDI.compose
VDI.remove_from_sm_config
DATA.MIRROR.receive_finalize
HTTP requests to xapi:
Connecting to NBD URI via xapi's HTTP handler
This is how xapi coordinates storage migration. We'll do it as a code walkthrough through the two layers: xapi and storage-in-xapi (SMAPIv2).
The main function, migrate_send', takes the following parameters:
vm, the reference of the VM to migrate,
dest, a dictionary of (string * string) key-value pairs about the destination; this is the result of a previous call to the destination pool, Host.migrate_receive,
live, a boolean of whether we should live-migrate or suspend-resume,
vdi_map, a mapping of VDI references to destination SR references,
vif_map, a mapping of VIF references to destination network references,
vgpu_map, similar for VGPUs,
options, another dictionary of options.
let migrate_send' ~__context ~vm ~dest ~live ~vdi_map ~vif_map ~vgpu_map ~options =
  SMPERF.debug "vm.migrate_send called vm:%s" (Db.VM.get_uuid ~__context ~self:vm);
  let open Xapi_xenops in
  let localhost = Helpers.get_localhost ~__context in
  let remote = remote_of_dest dest in
  (* Copy mode means we don't destroy the VM on the source host. We also don't
     copy over the RRDs/messages *)
  let copy = try bool_of_string (List.assoc "copy" options) with _ -> false in
It begins by getting the local host reference, deciding whether we're copying or moving, and converting the input dest parameter from an untyped string association list to a typed record, remote, which is declared further up the file:
type remote = {
  rpc : Rpc.call -> Rpc.response;
  session : API.ref_session;
  sm_url : string;
  xenops_url : string;
  master_url : string;
  remote_ip : string;         (* IP address *)
  remote_master_ip : string;  (* IP address *)
  dest_host : API.ref_host;
}
this contains:
A function, rpc, for calling XenAPI RPCs on the destination
A session valid on the destination
A sm_url on which SMAPIv2 APIs can be called on the destination
A master_url on which XenAPI commands can be called (not currently used)
The IP address, remote_ip, of the destination host
The IP address, remote_master_ip, of the master of the destination pool
Next, we determine which VDIs to copy:
(* The first thing to do is to create mirrors of all the disks on the remote.
We look through the VM's VBDs and all of those of the snapshots. We then
compile a list of all of the associated VDIs, whether we mirror them or not
(mirroring means we believe the VDI to be active and new writes should be
mirrored to the destination - otherwise we just copy it)
We look at the VDIs of the VM, the VDIs of all of the snapshots, and any
suspend-image VDIs. *)
let vm_uuid = Db.VM.get_uuid ~__context ~self:vm in
let vbds = Db.VM.get_VBDs ~__context ~self:vm in
let vifs = Db.VM.get_VIFs ~__context ~self:vm in
let snapshots = Db.VM.get_snapshots ~__context ~self:vm in
let vm_and_snapshots = vm :: snapshots in
let snapshots_vbds = List.concat_map (fun self -> Db.VM.get_VBDs ~__context ~self) snapshots in
let snapshot_vifs = List.concat_map (fun self -> Db.VM.get_VIFs ~__context ~self) snapshots in
we now decide whether we're intra-pool or not, and if we're intra-pool whether we're migrating onto the same host (localhost migrate). Intra-pool is decided by trying to do a lookup of our current host uuid on the destination pool.
let is_intra_pool =
  try ignore (Db.Host.get_uuid ~__context ~self:remote.dest_host); true
  with _ -> false
in
let is_same_host = is_intra_pool && remote.dest_host == localhost in
if copy && is_intra_pool then
  raise (Api_errors.Server_error
           ( Api_errors.operation_not_allowed
           , ["Copy mode is disallowed on intra pool storage migration, try efficient alternatives e.g. VM.copy/clone."] ));
Having got all of the VBDs of the VM, we now need to find the associated VDIs, filtering out empty CDs, and decide whether we're going to copy them or mirror them - read-only VDIs can be copied but RW VDIs must be mirrored.
let vms_vdis = List.filter_map (vdi_filter __context true) vbds in
where vdi_filter is defined earlier:
(* We ignore empty or CD VBDs - nothing to do there. Possible redundancy here:
I don't think any VBDs other than CD VBDs can be 'empty' *)
let vdi_filter __context allow_mirror vbd =
  if Db.VBD.get_empty ~__context ~self:vbd || Db.VBD.get_type ~__context ~self:vbd = `CD
  then None
  else
    let do_mirror = allow_mirror && (Db.VBD.get_mode ~__context ~self:vbd = `RW) in
    let vm = Db.VBD.get_VM ~__context ~self:vbd in
    let vdi = Db.VBD.get_VDI ~__context ~self:vbd in
    Some (get_vdi_mirror __context vm vdi do_mirror)
This in turn calls get_vdi_mirror which gathers together some important info:
type vdi_mirror = {
  vdi : [`VDI] API.Ref.t;          (* The API reference of the local VDI *)
  dp : string;                     (* The datapath the VDI will be using if the VM is running *)
  location : string;               (* The location of the VDI in the current SR *)
  sr : string;                     (* The VDI's current SR uuid *)
  xenops_locator : string;         (* The 'locator' xenops uses to refer to the VDI on the current host *)
  size : Int64.t;                  (* Size of the VDI *)
  snapshot_of : [`VDI] API.Ref.t;  (* API's snapshot_of reference *)
  do_mirror : bool;                (* Whether we should mirror or just copy the VDI *)
}
xenops_locator is <sr uuid>/<vdi uuid>, and dp is vbd/<domid>/<device> if the VM is running and vbd/<vm_uuid>/<vdi_uuid> if not.
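For illustration, the two identifier formats can be assembled as follows (invented helper names; the real strings are built when the vdi_mirror record is constructed):
let xenops_locator ~sr_uuid ~vdi_uuid = Printf.sprintf "%s/%s" sr_uuid vdi_uuid

let dp_of_vbd ~running ~domid ~device ~vm_uuid ~vdi_uuid =
  if running then Printf.sprintf "vbd/%d/%s" domid device
  else Printf.sprintf "vbd/%s/%s" vm_uuid vdi_uuid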
So now we have a list of these records for all VDIs attached to the VM. For these we check explicitly that they're all defined in the vdi_map, the mapping of VDI references to their destination SR references.
check_vdi_map ~__context vms_vdis vdi_map;
We then figure out the VIF map:
let vif_map = if is_intra_pool then vif_map
else infer_vif_map ~__context (vifs @ snapshot_vifs) vif_map
in
More sanity checks: We can't do a storage migration if any of the VDIs is a reset-on-boot one - since the state will be lost on the destination when it's attached:
(* Block SXM when VM has a VDI with on_boot=reset *)
List.(
  iter
    (fun vconf ->
      let vdi = vconf.vdi in
      if Db.VDI.get_on_boot ~__context ~self:vdi == `reset then
        raise (Api_errors.Server_error
                 (Api_errors.vdi_on_boot_mode_incompatible_with_operation, [Ref.string_of vdi])))
    vms_vdis);
We now consider all of the VDIs associated with the snapshots. As for the VM's VBDs above, we end up with a vdi_mirror list. Note we pass false to the allow_mirror parameter of the get_vdi_mirror function as none of these snapshot VDIs will ever require mirroring.
let snapshots_vdis = List.filter_map (vdi_filter __context false) snapshots_vbds in
Finally we get all of the suspend-image VDIs from all snapshots as well as the actual VM, since it might be suspended itself:
let suspends_vdis =
  List.fold_left
    (fun acc vm ->
      if Db.VM.get_power_state ~__context ~self:vm = `Suspended then (
        let vdi = Db.VM.get_suspend_VDI ~__context ~self:vm in
        let sr = Db.VDI.get_SR ~__context ~self:vdi in
        if is_intra_pool && Helpers.host_has_pbd_for_sr ~__context ~host:remote.dest_host ~sr
        then acc
        else (get_vdi_mirror __context vm vdi false) :: acc )
      else acc)
    [] vm_and_snapshots
in
Sanity check that we can see all of the suspend-image VDIs on this host:
(* Double check that all of the suspend VDIs are all visible on the source *)
List.iter
  (fun vdi_mirror ->
    let sr = Db.VDI.get_SR ~__context ~self:vdi_mirror.vdi in
    if not (Helpers.host_has_pbd_for_sr ~__context ~host:localhost ~sr) then
      raise (Api_errors.Server_error
               (Api_errors.suspend_image_not_accessible, [Ref.string_of vdi_mirror.vdi])))
  suspends_vdis;
Next is a fairly complex piece that determines the destination SR for all of these VDIs. We don't require API users to decide destinations for all of the VDIs on snapshots and hence we have to make some decisions here:
let dest_pool = List.hd (XenAPI.Pool.get_all remote.rpc remote.session) in
let default_sr_ref = XenAPI.Pool.get_default_SR remote.rpc remote.session dest_pool in
let suspend_sr_ref =
  let pool_suspend_SR = XenAPI.Pool.get_suspend_image_SR remote.rpc remote.session dest_pool
  and host_suspend_SR = XenAPI.Host.get_suspend_image_sr remote.rpc remote.session remote.dest_host in
  if pool_suspend_SR <> Ref.null then pool_suspend_SR else host_suspend_SR
in
(* Resolve placement of unspecified VDIs here - unspecified VDIs that
   are 'snapshot_of' a specified VDI go to the same place. suspend VDIs
   that are unspecified go to the suspend_sr_ref defined above *)
let extra_vdis = suspends_vdis @ snapshots_vdis in
let extra_vdi_map =
  List.map
    (fun vconf ->
      let dest_sr_ref =
        let is_mapped = List.mem_assoc vconf.vdi vdi_map
        and snapshot_of_is_mapped = List.mem_assoc vconf.snapshot_of vdi_map
        and is_suspend_vdi = List.mem vconf suspends_vdis
        and remote_has_suspend_sr = suspend_sr_ref <> Ref.null
        and remote_has_default_sr = default_sr_ref <> Ref.null in
        let log_prefix =
          Printf.sprintf "Resolving VDI->SR map for VDI %s:" (Db.VDI.get_uuid ~__context ~self:vconf.vdi) in
        if is_mapped then begin
          debug "%s VDI has been specified in the map" log_prefix;
          List.assoc vconf.vdi vdi_map
        end else if snapshot_of_is_mapped then begin
          debug "%s Snapshot VDI has entry in map for it's snapshot_of link" log_prefix;
          List.assoc vconf.snapshot_of vdi_map
        end else if is_suspend_vdi && remote_has_suspend_sr then begin
          debug "%s Mapping suspend VDI to remote suspend SR" log_prefix;
          suspend_sr_ref
        end else if is_suspend_vdi && remote_has_default_sr then begin
          debug "%s Remote suspend SR not set, mapping suspend VDI to remote default SR" log_prefix;
          default_sr_ref
        end else if remote_has_default_sr then begin
          debug "%s Mapping unspecified VDI to remote default SR" log_prefix;
          default_sr_ref
        end else begin
          error "%s VDI not in VDI->SR map and no remote default SR is set" log_prefix;
          raise (Api_errors.Server_error (Api_errors.vdi_not_in_map, [Ref.string_of vconf.vdi]))
        end
      in
      (vconf.vdi, dest_sr_ref))
    extra_vdis
in
At the end of this we've got all of the VDIs that need to be copied and destinations for all of them:
let vdi_map = vdi_map @ extra_vdi_map in
let all_vdis = vms_vdis @ extra_vdis in
(* The vdi_map should be complete at this point - it should include all the
   VDIs in the all_vdis list. *)
Next there are a few preparatory steps: we check there's no CBT (we can't currently migrate the CBT metadata), create our client for talking to Xenopsd, make a mutable list of remote VDIs (which I think is redundant right now), decide whether we need to do anything for HA (we disable HA protection for this VM on the destination until it's fully migrated) and eject any CDs from the VM.
Up until now this has mostly been gathering info (aside from the ejecting CDs bit), but now we'll start to do some actions, so we begin a try-catch block:
try
but we've still got a bit of thinking to do: we sort the VDIs to copy based on age/size:
(* Sort VDIs by size in principle and then age secondly. This gives better
chances that similar but smaller VDIs would arrive comparatively
earlier, which can serve as base for incremental copying the larger
ones. *)
let compare_fun v1 v2 =
  let r = Int64.compare v1.size v2.size in
  if r = 0 then
    let t1 = Date.to_unix_time (Db.VDI.get_snapshot_time ~__context ~self:v1.vdi) in
    let t2 = Date.to_unix_time (Db.VDI.get_snapshot_time ~__context ~self:v2.vdi) in
    compare t1 t2
  else r
in
let all_vdis = all_vdis |> List.sort compare_fun in
let total_size = List.fold_left (fun acc vconf -> Int64.add acc vconf.size) 0L all_vdis in
let so_far = ref 0L in
The copy functions are written such that they take continuations. This is to make the error handling simpler - each individual component function can perform its setup and execute the continuation. In the event of an exception coming from the continuation it can then unroll its bit of state and rethrow the exception for the next layer to handle.
with_many is a simple helper function for nesting invocations of functions that take continuations. It has the delightful type:
('a -> ('b -> 'c) -> 'c) -> 'a list -> ('b list -> 'c) -> 'c
(* Helper function to apply a 'with_x' function to a list *)
let rec with_many withfn many fn =
  let rec inner l acc =
    match l with
    | [] -> fn acc
    | x :: xs -> withfn x (fun y -> inner xs (y :: acc))
  in
  inner many []
As an example of its operation, imagine our withfn is as follows:
let withfn x c =
  Printf.printf "Starting withfn: x=%d\n" x;
  try c (string_of_int x)
  with e ->
    Printf.printf "Handling exception for x=%d\n" x;
    raise e;;
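Applying with_many to this withfn then chains the invocations and hands the accumulated results to the final continuation; note that the accumulator reverses the order:
let () =
  let result = with_many withfn [1; 2; 3] (String.concat ";") in
  Printf.printf "Result: %s\n" result
(* Prints:
   Starting withfn: x=1
   Starting withfn: x=2
   Starting withfn: x=3
   Result: 3;2;1 *)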
All the real action is in vdi_copy_fun, which copies or mirrors a single VDI:
let vdi_copy_fun __context dbg vdi_map remote is_intra_pool remote_vdis so_far total_size copy vconf continuation =
  TaskHelper.exn_if_cancelling ~__context;
  let open Storage_access in
  let dest_sr_ref = List.assoc vconf.vdi vdi_map in
  let dest_sr_uuid = XenAPI.SR.get_uuid remote.rpc remote.session dest_sr_ref in
  (* Plug the destination shared SR into destination host and pool master if unplugged.
     Plug the local SR into destination host only if unplugged *)
  let dest_pool = List.hd (XenAPI.Pool.get_all remote.rpc remote.session) in
  let master_host = XenAPI.Pool.get_master remote.rpc remote.session dest_pool in
  let pbds = XenAPI.SR.get_PBDs remote.rpc remote.session dest_sr_ref in
  let pbd_host_pair = List.map (fun pbd -> (pbd, XenAPI.PBD.get_host remote.rpc remote.session pbd)) pbds in
  let hosts_to_be_attached = [master_host; remote.dest_host] in
  let pbds_to_be_plugged =
    List.filter
      (fun (_, host) -> List.mem host hosts_to_be_attached && XenAPI.Host.get_enabled remote.rpc remote.session host)
      pbd_host_pair
  in
  List.iter
    (fun (pbd, _) ->
      if not (XenAPI.PBD.get_currently_attached remote.rpc remote.session pbd) then
        XenAPI.PBD.plug remote.rpc remote.session pbd)
    pbds_to_be_plugged;
It begins by attempting to ensure the SRs we require are definitely attached on the destination host and on the destination pool master.
There's now a little logic to support the case where we have cross-pool SRs and the VDI is already visible to the destination pool. Since this is outside our normal support envelope there is a key in xapi_globs that has to be set (via xapi.conf) to enable this:
let rec dest_vdi_exists_on_sr vdi_uuid sr_ref retry =
  try
    let dest_vdi_ref = XenAPI.VDI.get_by_uuid remote.rpc remote.session vdi_uuid in
    let dest_vdi_sr_ref = XenAPI.VDI.get_SR remote.rpc remote.session dest_vdi_ref in
    if dest_vdi_sr_ref = sr_ref then true else false
  with _ ->
    if retry then begin
      XenAPI.SR.scan remote.rpc remote.session sr_ref;
      dest_vdi_exists_on_sr vdi_uuid sr_ref false
    end else false
in
(* CP-4498 added an unsupported mode to use cross-pool shared SRs - the initial
   use case is for a shared raw iSCSI SR (same uuid, same VDI uuid) *)
let vdi_uuid = Db.VDI.get_uuid ~__context ~self:vconf.vdi in
let mirror =
  if !Xapi_globs.relax_xsm_sr_check then
    if dest_sr_uuid = vconf.sr then begin
      (* Check if the VDI uuid already exists in the target SR *)
      if dest_vdi_exists_on_sr vdi_uuid dest_sr_ref true then false
      else failwith "SR UUID matches on destination but VDI does not exist"
    end else true
  else (not is_intra_pool) || (dest_sr_uuid <> vconf.sr)
in
The check also covers the case where we're doing an intra-pool migration and not copying all of the disks, in which case we don't need to do anything for that disk.
We now have a wrapper function that creates a new datapath and passes it to a continuation function. On error it handles the destruction of the datapath:
let with_new_dp cont =
  let dp = Printf.sprintf (if vconf.do_mirror then "mirror_%s" else "copy_%s") vconf.dp in
  try cont dp
  with e ->
    (try SMAPI.DP.destroy ~dbg ~dp ~allow_leak:false
     with _ -> info "Failed to cleanup datapath: %s" dp);
    raise e
in
and now a helper that, given a remote VDI uuid, looks up the reference on the remote host and gives it to a continuation function. On failure of the continuation it will destroy the remote VDI:
let with_remote_vdi remote_vdi cont =
  debug "Executing remote scan to ensure VDI is known to xapi";
  XenAPI.SR.scan remote.rpc remote.session dest_sr_ref;
  let query =
    Printf.sprintf "(field \"location\"=\"%s\") and (field \"SR\"=\"%s\")" remote_vdi (Ref.string_of dest_sr_ref) in
  let vdis = XenAPI.VDI.get_all_records_where remote.rpc remote.session query in
  let remote_vdi_ref =
    match vdis with
    | [] -> raise (Api_errors.Server_error (Api_errors.vdi_location_missing, [Ref.string_of dest_sr_ref; remote_vdi]))
    | h :: [] -> debug "Found remote vdi reference: %s" (Ref.string_of (fst h)); fst h
    | _ -> raise (Api_errors.Server_error (Api_errors.location_not_unique, [Ref.string_of dest_sr_ref; remote_vdi]))
  in
  try cont remote_vdi_ref
  with e ->
    (try XenAPI.VDI.destroy remote.rpc remote.session remote_vdi_ref
     with _ -> error "Failed to destroy remote VDI");
    raise e
in
another helper to gather together info about a mirrored VDI:
let mirror_to_remote new_dp =
  let task =
    if not vconf.do_mirror then
      SMAPI.DATA.copy ~dbg ~sr:vconf.sr ~vdi:vconf.location ~dp:new_dp ~url:remote.sm_url ~dest:dest_sr_uuid
    else begin
      (* Though we have no intention of "write", here we use the same mode as the
         associated VBD on a mirrored VDIs (i.e. always RW). This avoids problem
         when we need to start/stop the VM along the migration. *)
      let read_write = true in
      (* DP set up is only essential for MIRROR.start/stop due to their open ended pattern.
         It's not necessary for copy which will take care of that itself. *)
      ignore (SMAPI.VDI.attach ~dbg ~dp:new_dp ~sr:vconf.sr ~vdi:vconf.location ~read_write);
      SMAPI.VDI.activate ~dbg ~dp:new_dp ~sr:vconf.sr ~vdi:vconf.location;
      ignore (Storage_access.register_mirror __context vconf.location);
      SMAPI.DATA.MIRROR.start ~dbg ~sr:vconf.sr ~vdi:vconf.location ~dp:new_dp ~url:remote.sm_url ~dest:dest_sr_uuid
    end
  in
  let mapfn x =
    let total = Int64.to_float total_size in
    let done_ = Int64.to_float !so_far /. total in
    let remaining = Int64.to_float vconf.size /. total in
    done_ +. x *. remaining
  in
  let open Storage_access in
  let task_result =
    task
    |> register_task __context
    |> add_to_progress_map mapfn
    |> wait_for_task dbg
    |> remove_from_progress_map
    |> unregister_task __context
    |> success_task dbg
  in
  let mirror_id, remote_vdi =
    if not vconf.do_mirror then begin
      let vdi = task_result |> vdi_of_task dbg in
      remote_vdis := vdi.vdi :: !remote_vdis;
      (None, vdi.vdi)
    end else begin
      let mirrorid = task_result |> mirror_of_task dbg in
      let m = SMAPI.DATA.MIRROR.stat ~dbg ~id:mirrorid in
      (Some mirrorid, m.Mirror.dest_vdi)
    end
  in
  so_far := Int64.add !so_far vconf.size;
  debug "Local VDI %s %s to %s" vconf.location (if vconf.do_mirror then "mirrored" else "copied") remote_vdi;
  (mirror_id, remote_vdi)
in
This is the bit that actually starts the mirroring or copying. Before the call to mirror we call VDI.attach and VDI.activate locally to ensure that if the VM is shutdown then the detach/deactivate there doesn't kill the mirroring process.
Note the parameters to the SMAPI call are sr and vdi, locating the local VDI and SM backend, new_dp, the datapath we're using for the mirroring, url, which is the remote url on which SMAPI calls work, and dest, the destination SR uuid. These are also the arguments to copy above.
There's a little function to calculate the overall progress of the task, and the function waits until the completion of the task before it continues. The function success_task will raise an exception if the task failed. For DATA.mirror, completion implies both that the disk data has been copied to the destination and that all local writes are being mirrored to the destination. Hence more cleanup must be done on cancellation. In contrast, if the DATA.copy path had been taken then the operation at this point has completely finished.
The result of this function is an optional mirror id and the remote VDI uuid.
Next, there is a post_mirror function:
let post_mirror mirror_id mirror_record =
  try
    let result = continuation mirror_record in
    (match mirror_id with
     | Some mid -> ignore (Storage_access.unregister_mirror mid)
     | None -> ());
    if mirror && not (Xapi_fist.storage_motion_keep_vdi () || copy) then
      Helpers.call_api_functions ~__context (fun rpc session_id -> XenAPI.VDI.destroy rpc session_id vconf.vdi);
    result
  with e ->
    let mirror_failed =
      match mirror_id with
      | Some mid ->
          ignore (Storage_access.unregister_mirror mid);
          let m = SMAPI.DATA.MIRROR.stat ~dbg ~id:mid in
          (try SMAPI.DATA.MIRROR.stop ~dbg ~id:mid with _ -> ());
          m.Mirror.failed
      | None -> false
    in
    if mirror_failed then raise (Api_errors.Server_error (Api_errors.mirror_failed, [Ref.string_of vconf.vdi]))
    else raise e
in
This is poorly named - it is post mirror and copy. The aim of this function is to destroy the source VDIs on successful completion of the continuation function, which will have migrated the VM to the destination. In its exception handler it will stop the mirroring, but before doing so it will check to see if the mirroring process it was looking after has itself failed, and raise mirror_failed if so. This is because a failed mirror can result in a range of actual errors, and we decide here that the failed mirror was probably the root cause.
These functions are assembled together at the end of the vdi_copy_fun function:
if mirror then
with_new_dp (fun new_dp ->
let mirror_id, remote_vdi = mirror_to_remote new_dp in
with_remote_vdi remote_vdi (fun remote_vdi_ref ->
let mirror_record = get_mirror_record ~new_dp remote_vdi remote_vdi_ref in
post_mirror mirror_id mirror_record))
else
let mirror_record = get_mirror_record vconf.location (XenAPI.VDI.get_by_uuid remote.rpc remote.session vdi_uuid) in
continuation mirror_record
again, mirror here is poorly named, and means mirror or copy.
Once all of the disks have been mirrored or copied, we jump back to the body of migrate_send. We split apart the mirror records according to the source of the VDI:
then we reassemble all_map from this, for some reason:
let all_map = List.concat [suspends_map; snapshots_map; vdi_map] in
Now we need to update the snapshot-of links:
(* All the disks and snapshots have been created in the remote SR(s),
 * so update the snapshot links if there are any snapshots. *)
if snapshots_map <> [] then
  update_snapshot_info ~__context ~dbg ~url:remote.sm_url ~vdi_map ~snapshots_map;
I'm not entirely sure why this is done in this layer as opposed to in the storage layer.
A little housekeeping:
let xenops_vdi_map =
  List.map
    (fun mirror_record -> (mirror_record.mr_local_xenops_locator, mirror_record.mr_remote_xenops_locator))
    all_map
in
(* Wait for delay fist to disappear *)
wait_for_fist __context Xapi_fist.pause_storage_migrate "pause_storage_migrate";
TaskHelper.exn_if_cancelling ~__context;
the fist thing here simply allows tests to put in a delay at this specific point.
We also check the task to see if we've been cancelled and raise an exception if so.
The VM metadata is now imported into the remote pool, with all the XenAPI level objects remapped:
let new_vm =
  if is_intra_pool then vm
  else
    (* Make sure HA replaning cycle won't occur right during the import process or immediately after *)
    let () =
      if ha_always_run_reset then
        XenAPI.Pool.ha_prevent_restarts_for ~rpc:remote.rpc ~session_id:remote.session
          ~seconds:(Int64.of_float !Xapi_globs.ha_monitor_interval)
    in
    (* Move the xapi VM metadata to the remote pool. *)
    let vms =
      let vdi_map =
        List.map
          (fun mirror_record ->
            { local_vdi_reference = mirror_record.mr_local_vdi_reference
            ; remote_vdi_reference = Some mirror_record.mr_remote_vdi_reference })
          all_map
      in
      let vif_map =
        List.map (fun (vif, network) -> { local_vif_reference = vif; remote_network_reference = network }) vif_map
      in
      let vgpu_map =
        List.map (fun (vgpu, gpu_group) -> { local_vgpu_reference = vgpu; remote_gpu_group_reference = gpu_group }) vgpu_map
      in
      inter_pool_metadata_transfer ~__context ~remote ~vm ~vdi_map ~vif_map ~vgpu_map ~dry_run:false ~live:true ~copy
    in
    let vm = List.hd vms in
    let () =
      if ha_always_run_reset then
        XenAPI.VM.set_ha_always_run ~rpc:remote.rpc ~session_id:remote.session ~self:vm ~value:false
    in
    (* Reserve resources for the new VM on the destination pool's host *)
    let () = XenAPI.Host.allocate_resources_for_vm remote.rpc remote.session remote.dest_host vm true in
    vm
in
We also make sure all the networks are plugged for the VM on the destination.
Next we create the xenopsd-level vif map, equivalent to the vdi_map above:
(* Create the vif-map for xenops, linking VIF devices to bridge names on the remote *)
let xenops_vif_map =
  let vifs = XenAPI.VM.get_VIFs ~rpc:remote.rpc ~session_id:remote.session ~self:new_vm in
  List.map
    (fun vif ->
      let vifr = XenAPI.VIF.get_record ~rpc:remote.rpc ~session_id:remote.session ~self:vif in
      let bridge =
        Xenops_interface.Network.Local
          (XenAPI.Network.get_bridge ~rpc:remote.rpc ~session_id:remote.session ~self:vifr.API.vIF_network)
      in
      (vifr.API.vIF_device, bridge))
    vifs
in
Now we destroy any extra mirror datapaths we set up previously:
(* Destroy the local datapaths - this allows the VDIs to properly detach, invoking the migrate_finalize calls *)
List.iter
  (fun mirror_record ->
    if mirror_record.mr_mirrored then
      match mirror_record.mr_dp with
      | Some dp -> SMAPI.DP.destroy ~dbg ~dp ~allow_leak:false
      | None -> ())
  all_map;
More housekeeping:
SMPERF.debug "vm.migrate_send: migration initiated vm:%s" vm_uuid;
(* In case when we do SXM on the same host (mostly likely a VDI
   migration), the VM's metadata in xenopsd will be in-place updated
   as soon as the domain migration starts. For these case, there
   will be no (clean) way back from this point. So we disable task
   cancellation for them here.
*)
if is_same_host then (
  TaskHelper.exn_if_cancelling ~__context;
  TaskHelper.set_not_cancellable ~__context);
Finally we get to the memory-image part of the migration:
(* It's acceptable for the VM not to exist at this point; shutdown commutes with storage migrate *)
begin
  try
    Xapi_xenops.Events_from_xenopsd.with_suppressed queue_name dbg vm_uuid (fun () ->
        let xenops_vgpu_map = (* can raise VGPU_mapping *) infer_vgpu_map ~__context ~remote new_vm in
        migrate_with_retry ~__context queue_name dbg vm_uuid xenops_vdi_map xenops_vif_map xenops_vgpu_map
          remote.xenops_url;
        Xapi_xenops.Xenopsd_metadata.delete ~__context vm_uuid)
  with
  | Xenops_interface.Does_not_exist ("VM", _) | Xenops_interface.Does_not_exist ("extra", _) ->
      info "%s: VM %s stopped being live during migration" "vm_migrate_send" vm_uuid
  | VGPU_mapping msg ->
      info "%s: VM %s - can't infer vGPU map: %s" "vm_migrate_send" vm_uuid msg;
      raise
        Api_errors.(
          Server_error
            ( vm_migrate_failed
            , [ vm_uuid
              ; Helpers.get_localhost_uuid ()
              ; Db.Host.get_uuid ~__context ~self:remote.dest_host
              ; "The VM changed its power state during migration" ] ))
end;
debug "Migration complete";
SMPERF.debug "vm.migrate_send: migration complete vm:%s" vm_uuid;
Now we tidy up after ourselves:
(* So far the main body of migration is completed, and the rests are
updates, config or cleanup on the source and destination. There will
be no (clean) way back from this point, due to these destructive
changes, so we don't want user intervention e.g. task cancellation.
*)
TaskHelper.exn_if_cancelling ~__context;
TaskHelper.set_not_cancellable ~__context;
XenAPI.VM.pool_migrate_complete remote.rpc remote.session new_vm remote.dest_host;
detach_local_network_for_vm ~__context ~vm ~destination:remote.dest_host;
Xapi_xenops.refresh_vm ~__context ~self:vm;
the function pool_migrate_complete is called on the destination host, and consists of a few things that ordinarily would be set up during VM.start or the like:
let pool_migrate_complete ~__context ~vm ~host =
  let id = Db.VM.get_uuid ~__context ~self:vm in
  debug "VM.pool_migrate_complete %s" id;
  let dbg = Context.string_of_task __context in
  let queue_name = Xapi_xenops_queue.queue_of_vm ~__context ~self:vm in
  if Xapi_xenops.vm_exists_in_xenopsd queue_name dbg id then begin
    Cpuid_helpers.update_cpu_flags ~__context ~vm ~host;
    Xapi_xenops.set_resident_on ~__context ~self:vm;
    Xapi_xenops.add_caches id;
    Xapi_xenops.refresh_vm ~__context ~self:vm;
    Monitor_dbcalls_cache.clear_cache_for_vm ~vm_uuid:id
  end
More tidying up, remapping some remaining VBDs and clearing state on the sender:
(* Those disks that were attached at the point the migration happened will have been
remapped by the Events_from_xenopsd logic. We need to remap any other disks at
this point here *)
if is_intra_pool then
  List.iter
    (fun vm' ->
      intra_pool_vdi_remap ~__context vm' all_map;
      intra_pool_fix_suspend_sr ~__context remote.dest_host vm')
    vm_and_snapshots;
(* If it's an inter-pool migrate, the VBDs will still be 'currently-attached=true'
   because we supressed the events coming from xenopsd. Destroy them, so that the
   VDIs can be destroyed *)
if not is_intra_pool && not copy then
  List.iter (fun vbd -> Db.VBD.destroy ~__context ~self:vbd) (vbds @ snapshots_vbds);
new_vm
in
The remark about the Events_from_xenopsd is that we have a thread watching for events that are emitted by xenopsd, and we resynchronise xapi's state according to xenopsd's state for several fields for which xenopsd is considered the canonical source of truth. One of these is the exact VDI the VBD is associated with.
The suspend_SR field of the VM is set to the source's value, so we reset that.
Now we move the RRDs:
if not copy then begin
  Rrdd_proxy.migrate_rrd ~__context ~remote_address:remote.remote_ip ~session_id:(Ref.string_of remote.session)
    ~vm_uuid:vm_uuid ~host_uuid:(Ref.string_of remote.dest_host) ()
end;
This can be done for intra- and inter- pool migrates in the same way, simplifying the logic.
However, for messages and blobs we have to only migrate them for inter-pool migrations:
if not is_intra_pool && not copy then begin
  (* Replicate HA runtime flag if necessary *)
  if ha_always_run_reset then
    XenAPI.VM.set_ha_always_run ~rpc:remote.rpc ~session_id:remote.session ~self:new_vm ~value:true;
  (* Send non-database metadata *)
  Xapi_message.send_messages ~__context ~cls:`VM ~obj_uuid:vm_uuid
    ~session_id:remote.session ~remote_address:remote.remote_master_ip;
  Xapi_blob.migrate_push ~__context ~rpc:remote.rpc
    ~remote_address:remote.remote_master_ip ~session_id:remote.session ~old_vm:vm ~new_vm;
  (* Signal the remote pool that we're done *)
end;
Lastly, we destroy the VM record on the source:
Helpers.call_api_functions ~__context (fun rpc session_id ->
    if not is_intra_pool && not copy then begin
      info "Destroying VM ref=%s uuid=%s" (Ref.string_of vm) vm_uuid;
      Xapi_vm_lifecycle.force_state_reset ~__context ~self:vm ~value:`Halted;
      List.iter (fun self -> Db.VM.destroy ~__context ~self) vm_and_snapshots
    end);
SMPERF.debug "vm.migrate_send exiting vm:%s" vm_uuid;
new_vm
The exception handler still has to clean some state, but mostly things are handled in the CPS functions declared above:
with e ->
  error "Caught %s: cleaning up" (Printexc.to_string e);
  (* We do our best to tidy up the state left behind *)
  Events_from_xenopsd.with_suppressed queue_name dbg vm_uuid (fun () ->
      try
        let _, state = XenopsAPI.VM.stat dbg vm_uuid in
        if Xenops_interface.(state.Vm.power_state = Suspended) then begin
          debug "xenops: %s: shutting down suspended VM" vm_uuid;
          Xapi_xenops.shutdown ~__context ~self:vm None
        end
      with _ -> ());
  if not is_intra_pool && Db.is_valid_ref __context vm then begin
    List.map (fun self -> Db.VM.get_uuid ~__context ~self) vm_and_snapshots
    |> List.iter (fun self ->
           try
             let vm_ref = XenAPI.VM.get_by_uuid remote.rpc remote.session self in
             info "Destroying stale VM uuid=%s on destination host" self;
             XenAPI.VM.destroy remote.rpc remote.session vm_ref
           with e ->
             error "Caught %s while destroying VM uuid=%s on destination host" (Printexc.to_string e) self)
  end;
  let task = Context.get_task_id __context in
  let oc = Db.Task.get_other_config ~__context ~self:task in
  if List.mem_assoc "mirror_failed" oc then begin
    let failed_vdi = List.assoc "mirror_failed" oc in
    let vconf = List.find (fun vconf -> vconf.location = failed_vdi) vms_vdis in
    debug "Mirror failed for VDI: %s" failed_vdi;
    raise (Api_errors.Server_error (Api_errors.mirror_failed, [Ref.string_of vconf.vdi]))
  end;
  TaskHelper.exn_if_cancelling ~__context;
  begin match e with
  | Storage_interface.Backend_error (code, params) -> raise (Api_errors.Server_error (code, params))
  | Storage_interface.Unimplemented code ->
      raise (Api_errors.Server_error (Api_errors.unimplemented_in_sm_backend, [code]))
  | Xenops_interface.Cancelled _ -> TaskHelper.raise_cancelled ~__context
  | _ -> raise e
  end
Failures during the migration can result in the VM being in a suspended state. There's no point leaving it like this since there's nothing that can be done to resume it, so we force shut it down.
We also try to remove the VM record from the destination if we managed to send it there.
Finally we check for mirror failure in the task - this is set by the events thread watching for events from the storage layer, in storage_access.ml
Storage code
The part of the code that is conceptually in the storage layer, but physically in xapi, is located in
storage_migrate.ml. There are logically a few separate parts to this file:
A stateful module for persisting state across xapi restarts.
Let's start by considering the way the storage APIs are intended to be used.
Copying a VDI
DATA.copy takes several parameters:
dbg - a debug string
sr - the source SR (a uuid)
vdi - the source VDI (a uuid)
dp - unused
url - a URL on which SMAPIv2 API calls can be made
sr - the destination SR in which the VDI should be copied
and returns a value of type Task.id. The API call is intended to be called in an asynchronous fashion - i.e., the caller makes the call, receives the task ID back and polls or uses the event mechanism to wait until the task has completed. The task may be cancelled via the Task.cancel API call. The result of the operation is obtained by calling TASK.stat, which returns a record:
type t = {
  id : id;
  dbg : string;
  ctime : float;
  state : state;
  subtasks : (string * state) list;
  debug_info : (string * string) list;
  backtrace : string;
}
Where the state field contains the result once the task has completed:
type async_result_t =
  | Vdi_info of vdi_info
  | Mirror_id of Mirror.id

type completion_t = {
  duration : float;
  result : async_result_t option;
}

type state =
  | Pending of float
  | Completed of completion_t
  | Failed of Rpc.t
Once the result has been obtained from the task, the task should be destroyed via the TASK.destroy API call.
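A hedged sketch of this calling convention, written against the state type above (the Client module name, the exact parameter labels and the one-second poll interval are assumptions, not the real API):
let copy_and_wait ~dbg ~sr ~vdi ~url ~dest =
  let task = Client.DATA.copy ~dbg ~sr ~vdi ~dp:"" ~url ~dest in
  (* poll TASK.stat until the task leaves the Pending state *)
  let rec wait () =
    match (Client.TASK.stat ~dbg ~task).state with
    | Pending _ -> Thread.delay 1.0; wait ()
    | Completed c -> c.result
    | Failed _ -> failwith "DATA.copy failed"
  in
  let result = wait () in
  Client.TASK.destroy ~dbg ~task;
  result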
The implementation uses the url parameter to make SMAPIv2 calls to the destination SR. This is used, for example, to invoke a VDI.create call if necessary. The URL contains an authentication token within it (valid for the duration of the XenAPI call that caused this DATA.copy API call).
The implementation tries to minimize the amount of data copied by looking for related VDIs on the destination SR. See below for more details.
Mirroring a VDI
DATA.MIRROR.start takes a similar set of parameters to that of copy:
dbg - a debug string
sr - the source SR (a uuid)
vdi - the source VDI (a uuid)
dp - the datapath on which the VDI has been attached
url - a URL on which SMAPIv2 API calls can be made
sr - the destination SR in which the VDI should be copied
Similar to copy above, this returns a task id. The task "completes" once the mirror has been set up - that is, at any point afterwards we can detach the disk and the destination disk will be identical to the source. Unlike for copy the operation is ongoing after the API call completes, since new writes need to be mirrored to the destination. Therefore the completion type of the mirror operation is Mirror_id which contains a handle on which further API calls related to the mirror call can be made. For example MIRROR.stat whose signature is:
Here we are constructing a module Remote on which we can do SMAPIv2 calls directly on the destination.
try
Wrap the whole function in an exception handler.
(* Find the local VDI *)
let vdis = Local.SR.scan ~dbg ~sr in
let local_vdi =
  try List.find (fun x -> x.vdi = vdi) vdis
  with Not_found -> failwith (Printf.sprintf "Local VDI %s not found" vdi)
in
We first find the metadata for our source VDI by doing a local SMAPIv2 call SR.scan. This returns a list of VDI metadata, out of which we extract the VDI we're interested in.
try
Another exception handler. This looks redundant to me right now.
let similar_vdis = Local.VDI.similar_content ~dbg ~sr ~vdi in
let similars = List.map (fun vdi -> vdi.content_id) similar_vdis in
debug "Similar VDIs to %s = [ %s ]" vdi (String.concat "; " (List.map (fun x -> Printf.sprintf "(vdi=%s,content_id=%s)" x.vdi x.content_id) similar_vdis));
Here we look for related VDIs locally using the VDI.similar_content SMAPIv2 API call. This searches for related VDIs and returns an ordered list where the most similar is first in the list. It returns both clones and snapshots, and hence is more general than simply following snapshot_of links.
let remote_vdis = Remote.SR.scan ~dbg ~sr:dest in
(** We drop cbt_metadata VDIs that do not have any actual data *)
let remote_vdis = List.filter (fun vdi -> vdi.ty <> "cbt_metadata") remote_vdis in
let nearest = List.fold_left
(fun acc content_id -> match acc with
| Some x -> acc
| None ->
try Some (List.find (fun vdi -> vdi.content_id = content_id && vdi.virtual_size <= local_vdi.virtual_size) remote_vdis)
with Not_found -> None) None similars in
debug "Nearest VDI: content_id=%s vdi=%s"
(Opt.default "None" (Opt.map (fun x -> x.content_id) nearest))
(Opt.default "None" (Opt.map (fun x -> x.vdi) nearest));
Here we look for VDIs on the destination with the same content_id as one of the locally similar VDIs. We will use this as a base image and only copy deltas to the destination. This is done by cloning the VDI on the destination and then using sparse_dd to find the deltas from our local disk to our local copy of the content_id disk and streaming these to the destination. Note that we need to ensure the VDI is smaller than the one we want to copy since we can't resize disks downwards in size.
If we've found a base VDI we clone it and resize it immediately. If there's nothing on the destination already we can use, we just create a new VDI. Note that the calls to create and clone may well fail if the destination host is not the SR master. This is handled purely in the rpc function:
let rec rpc ~srcstr ~dststr url call =
  let result =
    XMLRPC_protocol.rpc ~transport:(transport_of_url url) ~srcstr ~dststr
      ~http:(xmlrpc ~version:"1.0" ?auth:(Http.Url.auth_of url)
               ~query:(Http.Url.get_query_params url) (Http.Url.get_uri url))
      call
  in
  if not result.Rpc.success then begin
    debug "Got failure: checking for redirect";
    debug "Call was: %s" (Rpc.string_of_call call);
    debug "result.contents: %s" (Jsonrpc.to_string result.Rpc.contents);
    match Storage_interface.Exception.exnty_of_rpc result.Rpc.contents with
    | Storage_interface.Exception.Redirect (Some ip) ->
        let open Http.Url in
        let newurl = match url with (Http h, d) -> (Http { h with host = ip }, d) | _ -> remote_url ip in
        debug "Redirecting to ip: %s" ip;
        let r = rpc ~srcstr ~dststr newurl call in
        debug "Successfully redirected. Returning";
        r
    | _ -> debug "Not a redirect"; result
  end
  else result
Back to the copy function:
let remote_copy = copy' ~task ~dbg ~sr ~vdi ~url ~dest ~dest_vdi:remote_base.vdi |> vdi_info in
This calls the actual data copy part. See below for more on that.
let snapshot = Remote.VDI.snapshot ~dbg ~sr:dest ~vdi_info:remote_copy in
Remote.VDI.destroy ~dbg ~sr:dest ~vdi:remote_copy.vdi;
Some (Vdi_info snapshot)
Finally we snapshot the remote VDI to ensure we've got a VDI of type "snapshot" on the destination, and we delete the non-snapshot VDI.
  with e ->
    error "Caught %s: copying snapshots vdi" (Printexc.to_string e);
    raise (Internal_error (Printexc.to_string e))
with
| Backend_error (code, params) | Api_errors.Server_error (code, params) ->
    raise (Backend_error (code, params))
| e -> raise (Internal_error (Printexc.to_string e))
The exception handler does no cleanup - so we leak remote VDIs if the exception happens after we've done our cloning :-(
DATA.copy_into
Let's now look at the data-copying part. This is common code shared between VDI.copy, VDI.copy_into and MIRROR.start, and hence has some duplication of the calls made above.
This call takes roughly the same parameters as the `DATA.copy` call above, except it specifies the destination VDI.
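Its signature is approximately the following (paraphrased; the authoritative declaration is the DATA.copy_into entry in storage_interface.ml):
val copy_into : dbg:debug_info -> sr:sr -> vdi:vdi -> url:string -> dest:sr -> dest_vdi:vdi -> Task.id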
Once again we construct a module to do remote SMAPIv2 calls.
(* Check the remote SR exists *)
let srs = Remote.SR.list ~dbg in
if not (List.mem dest srs) then
  failwith (Printf.sprintf "Remote SR %s not found" dest);
Sanity check.
let vdis = Remote.SR.scan ~dbg ~sr:dest in
let remote_vdi =
  try List.find (fun x -> x.vdi = dest_vdi) vdis
  with Not_found -> failwith (Printf.sprintf "Remote VDI %s not found" dest_vdi)
in
Find the metadata of the destination VDI.
let dest_content_id = remote_vdi.content_id in
If we've got a local VDI with the same content_id as the destination, we only need to copy the deltas, so we make a note of the destination content_id here.
(* Find the local VDI *)
let vdis = Local.SR.scan ~dbg ~sr in
let local_vdi =
  try List.find (fun x -> x.vdi = vdi) vdis
  with Not_found -> failwith (Printf.sprintf "Local VDI %s not found" vdi)
in
debug "copy local=%s/%s content_id=%s" sr vdi local_vdi.content_id;
debug "copy remote=%s/%s content_id=%s" dest dest_vdi remote_vdi.content_id;
Find the source VDI metadata.
if local_vdi.virtual_size > remote_vdi.virtual_size then begin
(* This should never happen provided the higher-level logic is working properly *)
error "copy local=%s/%s virtual_size=%Ld > remote=%s/%s virtual_size = %Ld" sr vdi local_vdi.virtual_size dest dest_vdi remote_vdi.virtual_size;
failwith "local VDI is larger than the remote VDI";
end;
Sanity check - the remote VDI can't be smaller than the source.
let on_fail : (unit -> unit) list ref = ref [] in
We do some ugly error handling here by keeping a mutable list of operations to perform in the event of a failure.
let base_vdi =
  try
    let x = (List.find (fun x -> x.content_id = dest_content_id) vdis).vdi in
    debug "local VDI %s has content_id = %s; we will perform an incremental copy" x dest_content_id;
    Some x
  with _ ->
    debug "no local VDI has content_id = %s; we will perform a full copy" dest_content_id;
    None
in
See if we can identify a local VDI with the same content_id as the destination. If not, no problem.
Construct some datapaths - named reasons why the VDI is attached - that we will pass to VDI.attach/activate.
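Purely for illustration - the names below are hypothetical, the real code derives its datapath names differently - there is one datapath per attachment involved in the copy, matching the fields persisted in Copy_state below:
(* Hypothetical names; one datapath per reason a disk is attached *)
let leaf_dp = "copy_leaf_of_" ^ vdi in          (* the local source VDI *)
let base_dp = "copy_base_of_" ^ vdi in          (* the local base image, if any *)
let remote_dp = "copy_dest_of_" ^ dest_vdi in   (* the destination VDI on the remote SR *)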
let dest_vdi_url =
  Http.Url.set_uri remote_url
    (Printf.sprintf "%s/nbd/%s/%s/%s" (Http.Url.get_uri remote_url) dest dest_vdi remote_dp)
  |> Http.Url.to_string
in
debug "copy remote=%s/%s NBD URL = %s" dest dest_vdi dest_vdi_url;
Here we are constructing a URI that we use to connect to the destination xapi. The handler for this particular path will verify the credentials and then pass the connection on to tapdisk, which will behave as an NBD server. The VDI has to be attached and activated for this to work, unlike with the newer NBD handler in xapi-nbd, which is smarter. The handler for this URI is declared in this file.
let id = State.copy_id_of (sr, vdi) in
debug "Persisting state for copy (id=%s)" id;
State.add id
  State.(Copy_op Copy_state.({ base_dp; leaf_dp; remote_dp; dest_sr = dest;
                               copy_vdi = remote_vdi.vdi; remote_url = url }));
Since we're about to perform a long-running operation that is stateful, we persist the state here so that if xapi is restarted we can cancel the operation and not leak VDI attaches. Normally in xapi code we would be doing VBD.plug operations to persist the state in the xapi db, but this is storage code so we have to use a different mechanism.
In this chunk of code we attach and activate the disk on the remote SR via the SMAPI, then locally attach and activate both the VDI weāre copying and the base image weāre copying deltas from (if weāve got one). We then call sparse_dd to copy the data to the remote NBD URL. There is some logic to update progress indicators and to cancel the operation if the SMAPIv2 call TASK.cancel is called.
Once the operation has terminated (either on success, error or cancellation), we remove the local attach and activations in the with_activated_disk function and the remote attach and activation by destroying the datapath on the remote SR. We then remove the persistent state relating to the copy.
SMPERF.debug "mirror.copy: copy complete local_vdi:%s dest_vdi:%s" vdi dest_vdi; debug "setting remote=%s/%s content_id <- %s" dest dest_vdi local_vdi.content_id; Remote.VDI.set_content_id ~dbg ~sr:dest ~vdi:dest_vdi ~content_id:local_vdi.content_id;(* PR-1255: XXX: this is useful because we don't have content_ids by default *) debug "setting local=%s/%s content_id <- %s" sr local_vdi.vdi local_vdi.content_id; Local.VDI.set_content_id ~dbg ~sr ~vdi:local_vdi.vdi ~content_id:local_vdi.content_id;Some(Vdi_info remote_vdi)
The last thing we do is set the local and remote content_id. The local set_content_id is there because, if it is unset, the content_id of the VDI is constructed from the location in the storage_access.ml module of xapi (still part of the storage layer).
with e -> error "Caught %s: performing cleanup actions"(Printexc.to_string e); perform_cleanup_actions !on_fail;raise e
Here we perform the list of cleanup operations. Theoretically. It seems we don't ever actually set this to anything, so this is dead code.
DATA.MIRROR.start
let start' ~task ~dbg ~sr ~vdi ~dp ~url ~dest =
  debug "Mirror.start sr:%s vdi:%s url:%s dest:%s" sr vdi url dest;
  SMPERF.debug "mirror.start called sr:%s vdi:%s url:%s dest:%s" sr vdi url dest;
  let remote_url = Http.Url.of_string url in
  let module Remote =
    Client (struct let rpc = rpc ~srcstr:"smapiv2" ~dststr:"dst_smapiv2" remote_url end)
  in
  (* Find the local VDI *)
  let vdis = Local.SR.scan ~dbg ~sr in
  let local_vdi =
    try List.find (fun x -> x.vdi = vdi) vdis
    with Not_found -> failwith (Printf.sprintf "Local VDI %s not found" vdi)
  in
As with the previous calls, we make a remote module for SMAPIv2 calls on the destination, and we find the local VDI metadata via SR.scan.
let id = State.mirror_id_of (sr, local_vdi.vdi) in
Mirror ids are deterministically constructed.
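A plausible sketch of that construction (the exact format string used by the real State module may differ):
let mirror_id_of (sr, vdi) = Printf.sprintf "%s/%s" sr vdi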
(* A list of cleanup actions to perform if the operation should fail. *)
let on_fail : (unit -> unit) list ref = ref [] in
As with copy, we look locally for similar VDIs. However, rather than using them here, we pass this information on to the destination SR via the receive_start internal SMAPIv2 call:
let result_ty =
  Remote.DATA.MIRROR.receive_start ~dbg ~sr:dest ~vdi_info:local_vdi ~id ~similar:similars
in
let result = match result_ty with Mirror.Vhd_mirror x -> x in
This gives the destination SR a chance to say what sort of migration it can support. We only support Vhd_mirror-style migrations, which require the destination to support the compose SMAPIv2 operation. The type of x is a record whose fields are as follows (a sketch of the record follows the list):
mirror_vdi is the VDI to which new writes should be mirrored.
mirror_datapath is the remote datapath on which the VDI has been attached and activated. This is required to construct the remote NBD URL.
copy_diffs_from represents the source base VDI to be used for the non-mirrored data copy.
copy_diffs_to is the remote VDI to copy those diffs to
dummy_vdi exists to prevent leaf-coalesce on the mirror_vdi
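Putting those together, the record looks roughly like this (field names as described above; the type name and exact field types may differ from the declaration in storage_interface.ml):
type mirror_receive_result_vhd_t = {
  mirror_vdi : vdi_info;                (* mirror new writes to this VDI *)
  mirror_datapath : dp;                 (* remote datapath it is attached/activated on *)
  copy_diffs_from : content_id option;  (* base to copy the non-mirrored data from *)
  copy_diffs_to : vdi;                  (* remote VDI to copy those diffs to *)
  dummy_vdi : vdi;                      (* prevents leaf-coalesce on mirror_vdi *)
}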
(* Enable mirroring on the local machine *)
let mirror_dp = result.Mirror.mirror_datapath in
let uri = Printf.sprintf "/services/SM/nbd/%s/%s/%s" dest result.Mirror.mirror_vdi.vdi mirror_dp in
let dest_url = Http.Url.set_uri remote_url uri in
let request =
  Http.Request.make ~query:(Http.Url.get_query_params dest_url)
    ~version:"1.0" ~user_agent:"smapiv2" Http.Put uri
in
let transport = Xmlrpc_client.transport_of_url dest_url in
This is where we connect to the NBD server on the destination.
debug "Searching for data path: %s" dp;let attach_info = Local.DP.attach_info ~dbg:"nbd"~sr ~vdi ~dp in debug "Got it!";
We need the local attach_info to find the local tapdisk so we can send it the connected NBD socket.
This should probably be set directly after the call to receive_start.
let tapdev =
  match tapdisk_of_attach_info attach_info with
  | Some tapdev ->
      debug "Got tapdev";
      let pid = Tapctl.get_tapdisk_pid tapdev in
      let path = Printf.sprintf "/var/run/blktap-control/nbdclient%d" pid in
      with_transport transport
        (with_http request (fun (response, s) ->
             debug "Here inside the with_transport";
             let control_fd = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
             finally
               (fun () ->
                 debug "Connecting to path: %s" path;
                 Unix.connect control_fd (Unix.ADDR_UNIX path);
                 let msg = dp in
                 let len = String.length msg in
                 let written = Unixext.send_fd control_fd msg 0 len [] s in
                 debug "Sent fd";
                 if written <> len then begin
                   error "Failed to transfer fd to %s" path;
                   failwith "foo"
                 end)
               (fun () -> Unix.close control_fd)));
      tapdev
  | None -> failwith "Not attached"
in
Here we connect to the remote NBD server, then pass that connected fd to the local tapdisk that is using the disk. This fd is passed with a name that is later used to tell tapdisk to start using it - we use the datapath name for this.
debug "Adding to active local mirrors: id=%s" id;let alm = State.Send_state.({ url; dest_sr=dest; remote_dp=mirror_dp; local_dp=dp; mirror_vdi=result.Mirror.mirror_vdi.vdi; remote_url=url; tapdev; failed=false; watchdog=None})in State.add id (State.Send_op alm); debug "Added";
As for copy, we persist some state to disk to say that we're doing a mirror, so we can undo any state changes after a toolstack restart.
debug "About to snapshot VDI = %s"(string_of_vdi_info local_vdi);let local_vdi = add_to_sm_config local_vdi "mirror"("nbd:"^ dp)inlet local_vdi = add_to_sm_config local_vdi "base_mirror" id inlet snapshot =try Local.VDI.snapshot ~dbg ~sr ~vdi_info:local_vdi
with| Storage_interface.Backend_error(code,_)when code ="SR_BACKEND_FAILURE_44"->raise(Api_errors.Server_error(Api_errors.sr_source_space_insufficient,[ sr ]))| e ->raise e
in debug "Done!"; SMPERF.debug "mirror.start: snapshot created, mirror initiated vdi:%s snapshot_of:%s" snapshot.vdi local_vdi.vdi ; on_fail :=(fun () -> Local.VDI.destroy ~dbg ~sr ~vdi:snapshot.vdi)::!on_fail;
This bit inserts into sm_config the name of the fd we passed earlier for mirroring. This is interpreted by the Python SM backends and passed to the tap-ctl invocation that unpauses the disk, which causes all new writes to be mirrored via NBD to the file descriptor passed earlier.
This is the watchdog that runs tap-ctl stats every 5 seconds, watching mirror_failed for evidence of a failure in the mirroring code. If it detects one, the only thing it does is notify that the state of the mirror has changed. This will be picked up by the thread in xapi that is monitoring the state of the mirror, which will then issue a MIRROR.stat call; the returned mirror state includes the information that it has failed.
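The check the watchdog performs boils down to something like this (a simplified sketch; the real code schedules the check via the storage scheduler and records any failure in the persisted Send_state):
let mirror_has_failed tapdev =
  let open Tapctl in
  let st = stats (create ()) tapdev in
  st.Stats.nbd_mirror_failed = 1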
This is where we copy the VDI returned by the snapshot invocation to the remote VDI called copy_diffs_to. We only copy deltas, but we rely on copy' to figure out which disk the deltas should be taken from, which it does via the content_id field.
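In the source this is another invocation of copy', roughly (an assumed sketch, not the verbatim call):
let _ = copy' ~task ~dbg ~sr ~vdi:snapshot.vdi ~url ~dest ~dest_vdi:result.Mirror.copy_diffs_to in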
We can now destroy the dummy VDI on the remote (which will cause a leaf-coalesce in due course), and we destroy the local snapshot here (which will also cause a leaf-coalesce in due course, provided we don't destroy it first). The return value from the function is the mirror_id, which we can use to monitor the state of the mirror or cancel it.
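A condensed sketch of those final steps, assuming the names used above:
Remote.VDI.destroy ~dbg ~sr:dest ~vdi:result.Mirror.dummy_vdi;
Local.VDI.destroy ~dbg ~sr ~vdi:snapshot.vdi;
Some (Mirror_id id)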
This is not the end of the story, since we need to detach the remote datapath being used for mirroring when we detach this end. The hook function is in storage_migrate.ml:
let post_detach_hook ~sr ~vdi ~dp =
  let open State.Send_state in
  let id = State.mirror_id_of (sr, vdi) in
  State.find_active_local_mirror id
  |> Opt.iter (fun r ->
      let remote_url = Http.Url.of_string r.url in
      let module Remote =
        Client (struct let rpc = rpc ~srcstr:"smapiv2" ~dststr:"dst_smapiv2" remote_url end)
      in
      let t =
        Thread.create
          (fun () ->
            debug "Calling receive_finalize";
            log_and_ignore_exn
              (fun () -> Remote.DATA.MIRROR.receive_finalize ~dbg:"Mirror-cleanup" ~id);
            debug "Finished calling receive_finalize";
            State.remove_local_mirror id;
            debug "Removed active local mirror: %s" id)
          ()
      in
      Opt.iter (fun id -> Scheduler.cancel scheduler id) r.watchdog;
      debug "Created thread %d to call receive finalize and dp destroy" (Thread.id t))
This removes the persistent state and calls receive_finalize on the destination. The body of that function is:
let receive_finalize ~dbg ~id =
  let recv_state = State.find_active_receive_mirror id in
  let open State.Receive_state in
  Opt.iter (fun r -> Local.DP.destroy ~dbg ~dp:r.leaf_dp ~allow_leak:false) recv_state;
  State.remove_receive_mirror id
which removes the persistent state on the destination and destroys the datapath associated with the mirror.
There is also a pre-deactivate hook. The rationale for this is that we want to detect any failures to write that occur right at the end of the SXM process: if a mirror operation is in progress, then before we deactivate we wait for tapdisk to flush its queue of outstanding requests, and then query whether there has been a mirror failure. The code is just above the detach hook in storage_migrate.ml:
let pre_deactivate_hook ~dbg ~dp ~sr ~vdi =
  let open State.Send_state in
  let id = State.mirror_id_of (sr, vdi) in
  let start = Mtime_clock.counter () in
  let get_delta () = Mtime_clock.count start |> Mtime.Span.to_s in
  State.find_active_local_mirror id
  |> Opt.iter (fun s ->
      try
        (* We used to pause here and then check the nbd_mirror_failed key. Now, we poll
           until the number of outstanding requests has gone to zero, then check the
           status. This avoids confusing the backend (CA-128460) *)
        let open Tapctl in
        let ctx = create () in
        let rec wait () =
          if get_delta () > reqs_outstanding_timeout then raise Timeout;
          let st = stats ctx s.tapdev in
          if st.Stats.reqs_outstanding > 0 then (Thread.delay 1.0; wait ()) else st
        in
        let st = wait () in
        debug "Got final stats after waiting %f seconds" (get_delta ());
        if st.Stats.nbd_mirror_failed = 1 then begin
          error "tapdisk reports mirroring failed";
          s.failed <- true
        end
      with
      | Timeout ->
          error "Timeout out after %f seconds waiting for tapdisk to complete all outstanding requests"
            (get_delta ());
          s.failed <- true
      | e ->
          error "Caught exception while finally checking mirror state: %s" (Printexc.to_string e);
          s.failed <- true)