Source code for datacube.drivers.common_psql._utils
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2026 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import contextlib
from collections.abc import Generator
from sqlalchemy import Connection, text
[docs]
def escape_pg_identifier(conn: Connection, name: str) -> str:
    """
    Escape identifiers for inclusion in SQL statements.

    In ODC only used for user names.

    :param conn: A SQLAlchemy connection object.
    :param name: The unescaped identifier/username
    :return: The escaped identifier/username
    """
    # psycopg2 and psycopg3 both support this via the `quote_ident()` function.
    # We'll ask the server to escape instead to avoid conditional imports, as these are
    # not performance-sensitive.
    #
    # The raw name is passed as a bound parameter instead of being spliced into the
    # statement text: embedding it in a '...' literal would let a name containing a
    # single quote break out of the literal (SQL injection) before it is ever escaped.
    # The explicit cast pins the parameter type so `quote_ident(text)` always resolves.
    escaped = conn.execute(
        text("select quote_ident(cast(:name as text))"),
        {"name": name},
    ).scalar()
    return str(escaped)
[docs]
def get_connection_info(conn: Connection) -> tuple[str, str]:
    """
    Look up which database a connection is attached to and which role it is acting as.

    Both values are passed through ``quote_ident()`` on the server before being
    returned.

    :param conn: The SQLAlchemy connection object.
    :return: a string tuple (database name, role name)
    """
    query = text("select quote_ident(current_database()), quote_ident(current_user)")
    result = conn.execute(query).fetchone()
    assert result is not None  # Reassure mypy that this cannot return None
    return tuple(result)
[docs]
def ensure_extension(conn: Connection, extension_name: str) -> None:
    """
    Install the named extension (e.g. PostGIS) into the database if it is not
    already present.

    The extension name is interpolated into the statement verbatim, without any
    quoting or escaping, so it must come from a trusted source.

    :param conn: An SQLAlchemy connection object.
    :param extension_name: The extension name, e.g. "postgis"
    """
    conn.execute(text(f"create extension if not exists {extension_name}"))
[docs]
@contextlib.contextmanager
def as_role(conn: Connection, role: str | None) -> Generator[Connection]:
    """
    Context manager that runs the enclosed statements under a different database role.

    E.g.::

        with engine.connect() as conn:
            # This is executed as the session role configured into the engine
            conn.execute(query1)
            with as_role(conn, "odc_admin") as conn:
                # This is executed as the "odc_admin" role
                conn.execute(query2)
            # This is executed as the session role configured into the engine
            conn.execute(query3)

    The role name is placed into the ``set role`` statement as given, without
    quoting.

    :param conn: The base SQLAlchemy connection object.
    :param role: The role to switch to, or None to continue using the currently active
                 role
    :return: The SQLAlchemy connection object, switched to the specified role.
    """
    # Nothing to switch: hand the connection back untouched.
    if role is None:
        yield conn
        return

    # Remember the role that is active right now so it can be reinstated on exit.
    previous_role = get_connection_info(conn)[1]
    switch_stmt = text(f"set role {role}")
    try:
        conn.execute(switch_stmt)
        yield conn
    finally:
        # Runs whether the body succeeded or raised.
        conn.execute(text(f"set role {previous_role}"))