1
0
mirror of https://github.com/HorlogeSkynet/archey4 synced 2025-06-07 02:54:30 +02:00

[ENTRY][Disk] Fixes overcounting of APFS volumes on macOS

Replaces each APFS volume entry by an aggregation of their respective
containers.

> closes #115
This commit is contained in:
Michael Bromilow 2022-08-08 01:25:13 +01:00 committed by Samuel FORESTIER
parent cd7413285f
commit 1c51b2a163
2 changed files with 310 additions and 52 deletions
archey
entries
test/entries

@ -1,8 +1,10 @@
"""Disk usage detection class"""
import re
import platform
import plistlib
from subprocess import DEVNULL, PIPE, run
from subprocess import DEVNULL, PIPE, run, check_output
from typing import Dict, List
from archey.colors import Colors
@ -30,20 +32,28 @@ class Disk(Entry):
Extracts local (i.e. /dev/xxx) filesystems for any *NIX from `self._disk_dict`,
returning a copy with those filesystems only.
We specifically ignore...
Ignored device paths are...
Loop devices:-
/dev(/...)/loop filesystems (Linux)
/dev(/...)/*vnd filesystems (BSD)
/dev(/...)/lofi filesystems (Solaris)
Device mappers:- (since their partitions are already included!)
/dev(/...)/dm filesystems (Linux)
(macOS only) any APFS volumes, only APFS containers are counted
"""
# Compile a REGEXP pattern to match the device paths we will accept.
device_path_regexp = re.compile(r'^\/dev\/(?:(?!loop|[rs]?vnd|lofi|dm).)+$')
# If we are on macOS, then remove APFS volumes from our disk dict
# and replace them with their respective containers.
if platform.system() == 'Darwin':
disk_dict = self._replace_apfs_volumes_by_their_containers()
else:
disk_dict = self._disk_dict
# Build the dictionary
local_disk_dict: Dict[str, dict] = {}
for mount_point, disk_data in self._disk_dict.items():
for mount_point, disk_data in disk_dict.items():
if (
device_path_regexp.match(disk_data['device_path'])
# De-duplication based on `device_path`s:
@ -57,6 +67,64 @@ class Disk(Entry):
return local_disk_dict
def _replace_apfs_volumes_by_their_containers(self) -> Dict[str, dict]:
# Call `diskutil` to generate a property list (PList) of all APFS containers
try:
property_list = plistlib.loads(check_output(['diskutil', 'apfs', 'list', '-plist']))
except FileNotFoundError:
self._logger.warning(
"APFS volumes cannot be deduplicated as diskutil program could not be found."
)
return self._disk_dict
except plistlib.InvalidFileException:
self._logger.error(
"APFS volumes cannot be deduplicated as diskutil output could not be parsed."
)
return self._disk_dict
# Local (shallow) copy of `_disk_dict`
disk_dict = self._disk_dict.copy()
# Generate an inverted disk_dict: from device_path -> mount_point
inverted_disk_dict = {
value['device_path']: mount_point for mount_point, value in disk_dict.items()
}
# Remove volumes from disk_dict and replace with their aggregated containers
for plist_container in property_list['Containers']:
# Temporary dict for each container
container_dict = {
# the container's "real" location:
'device_path': f"/dev/{plist_container['DesignatedPhysicalStore']}",
'used_blocks': 0,
'total_blocks': 0
}
for plist_volume in plist_container['Volumes']:
# Get volumes which start with this volume's device path, i.e. include snapshots
volume_paths = [
device_path for device_path in inverted_disk_dict.keys()
if device_path.startswith(f"/dev/{plist_volume['DeviceIdentifier']}")
]
for volume_path in volume_paths:
try:
# Get this volume from disk_dict (removing it)
volume = disk_dict.pop(inverted_disk_dict[volume_path])
except KeyError:
# skip this volume as it misses from `disk_dict`
continue
# Now add it to the container entry
container_dict['used_blocks'] += volume['used_blocks']
# Total is always the container total
container_dict['total_blocks'] = volume['total_blocks']
# Use the "reference" (virtual disk) as the mountpoint, since APFS containers
# cannot be directly mounted
disk_dict[plist_container['ContainerReference']] = container_dict
return disk_dict
def _get_specified_filesystems(self, specified_filesystems: List[str]) -> Dict[str, dict]:
"""
Extracts the specified filesystems (if found) from `self._disk_dict`,

@ -7,7 +7,6 @@ from archey.colors import Colors
from archey.entries.disk import Disk
from archey.test.entries import HelperMethods
class TestDiskEntry(unittest.TestCase):
"""
Here, we mock `subprocess.run` calls to disk utility tools.
@ -17,62 +16,253 @@ class TestDiskEntry(unittest.TestCase):
self.disk_instance_mock = HelperMethods.entry_mock(Disk)
self.output_mock = MagicMock()
def test_disk_get_local_filesystems(self):
# Used to make `_replace_apfs_volumes_by_their_containers` call void (see below).
@patch.object(
Disk,
"_replace_apfs_volumes_by_their_containers",
)
def test_disk_get_local_filesystems(self, apfs_disk_dict_mock):
"""Tests `Disk._get_local_filesystems`."""
# This minimal `_disk_dict` contains everything this method touches.
self.disk_instance_mock._disk_dict = { # pylint: disable=protected-access
'/very/good/mountpoint': {
'device_path': '/dev/sda1'
},
'/mounted/here/too': {
'device_path': '/dev/sda1'
},
'/other/acceptable/device/paths': {
'device_path': '/dev/anything-really'
},
'/a/samba/share': {
'device_path': '//server.local/cool_share' # ignored - not `/dev/...`
},
'/linux/loop/device/one': {
'device_path': '/dev/loop0' # ignored - loop device
},
'/linux/loop/device/two': {
'device_path': '/dev/blah/loop0' # ignored - loop device
},
'/bsd/s/loop/device/one': {
'device_path': '/dev/svnd' # ignored - loop device
},
'/bsd/s/loop/device/two': {
'device_path': '/dev/blah/svnd1' # ignored - loop device
},
'/bsd/r/loop/device/one': {
'device_path': '/dev/rvnd' # ignored - loop device
},
'/bsd/r/loop/device/two': {
'device_path': '/dev/blah/rvnd1' # ignored - loop device
},
'/solaris/loop/device/one': {
'device_path': '/dev/lofi1' # ignored - loop device
},
'/solaris/loop/device/two': {
'device_path': '/dev/blah/lofi' # ignored - loop device
},
'/linux/device/mapper': {
'device_path': '/dev/dm-1' # ignored - device mapper
}
}
self.assertDictEqual(
Disk._get_local_filesystems(self.disk_instance_mock), # pylint: disable=protected-access
{
with self.subTest('Ignoring loop devs, dev mappers & network shares.'):
# This minimal `_disk_dict` is sufficient for this test.
self.disk_instance_mock._disk_dict = { # pylint: disable=protected-access
'/very/good/mountpoint': {
'device_path': '/dev/sda1'
},
'/mounted/here/too': {
'device_path': '/dev/sda1'
},
'/other/acceptable/device/paths': {
'device_path': '/dev/anything-really'
},
'/a/samba/share': {
'device_path': '//server.local/cool_share' # ignored - not `/dev/...`
},
'/linux/loop/device/one': {
'device_path': '/dev/loop0' # ignored - loop device
},
'/linux/loop/device/two': {
'device_path': '/dev/blah/loop0' # ignored - loop device
},
'/bsd/s/loop/device/one': {
'device_path': '/dev/svnd' # ignored - loop device
},
'/bsd/s/loop/device/two': {
'device_path': '/dev/blah/svnd1' # ignored - loop device
},
'/bsd/r/loop/device/one': {
'device_path': '/dev/rvnd' # ignored - loop device
},
'/bsd/r/loop/device/two': {
'device_path': '/dev/blah/rvnd1' # ignored - loop device
},
'/solaris/loop/device/one': {
'device_path': '/dev/lofi1' # ignored - loop device
},
'/solaris/loop/device/two': {
'device_path': '/dev/blah/lofi' # ignored - loop device
},
'/linux/device/mapper': {
'device_path': '/dev/dm-1' # ignored - device mapper
}
}
)
apfs_disk_dict_mock.return_value = (
self.disk_instance_mock._disk_dict # pylint: disable=protected-access
)
self.assertDictEqual(
Disk._get_local_filesystems(self.disk_instance_mock), # pylint: disable=protected-access
{
'/very/good/mountpoint': {
'device_path': '/dev/sda1'
},
'/other/acceptable/device/paths': {
'device_path': '/dev/anything-really'
}
}
)
@patch(
'archey.entries.disk.check_output',
# This diskutil output is greatly simplified for brevity
return_value=b"""\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Containers</key>
<array>
<dict>
<key>ContainerReference</key>
<string>disk1</string>
<key>DesignatedPhysicalStore</key>
<string>disk0s1</string>
<key>Volumes</key>
<array>
<dict>
<key>DeviceIdentifier</key>
<string>disk1s1</string>
</dict>
<dict>
<key>DeviceIdentifier</key>
<string>disk1s2</string>
</dict>
</array>
</dict>
<dict>
<key>ContainerReference</key>
<string>disk2</string>
<key>DesignatedPhysicalStore</key>
<string>disk0s3</string>
<key>Volumes</key>
<array>
<dict>
<key>DeviceIdentifier</key>
<string>disk2s1</string>
</dict>
</array>
</dict>
<dict>
<key>ContainerReference</key>
<string>disk3</string>
<key>DesignatedPhysicalStore</key>
<string>disk0s2</string>
<key>Volumes</key>
<array>
<dict>
<key>DeviceIdentifier</key>
<string>disk3s1</string>
</dict>
<dict>
<key>DeviceIdentifier</key>
<string>disk3s5</string>
</dict>
<dict>
<key>DeviceIdentifier</key>
<string>disk3s6</string>
</dict>
</array>
</dict>
</array>
</dict>
</plist>
""")
def test_replace_apfs_volumes_by_their_containers(self, _):
"""Tests `Disk._replace_apfs_volumes_by_their_containers` for APFS volumes deduplication."""
with self.subTest('Ignoring APFS volumes on macOS'):
# Shortened example from issue #115, ie standard macOS 12.4 install on M1
# see https://eclecticlight.co/2021/01/14/m1-macs-radically-change-boot-and-recovery/
self.disk_instance_mock._disk_dict = { # pylint: disable=protected-access
'/': {
'device_path': '/dev/disk3s1s1', # in apfs container (disk0s2)
'used_blocks': 0,
'total_blocks': 0,
},
'/System/Volumes/VM': {
'device_path': '/dev/disk3s6', # in apfs container (disk0s2)
'used_blocks': 0,
'total_blocks': 0,
},
'/System/Volumes/xarts': {
'device_path': '/dev/disk1s2', # in iboot system container (disk0s1)
'used_blocks': 0,
'total_blocks': 0,
},
'/System/Volumes/iSCPreboot': {
'device_path': '/dev/disk1s1', # in iboot system container (disk0s1)
'used_blocks': 0,
'total_blocks': 0,
},
'/System/Volumes/Data': {
'device_path': '/dev/disk3s5', # in apfs container (disk0s2)
'used_blocks': 0,
'total_blocks': 0,
},
'/System/Volumes/Update/SFR/mnt1': {
'device_path': '/dev/disk2s1', # in recovery container (disk0s3)
'used_blocks': 0,
'total_blocks': 0,
}
}
# We should end up with the 3 container device paths
self.assertDictEqual(
Disk._replace_apfs_volumes_by_their_containers(self.disk_instance_mock), # pylint: disable=protected-access
{
'disk3': {
'device_path': '/dev/disk0s2',
'used_blocks': 0,
'total_blocks': 0,
},
'disk1': {
'device_path': '/dev/disk0s1',
'used_blocks': 0,
'total_blocks': 0,
},
'disk2': {
'device_path': '/dev/disk0s3',
'used_blocks': 0,
'total_blocks': 0,
}
}
)
with self.subTest('Adding usage of ignored APFS volumes on macOS'):
# As above test, but checking for correct usage and total figures once combined
self.disk_instance_mock._disk_dict = { # pylint: disable=protected-access
'/': {
'device_path': '/dev/disk3s1s1', # in apfs container (disk0s2)
'used_blocks': 23068672,
'total_blocks': 970981376
},
'/System/Volumes/VM': {
'device_path': '/dev/disk3s6', # in apfs container (disk0s2)
'used_blocks': 1048576,
'total_blocks': 970981376
},
'/System/Volumes/xarts': {
'device_path': '/dev/disk1s2', # in iboot system container (disk0s1)
'used_blocks': 6144,
'total_blocks': 512000
},
'/System/Volumes/iSCPreboot': {
'device_path': '/dev/disk1s1', # in iboot system container (disk0s1)
'used_blocks': 7578,
'total_blocks': 512000
},
'/System/Volumes/Data': {
'device_path': '/dev/disk3s5', # in apfs container (disk0s2)
'used_blocks': 266338304,
'total_blocks': 970981376
},
'/System/Volumes/Update/SFR/mnt1': {
'device_path': '/dev/disk2s1', # in recovery container (disk0s3)
'used_blocks': 1677722,
'total_blocks': 5242880
}
}
# We should end up with the 3 container device paths
self.assertDictEqual(
Disk._replace_apfs_volumes_by_their_containers(self.disk_instance_mock), # pylint: disable=protected-access
{
'disk3': {
'device_path': '/dev/disk0s2',
'used_blocks': 290455552,
'total_blocks': 970981376
},
'disk1': {
'device_path': '/dev/disk0s1',
'used_blocks': 13722,
'total_blocks': 512000
},
'disk2': {
'device_path': '/dev/disk0s3',
'used_blocks': 1677722,
'total_blocks': 5242880
}
}
)
def test_disk_get_specified_filesystems(self):
"""Tests `Disk._get_specified_filesystems`."""