Source code for docarray.array.storage.sqlite.helper

import sqlite3


[docs]def initialize_table( table_name: str, container_type_name: str, schema_version: str, cur: sqlite3.Cursor ) -> None: if not _is_metadata_table_initialized(cur): _do_initialize_metadata_table(cur) if not _is_table_initialized(table_name, container_type_name, schema_version, cur): _do_create_table(table_name, cur) _do_tidy_table_metadata(table_name, container_type_name, schema_version, cur)
def _is_metadata_table_initialized(cur: sqlite3.Cursor) -> bool: try: cur.execute('SELECT 1 FROM metadata LIMIT 1') _ = list(cur) return True except sqlite3.OperationalError as _: pass return False def _do_initialize_metadata_table(cur: sqlite3.Cursor) -> None: cur.execute( ''' CREATE TABLE metadata ( table_name TEXT PRIMARY KEY, schema_version TEXT NOT NULL, container_type TEXT NOT NULL, UNIQUE (table_name, container_type) ) ''' ) def _do_create_table( table_name: str, cur: 'sqlite3.Cursor', ) -> None: cur.execute( f''' CREATE TABLE {table_name} ( doc_id TEXT NOT NULL UNIQUE, serialized_value Document NOT NULL, item_order INTEGER PRIMARY KEY) ''' ) def _is_table_initialized( table_name: str, container_type_name: str, schema_version: str, cur: sqlite3.Cursor ) -> bool: try: cur.execute( 'SELECT schema_version FROM metadata WHERE table_name=? AND container_type=?', (table_name, container_type_name), ) buf = cur.fetchone() if buf is None: return False version = buf[0] if version != schema_version: return False cur.execute(f'SELECT 1 FROM {table_name} LIMIT 1') _ = list(cur) return True except sqlite3.OperationalError as _: pass return False def _do_tidy_table_metadata( table_name: str, container_type_name: str, schema_version: str, cur: sqlite3.Cursor ) -> None: cur.execute( 'INSERT INTO metadata (table_name, schema_version, container_type) VALUES (?, ?, ?)', (table_name, schema_version, container_type_name), )