# Copyright 2008-2020 pydicom authors. See LICENSE file for details.
"""DICOM File-set handling."""

from collections.abc import Iterator, Iterable, Callable
import copy
import os
from pathlib import Path
import re
import shutil
from tempfile import TemporaryDirectory
from typing import Optional, Union, Any, cast
import uuid

from pydicom.charset import default_encoding
from pydicom.datadict import tag_for_keyword, dictionary_description
from pydicom.dataelem import DataElement
from pydicom.dataset import Dataset, FileMetaDataset, FileDataset
from pydicom.filebase import DicomBytesIO, DicomFileLike
from pydicom.filereader import dcmread
from pydicom.filewriter import write_dataset, write_data_element, write_file_meta_info
from pydicom.misc import warn_and_log
from pydicom.tag import Tag, BaseTag
import pydicom.uid as sop
from pydicom.uid import (
    generate_uid,
    UID,
    ExplicitVRLittleEndian,
    ImplicitVRLittleEndian,
    MediaStorageDirectoryStorage,
)


# Regex for conformant File ID paths - PS3.10 Section 8.5
# Accepts a string made up solely of the characters A-Z (upper case), 0-9
#   and underscore; the empty string also matches. Applied by
#   is_conformant_file_id() to the concatenation of the path's components.
_RE_FILE_ID = re.compile("^[A-Z0-9_]*$")
# Prefixes to use when generating File ID components
# Keyed by the record's *Directory Record Type*. RecordNode.component looks
#   the prefix up here and, for "PRIVATE" records, appends the node's depth.
_PREFIXES = {
    "PATIENT": "PT",
    "STUDY": "ST",
    "SERIES": "SE",
    "IMAGE": "IM",
    "RT DOSE": "RD",
    "RT STRUCTURE SET": "RS",
    "RT PLAN": "RP",
    "RT TREAT RECORD": "RX",
    "PRESENTATION": "PR",
    "WAVEFORM": "WV",
    "SR DOCUMENT": "SR",
    "KEY OBJECT DOC": "KY",
    "SPECTROSCOPY": "SP",
    "RAW DATA": "RW",
    "REGISTRATION": "RG",
    "FIDUCIAL": "FD",
    "HANGING PROTOCOL": "HG",
    "ENCAP DOC": "ED",
    "VALUE MAP": "VM",
    "STEREOMETRIC": "SX",
    "PALETTE": "PA",
    "IMPLANT": "IP",
    "IMPLANT ASSY": "IA",
    "IMPLANT GROUP": "IG",
    "PLAN": "PL",
    "MEASUREMENT": "MX",
    "SURFACE": "SF",
    "SURFACE SCAN": "SS",
    "TRACT": "TR",
    "ASSESSMENT": "AS",
    "RADIOTHERAPY": "RT",
    "PRIVATE": "P",
}
# Element keywords for the DICOMDIR offset elements.
# _NEXT_OFFSET and _LOWER_OFFSET are used by RecordNode to add the elements to
#   a directory record (when absent) and to update their values.
# NOTE(review): _FIRST_OFFSET and _LAST_OFFSET are not referenced in this part
#   of the file - presumably used when the DICOMDIR itself is written.
_FIRST_OFFSET = "OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity"
_NEXT_OFFSET = "OffsetOfTheNextDirectoryRecord"
_LOWER_OFFSET = "OffsetOfReferencedLowerLevelDirectoryEntity"
_LAST_OFFSET = "OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity"


def generate_filename(
    prefix: str = "", start: int = 0, alphanumeric: bool = False
) -> Iterator[str]:
    """Yield File IDs for a File-set.

    Maximum number of File IDs is:

    * Numeric: (10 ** (8 - len(`prefix`))) - `start`
    * Alphanumeric: (35 ** (8 - len(`prefix`))) - `start`

    .. versionchanged:: 3.0

       The characters used when `alphanumeric` is ``True`` have been reduced to
       [0-9][A-I,K-Z]

    Parameters
    ----------
    prefix : str, optional
        The prefix to use for all filenames, default (``""``).
    start : int, optional
        The starting index to use for the suffixes, (default ``0``).
        i.e. if you want to start at ``'00010'`` then `start` should be ``10``.
        Must not be negative.
    alphanumeric : bool, optional
        If ``False`` (default) then only generate suffixes using the characters
        [0-9], otherwise use [0-9][A-I,K-Z].

    Yields
    ------
    str
        A unique filename with 8 characters, with each incremented by 1 from
        the previous one (i.e. ``'00000000'``, ``'00000001'``, ``'00000002'``,
        and so on).

    Raises
    ------
    ValueError
        If `prefix` is longer than 7 characters or if `start` is negative.
        As this is a generator function the exception is raised when the
        first value is requested, not when the function is called.
    """
    if len(prefix) > 7:
        raise ValueError("The 'prefix' must be less than 8 characters long")

    # Floor division of a negative index never reaches zero, so without this
    #   check the digit conversion below would loop forever
    if start < 0:
        raise ValueError("The 'start' index must not be negative")

    chars = "0123456789ABCDEFGHIKLMNOPQRSTUVWXYZ"
    if not alphanumeric:
        chars = chars[:10]

    base = len(chars)
    length = 8 - len(prefix)
    for idx in range(start, base**length):
        # Convert the index to base `base`, least significant digit first
        digits = []
        n = idx
        while n:
            n, remainder = divmod(n, base)
            digits.append(chars[remainder])

        suffix = "".join(reversed(digits))
        # Left-pad with zeros so the File ID is always 8 characters long
        yield f"{prefix}{suffix.rjust(length, '0')}"


def is_conformant_file_id(path: Path) -> bool:
    """Return ``True`` if `path` is a conformant File ID.

    **Conformance**

    * :dcm:`No more than 8 components<part03/sect_F.3.2.2.html>` (parts) in
      the path
    * :dcm:`No more than 8 characters per component<part03/sect_F.3.2.2.html>`
    * :dcm:`Characters in a component must be ASCII<part10/sect_8.2.html>`
    * :dcm:`Valid characters in a component are 0-9, A-Z and _
      <part10/sect_8.5.html>`

    Parameters
    ----------
    path : pathlib.Path
        The path to check, relative to the File-set root directory.

    Returns
    -------
    bool
        ``True`` if `path` is conformant, ``False`` otherwise.
    """
    parts = path.parts

    # No more than 8 components, each with no more than 8 characters
    if len(parts) > 8 or any(len(part) > 8 for part in parts):
        return False

    # Characters in the path are ASCII
    chars = "".join(parts)
    if not chars.isascii():
        return False

    # Characters are in [0-9][A-Z] and _
    # fullmatch() is required: with match() the pattern's trailing `$` also
    #   matches just before a final newline, so a component ending in "\n"
    #   would be reported as conformant
    return _RE_FILE_ID.fullmatch(chars) is not None


