Skip to content

SpiritSafe API

Overview

gkc.spirit_safe provides SpiritSafe source configuration, lookup hydration, JSON Entity Profile export, value-list hydration, artifact-manifest indexing, and curation packet scaffolding.

Current architecture:

  • Runtime packet assembly loads profiles/<QID>.json directly.
  • cache/manifest.json is a tooling/discovery index.
  • Value lists are materialized in cache/queries/<QID>.json and consumed as cache artifacts.

Quick Start

Configure SpiritSafe Source

from gkc.spirit_safe import set_spirit_safe_source, get_spirit_safe_source

set_spirit_safe_source(mode="github", github_repo="skybristol/SpiritSafe", github_ref="main")
print(get_spirit_safe_source())
from gkc.spirit_safe import set_spirit_safe_source

set_spirit_safe_source(mode="local", local_root="/path/to/SpiritSafe")

Build and Export Artifact Manifest

from gkc.spirit_safe import (
    build_spiritsafe_manifest_document,
    export_spiritsafe_manifest,
)

manifest_doc = build_spiritsafe_manifest_document("/path/to/SpiritSafe")
print(manifest_doc["entities"]["count"])

written = export_spiritsafe_manifest("/path/to/SpiritSafe")
print(len(written["profiles"]))

Load Manifest for Registry Tooling

from gkc.spirit_safe import load_manifest

manifest = load_manifest()
print(manifest.generated_at)
print(manifest.profile_qids)
print(manifest.get_profile_entry("Q4"))

Load JSON Profiles and Build Profile Packages

from gkc.spirit_safe import load_profile, load_profile_package

profile = load_profile("Q4")
print(profile["entity"])

package = load_profile_package("Q4", depth=1)
print(package["primary_profile"])
print(sorted(package["profiles"].keys()))
from gkc.spirit_safe import get_profile_graph, resolve_profile_link

graph = get_profile_graph()
print(graph.get_neighbors("Q4"))

link = resolve_profile_link("Q4", "Q40")
print(link)

Create and Validate Curation Packets

from gkc.spirit_safe import create_curation_packet, validate_packet_structure

packet = create_curation_packet("Q4", operation_mode="bulk", depth=1)
is_valid, errors = validate_packet_structure(packet)
print(packet["packet_id"], is_valid, errors)

Export JSON Profiles from Cache Entities

from gkc.spirit_safe import export_entity_profile_json_documents

result = export_entity_profile_json_documents(
    cache_entities_dir="/path/to/SpiritSafe/cache/entities",
    output_dir="/path/to/SpiritSafe/profiles",
)
print(result.written_ids)

Hydrate Value Lists from Cache Entities

from gkc.spirit_safe import hydrate_value_lists_from_cache

result = hydrate_value_lists_from_cache(
    cache_entities_dir="/path/to/SpiritSafe/cache/entities",
    queries_dir="/path/to/SpiritSafe/queries",
    cache_queries_dir="/path/to/SpiritSafe/cache/queries",
)
print(result.hydrated_ids)

Public API Reference

Configuration

gkc.spirit_safe.SpiritSafeSourceConfig dataclass

Package-level configuration for SpiritSafe source location.

Parameters:

Name Type Description Default
mode SpiritSafeSourceMode

Source mode ("github" or "local")

'github'
github_repo str

GitHub repository slug for SpiritSafe assets

DEFAULT_SPIRIT_SAFE_GITHUB_REPO
github_ref str

Git ref used for GitHub raw file resolution

'main'
local_root Optional[Path]

Local SpiritSafe clone root when mode is "local"

None

Plain meaning: Decide whether SpiritSafe assets come from GitHub or local disk.

Source code in gkc/spirit_safe.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@dataclass(frozen=True)
class SpiritSafeSourceConfig:
    """Package-level configuration for SpiritSafe source location.

    Args:
        mode: Source mode ("github" or "local")
        github_repo: GitHub repository slug for SpiritSafe assets
        github_ref: Git ref used for GitHub raw file resolution
        local_root: Local SpiritSafe clone root when mode is "local"

    Plain meaning: Decide whether SpiritSafe assets come from GitHub or local disk.
    """

    mode: SpiritSafeSourceMode = "github"
    github_repo: str = DEFAULT_SPIRIT_SAFE_GITHUB_REPO
    github_ref: str = "main"
    local_root: Optional[Path] = None

    def resolve_cache_dir(self) -> Path:
        """Resolve default cache directory for the configured source.

        Returns:
            Filesystem path to cache directory.
        """
        # A configured local clone keeps its cache inside the clone itself.
        if self.mode == "local" and self.local_root is not None:
            return self.local_root / "cache"

        # GitHub mode (or local without a root): per-repo cache under the
        # user's home cache directory, keyed by a filesystem-safe repo slug.
        safe_slug = self.github_repo.replace("/", "_")
        return Path.home() / ".cache" / "gkc" / "spiritsafe" / safe_slug / "cache"

    def resolve_relative(self, relative_path: str) -> Union[Path, str]:
        """Resolve a SpiritSafe-relative path to local path or GitHub raw URL.

        Args:
            relative_path: Relative path inside SpiritSafe repository.

        Returns:
            Local filesystem path (local mode) or GitHub raw URL (github mode).

        Raises:
            ValueError: In local mode when no local_root is configured.
        """
        trimmed = relative_path.lstrip("/")
        if self.mode != "local":
            return (
                f"https://raw.githubusercontent.com/{self.github_repo}/"
                f"{self.github_ref}/{trimmed}"
            )
        if self.local_root is None:
            raise ValueError("local_root is required when mode='local'")
        return self.local_root / trimmed

resolve_cache_dir()

Resolve default cache directory for the configured source.

Returns:

Type Description
Path

Filesystem path to cache directory.

Source code in gkc/spirit_safe.py
61
62
63
64
65
66
67
68
69
70
71
def resolve_cache_dir(self) -> Path:
    """Resolve default cache directory for the configured source.

    Returns:
        Filesystem path to cache directory.
    """
    # Local mode with a configured root keeps the cache inside the clone.
    if self.mode == "local" and self.local_root is not None:
        return self.local_root / "cache"

    # Otherwise: per-repo cache under the user's home cache directory,
    # keyed by a filesystem-safe slug derived from the repo name.
    safe_slug = self.github_repo.replace("/", "_")
    return Path.home() / ".cache" / "gkc" / "spiritsafe" / safe_slug / "cache"

resolve_relative(relative_path)

Resolve a SpiritSafe-relative path to local path or GitHub raw URL.

Parameters:

Name Type Description Default
relative_path str

Relative path inside SpiritSafe repository.

required

Returns:

Type Description
Union[Path, str]

Local filesystem path (local mode) or GitHub raw URL (github mode).

Source code in gkc/spirit_safe.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def resolve_relative(self, relative_path: str) -> Union[Path, str]:
    """Resolve a SpiritSafe-relative path to local path or GitHub raw URL.

    Args:
        relative_path: Relative path inside SpiritSafe repository.

    Returns:
        Local filesystem path (local mode) or GitHub raw URL (github mode).

    Raises:
        ValueError: In local mode when no local_root is configured.
    """
    trimmed = relative_path.lstrip("/")

    # GitHub mode builds a raw-content URL from repo slug + ref.
    if self.mode != "local":
        return (
            f"https://raw.githubusercontent.com/{self.github_repo}/"
            f"{self.github_ref}/{trimmed}"
        )

    if self.local_root is None:
        raise ValueError("local_root is required when mode='local'")
    return self.local_root / trimmed

gkc.spirit_safe.set_spirit_safe_source(mode='github', github_repo=DEFAULT_SPIRIT_SAFE_GITHUB_REPO, github_ref='main', local_root=None)

Set package-wide SpiritSafe source location.

Parameters:

Name Type Description Default
mode SpiritSafeSourceMode

Source mode ("github" or "local").

'github'
github_repo str

GitHub repository slug for SpiritSafe assets.

DEFAULT_SPIRIT_SAFE_GITHUB_REPO
github_ref str

Git ref used for GitHub raw file resolution.

'main'
local_root Optional[Union[str, Path]]

Local SpiritSafe clone root when mode is "local".

None

Raises:

Type Description
ValueError

If local mode is requested without local_root.

Plain meaning: Configure where SpiritSafe profiles/queries/caches are resolved.

Source code in gkc/spirit_safe.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def set_spirit_safe_source(
    mode: SpiritSafeSourceMode = "github",
    github_repo: str = DEFAULT_SPIRIT_SAFE_GITHUB_REPO,
    github_ref: str = "main",
    local_root: Optional[Union[str, Path]] = None,
) -> None:
    """Set package-wide SpiritSafe source location.

    Args:
        mode: Source mode ("github" or "local").
        github_repo: GitHub repository slug for SpiritSafe assets.
        github_ref: Git ref used for GitHub raw file resolution.
        local_root: Local SpiritSafe clone root when mode is "local".

    Raises:
        ValueError: If local mode is requested without local_root.

    Plain meaning: Configure where SpiritSafe profiles/queries/caches are resolved.
    """
    global _SPIRIT_SAFE_SOURCE_CONFIG

    # Local mode requires a root; normalize it to an absolute expanded Path.
    root: Optional[Path] = None
    if mode == "local":
        if local_root is None:
            raise ValueError("local_root is required when mode='local'")
        root = Path(local_root).expanduser().resolve()

    _SPIRIT_SAFE_SOURCE_CONFIG = SpiritSafeSourceConfig(
        mode=mode,
        github_repo=github_repo,
        github_ref=github_ref,
        local_root=root,
    )

gkc.spirit_safe.get_spirit_safe_source()

Get current package-wide SpiritSafe source configuration.

Returns:

Type Description
SpiritSafeSourceConfig

Active SpiritSafe source configuration.

Plain meaning: See where SpiritSafe data is configured to come from.

Source code in gkc/spirit_safe.py
132
133
134
135
136
137
138
139
140
def get_spirit_safe_source() -> SpiritSafeSourceConfig:
    """Return the active package-wide SpiritSafe source configuration.

    Returns:
        Active SpiritSafe source configuration.

    Plain meaning: See where SpiritSafe data is configured to come from.
    """
    # Module-level singleton set via set_spirit_safe_source().
    return _SPIRIT_SAFE_SOURCE_CONFIG

Registry Metadata and Lookups

gkc.spirit_safe.ProfileMetadata dataclass

Metadata for a SpiritSafe profile registrant.

This dataclass represents the structured metadata from a profile's metadata.yaml file, supporting discovery, versioning, and governance.

Attributes:

Name Type Description
profile_id str

Profile identifier (directory name)

name str

Human-readable profile name

description str

Profile description

version str

Semantic version string

status str

Profile status (e.g., "stable", "draft", "deprecated")

published_date Optional[str]

Publication date (ISO 8601 string)

authors list[dict[str, str]]

List of author dicts with 'name' and optional 'email'

maintainers list[dict[str, str]]

List of maintainer dicts with 'name' and optional 'email'

source_references list[dict[str, str]]

List of reference dicts with 'name' and 'url'

related_profiles list[str]

List of related profile IDs

community_feedback dict[str, str]

Dict with issue tracker and other feedback URLs

datatypes_used list[str]

List of Wikibase datatypes used in profile

statements_count Optional[int]

Number of statements defined in profile

references_required Optional[bool]

Whether references are required

qualifiers_used list[str]

List of qualifier property IDs used

sparql_sources list[str]

List of SPARQL query filenames

raw_metadata dict[str, Any]

Complete raw metadata dict for access to additional fields

Plain meaning: Structured information about a profile package.

Source code in gkc/spirit_safe.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@dataclass(frozen=True)
class ProfileMetadata:
    """Structured metadata for one SpiritSafe profile registrant.

    Mirrors the contents of a profile's metadata.yaml file, supporting
    discovery, versioning, and governance workflows.

    Attributes:
        profile_id: Profile identifier (directory name)
        name: Human-readable profile name
        description: Profile description
        version: Semantic version string
        status: Profile status (e.g., "stable", "draft", "deprecated")
        published_date: Publication date (ISO 8601 string)
        authors: List of author dicts with 'name' and optional 'email'
        maintainers: List of maintainer dicts with 'name' and optional 'email'
        source_references: List of reference dicts with 'name' and 'url'
        related_profiles: List of related profile IDs
        community_feedback: Dict with issue tracker and other feedback URLs
        datatypes_used: List of Wikibase datatypes used in profile
        statements_count: Number of statements defined in profile
        references_required: Whether references are required
        qualifiers_used: List of qualifier property IDs used
        sparql_sources: List of SPARQL query filenames
        raw_metadata: Complete raw metadata dict for access to additional fields

    Plain meaning: Structured information about a profile package.
    """

    # Required identity fields (no defaults).
    profile_id: str
    name: str
    description: str
    version: str
    status: str
    # Optional governance/provenance fields.
    published_date: Optional[str] = None
    authors: list[dict[str, str]] = field(default_factory=list)
    maintainers: list[dict[str, str]] = field(default_factory=list)
    source_references: list[dict[str, str]] = field(default_factory=list)
    related_profiles: list[str] = field(default_factory=list)
    community_feedback: dict[str, str] = field(default_factory=dict)
    datatypes_used: list[str] = field(default_factory=list)
    statements_count: Optional[int] = None
    references_required: Optional[bool] = None
    qualifiers_used: list[str] = field(default_factory=list)
    sparql_sources: list[str] = field(default_factory=list)
    # Full original mapping, for fields not modeled above.
    raw_metadata: dict[str, Any] = field(default_factory=dict)

gkc.spirit_safe.list_profiles()

List all available profile IDs in the configured SpiritSafe source.

Returns:

Type Description
list[str]

List of profile identifiers (directory names under profiles/)

Example

>>> profiles = list_profiles()
>>> print(profiles)
['TribalGovernmentUS', 'OfficeHeldByHeadOfState']

Note

For GitHub mode, this requires an API call to list directory contents. For local mode, this scans the local profiles/ directory.

Design Question: Should we maintain a central registry.yaml file in SpiritSafe to avoid GitHub API calls and provide additional metadata like profile categories, deprecation warnings, or featured profiles?

Plain meaning: See what entity profiles are available.

Source code in gkc/spirit_safe.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def list_profiles() -> list[str]:
    """List all available profile IDs in the configured SpiritSafe source.

    Returns:
        List of profile identifiers (directory names under profiles/)

    Example:
        >>> profiles = list_profiles()
        >>> print(profiles)
        ['TribalGovernmentUS', 'OfficeHeldByHeadOfState']

    Note:
        For GitHub mode, this requires an API call to list directory contents.
        For local mode, this scans the local profiles/ directory.

        **Design Question**: Should we maintain a central registry.yaml file
        in SpiritSafe to avoid GitHub API calls and provide additional metadata
        like profile categories, deprecation warnings, or featured profiles?

    Plain meaning: See what entity profiles are available.
    """
    source = get_spirit_safe_source()

    if source.mode == "local":
        if source.local_root is None:
            raise ValueError("local_root required for local mode")
        profiles_dir = source.local_root / "profiles"
        if not profiles_dir.exists():
            return []
        # Only non-hidden directories count as profiles.
        names = [
            entry.name
            for entry in profiles_dir.iterdir()
            if entry.is_dir() and not entry.name.startswith(".")
        ]
        return sorted(names)

    # GitHub mode: query the contents API for the profiles/ directory.
    api_url = (
        f"https://api.github.com/repos/{source.github_repo}/"
        f"contents/profiles?ref={source.github_ref}"
    )
    try:
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        entries = response.json()
        # Files (e.g. README) are excluded; profiles are directories.
        return sorted(entry["name"] for entry in entries if entry["type"] == "dir")
    except requests.RequestException as exc:
        raise RuntimeError(
            f"Failed to list profiles from {source.github_repo}: {exc}"
        ) from exc

