Skip to content

graflo.db.tigergraph.token_cache

Process-scoped TigerGraph API token cache.

parse_tg_expiration(raw)

Convert TigerGraph expiration field to a Unix timestamp.

Source code in graflo/db/tigergraph/token_cache.py
def parse_tg_expiration(raw: str | None) -> float | None:
    """Convert TigerGraph expiration field to a Unix timestamp."""
    if raw is None:
        return None
    try:
        return float(raw)
    except (ValueError, TypeError):
        pass
    try:
        dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.timestamp()
    except (ValueError, TypeError):
        return None

reset_tigergraph_token_cache()

Clear the process token cache (for tests and long-lived REPL sessions).

Source code in graflo/db/tigergraph/token_cache.py
def reset_tigergraph_token_cache() -> None:
    """Clear the process token cache (for tests and long-lived REPL sessions).

    While holding ``_TigerGraphTokenCache._instance_lock``, calls ``clear()``
    on the current cache instance if one exists, then sets the class-level
    ``_instance`` reference to ``None``.
    """
    cache_cls = _TigerGraphTokenCache
    with cache_cls._instance_lock:
        current = cache_cls._instance
        if current is not None:
            current.clear()
        # Drop the reference whether or not an instance existed.
        cache_cls._instance = None