class RecordNode(Iterable["RecordNode"]):
    """A node in the DICOMDIR record tree, wrapping one directory record.

    Attributes
    ----------
    children : list of RecordNode
        The nodes directly below the current node (empty for a leaf).
    instance : FileInstance or None
        For a leaf node, the :class:`~pydicom.fileset.FileInstance` for the
        SOP Instance referenced by the record, otherwise ``None``.
    """

    def __init__(self, record: Dataset | None = None) -> None:
        """Create a new ``RecordNode``.

        Parameters
        ----------
        record : pydicom.dataset.Dataset, optional
            A directory record from a *Directory Record Sequence*.
        """
        self.children: list[RecordNode] = []
        self.instance: FileInstance | None = None
        self._parent: RecordNode | None = None
        self._record: Dataset

        # Offset to the start of the sequence item containing the record
        #   once it has been encoded as part of the *Directory Record
        #   Sequence* - not guaranteed to be up-to-date
        self._offset = 0
        # Offsets to the start of the encoded values of the record's *Offset
        #   of the Next Directory Record* and *Offset of Referenced Lower
        #   Level Directory Entity* elements - set by _encode_record()
        self._offset_next = 0
        self._offset_lower = 0

        # NOTE(review): this is a truthiness test rather than `is not None`;
        #   presumably an empty Dataset is falsy and therefore skips the
        #   validation in _set_record() - confirm that is intended
        if record:
            self._set_record(record)

    def add(self, leaf: "RecordNode") -> None:
        """Add a leaf to the tree.

        Parameters
        ----------
        leaf : pydicom.fileset.RecordNode
            A leaf node (i.e. one with a
            :class:`~pydicom.fileset.FileInstance`) to be added to the tree
            (if not already present).
        """
        # Start at the top of the branch that `leaf` belongs to
        branch = leaf.root
        if branch is self:
            branch = branch.children[0]

        # Walk down both trees together for as long as the branch's node is
        #   already present, then attach the first node that is new
        target = self.root
        while branch in target and branch.children:
            target = target[branch]
            branch = branch.children[0]

        branch.parent = target

    @property
    def ancestors(self) -> list["RecordNode"]:
        """Return the nodes above the current one, nearest first, excluding
        the tree's root node.
        """
        return [node for node in self.reverse() if node is not self]

    @property
    def component(self) -> str:
        """Return the current node's File ID component as :class:`str`.

        The component is the record type's prefix (with the node's depth
        appended for PRIVATE records) followed by the node's index amongst
        its siblings, zero-padded so the result is 8 characters long.

        Raises
        ------
        ValueError
            If the current node is the tree's root node.
        """
        if self.is_root:
            raise ValueError("The root node doesn't contribute a File ID component")

        rtype = self.record_type
        prefix = _PREFIXES[rtype]
        if rtype == "PRIVATE":
            prefix = f"{prefix}{self.depth}"

        alphabet = "0123456789ABCDEFGHIKLMNOPQRSTUVWXYZ"
        if not self.file_set._use_alphanumeric:
            alphabet = alphabet[:10]

        # Convert the index to the alphabet's base, least significant first
        base = len(alphabet)
        digits: list[str] = []
        value = self.index
        while value:
            value, remainder = divmod(value, base)
            digits.append(alphabet[remainder])

        suffix = "".join(reversed(digits))

        return prefix + suffix.rjust(8 - len(prefix), "0")

    def __contains__(self, key: Union[str, "RecordNode"]) -> bool:
        """Return ``True`` if one of the current node's children has a
        :attr:`~pydicom.fileset.RecordNode.key` matching `key`.
        """
        wanted = key.key if isinstance(key, RecordNode) else key
        child_keys = [child.key for child in self.children]

        return wanted in child_keys

    def __delitem__(self, key: Union[str, "RecordNode"]) -> None:
        """Remove the child matching `key` and, if that leaves the current
        node without children, remove the current node from its own parent
        (repeating upwards until the root node is reached).

        Raises
        ------
        KeyError
            If none of the current node's children match `key`.
        """
        wanted = key.key if isinstance(key, RecordNode) else key
        if wanted not in self:
            raise KeyError(wanted)

        self.children = [child for child in self.children if child.key != wanted]

        # Nothing more to do if there are children left or this is the root
        if self.children or self.is_root:
            return

        # Otherwise the current node is now empty, so prune it as well
        del self.parent[self]

    @property
    def depth(self) -> int:
        """Return the number of nodes to the level below the tree root."""
        return sum(1 for _ in self.reverse()) - 1

    def _encode_record(self, force_implicit: bool = False) -> int:
        """Encode the node's directory record.

        * Encodes the record as explicit VR little endian (or implicit VR
          little endian if `force_implicit` is ``True``)
        * Sets the ``RecordNode._offset_next`` and ``RecordNode._offset_lower``
          attributes to the position of the start of the values of the *Offset
          of the Next Directory Record* and *Offset of Referenced Lower Level
          Directory Entity* elements. Note that the offsets are relative to
          the start of the current directory record.

        The values for the *Offset Of The Next Directory Record* and *Offset
        of Referenced Lower Level Directory Entity* elements are not guaranteed
        to be correct.

        Parameters
        ----------
        force_implicit : bool, optional
            ``True`` to force using implicit VR encoding, which is
            non-conformant. Default ``False``.

        Returns
        -------
        int
            The length of the encoded directory record.

        See Also
        --------
        :meth:`~pydicom.fileset.RecordNode._update_record_offsets`
        """
        buffer = DicomBytesIO()
        buffer.is_little_endian = True
        buffer.is_implicit_VR = force_implicit

        record = self._record
        charset = record.get("SpecificCharacterSet", default_encoding)

        for tag in sorted(record.keys()):
            # Elements numbered 0 in groups above 0x0006 are not encoded
            #   (presumably these are group length elements)
            if tag.group > 6 and tag.element == 0:
                continue

            # (0004,1400) Offset Of The Next Directory Record
            # (0004,1420) Offset Of Referenced Lower Level Directory Entity
            # For VR UL the value always starts 8 bytes after the start of
            #   the tag, although the absolute position may change with the
            #   transfer syntax
            if tag == 0x00041400:
                self._offset_next = buffer.tell() + 8
            elif tag == 0x00041420:
                self._offset_lower = buffer.tell() + 8

            write_data_element(buffer, record[tag], charset)

        return len(buffer.getvalue())

    @property
    def _file_id(self) -> Path | None:
        """Return the *Referenced File ID* as a :class:`~pathlib.Path`.

        Returns
        -------
        pathlib.Path or None
            The *Referenced File ID* from the directory record as a
            :class:`pathlib.Path` or ``None`` if the element value is null.

        Raises
        ------
        AttributeError
            If the directory record has no *Referenced File ID* element.
        """
        if "ReferencedFileID" not in self._record:
            raise AttributeError("No 'Referenced File ID' in the directory record")

        multiplicity = self._record["ReferencedFileID"].VM
        if multiplicity == 1:
            return Path(cast(str, self._record.ReferencedFileID))

        if multiplicity > 1:
            # One value per path component
            return Path(*cast(list[str], self._record.ReferencedFileID))

        return None

    @property
    def file_set(self) -> "FileSet":
        """Return the :class:`~pydicom.fileset.FileSet` of the tree's root
        node.
        """
        return self.root.file_set

    def __getitem__(self, key: Union[str, "RecordNode"]) -> "RecordNode":
        """Return the child of the current node whose
        :attr:`~pydicom.fileset.RecordNode.key` matches `key`.

        Raises
        ------
        KeyError
            If none of the current node's children match `key`.
        """
        wanted = key.key if isinstance(key, RecordNode) else key
        found = next((child for child in self.children if wanted == child.key), None)
        if found is None:
            raise KeyError(wanted)

        return found

    @property
    def has_instance(self) -> bool:
        """Return ``True`` if a :class:`~pydicom.fileset.FileInstance` has
        been set for the current node.
        """
        return self.instance is not None

    @property
    def index(self) -> int:
        """Return the position of the current node in its parent's list of
        children, or ``0`` if it has no parent.
        """
        parent = self.parent

        return parent.children.index(self) if parent else 0

    @property
    def is_root(self) -> bool:
        """Return ``True`` if the current node is the tree's root node
        (always ``False`` for a plain ``RecordNode``).
        """
        return False

    def __iter__(self) -> Iterator["RecordNode"]:
        """Yield the current node (unless it's the root node) followed by
        every node below it, depth-first.
        """
        if not self.is_root:
            yield self

        for child in self.children:
            yield from child

    @property
    def key(self) -> str:
        """Return a unique key for the node's record as :class:`str`.

        Raises
        ------
        AttributeError
            If the element the key is taken from is missing from the record.
        """
        record = self._record
        rtype = self.record_type

        if rtype == "STUDY":
            # PS3.3, Annex F.5.2: Type 1C
            if "StudyInstanceUID" in record:
                return cast(UID, record.StudyInstanceUID)

            return cast(UID, record.ReferencedSOPInstanceUIDInFile)

        # Record types whose key always comes from one specific element
        # PS3.3, Annex F.5.1: Each Patient ID is unique within a File-set
        fixed_keywords = (
            ("PATIENT", "PatientID"),
            ("SERIES", "SeriesInstanceUID"),
            ("PRIVATE", "PrivateRecordUID"),
        )
        for record_type, keyword in fixed_keywords:
            if rtype == record_type:
                return cast(str, getattr(record, keyword))

        # PS3.3, Table F.3-3: Required if record references an instance
        try:
            return cast(UID, record.ReferencedSOPInstanceUIDInFile)
        except AttributeError as exc:
            raise AttributeError(
                f"Invalid '{rtype}' record - missing required element "
                "'Referenced SOP Instance UID in File'"
            ) from exc

    @property
    def next(self) -> Optional["RecordNode"]:
        """Return the sibling that follows the current node, or ``None`` if
        there isn't one or the node has no parent.
        """
        parent = self.parent
        if not parent:
            return None

        siblings = parent.children
        position = self.index + 1

        return siblings[position] if position < len(siblings) else None

    @property
    def parent(self) -> "RecordNode":
        """Return the current node's parent (if it has one)."""
        return cast("RecordNode", self._parent)

    @parent.setter
    def parent(self, node: "RecordNode") -> None:
        """Set the current node's parent, also appending the current node
        to the parent's children if it isn't already one of them.
        """
        self._parent = node
        if node is None:
            return

        if self not in node.children:
            node.children.append(self)

    def prettify(self, indent_char: str = "  ") -> list[str]:
        """Return the tree structure as a list of pretty strings, starting at
        the current node (unless the current node is the root node).

        Parameters
        ----------
        indent_char : str, optional
            The characters to use to indent each level of the tree.

        Returns
        -------
        list of str
            One line per node that has children, plus summary lines for
            the leaves that reference SOP Instances.
        """

        def leaf_summary(node: "RecordNode", indent_char: str) -> list[str]:
            """Return one summary line per record type for the leaves that
            share `node`'s parent.
            """
            # Examples:
            #   IMAGE: 15 SOP Instances (10 initial, 9 additions, 4 removals)
            #   RTDOSE: 1 SOP Instance
            if node.children:
                return []

            pad = indent_char * node.depth
            leaves = [nn for nn in node.parent if nn.has_instance]
            lines = []
            for rtype in sorted({nn.record_type for nn in leaves}):
                # All leaves of this type: initial + additions
                matching = [nn for nn in leaves if nn.record_type == rtype]
                total = len(matching)
                # All leaves should have a corresponding FileInstance
                added = sum(
                    1
                    for nn in matching
                    if cast(FileInstance, nn.instance).for_addition
                )
                removed = sum(
                    1 for nn in matching if cast(FileInstance, nn.instance).for_removal
                )
                initial = total - added
                remaining = total - removed

                notes = []
                if (added or removed) and initial > 0:
                    notes.append(f"{initial} initial")
                if added:
                    notes.append(f"{added} addition{'s' if added > 1 else ''}")
                if removed:
                    notes.append(f"{removed} removal{'s' if removed > 1 else ''}")

                line = (
                    f"{pad}{rtype}: {remaining} "
                    f"SOP Instance{'' if remaining == 1 else 's'}"
                )
                if notes:
                    line += f" ({', '.join(notes)})"

                lines.append(line)

            return lines

        output = []
        for node in self:
            indent = indent_char * node.depth
            if node.children:
                output.append(f"{indent}{node}")
                # Summarise any leaves at the next level - the summary for
                #   the first leaf found covers all of its siblings
                first_leaf = next(
                    (child for child in node.children if child.has_instance), None
                )
                if first_leaf is not None:
                    output.extend(leaf_summary(first_leaf, indent_char))
            elif node.depth == 0 and node.has_instance:
                # Single-level records
                instance = cast(FileInstance, node.instance)
                line = f"{indent}{node.record_type}: 1 SOP Instance"
                if instance.for_addition:
                    line += " (to be added)"
                elif instance.for_removal:
                    line += " (to be removed)"

                output.append(line)

        return output

    @property
    def previous(self) -> Optional["RecordNode"]:
        """Return the sibling that precedes the current node, or ``None`` if
        there isn't one or the node has no parent.
        """
        parent = self.parent
        if not parent:
            return None

        position = self.index
        if position == 0:
            return None

        return parent.children[position - 1]

    def _set_record(self, ds: Dataset) -> None:
        """Set the node's initial directory record dataset.

        The record is used as a starting point when filling the DICOMDIR's
        *Directory Record Sequence* and is modified as required during
        encoding. The two offset elements are added with a value of ``0`` if
        absent and *Record In-use Flag* is set to ``0xFFFF``.

        Parameters
        ----------
        ds : pydicom.dataset.Dataset
            Set the node's initial directory record dataset, must be conformant
            to :dcm:`Part 3, Annex F of the DICOM Standard
            <part03/chapter_F.html>`.

        Raises
        ------
        ValueError
            If `ds` has no *Directory Record Type* or is missing the element
            required to create the node's key.
        """
        # Build the start of the error message, naming the record type and
        #   the record's offset when they're available
        label = ds.get("DirectoryRecordType", None)
        label = f"{label} " if label else ""
        offset = getattr(ds, "seq_item_tell", None)
        if offset:
            msg = f"The {label}directory record at offset {offset} is missing"
        else:
            msg = f"The {label}directory record is missing"

        missing = [kw for kw in ("DirectoryRecordType",) if kw not in ds]
        if missing:
            raise ValueError(
                f"{msg} one or more required elements: {', '.join(missing)}"
            )

        for keyword in (_NEXT_OFFSET, _LOWER_OFFSET):
            if keyword not in ds:
                setattr(ds, keyword, 0)

        ds.RecordInUseFlag = 0xFFFF
        self._record = ds

        # Evaluating the key confirms the element it relies on is present
        try:
            self.key
        except (AttributeError, ValueError) as exc:
            raise ValueError(f"{msg} a required element") from exc

    @property
    def record_type(self) -> str:
        """Return the value of the record's *Directory Record Type* as
        :class:`str`.
        """
        return cast(str, self._record.DirectoryRecordType)

    def remove(self, node: "RecordNode") -> None:
        """Remove a leaf from the tree

        Parameters
        ----------
        node : pydicom.fileset.RecordNode
            The leaf node (i.e. one with a
            :class:`~pydicom.fileset.FileInstance`) to remove.

        Raises
        ------
        ValueError
            If `node` has no :class:`~pydicom.fileset.FileInstance`.
        """
        if node.has_instance:
            del node.parent[node]
            return

        raise ValueError("Only leaf nodes can be removed")

    def reverse(self) -> Iterable["RecordNode"]:
        """Yield the current node and then each of its ancestors in turn,
        stopping at the level below the tree's root node.
        """
        current = self
        while current.parent:
            yield current
            current = current.parent

        # The topmost node is only yielded if it isn't the root
        if not current.is_root:
            yield current

    @property
    def root(self) -> "RecordNode":
        """Return the topmost node of the tree the current node is in."""
        node = self
        while node.parent:
            node = node.parent

        return node

    def __str__(self) -> str:
        """Return a one-line description of the node's record."""
        if self.is_root:
            return "ROOT"

        ds = self._record
        rtype = self.record_type

        if rtype == "PATIENT":
            details = [
                f"PatientID='{ds.PatientID}'",
                f"PatientName='{ds.PatientName}'",
            ]
        elif rtype == "STUDY":
            details = [f"StudyDate={ds.StudyDate}", f"StudyTime={ds.StudyTime}"]
            if getattr(ds, "StudyDescription", None):
                details.append(f"StudyDescription='{ds.StudyDescription}'")
        elif rtype == "SERIES":
            details = [f"Modality={ds.Modality}", f"SeriesNumber={ds.SeriesNumber}"]
            if getattr(ds, "SeriesDescription", None):
                details.append(f"SeriesDescription='{ds.SeriesDescription}'")
        elif rtype == "IMAGE":
            details = [f"InstanceNumber={ds.InstanceNumber}"]
        else:
            details = [f"{self.key}"]

        return f"{rtype}: {', '.join(details)}"

    def _update_record_offsets(self) -> None:
        """Update the record's offset elements.

        Updates the values for *Offset of the Next Directory Record* and
        *Offset of Referenced Lower Level Directory Entity*, provided all of
        the nodes have had their *_offset* attribute set correctly. A value
        of ``0`` is used when there is no next sibling or no children.
        """
        following = self.next
        next_elem = self._record[_NEXT_OFFSET]
        next_elem.value = following._offset if following else 0

        lower_elem = self._record[_LOWER_OFFSET]
        lower_elem.value = self.children[0]._offset if self.children else 0


class RootNode(RecordNode):
    """The node at the top of a File-set's record tree.

    Unlike the other nodes it has no directory record of its own, instead
    it holds the reference to the tree's :class:`~pydicom.fileset.FileSet`.
    """

    def __init__(self, fs: "FileSet") -> None:
        """Create a new root node.

        Parameters
        ----------
        fs : pydicom.fileset.FileSet
            The File-set that owns the record tree.
        """
        self._fs = fs

        # No record is passed, so the base class only sets up the empty
        #   child list and the default offsets
        super().__init__()

    @property
    def is_root(self) -> bool:
        """Return ``True``, as this node is the tree's root node."""
        return True

    @property
    def file_set(self) -> "FileSet":
        """Return the :class:`~pydicom.fileset.FileSet` the tree belongs
        to.
        """
        return self._fs