gkc.spirit_safe.profile_exists(profile_id)

Check if a profile exists in the configured SpiritSafe source.

Parameters:

Name Type Description Default
profile_id str

Profile identifier to check

required

Returns:

Type Description
bool

True if profile exists, False otherwise

Example

if profile_exists("TribalGovernmentUS"): ... print("Profile found")

Plain meaning: Check if a specific entity profile is available.

Source code in gkc/spirit_safe.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def profile_exists(profile_id: str) -> bool:
    """Check if a profile exists in the configured SpiritSafe source.

    Args:
        profile_id: Profile identifier to check

    Returns:
        True if profile exists, False otherwise

    Example:
        >>> if profile_exists("TribalGovernmentUS"):
        ...     print("Profile found")

    Plain meaning: Check if a specific entity profile is available.
    """
    try:
        # Existence probe: resolve and read the profile definition; any
        # failure (missing file, network error, bad config) means "absent".
        source = get_spirit_safe_source()
        resolved = source.resolve_relative(f"profiles/{profile_id}/profile.yaml")
        _read_text_from_resolved_path(resolved)
    except Exception:
        return False
    return True

gkc.spirit_safe.get_profile_metadata(profile_id)

Load metadata for a profile from its metadata.yaml file.

Parameters:

Name Type Description Default
profile_id str

Profile identifier (directory name)

required

Returns:

Type Description
ProfileMetadata

Structured profile metadata

Raises:

Type Description
FileNotFoundError

If profile or metadata.yaml doesn't exist

ValueError

If metadata.yaml is invalid

Example

metadata = get_profile_metadata("TribalGovernmentUS") print(metadata.name) 'Federally Recognized Tribe' print(metadata.version) '1.0.0'

Plain meaning: Get information about a profile without loading its full definition.

Source code in gkc/spirit_safe.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def get_profile_metadata(profile_id: str) -> ProfileMetadata:
    """Load metadata for a profile from its metadata.yaml file.

    Args:
        profile_id: Profile identifier (directory name)

    Returns:
        Structured profile metadata

    Raises:
        FileNotFoundError: If profile or metadata.yaml doesn't exist
        ValueError: If metadata.yaml is invalid

    Example:
        >>> metadata = get_profile_metadata("TribalGovernmentUS")
        >>> print(metadata.name)
        'Federally Recognized Tribe'
        >>> print(metadata.version)
        '1.0.0'

    Plain meaning: Get information about a profile without loading its full definition.
    """
    source = get_spirit_safe_source()
    metadata_path = f"profiles/{profile_id}/metadata.yaml"
    resolved = source.resolve_relative(metadata_path)

    # Read and parse failures are distinct contracts: an unreadable file is
    # FileNotFoundError, while unparseable YAML is ValueError (previously
    # both raised FileNotFoundError, contradicting the documented behavior).
    try:
        metadata_text = _read_text_from_resolved_path(resolved)
    except Exception as exc:
        raise FileNotFoundError(
            f"Could not load metadata for profile '{profile_id}'"
        ) from exc

    try:
        raw = yaml.safe_load(metadata_text) or {}
    except yaml.YAMLError as exc:
        raise ValueError(
            f"Profile '{profile_id}' metadata is not valid YAML"
        ) from exc

    # Validate required fields.
    for required in ("name", "version", "status"):
        if required not in raw:
            raise ValueError(
                f"Profile '{profile_id}' metadata missing required field '{required}'"
            )

    # Normalize published_date to string if it was parsed as date object
    published_date = raw.get("published_date")
    if published_date is not None and not isinstance(published_date, str):
        # YAML may parse ISO dates as date objects
        published_date = str(published_date)

    return ProfileMetadata(
        profile_id=profile_id,
        name=raw["name"],
        description=raw.get("description", ""),
        version=raw["version"],
        status=raw["status"],
        published_date=published_date,
        authors=raw.get("authors", []),
        maintainers=raw.get("maintainers", []),
        source_references=raw.get("source_references", []),
        related_profiles=raw.get("related_profiles", []),
        community_feedback=raw.get("community_feedback", {}),
        datatypes_used=raw.get("datatypes_used", []),
        statements_count=raw.get("statements_count"),
        references_required=raw.get("references_required"),
        qualifiers_used=raw.get("qualifiers_used", []),
        sparql_sources=raw.get("sparql_sources", []),
        raw_metadata=raw,
    )

gkc.spirit_safe.resolve_profile_path(profile_ref)

Resolve a profile reference to a path within SpiritSafe structure.

Handles profile name resolution (with or without .yaml extension) to the registrant package path (profiles/<ProfileName>/profile.yaml) and preserves explicit paths as-is.

Parameters:

Name Type Description Default
profile_ref Union[str, Path]

Profile name (e.g., "TribalGovernmentUS", "TribalGovernmentUS.yaml") or explicit path (e.g., "profiles/TribalGovernmentUS/profile.yaml").

required

Returns:

Type Description
Union[str, Path]

Resolved path suitable for _resolve_profile_text().

Source code in gkc/spirit_safe.py
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
def resolve_profile_path(profile_ref: Union[str, Path]) -> Union[str, Path]:
    """Resolve a profile reference to a path within SpiritSafe structure.

    Handles profile name resolution (with or without .yaml extension) to the
    registrant package path (`profiles/<ProfileName>/profile.yaml`) and preserves
    explicit paths as-is.

    Args:
        profile_ref: Profile name (e.g., "TribalGovernmentUS",
                "TribalGovernmentUS.yaml") or explicit path
                (e.g., "profiles/TribalGovernmentUS/profile.yaml").

    Returns:
        Resolved path suitable for _resolve_profile_text().
    """
    ref_text = str(profile_ref)

    # Explicit paths pass through untouched: anything containing a directory
    # separator, or an absolute path, is already a full reference.
    looks_like_path = "/" in ref_text or "\\" in ref_text
    if looks_like_path or Path(profile_ref).is_absolute():
        return profile_ref

    # Bare profile name: drop an optional .yaml suffix and expand to the
    # registrant package layout.
    bare_name = ref_text.removesuffix(".yaml")
    return f"profiles/{bare_name}/profile.yaml"

gkc.spirit_safe.resolve_query_ref(query_ref, profile_path)

Resolve a query reference relative to profile location with root fallback.

Resolution strategy: 1. Try profile-relative first (profiles/&lt;Name&gt;/queries/file.sparql) 2. Fall back to root-relative (queries/file.sparql)

Parameters:

Name Type Description Default
query_ref str

Query reference path from profile (e.g., "queries/file.sparql")

required
profile_path Union[str, Path]

Path to the profile file that references the query

required

Returns:

Type Description
Union[Path, str]

Resolved path (local Path or GitHub URL depending on source mode)

Raises:

Type Description
FileNotFoundError

If query cannot be found in either location

Example

For profile "profiles/TribalGovernmentUS/profile.yaml"

and query_ref "queries/file.sparql"

resolve_query_ref( ... "queries/file.sparql", ... "profiles/TribalGovernmentUS/profile.yaml", ... )

tries: profiles/TribalGovernmentUS/queries/file.sparql

then: queries/file.sparql

Plain meaning: Find query file near profile first, then in global queries directory.

Source code in gkc/spirit_safe.py
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
def resolve_query_ref(
    query_ref: str, profile_path: Union[str, Path]
) -> Union[Path, str]:
    """Resolve a query reference relative to profile location with root fallback.

    Resolution strategy:
    1. Try profile-relative first (profiles/<Name>/queries/file.sparql)
    2. Fall back to root-relative (queries/file.sparql)

    Args:
        query_ref: Query reference path from profile (e.g., "queries/file.sparql")
        profile_path: Path to the profile file that references the query

    Returns:
        Resolved path (local Path or GitHub URL depending on source mode)

    Raises:
        FileNotFoundError: If query cannot be found in either location

    Example:
        >>> # For profile "profiles/TribalGovernmentUS/profile.yaml"
        >>> # and query_ref "queries/file.sparql"
        >>> resolve_query_ref(
        ...     "queries/file.sparql",
        ...     "profiles/TribalGovernmentUS/profile.yaml",
        ... )
        # tries: profiles/TribalGovernmentUS/queries/file.sparql
        # then:  queries/file.sparql

    Plain meaning: Find query file near profile first, then in global queries directory.
    """
    source = get_spirit_safe_source()
    path_text = str(profile_path)

    # Derive the registrant directory (profiles/<Name>) when the profile
    # path follows the registrant layout; otherwise skip the
    # profile-relative candidate entirely.
    registrant_dir: Optional[str] = None
    if "/" in path_text or "\\" in path_text:
        parent = str(Path(path_text).parent)
        if parent.startswith("profiles/") and parent != "profiles":
            registrant_dir = parent

    candidates: list[str] = []
    if registrant_dir:
        candidates.append(f"{registrant_dir}/{query_ref}".replace("//", "/"))
    # Root-relative fallback always comes last.
    candidates.append(query_ref)

    last_error: Optional[Exception] = None
    for candidate in candidates:
        try:
            resolved = source.resolve_relative(candidate)
            # Existence probe: only return a path we can actually read.
            _read_text_from_resolved_path(resolved)
        except Exception as exc:
            last_error = exc
        else:
            return resolved

    tried_paths = ", ".join(candidates)
    if last_error is not None:
        raise FileNotFoundError(
            f"Query not found: {query_ref} (tried: {tried_paths})"
        ) from last_error

    raise FileNotFoundError(f"Query not found: {query_ref} (tried: {tried_paths})")

gkc.spirit_safe.LookupCache

Manage cached SPARQL lookup results.

Parameters:

Name Type Description Default
cache_dir Optional[Path]

Directory for cache storage (default from active SpiritSafe source)

None
Example

cache = LookupCache() cache.get("query_hash")

Plain meaning: Store and retrieve SPARQL query results from disk.

