Source code for pycancensus.recalls

"""Detection and removal of locally cached data recalled by Statistics Canada.

Statistics Canada occasionally recalls published census data. CensusMapper
tracks these recalls and exposes them at /api/v1/recall.csv. Cached
get_census() results record the server's data-version header, which is
matched against the recall database to flag stale local data.
"""

import io
import warnings
from typing import Optional

import pandas as pd

from .cache import list_cache, remove_from_cache, session_cache_get, session_cache_set
from .settings import CENSUSMAPPER_API_URL
from .resilience import get_session

_RECALL_CACHE_KEY = "_recall_database"
_warned_this_session = False


def get_recalled_database(refresh: bool = False) -> Optional[pd.DataFrame]:
    """
    Download CensusMapper's table of recalled census data.

    Parameters
    ----------
    refresh : bool, default False
        When True, bypass the copy held in the session cache and download
        the table again.

    Returns
    -------
    pd.DataFrame or None
        The parsed ``recall.csv`` with every column read as a string
        (expected columns: api_version, dataset, level, vector). None is
        returned, after a warning, when the download or parse fails or when
        the result lacks an ``api_version`` column.
    """
    failure_message = "Unable to download recall database at this point."

    # Serve the session-cached copy unless the caller asked for a fresh one.
    if not refresh:
        session_copy = session_cache_get(_RECALL_CACHE_KEY)
        if session_copy is not None:
            return session_copy

    try:
        recall_url = f"{CENSUSMAPPER_API_URL}/recall.csv"
        csv_text = get_session().get(recall_url).text
        recall_table = pd.read_csv(io.StringIO(csv_text), dtype=str)
    except Exception:
        # Best-effort lookup: any network or parse failure degrades to a
        # warning rather than breaking the caller.
        warnings.warn(failure_message)
        return None

    # A response without the api_version column is not a usable recall
    # table (for example an error page that happened to parse as CSV).
    if "api_version" in recall_table.columns:
        session_cache_set(_RECALL_CACHE_KEY, recall_table)
        return recall_table

    warnings.warn(failure_message)
    return None
def _version_number(version: Optional[str]) -> Optional[int]:
    """Return the integer after the first "." of a version like "d.12".

    Anything that is not a string, has no ".", or whose remainder is not a
    valid integer yields None.
    """
    if not isinstance(version, str):
        return None

    _prefix, dot, remainder = version.partition(".")
    if not dot:
        return None

    try:
        return int(remainder)
    except ValueError:
        return None


def _level_matches(recall_level, entry_level) -> bool:
    """Decide whether a recall's level covers a cache entry's level.

    A missing (NaN/None) or empty recall level covers every entry. Otherwise
    the entry matches when its level equals the recall level or is
    'Regions'.
    """
    covers_all_levels = pd.isna(recall_level) or recall_level == ""
    if covers_all_levels:
        return True
    if entry_level == "Regions":
        return True
    return entry_level == recall_level
def _as_vector_set(vectors) -> set:
    """Normalise a cache entry's ``vectors`` cell to a set of vector IDs."""
    if vectors is None:
        return set()
    if isinstance(vectors, str):
        # A bare string is treated as one vector ID; iterating it would
        # yield single characters that can never match a recall.
        # NOTE(review): assumes a string cell holds a single ID — confirm
        # against how the cache writer stores vectors.
        return {vectors} if vectors else set()
    try:
        return set(vectors)
    except TypeError:
        # Non-iterable scalars, notably the float NaN pandas uses for a
        # missing cell, mean "no vectors recorded".
        return set()