class FileInstance:
    """A File in the File-set, as described by its directory records.

    Element values are looked up in the directory records of the instance's
    node and that node's ancestors, starting with the node itself.

    Attributes
    ----------
    node : pydicom.fileset.RecordNode
        The leaf record that references this instance.
    """

    def __init__(self, node: RecordNode) -> None:
        """Create a new FileInstance.

        Parameters
        ----------
        node : pydicom.fileset.RecordNode
            The record that references this instance.
        """

        class Flags:
            add: bool
            remove: bool

        self.node = node
        self._uuid = uuid.uuid4()
        self._flags = Flags()
        # Start with nothing staged - this also gives the flags their values
        self._apply_stage("x")
        self._stage_path: Path | None = None

    def _apply_stage(self, flag: str) -> None:
        """Apply staging to the instance.

        Parameters
        ----------
        flag : str
            The staging to apply, one of ``'+'``, ``'-'`` or ``'x'``.
            This will flag the instance for addition to or removal from the
            File-set, or to reset the staging, respectively. Any other value
            is ignored.
        """
        flags = self._flags
        if flag == "x":
            # Reset: nothing staged
            flags.add = False
            flags.remove = False
            self._stage_path = None
        elif flag == "+":
            if flags.remove:
                # remove + add = no change
                flags.remove = False
                self._stage_path = None
            else:
                flags.add = True
                # The staged file is named after the instance's UUID
                staging_dir = self.file_set._stage["path"]
                self._stage_path = staging_dir / f"{self._uuid}"
        elif flag == "-":
            if flags.add:
                # add + remove = no change
                flags.add = False
            else:
                flags.remove = True

            # Either way there is no staged file
            self._stage_path = None

    def __contains__(self, name: str | int) -> bool:
        """Return ``True`` if the element with keyword or tag `name` is
        in one of the corresponding directory records.

        Parameters
        ----------
        name : str or int
            The element keyword or tag to search for.

        Returns
        -------
        bool
            ``True`` if the corresponding element is present, ``False``
            otherwise.
        """
        try:
            self[name]
        except KeyError:
            return False
        else:
            return True

    @property
    def FileID(self) -> str:
        """Return the File ID of the referenced instance, built from the
        File ID components of the instance's node and its ancestors.
        """
        root = self.node.root
        # reverse() goes from the leaf upwards, the File ID goes downwards
        parts = [nn.component for nn in self.node.reverse() if nn is not root]
        parts.reverse()

        return os.fspath(Path(*parts))

    @property
    def file_set(self) -> "FileSet":
        """Return the :class:`~pydicom.fileset.FileSet` this instance belongs
        to.
        """
        return self.node.file_set

    @property
    def for_addition(self) -> bool:
        """Return ``True`` if the instance has been staged for addition to
        the File-set.
        """
        return self._flags.add

    @property
    def for_moving(self) -> bool:
        """Return ``True`` if the instance will be moved to a new location
        within the File-set, i.e. the *Referenced File ID* in its directory
        record differs from its generated File ID.
        """
        if self.for_addition:
            return False

        single_valued = self["ReferencedFileID"].VM == 1
        generated = self.FileID.split(os.path.sep)
        if single_valued:
            # A single value is a bare str, so wrap it for the comparison
            return [self.ReferencedFileID] != generated

        return cast(bool, self.ReferencedFileID != generated)

    @property
    def for_removal(self) -> bool:
        """Return ``True`` if the instance has been staged for removal from
        the File-set.
        """
        return self._flags.remove

    def __getattribute__(self, name: str) -> Any:
        """Return the class attribute value for `name`.

        Parameters
        ----------
        name : str
            An element keyword or a class attribute name.

        Returns
        -------
        object
            If `name` matches a DICOM keyword and the element is
            present in one of the directory records then returns the
            corresponding element's value. Otherwise returns the class
            attribute's value (if present). Directory records are searched
            from the lowest (i.e. an IMAGE or similar record type) to the
            highest (PATIENT or similar).
        """
        keyword_tag = tag_for_keyword(name)
        if keyword_tag is None:
            # Not an element keyword, so this is a normal attribute lookup
            return super().__getattribute__(name)

        element_tag = Tag(keyword_tag)
        for node in self.node.reverse():
            record = node._record
            if element_tag in record:
                return record[element_tag].value

        # A keyword, but no record has the element
        return super().__getattribute__(name)

    def __getitem__(self, key: str | int) -> DataElement:
        """Return the DataElement with keyword or tag `key`.

        Parameters
        ----------
        key : str or int
            An element keyword or tag. The tags for *SOP Instance UID*,
            *SOP Class UID* and *Transfer Syntax UID* are replaced by the
            tags (0004,1511), (0004,1510) and (0004,1512) respectively
            before searching.

        Returns
        -------
        pydicom.dataelem.DataElement
            The DataElement corresponding to `key`, if present in one of the
            directory records. Directory records are searched
            from the lowest (i.e. an IMAGE or similar record type) to the
            highest (PATIENT or similar).

        Raises
        ------
        KeyError
            If none of the directory records contain the element.
        """
        tag = key if isinstance(key, BaseTag) else Tag(key)

        # (requested tag, tag searched for in the directory records)
        substitutions = (
            (0x00080018, 0x00041511),  # SOP Instance UID
            (0x00080016, 0x00041510),  # SOP Class UID
            (0x00020010, 0x00041512),  # Transfer Syntax UID
        )
        for requested, in_record in substitutions:
            if tag == requested:
                tag = Tag(in_record)
                break

        for node in self.node.reverse():
            record = node._record
            if tag in record:
                return record[tag]

        raise KeyError(tag)

    @property
    def is_private(self) -> bool:
        """Return ``True`` if the instance's record type is PRIVATE."""
        return self.node.record_type == "PRIVATE"

    @property
    def is_staged(self) -> bool:
        """Return ``True`` if the instance is staged for moving, addition or
        removal
        """
        return self.for_addition or self.for_moving or self.for_removal

    def load(self) -> Dataset:
        """Return the referenced instance as a
        :class:`~pydicom.dataset.Dataset`, read from the staged file if the
        instance is staged for addition.
        """
        if not self.for_addition:
            return dcmread(self.path)

        return dcmread(cast(Path, self._stage_path))

    @property
    def path(self) -> str:
        """Return the path to the corresponding instance as :class:`str`.

        Returns
        -------
        str
            The absolute path to the corresponding instance. If the instance is
            staged for addition to the File-set this will be a path to the
            staged file in the temporary staging directory.
        """
        if self.for_addition:
            return os.fspath(cast(Path, self._stage_path))

        # If not staged for addition then File Set must exist on file system
        fs_root = cast(Path, self.file_set.path)
        file_id = cast(Path, self.node._file_id)

        return os.fspath(fs_root / file_id)

    @property
    def SOPClassUID(self) -> UID:
        """Return the *SOP Class UID* of the referenced instance."""
        return cast(UID, self.ReferencedSOPClassUIDInFile)

    @property
    def SOPInstanceUID(self) -> UID:
        """Return the *SOP Instance UID* of the referenced instance."""
        return cast(UID, self.ReferencedSOPInstanceUIDInFile)

    @property
    def TransferSyntaxUID(self) -> UID:
        """Return the *Transfer Syntax UID* of the referenced instance."""
        return cast(UID, self.ReferencedTransferSyntaxUIDInFile)


# Type alias for arguments that may be either a dataset or a path to one,
#   used by FileSet.__init__() and FileSet.add()
DSPathType = Dataset | str | os.PathLike


