Source code for luminadb.database

"""SQLite Database"""

from atexit import register as finalize
from sqlite3 import OperationalError, connect
from typing import Iterable, Literal, Optional

from luminadb._debug import if_debug_print

from .utils import (
    WithCursor,
    check_iter,
    check_one,
    dict_factory,
    sqlite_multithread_check,
)
from .column import BuilderColumn, Column
from .query_builder.table_creation import extract_table_creations
from .table import Table
from .errors import DatabaseExistsError, DatabaseMissingError
from .index import Index

Columns = Iterable[Column] | Iterable[BuilderColumn]

__all__ = ["Database"]

IGNORE_TABLE_CHECKS = ("sqlite_master", "sqlite_temp_schema", "sqlite_temp_master")


[docs] class Database: # pylint: disable=too-many-instance-attributes """Sqlite3 database, this provide basic integration. Custom flags: strict : Certain actions are prevented when active, i.e, initializing nonexistent tables forgive: Certain actions are replaced when active, i.e, replacing .create_table to .table when a table exists""" def __init__(self, path: str, **kwargs) -> None: kwargs["check_same_thread"] = sqlite_multithread_check() != 3 self._path = path self._strict: bool = kwargs.get("strict", True) self._forgive: bool = kwargs.get("forgive", True) self._active = [] if 'forgive' in kwargs: del kwargs['forgive'] if 'strict' in kwargs: del kwargs['strict'] self._config = None self._closed = False self._table_class = Table if not self._closed or self.__dict__.get("_initiated", False) is False: finalize(self._finalizer) self._initiated = True self._kwargs = kwargs self._create_connection() def _create_connection(self): self._database = connect(self._path, **self._kwargs) self._database.row_factory = dict_factory def _finalizer(self): self.close()
[docs] def cursor(self) -> WithCursor: """Create cursor""" return self._database.cursor(WithCursor) # type: ignore
[docs] def create_table(self, table: str, columns: Columns): """Create table Args: table (str): Table name columns (Iterable[Column]): Columns for table Returns: Table: Newly created table """ columns = ( column.to_column() if isinstance(column, BuilderColumn) else column for column in columns ) tbquery = extract_table_creations(columns) query = f"create table {table} ({tbquery})" if_debug_print(query) if self._forgive and self.check_table(table): return self.table(table, columns) try: cursor = self._database.cursor() cursor.execute(query) self._database.commit() except OperationalError as error: if "already exists" in str(error): dberror = DatabaseExistsError(f"table {table} already exists.") dberror.add_note(f"{type(error).__name__}: {error!s}") raise dberror from error error.add_note(f"Query: {query}") raise error table_ = self.table(table, columns) table_._deleted = False # pylint: disable=protected-access return table_
[docs] def delete_table(self, table: str): """Delete an existing table Args: table (str): table name """ check_one(table) table_ = self.table(table) self._database.cursor().execute(f"drop table {table}") # pylint: disable-next=protected-access table_._delete_hook() # pylint: disable=protected-access
[docs] def table(self, table: str, __columns: Optional[Iterable[Column]] = None): # type: ignore """fetch table""" if self._strict and not self.check_table(table): raise DatabaseMissingError(f"table {table} does not exists.") try: this_table = self._table_class(self, table, __columns) except OperationalError as exc: dberror = DatabaseMissingError(f"table {table} does not exists") dberror.add_note(f"{type(exc).__name__}: {exc!s}") raise dberror from None return this_table
[docs] def reset_table(self, table: str, columns: Columns) -> Table: """Reset existing table with new, this rewrote entire table than altering it.""" try: self.delete_table(table) except OperationalError: pass return self.create_table(table, columns)
[docs] def rename_table(self, old_table: str, new_table: str) -> Table: """Rename existing table to a new one.""" check_iter((old_table, new_table)) cursor = self.sql.cursor() cursor.execute(f"alter table {old_table} rename to {new_table}") self.sql.commit() return self.table(new_table)
[docs] def check_table(self, table: str): """Check if table is exists or not.""" # if self._path in PLUGINS_PATH: # plugin = self._path[2:] # raise ValueError(f"Plugin {plugin} must redefine check_table.") check_one(table) if table in IGNORE_TABLE_CHECKS: return True # Let's return true. cursor = self.sql.cursor() cursor.execute( "select name from sqlite_master where type='table' and name=?", (table,) ) return cursor.fetchone() is not None
[docs] def create_index(self, index: Index): """Create an index""" with self.sql as dbcursor: dbcursor.execute(index.build_sql())
[docs] def delete_index(self, name: str | Index, exists_ok: bool = False): """Drop an index""" _name = name.index_name if isinstance(name, Index) else name check_one(_name) with self.sql as dbcursor: if_ok = "if exists" if exists_ok else "" dbcursor.execute(f"drop index {if_ok} {_name}")
def __repr__(self) -> str: return f"<{type(self).__name__} {id(self)}>"
[docs] def close(self): """Close database""" if self._closed: return self._database.close() if self.path == ":memory:": self._closed = True return self._closed = True
[docs] def tables(self) -> tuple[Table, ...]: """Return tuple containing all table except internal tables""" master = self.table("sqlite_master") listed = [] for table in master.select(): if table.type == "table": listed.append(self.table(table.name)) return tuple(listed)
[docs] def commit(self): """Commit changes to database""" self._database.commit()
[docs] def rollback(self): """Rollback changes""" self._database.rollback()
[docs] def foreign_pragma(self, bool_state: Literal["ON", "OFF", ""] = ""): """Enable/disable foreign key pragma""" if bool_state not in ("ON", "OFF", ""): raise ValueError("Either ON/OFF for foreign key pragma.") return self._database.execute( f"PRAGMA foreign_keys{'='+bool_state if bool_state else ''}" ).fetchone() # pylint: disable=line-too-long
[docs] def optimize(self): """Optimize current database""" return self._database.execute("PRAGMA optimize").fetchone()
[docs] def shrink_memory(self): """Shrink memories from database as much as it can.""" return self._database.execute("PRAGMA shrink_memory").fetchone()
[docs] def vacuum(self): """Vacuum this database""" return self._database.execute("VACUUM").fetchone()
@property def closed(self): """Is database closed?""" return self._closed @closed.setter def closed(self, __o: bool): """Is database closed?""" if __o: self.close() return raise ValueError("Expected non-false/non-null value") @property def path(self): """Path to SQL Connection""" return self._path or ":memory:" @property def sql(self): """SQL Connection""" return self._database