def list_recalled_cached_data(
    cached_data: Optional[pd.DataFrame] = None,
) -> Optional[pd.DataFrame]:
    """
    List locally cached data that has been recalled by Statistics Canada.

    Only cache entries written by pycancensus versions that record request
    metadata (dataset, vectors, data version) can be checked.

    Parameters
    ----------
    cached_data : pd.DataFrame, optional
        Cache listing to check, as returned by list_cache(). Defaults to
        the full local cache.

    Returns
    -------
    pd.DataFrame or None
        Rows of ``cached_data`` describing recalled entries (empty when
        nothing is recalled or the listing has no ``dataset`` column), or
        None if the recall database could not be downloaded.

    Examples
    --------
    >>> import pycancensus as pc
    >>> pc.list_recalled_cached_data()
    """
    recall_db = get_recalled_database()
    if recall_db is None:
        return None

    if cached_data is None:
        cached_data = list_cache()

    if cached_data.empty or "dataset" not in cached_data.columns:
        return cached_data.iloc[0:0]

    # The recall table does not change while we scan the cache, so turn it
    # into plain dicts once instead of re-walking the DataFrame per entry.
    recall_rows = recall_db.to_dict("records")

    recalled_keys = []
    for _, entry in cached_data.iterrows():
        if pd.isna(entry.get("dataset")):
            continue  # no metadata recorded for this entry

        entry_vectors = _as_vector_set(entry.get("vectors"))
        data_version = _version_number(entry.get("version"))
        geo_version = _version_number(entry.get("geo_version"))

        for recall in recall_rows:
            if recall["dataset"] != entry["dataset"]:
                continue
            if not _level_matches(recall.get("level"), entry.get("level")):
                continue

            api_version = recall["api_version"]
            recall_num = _version_number(api_version)
            if recall_num is None:
                continue  # also guarantees api_version is a string below

            if api_version.startswith("d."):
                # Data recall: exact vector ID match (a recall of v_CA21_1
                # must not flag v_CA21_10) for data at or below the
                # recalled version.
                if (
                    data_version is not None
                    and data_version <= recall_num
                    and recall.get("vector") in entry_vectors
                ):
                    recalled_keys.append(entry["cache_key"])
                    break
            elif api_version.startswith("g."):
                # Geometry recall: applies to any cached geometry at or
                # below the recalled version.
                if geo_version is not None and geo_version <= recall_num:
                    recalled_keys.append(entry["cache_key"])
                    break

    return cached_data[cached_data["cache_key"].isin(recalled_keys)]
def remove_recalled_cached_data() -> None:
    """
    Delete every locally cached entry that Statistics Canada has recalled.

    Prints a one-line summary of what was removed, or a notice that nothing
    needed removing. Returns silently when the recalled listing is
    unavailable (list_recalled_cached_data() returned None).

    Examples
    --------
    >>> import pycancensus as pc
    >>> pc.remove_recalled_cached_data()
    """
    recalled_entries = list_recalled_cached_data()
    if recalled_entries is None:
        return

    if not recalled_entries.empty:
        # Total the size before removal so the summary reflects what was
        # actually on disk.
        total_mb = recalled_entries["size_mb"].sum()
        keys_to_drop = recalled_entries["cache_key"].tolist()
        remove_from_cache(cache_keys=keys_to_drop)
        print(
            f"Removed {len(recalled_entries)} recalled datasets totalling {total_mb:.2f} MB."
        )
        return

    print("No recalled data in cached data.")
def check_recalled_data_and_warn(cache_key: str) -> None:
    """Emit a single per-session warning when the given cache entry was recalled.

    Does nothing once the warning has already fired in this session, when
    ``cache_key`` is not present in the cache listing, or when the recalled
    listing is unavailable or empty.
    """
    global _warned_this_session

    if _warned_this_session:
        return

    listing = list_cache()
    matching_rows = listing[listing["cache_key"] == cache_key]
    if matching_rows.empty:
        return

    flagged = list_recalled_cached_data(matching_rows)
    if flagged is None or flagged.empty:
        return

    # Set the flag before warning so later reads stay quiet this session.
    _warned_this_session = True
    warnings.warn(
        "Currently loaded data has been recalled by Statistics Canada. "
        "Use list_recalled_cached_data() to inspect recalled locally "
        "cached data and remove_recalled_cached_data() to remove it."
    )