class FileSet:
    """Representation of a DICOM File-set."""

    def __init__(self, ds: DSPathType | None = None) -> None:
        """Create a new File-set or load an existing one.

        Parameters
        ----------
        ds : pydicom.dataset.Dataset, str or PathLike, optional
            When loading an existing File-set, its DICOMDIR dataset or the
            path to its DICOMDIR file. If not used then a new File-set is
            created with a newly generated UID.
        """
        # Nominal path to the root directory of the File-set
        self._path: Path | None = None
        # Root node of the record tree for the instances currently in the
        #   File-set, used to fill out the DICOMDIR's *Directory Record
        #   Sequence*
        self._tree = RootNode(self)

        # Staging area for tracking changes to the File-set
        staging_dir = TemporaryDirectory()
        self._stage: dict[str, Any] = {
            "t": staging_dir,
            "+": {},  # instances staged for addition
            "-": {},  # instances staged for removal
            "~": False,  # instances staged for moving
            "^": False,  # a File-set Identification module element has changed
            "path": Path(staging_dir.name),
        }

        # DICOMDIR instance - not guaranteed to be up-to-date
        self._ds = Dataset()
        # Managed SOP Instances, as a list of FileInstance
        self._instances: list[FileInstance] = []
        # Whether to use alphanumeric (True) or numeric (False) File IDs
        self._use_alphanumeric = False

        # File-set Identification values:
        #   File-set ID
        self._id: str | None = None
        #   File-set UID
        self._uid: UID | None = None
        #   File-set Descriptor File ID
        self._descriptor: str | None = None
        #   Specific Character Set of File-set Descriptor File
        self._charset: str | None = None

        if not ds:
            # New File-set
            self.UID = generate_uid()
            return

        # Existing File-set: check the DICOMDIR and create the record tree
        self.load(ds)

    def add(self, ds_or_path: DSPathType) -> FileInstance:
        """Stage an instance for addition to the File-set.

        Calling :meth:`~pydicom.fileset.FileSet.add` for an instance that
        is currently staged for removal cancels that staging, so the
        instance will be kept rather than removed.

        Parameters
        ----------
        ds_or_path : pydicom.dataset.Dataset, str or PathLike
            The instance to add to the File-set, either as a
            :class:`~pydicom.dataset.Dataset` or the path to the instance.

        Returns
        -------
        FileInstance
            The :class:`~pydicom.fileset.FileInstance` that was added. If
            the instance is already part of the File-set then the existing
            :class:`~pydicom.fileset.FileInstance` is returned.

        See Also
        --------
        :meth:`~pydicom.fileset.FileSet.add_custom`
        """
        ds: Dataset | FileDataset
        ds = (
            dcmread(ds_or_path)
            if isinstance(ds_or_path, str | os.PathLike)
            else ds_or_path
        )

        uid = ds.SOPInstanceUID
        existing = [ii for ii in self if ii.SOPInstanceUID == uid]

        # Handle instances staged for removal first: they are no longer
        #   yielded when iterating the File-set so `existing` will be empty
        pending_removal = self._stage["-"]
        if uid in pending_removal:
            instance = pending_removal.pop(uid)
            self._instances.append(instance)
            instance._apply_stage("+")

            return cast(FileInstance, instance)

        # Already in the File-set and not staged for removal - it may or
        #   may not be staged for addition/movement
        if existing:
            return existing[0]

        # New to the File-set: create the directory records and link the
        #   corresponding tree nodes from the highest level down to the leaf.
        # Instances that don't need PRIVATE records shouldn't be able to
        #   exceed the maximum component depth of 8
        records = self._recordify(ds)
        leaf = RecordNode(next(records))  # there may only be a single record
        for record in records:
            child = RecordNode(record)
            child.parent = leaf
            leaf = child

        instance = FileInstance(leaf)
        leaf.instance = instance
        self._tree.add(leaf)

        # Stage the instance and write the dataset to the staging area
        self._stage["+"][instance.SOPInstanceUID] = instance
        self._instances.append(instance)
        instance._apply_stage("+")
        ds.save_as(instance.path, enforce_file_format=True)

        return cast(FileInstance, instance)

    def add_custom(self, ds_or_path: DSPathType, leaf: RecordNode) -> FileInstance:
        """Stage an instance for addition to the File-set using custom records.

        Use this method to add a SOP instance while supplying your own
        directory records for the DICOMDIR file. It is required when PRIVATE
        records are needed, and is an alternative to registering your own
        record definition functions in
        :attr:`~pydicom.fileset.DIRECTORY_RECORDERS` when the default
        functions aren't suitable.

        The following elements will be added automatically to the supplied
        directory records if required and not present:

        * (0004,1400) *Offset of the Next Directory Record*
        * (0004,1410) *Record In-use Flag*
        * (0004,1420) *Offset of Referenced Lower-Level Directory Entity*
        * (0004,1500) *Referenced File ID*
        * (0004,1510) *Referenced SOP Class UID in File*
        * (0004,1511) *Referenced SOP Instance UID in File*
        * (0004,1512) *Referenced Transfer Syntax UID in File*

        Calling :meth:`~pydicom.fileset.FileSet.add_custom` for an instance
        that is currently staged for removal cancels that staging, so the
        instance will be kept rather than removed.

        Examples
        --------

        Add a SOP Instance using a two record hierarchy of PATIENT -> PRIVATE

        .. code-block:: python

            from pydicom import Dataset, examples
            from pydicom.fileset import FileSet, RecordNode
            from pydicom.uid import generate_uid

            # The instance to be added
            ds = examples.ct

            # Define the leaf node (the PRIVATE record)
            record = Dataset()
            record.DirectoryRecordType = "PRIVATE"
            record.PrivateRecordUID = generate_uid()
            leaf_node = RecordNode(record)

            # Define the top node (the PATIENT record)
            record = Dataset()
            record.DirectoryRecordType = "PATIENT"
            record.PatientID = ds.PatientID
            record.PatientName = ds.PatientName
            top_node = RecordNode(record)

            # Set the node relationship
            leaf_node.parent = top_node

            # Add the instance to the File-set
            fs = FileSet()
            instance = fs.add_custom(ds, leaf_node)

        Parameters
        ----------
        ds_or_path : pydicom.dataset.Dataset, str or PathLike
            The instance to add to the File-set, either as a
            :class:`~pydicom.dataset.Dataset` or the path to the instance.
        leaf : pydicom.fileset.RecordNode
            The leaf node for the instance. Its ancestor nodes and their
            directory records should already be set. It should have no more
            than 7 ancestors due to the semantics used by
            :class:`~pydicom.fileset.FileSet` when creating the directory
            structure.

        Returns
        -------
        FileInstance
            The :class:`~pydicom.fileset.FileInstance` that was added. If
            the instance is already part of the File-set then the existing
            :class:`~pydicom.fileset.FileInstance` is returned.

        Raises
        ------
        ValueError
            If `leaf` has a depth greater than 7.

        See Also
        --------
        :meth:`~pydicom.fileset.FileSet.add`
        """
        ds: Dataset | FileDataset
        ds = (
            dcmread(ds_or_path)
            if isinstance(ds_or_path, str | os.PathLike)
            else ds_or_path
        )

        # Validate the supplied node hierarchy
        if leaf.depth > 7:
            raise ValueError(
                "The 'leaf' node must not have more than 7 ancestors as "
                "'FileSet' supports a maximum directory structure depth of 8"
            )

        uid = ds.SOPInstanceUID
        existing = [ii for ii in self if ii.SOPInstanceUID == uid]

        # Handle instances staged for removal first: they are no longer
        #   yielded when iterating the File-set so `existing` will be empty
        pending_removal = self._stage["-"]
        if uid in pending_removal:
            instance = pending_removal.pop(uid)
            self._instances.append(instance)
            instance._apply_stage("+")

            return cast(FileInstance, instance)

        if existing:
            return existing[0]

        # The leaf record must reference the instance being added
        record = leaf._record
        record.ReferencedFileID = None
        record.ReferencedSOPClassUIDInFile = ds.SOPClassUID
        record.ReferencedSOPInstanceUIDInFile = uid
        record.ReferencedTransferSyntaxUIDInFile = ds.file_meta.TransferSyntaxUID

        instance = FileInstance(leaf)
        leaf.instance = instance
        self._tree.add(leaf)

        # Stage the instance and write the dataset to the staging area
        self._stage["+"][instance.SOPInstanceUID] = instance
        self._instances.append(instance)
        instance._apply_stage("+")
        ds.save_as(instance.path, enforce_file_format=True)

        return cast(FileInstance, instance)

    def clear(self) -> None:
        """Clear the File-set.

        All managed instances, the record tree, the root path, the DICOMDIR
        dataset and the File-set Identification values are reset, a new
        File-set UID is generated and the staging area is replaced with a
        new, empty one.
        """
        # Drop the record tree and the managed instances
        self._tree.children = []
        self._instances = []

        # Reset the File-set's location, DICOMDIR and identification
        self._path = None
        self._ds = Dataset()
        self._id = None
        self._uid = generate_uid()
        self._descriptor = None
        self._charset = None

        # Clean and reset the stage
        stage = self._stage
        stage.update({"+": {}, "-": {}, "~": False, "^": False})
        stage["t"].cleanup()
        fresh_dir = TemporaryDirectory()
        stage["t"] = fresh_dir
        stage["path"] = Path(fresh_dir.name)

    def copy(self, path: str | os.PathLike, force_implicit: bool = False) -> "FileSet":
        """Copy the File-set to a new root directory and return the copied
        File-set.

        Changes staged to the original :class:`~pydicom.fileset.FileSet` will
        be applied to the new File-set. The original
        :class:`~pydicom.fileset.FileSet` will remain staged.

        While copying, nodes staged for removal are temporarily detached
        from the original's record tree and its records' *Referenced File
        ID* values are temporarily rewritten. Both are restored before this
        method returns or raises.

        Parameters
        ----------
        path : str or PathLike
            The root directory where the File-set is to be copied to.
        force_implicit : bool, optional
            If ``True`` force the DICOMDIR file to be encoded using *Implicit
            VR Little Endian* which is non-conformant to the DICOM Standard
            (default ``False``).

        Returns
        -------
        pydicom.fileset.FileSet
            The copied File-set as a :class:`~pydicom.fileset.FileSet`.

        Raises
        ------
        ValueError
            If `path` is the same as the original File-set's root directory.
        NotImplementedError
            If the File-set contains more than 35**6 managed instances.
        """
        # !! We can't change anything public in the original FileSet !!

        path = Path(path)
        if self.path and Path(self.path) == path:
            raise ValueError("Cannot copy the File-set as the 'path' is unchanged")

        # Check the hard limit first so nothing is modified on failure
        if len(self) > 35**6:
            raise NotImplementedError(
                "pydicom doesn't support writing File-sets with more than "
                "1838265625 managed instances"
            )

        if len(self) > 10**6:
            self._use_alphanumeric = True

        detached_nodes = []
        file_ids = []
        p = path / "DICOMDIR"
        try:
            # Removals are detached from the tree; only track a node once
            #   it has actually been detached
            for instance in self._stage["-"].values():
                self._tree.remove(instance.node)
                detached_nodes.append(instance.node)

            for instance in self:
                # Remember the original value so it can be restored later
                file_ids.append(instance.ReferencedFileID)
                dst = path / Path(instance.FileID)
                dst.parent.mkdir(parents=True, exist_ok=True)
                shutil.copyfile(instance.path, dst)
                instance.node._record.ReferencedFileID = instance.FileID.split(
                    os.path.sep
                )

            # Create the DICOMDIR file
            with open(p, "wb") as fp:
                f = DicomFileLike(fp)
                self._write_dicomdir(f, copy_safe=True, force_implicit=force_implicit)
        finally:
            # Reset the *Referenced File ID* values, even if the copy failed
            #   part way through. The order here doesn't matter because
            #   removed instances aren't yielded by iter(self), and zip()
            #   stops at the last instance that was actually processed
            for instance, file_id in zip(self, file_ids):
                instance.node._record.ReferencedFileID = file_id

            # Reattach the removed nodes
            for node in detached_nodes:
                self._tree.add(node)

        fs = FileSet()
        fs.load(p, raise_orphans=True)

        return fs

    def _create_dicomdir(self) -> Dataset:
        """Build and return a new minimal DICOMDIR dataset.

        The dataset uses *Explicit VR Little Endian*, has an empty
        *Directory Record Sequence* and zeroed root directory offsets. The
        File-set descriptor elements are only included when they have a
        value.
        """
        ds = Dataset()
        ds.filename = None

        # File Meta Information
        ds.file_meta = FileMetaDataset()
        meta = ds.file_meta
        meta.TransferSyntaxUID = ExplicitVRLittleEndian
        meta.MediaStorageSOPInstanceUID = self.UID
        meta.MediaStorageSOPClassUID = MediaStorageDirectoryStorage

        # File-set Identification and Directory Information
        ds.FileSetID = self.ID
        ds.OffsetOfTheFirstDirectoryRecordOfTheRootDirectoryEntity = 0
        ds.OffsetOfTheLastDirectoryRecordOfTheRootDirectoryEntity = 0
        ds.FileSetConsistencyFlag = 0
        ds.DirectoryRecordSequence = []

        # Optional descriptor file elements
        optional = (
            ("FileSetDescriptorFileID", self.descriptor_file_id),
            (
                "SpecificCharacterSetOfFileSetDescriptorFile",
                self.descriptor_character_set,
            ),
        )
        for keyword, value in optional:
            if value:
                setattr(ds, keyword, value)

        return ds

    @property
    def descriptor_character_set(self) -> str | None:
        """Return the *Specific Character Set of File-set Descriptor File*
        (if available) or ``None``.
        """
        return self._charset

    @descriptor_character_set.setter
    def descriptor_character_set(self, val: str | None) -> None:
        """Set the *Specific Character Set of File-set Descriptor File*.

        The descriptor file itself is used for user comments related to the
        File-set (e.g. a README file) and is up the user to create.

        Parameters
        ----------
        val : str or None
            The value to use for the DICOMDIR's (0004,1142) *Specific
            Character Set of File-set Descriptor File*. See :dcm:`C.12.1.1.2
            in Part 3 of the DICOM Standard
            <part03/sect_C.12.html#sect_C.12.1.1.2>` for defined terms.

        See Also
        --------
        :attr:`~pydicom.fileset.FileSet.descriptor_file_id` set the descriptor
        file ID for the file that uses the character set.
        """
        if val == self._charset:
            return

        self._charset = val
        if self._ds:
            self._ds.SpecificCharacterSetOfFileSetDescriptorFile = val
        self._stage["^"] = True

    @property
    def descriptor_file_id(self) -> str | None:
        """Return the *File-set Descriptor File ID* (if available) or ``None``."""
        return self._descriptor

    @descriptor_file_id.setter
    def descriptor_file_id(self, val: str | None) -> None:
        """Set the *File-set Descriptor File ID*.

        The descriptor file itself is used for user comments related to the
        File-set (e.g. a README file) and is up the user to create.

        Parameters
        ----------
        val : str, list of str or None
            The value to use for the DICOMDIR's (0004,1141) *File-set
            Descriptor File ID*. Should be the relative path to the descriptor
            file and has a maximum length of 8 components, with each component
            up to 16 characters long.

        Raises
        ------
        ValueError
            If `val` has more than 8 items or if each item is longer than 16
            characters.

        See Also
        --------
        :attr:`~pydicom.fileset.FileSet.descriptor_character_set` the
        character set used in the descriptor file, required if an expanded or
        replaced character set is used.
        """
        if val == self._descriptor:
            return

        if val is None:
            pass
        elif isinstance(val, list):
            try:
                assert len(val) <= 8
                for component in val:
                    assert isinstance(component, str)
                    assert 0 <= len(component) <= 16
            except AssertionError:
                raise ValueError(
                    "The 'File-set Descriptor File ID' has a maximum of 8 "
                    "components, each between 0 and 16 characters long"
                )

            # Push the value through Path to clean it up and check validity
            val = list(Path(*val).parts)
        elif isinstance(val, str):
            if not 0 <= len(val) <= 16:
                raise ValueError(
                    "Each 'File-set Descriptor File ID' component has a "
                    "maximum length of 16 characters"
                )
        else:
            raise TypeError(
                "The 'DescriptorFileID' must be a str, list of str, or None"
            )

        self._descriptor = val
        if self._ds:
            self._ds.FileSetDescriptorFileID = self._descriptor
        self._stage["^"] = True

    def find(self, load: bool = False, **kwargs: Any) -> list[FileInstance]:
        """Return matching instances in the File-set

        **Limitations**

        * Only single value matching is supported so neither
          ``PatientID=['1234567', '7654321']`` or ``PatientID='1234567',
          PatientID='7654321'`` will work (although the first example will
          work if the *Patient ID* is actually multi-valued).
        * Repeating group and private elements cannot be used when searching.

        Parameters
        ----------
        load : bool, optional
            If ``True``, then load the SOP Instances belonging to the
            File-set and perform the search against their available elements.
            Otherwise (default) search only the elements available in the
            corresponding directory records (more efficient, but only a limited
            number of elements are available).
        **kwargs
            Search parameters, as element keyword=value (i.e.
            ``PatientID='1234567', StudyDescription="My study"``.

        Returns
        -------
        list of pydicom.fileset.FileInstance
            A list of matching instances.
        """
        if not kwargs:
            return self._instances[:]

        # Flag whether or not the query elements are in the DICOMDIR records
        has_elements = False

        def match(ds: Dataset | FileInstance, **kwargs: Any) -> bool:
            nonlocal has_elements
            if load:
                ds = ds.load()

            # Check that all query elements are present
            if all([kw in ds for kw in kwargs]):
                has_elements = True

            for kw, val in kwargs.items():
                try:
                    assert ds[kw].value == val
                except (AssertionError, KeyError):
                    return False

            return True

        matches = [instance for instance in self if match(instance, **kwargs)]

        if not load and not has_elements:
            warn_and_log(
                "None of the records in the DICOMDIR dataset contain all "
                "the query elements, consider using the 'load' parameter "
                "to expand the search to the corresponding SOP instances"
            )

        return matches

    def find_values(
        self,
        elements: str | int | list[str | int],
        instances: list[FileInstance] | None = None,
        load: bool = False,
    ) -> list[Any] | dict[str | int, list[Any]]:
        """Return a list of unique values for given element(s).

        Parameters
        ----------
        elements : str, int or pydicom.tag.BaseTag, or list of these
            The keyword or tag of the element(s) to search for.
        instances : list of pydicom.fileset.FileInstance, optional
            Search within the given instances. If not used then all available
            instances will be searched.
        load : bool, optional
            If ``True``, then load the SOP Instances belonging to the
            File-set and perform the search against their available elements.
            Otherwise (default) search only the elements available in the
            corresponding directory records (more efficient, but only a limited
            number of elements are available).

        Returns
        -------
        list of object(s), or dict of lists of object(s)

            * If single element was queried: A list of value(s) for the element
              available in the instances.
            * If list of elements was queried: A dict of element value pairs
              with lists of value(s) for the elements available in the instances.
        """
        element_list = elements if isinstance(elements, list) else [elements]
        has_element = {element: False for element in element_list}
        results: dict[str | int, list[Any]] = {element: [] for element in element_list}
        iter_instances = instances or iter(self)
        instance: Dataset | FileInstance
        for instance in iter_instances:
            if load:
                instance = instance.load()

            for element in element_list:
                if element not in instance:
                    continue

                has_element[element] = True
                val = instance[element].value
                # Not very efficient, but we can't use set
                if val not in results[element]:
                    results[element].append(val)

        missing_elements = [element for element, v in has_element.items() if not v]
        if not load and missing_elements:
            warn_and_log(
                "None of the records in the DICOMDIR dataset contain "
                f"{missing_elements}, consider using the 'load' parameter "
                "to expand the search to the corresponding SOP instances"
            )

        if not isinstance(elements, list):
            return results[element_list[0]]

        return results

    @property
    def ID(self) -> str | None:
        """Return the *File-set ID* (if available) or ``None``."""
        return self._id

    @ID.setter
    def ID(self, val: str | None) -> None:
        """Set the File-set ID.

        Parameters
        ----------
        val : str or None
            The value to use for the DICOMDIR's (0004,1130) *File-set ID*.

        Raises
        ------
        ValueError
            If `val` is greater than 16 characters long.
        """
        if val == self._id:
            return

        if val is None or 0 <= len(val) <= 16:
            self._id = val
            if self._ds:
                self._ds.FileSetID = val
            self._stage["^"] = True
        else:
            raise ValueError("The maximum length of the 'File-set ID' is 16 characters")

    @property
    def is_staged(self) -> bool:
        """Return ``True`` if the File-set is new or has changes staged."""
        return any(self._stage[c] for c in "+-^~")

    def __iter__(self) -> Iterator[FileInstance]:
        """Yield :class:`~pydicom.fileset.FileInstance` from the File-set."""
        yield from self._instances[:]

    def __len__(self) -> int:
        """Return the number of instances in the File-set."""
        return len(self._instances)

    def load(
        self,
        ds_or_path: DSPathType,
        include_orphans: bool = True,
        raise_orphans: bool = False,
    ) -> None:
        """Load an existing File-set.

        Existing File-sets that do not use the same directory structure as
        *pydicom* will be staged to be moved to a new structure. This is
        because the DICOM Standard attaches no semantics to *how* the files
        in a File-set are to be structured so it's impossible to determine what
        the layout will be when changes are to be made.

        Any state held by this :class:`~pydicom.fileset.FileSet` is cleared
        before the new File-set is loaded. Instances whose directory record
        has no usable File ID, or whose referenced file does not exist, are
        not included in the loaded File-set.

        Parameters
        ----------
        ds_or_path : pydicom.dataset.Dataset, str or PathLike
            An existing File-set's DICOMDIR, either as a
            :class:`~pydicom.dataset.Dataset` or the path to the DICOMDIR file
            as :class:`str` or pathlike.
        include_orphans : bool, optional
            If ``True`` (default) include instances referenced by orphaned
            directory records in the File-set.
        raise_orphans : bool, optional
            If ``True`` then raise an exception if orphaned directory records
            are found in the File-set (default ``False``).

        Raises
        ------
        ValueError
            If the dataset's *Media Storage SOP Class UID* is not *Media
            Storage Directory Storage*.
        FileNotFoundError
            If the dataset's ``filename`` attribute is not an existing path.
        TypeError
            If the dataset's ``filename`` attribute cannot be used as a path.
        """
        if isinstance(ds_or_path, Dataset):
            ds = ds_or_path
        else:
            ds = dcmread(ds_or_path)

        # Only a Media Storage Directory instance can describe a File-set
        sop_class = ds.file_meta.get("MediaStorageSOPClassUID", None)
        if sop_class != MediaStorageDirectoryStorage:
            raise ValueError(
                "Unable to load the File-set as the supplied dataset is "
                "not a 'Media Storage Directory' instance"
            )

        # A non-conformant transfer syntax only triggers a warning here
        tsyntax = ds.file_meta.TransferSyntaxUID
        if tsyntax != ExplicitVRLittleEndian:
            warn_and_log(
                "The DICOMDIR dataset uses an invalid transfer syntax "
                f"'{tsyntax.name}' and will be updated to use 'Explicit VR "
                "Little Endian'"
            )

        # The DICOMDIR must exist on the file system as its parent directory
        #   is used as the root of the File-set
        try:
            path = Path(cast(str, ds.filename)).resolve(strict=True)
        except FileNotFoundError:
            raise FileNotFoundError(
                "Unable to load the File-set as the 'filename' attribute "
                "for the DICOMDIR dataset is not a valid path: "
                f"{ds.filename}"
            )
        except TypeError:
            # Custom message if DICOMDIR from bytes, etc
            raise TypeError(
                "Unable to load the File-set as the DICOMDIR dataset must "
                "have a 'filename' attribute set to the path of the "
                "DICOMDIR file"
            )

        # All validation passed: discard the current state and take the
        #   File-set Identification values from the DICOMDIR
        self.clear()
        self._id = cast(str | None, ds.get("FileSetID", None))
        uid = cast(UID | None, ds.file_meta.get("MediaStorageSOPInstanceUID"))
        if not uid:
            # No File-set UID available, so generate one and record it in
            #   the DICOMDIR's file meta
            uid = generate_uid()
            ds.file_meta.MediaStorageSOPInstanceUID = uid
        self._uid = uid
        self._descriptor = cast(str | None, ds.get("FileSetDescriptorFileID", None))
        self._charset = cast(
            str | None, ds.get("SpecificCharacterSetOfFileSetDescriptorFile", None)
        )
        self._path = path.parent
        self._ds = ds

        # Create the record tree
        self._parse_records(ds, include_orphans, raise_orphans)

        # Collect instances that can't be used; they're removed after the
        #   loop rather than during it
        bad_instances = []
        for instance in self:
            # Check that the referenced file exists
            file_id = instance.node._file_id
            if file_id is None:
                bad_instances.append(instance)
                continue

            try:
                # self.path is already set at this point
                (cast(Path, self.path) / file_id).resolve(strict=True)
            except FileNotFoundError:
                bad_instances.append(instance)
                warn_and_log(
                    "The referenced SOP Instance for the directory record at "
                    f"offset {instance.node._offset} does not exist: "
                    f"{cast(Path, self.path) / file_id}"
                )
                continue

            # If the instance's existing directory structure doesn't match
            #   the pydicom semantics then stage for movement
            if instance.for_moving:
                self._stage["~"] = True

        for instance in bad_instances:
            self._instances.remove(instance)

    def _parse_records(
        self, ds: Dataset, include_orphans: bool, raise_orphans: bool = False
    ) -> None:
        """Parse the records in an existing DICOMDIR.

        Builds the record tree from the *Directory Record Sequence* by
        following the record offsets, creates a
        :class:`~pydicom.fileset.FileInstance` for every record in the tree
        that has a *Referenced File ID* and then deals with any records that
        were not reached (orphans).

        Parameters
        ----------
        ds : pydicom.dataset.Dataset
            The File-set's DICOMDIR dataset.
        include_orphans : bool
            If ``True`` then include within the File-set orphaned records that
            contain a valid (and unique) *Referenced File ID* element. Orphaned
            records are those that aren't placed within the *Directory Record
            Sequence* hierarchy.
        raise_orphans : bool, optional
            If ``True`` then raise an exception if orphaned directory records
            are found in the File-set (default ``False``).

        Raises
        ------
        ValueError
            If `raise_orphans` is ``True`` and the DICOMDIR contains orphaned
            directory records.
        """
        # First pass: get the offsets for each record
        #   Maps each record's offset to the node created for it
        records = {}
        for record in cast(Iterable[Dataset], ds.DirectoryRecordSequence):
            offset = cast(int, record.seq_item_tell)
            node = RecordNode(record)
            node._offset = offset
            records[offset] = node

        # Define the top-level nodes
        #   Start at the first root record and follow the next-record
        #   offsets until one is missing or zero
        if records:
            node = records[ds[_FIRST_OFFSET].value]
            node.parent = self._tree
            while getattr(node._record, _NEXT_OFFSET, None):
                node = records[node._record[_NEXT_OFFSET].value]
                node.parent = self._tree

        # Second pass: build the record hierarchy
        #   Records not in the hierarchy will be ignored
        #   Branches without a valid leaf node File ID will be removed
        def recurse_node(node: RecordNode) -> None:
            # NOTE(review): `_LOWER_OFFSET` is defined outside this view,
            #   presumably the keyword for the lower-level directory entity
            #   offset - confirm
            child_offset = getattr(node._record, _LOWER_OFFSET, None)
            if child_offset:
                child = records[child_offset]
                child.parent = node

                # Attach the first child's siblings to the same parent
                next_offset = getattr(child._record, _NEXT_OFFSET, None)
                while next_offset:
                    child = records[next_offset]
                    child.parent = node
                    next_offset = getattr(child._record, _NEXT_OFFSET, None)
            elif "ReferencedFileID" not in node._record:
                # No children = leaf node, leaf nodes must reference a File ID
                del node.parent[node]

            # The leaf node references the FileInstance
            if "ReferencedFileID" in node._record:
                node.instance = FileInstance(node)
                self._instances.append(node.instance)

            for child in node.children:
                recurse_node(child)

        for node in self._tree.children:
            recurse_node(node)

        # Every record is in the tree, so there are no orphans to handle
        if len(records) == len(list(iter(self._tree))):
            return

        if raise_orphans:
            raise ValueError("The DICOMDIR contains orphaned directory records")

        # DICOMDIR contains orphaned records
        # Determine which nodes are both orphaned and reference an instance
        missing_set = set(records.keys()) - {ii._offset for ii in self._tree}
        missing = [records[o] for o in missing_set]
        missing = [r for r in missing if "ReferencedFileID" in r._record]

        if missing and not include_orphans:
            warn_and_log(
                f"The DICOMDIR has {len(missing)} orphaned directory records "
                "that reference an instance that will not be included in the "
                "File-set"
            )
            return

        # Stage the orphaned instances for addition to the File-set
        for node in missing:
            # Get the path to the orphaned instance
            original_value = node._record.ReferencedFileID
            file_id = node._file_id
            if file_id is None:
                continue

            # self.path is set for an existing File Set
            path = cast(Path, self.path) / file_id
            # PRIVATE records are added with the existing node, other
            #   record types get new records via add()
            if node.record_type == "PRIVATE":
                instance = self.add_custom(path, node)
            else:
                instance = self.add(path)

            # Because the record is new the Referenced File ID isn't set
            instance.node._record.ReferencedFileID = original_value

    @property
    def path(self) -> str | None:
        """Return the absolute path to the File-set root directory as
        :class:`str` (if set) or ``None`` otherwise.
        """
        if self._path is not None:
            return os.fspath(self._path)

        return self._path

    def _recordify(self, ds: Dataset) -> Iterator[Dataset]:
        """Generate the directory records required to reference `ds`.

        Instances that map to a single-level record type yield exactly one
        record. All other instances yield four records in the order
        PATIENT, STUDY, SERIES and then the leaf record.

        Parameters
        ----------
        ds : pydicom.dataset.Dataset
            The SOP Instance to create DICOMDIR directory records for.

        Yields
        ------
        ds : pydicom.dataset.Dataset
            A directory record for the instance, ordered from highest to
            lowest level.

        Raises
        ------
        ValueError
            If unable to create the required directory records because of
            a missing required element or element value.
        """

        def build(kind: str) -> Dataset:
            """Return a new `kind` record with the common elements set."""
            try:
                rec = DIRECTORY_RECORDERS[kind](ds)
            except ValueError as exc:
                raise ValueError(
                    f"Unable to use the default '{kind}' "
                    f"record creator: {exc}. See DICOM PS3.3 Section F.5. "
                    "Either update the instance, "
                    "define your own record creation function or use "
                    "'FileSet.add_custom()' instead"
                ) from exc

            rec.OffsetOfTheNextDirectoryRecord = 0
            rec.RecordInUseFlag = 0xFFFF
            rec.OffsetOfReferencedLowerLevelDirectoryEntity = 0
            rec.DirectoryRecordType = kind

            return rec

        def reference(rec: Dataset) -> None:
            """Add the elements that make `rec` reference the instance."""
            # The Referenced File ID is left empty at this point
            rec.ReferencedFileID = None
            rec.ReferencedSOPClassUIDInFile = ds.SOPClassUID
            rec.ReferencedSOPInstanceUIDInFile = ds.SOPInstanceUID
            rec.ReferencedTransferSyntaxUIDInFile = ds.file_meta.TransferSyntaxUID

        # Anything other than "PATIENT" means a single top-level leaf record
        top_type = _single_level_record_type(ds)
        if top_type != "PATIENT":
            single = build(top_type)
            reference(single)
            yield single
            return

        # Otherwise use the hierarchy PATIENT -> STUDY -> SERIES -> leaf
        hierarchy = []
        leaf_type = _four_level_record_type(ds)
        for kind in ("PATIENT", "STUDY", "SERIES", leaf_type):
            rec = build(kind)
            if "SpecificCharacterSet" in ds:
                rec.SpecificCharacterSet = ds.SpecificCharacterSet

            hierarchy.append(rec)

        # Only the lowest level record references the instance
        reference(hierarchy[-1])

        yield from hierarchy

    def remove(self, instance: FileInstance | list[FileInstance]) -> None:
        """Stage instance(s) for removal from the File-set.

        If the instance has been staged for addition to the File-set, calling
        :meth:`~pydicom.fileset.FileSet.remove` will cancel the staging and
        the instance will not be added.

        Parameters
        ----------
        instance : pydicom.fileset.FileInstance or a list of FileInstance
            The instance(s) to remove from the File-set.
        """
        if isinstance(instance, list):
            for item in instance:
                self.remove(item)
            return

        if instance not in self._instances:
            raise ValueError("No such instance in the File-set")

        # If staged for addition, no longer add
        if instance.SOPInstanceUID in self._stage["+"]:
            leaf = instance.node
            del leaf.parent[leaf]
            del self._stage["+"][instance.SOPInstanceUID]
            # Delete file from stage
            try:
                Path(instance.path).unlink()
            except FileNotFoundError:
                pass
            instance._apply_stage("-")
            self._instances.remove(instance)

        # Stage for removal if not already done
        elif instance.SOPInstanceUID not in self._stage["-"]:
            instance._apply_stage("-")
            self._stage["-"][instance.SOPInstanceUID] = instance
            self._instances.remove(instance)

    def __str__(self) -> str:
        """Return a string representation of the FileSet."""
        s = [
            "DICOM File-set",
            f"  Root directory: {self.path or '(no value available)'}",
            f"  File-set ID: {self.ID or '(no value available)'}",
            f"  File-set UID: {self.UID}",
            (
                f"  Descriptor file ID: "
                f"{self.descriptor_file_id or '(no value available)'}"
            ),
            (
                f"  Descriptor file character set: "
                f"{self.descriptor_character_set or '(no value available)'}"
            ),
        ]
        if self.is_staged:
            changes = []
            if not self._ds:
                changes.append("DICOMDIR creation")
            else:
                changes.append("DICOMDIR update")

            if self._stage["~"]:
                changes.append("directory structure update")

            if self._stage["+"]:
                suffix = "s" if len(self._stage["+"]) > 1 else ""
                changes.append(f"{len(self._stage['+'])} addition{suffix}")
            if self._stage["-"]:
                suffix = "s" if len(self._stage["-"]) > 1 else ""
                changes.append(f"{len(self._stage['-'])} removal{suffix}")

            s.append(f"  Changes staged for write(): {', '.join(changes)}")

        if not self._tree.children:
            return "\n".join(s)

        s.append("\n  Managed instances:")
        s.extend([f"    {ii}" for ii in self._tree.prettify()])

        return "\n".join(s)

    @property
    def UID(self) -> UID:
        """Return the UID currently assigned to the File-set."""
        current = cast(UID, self._uid)
        return current

    @UID.setter
    def UID(self, uid: UID) -> None:
        """Set the File-set UID and stage the change.

        Setting the UID to its current value does nothing. Otherwise the
        value is converted to :class:`~pydicom.uid.UID`, asserted to be
        valid, stored and - if a DICOMDIR dataset is available - also written
        to its *Media Storage SOP Instance UID*.

        Parameters
        ----------
        uid : pydicom.uid.UID
            The UID to use as the new File-set UID.
        """
        if uid == self._uid:
            return

        new_uid = UID(uid)
        assert new_uid.is_valid
        self._uid = new_uid

        dicomdir = self._ds
        if dicomdir:
            dicomdir.file_meta.MediaStorageSOPInstanceUID = new_uid

        # Flag the File-set as having a staged change
        self._stage["^"] = True

    def write(
        self,
        path: str | os.PathLike | None = None,
        use_existing: bool = False,
        force_implicit: bool = False,
    ) -> None:
        """Write the File-set, or changes to the File-set, to the file system.

        .. warning::

            If modifying an existing File-set it's **strongly recommended**
            that you follow standard data management practices and ensure that
            you have an up-to-date backup of the original data.

        By default, for both new or existing File-sets, *pydicom* uses the
        following directory structure semantics when writing out changes:

        * For instances defined using the standard four-levels of directory
          records (i.e. PATIENT/STUDY/SERIES + one of the record types
          such as IMAGE or RT DOSE): ``PTxxxxxx/STxxxxxx/SExxxxxx/`` with a
          filename such as ``IMxxxxxx`` (for IMAGE), where the first two
          characters are dependent on the record type and ``xxxxxx`` is a
          numeric or alphanumeric index.
        * For instances defined using the standard one-level directory record
          (i.e. PALETTE, IMPLANT): a filename such as ``PAxxxxxx`` (for
          PALETTE).
        * For instances defined using PRIVATE directory records then the
          structure will be along the lines of ``P0xxxxxx/P1xxxxxx/P2xxxxxx``
          for PRIVATE/PRIVATE/PRIVATE, ``PTxxxxxx/STxxxxxx/P2xxxxxx`` for
          PATIENT/STUDY/PRIVATE.

        When only changes to the DICOMDIR file are required or instances have
        only been removed from an existing File-set you can use the
        `use_existing` keyword parameter to keep the existing directory
        structure and only update the DICOMDIR file.

        If the File-set has no staged changes then nothing is written, although
        a supplied `path` is still recorded as the File-set's root directory.

        Parameters
        ----------
        path : str or PathLike, optional
            For new File-sets, the absolute path to the root directory where
            the File-set will be written. Using `path` with an existing
            File-set will raise :class:`ValueError`.
        use_existing : bool, optional
            If ``True`` and no instances have been added to the File-set
            (removals are OK), then only update the DICOMDIR file, keeping
            the current directory structure rather than converting everything
            to the semantics used by *pydicom* for File-sets (default
            ``False``).
        force_implicit : bool, optional
            If ``True`` force the DICOMDIR file to be encoded using *Implicit
            VR Little Endian* which is non-conformant to the DICOM Standard
            (default ``False``).

        Raises
        ------
        ValueError
            If `path` is not supplied and the File-set has no root directory,
            if `path` is supplied but the File-set already has a root
            directory, or if `use_existing` is ``True`` but instances have
            been staged for addition to the File-set.
        NotImplementedError
            If the File-set has more than ``35**6`` (1838265625) managed
            instances.
        """
        # A new File-set has no root directory yet, so one must be supplied
        if not path and self.path is None:
            raise ValueError(
                "The path to the root directory is required for a new File-set"
            )

        # An existing File-set cannot be relocated by write()
        if path and self.path:
            raise ValueError(
                "The path for an existing File-set cannot be changed, use "
                "'FileSet.copy()' to write the File-set to a new location"
            )

        if path:
            self._path = Path(path)

        # Don't write unless changed or new
        if not self.is_staged:
            return

        # Path to the DICOMDIR file
        p = cast(Path, self._path) / "DICOMDIR"

        # Re-use the existing directory structure if only moves or removals
        #   are required and `use_existing` is True
        # Staged additions always count as a major change
        major_change = bool(self._stage["+"])
        if use_existing and major_change:
            raise ValueError(
                "'Fileset.write()' called with 'use_existing' but additions "
                "to the File-set's managed instances are staged"
            )

        # A staged directory structure change only matters when the existing
        #   structure isn't being kept
        if not use_existing:
            major_change |= self._stage["~"]

        # Worst case scenario if all instances in one directory
        # More than 10**6 instances: switch to alphanumeric indices
        if len(self) > 10**6:
            self._use_alphanumeric = True

        # 35**6 == 1838265625, the value quoted in the message below
        if len(self) > 35**6:
            raise NotImplementedError(
                "pydicom doesn't support writing File-sets with more than "
                "1838265625 managed instances"
            )

        # Remove the removals - must be first because the File IDs will be
        #   incorrect with the removals still in the tree
        for instance in self._stage["-"].values():
            # Best effort: a file that's already missing is not an error
            try:
                Path(instance.path).unlink()
            except FileNotFoundError:
                pass
            self._tree.remove(instance.node)

        # Only the DICOMDIR needs rewriting, the instance files stay in place
        if use_existing and not major_change:
            with open(p, "wb") as fp:
                f = DicomFileLike(fp)
                self._write_dicomdir(f, force_implicit=force_implicit)

            # Reload from the DICOMDIR that was just written
            self.load(p, raise_orphans=True)

            return

        # We need to be careful not to overwrite the source file
        #   for a different (later) instance
        # Check for collisions between the new and old File IDs
        #   and copy any to the stage
        # fout: the File IDs the instances will be written to
        fout = {Path(ii.FileID) for ii in self}
        # fin: the current File IDs of instances not staged for addition
        fin = {
            ii.node._file_id for ii in self if ii.SOPInstanceUID not in self._stage["+"]
        }
        collisions = fout & fin
        for instance in [ii for ii in self if ii.node._file_id in collisions]:
            # Treat the colliding instance as an addition: its file is copied
            #   to `instance.path` (as updated by `_apply_stage("+")`) before
            #   anything is written to its current location
            self._stage["+"][instance.SOPInstanceUID] = instance
            instance._apply_stage("+")
            shutil.copyfile(self._path / instance.node._file_id, instance.path)

        for instance in self:
            dst = self._path / instance.FileID
            dst.parent.mkdir(parents=True, exist_ok=True)
            fn: Callable
            if instance.SOPInstanceUID in self._stage["+"]:
                # Staged for addition: copy from `instance.path`
                src = instance.path
                fn = shutil.copyfile
            else:
                # Otherwise move the file from its current File ID location
                src = self._path / instance.node._file_id
                fn = shutil.move

            fn(os.fspath(src), os.fspath(dst))
            # Referenced File ID is stored as a list of path components
            instance.node._record.ReferencedFileID = instance.FileID.split(os.path.sep)

        # Create the DICOMDIR file
        with open(p, "wb") as fp:
            f = DicomFileLike(fp)
            self._write_dicomdir(f, force_implicit=force_implicit)

        # Reload the File-set
        #   We're doing things wrong if we have orphans so raise
        self.load(p, raise_orphans=True)

    def _write_dicomdir(
        self, fp: DicomFileLike, copy_safe: bool = False, force_implicit: bool = False
    ) -> None:
        """Encode and write the File-set's DICOMDIR dataset.

        The dataset is written in stages so that the byte offset of each
        directory record is known before the records are encoded: the
        elements before the *Directory Record Sequence* are written first,
        the record offsets are calculated, the sequence is rebuilt and
        written, and finally the first/last record offset elements are
        re-written in place with their real values.

        Parameters
        ----------
        fp : file-like
            The file-like to write the encoded DICOMDIR dataset to. Must
            have ``write()``, ``tell()`` and ``seek()`` methods.
        copy_safe : bool, optional
            If ``True`` then the function doesn't make any changes to the
            public parts of the current :class:`~pydicom.fileset.FileSet`
            instance.
        force_implicit : bool, optional
            Force encoding the DICOMDIR with 'Implicit VR Little Endian' which
            is non-conformant to the DICOM Standard (default ``False``).
        """
        ds = self._ds
        # No existing dataset, or `self._ds` must not be used (copy safe)
        # NOTE(review): presumably _create_dicomdir() returns a new dataset
        #   independent of self._ds - confirm
        if copy_safe or not ds:
            ds = self._create_dicomdir()

        # By default, always convert to the correct syntax
        ds.file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
        # `seq_offset` is the length in bytes of the *Directory Record
        #   Sequence* element header that precedes the first sequence item:
        #   12 for explicit VR (tag + VR + reserved + length) and
        #   8 for implicit VR (tag + length)
        seq_offset = 12
        if force_implicit:
            ds.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian
            seq_offset = 8

        # The file-like's encoding must match the transfer syntax
        fp.is_implicit_VR = ds.file_meta.TransferSyntaxUID.is_implicit_VR
        fp.is_little_endian = ds.file_meta.TransferSyntaxUID.is_little_endian

        # Reset the offsets
        # They are written as 0 first and overwritten once the real
        #   values are known
        first_elem = ds[_FIRST_OFFSET]
        first_elem.value = 0
        last_elem = ds[_LAST_OFFSET]
        last_elem.value = 0

        # Write the preamble, DICM marker and File Meta
        fp.write(b"\x00" * 128 + b"DICM")
        write_file_meta_info(fp, ds.file_meta, enforce_standard=True)

        # Write the dataset
        # Write up to the *Offset of the First Directory Record...* element
        write_dataset(fp, ds[:0x00041200])
        tell_offset_first = fp.tell()  # Start of *Offset of the First...*
        # Write up to (but not including) the *Directory Record Sequence*
        write_dataset(fp, ds[0x00041200:0x00041220])

        # Rebuild and encode the *Directory Record Sequence*
        # Step 1: Determine the offsets for all the records
        offset = fp.tell() + seq_offset  # Start of the first seq. item tag
        for node in self._tree:
            # RecordNode._offset is the start of each record's seq. item tag
            node._offset = offset
            offset += 8  # a sequence item's (tag + length)
            # Copy safe - only modifies RecordNode._offset
            # The returned value is added as the encoded record's length
            offset += node._encode_record(force_implicit)
            # If the sequence item has undefined length then it uses a
            #   sequence item delimiter item
            if node._record.is_undefined_length_sequence_item:
                offset += 8

        # Step 2: Update the records and add to *Directory Record Sequence*
        ds.DirectoryRecordSequence = []
        for node in self._tree:
            record = node._record
            if not copy_safe:
                # Update the node's own record in place
                node._update_record_offsets()
            else:
                # Work on a copy so the node's own record is left untouched
                record = copy.deepcopy(record)
                # Offset of the next record, 0 if there is none
                next_elem = record[_NEXT_OFFSET]
                next_elem.value = 0
                if node.next:
                    next_elem.value = node.next._offset

                # Offset of the first child record, 0 if there are none
                lower_elem = record[_LOWER_OFFSET]
                lower_elem.value = 0
                if node.children:
                    record[_LOWER_OFFSET].value = node.children[0]._offset

            cast(list[Dataset], ds.DirectoryRecordSequence).append(record)

        # Step 3: Encode *Directory Record Sequence* and the rest
        write_dataset(fp, ds[0x00041220:])

        # Update the first and last record offsets
        # With no records the offsets written earlier as 0 are kept
        if self._tree.children:
            first_elem.value = self._tree.children[0]._offset
            last_elem.value = self._tree.children[-1]._offset
            # Re-write the record offset pointer elements
            # The two elements are written back-to-back starting from the
            #   position saved before they were first written
            fp.seek(tell_offset_first)
            write_data_element(fp, first_elem)
            write_data_element(fp, last_elem)
            # Go to the end
            # (whence=2 seeks relative to the end of the stream)
            fp.seek(0, 2)