Source code in gkc/spirit_safe.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
class LookupCache:
    """Manage cached SPARQL lookup results.

    Args:
        cache_dir: Directory for cache storage (default from active SpiritSafe source)

    Example:
        >>> cache = LookupCache()
        >>> cache.get("query_hash")

    Plain meaning: Store and retrieve SPARQL query results from disk.
    """

    def __init__(self, cache_dir: Optional[Path] = None):
        """Initialize cache manager.

        Args:
            cache_dir: Cache storage directory (default from active SpiritSafe source)
        """
        if cache_dir is None:
            cache_dir = get_spirit_safe_source().resolve_cache_dir()

        self.cache_dir = Path(cache_dir)
        # Create eagerly so read/write methods can assume the dir exists.
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _query_hash(self, query: str) -> str:
        """Generate a hash for a query string.

        Args:
            query: SPARQL query string

        Returns:
            Truncated (16 hex chars) SHA256 hash of the query
        """
        return hashlib.sha256(query.encode("utf-8")).hexdigest()[:16]

    def _cache_path(self, query: str) -> Path:
        """Get cache file path for a query.

        Args:
            query: SPARQL query string

        Returns:
            Path to cache file
        """
        query_hash = self._query_hash(query)
        return self.cache_dir / f"{query_hash}.json"

    def get(self, query: str) -> Optional[dict[str, Any]]:
        """Retrieve cached results for a query.

        Args:
            query: SPARQL query string

        Returns:
            Cached data dict or None if not found

        Example:
            >>> cache = LookupCache()
            >>> data = cache.get("SELECT ?item WHERE { ... }")
        """
        cache_path = self._cache_path(query)
        if not cache_path.exists():
            return None

        try:
            # Explicit UTF-8: cache files are JSON and must not depend on
            # the platform's default text encoding.
            with open(cache_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            # Corrupt or unreadable cache entries behave like a miss.
            return None

    def set(
        self,
        query: str,
        results: list[dict[str, Any]],
        metadata: Optional[dict[str, Any]] = None,
    ) -> None:
        """Cache results for a query.

        Args:
            query: SPARQL query string
            results: Query results to cache
            metadata: Optional metadata to store with results

        Example:
            >>> cache = LookupCache()
            >>> cache.set("SELECT ...", [{"item": "Q123"}])
        """
        cache_path = self._cache_path(query)

        cache_data = {
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "results": results,
            "metadata": metadata or {},
        }

        # Explicit UTF-8 to match get(); platform default encodings can
        # corrupt non-ASCII labels in results.
        with open(cache_path, "w", encoding="utf-8") as f:
            json.dump(cache_data, f, indent=2)

    def is_fresh(self, query: str, refresh_policy: "RefreshPolicy" = "manual") -> bool:
        """Check if cached results are still fresh.

        Args:
            query: SPARQL query string
            refresh_policy: Refresh policy to check against

        Returns:
            True if cache is fresh, False otherwise

        Example:
            >>> cache = LookupCache()
            >>> cache.is_fresh(query, "daily")
            True
        """
        if refresh_policy == "manual":
            # Manual refresh: always consider fresh if exists
            return self.get(query) is not None

        cached = self.get(query)
        if cached is None:
            return False

        # Parse timestamp
        try:
            cached_time = datetime.fromisoformat(cached["timestamp"])
        except (KeyError, ValueError):
            return False

        # Check freshness based on policy.
        # NOTE(review): timestamps are naive local time on both sides, so
        # the comparison is internally consistent but not timezone-aware.
        now = datetime.now()
        if refresh_policy == "daily":
            return (now - cached_time) < timedelta(days=1)
        elif refresh_policy == "weekly":
            return (now - cached_time) < timedelta(weeks=1)
        # on_release would need version comparison (not implemented yet)
        return False

    def invalidate(self, query: str) -> bool:
        """Invalidate cache for a specific query.

        Args:
            query: SPARQL query string

        Returns:
            True if cache was invalidated, False if not found

        Example:
            >>> cache = LookupCache()
            >>> cache.invalidate("SELECT ...")
        """
        cache_path = self._cache_path(query)
        if cache_path.exists():
            cache_path.unlink()
            return True
        return False

    def clear_all(self) -> int:
        """Clear all cached queries.

        Returns:
            Number of cache files deleted

        Example:
            >>> cache = LookupCache()
            >>> count = cache.clear_all()
        """
        count = 0
        for cache_file in self.cache_dir.glob("*.json"):
            cache_file.unlink()
            count += 1
        return count

__init__(cache_dir=None)

Initialize cache manager.

Parameters:

Name Type Description Default
cache_dir Optional[Path]

Cache storage directory (default from active SpiritSafe source)

None
Source code in gkc/spirit_safe.py
385
386
387
388
389
390
391
392
393
394
395
def __init__(self, cache_dir: Optional[Path] = None):
    """Set up the cache manager and ensure its storage directory exists.

    Args:
        cache_dir: Cache storage directory (default from active SpiritSafe source)
    """
    # Fall back to the directory advertised by the active SpiritSafe source.
    resolved = (
        get_spirit_safe_source().resolve_cache_dir()
        if cache_dir is None
        else cache_dir
    )
    self.cache_dir = Path(resolved)
    self.cache_dir.mkdir(parents=True, exist_ok=True)

clear_all()

Clear all cached queries.

Returns:

Type Description
int

Number of cache files deleted

Example

cache = LookupCache() count = cache.clear_all()

Source code in gkc/spirit_safe.py
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def clear_all(self) -> int:
    """Remove every ``*.json`` cache artifact from the cache directory.

    Returns:
        Number of cache files deleted

    Example:
        >>> cache = LookupCache()
        >>> count = cache.clear_all()
    """
    deleted = 0
    for artifact in self.cache_dir.glob("*.json"):
        artifact.unlink()
        deleted += 1
    return deleted

get(query)

Retrieve cached results for a query.

Parameters:

Name Type Description Default
query str

SPARQL query string

required

Returns:

Type Description
Optional[dict[str, Any]]

Cached data dict or None if not found

Example

cache = LookupCache() data = cache.get("SELECT ?item WHERE { ... }")

Source code in gkc/spirit_safe.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
def get(self, query: str) -> Optional[dict[str, Any]]:
    """Look up previously cached results for a query.

    Args:
        query: SPARQL query string

    Returns:
        Cached data dict or None if not found

    Example:
        >>> cache = LookupCache()
        >>> data = cache.get("SELECT ?item WHERE { ... }")
    """
    path = self._cache_path(query)
    if not path.exists():
        return None

    # Corrupt or unreadable cache files are treated as a cache miss.
    try:
        return json.loads(path.read_text())
    except (json.JSONDecodeError, IOError):
        return None

invalidate(query)

Invalidate cache for a specific query.

Parameters:

Name Type Description Default
query str

SPARQL query string

required

Returns:

Type Description
bool

True if cache was invalidated, False if not found

Example

cache = LookupCache() cache.invalidate("SELECT ...")

Source code in gkc/spirit_safe.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def invalidate(self, query: str) -> bool:
    """Remove the cache entry for a single query, if present.

    Args:
        query: SPARQL query string

    Returns:
        True if cache was invalidated, False if not found

    Example:
        >>> cache = LookupCache()
        >>> cache.invalidate("SELECT ...")
    """
    target = self._cache_path(query)
    if not target.exists():
        return False
    target.unlink()
    return True

is_fresh(query, refresh_policy='manual')

Check if cached results are still fresh.

Parameters:

Name Type Description Default
query str

SPARQL query string

required
refresh_policy RefreshPolicy

Refresh policy to check against

'manual'

Returns:

Type Description
bool

True if cache is fresh, False otherwise

Example

cache = LookupCache() if not cache.is_fresh(query, "daily"): ... # Refresh cache

Source code in gkc/spirit_safe.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
def is_fresh(self, query: str, refresh_policy: RefreshPolicy = "manual") -> bool:
    """Report whether cached results satisfy the given refresh policy.

    Args:
        query: SPARQL query string
        refresh_policy: Refresh policy to check against

    Returns:
        True if cache is fresh, False otherwise

    Example:
        >>> cache = LookupCache()
        >>> if not cache.is_fresh(query, "daily"):
        ...     # Refresh cache
    """
    cached = self.get(query)

    if refresh_policy == "manual":
        # Manual policy: mere existence of a cache entry counts as fresh.
        return cached is not None

    if cached is None:
        return False

    # An unparsable or missing timestamp means the entry cannot be trusted.
    try:
        stored_at = datetime.fromisoformat(cached["timestamp"])
    except (KeyError, ValueError):
        return False

    max_age = {
        "daily": timedelta(days=1),
        "weekly": timedelta(weeks=1),
    }.get(refresh_policy)
    if max_age is None:
        # on_release would need version comparison (not implemented yet)
        return False
    return datetime.now() - stored_at < max_age

set(query, results, metadata=None)

Cache results for a query.

Parameters:

Name Type Description Default
query str

SPARQL query string

required
results list[dict[str, Any]]

Query results to cache

required
metadata Optional[dict[str, Any]]

Optional metadata to store with results

None
Example

cache = LookupCache() cache.set("SELECT ...", [{"item": "Q123"}])

Source code in gkc/spirit_safe.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def set(
    self,
    query: str,
    results: list[dict[str, Any]],
    metadata: Optional[dict[str, Any]] = None,
) -> None:
    """Persist query results, a timestamp, and metadata to the cache file.

    Args:
        query: SPARQL query string
        results: Query results to cache
        metadata: Optional metadata to store with results

    Example:
        >>> cache = LookupCache()
        >>> cache.set("SELECT ...", [{"item": "Q123"}])
    """
    payload = {
        "query": query,
        "timestamp": datetime.now().isoformat(),
        "results": results,
        "metadata": metadata or {},
    }
    # Human-readable JSON so cache artifacts can be inspected directly.
    self._cache_path(query).write_text(json.dumps(payload, indent=2))

gkc.spirit_safe.LookupFetcher

Fetch and cache SPARQL-backed choice lists.

Parameters:

Name Type Description Default
cache Optional[LookupCache]

Optional LookupCache instance

None
endpoint str

SPARQL endpoint URL

'https://query.wikidata.org/sparql'
Example

fetcher = LookupFetcher() results = fetcher.fetch(query, refresh_policy="daily")

Plain meaning: Execute SPARQL queries for choice lists with caching.

Source code in gkc/spirit_safe.py
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
class LookupFetcher:
    """Fetch and cache SPARQL-backed choice lists.

    Args:
        cache: Optional LookupCache instance
        endpoint: SPARQL endpoint URL

    Example:
        >>> fetcher = LookupFetcher()
        >>> results = fetcher.fetch(query, refresh_policy="daily")

    Plain meaning: Execute SPARQL queries for choice lists with caching.
    """

    def __init__(
        self,
        cache: Optional[LookupCache] = None,
        endpoint: str = "https://query.wikidata.org/sparql",
    ):
        """Initialize lookup fetcher.

        Args:
            cache: LookupCache instance (creates default if None)
            endpoint: SPARQL endpoint URL
        """
        self.cache = cache or LookupCache()
        self.endpoint = endpoint
        self.sparql = SPARQLQuery(endpoint=endpoint)

    def _dedupe_results(self, results: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Remove duplicate results based on unique identifier.

        Handles query result redundancy from SPARQL endpoints or pagination
        artifacts by tracking seen items and keeping only first occurrence.
        Uses the "item" field as the unique identifier (standard for Wikidata).

        Args:
            results: Raw results from SPARQL query execution.

        Returns:
            Deduplicated results list preserving order of first occurrence.

        Plain meaning: Remove duplicate rows from query results.
        """
        # Keys may be raw "item" values or synthesized row strings, so the
        # set is typed Any rather than str.
        seen_items: set[Any] = set()
        deduplicated: list[dict[str, Any]] = []

        for result in results:
            # Use "item" field as unique identifier (Wikidata convention)
            if "item" in result:
                item_key = result["item"]
            else:
                # Fallback: canonical string of the sorted row items.
                # This handles cases with multiple identifier fields.
                item_key = str(tuple(sorted(result.items())))

            if item_key not in seen_items:
                seen_items.add(item_key)
                deduplicated.append(result)

        return deduplicated

    def fetch(
        self,
        query: str,
        refresh_policy: RefreshPolicy = "manual",
        force_refresh: bool = False,
        page_size: int = 1000,
        max_results: Optional[int] = None,
    ) -> list[dict[str, str]]:
        """Fetch lookup results with caching.

        Args:
            query: SPARQL query string
            refresh_policy: Cache refresh policy
            force_refresh: Force cache refresh even if fresh
            page_size: Results per page for pagination
            max_results: Maximum total results to fetch

        Returns:
            List of result dictionaries

        Raises:
            SPARQLError: If query execution fails

        Example:
            >>> fetcher = LookupFetcher()
            >>> results = fetcher.fetch(
            ...     "SELECT ?item ?itemLabel WHERE { ... }",
            ...     refresh_policy="daily"
            ... )

        Plain meaning: Get lookup data from cache or query endpoint.
        """
        # Check cache first
        if not force_refresh and self.cache.is_fresh(query, refresh_policy):
            cached = self.cache.get(query)
            if cached is not None:
                return cached["results"]

        # Execute query with pagination
        results = paginate_query(
            query,
            page_size=page_size,
            endpoint=self.endpoint,
            max_results=max_results,
        )

        # Deduplicate results to handle redundant query results
        # (can occur with certain SPARQL patterns or pagination artifacts)
        results = self._dedupe_results(results)

        # Cache results
        self.cache.set(
            query,
            results,
            metadata={
                "refresh_policy": refresh_policy,
                "result_count": len(results),
            },
        )

        return results

    def fetch_choice_list(
        self,
        query: str,
        id_var: str = "item",
        label_var: str = "itemLabel",
        extra_vars: Optional[list[str]] = None,
        refresh_policy: RefreshPolicy = "manual",
        force_refresh: bool = False,
    ) -> list[dict[str, str]]:
        """Fetch a choice list with normalized structure.

        Normalizes SPARQL results to a consistent choice list format
        with id, label, and optional extra fields.

        Args:
            query: SPARQL query string
            id_var: Variable name for item ID (default: "item")
            label_var: Variable name for label (default: "itemLabel")
            extra_vars: Optional list of extra variable names to include
            refresh_policy: Cache refresh policy
            force_refresh: Force cache refresh

        Returns:
            List of choice items with normalized structure

        Example:
            >>> fetcher = LookupFetcher()
            >>> choices = fetcher.fetch_choice_list(
            ...     query,
            ...     id_var="item",
            ...     label_var="itemLabel",
            ...     extra_vars=["languageCode"]
            ... )
            >>> # Returns: [{"id": "Q123", "label": "Example", "languageCode": "en"}]

        Plain meaning: Get normalized choice data for forms and validation.
        """
        raw_results = self.fetch(query, refresh_policy, force_refresh)

        # Normalize to choice list format
        choices = []
        for row in raw_results:
            choice: dict[str, str] = {}

            # Extract ID (handle URLs with entity IDs)
            id_value = row.get(id_var, "")
            if "/" in id_value:
                # Extract QID from URL
                # (e.g., http://www.wikidata.org/entity/Q123 -> Q123)
                id_value = id_value.split("/")[-1]
            choice["id"] = id_value

            # Extract label
            choice["label"] = row.get(label_var, "")

            # Extract extra fields if specified
            if extra_vars:
                for var in extra_vars:
                    if var in row:
                        choice[var] = row[var]

            choices.append(choice)

        return choices

__init__(cache=None, endpoint='https://query.wikidata.org/sparql')

Initialize lookup fetcher.

Parameters:

Name Type Description Default
cache Optional[LookupCache]

LookupCache instance (creates default if None)

None
endpoint str

SPARQL endpoint URL

'https://query.wikidata.org/sparql'
Source code in gkc/spirit_safe.py
560
561
562
563
564
565
566
567
568
569
570
571
572
573
def __init__(
    self,
    cache: Optional[LookupCache] = None,
    endpoint: str = "https://query.wikidata.org/sparql",
):
    """Set up the fetcher with a cache backend and a SPARQL client.

    Args:
        cache: LookupCache instance (creates default if None)
        endpoint: SPARQL endpoint URL
    """
    self.endpoint = endpoint
    self.sparql = SPARQLQuery(endpoint=endpoint)
    # Fall back to a default cache when none was supplied.
    self.cache = cache or LookupCache()

fetch(query, refresh_policy='manual', force_refresh=False, page_size=1000, max_results=None)

Fetch lookup results with caching.

Parameters:

Name Type Description Default
query str

SPARQL query string

required
refresh_policy RefreshPolicy

Cache refresh policy

'manual'
force_refresh bool

Force cache refresh even if fresh

False
page_size int

Results per page for pagination

1000
max_results Optional[int]

Maximum total results to fetch

None

Returns:

Type Description
list[dict[str, str]]

List of result dictionaries

Raises:

Type Description
SPARQLError

If query execution fails

Example

fetcher = LookupFetcher() results = fetcher.fetch( ... "SELECT ?item ?itemLabel WHERE { ... }", ... refresh_policy="daily" ... )

Plain meaning: Get lookup data from cache or query endpoint.

Source code in gkc/spirit_safe.py
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
def fetch(
    self,
    query: str,
    refresh_policy: RefreshPolicy = "manual",
    force_refresh: bool = False,
    page_size: int = 1000,
    max_results: Optional[int] = None,
) -> list[dict[str, str]]:
    """Fetch lookup results, serving from cache when the policy allows.

    Args:
        query: SPARQL query string
        refresh_policy: Cache refresh policy
        force_refresh: Force cache refresh even if fresh
        page_size: Results per page for pagination
        max_results: Maximum total results to fetch

    Returns:
        List of result dictionaries

    Raises:
        SPARQLError: If query execution fails

    Example:
        >>> fetcher = LookupFetcher()
        >>> results = fetcher.fetch(
        ...     "SELECT ?item ?itemLabel WHERE { ... }",
        ...     refresh_policy="daily"
        ... )

    Plain meaning: Get lookup data from cache or query endpoint.
    """
    # Serve straight from cache when permitted and the entry is fresh.
    if not force_refresh and self.cache.is_fresh(query, refresh_policy):
        hit = self.cache.get(query)
        if hit is not None:
            return hit["results"]

    # Cache miss (or forced refresh): run the query with pagination.
    fresh_rows = paginate_query(
        query,
        page_size=page_size,
        endpoint=self.endpoint,
        max_results=max_results,
    )

    # Drop redundant rows (certain SPARQL patterns and pagination
    # artifacts can produce duplicates).
    fresh_rows = self._dedupe_results(fresh_rows)

    # Store the cleaned result set for subsequent calls.
    self.cache.set(
        query,
        fresh_rows,
        metadata={
            "refresh_policy": refresh_policy,
            "result_count": len(fresh_rows),
        },
    )

    return fresh_rows

fetch_choice_list(query, id_var='item', label_var='itemLabel', extra_vars=None, refresh_policy='manual', force_refresh=False)

Fetch a choice list with normalized structure.

Normalizes SPARQL results to a consistent choice list format with id, label, and optional extra fields.

Parameters:

Name Type Description Default
query str

SPARQL query string

required
id_var str

Variable name for item ID (default: "item")

'item'
label_var str

Variable name for label (default: "itemLabel")

'itemLabel'
extra_vars Optional[list[str]]

Optional list of extra variable names to include

None
refresh_policy RefreshPolicy

Cache refresh policy

'manual'
force_refresh bool

Force cache refresh

False

Returns:

Type Description
list[dict[str, str]]

List of choice items with normalized structure

Example

fetcher = LookupFetcher() choices = fetcher.fetch_choice_list( ... query, ... id_var="item", ... label_var="itemLabel", ... extra_vars=["languageCode"] ... )

Returns: [{"id": "Q123", "label": "Example", "languageCode": "en"}]

Plain meaning: Get normalized choice data for forms and validation.

Source code in gkc/spirit_safe.py
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
def fetch_choice_list(
    self,
    query: str,
    id_var: str = "item",
    label_var: str = "itemLabel",
    extra_vars: Optional[list[str]] = None,
    refresh_policy: RefreshPolicy = "manual",
    force_refresh: bool = False,
) -> list[dict[str, str]]:
    """Fetch a choice list with normalized structure.

    Normalizes SPARQL results to a consistent choice list format
    with id, label, and optional extra fields.

    Args:
        query: SPARQL query string
        id_var: Variable name for item ID (default: "item")
        label_var: Variable name for label (default: "itemLabel")
        extra_vars: Optional list of extra variable names to include
        refresh_policy: Cache refresh policy
        force_refresh: Force cache refresh

    Returns:
        List of choice items with normalized structure

    Example:
        >>> fetcher = LookupFetcher()
        >>> choices = fetcher.fetch_choice_list(
        ...     query,
        ...     id_var="item",
        ...     label_var="itemLabel",
        ...     extra_vars=["languageCode"]
        ... )
        >>> # Returns: [{"id": "Q123", "label": "Example", "languageCode": "en"}]

    Plain meaning: Get normalized choice data for forms and validation.
    """
    rows = self.fetch(query, refresh_policy, force_refresh)

    normalized: list[dict[str, str]] = []
    for row in rows:
        raw_id = row.get(id_var, "")
        # Entity URLs are trimmed down to their trailing QID
        # (e.g., http://www.wikidata.org/entity/Q123 -> Q123).
        entry: dict[str, str] = {
            "id": raw_id.split("/")[-1] if "/" in raw_id else raw_id,
            "label": row.get(label_var, ""),
        }

        # Carry over any requested extra variables that are present.
        for var in extra_vars or []:
            if var in row:
                entry[var] = row[var]

        normalized.append(entry)

    return normalized

gkc.spirit_safe.hydrate_profile_lookups(profile_paths, *, refresh_policy=None, force_refresh=False, page_size=1000, max_results=None, endpoint='https://query.wikidata.org/sparql', dry_run=False, fail_on_query_error=False)

Hydrate SPARQL lookup caches for one or more profile files.

This performs an explicit lookup hydration workflow by scanning profile YAML, extracting SPARQL lookup specs, resolving query references/templates, deduplicating identical rendered queries, and optionally executing them through LookupFetcher.

Parameters:

Name Type Description Default
profile_paths list[Union[str, Path]]

Paths to profile YAML files.

required
refresh_policy Optional[RefreshPolicy]

Optional global refresh policy override.

None
force_refresh bool

Force refresh even if cache is fresh.

False
page_size int

Page size for paginated query execution.

1000
max_results Optional[int]

Optional maximum total results per query.

None
endpoint str

SPARQL endpoint URL.

'https://query.wikidata.org/sparql'
dry_run bool

If True, do not execute queries; return discovery summary only.

False
fail_on_query_error bool

If True, raise on first query execution failure.

False

Returns:

Type Description
dict[str, Any]

Summary dictionary with discovery/execution stats.

Source code in gkc/spirit_safe.py
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
def hydrate_profile_lookups(
    profile_paths: list[Union[str, Path]],
    *,
    refresh_policy: Optional[RefreshPolicy] = None,
    force_refresh: bool = False,
    page_size: int = 1000,
    max_results: Optional[int] = None,
    endpoint: str = "https://query.wikidata.org/sparql",
    dry_run: bool = False,
    fail_on_query_error: bool = False,
) -> dict[str, Any]:
    """Hydrate SPARQL lookup caches for one or more profile files.

    This performs an explicit lookup hydration workflow by scanning profile YAML,
    extracting SPARQL lookup specs, resolving query references/templates, deduplicating
    identical rendered queries, and optionally executing them through `LookupFetcher`.

    Args:
        profile_paths: Paths to profile YAML files.
        refresh_policy: Optional global refresh policy override.
        force_refresh: Force refresh even if cache is fresh.
        page_size: Page size for paginated query execution.
        max_results: Optional maximum total results per query.
        endpoint: SPARQL endpoint URL.
        dry_run: If True, do not execute queries; return discovery summary only.
        fail_on_query_error: If True, raise on first query execution failure.

    Returns:
        Summary dictionary with discovery/execution stats.
    """
    source = get_spirit_safe_source()
    discovered_specs: list[dict[str, Any]] = []

    # Phase 1: scan each profile's YAML and collect its SPARQL lookup specs,
    # tagging every spec with the profile it came from for later reporting.
    for profile_path in profile_paths:
        yaml_text = _resolve_profile_text(profile_path)
        profile_data = yaml.safe_load(yaml_text) or {}
        profile_specs = _extract_sparql_specs(profile_data)
        for spec in profile_specs:
            spec["profile"] = str(profile_path)
            discovered_specs.append(spec)

    # Phase 2: render each spec to a concrete query and dedupe by
    # (endpoint, stripped query text) so identical queries run only once.
    unique_queries: dict[tuple[str, str], dict[str, Any]] = {}
    failures: list[dict[str, Any]] = []

    for spec in discovered_specs:
        try:
            if spec.get("query"):
                # Inline query text takes precedence over a query_ref.
                rendered_query = str(spec["query"])
            else:
                query_ref = spec.get("query_ref")
                if not query_ref:
                    raise ValueError("Missing both 'query' and 'query_ref'")
                # Resolve the reference relative to the owning profile, then
                # render the template with the spec's parameters.
                resolved_query_ref = resolve_query_ref(
                    str(query_ref), spec.get("profile", "")
                )
                query_template = _read_text_from_resolved_path(resolved_query_ref)
                rendered_query = _render_query_template(
                    query_template, spec.get("query_params", {})
                )

            key = (endpoint, rendered_query.strip())
            if key not in unique_queries:
                # NOTE: the first spec seen for a given query decides its
                # refresh policy unless a global refresh_policy override is set.
                unique_queries[key] = {
                    "endpoint": endpoint,
                    "query": rendered_query,
                    "refresh": refresh_policy or spec.get("refresh", "manual"),
                    "sources": [],
                }
            unique_queries[key]["sources"].append(
                {
                    "profile": spec.get("profile"),
                    "location": spec.get("location"),
                    "query_ref": spec.get("query_ref"),
                }
            )
        except Exception as exc:
            # Record preparation failures; optionally fail fast.
            failure = {
                "profile": spec.get("profile"),
                "location": spec.get("location"),
                "query_ref": spec.get("query_ref"),
                "error": str(exc),
            }
            failures.append(failure)
            if fail_on_query_error:
                profile_loc = f"{failure['profile']}:{failure['location']}"
                raise RuntimeError(
                    f"Failed to prepare query for {profile_loc}"
                ) from exc

    # Phase 3: execute each unique query through the cache-aware fetcher
    # (skipped entirely in dry-run mode).
    hydrated: list[dict[str, Any]] = []
    if not dry_run:
        fetcher = LookupFetcher(endpoint=endpoint)
        for entry in unique_queries.values():
            try:
                results = fetcher.fetch(
                    entry["query"],
                    refresh_policy=entry["refresh"],
                    force_refresh=force_refresh,
                    page_size=page_size,
                    max_results=max_results,
                )
                hydrated.append(
                    {
                        "endpoint": endpoint,
                        "refresh": entry["refresh"],
                        "source_count": len(entry["sources"]),
                        "result_count": len(results),
                        "sources": entry["sources"],
                    }
                )
            except Exception as exc:
                # Execution failures are collected alongside preparation
                # failures; optionally fail fast.
                failure = {
                    "endpoint": endpoint,
                    "sources": entry["sources"],
                    "error": str(exc),
                }
                failures.append(failure)
                if fail_on_query_error:
                    raise RuntimeError(
                        "Failed to execute hydrated lookup query"
                    ) from exc

    # Count on-disk cache artifacts for the summary (the cache directory
    # may not exist yet when nothing has been hydrated).
    cache_dir = source.resolve_cache_dir()
    cache_file_count = len(list(cache_dir.glob("*.json"))) if cache_dir.exists() else 0

    return {
        "source_mode": source.mode,
        "profiles_scanned": len(profile_paths),
        "lookup_specs_found": len(discovered_specs),
        "unique_queries": len(unique_queries),
        "unique_queries_executed": 0 if dry_run else len(hydrated),
        "dry_run": dry_run,
        "cache_dir": str(cache_dir),
        "cache_file_count": cache_file_count,
        "hydrated": hydrated,
        "failures": failures,
    }

Entity Profile and Value-List Artifacts

gkc.spirit_safe.EntityProfileJsonBuilder

Build JSON entity profiles from SpiritSafe per-entity cache files.

Plain meaning: Convert profile-linked cache entities into JSON profile docs.

Source code in gkc/spirit_safe.py
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
class EntityProfileJsonBuilder:
    """Build JSON entity profiles from SpiritSafe per-entity cache files.

    Plain meaning: Convert profile-linked cache entities into JSON profile docs.
    """

    # Item class (checked via claim P1) marking a cache entity as a profile.
    PROFILE_CLASS_ID = "Q3"

    # Property IDs read from cache entity claims/qualifiers.
    PROFILE_STATEMENT = "P157"
    HAS_QUALIFIER = "P158"
    HAS_VALUE = "P161"
    VALUE_TYPE = "P194"
    IO_MAP = "P5"
    PROMPT = "P171"
    GUIDANCE = "P169"
    CONSEQUENCES = "P170"
    ERROR_MESSAGE = "P168"
    MAX_COUNT = "P182"
    HAS_REFERENCE = "P211"
    APPLIES_TO_PROFILE = "P205"
    DERIVES_DEFAULT_VALUE_FROM = "P213"

    # Properties carrying per-language prompt/guidance for identification fields.
    LABEL_PROMPT = "P188"
    LABEL_GUIDANCE = "P185"
    DESCRIPTION_PROMPT = "P189"
    DESCRIPTION_GUIDANCE = "P186"
    ALIAS_PROMPT = "P190"
    ALIAS_GUIDANCE = "P187"

    # Linkage property and item classes used to classify value targets.
    SAME_AS = "P212"
    GKC_ENTITY_PROFILE_CLASS = "Q3"
    GKC_VALUE_LIST_CLASS = "Q7"
    WIKIDATA_ENTITY_CLASS = "Q52"

    # Accepts "mul" or BCP-47-like codes such as "en", "pt-br".
    LANGUAGE_KEY_PATTERN = re.compile(
        r"^(mul|[a-z]{2,3}(?:-[a-z0-9]+)*)$", re.IGNORECASE
    )

    # Maps message property IDs to the JSON field names they populate.
    MESSAGE_FIELD_BY_PROP = {
        PROMPT: "prompt",
        GUIDANCE: "guidance",
        CONSEQUENCES: "consequences_message",
        ERROR_MESSAGE: "error_message",
    }

    def __init__(
        self,
        cache_entities_dir: Union[str, Path],
        entity_prefix: str = "https://datadistillery.wikibase.cloud/entity/",
        label_language_order: tuple[str, ...] = ("mul", "en"),
        description_language_order: tuple[str, ...] = ("en", "mul"),
    ) -> None:
        """Load the cache index and configure URI prefix and language fallbacks.

        Args:
            cache_entities_dir: Directory of per-entity cache JSON files.
            entity_prefix: URI prefix for entity references; normalized to end
                with exactly one "/".
            label_language_order: Language fallback order for labels.
            description_language_order: Language fallback order for descriptions.
        """
        self.cache_entities_dir = Path(cache_entities_dir)
        # Normalize so prefix + QID concatenation never doubles the slash.
        self.entity_prefix = entity_prefix.rstrip("/") + "/"
        self.label_language_order = label_language_order
        self.description_language_order = description_language_order
        # entity_id -> raw cache document, loaded eagerly at construction.
        self._cache_index = self._load_cache_index()

    def build_all(self) -> list[dict[str, Any]]:
        """Build JSON documents for every cache entity typed as a profile."""
        results: list[dict[str, Any]] = []
        for doc in self._cache_index.values():
            if self._is_profile_item(doc):
                results.append(self.build_one(doc))
        return results

    def build_one(self, wikibase_item: dict[str, Any]) -> dict[str, Any]:
        """Build one JSON profile document from a single cache entity."""
        entity_uri = f"{self.entity_prefix}{wikibase_item.get('entity_id', '')}"

        # Identification prompts/guidance are per-language monolingual claims.
        identification = {
            "labels": self._build_language_section(
                wikibase_item, self.LABEL_PROMPT, self.LABEL_GUIDANCE
            ),
            "descriptions": self._build_language_section(
                wikibase_item, self.DESCRIPTION_PROMPT, self.DESCRIPTION_GUIDANCE
            ),
            "aliases": self._build_language_section(
                wikibase_item, self.ALIAS_PROMPT, self.ALIAS_GUIDANCE
            ),
        }

        statements = self._build_profile_statements(wikibase_item)
        metadata = self._build_profile_metadata(
            wikibase_item,
            identification=identification,
            statements=statements,
            entity_uri=entity_uri,
        )

        return {
            "entity": entity_uri,
            "identification": identification,
            "statements": statements,
            "metadata": metadata,
        }

    def _build_profile_metadata(
        self,
        wikibase_item: dict[str, Any],
        *,
        identification: dict[str, Any],
        statements: list[dict[str, Any]],
        entity_uri: str,
    ) -> dict[str, Any]:
        """Assemble the metadata section: texts, graphs, counts, timestamps."""
        metadata = {
            "labels": self._localized_text_map(wikibase_item, "labels"),
            "descriptions": self._localized_text_map(wikibase_item, "descriptions"),
            "aliases": self._alias_text_map(wikibase_item),
            # UTC ISO-8601 with "Z" suffix instead of "+00:00".
            "generated_at": datetime.now(timezone.utc)
            .isoformat()
            .replace("+00:00", "Z"),
            "languages": [],
            "statement_count": len(statements),
            "profile_graph": self._build_profile_graph(wikibase_item),
            "value_list_graph": self._build_value_list_graph(statements),
            "exported_from": entity_uri,
        }

        # Languages are derived from the fully built document, so this runs
        # after identification/statements/metadata texts exist.
        metadata["languages"] = self._collect_languages(
            {
                "identification": identification,
                "statements": statements,
                "metadata": {
                    "labels": metadata["labels"],
                    "descriptions": metadata["descriptions"],
                    "aliases": metadata["aliases"],
                },
            }
        )
        return metadata

    def _build_profile_graph(
        self, wikibase_item: dict[str, Any]
    ) -> list[dict[str, Optional[str]]]:
        """List profile-to-profile links reachable via statement values.

        Only targets typed as GKC entity profiles are kept; duplicate
        (statement, target) pairs are emitted once.
        """
        graph: list[dict[str, Optional[str]]] = []
        seen: set[tuple[str, str]] = set()
        for statement_id, linked_value_ids in self._iter_statement_value_linkages(
            wikibase_item
        ):
            for target_id in linked_value_ids:
                type_ids = self._entity_type_ids(self._cache_index.get(target_id))
                if self.GKC_ENTITY_PROFILE_CLASS not in type_ids:
                    continue
                key = (statement_id, target_id)
                if key in seen:
                    continue
                seen.add(key)
                graph.append(
                    {
                        "entity": f"{self.entity_prefix}{target_id}",
                        "label": self._entity_label(target_id),
                        "via_statement": f"{self.entity_prefix}{statement_id}",
                        "linkage_type": self.HAS_VALUE,
                    }
                )
        return graph

    def _build_value_list_graph(
        self, statements: list[dict[str, Any]]
    ) -> list[dict[str, Optional[str]]]:
        """List value-list cache references found in built statement payloads.

        Walks statements (including nested qualifiers/references) and records
        each distinct (statement, cache_path) linkage once.
        """
        graph: list[dict[str, Optional[str]]] = []
        seen: set[tuple[str, str]] = set()

        for statement in self._iter_statement_nodes(statements):
            if not isinstance(statement, dict):
                continue

            statement_entity = statement.get("entity")
            if not isinstance(statement_entity, str) or not statement_entity:
                continue

            value_payload = statement.get("value")
            if not isinstance(value_payload, dict):
                continue

            cache_path = value_payload.get("value_list_reference")
            if not isinstance(cache_path, str) or not cache_path:
                continue

            target_id = self._qid_from_cache_path(cache_path)
            if not target_id:
                continue

            # If the target is in the cache but is typed as something other
            # than a value list, skip it; unknown targets are allowed through.
            type_ids = self._entity_type_ids(self._cache_index.get(target_id))
            if type_ids and self.GKC_VALUE_LIST_CLASS not in type_ids:
                continue

            key = (statement_entity, cache_path)
            if key in seen:
                continue
            seen.add(key)

            graph.append(
                {
                    "entity": f"{self.entity_prefix}{target_id}",
                    "label": self._entity_label(target_id),
                    "via_statement": statement_entity,
                    "cache_path": cache_path,
                }
            )
        return graph

    def _iter_statement_nodes(
        self, statements: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Flatten statements plus nested qualifiers/references into one list."""
        nodes: list[dict[str, Any]] = []

        def _walk(statement_list: list[dict[str, Any]]) -> None:
            for statement in statement_list:
                if not isinstance(statement, dict):
                    continue
                nodes.append(statement)
                for key in ("qualifiers", "references"):
                    nested = statement.get(key)
                    if isinstance(nested, list):
                        _walk(nested)

        _walk(statements)
        return nodes

    def _qid_from_cache_path(self, cache_path: str) -> Optional[str]:
        """Extract a QID from a cache file path stem, or None if not a QID."""
        candidate = Path(cache_path).stem.upper()
        if candidate.startswith("Q") and candidate[1:].isdigit():
            return candidate
        return None

    def _iter_statement_value_linkages(
        self, wikibase_item: dict[str, Any]
    ) -> list[tuple[str, list[str]]]:
        """Pair each profile-statement ID with its linked value entity IDs.

        Combines intrinsic HAS_VALUE claims on the statement item with
        HAS_VALUE qualifiers overlaid on the profile's claim.
        """
        linkages: list[tuple[str, list[str]]] = []
        claims = wikibase_item.get("entity", {}).get("claims", {})
        for claim in claims.get(self.PROFILE_STATEMENT, []):
            statement_id = self._claim_entity_id(claim)
            if not statement_id:
                continue
            statement_doc = self._cache_index.get(statement_id)
            statement_claims = (
                statement_doc.get("entity", {}).get("claims", {})
                if statement_doc
                else {}
            )
            intrinsic = self._claim_entity_values(
                statement_claims.get(self.HAS_VALUE, [])
            )
            overlay = self._qualifier_entity_ids(
                claim.get("qualifiers", {}), self.HAS_VALUE
            )
            linkages.append(
                (statement_id, self._dedupe_preserve_order(intrinsic + overlay))
            )
        return linkages

    def _build_profile_statements(
        self, wikibase_item: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Build the statements array from the profile's PROFILE_STATEMENT claims."""
        statements: list[dict[str, Any]] = []
        claims = wikibase_item.get("entity", {}).get("claims", {})
        root_id = wikibase_item.get("entity_id")
        for claim in claims.get(self.PROFILE_STATEMENT, []):
            statement_id = self._claim_entity_id(claim)
            if not statement_id:
                continue
            built = self._build_statement_from_cache_id(
                statement_id,
                role="statement",
                overlay_qualifiers=claim.get("qualifiers", {}),
                # Seed visited with the root profile to break self-cycles.
                visited={root_id} if root_id else set(),
                current_profile_id=root_id,
            )
            if built:
                statements.append(built)
        return statements

    def _build_statement_from_cache_id(
        self,
        entity_id: str,
        *,
        role: str,
        overlay_qualifiers: Optional[dict[str, list[dict[str, Any]]]] = None,
        visited: Optional[set[str]] = None,
        current_profile_id: Optional[str] = None,
        parent_statement_id: Optional[str] = None,
    ) -> Optional[dict[str, Any]]:
        """Build a statement/qualifier/reference node from a cached entity.

        Args:
            entity_id: Cache entity ID of the statement item.
            role: One of "statement", "qualifier", "reference".
            overlay_qualifiers: Qualifiers from the referencing claim that
                override/extend the statement item's intrinsic data.
            visited: Entity IDs already on the recursion path (cycle guard).
            current_profile_id: Root profile being built, for APPLIES_TO_PROFILE
                filtering of derived defaults.
            parent_statement_id: Enclosing statement when role is nested.

        Returns:
            The built node, or None for cycles or entities missing from cache.
        """
        if visited is None:
            visited = set()
        if entity_id in visited:
            return None

        statement_item = self._cache_index.get(entity_id)
        if not statement_item:
            return None

        # Copy so sibling branches do not see each other's visited IDs.
        next_visited = set(visited)
        next_visited.add(entity_id)

        statement_json = self._build_statement_from_item(statement_item)

        # Merge overlay HAS_VALUE qualifiers with intrinsic linked values,
        # then rebuild the value payload from the combined target IDs.
        qualifiers = overlay_qualifiers or {}
        overlay_value_ids = self._qualifier_entity_ids(qualifiers, self.HAS_VALUE)
        combined_value_ids = self._dedupe_preserve_order(
            statement_json["value"]["linked_entity_ids"] + overlay_value_ids
        )
        statement_json["value"] = self._build_value_payload(
            statement_json["value"]["type"], combined_value_ids
        )

        # Nested nodes may declare they derive their default value from the
        # parent statement's value (DERIVES_DEFAULT_VALUE_FROM).
        source_statement_id = self._derived_value_source_statement_id(
            statement_item=statement_item,
            role=role,
            parent_statement_id=parent_statement_id,
            current_profile_id=current_profile_id,
        )
        if source_statement_id:
            statement_json["value"]["value_source"] = "statement_value"
            statement_json["value"][
                "value_source_statement"
            ] = f"{self.entity_prefix}{source_statement_id}"

        if qualifiers:
            # Overlay messages take precedence over intrinsic claim messages.
            statement_json["messages"] = self._merge_messages(
                statement_json.get("messages", {}),
                self._build_messages_from_qualifiers(qualifiers),
            )
            statement_json["max_count"] = self._qualifier_first_quantity_int(
                qualifiers, self.MAX_COUNT
            )

            # Only top-level statements expand nested qualifiers/references.
            if role == "statement":
                qualifier_ids = self._qualifier_entity_ids(
                    qualifiers, self.HAS_QUALIFIER
                )
                reference_ids = self._qualifier_entity_ids(
                    qualifiers, self.HAS_REFERENCE
                )
                statement_json["qualifiers"] = self._resolve_linked_statements(
                    qualifier_ids,
                    role="qualifier",
                    visited=next_visited,
                    current_profile_id=current_profile_id,
                    parent_statement_id=entity_id,
                )
                statement_json["references"] = self._resolve_linked_statements(
                    reference_ids,
                    role="reference",
                    visited=next_visited,
                    current_profile_id=current_profile_id,
                    parent_statement_id=entity_id,
                )

        # Nested nodes never carry their own qualifiers/references arrays.
        if role in {"qualifier", "reference"}:
            statement_json.pop("qualifiers", None)
            statement_json.pop("references", None)

        return statement_json

    def _resolve_linked_statements(
        self,
        entity_ids: list[str],
        *,
        role: str,
        visited: set[str],
        current_profile_id: Optional[str],
        parent_statement_id: str,
    ) -> list[dict[str, Any]]:
        """Build nested nodes for each linked entity ID, dropping unresolvable ones."""
        resolved: list[dict[str, Any]] = []
        for entity_id in entity_ids:
            nested = self._build_statement_from_cache_id(
                entity_id,
                role=role,
                visited=visited,
                current_profile_id=current_profile_id,
                parent_statement_id=parent_statement_id,
            )
            if nested:
                resolved.append(nested)
        return resolved

    def _build_statement_from_item(
        self, statement_item: dict[str, Any]
    ) -> dict[str, Any]:
        """Build the base statement node from a statement item's own claims."""
        entity_id = statement_item.get("entity_id")
        label = self._get_localized_text(
            statement_item,
            section="labels",
            language_order=self.label_language_order,
            required=False,
        )

        claims = statement_item.get("entity", {}).get("claims", {})
        io_targets = self._claim_string_values(claims.get(self.IO_MAP, []))

        # Value type: prefer the referenced item's label, fall back to its ID.
        value_type: Optional[str] = None
        type_refs = self._claim_entity_values(claims.get(self.VALUE_TYPE, []))
        if type_refs:
            value_type = self._entity_label(type_refs[0]) or type_refs[0]

        intrinsic_value_ids = self._claim_entity_values(claims.get(self.HAS_VALUE, []))

        return {
            "entity": f"{self.entity_prefix}{entity_id}",
            "label": label,
            "io_map": [{"to": target} for target in io_targets],
            # Interim value shape; rebuilt by _build_value_payload after
            # overlay values are merged in.
            "value": {
                "type": value_type,
                "linked_entity_ids": intrinsic_value_ids,
            },
            "messages": self._build_messages_from_claims(claims),
            "max_count": None,
            "qualifiers": [],
            "references": [],
        }

    def _build_value_payload(
        self, value_type: Optional[str], target_ids: list[str]
    ) -> dict[str, Any]:
        """Classify value targets into profile link, value-list ref, or value list.

        First profile target and first value-list target win; Wikidata-typed
        targets contribute SAME_AS QIDs to an inline value_list.
        """
        payload: dict[str, Any] = {"type": value_type}
        value_list: list[dict[str, Optional[str]]] = []

        for target_id in target_ids:
            target_doc = self._cache_index.get(target_id)
            type_ids = self._entity_type_ids(target_doc)
            target_label = self._entity_label(target_id)
            target_entity = f"{self.entity_prefix}{target_id}"

            if self.GKC_ENTITY_PROFILE_CLASS in type_ids and "profile" not in payload:
                payload["profile"] = {"entity": target_entity, "label": target_label}

            if (
                self.GKC_VALUE_LIST_CLASS in type_ids
                and "value_list_reference" not in payload
            ):
                payload["value_list_reference"] = f"cache/queries/{target_id}.json"

            if self.WIKIDATA_ENTITY_CLASS in type_ids:
                for url in self._entity_string_claim_values(target_doc, self.SAME_AS):
                    qid = self._extract_wikidata_qid_from_url(url)
                    if qid:
                        value_list.append({"item": qid, "itemLabel": target_label})

        if value_list:
            payload["value_list"] = value_list

        return payload

    def _build_messages_from_claims(
        self, claims: dict[str, list[dict[str, Any]]]
    ) -> dict[str, dict[str, str]]:
        """Build language -> {field: text} messages from intrinsic claims."""
        messages: dict[str, dict[str, str]] = {}
        for prop_id, field_name in self.MESSAGE_FIELD_BY_PROP.items():
            by_lang = self._monolingual_claims_by_language(claims.get(prop_id, []))
            for language, text in by_lang.items():
                messages.setdefault(language, {})[field_name] = text
        return messages

    def _build_messages_from_qualifiers(
        self, qualifiers: dict[str, list[dict[str, Any]]]
    ) -> dict[str, dict[str, str]]:
        """Build language -> {field: text} messages from overlay qualifiers."""
        messages: dict[str, dict[str, str]] = {}
        for prop_id, field_name in self.MESSAGE_FIELD_BY_PROP.items():
            by_lang = self._monolingual_qualifiers_by_language(
                qualifiers.get(prop_id, [])
            )
            for language, text in by_lang.items():
                messages.setdefault(language, {})[field_name] = text
        return messages

    def _merge_messages(
        self,
        base_messages: dict[str, dict[str, str]],
        overlay_messages: dict[str, dict[str, str]],
    ) -> dict[str, dict[str, str]]:
        """Merge message maps; overlay fields win per language. Inputs untouched."""
        merged = {language: fields.copy() for language, fields in base_messages.items()}
        for language, fields in overlay_messages.items():
            merged.setdefault(language, {}).update(fields)
        return merged

    def _build_language_section(
        self,
        wikibase_item: dict[str, Any],
        prompt_claim_id: str,
        guidance_claim_id: str,
    ) -> dict[str, dict[str, str]]:
        """Build a per-language {prompt, guidance} section from two claim props.

        Only the first prompt/guidance value per language is kept; languages
        with neither value are omitted.
        """
        prompt_by_language = self._extract_monolingual_by_language(
            wikibase_item, prompt_claim_id
        )
        guidance_by_language = self._extract_monolingual_by_language(
            wikibase_item, guidance_claim_id
        )

        languages = sorted(set(prompt_by_language) | set(guidance_by_language))
        section: dict[str, dict[str, str]] = {}

        for language in languages:
            entry: dict[str, str] = {}
            prompts = prompt_by_language.get(language, [])
            guidances = guidance_by_language.get(language, [])
            if prompts:
                entry["prompt"] = prompts[0]
            if guidances:
                entry["guidance"] = guidances[0]
            if entry:
                section[language] = entry

        return section

    def _extract_monolingual_by_language(
        self,
        wikibase_item: dict[str, Any],
        claim_id: str,
    ) -> dict[str, list[str]]:
        """Collect monolingual-text claim values grouped by language."""
        claims = wikibase_item.get("entity", {}).get("claims", {}).get(claim_id, [])
        by_language: dict[str, list[str]] = {}
        for claim in claims:
            value = claim.get("mainsnak", {}).get("datavalue", {}).get("value", {})
            if not isinstance(value, dict):
                continue
            language = value.get("language")
            text = value.get("text")
            if language and text:
                by_language.setdefault(language, []).append(text)
        return by_language

    def _localized_text_map(
        self, wikibase_item: dict[str, Any], section: str
    ) -> dict[str, str]:
        """Map language -> text for an entity section ("labels"/"descriptions")."""
        mapped: dict[str, str] = {}
        values = wikibase_item.get("entity", {}).get(section, {})
        for language, payload in values.items():
            if isinstance(payload, dict) and payload.get("value"):
                mapped[language] = payload["value"]
        return mapped

    def _alias_text_map(self, wikibase_item: dict[str, Any]) -> dict[str, list[str]]:
        """Map language -> alias texts, skipping languages with no usable values."""
        alias_map: dict[str, list[str]] = {}
        aliases = wikibase_item.get("entity", {}).get("aliases", {})
        for language, values in aliases.items():
            texts = [
                value.get("value")
                for value in values
                if isinstance(value, dict) and value.get("value")
            ]
            if texts:
                alias_map[language] = texts
        return alias_map

    def _collect_languages(self, value: Any) -> list[str]:
        """Collect all valid language keys used anywhere in a built document."""
        found: set[str] = set()

        identification = (
            value.get("identification", {}) if isinstance(value, dict) else {}
        )
        statements = value.get("statements", []) if isinstance(value, dict) else []
        metadata = value.get("metadata", {}) if isinstance(value, dict) else {}

        self._collect_language_keys_from_identification(identification, found)
        self._collect_language_keys_from_statements(statements, found)
        self._collect_language_keys_from_metadata(metadata, found)

        return sorted(found)

    def _collect_language_keys_from_identification(
        self, identification: dict[str, Any], out: set[str]
    ) -> None:
        """Add language keys from identification labels/descriptions/aliases."""
        for field_name in ("labels", "descriptions", "aliases"):
            language_map = identification.get(field_name, {})
            if isinstance(language_map, dict):
                out.update(self._valid_language_keys(language_map.keys()))

    def _collect_language_keys_from_statements(
        self, statements: list[dict[str, Any]], out: set[str]
    ) -> None:
        """Add message language keys from statements, recursing into nested nodes."""
        for statement in statements:
            if not isinstance(statement, dict):
                continue

            messages = statement.get("messages", {})
            if isinstance(messages, dict):
                out.update(self._valid_language_keys(messages.keys()))

            for nested_field in ("qualifiers", "references"):
                nested = statement.get(nested_field, [])
                if isinstance(nested, list):
                    self._collect_language_keys_from_statements(nested, out)

    def _collect_language_keys_from_metadata(
        self, metadata: dict[str, Any], out: set[str]
    ) -> None:
        """Add language keys from metadata labels/descriptions/aliases."""
        for field_name in ("labels", "descriptions", "aliases"):
            language_map = metadata.get(field_name, {})
            if isinstance(language_map, dict):
                out.update(self._valid_language_keys(language_map.keys()))

    def _valid_language_keys(self, keys: Any) -> set[str]:
        """Filter keys to those matching LANGUAGE_KEY_PATTERN."""
        valid: set[str] = set()
        for key in keys:
            if isinstance(key, str) and self.LANGUAGE_KEY_PATTERN.fullmatch(key):
                valid.add(key)
        return valid

    def _is_profile_item(self, wikibase_item: dict[str, Any]) -> bool:
        """Return True when any P1 claim types the entity as PROFILE_CLASS_ID."""
        claims_p1 = wikibase_item.get("entity", {}).get("claims", {}).get("P1", [])
        for claim in claims_p1:
            if self._claim_entity_id(claim) == self.PROFILE_CLASS_ID:
                return True
        return False

    def _load_cache_index(self) -> dict[str, dict[str, Any]]:
        """Load all *.json cache files, indexed by their "entity_id" field.

        Files lacking an entity_id are skipped. Assumes each file holds a
        single JSON object with an "entity" payload — TODO confirm schema.
        """
        index: dict[str, dict[str, Any]] = {}
        for json_file in sorted(self.cache_entities_dir.glob("*.json")):
            with json_file.open("r", encoding="utf-8") as handle:
                doc = json.load(handle)
            entity_id = doc.get("entity_id")
            if entity_id:
                index[entity_id] = doc
        return index

    def _get_localized_text(
        self,
        wikibase_item: dict[str, Any],
        *,
        section: str,
        language_order: tuple[str, ...],
        required: bool,
    ) -> Optional[str]:
        """Pick a text from a language section, by preference order then any.

        Raises:
            ValueError: When required is True and no text exists at all.
        """
        values = wikibase_item.get("entity", {}).get(section, {})
        for language in language_order:
            text = values.get(language, {}).get("value")
            if text:
                return text
        # Fall back to the first available language in insertion order.
        for payload in values.values():
            if isinstance(payload, dict) and payload.get("value"):
                return payload["value"]
        if required:
            raise ValueError(
                f"{section} missing for {wikibase_item.get('entity_id', '<unknown>')}"
            )
        return None

    def _monolingual_claims_by_language(
        self, claims: list[dict[str, Any]]
    ) -> dict[str, str]:
        """Map language -> first monolingual text from a list of claims."""
        by_language: dict[str, str] = {}
        for claim in claims:
            value = claim.get("mainsnak", {}).get("datavalue", {}).get("value", {})
            if not isinstance(value, dict):
                continue
            language = value.get("language")
            text = value.get("text")
            if language and text and language not in by_language:
                by_language[language] = text
        return by_language

    def _monolingual_qualifiers_by_language(
        self, qualifiers: list[dict[str, Any]]
    ) -> dict[str, str]:
        """Map language -> first monolingual text from qualifier snaks."""
        by_language: dict[str, str] = {}
        for qualifier in qualifiers:
            # Qualifier snaks carry datavalue directly (no mainsnak wrapper).
            value = qualifier.get("datavalue", {}).get("value", {})
            if not isinstance(value, dict):
                continue
            language = value.get("language")
            text = value.get("text")
            if language and text and language not in by_language:
                by_language[language] = text
        return by_language

    def _claim_entity_id(self, claim: dict[str, Any]) -> Optional[str]:
        """Extract the entity ID from a claim's mainsnak, if present."""
        value = claim.get("mainsnak", {}).get("datavalue", {}).get("value", {})
        return value.get("id") if isinstance(value, dict) else None

    def _claim_entity_values(self, claims: list[dict[str, Any]]) -> list[str]:
        """Extract entity IDs from a list of claims, dropping non-entity snaks."""
        values: list[str] = []
        for claim in claims:
            entity_id = self._claim_entity_id(claim)
            if entity_id:
                values.append(entity_id)
        return values

    def _derived_value_source_statement_id(
        self,
        *,
        statement_item: dict[str, Any],
        role: str,
        parent_statement_id: Optional[str],
        current_profile_id: Optional[str],
    ) -> Optional[str]:
        """Return the parent statement ID when this nested node derives its value.

        Applies only to qualifier/reference roles whose
        DERIVES_DEFAULT_VALUE_FROM claim targets the parent statement and
        passes the APPLIES_TO_PROFILE filter.
        """
        if role not in {"qualifier", "reference"}:
            return None
        if not parent_statement_id:
            return None

        claims = statement_item.get("entity", {}).get("claims", {})
        derives_claims = claims.get(self.DERIVES_DEFAULT_VALUE_FROM, [])
        for claim in derives_claims:
            source_statement_id = self._claim_entity_id(claim)
            if not source_statement_id or source_statement_id != parent_statement_id:
                continue
            if self._claim_applies_to_profile(claim, current_profile_id):
                return source_statement_id
        return None

    def _claim_applies_to_profile(
        self,
        claim: dict[str, Any],
        current_profile_id: Optional[str],
    ) -> bool:
        """Check APPLIES_TO_PROFILE qualifiers; absence means applies to all."""
        if not current_profile_id:
            return True

        qualifiers = claim.get("qualifiers", {})
        if not isinstance(qualifiers, dict):
            return True

        applies_to_profiles = self._qualifier_entity_ids(
            qualifiers, self.APPLIES_TO_PROFILE
        )
        if not applies_to_profiles:
            return True
        return current_profile_id in applies_to_profiles

    def _claim_string_values(self, claims: list[dict[str, Any]]) -> list[str]:
        """Extract string values from claims, skipping non-string snaks."""
        values: list[str] = []
        for claim in claims:
            value = claim.get("mainsnak", {}).get("datavalue", {}).get("value")
            if isinstance(value, str):
                values.append(value)
        return values

    def _qualifier_entity_ids(
        self, qualifiers: dict[str, list[dict[str, Any]]], prop_id: str
    ) -> list[str]:
        """Extract entity IDs from the qualifiers of one property."""
        values: list[str] = []
        for qualifier in qualifiers.get(prop_id, []):
            value = qualifier.get("datavalue", {}).get("value", {})
            if isinstance(value, dict) and value.get("id"):
                values.append(value["id"])
        return values

    def _qualifier_first_quantity_int(
        self,
        qualifiers: dict[str, list[dict[str, Any]]],
        prop_id: str,
    ) -> Optional[int]:
        """Return the first quantity qualifier as an int, or None.

        Wikibase quantity amounts are strings (e.g. "+3"); unparsable amounts
        deliberately yield None rather than raising.
        """
        for qualifier in qualifiers.get(prop_id, []):
            value = qualifier.get("datavalue", {}).get("value", {})
            if not isinstance(value, dict):
                continue
            amount = value.get("amount")
            if isinstance(amount, str):
                try:
                    return int(float(amount))
                except ValueError:
                    return None
        return None

    def _entity_type_ids(self, entity_doc: Optional[dict[str, Any]]) -> list[str]:
        """Return the P1 (type) entity IDs of a cache document, or []."""
        if not entity_doc:
            return []
        claims = entity_doc.get("entity", {}).get("claims", {})
        return self._claim_entity_values(claims.get("P1", []))

    def _entity_string_claim_values(
        self, entity_doc: Optional[dict[str, Any]], prop_id: str
    ) -> list[str]:
        """Return string claim values for one property of a cache document."""
        if not entity_doc:
            return []
        claims = entity_doc.get("entity", {}).get("claims", {})
        return self._claim_string_values(claims.get(prop_id, []))

    def _extract_wikidata_qid_from_url(self, url: str) -> Optional[str]:
        """Extract a QID from an entity URL; assumes the URL ends with the QID."""
        if "/entity/Q" not in url:
            return None
        candidate = url.rstrip("/").split("/")[-1]
        if candidate.startswith("Q") and candidate[1:].isdigit():
            return candidate
        return None

    def _dedupe_preserve_order(self, values: list[str]) -> list[str]:
        """Remove duplicates while keeping first-occurrence order."""
        seen: set[str] = set()
        deduped: list[str] = []
        for value in values:
            if value in seen:
                continue
            seen.add(value)
            deduped.append(value)
        return deduped

    def _entity_label(self, entity_id: str) -> Optional[str]:
        """Return the preferred-language label of a cached entity, or None."""
        doc = self._cache_index.get(entity_id)
        if not doc:
            return None
        return self._get_localized_text(
            doc,
            section="labels",
            language_order=self.label_language_order,
            required=False,
        )
build_all()

Build JSON documents for every cache entity typed as a profile.

Source code in gkc/spirit_safe.py
1403
1404
1405
1406
1407
1408
1409
def build_all(self) -> list[dict[str, Any]]:
    """Build JSON documents for every cache entity typed as a profile."""
    return [
        self.build_one(cache_doc)
        for cache_doc in self._cache_index.values()
        if self._is_profile_item(cache_doc)
    ]

build_one(wikibase_item)

Build one JSON profile document from a single cache entity.

Source code in gkc/spirit_safe.py
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
def build_one(self, wikibase_item: dict[str, Any]) -> dict[str, Any]:
    """Build one JSON profile document from a single cache entity."""
    entity_uri = "{}{}".format(
        self.entity_prefix, wikibase_item.get("entity_id", "")
    )

    # Each identification section is built from a (prompt, guidance)
    # property pair; order of sections is significant in the output.
    section_specs = (
        ("labels", self.LABEL_PROMPT, self.LABEL_GUIDANCE),
        ("descriptions", self.DESCRIPTION_PROMPT, self.DESCRIPTION_GUIDANCE),
        ("aliases", self.ALIAS_PROMPT, self.ALIAS_GUIDANCE),
    )
    identification = {
        section_name: self._build_language_section(
            wikibase_item, prompt_prop, guidance_prop
        )
        for section_name, prompt_prop, guidance_prop in section_specs
    }

    statements = self._build_profile_statements(wikibase_item)
    metadata = self._build_profile_metadata(
        wikibase_item,
        identification=identification,
        statements=statements,
        entity_uri=entity_uri,
    )

    return {
        "entity": entity_uri,
        "identification": identification,
        "statements": statements,
        "metadata": metadata,
    }

gkc.spirit_safe.EntityProfileJsonExportResult dataclass

Summary of JSON entity profile export writes.

Source code in gkc/spirit_safe.py
2145
2146
2147
2148
2149
2150
@dataclass(frozen=True)
class EntityProfileJsonExportResult:
    """Summary of JSON entity profile export writes."""

    # Directory the per-profile JSON files were written into.
    output_dir: str
    # QIDs of the profiles that were written (one <QID>.json file each).
    written_ids: list[str]

gkc.spirit_safe.build_entity_profile_json_documents(cache_entities_dir, *, entity_prefix='https://datadistillery.wikibase.cloud/entity/')

Build JSON entity profile documents from cache entities.

Parameters:

Name Type Description Default
cache_entities_dir Union[str, Path]

Directory containing SpiritSafe cache entity JSON files.

required
entity_prefix str

URI prefix for entity references.

'https://datadistillery.wikibase.cloud/entity/'

Returns:

Type Description
list[dict[str, Any]]

List of JSON profile documents.

Source code in gkc/spirit_safe.py
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
def build_entity_profile_json_documents(
    cache_entities_dir: Union[str, Path],
    *,
    entity_prefix: str = "https://datadistillery.wikibase.cloud/entity/",
) -> list[dict[str, Any]]:
    """Build JSON entity profile documents from cache entities.

    Args:
        cache_entities_dir: Directory containing SpiritSafe cache entity JSON files.
        entity_prefix: URI prefix for entity references.

    Returns:
        List of JSON profile documents.
    """
    # Delegate to the builder, which scans the cache directory and produces
    # one document per profile-typed entity.
    return EntityProfileJsonBuilder(
        cache_entities_dir=cache_entities_dir,
        entity_prefix=entity_prefix,
    ).build_all()

gkc.spirit_safe.export_entity_profile_json_documents(cache_entities_dir, output_dir, *, entity_prefix='https://datadistillery.wikibase.cloud/entity/', profile_ids=None)

Build and export JSON entity profile documents as one file per profile.

Files are written as <output_dir>/<QID>.json.

Parameters:

Name Type Description Default
cache_entities_dir Union[str, Path]

Directory containing SpiritSafe cache entity JSON files.

required
output_dir Union[str, Path]

Output directory for generated JSON profile files.

required
entity_prefix str

URI prefix for entity references.

'https://datadistillery.wikibase.cloud/entity/'
profile_ids Optional[list[str]]

Optional list of profile QIDs to export.

None

Returns:

Type Description
EntityProfileJsonExportResult

Export result summary.

Source code in gkc/spirit_safe.py
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
def export_entity_profile_json_documents(
    cache_entities_dir: Union[str, Path],
    output_dir: Union[str, Path],
    *,
    entity_prefix: str = "https://datadistillery.wikibase.cloud/entity/",
    profile_ids: Optional[list[str]] = None,
) -> EntityProfileJsonExportResult:
    """Build and export JSON entity profile documents as one file per profile.

    Files are written as `<output_dir>/<QID>.json`.

    Args:
        cache_entities_dir: Directory containing SpiritSafe cache entity JSON files.
        output_dir: Output directory for generated JSON profile files.
        entity_prefix: URI prefix for entity references.
        profile_ids: Optional list of profile QIDs to export. When omitted or
            empty, every built document is exported; requested QIDs with no
            matching document are silently skipped.

    Returns:
        Export result summary.
    """

    def _qid(document: dict[str, Any]) -> str:
        """Extract the trailing QID from a document's entity URI."""
        return str(document.get("entity", "")).rstrip("/").split("/")[-1]

    documents = build_entity_profile_json_documents(
        cache_entities_dir=cache_entities_dir,
        entity_prefix=entity_prefix,
    )

    # Restrict to the requested QIDs, if any were given.
    requested_ids = set(profile_ids or [])
    if requested_ids:
        documents = [doc for doc in documents if _qid(doc) in requested_ids]

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    written_ids: list[str] = []
    for document in documents:
        entity_id = _qid(document)
        if not entity_id:
            # A document with no entity URI has no stable filename; skip it.
            continue
        destination = out_dir / f"{entity_id}.json"
        destination.write_text(json.dumps(document, indent=2), encoding="utf-8")
        written_ids.append(entity_id)

    return EntityProfileJsonExportResult(
        output_dir=str(out_dir.resolve()),
        written_ids=sorted(written_ids),
    )

gkc.spirit_safe.ValueListHydrationResult dataclass

Summary of value-list query export and hydration operations.

Source code in gkc/spirit_safe.py
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
@dataclass(frozen=True)
class ValueListHydrationResult:
    """Summary of value-list query export and hydration operations.

    Returned by ``hydrate_value_lists_from_cache``.
    """

    # Absolute path of the directory holding exported .sparql query files.
    queries_dir: str
    # Absolute path of the directory holding hydrated JSON cache artifacts.
    cache_queries_dir: str
    # Sorted value-list QIDs the export step operated on.
    discovered_ids: list[str] = field(default_factory=list)
    # Sorted QIDs whose cache artifacts were successfully (re)written.
    hydrated_ids: list[str] = field(default_factory=list)
    # Absolute paths of .sparql files written by the export step.
    query_files_written: list[str] = field(default_factory=list)
    # Absolute paths of JSON cache files written by the hydration step.
    cache_files_written: list[str] = field(default_factory=list)
    # Per-item failure records from both the export and hydration steps.
    failures: list[dict[str, Any]] = field(default_factory=list)

gkc.spirit_safe.discover_value_list_ids(cache_entities_dir, *, value_list_class_id='Q7')

Discover all value-list entity IDs from SpiritSafe cache entities.

Value lists are identified by P1 -> Q7 classification in cached entity claims.

Source code in gkc/spirit_safe.py
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
def discover_value_list_ids(
    cache_entities_dir: Union[str, Path],
    *,
    value_list_class_id: str = "Q7",
) -> list[str]:
    """Discover all value-list entity IDs from SpiritSafe cache entities.

    Value lists are identified by `P1 -> Q7` classification in cached entity claims.
    """
    found: list[str] = []

    for entity_path in sorted(Path(cache_entities_dir).glob("*.json")):
        try:
            entity_doc = json.loads(entity_path.read_text(encoding="utf-8"))
        except Exception:
            # Best-effort scan: unreadable or malformed files are skipped.
            continue

        candidate_id = entity_doc.get("entity_id")
        if not (isinstance(candidate_id, str) and candidate_id):
            continue

        claims = entity_doc.get("entity", {}).get("claims", {})
        classification_claims = (
            claims.get("P1", []) if isinstance(claims, dict) else []
        )
        if _claims_include_entity_id(classification_claims, value_list_class_id):
            found.append(candidate_id)

    return sorted(found)

gkc.spirit_safe.export_value_list_sparql_queries(*, cache_entities_dir, queries_dir, api_url, value_list_ids=None)

Export first <sparql> talk-page blocks into SpiritSafe query files.

Writes one file per value-list ID as <queries_dir>/<QID>.sparql.

Source code in gkc/spirit_safe.py
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
def export_value_list_sparql_queries(
    *,
    cache_entities_dir: Union[str, Path],
    queries_dir: Union[str, Path],
    api_url: str,
    value_list_ids: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Export first `<sparql>` talk-page blocks into SpiritSafe query files.

    Writes one file per value-list ID as `<queries_dir>/<QID>.sparql`.
    """
    ids_to_export = sorted(
        set(value_list_ids or discover_value_list_ids(cache_entities_dir))
    )
    destination_dir = Path(queries_dir)
    destination_dir.mkdir(parents=True, exist_ok=True)

    client = WikibaseApiClient(api_url=api_url)
    exported_paths: list[str] = []
    export_failures: list[dict[str, Any]] = []

    for value_list_id in ids_to_export:
        talk_page_title = f"Item_talk:{value_list_id}"
        try:
            # The query text lives in the first <sparql> block of the
            # item's talk page.
            page_wikitext = fetch_mediawiki_page_wikitext(
                api_client=client, title=talk_page_title
            )
            sparql_text = extract_first_sparql_block(page_wikitext)
            target = destination_dir / f"{value_list_id}.sparql"
            target.write_text(sparql_text.strip() + "\n", encoding="utf-8")
            exported_paths.append(str(target.resolve()))
        except Exception as exc:
            # Record and continue; callers decide whether failures are fatal.
            export_failures.append(
                {
                    "value_list_id": value_list_id,
                    "source_title": talk_page_title,
                    "error": str(exc),
                }
            )

    return {
        "value_list_ids": ids_to_export,
        "queries_dir": str(destination_dir.resolve()),
        "query_files_written": sorted(exported_paths),
        "failures": export_failures,
    }

gkc.spirit_safe.hydrate_value_list_query_caches(*, value_list_ids, queries_dir, cache_queries_dir, endpoint, page_size=1000, max_results=None, wikibase_api_url='https://datadistillery.wikibase.cloud/w/api.php')

Hydrate value-list JSON cache artifacts from local .sparql files.

Existing cache files are preserved if hydration fails for an item.

Source code in gkc/spirit_safe.py
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
def hydrate_value_list_query_caches(
    *,
    value_list_ids: list[str],
    queries_dir: Union[str, Path],
    cache_queries_dir: Union[str, Path],
    endpoint: str,
    page_size: int = 1000,
    max_results: Optional[int] = None,
    wikibase_api_url: str = "https://datadistillery.wikibase.cloud/w/api.php",
) -> dict[str, Any]:
    """Hydrate value-list JSON cache artifacts from local `.sparql` files.

    Existing cache files are preserved if hydration fails for an item.

    Args:
        value_list_ids: QIDs whose query results should be (re)materialized.
        queries_dir: Directory holding `<QID>.sparql` query files.
        cache_queries_dir: Output directory for `<QID>.json` cache artifacts.
        endpoint: SPARQL endpoint the queries are executed against.
        page_size: Page size passed to the paginated query runner.
        max_results: Optional result cap passed to the paginated query runner.
        wikibase_api_url: Only used to derive the base URI embedded in each
            artifact's metadata links.

    Returns:
        Summary dict with hydrated IDs, written cache file paths, and
        per-item failure records (query/cache paths plus error text).
    """
    query_root = Path(queries_dir)
    cache_root = Path(cache_queries_dir)
    cache_root.mkdir(parents=True, exist_ok=True)

    # Base URI for the entity/source links written into artifact metadata.
    base_uri = _wikibase_base_uri_from_api_url(wikibase_api_url)

    hydrated_ids: list[str] = []
    written_files: list[str] = []
    failures: list[dict[str, Any]] = []

    for entity_id in value_list_ids:
        query_file = query_root / f"{entity_id}.sparql"
        output_file = cache_root / f"{entity_id}.json"
        try:
            query_text = read_sparql_query_file(query_file)
            rows = paginate_query(
                query=query_text,
                page_size=page_size,
                endpoint=endpoint,
                max_results=max_results,
            )
            items = _normalize_value_list_items(rows)
            payload = {
                "metadata": {
                    "entity": f"{base_uri}/entity/{entity_id}",
                    "source": f"{base_uri}/wiki/Item_talk:{entity_id}",
                    "query": f"queries/{entity_id}.sparql",
                    # UTC timestamp rendered with a trailing "Z" suffix.
                    "updated": datetime.now(timezone.utc)
                    .isoformat()
                    .replace("+00:00", "Z"),
                    "count": len(items),
                },
                "items": items,
            }
            # The cache file is only (over)written after the query run fully
            # succeeds, so a failure leaves any existing artifact intact.
            output_file.write_text(json.dumps(payload, indent=2), encoding="utf-8")
            hydrated_ids.append(entity_id)
            written_files.append(str(output_file.resolve()))
        except Exception as exc:
            # Record the failure and continue with the remaining IDs.
            failures.append(
                {
                    "value_list_id": entity_id,
                    "query_file": str(query_file),
                    "cache_file": str(output_file),
                    "error": str(exc),
                }
            )

    return {
        "value_list_ids": value_list_ids,
        "cache_queries_dir": str(cache_root.resolve()),
        "hydrated_ids": sorted(hydrated_ids),
        "cache_files_written": sorted(written_files),
        "failures": failures,
    }

gkc.spirit_safe.hydrate_value_lists_from_cache(*, cache_entities_dir, queries_dir, cache_queries_dir, api_url, endpoint, value_list_ids=None, page_size=1000, max_results=None, fail_on_hydration_error=True)

Export value-list SPARQL files and hydrate value-list cache artifacts.

Source code in gkc/spirit_safe.py
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
def hydrate_value_lists_from_cache(
    *,
    cache_entities_dir: Union[str, Path],
    queries_dir: Union[str, Path],
    cache_queries_dir: Union[str, Path],
    api_url: str,
    endpoint: str,
    value_list_ids: Optional[list[str]] = None,
    page_size: int = 1000,
    max_results: Optional[int] = None,
    fail_on_hydration_error: bool = True,
) -> ValueListHydrationResult:
    """Export value-list SPARQL files and hydrate value-list cache artifacts.

    Args:
        cache_entities_dir: Directory of SpiritSafe cache entity JSON files.
        queries_dir: Directory where `<QID>.sparql` files are written.
        cache_queries_dir: Directory where `<QID>.json` artifacts are written.
        api_url: Wikibase API URL, used for talk-page fetches and metadata
            base-URI derivation.
        endpoint: SPARQL endpoint used to run the exported queries.
        value_list_ids: Optional explicit value-list QIDs; discovered from
            the cache entities when omitted.
        page_size: Page size for paginated SPARQL execution.
        max_results: Optional cap on SPARQL result rows.
        fail_on_hydration_error: Raise on the first export/hydration failure
            instead of only reporting failures in the result.

    Returns:
        Combined export + hydration summary.

    Raises:
        RuntimeError: When any step fails and ``fail_on_hydration_error`` is
            true.
    """
    export_summary = export_value_list_sparql_queries(
        cache_entities_dir=cache_entities_dir,
        queries_dir=queries_dir,
        api_url=api_url,
        value_list_ids=value_list_ids,
    )

    export_failures = list(export_summary.get("failures", []))
    if export_failures and fail_on_hydration_error:
        first = export_failures[0]
        raise RuntimeError(
            "Failed to export value-list SPARQL query "
            f"for {first.get('value_list_id')}: {first.get('error')}"
        )

    # Build the failed-ID set once; it was previously reconstructed inside
    # the comprehension condition for every candidate ID.
    failed_export_ids = {f.get("value_list_id") for f in export_failures}
    eligible_ids = [
        value_list_id
        for value_list_id in export_summary["value_list_ids"]
        if value_list_id not in failed_export_ids
    ]

    hydrate_summary = hydrate_value_list_query_caches(
        value_list_ids=eligible_ids,
        queries_dir=queries_dir,
        cache_queries_dir=cache_queries_dir,
        endpoint=endpoint,
        page_size=page_size,
        max_results=max_results,
        wikibase_api_url=api_url,
    )

    failures = export_failures + list(hydrate_summary.get("failures", []))
    if failures and fail_on_hydration_error:
        first = failures[0]
        raise RuntimeError(
            "Value-list hydration failed "
            f"for {first.get('value_list_id')}: {first.get('error')}"
        )

    return ValueListHydrationResult(
        queries_dir=export_summary["queries_dir"],
        cache_queries_dir=hydrate_summary["cache_queries_dir"],
        discovered_ids=sorted(export_summary["value_list_ids"]),
        hydrated_ids=sorted(hydrate_summary["hydrated_ids"]),
        query_files_written=sorted(export_summary["query_files_written"]),
        cache_files_written=sorted(hydrate_summary["cache_files_written"]),
        failures=failures,
    )

Manifest and Packet Workflows

gkc.spirit_safe.Manifest dataclass

Container for a loaded URI-keyed SpiritSafe artifact manifest.

Source code in gkc/spirit_safe.py
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
@dataclass(frozen=True)
class Manifest:
    """Container for a loaded URI-keyed SpiritSafe artifact manifest."""

    generated_at: str
    source: str
    profiles: list[dict[str, Any]]
    entities: dict[str, Any]
    queries: list[dict[str, Any]]
    value_lists: list[dict[str, Any]]
    raw_manifest: dict[str, Any]

    @property
    def profile_qids(self) -> list[str]:
        """List the QIDs indexed in the manifest profile section."""

        # Prefer the explicit "qid" field, falling back to the entity URI.
        extracted = (
            _entity_id_from_reference(entry.get("qid") or entry.get("entity"))
            for entry in self.profiles
        )
        return [qid for qid in extracted if qid]

    def get_profile_entry(self, qid_or_uri: str) -> Optional[dict[str, Any]]:
        """Retrieve a manifest profile entry by QID or full entity URI."""

        wanted_qid = _entity_id_from_reference(qid_or_uri)
        wanted_uri = _entity_uri_from_reference(qid_or_uri)
        for entry in self.profiles:
            entry_qid = _entity_id_from_reference(
                entry.get("qid") or entry.get("entity")
            )
            entry_uri = _entity_uri_from_reference(entry.get("entity"))
            if wanted_qid and entry_qid == wanted_qid:
                return entry
            if wanted_uri and entry_uri == wanted_uri:
                return entry
        return None

profile_qids property

List the QIDs indexed in the manifest profile section.

get_profile_entry(qid_or_uri)

Retrieve a manifest profile entry by QID or full entity URI.

Source code in gkc/spirit_safe.py
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
def get_profile_entry(self, qid_or_uri: str) -> Optional[dict[str, Any]]:
    """Retrieve a manifest profile entry by QID or full entity URI."""

    target_qid = _entity_id_from_reference(qid_or_uri)
    target_uri = _entity_uri_from_reference(qid_or_uri)
    for candidate in self.profiles:
        candidate_qid = _entity_id_from_reference(
            candidate.get("qid") or candidate.get("entity")
        )
        candidate_uri = _entity_uri_from_reference(candidate.get("entity"))
        # A hit on either the normalized QID or the full URI matches.
        qid_hit = bool(target_qid) and candidate_qid == target_qid
        uri_hit = bool(target_uri) and candidate_uri == target_uri
        if qid_hit or uri_hit:
            return candidate
    return None

gkc.spirit_safe.build_spiritsafe_manifest_document(spiritsafe_root)

Build a manifest document from already-generated SpiritSafe artifacts.

The manifest indexes artifacts present under a local SpiritSafe checkout. It does not re-query Wikibase or regenerate profile/value-list artifacts.

Source code in gkc/spirit_safe.py
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
def build_spiritsafe_manifest_document(
    spiritsafe_root: Union[str, Path],
) -> dict[str, Any]:
    """Build a manifest document from already-generated SpiritSafe artifacts.

    The manifest indexes artifacts present under a local SpiritSafe checkout. It
    does not re-query Wikibase or regenerate profile/value-list artifacts.

    Args:
        spiritsafe_root: Path to the local SpiritSafe checkout root.

    Returns:
        Manifest dict with ``generated_at``, ``source``, ``profiles``,
        ``entities``, ``queries``, and ``value_lists`` sections.
    """

    root = Path(spiritsafe_root).expanduser().resolve()
    # Fixed artifact layout under the checkout root.
    profiles_dir = root / "profiles"
    cache_entities_dir = root / "cache" / "entities"
    queries_dir = root / "queries"
    cache_queries_dir = root / "cache" / "queries"

    # Index cached entities first so value-list entries below can fall back
    # to a cached entity label when their own metadata lacks one.
    entity_label_index: dict[str, str] = {}
    entity_qids: list[str] = []
    for entity_path in sorted(cache_entities_dir.glob("*.json")):
        entity_doc = json.loads(entity_path.read_text(encoding="utf-8"))
        entity_id = str(entity_doc.get("entity_id") or entity_path.stem)
        entity_qids.append(entity_id)
        entity_label_index[entity_id] = _label_from_cache_entity(entity_doc)

    profiles: list[dict[str, Any]] = []
    for profile_path in sorted(profiles_dir.glob("*.json")):
        profile_doc = json.loads(profile_path.read_text(encoding="utf-8"))
        metadata = profile_doc.get("metadata", {})
        entity_uri = _entity_uri_from_reference(profile_doc.get("entity"))
        # Prefer the QID from the entity URI; fall back to the file name.
        qid = _entity_id_from_reference(entity_uri) or profile_path.stem
        profiles.append(
            {
                "entity": entity_uri,
                "qid": qid,
                "labels": metadata.get("labels", {}),
                "descriptions": metadata.get("descriptions", {}),
                # Count statements directly when metadata has no explicit
                # statement_count.
                "statement_count": metadata.get(
                    "statement_count", len(profile_doc.get("statements", []))
                ),
                "profile_graph": metadata.get("profile_graph", []),
                "value_list_graph": metadata.get("value_list_graph", []),
            }
        )

    # Query files are indexed by name only; their content is not parsed.
    queries = [
        {"qid": query_path.stem, "path": f"queries/{query_path.name}"}
        for query_path in sorted(queries_dir.glob("*.sparql"))
    ]

    value_lists: list[dict[str, Any]] = []
    for cache_path in sorted(cache_queries_dir.glob("*.json")):
        cache_doc = json.loads(cache_path.read_text(encoding="utf-8"))
        metadata = cache_doc.get("metadata", {})
        qid = cache_path.stem
        # Synthesize the entity URI from the QID if metadata lacks one.
        entity_uri = _entity_uri_from_reference(metadata.get("entity")) or (
            f"{SPIRITSAFE_ENTITY_URI_PREFIX}{qid}"
        )
        value_lists.append(
            {
                "entity": entity_uri,
                "qid": qid,
                # Label resolution order: artifact metadata label, cached
                # entity label, then any label map in the metadata.
                "label": (
                    metadata.get("label")
                    or entity_label_index.get(qid)
                    or _profile_label_from_map(metadata.get("labels", {}))
                ),
                "path": f"cache/queries/{cache_path.name}",
                "item_count": metadata.get("count", len(cache_doc.get("items", []))),
            }
        )

    return {
        # UTC timestamp rendered with a trailing "Z".
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "source": _manifest_source_url(),
        "profiles": profiles,
        "entities": {
            "count": len(entity_qids),
            "qids": sorted(entity_qids),
        },
        "queries": queries,
        "value_lists": value_lists,
    }

gkc.spirit_safe.export_spiritsafe_manifest(spiritsafe_root, output_path=None)

Build and write the SpiritSafe artifact manifest.

Returns the manifest document that was written to disk.

Source code in gkc/spirit_safe.py
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
def export_spiritsafe_manifest(
    spiritsafe_root: Union[str, Path],
    output_path: Optional[Union[str, Path]] = None,
) -> dict[str, Any]:
    """Build and write the SpiritSafe artifact manifest.

    Returns the manifest document that was written to disk.
    """

    root = Path(spiritsafe_root).expanduser().resolve()
    # Default destination is cache/manifest.json under the checkout root.
    if output_path is None:
        destination = root / "cache" / "manifest.json"
    else:
        destination = Path(output_path).expanduser().resolve()

    document = build_spiritsafe_manifest_document(root)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(document, indent=2), encoding="utf-8")
    return document

gkc.spirit_safe.load_manifest(source_mode=None, github_repo=None, github_ref=None, local_root=None, use_cache=True)

Load the SpiritSafe artifact manifest with optional caching.

Source code in gkc/spirit_safe.py
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
def load_manifest(
    source_mode: Optional[SpiritSafeSourceMode] = None,
    github_repo: Optional[str] = None,
    github_ref: Optional[str] = None,
    local_root: Optional[Union[str, Path]] = None,
    use_cache: bool = True,
) -> Manifest:
    """Load the SpiritSafe artifact manifest with optional caching.

    Args:
        source_mode: Optional override for the configured source mode.
        github_repo: Optional override for the configured GitHub repository.
        github_ref: Optional override for the configured GitHub ref.
        local_root: Optional override for the configured local checkout root.
        use_cache: Reuse (and update) the module-level manifest cache.

    Returns:
        The loaded ``Manifest``.

    Raises:
        FileNotFoundError: If ``cache/manifest.json`` does not exist.
        RuntimeError: If the manifest exists but cannot be loaded.
    """

    global _MANIFEST_CACHE

    # Resolve the configured source once (it was previously fetched up to
    # four times); overrides fall back to the configured value per field.
    configured = get_spirit_safe_source()
    if source_mode is not None or github_repo is not None or local_root is not None:
        source = SpiritSafeSourceConfig(
            mode=source_mode or configured.mode,
            github_repo=github_repo or configured.github_repo,
            github_ref=github_ref or configured.github_ref,
            local_root=(
                Path(local_root).expanduser().resolve()
                if local_root
                else configured.local_root
            ),
        )
    else:
        source = configured

    cache_key = (
        f"{source.mode}:{source.github_repo}:{source.github_ref}:{source.local_root}"
    )
    if use_cache and _MANIFEST_CACHE is not None:
        cached_key, cached_manifest = _MANIFEST_CACHE
        if cached_key == cache_key:
            return cached_manifest

    manifest_path = source.resolve_relative("cache/manifest.json")

    try:
        manifest_data = _load_json_from_resolved_path(manifest_path)
    except FileNotFoundError as exc:
        raise FileNotFoundError(
            f"Manifest not found at {manifest_path}. Ensure SpiritSafe artifacts are built."
        ) from exc
    except Exception as exc:
        raise RuntimeError(f"Failed to load manifest: {exc}") from exc

    manifest = Manifest(
        generated_at=str(manifest_data.get("generated_at", "")),
        source=str(manifest_data.get("source", "")),
        profiles=manifest_data.get("profiles", []),
        entities=manifest_data.get("entities", {}),
        queries=manifest_data.get("queries", []),
        value_lists=manifest_data.get("value_lists", []),
        raw_manifest=manifest_data,
    )

    if use_cache:
        _MANIFEST_CACHE = (cache_key, manifest)

    return manifest

gkc.spirit_safe.load_profile(profile_id, manifest=None)

Load a single JSON entity profile by QID or entity URI.

Source code in gkc/spirit_safe.py
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
def load_profile(
    profile_id: str, manifest: Optional[Manifest] = None
) -> dict[str, Any]:
    """Load a single JSON entity profile by QID or entity URI."""

    # Accepted for API symmetry but unused: profiles are read directly from
    # profiles/<QID>.json.
    del manifest

    entity_id = _entity_id_from_reference(profile_id)
    if not entity_id:
        raise FileNotFoundError(f"Invalid profile reference: {profile_id}")

    relative_path = f"profiles/{entity_id}.json"
    resolved_path = get_spirit_safe_source().resolve_relative(relative_path)

    try:
        return _load_json_from_resolved_path(resolved_path)
    except FileNotFoundError as exc:
        raise FileNotFoundError(
            f"Profile JSON not found: profiles/{entity_id}.json"
        ) from exc
    except Exception as exc:
        raise RuntimeError(f"Failed to load profile '{entity_id}': {exc}") from exc

gkc.spirit_safe.load_profile_package(profile_id, depth=1, manifest=None)

Load a JSON profile plus related JSON profiles from embedded graph metadata.

Source code in gkc/spirit_safe.py
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
def load_profile_package(
    profile_id: str, depth: int = 1, manifest: Optional[Manifest] = None
) -> dict[str, Any]:
    """Load a JSON profile plus related JSON profiles from embedded graph metadata."""

    # Accepted for API symmetry; packet assembly does not need the manifest.
    del manifest

    root_qid = _entity_id_from_reference(profile_id)
    if not root_qid:
        raise FileNotFoundError(f"Invalid profile reference: {profile_id}")

    loaded: dict[str, dict[str, Any]] = {}
    _load_profile_documents_for_depth(
        root_qid,
        depth,
        loaded_profiles=loaded,
        visited=set(),
    )

    from gkc.profiles.graph import ProfileGraph

    graph = ProfileGraph.from_profile_documents(loaded)

    root_document = loaded.get(root_qid)
    if root_document is None:
        raise FileNotFoundError(
            f"Profile '{root_qid}' could not be loaded"
        )

    return {
        "primary_profile": root_qid,
        "primary_profile_entity": root_document.get("entity"),
        "profiles": loaded,
        "graph": graph,
        "depth": depth,
    }

gkc.spirit_safe.get_profile_graph(manifest=None)

Get the complete ProfileGraph from the loaded manifest.

Source code in gkc/spirit_safe.py
2633
2634
2635
2636
2637
2638
2639
2640
2641
def get_profile_graph(manifest: Optional[Manifest] = None) -> Any:
    """Get the complete ProfileGraph from the loaded manifest."""

    # Fall back to the default manifest load when none was supplied.
    active_manifest = load_manifest() if manifest is None else manifest

    from gkc.profiles.graph import ProfileGraph

    return ProfileGraph.from_manifest_data(active_manifest.profiles)

gkc.spirit_safe.resolve_profile_link(source_profile_id, statement_id, manifest=None)

Resolve a profile-graph edge by source profile and linking statement URI/QID.

Source code in gkc/spirit_safe.py
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
def resolve_profile_link(
    source_profile_id: str,
    statement_id: str,
    manifest: Optional[Manifest] = None,
) -> Optional[dict[str, Any]]:
    """Resolve a profile-graph edge by source profile and linking statement URI/QID.

    Args:
        source_profile_id: QID or entity URI of the profile whose embedded
            graph edges are searched.
        statement_id: QID or full URI of the linking statement to match.
        manifest: Accepted for API symmetry; unused.

    Returns:
        A dict describing the matched edge (target profile/entity, statement
        URI, relationship type, label), or ``None`` when no edge matches.
    """

    del manifest

    profile_document = load_profile(source_profile_id)
    requested_statement_id = _entity_id_from_reference(statement_id)
    requested_statement_uri = _entity_uri_from_reference(statement_id)

    for edge in profile_document.get("metadata", {}).get("profile_graph", []):
        edge_statement_id = _entity_id_from_reference(edge.get("via_statement"))
        edge_statement_uri = _entity_uri_from_reference(edge.get("via_statement"))
        # Match by normalized QID or by full statement URI; both yield the
        # same edge payload, so the duplicated construction is collapsed.
        matched = (
            requested_statement_id and edge_statement_id == requested_statement_id
        ) or (
            requested_statement_uri and edge_statement_uri == requested_statement_uri
        )
        if matched:
            return {
                "target_profile": _entity_id_from_reference(edge.get("entity")),
                "target_entity": _entity_uri_from_reference(edge.get("entity")),
                "via_statement": edge_statement_uri,
                "relationship_type": edge.get("linkage_type"),
                "label": edge.get("label"),
            }

    return None

gkc.spirit_safe.create_curation_packet(profile_id, operation_mode='single', load_wikidata_qids=False, depth=1, manifest=None)

Create a curation packet from JSON Entity Profiles.

Packet assembly reads profiles/<QID>.json directly. The SpiritSafe manifest remains a tooling/index artifact and is not required here.

Source code in gkc/spirit_safe.py
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
def create_curation_packet(
    profile_id: str,
    operation_mode: str = "single",
    load_wikidata_qids: bool = False,
    depth: int = 1,
    manifest: Optional[Manifest] = None,
) -> dict[str, Any]:
    """Create a curation packet from JSON Entity Profiles.

    Packet assembly reads `profiles/<QID>.json` directly. The SpiritSafe
    manifest remains a tooling/index artifact and is not required here.
    """

    # Accepted for interface compatibility only; neither value is consumed.
    del load_wikidata_qids
    del manifest

    # Neighbor profiles are only traversed in bulk mode; single mode stays
    # at the primary profile (depth 0).
    traversal_depth = depth if operation_mode == "bulk" else 0
    package = load_profile_package(profile_id, depth=traversal_depth)

    entities: list[dict[str, Any]] = []
    entity_id_map: dict[str, str] = {}

    # Assign stable, sequential packet-local entity ids (ent-001, ent-002, ...)
    # in profile-package iteration order.
    for position, (qid, profile_data) in enumerate(
        package["profiles"].items(), start=1
    ):
        packet_entity_id = f"ent-{position:03d}"
        entity_id_map[qid] = packet_entity_id

        normalized_statements = []
        for statement in profile_data.get("statements", []):
            if isinstance(statement, dict):
                normalized_statements.append(_normalized_packet_statement(statement))

        entity_record = {
            "id": packet_entity_id,
            "profile": qid,
            "profile_entity": profile_data.get("entity"),
            "data": {},
            "profile_structure": {
                "identification": profile_data.get("identification", {}),
                "statements": normalized_statements,
            },
        }
        entities.append(entity_record)

    # Build cross-references only between profiles that are both present in
    # this packet; edges pointing outside the package are dropped.
    cross_references: list[dict[str, Any]] = []
    for source_profile_id, profile_data in package["profiles"].items():
        source_entity_id = entity_id_map.get(source_profile_id)
        if not source_entity_id:
            continue
        graph_edges = profile_data.get("metadata", {}).get("profile_graph", [])
        for edge in graph_edges:
            target_profile = _entity_id_from_reference(edge.get("entity"))
            if not target_profile:
                continue
            if target_profile not in entity_id_map:
                continue
            via_statement = _entity_uri_from_reference(edge.get("via_statement"))
            cross_references.append(
                {
                    "from": source_entity_id,
                    "from_profile": source_profile_id,
                    "from_entity": profile_data.get("entity"),
                    "to": entity_id_map[target_profile],
                    "to_profile": target_profile,
                    "to_entity": _entity_uri_from_reference(edge.get("entity")),
                    "via_statement": via_statement,
                    "relationship_type": edge.get("linkage_type"),
                    "cardinality": {},
                    "workflow_policy": {},
                }
            )

    # Default, unconstrained cardinality (min 0, unbounded max) per edge.
    cardinality_constraints = []
    for link in cross_references:
        cardinality_constraints.append(
            {
                "from": link["from"],
                "to": link["to"],
                "min": 0,
                "max": -1,
            }
        )

    return {
        "packet_id": f"pkt-{uuid.uuid4().hex[:12]}",
        "operation_mode": operation_mode,
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "primary_profile": package["primary_profile"],
        "primary_profile_entity": package.get("primary_profile_entity"),
        "entities": entities,
        "cross_references": cross_references,
        "cardinality_constraints": cardinality_constraints,
        "profile_package": package,
    }

gkc.spirit_safe.validate_packet_structure(packet)

Validate packet structure and basic linkage consistency.

Source code in gkc/spirit_safe.py
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
def validate_packet_structure(packet: dict[str, Any]) -> tuple[bool, list[str]]:
    """Validate packet structure and basic linkage consistency."""

    problems: list[str] = []

    # Every packet must carry these top-level keys.
    for field_name in ("packet_id", "operation_mode", "entities", "cross_references"):
        if field_name not in packet:
            problems.append(f"Missing required field: {field_name}")

    # Collect the set of known entity ids; only dict entries with a truthy
    # "id" participate in linkage checks.
    known_ids: set[str] = set()
    for entity in packet.get("entities", []):
        if isinstance(entity, dict) and entity.get("id"):
            known_ids.add(str(entity.get("id")))

    # Both endpoints of each cross-reference must resolve to a known entity.
    for link in packet.get("cross_references", []):
        source = link.get("from")
        if source not in known_ids:
            problems.append(
                f"Cross-reference from {source} points to unknown entity"
            )
        target = link.get("to")
        if target not in known_ids:
            problems.append(
                f"Cross-reference to {target} points to unknown entity"
            )

    # Cardinality: min must be non-negative; max must be -1 (unbounded) or
    # at least min. Non-int values are not validated here.
    for constraint in packet.get("cardinality_constraints", []):
        lower = constraint.get("min")
        upper = constraint.get("max")
        if isinstance(lower, int) and lower < 0:
            problems.append(f"Cardinality min must be >= 0: {constraint}")
        both_ints = isinstance(lower, int) and isinstance(upper, int)
        if both_ints and upper != -1 and upper < lower:
            problems.append(f"Cardinality max must be >= min or -1: {constraint}")

    return (not problems, problems)

Theoretical Design Notes

  • Packet-level conformance notices shared across charge/barrel/validation are architecturally planned but not yet standardized in a single public type.
  • Wizard integration should consume packet structures and value-list routes directly from packet artifacts, without local manifest inference.