# Functions for creating Directory Records
def _check_dataset(ds: Dataset, keywords: list[str]) -> None:
    """Check that `ds` contains a non-empty element for each of `keywords`.

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        The dataset to check.
    keywords : list of str
        The DICOM keywords for Type 1 elements that are to be checked.

    Raises
    ------
    ValueError
        If an element is not in the dataset, or if the element is present
        but has a value multiplicity of 0 (i.e. no value).
    """
    for keyword in keywords:
        tag = Tag(cast(int, tag_for_keyword(keyword)))
        description = dictionary_description(tag)
        # Both failure messages share the same beginning
        subject = f"The instance's {tag} '{description}' element"

        if keyword not in ds:
            raise ValueError(f"{subject} is missing")

        if ds[keyword].VM == 0:
            raise ValueError(f"{subject} cannot be empty")


def _define_patient(ds: Dataset) -> Dataset:
    """Build the PATIENT directory record for the instance `ds`.

    *Patient ID* is required. *Patient's Name* is copied if present in `ds`,
    otherwise it is added with a value of ``None``.
    """
    _check_dataset(ds, ["PatientID"])

    patient = Dataset()
    patient.PatientName = ds.get("PatientName")
    patient.PatientID = ds.PatientID
    return patient


def _define_study(ds: Dataset) -> Dataset:
    """Build the STUDY directory record for the instance `ds`.

    *Study Date*, *Study Time* and *Study ID* are required. *Study Instance
    UID* is only copied when present in `ds`, in which case it must have a
    value. The remaining elements default to ``None`` when absent.
    """
    _check_dataset(ds, ["StudyDate", "StudyTime", "StudyID"])

    study = Dataset()
    study.StudyDate = ds.StudyDate
    study.StudyTime = ds.StudyTime
    study.StudyDescription = ds.get("StudyDescription")

    if "StudyInstanceUID" in ds:
        # Conditionally present, but must not be empty if it is
        _check_dataset(ds, ["StudyInstanceUID"])
        study.StudyInstanceUID = ds.StudyInstanceUID

    study.StudyID = ds.StudyID
    study.AccessionNumber = ds.get("AccessionNumber")
    return study


def _define_series(ds: Dataset) -> Dataset:
    """Build the SERIES directory record for the instance `ds`.

    *Modality*, *Series Instance UID* and *Series Number* are all required
    and are copied as-is.
    """
    required = ["Modality", "SeriesInstanceUID", "SeriesNumber"]
    _check_dataset(ds, required)

    series = Dataset()
    for keyword in required:
        setattr(series, keyword, getattr(ds, keyword))

    return series


def _define_image(ds: Dataset) -> Dataset:
    """Build the IMAGE directory record for the instance `ds`.

    Only the required *Instance Number* is copied.
    """
    _check_dataset(ds, ["InstanceNumber"])

    image = Dataset()
    image.InstanceNumber = ds.InstanceNumber
    return image


def _define_rt_dose(ds: Dataset) -> Dataset:
    """Build the RT DOSE directory record for the instance `ds`.

    *Instance Number* and *Dose Summation Type* are both required.
    """
    required = ["InstanceNumber", "DoseSummationType"]
    _check_dataset(ds, required)

    dose = Dataset()
    for keyword in required:
        setattr(dose, keyword, getattr(ds, keyword))

    return dose


def _define_rt_structure_set(ds: Dataset) -> Dataset:
    """Build the RT STRUCTURE SET directory record for the instance `ds`.

    *Instance Number* and *Structure Set Label* are required, while
    *Structure Set Date* and *Structure Set Time* default to ``None`` when
    absent from `ds`.
    """
    required = ["InstanceNumber", "StructureSetLabel"]
    _check_dataset(ds, required)

    structure_set = Dataset()
    for keyword in required:
        setattr(structure_set, keyword, getattr(ds, keyword))

    for keyword in ("StructureSetDate", "StructureSetTime"):
        setattr(structure_set, keyword, ds.get(keyword))

    return structure_set


def _define_rt_plan(ds: Dataset) -> Dataset:
    """Build the RT PLAN directory record for the instance `ds`.

    *Instance Number* and *RT Plan Label* are required, while *RT Plan Date*
    and *RT Plan Time* default to ``None`` when absent from `ds`.
    """
    required = ["InstanceNumber", "RTPlanLabel"]
    _check_dataset(ds, required)

    plan = Dataset()
    for keyword in required:
        setattr(plan, keyword, getattr(ds, keyword))

    for keyword in ("RTPlanDate", "RTPlanTime"):
        setattr(plan, keyword, ds.get(keyword))

    return plan


def _define_rt_treatment_record(ds: Dataset) -> Dataset:
    """Build the RT TREAT RECORD directory record for the instance `ds`.

    *Instance Number* is required, while *Treatment Date* and *Treatment
    Time* default to ``None`` when absent from `ds`.
    """
    _check_dataset(ds, ["InstanceNumber"])

    treatment = Dataset()
    treatment.InstanceNumber = ds.InstanceNumber
    for keyword in ("TreatmentDate", "TreatmentTime"):
        setattr(treatment, keyword, ds.get(keyword))

    return treatment


def _define_presentation(ds: Dataset) -> Dataset:
    """Build the PRESENTATION directory record for the instance `ds`.

    The presentation creation date/time, *Instance Number* and *Content
    Label* are required. *Content Description* and *Content Creator's Name*
    default to ``None`` when absent. *Referenced Series Sequence* and
    *Blending Sequence* are only copied when present in `ds`, in which case
    they must not be empty.
    """
    required = [
        "PresentationCreationDate",
        "PresentationCreationTime",
        "InstanceNumber",
        "ContentLabel",
    ]
    _check_dataset(ds, required)

    presentation = Dataset()
    # The last two required elements and the optional elements belong to the
    #   Content Identification Macro
    for keyword in required:
        setattr(presentation, keyword, getattr(ds, keyword))

    for keyword in ("ContentDescription", "ContentCreatorName"):
        setattr(presentation, keyword, ds.get(keyword))

    for keyword in ("ReferencedSeriesSequence", "BlendingSequence"):
        if keyword in ds:
            _check_dataset(ds, [keyword])
            setattr(presentation, keyword, getattr(ds, keyword))

    return presentation


def _define_sr_document(ds: Dataset) -> Dataset:
    """Build the SR DOCUMENT directory record for the instance `ds`.

    *Instance Number*, *Completion Flag*, *Verification Flag*, *Content
    Date*, *Content Time* and *Concept Name Code Sequence* are required.
    *Verification DateTime* and *Content Sequence* are only copied when
    present in `ds`, in which case they must not be empty.
    """
    required = [
        "InstanceNumber",
        "CompletionFlag",
        "VerificationFlag",
        "ContentDate",
        "ContentTime",
        "ConceptNameCodeSequence",
    ]
    _check_dataset(ds, required)

    document = Dataset()
    document.InstanceNumber = ds.InstanceNumber
    document.CompletionFlag = ds.CompletionFlag
    document.VerificationFlag = ds.VerificationFlag
    document.ContentDate = ds.ContentDate
    document.ContentTime = ds.ContentTime

    if "VerificationDateTime" in ds:
        _check_dataset(ds, ["VerificationDateTime"])
        document.VerificationDateTime = ds.VerificationDateTime

    document.ConceptNameCodeSequence = ds.ConceptNameCodeSequence

    if "ContentSequence" in ds:
        _check_dataset(ds, ["ContentSequence"])
        document.ContentSequence = ds.ContentSequence

    return document


def _define_key_object_doc(ds: Dataset) -> Dataset:
    """Build the KEY OBJECT DOC directory record for the instance `ds`.

    *Instance Number*, *Content Date*, *Content Time* and *Concept Name Code
    Sequence* are required. *Content Sequence* is only copied when present
    in `ds`, in which case it must not be empty.
    """
    required = [
        "InstanceNumber",
        "ContentDate",
        "ContentTime",
        "ConceptNameCodeSequence",
    ]
    _check_dataset(ds, required)

    document = Dataset()
    document.ContentDate = ds.ContentDate
    document.ContentTime = ds.ContentTime
    document.InstanceNumber = ds.InstanceNumber
    document.ConceptNameCodeSequence = ds.ConceptNameCodeSequence

    if "ContentSequence" in ds:
        _check_dataset(ds, ["ContentSequence"])
        document.ContentSequence = ds.ContentSequence

    return document


def _define_spectroscopy(ds: Dataset) -> Dataset:
    """Build the SPECTROSCOPY directory record for the instance `ds`.

    All copied elements are required except *Referenced Image Evidence
    Sequence*, which is only copied when present in `ds`, in which case it
    must not be empty.
    """
    required = [
        "ImageType",
        "ContentDate",
        "ContentTime",
        "InstanceNumber",
        "NumberOfFrames",
        "Rows",
        "Columns",
        "DataPointRows",
        "DataPointColumns",
    ]
    _check_dataset(ds, required)

    spectroscopy = Dataset()
    # The first four required elements come before the conditional sequence
    for keyword in required[:4]:
        setattr(spectroscopy, keyword, getattr(ds, keyword))

    if "ReferencedImageEvidenceSequence" in ds:
        _check_dataset(ds, ["ReferencedImageEvidenceSequence"])
        spectroscopy.ReferencedImageEvidenceSequence = (
            ds.ReferencedImageEvidenceSequence
        )

    # ... and the remaining five after it
    for keyword in required[4:]:
        setattr(spectroscopy, keyword, getattr(ds, keyword))

    return spectroscopy


def _define_hanging_protocol(ds: Dataset) -> Dataset:
    """Build the HANGING PROTOCOL directory record for the instance `ds`.

    The hanging protocol creator, creation date/time, definition sequence
    and *Number of Priors Referenced* are required. *Hanging Protocol User
    Identification Code Sequence* defaults to an empty list when absent.
    """
    required = [
        "HangingProtocolCreator",
        "HangingProtocolCreationDateTime",
        "HangingProtocolDefinitionSequence",
        "NumberOfPriorsReferenced",
    ]
    _check_dataset(ds, required)

    protocol = Dataset()
    for keyword in required:
        setattr(protocol, keyword, getattr(ds, keyword))

    user_codes = ds.get("HangingProtocolUserIdentificationCodeSequence", [])
    protocol.HangingProtocolUserIdentificationCodeSequence = user_codes

    return protocol


def _define_encap_doc(ds: Dataset) -> Dataset:
    """Build the ENCAP DOC directory record for the instance `ds`.

    *Instance Number* and *MIME Type of Encapsulated Document* are required.
    *HL7 Instance Identifier* is only copied when present in `ds`, in which
    case it must not be empty. The remaining elements default to ``None``
    when absent.
    """
    _check_dataset(ds, ["InstanceNumber", "MIMETypeOfEncapsulatedDocument"])

    document = Dataset()
    document.ContentDate = ds.get("ContentDate")
    document.ContentTime = ds.get("ContentTime")
    document.InstanceNumber = ds.InstanceNumber
    document.DocumentTitle = ds.get("DocumentTitle")

    if "HL7InstanceIdentifier" in ds:
        _check_dataset(ds, ["HL7InstanceIdentifier"])
        document.HL7InstanceIdentifier = ds.HL7InstanceIdentifier

    document.ConceptNameCodeSequence = ds.get("ConceptNameCodeSequence")
    document.MIMETypeOfEncapsulatedDocument = ds.MIMETypeOfEncapsulatedDocument
    return document


def _define_palette(ds: Dataset) -> Dataset:
    """Build the PALETTE directory record for the instance `ds`.

    *Content Label* is required, while *Content Description* defaults to
    ``None`` when absent from `ds`.
    """
    _check_dataset(ds, ["ContentLabel"])

    palette = Dataset()
    palette.ContentLabel = ds.ContentLabel
    palette.ContentDescription = ds.get("ContentDescription")
    return palette


def _define_implant(ds: Dataset) -> Dataset:
    """Build the IMPLANT directory record for the instance `ds`.

    *Manufacturer*, *Implant Name* and *Implant Part Number* are required.
    *Implant Size* is only copied when present in `ds`, in which case it
    must not be empty.
    """
    _check_dataset(ds, ["Manufacturer", "ImplantName", "ImplantPartNumber"])

    implant = Dataset()
    implant.Manufacturer = ds.Manufacturer
    implant.ImplantName = ds.ImplantName

    if "ImplantSize" in ds:
        _check_dataset(ds, ["ImplantSize"])
        implant.ImplantSize = ds.ImplantSize

    implant.ImplantPartNumber = ds.ImplantPartNumber
    return implant


def _define_implant_assy(ds: Dataset) -> Dataset:
    """Build the IMPLANT ASSY directory record for the instance `ds`.

    *Implant Assembly Template Name*, *Manufacturer* and *Procedure Type
    Code Sequence* are all required and are copied as-is.
    """
    required = [
        "ImplantAssemblyTemplateName",
        "Manufacturer",
        "ProcedureTypeCodeSequence",
    ]
    _check_dataset(ds, required)

    assembly = Dataset()
    for keyword in required:
        setattr(assembly, keyword, getattr(ds, keyword))

    return assembly


def _define_implant_group(ds: Dataset) -> Dataset:
    """Build the IMPLANT GROUP directory record for the instance `ds`.

    *Implant Template Group Name* and *Implant Template Group Issuer* are
    both required and are copied as-is.
    """
    required = ["ImplantTemplateGroupName", "ImplantTemplateGroupIssuer"]
    _check_dataset(ds, required)

    group = Dataset()
    for keyword in required:
        setattr(group, keyword, getattr(ds, keyword))

    return group


def _define_surface_scan(ds: Dataset) -> Dataset:
    """Build the SURFACE SCAN directory record for the instance `ds`.

    *Content Date* and *Content Time* are both required and are copied
    as-is.
    """
    required = ["ContentDate", "ContentTime"]
    _check_dataset(ds, required)

    scan = Dataset()
    for keyword in required:
        setattr(scan, keyword, getattr(ds, keyword))

    return scan


def _define_assessment(ds: Dataset) -> Dataset:
    """Build the ASSESSMENT directory record for the instance `ds`.

    *Instance Number* and *Instance Creation Date* are required, while
    *Instance Creation Time* defaults to ``None`` when absent from `ds`.
    """
    _check_dataset(ds, ["InstanceNumber", "InstanceCreationDate"])

    assessment = Dataset()
    assessment.InstanceNumber = ds.InstanceNumber
    assessment.InstanceCreationDate = ds.InstanceCreationDate
    assessment.InstanceCreationTime = ds.get("InstanceCreationTime")
    return assessment


def _define_radiotherapy(ds: Dataset) -> Dataset:
    """Build the RADIOTHERAPY directory record for the instance `ds`.

    *Instance Number* is required. *User Content Label* and *User Content
    Long Label* are only copied when present in `ds`, in which case they
    must not be empty. *Content Description* and *Content Creator's Name*
    default to ``None`` when absent.
    """
    _check_dataset(ds, ["InstanceNumber"])

    radiotherapy = Dataset()
    radiotherapy.InstanceNumber = ds.InstanceNumber

    for keyword in ("UserContentLabel", "UserContentLongLabel"):
        if keyword in ds:
            _check_dataset(ds, [keyword])
            setattr(radiotherapy, keyword, getattr(ds, keyword))

    for keyword in ("ContentDescription", "ContentCreatorName"):
        setattr(radiotherapy, keyword, ds.get(keyword))

    return radiotherapy


def _define_generic_content(ds: Dataset) -> Dataset:
    """Build a WAVEFORM or RAW DATA directory record for the instance `ds`.

    *Instance Number*, *Content Date* and *Content Time* are all required
    and are copied as-is.
    """
    required = ["InstanceNumber", "ContentDate", "ContentTime"]
    _check_dataset(ds, required)

    content = Dataset()
    for keyword in required:
        setattr(content, keyword, getattr(ds, keyword))

    return content


def _define_generic_content_id(ds: Dataset) -> Dataset:
    """Build a generic content identification record for the instance `ds`.

    *Instance Number*, *Content Date*, *Content Time* and *Content Label*
    are required, while *Content Description* and *Content Creator's Name*
    default to ``None`` when absent from `ds`.
    """
    required = ["InstanceNumber", "ContentDate", "ContentTime", "ContentLabel"]
    _check_dataset(ds, required)

    # Content Identification Macro
    content = Dataset()
    for keyword in required:
        setattr(content, keyword, getattr(ds, keyword))

    for keyword in ("ContentDescription", "ContentCreatorName"):
        setattr(content, keyword, ds.get(keyword))

    return content


def _define_empty(ds: Dataset) -> Dataset:
    """Return a directory record with no elements.

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        Not used; accepted so the function has the same signature as the
        other directory record functions.

    Returns
    -------
    pydicom.dataset.Dataset
        A new, empty dataset.
    """
    rec = Dataset()

    return rec


# Lookup table of record-building functions keyed by the *Directory Record
# Type* string. Several record types share one function (e.g. WAVEFORM and
# RAW DATA, or STEREOMETRIC and PLAN). The trailing comment on each entry
# presumably gives the level(s) of the directory hierarchy at which that
# record type may appear - confirm against the DICOM Standard.
DIRECTORY_RECORDERS = {
    "PATIENT": _define_patient,  # TOP LEVEL
    "STUDY": _define_study,  # INTERMEDIATE or LEAF
    "SERIES": _define_series,  # INTERMEDIATE
    "IMAGE": _define_image,  # LEAF
    "RT DOSE": _define_rt_dose,  # LEAF
    "RT STRUCTURE SET": _define_rt_structure_set,  # LEAF
    "RT PLAN": _define_rt_plan,  # LEAF
    "RT TREAT RECORD": _define_rt_treatment_record,  # LEAF
    "PRESENTATION": _define_presentation,  # LEAF
    "WAVEFORM": _define_generic_content,  # LEAF
    "SR DOCUMENT": _define_sr_document,  # LEAF
    "KEY OBJECT DOC": _define_key_object_doc,  # LEAF
    "SPECTROSCOPY": _define_spectroscopy,  # LEAF
    "RAW DATA": _define_generic_content,  # LEAF
    "REGISTRATION": _define_generic_content_id,  # LEAF
    "FIDUCIAL": _define_generic_content_id,  # LEAF
    "HANGING PROTOCOL": _define_hanging_protocol,  # TOP LEVEL and LEAF
    "ENCAP DOC": _define_encap_doc,  # LEAF
    "VALUE MAP": _define_generic_content_id,  # LEAF
    "STEREOMETRIC": _define_empty,  # LEAF
    "PALETTE": _define_palette,  # TOP LEVEL and LEAF
    "IMPLANT": _define_implant,  # TOP LEVEL and LEAF
    "IMPLANT ASSY": _define_implant_assy,  # TOP LEVEL and LEAF
    "IMPLANT GROUP": _define_implant_group,  # TOP LEVEL and LEAF
    "PLAN": _define_empty,  # LEAF
    "MEASUREMENT": _define_generic_content_id,  # LEAF
    "SURFACE": _define_generic_content_id,  # LEAF
    "SURFACE SCAN": _define_surface_scan,  # LEAF
    "TRACT": _define_generic_content_id,  # LEAF
    "ASSESSMENT": _define_assessment,  # LEAF
    "RADIOTHERAPY": _define_radiotherapy,  # LEAF
}
"""A :class:`dict` containing the directory record creation functions.

The functions are used to create non-PRIVATE records for a given SOP Instance
as ``{"RECORD TYPE": callable}``, where ``"RECORD TYPE"`` should match one of
the allowable values - except PRIVATE - for (0004,1430) *Directory Record
Type*. By overriding the function for a given record type you can customize
the directory records that will be included in the DICOMDIR file.

Example
-------

.. code-block:: python

    from pydicom.dataset import Dataset
    from pydicom.fileset import DIRECTORY_RECORDERS, FileSet

    def my_recorder(ds: Dataset) -> Dataset:
        record = Dataset()
        record.OffsetOfTheNextDirectoryRecord = 0
        record.RecordInUseFlag = 0xFFFF
        record.OffsetOfReferencedLowerLevelDirectoryEntity = 0
        record.DirectoryRecordType = "PATIENT"
        if "SpecificCharacterSet" in ds:
            record.SpecificCharacterSet = ds.SpecificCharacterSet

        record.PatientName = ds.get("PatientName")
        record.PatientID = ds.PatientID

        return record

    DIRECTORY_RECORDERS["PATIENT"] = my_recorder

    # Use the updated directory recorder
    fs = FileSet()
    fs.add('my_instance.dcm')

The function should take a single parameter which is the SOP Instance to be
added to the File-set as a :class:`~pydicom.dataset.Dataset` and return a
:class:`~pydicom.dataset.Dataset` with a single directory record matching the
directory record type. See :dcm:`Annex F.3.2.2<chtml/part03/sect_F.3.2.2.html>`
for possible record types.

For PRIVATE records you must use the
:meth:`~pydicom.fileset.FileSet.add_custom` method instead.
"""
# Maps SOP Class UIDs to their *Directory Record Type*. Consulted by
# _single_level_record_type(), which falls back to "PATIENT" for any SOP
# Class UID that is not listed here.
_SINGLE_LEVEL_SOP_CLASSES = {
    sop.HangingProtocolStorage: "HANGING PROTOCOL",
    sop.ColorPaletteStorage: "PALETTE",
    sop.GenericImplantTemplateStorage: "IMPLANT",
    sop.ImplantAssemblyTemplateStorage: "IMPLANT ASSY",
    sop.ImplantTemplateGroupStorage: "IMPLANT GROUP",
}
# Maps SOP Class UIDs to the *Directory Record Type* of the fourth-level
# record. Consulted by _four_level_record_type() only after its Modality and
# element-presence checks have not produced a result; SOP Class UIDs that are
# not listed here fall back to "IMAGE".
_FOUR_LEVEL_SOP_CLASSES = {
    sop.RTDoseStorage: "RT DOSE",
    sop.RTStructureSetStorage: "RT STRUCTURE SET",
    sop.RTBeamsTreatmentRecordStorage: "RT TREAT RECORD",
    sop.RTBrachyTreatmentRecordStorage: "RT TREAT RECORD",
    sop.RTTreatmentSummaryRecordStorage: "RT TREAT RECORD",
    sop.RTIonBeamsTreatmentRecordStorage: "RT TREAT RECORD",
    sop.GrayscaleSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.ColorSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.PseudoColorSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.BlendingSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.XAXRFGrayscaleSoftcopyPresentationStateStorage: "PRESENTATION",
    sop.BasicStructuredDisplayStorage: "PRESENTATION",
    sop.BasicVoiceAudioWaveformStorage: "WAVEFORM",
    sop.TwelveLeadECGWaveformStorage: "WAVEFORM",
    sop.GeneralECGWaveformStorage: "WAVEFORM",
    sop.AmbulatoryECGWaveformStorage: "WAVEFORM",
    sop.HemodynamicWaveformStorage: "WAVEFORM",
    sop.CardiacElectrophysiologyWaveformStorage: "WAVEFORM",
    sop.ArterialPulseWaveformStorage: "WAVEFORM",
    sop.RespiratoryWaveformStorage: "WAVEFORM",
    sop.GeneralAudioWaveformStorage: "WAVEFORM",
    sop.RoutineScalpElectroencephalogramWaveformStorage: "WAVEFORM",
    sop.ElectromyogramWaveformStorage: "WAVEFORM",
    sop.ElectrooculogramWaveformStorage: "WAVEFORM",
    sop.SleepElectroencephalogramWaveformStorage: "WAVEFORM",
    sop.MultichannelRespiratoryWaveformStorage: "WAVEFORM",
    sop.BodyPositionWaveformStorage: "WAVEFORM",
    sop.BasicTextSRStorage: "SR DOCUMENT",
    sop.EnhancedSRStorage: "SR DOCUMENT",
    sop.ComprehensiveSRStorage: "SR DOCUMENT",
    sop.MammographyCADSRStorage: "SR DOCUMENT",
    sop.ChestCADSRStorage: "SR DOCUMENT",
    sop.ProcedureLogStorage: "SR DOCUMENT",
    sop.XRayRadiationDoseSRStorage: "SR DOCUMENT",
    sop.SpectaclePrescriptionReportStorage: "SR DOCUMENT",
    sop.ColonCADSRStorage: "SR DOCUMENT",
    sop.MacularGridThicknessAndVolumeReportStorage: "SR DOCUMENT",
    sop.ImplantationPlanSRStorage: "SR DOCUMENT",
    sop.Comprehensive3DSRStorage: "SR DOCUMENT",
    sop.RadiopharmaceuticalRadiationDoseSRStorage: "SR DOCUMENT",
    sop.ExtensibleSRStorage: "SR DOCUMENT",
    sop.AcquisitionContextSRStorage: "SR DOCUMENT",
    sop.SimplifiedAdultEchoSRStorage: "SR DOCUMENT",
    sop.PatientRadiationDoseSRStorage: "SR DOCUMENT",
    sop.PlannedImagingAgentAdministrationSRStorage: "SR DOCUMENT",
    sop.PerformedImagingAgentAdministrationSRStorage: "SR DOCUMENT",
    sop.KeyObjectSelectionDocumentStorage: "KEY OBJECT DOC",
    sop.MRSpectroscopyStorage: "SPECTROSCOPY",
    sop.RawDataStorage: "RAW DATA",
    sop.SpatialRegistrationStorage: "REGISTRATION",
    sop.DeformableSpatialRegistrationStorage: "REGISTRATION",
    sop.SpatialFiducialsStorage: "FIDUCIAL",
    sop.RealWorldValueMappingStorage: "VALUE MAP",
    sop.StereometricRelationshipStorage: "STEREOMETRIC",
    sop.LensometryMeasurementsStorage: "MEASUREMENT",
    sop.AutorefractionMeasurementsStorage: "MEASUREMENT",
    sop.KeratometryMeasurementsStorage: "MEASUREMENT",
    sop.SubjectiveRefractionMeasurementsStorage: "MEASUREMENT",
    sop.VisualAcuityMeasurementsStorage: "MEASUREMENT",
    sop.OphthalmicAxialMeasurementsStorage: "MEASUREMENT",
    sop.OphthalmicVisualFieldStaticPerimetryMeasurementsStorage: "MEASUREMENT",
    sop.SurfaceSegmentationStorage: "SURFACE",
    sop.SurfaceScanMeshStorage: "SURFACE SCAN",
    sop.SurfaceScanPointCloudStorage: "SURFACE SCAN",
    sop.TractographyResultsStorage: "TRACT",
    sop.ContentAssessmentResultsStorage: "ASSESSMENT",
}


def _single_level_record_type(ds: Dataset) -> str:
    """Return a single-level *Directory Record Type* for `ds`.

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        The dataset whose *SOP Class UID*, if any, selects the record type.

    Returns
    -------
    str
        The entry in ``_SINGLE_LEVEL_SOP_CLASSES`` for the dataset's *SOP
        Class UID*, or ``"PATIENT"`` when `ds` has no *SOP Class UID* or it
        is not in that table.
    """
    uid = cast(UID | None, getattr(ds, "SOPClassUID", None))

    try:
        record_type = _SINGLE_LEVEL_SOP_CLASSES[uid]  # type: ignore[index]
    except KeyError:
        # Missing or unlisted SOP Class UID
        record_type = "PATIENT"

    return record_type


def _four_level_record_type(ds: Dataset) -> str:
    """Return the fourth-level *Directory Record Type* for `ds`.

    The checks are made in a fixed order and the first match wins:

    1. *Modality* of RTINTENT, RTSEGANN or RTRAD gives ``"RADIOTHERAPY"``
    2. *Modality* of PLAN gives ``"PLAN"``
    3. The presence of *Encapsulated Document* gives ``"ENCAP DOC"``
    4. The presence of *RT Plan Label* gives ``"RT PLAN"``
    5. Otherwise the *SOP Class UID* is looked up in
       ``_FOUR_LEVEL_SOP_CLASSES``, with ``"IMAGE"`` used when the UID is
       missing or not in that table

    Parameters
    ----------
    ds : pydicom.dataset.Dataset
        The dataset to determine the record type for.

    Returns
    -------
    str
        The *Directory Record Type*.
    """
    modality = getattr(ds, "Modality", None)
    if modality in ("RTINTENT", "RTSEGANN", "RTRAD"):
        return "RADIOTHERAPY"
    if modality == "PLAN":
        return "PLAN"

    # Record types identified by the presence of an element, in priority order
    by_element = (
        ("EncapsulatedDocument", "ENCAP DOC"),
        ("RTPlanLabel", "RT PLAN"),
    )
    for keyword, record_type in by_element:
        if keyword in ds:
            return record_type

    uid = cast(UID | None, getattr(ds, "SOPClassUID", None))

    try:
        record_type = _FOUR_LEVEL_SOP_CLASSES[uid]  # type: ignore[index]
    except KeyError:
        # Missing or unlisted SOP Class UID
        record_type = "IMAGE"

    return record_type
