それ、SQLでよくね?
お疲れ様です。
最近肝心な日に限って台風が直撃しがちで雨にゆかりがあるんじゃないかって感じです。
クリーク・アンド・リバー社テクニカルチームの田口です。
突然ですがふと他人のコードを見ると大規模なデータをjsonでローカルに保持したり一時的なデータをdictで保有しているのをしばしば見ます。
実装としては間違ってはいないのですがこんな経験はありませんでしょうか?
・数年使っているシステムのデータが肥大化してアクセスが大変
・過去に外部データとして出したデータが現行システムだとパラメータ変更で使い物にならない
・もう今となっては既定のデータフォーマットの変更が、、、
etc…
そういう状況見るたびデータベースに出来たらなぁと思う事もしばしば
と言うことで今回は【それ、SQLでよくね?】ってブログになります。
そもそもSQL(リレーショナルデータベース)とは
SQL(Structured Query Language)は、テーブル形式でデータを管理するリレーショナルデータベースを操作するための言語で
データは行(レコード)と列(カラム)で構成され、テーブル同士を「キー」で関連付けることで複雑な情報を無駄なく整理できます。
(イメージはすごいExcelですね
また、データ構造の定義やデータの追加・更新・削除、検索などの操作があらかじめ定義された文法で実装出来るため、データの整合性を保ちつつ効率的に管理できます。
| 操作 | SQL文 | 用途 |
|---|---|---|
| 作成 | CREATE TABLE |
テーブル定義 |
| 挿入 | INSERT INTO |
データ追加 |
| 取得 | SELECT ... FROM |
データ検索・絞り込み |
| 更新 | UPDATE ... SET |
値の変更 |
| 削除 | DELETE FROM |
レコード削除 |
| 結合 | JOIN |
複数テーブルの横断参照 |
MAYA におけるデータモデル
上記を踏まえたうえで、よくあるMAYAのシーンデータから管理する事のあるデータ項目を考えてみましょう。
ざっくり思いつくだけでもノード自体、アトリビュート、接続関係、アニメーションカーブの配列等多岐にわたります。
上記から限定したデータを抽出して小規模のデータ管理を想定する場合は最小単位での実装なるので
今回はシーンやキャラクター、スコープの広い単位での情報の管理/保持についてフォーカスしてみます。
今回例として挙げる要件として
- ノードの識別情報
- ノード間の接続情報
のようなデータモデルでリグの様なシーン全体の情報を想定します。
また、データ要件から多層に渡るデータを扱う場合listやdict、dataclass等を組み合わせて管理する方法もありますが、今回はSQLでの管理を前提に考えていきます。
実装してみる
PythonでSQLを扱う場合複数のライブラリが提供されていますが、今回は標準ライブラリで実装出来るsqlite3を使用して実装してみます。
まずは前述のデータモデルに沿って、ノード情報を管理するnodesテーブルと、ノード間の接続情報を管理するconnectionsテーブルを用意します。
import sqlite3
conn = sqlite3.connect(":memory:") # 今回はメモリ上に作成
cur = conn.cursor()
cur.execute("""
CREATE TABLE nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
path TEXT NOT NULL,
type TEXT NOT NULL,
uuid TEXT
)
""")
cur.execute("""
CREATE TABLE connections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
src_id INTEGER NOT NULL,
dst_id INTEGER NOT NULL,
src_attr TEXT NOT NULL,
dst_attr TEXT NOT NULL,
FOREIGN KEY (src_id) REFERENCES nodes(id),
FOREIGN KEY (dst_id) REFERENCES nodes(id)
)
""")
conn.commit()
sqlite3にて定義されているexecuteよりSQLを実行できます。
nodesテーブルにはノード名・ノードパス・UUIDを、connectionsテーブルには接続元/接続先のノードIDとアトリビュート名を持たせています。
FOREIGN KEYを使用する事で接続情報が必ず実在するノードを指すように制約をかけられますので事前に作成したnodesを参照するIDに設定します。
データを格納してみる
データの追加は前項目で作成したテーブルに対してINSERT INTOで行います。1件ずつ追加する場合はexecute、複数件まとめて追加する場合はexecutemanyが使えます。
cur.execute(
"INSERT INTO nodes (name, path, type, uuid) VALUES (?, ?, ?, ?)",
("pCube1", "transform", 0)
)
nodes_to_insert = [
("blendColors1", "blendColors", 1),
("skinCluster1", "skinCluster", 1),
]
cur.executemany(
"INSERT INTO nodes (name, path, uuid) VALUES (?, ?, ?)",
nodes_to_insert
)
conn.commit()
この時値を文字列結合ではなく?のプレースホルダで渡す記法が推奨となり、この記法だとSQLインジェクション対策と型の安全性の担保ができます。
※ノード名に意図しない記号が含まれることもあるため、必ずパラメータ化したクエリを使うようにしましょう。
データにアクセスしてみる
データの取得はSELECTを使います。条件を指定して絞り込みたい場合はWHERE句を組み合わせます。
cur.execute("SELECT path FROM nodes WHERE name = ?", ("aaaaa",))
print(cur.fetchall())
# [('pCubeShape1',)]
cur.execute("SELECT name, name FROM nodes WHERE UUID = 1")
print(cur.fetchall())
# [('blendColors1', 'blendColors'), ('skinCluster1', 'skinCluster')]
list/dictで同じことをやろうとすると、for文で全件走査しながら条件分岐を書く必要がありますが、SQLでは「どういう条件で何を取り出すか」を1行で宣言的に書けます。
接続情報についても見てみましょう。connectionsテーブルにはノードIDしか入っていないため、ノード名を知りたい場合はJOINでnodesテーブルと結合します。
cur.execute("SELECT id FROM nodes WHERE name = 'pCube1'")
src_id = cur.fetchone()[0]
cur.execute("SELECT id FROM nodes WHERE name = 'pCubeShape1'")
dst_id = cur.fetchone()[0]
cur.execute(
"INSERT INTO connections (src_node_id, dst_node_id, src_attr, dst_attr) VALUES (?, ?, ?, ?)",
(src_id, dst_id, "translate", "translate")
)
conn.commit()
cur.execute("""
SELECT src.name, c.src_attr, dst.name, c.dst_attr
FROM connections c
JOIN nodes src ON c.src_node_id = src.id
JOIN nodes dst ON c.dst_node_id = dst.id
""")
print(cur.fetchall())
# [('pCube1', 'translate', 'pCubeShape1', 'translate')]
複数テーブルに分かれたデータを、IDで紐づけながら横断的に参照できるのがリレーショナルデータベースらしい使い方です。MAYAのシーン内には接続が何百、何千とあることも珍しくありませんが、件数が増えてもこの書き方自体は変わりません。
データを更新してみる
既存データの値を変更したい場合はUPDATE ... SET ... WHEREを使います。
cur.execute(
"UPDATE nodes SET is_referenced = ? WHERE name = ?",
(1, "pCube1")
)
conn.commit()
cur.execute("SELECT name, is_referenced FROM nodes WHERE name = 'pCube1'")
print(cur.fetchone())
# ('pCube1', 1)
WHERE句を忘れるとテーブル全件が更新されてしまうので注意が必要ですが、逆に言えば「どの条件に当たるレコードを更新するか」を明示的に書く文化が根付くため、dictの値を直接書き換えるよりも意図が読み取りやすいコードになりやすいです。
管理するデータが増えた場合
ツールの要件が増えていくと、「あとからノードに新しい属性情報を持たせたい」という場面が出てきます。そんな時でもSQLであればALTER TABLEでカラムを追加できます。
cur.execute("ALTER TABLE nodes ADD COLUMN namespace TEXT DEFAULT ''")
conn.commit()
cur.execute(
"UPDATE nodes SET namespace = ? WHERE name = ?",
("char_A", "pCube1")
)
conn.commit()
for row in cur.execute("SELECT name, path, namespace FROM nodes"):
print(row)
# ('pCube1', 'transform', 'char_A')
# ('pCubeShape1', 'mesh', '')
# ('blendColors1', 'blendColors', '')
# ('skinCluster1', 'skinCluster', '')
既存のレコードにはデフォルト値が補完される形で安全にスキーマを拡張できます。dictベースの管理であれば全レコードに新しいキーを後付けでループして追加する様な処理が必要になりますが、SQLでは1行で済みます。
もちろん、テーブル設計を頻繁に変える前提であれば都度マイグレーションのコストが発生する点は留意が必要ですが、「どのノードがどんな属性を持ち、どう繋がっているか」を後から問い合わせる場面が多いシステムであれば、SQLでデータを持つ恩恵は大きいはずです。
それっぽい実装をしてみる
上記機能から今回は
- ノード名/フルパス/UUID を管理、保持するノード管理テーブル
- ノード管理テーブルから接続情報を管理、保持する接続管理テーブル
を実装例として作成していきます。
以下最終的な実装の全貌です。
コードを表示する
from dataclasses import dataclass
from contextlib import contextmanager
from typing import Optional, Any
import sqlite3
import datetime
@dataclass(frozen=True)
class nodes:
ID = "id"
NAME = "name"
PATH = "path"
UUID = "uuid"
@dataclass(frozen=True)
class connections:
ID = "id"
SRC_ID = "src_id"
DST_ID = "dst_id"
SRC_ATTR = "src_attr"
DST_ATTR = "dst_attr"
@dataclass(frozen=True)
class system_requirement:
ID = "id"
TARGET = "target"
@dataclass(frozen=True)
class sessions:
ID = "id"
SCENE_PATH = "scene_path"
SAVED_AT = "saved_at"
class SQLmanager:
def __init__(self, DB_PATH: str = ":memory:"):
self.DB_PATH = DB_PATH
self.conn: sqlite3.Connection = sqlite3.connect(self.DB_PATH)
self.cursor: sqlite3.Cursor = self.conn.cursor()
self.init_db()
saved_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.cursor.execute(
f"""
INSERT INTO {sessions.__name__}(
{sessions.SCENE_PATH},
{sessions.SAVED_AT}
)VALUES (?, ?)""",
(self.DB_PATH, saved_at)
)
self.conn.commit()
session_id = self.cursor.lastrowid
print(f"[DB] セッションID: {session_id} シーン: {self.DB_PATH}")
def close(self) -> None:
"""接続を明示的にクローズする。with 文または明示呼び出しで使用を推奨"""
if(hasattr(self, "conn") and self.conn):
self.conn.close()
self.conn = None
def __enter__(self) -> "SQLmanager":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
@contextmanager
def _connect(self):
"""
ファイルDBの場合のみ一時接続を提供。
:memory: の場合は永続接続(self.conn)をそのまま yield する
"""
if(self.DB_PATH == ":memory:"):
yield self.conn
else:
conn = sqlite3.connect(self.DB_PATH)
conn.row_factory = sqlite3.Row
try:
yield conn
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db(self) -> None:
"""テーブルが存在しない場合に全テーブルを作成する。"""
ddl_list = [
f"""
CREATE TABLE IF NOT EXISTS {nodes.__name__} (
{nodes.ID} INTEGER PRIMARY KEY AUTOINCREMENT,
{nodes.NAME} TEXT,
{nodes.PATH} TEXT,
{nodes.UUID} TEXT
)
""",
f"""
CREATE TABLE IF NOT EXISTS {connections.__name__} (
{connections.ID} INTEGER PRIMARY KEY AUTOINCREMENT,
{connections.SRC_ID} INTEGER NOT NULL,
{connections.DST_ID} INTEGER NOT NULL,
{connections.SRC_ATTR} TEXT,
{connections.DST_ATTR} TEXT,
FOREIGN KEY ({connections.SRC_ID}) REFERENCES {nodes.__name__}({nodes.ID}),
FOREIGN KEY ({connections.DST_ID}) REFERENCES {nodes.__name__}({nodes.ID})
)
""",
f"""
CREATE TABLE IF NOT EXISTS {system_requirement.__name__} (
{system_requirement.ID} INTEGER PRIMARY KEY AUTOINCREMENT,
{system_requirement.TARGET} INTEGER NOT NULL,
FOREIGN KEY ({system_requirement.TARGET}) REFERENCES {nodes.__name__}({nodes.ID})
)
""",
f"""
CREATE TABLE IF NOT EXISTS {sessions.__name__} (
{sessions.ID} INTEGER PRIMARY KEY AUTOINCREMENT,
{sessions.SCENE_PATH} TEXT,
{sessions.SAVED_AT} TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
]
with self._connect() as conn:
for ddl in ddl_list:
conn.execute(ddl)
conn.commit()
class NodeRepository:
"""
nodes テーブルに対する CRUD 操作をまとめたクラス。
SQLmanager インスタンスを受け取り、その接続(_connect)を介して
SQL を発行する。SQLmanager 自体のテーブル定義やセッション管理には
一切手を加えない。
"""
def __init__(self, db: SQLmanager):
self.db = db
# ------------------------------------------------------------------
# CREATE
# ------------------------------------------------------------------
def create(self, name: str, path: Optional[str] = None,
node_uuid: Optional[str] = None) -> int:
"""
ノードを1件追加する。
Args:
name: ノード名
path: ノードのパス(任意)
node_uuid: UUID(任意)
Returns:
追加されたノードの id(rowid)
"""
sql = f"""
INSERT INTO {nodes.__name__} (
{nodes.NAME},
{nodes.PATH},
{nodes.UUID}
) VALUES (?, ?, ?)
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (name, path, node_uuid))
conn.commit()
return cur.lastrowid
# ------------------------------------------------------------------
# READ
# ------------------------------------------------------------------
def get_by_id(self, node_id: int) -> Optional[dict[str, Any]]:
"""id を指定して1件取得する。存在しなければ None"""
sql = f"""
SELECT {nodes.ID}, {nodes.NAME}, {nodes.PATH}, {nodes.UUID}
FROM {nodes.__name__}
WHERE {nodes.ID} = ?
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (node_id,))
row = cur.fetchone()
return self._row_to_dict(row)
def get_by_uuid(self, node_uuid: str) -> Optional[dict[str, Any]]:
"""uuid を指定して1件取得する。存在しなければ None。"""
sql = f"""
SELECT {nodes.ID}, {nodes.NAME}, {nodes.PATH}, {nodes.UUID}
FROM {nodes.__name__}
WHERE {nodes.UUID} = ?
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (node_uuid,))
row = cur.fetchone()
return self._row_to_dict(row)
def list_all(self) -> list[dict[str, Any]]:
"""全ノードを取得する。"""
sql = f"""
SELECT {nodes.ID}, {nodes.NAME}, {nodes.PATH}, {nodes.UUID}
FROM {nodes.__name__}
ORDER BY {nodes.ID}
"""
with self.db._connect() as conn:
cur = conn.execute(sql)
rows = cur.fetchall()
return [self._row_to_dict(r) for r in rows]
def find_by_name(self, name: str) -> list[dict[str, Any]]:
"""name で部分一致検索する。"""
sql = f"""
SELECT {nodes.ID}, {nodes.NAME}, {nodes.PATH}, {nodes.UUID}
FROM {nodes.__name__}
WHERE {nodes.NAME} LIKE ?
ORDER BY {nodes.ID}
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (f"%{name}%",))
rows = cur.fetchall()
return [self._row_to_dict(r) for r in rows]
# ------------------------------------------------------------------
# UPDATE
# ------------------------------------------------------------------
def update(self, node_id: int, name: Optional[str] = None,
path: Optional[str] = None,
node_uuid: Optional[str] = None) -> bool:
"""
指定 id のノードを更新する。値が None の項目は更新しない。
Returns:
1件以上更新されたら True、対象が無ければ False
"""
fields = []
params: list[Any] = []
if(name is not None):
fields.append(f"{nodes.NAME} = ?")
params.append(name)
if(path is not None):
fields.append(f"{nodes.PATH} = ?")
params.append(path)
if(node_uuid is not None):
fields.append(f"{nodes.UUID} = ?")
params.append(node_uuid)
if(not fields):
return False
sql = f"""
UPDATE {nodes.__name__}
SET {", ".join(fields)}
WHERE {nodes.ID} = ?
"""
params.append(node_id)
with self.db._connect() as conn:
cur = conn.execute(sql, params)
conn.commit()
return cur.rowcount > 0
# ------------------------------------------------------------------
# DELETE
# ------------------------------------------------------------------
def delete(self, node_id: int) -> bool:
"""
指定 id のノードを削除する。
Returns:
削除できたら True、対象が無ければ False
"""
sql = f"""
DELETE FROM {nodes.__name__}
WHERE {nodes.ID} = ?
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (node_id,))
conn.commit()
return cur.rowcount > 0
def delete_all(self) -> int:
"""全ノードを削除する。削除件数を返す。"""
sql = f"DELETE FROM {nodes.__name__}"
with self.db._connect() as conn:
cur = conn.execute(sql)
conn.commit()
return cur.rowcount
# ------------------------------------------------------------------
# internal helper
# ------------------------------------------------------------------
@staticmethod
def _row_to_dict(row) -> Optional[dict[str, Any]]:
"""
sqlite3.Row / tuple のどちらが来ても dict に変換する。
(:memory: 接続では sqlite3.Row でなく tuple が返るため両対応)
"""
if(row is None):
return None
if(isinstance(row, sqlite3.Row)):
return dict(row)
keys = [nodes.ID, nodes.NAME, nodes.PATH, nodes.UUID]
return dict(zip(keys, row))
class NodeNotFoundError(Exception):
"""指定されたノードIDが nodes テーブルに存在しない場合に発生。"""
pass
class ConnectionRepository:
"""
connections テーブルに対する CRUD 操作をまとめたクラス。
SQLmanager インスタンスを受け取り、その接続(_connect)を介して
SQL を発行。src_id / dst_id の存在チェックには NodeRepository を使用。
"""
def __init__(self, db: SQLmanager):
self.db = db
self.node_repo = NodeRepository(db)
# ------------------------------------------------------------------
# CREATE
# ------------------------------------------------------------------
def create(self, src_id: int, dst_id: int,
src_attr: Optional[str] = None,
dst_attr: Optional[str] = None,
validate_nodes: bool = True) -> int:
"""
ノード間の接続を1件追加する。
Args:
src_id : 接続元ノードID (nodes.id)
dst_id : 接続先ノードID (nodes.id)
src_attr: 接続元側の属性名(任意。例: "output")
dst_attr: 接続先側の属性名(任意。例: "input")
validate_nodes: True の場合、src_id / dst_id が
nodes テーブルに実在するかを事前確認。
Returns:
追加された接続の id(rowid)
Raises:
NodeNotFoundError: validate_nodes=True で
src_id または dst_id が nodes に存在しない場合
"""
if(validate_nodes):
self._assert_node_exists(src_id, role="src_id")
self._assert_node_exists(dst_id, role="dst_id")
sql = f"""
INSERT INTO {connections.__name__} (
{connections.SRC_ID},
{connections.DST_ID},
{connections.SRC_ATTR},
{connections.DST_ATTR}
) VALUES (?, ?, ?, ?)
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (src_id, dst_id, src_attr, dst_attr))
conn.commit()
return cur.lastrowid
# ------------------------------------------------------------------
# READ
# ------------------------------------------------------------------
def get_by_id(self, connection_id: int) -> Optional[dict[str, Any]]:
"""id を指定して1件取得。存在しなければ None"""
sql = f"""
SELECT {connections.ID}, {connections.SRC_ID}, {connections.DST_ID},
{connections.SRC_ATTR}, {connections.DST_ATTR}
FROM {connections.__name__}
WHERE {connections.ID} = ?
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (connection_id,))
row = cur.fetchone()
return self._row_to_dict(row)
def list_all(self) -> list[dict[str, Any]]:
"""全接続を取得"""
sql = f"""
SELECT {connections.ID}, {connections.SRC_ID}, {connections.DST_ID}, {connections.SRC_ATTR}, {connections.DST_ATTR}
FROM {connections.__name__}
ORDER BY {connections.ID}
"""
with self.db._connect() as conn:
cur = conn.execute(sql)
rows = cur.fetchall()
return [self._row_to_dict(r) for r in rows]
def list_by_src(self, src_id: int) -> list[dict[str, Any]]:
"""指定ノードを接続元(src)とする接続を全て取得"""
sql = f"""
SELECT {connections.ID}, {connections.SRC_ID}, {connections.DST_ID}, {connections.SRC_ATTR}, {connections.DST_ATTR}
FROM {connections.__name__}
WHERE {connections.SRC_ID} = ?
ORDER BY {connections.ID}
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (src_id,))
rows = cur.fetchall()
return [self._row_to_dict(r) for r in rows]
def list_by_dst(self, dst_id: int) -> list[dict[str, Any]]:
"""指定ノードを接続先(dst)とする接続を全て取得"""
sql = f"""
SELECT {connections.ID}, {connections.SRC_ID}, {connections.DST_ID}, {connections.SRC_ATTR}, {connections.DST_ATTR}
FROM {connections.__name__}
WHERE {connections.DST_ID} = ?
ORDER BY {connections.ID}
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (dst_id,))
rows = cur.fetchall()
return [self._row_to_dict(r) for r in rows]
def list_by_node(self, node_id: int) -> list[dict[str, Any]]:
"""指定ノードが src・dst いずれかに含まれる接続を全て取得"""
sql = f"""
SELECT {connections.ID}, {connections.SRC_ID}, {connections.DST_ID},{connections.SRC_ATTR}, {connections.DST_ATTR}
FROM {connections.__name__}
WHERE {connections.SRC_ID} = ? OR {connections.DST_ID} = ?
ORDER BY {connections.ID}
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (node_id, node_id))
rows = cur.fetchall()
return [self._row_to_dict(r) for r in rows]
# ------------------------------------------------------------------
# UPDATE
# ------------------------------------------------------------------
def update(self, connection_id: int,
src_id: Optional[int] = None,
dst_id: Optional[int] = None,
src_attr: Optional[str] = None,
dst_attr: Optional[str] = None,
validate_nodes: bool = True) -> bool:
"""
指定 id の接続を更新する。値が None の項目は更新しない
Returns:
1件以上更新されたら True、対象が無ければ False
"""
if(validate_nodes):
if(src_id is not None):
self._assert_node_exists(src_id, role="src_id")
if(dst_id is not None):
self._assert_node_exists(dst_id, role="dst_id")
fields = []
params: list[Any] = []
if(src_id is not None):
fields.append(f"{connections.SRC_ID} = ?")
params.append(src_id)
if(dst_id is not None):
fields.append(f"{connections.DST_ID} = ?")
params.append(dst_id)
if(src_attr is not None):
fields.append(f"{connections.SRC_ATTR} = ?")
params.append(src_attr)
if(dst_attr is not None):
fields.append(f"{connections.DST_ATTR} = ?")
params.append(dst_attr)
if(not fields):
return False
sql = f"""
UPDATE {connections.__name__}
SET {", ".join(fields)}
WHERE {connections.ID} = ?
"""
params.append(connection_id)
with self.db._connect() as conn:
cur = conn.execute(sql, params)
conn.commit()
return cur.rowcount > 0
# ------------------------------------------------------------------
# DELETE
# ------------------------------------------------------------------
def delete(self, connection_id: int) -> bool:
"""
指定 id の接続を削除
Returns:
削除できたら True、対象が無ければ False
"""
sql = f"""
DELETE FROM {connections.__name__}
WHERE {connections.ID} = ?
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (connection_id,))
conn.commit()
return cur.rowcount > 0
def delete_by_node(self, node_id: int) -> int:
"""
指定ノードが src・dst いずれかに含まれる接続を全て削除する
ノード削除前のクリーンアップ用途を想定
Returns:
削除件数
"""
sql = f"""
DELETE FROM {connections.__name__}
WHERE {connections.SRC_ID} = ? OR {connections.DST_ID} = ?
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (node_id, node_id))
conn.commit()
return cur.rowcount
def delete_all(self) -> int:
"""全接続を削除する。削除件数を返す。"""
sql = f"DELETE FROM {connections.__name__}"
with self.db._connect() as conn:
cur = conn.execute(sql)
conn.commit()
return cur.rowcount
# ------------------------------------------------------------------
# internal helper
# ------------------------------------------------------------------
def _assert_node_exists(self, node_id: int, role: str) -> None:
"""node_id が nodes テーブルに存在しなければ NodeNotFoundError を投げる。"""
if(self.node_repo.get_by_id(node_id) is None):
raise NodeNotFoundError(
f"{role}={node_id} は nodes テーブルに存在しません。"
)
@staticmethod
def _row_to_dict(row) -> Optional[dict[str, Any]]:
"""
sqlite3.Row / tuple のどちらが来ても dict に変換する。
(:memory: 接続では sqlite3.Row でなく tuple が返るため両対応)
"""
if(row is None):
return None
if(isinstance(row, sqlite3.Row)):
return dict(row)
keys = [connections.ID, connections.SRC_ID, connections.DST_ID,
connections.SRC_ATTR, connections.DST_ATTR]
return dict(zip(keys, row))
if(__name__ == "__main__"):
with SQLmanager() as db:
node_repo = NodeRepository(db)
conn_repo = ConnectionRepository(db)
print("======================================")
print("NodeRepository")
print("======================================")
cube_id = node_repo.create(name="Cube", path="/Scene/Cube")
sphere_id = node_repo.create(name="Sphere", path="/Scene/Sphere")
print("レコード作成\t{} {}".format(cube_id, sphere_id))
print("全件\t{}".format(node_repo.list_all()))
print("\n======================================")
print("ConnectionRepository")
print("======================================")
print("全件\t{}".format(conn_repo.list_all()))
print("\n---- READ ---")
edge_id = conn_repo.create(
src_id=cube_id, dst_id=sphere_id,
src_attr="output", dst_attr="input"
)
print("接続作成 >> {}".format(edge_id))
print("\n---- READ ---")
print("全件\n\t{}".format(conn_repo.list_all()))
print("ID指定\n\t{}".format(conn_repo.get_by_id(edge_id)))
print("Cubeを起点とする接続\n\t{}".format(conn_repo.list_by_src(cube_id)))
print("Sphereに関連する接続\n\t{}".format(conn_repo.list_by_node(sphere_id)))
print("\n---- UPDATE ---")
conn_repo.update(edge_id, dst_attr="input_color")
print("更新後:\n\t{}".format(conn_repo.get_by_id(edge_id)))
try:
conn_repo.create(src_id=cube_id, dst_id=9999)
except Exception as e:
print("想定済みの例外:", e)
print("\n---- DELETE ---")
conn_repo.delete(edge_id)
print("削除後の全件:\n\t{}".format(conn_repo.list_all()))
実装例の解説
SQLmanager による DB の定義
ソース内のSQLmanagerは接続管理とテーブル定義をまとめて管理するクラスです。
__init__で SQLite への接続を確立し、init_db()で nodes / connections / system_requirement / sessions の4テーブルを CREATE TABLE IF NOT EXISTS で用意します。
テーブル名やカラム名は文字列のハードコードを避け、nodes や connections といった dataclass で定数化しているため、SQL文中でのタイプミスをある程度防げるようにしています。
また with 文(__enter__ / __exit__)に対応させることで、処理が終わったら自動で close() が呼ばれ、接続を閉じ忘れる心配がありません。
_connect() はメモリDBとファイルDBで挙動を切り替えるためのコンテキストマネージャで、これにより以降の Repository クラスは「メモリかファイルか」を意識せずに同じ書き方でSQLを発行できます。
class SQLmanager:
def __init__(self, DB_PATH: str = ":memory:"):
self.DB_PATH = DB_PATH
self.conn: sqlite3.Connection = sqlite3.connect(self.DB_PATH)
self.cursor: sqlite3.Cursor = self.conn.cursor()
self.init_db()
def init_db(self) -> None:
"""テーブルが存在しない場合に全テーブルを作成する。"""
ddl_list = [
f"""
CREATE TABLE IF NOT EXISTS {nodes.__name__} (
{nodes.ID} INTEGER PRIMARY KEY AUTOINCREMENT,
{nodes.NAME} TEXT,
{nodes.PATH} TEXT,
{nodes.UUID} TEXT
)
""",
f"""
CREATE TABLE IF NOT EXISTS {connections.__name__} (
{connections.ID} INTEGER PRIMARY KEY AUTOINCREMENT,
{connections.SRC_ID} INTEGER NOT NULL,
{connections.DST_ID} INTEGER NOT NULL,
{connections.SRC_ATTR} TEXT,
{connections.DST_ATTR} TEXT,
FOREIGN KEY ({connections.SRC_ID}) REFERENCES {nodes.__name__}({nodes.ID}),
FOREIGN KEY ({connections.DST_ID}) REFERENCES {nodes.__name__}({nodes.ID})
)
""",
# system_requirement, sessions も同様にDDLを定義
]
with self._connect() as conn:
for ddl in ddl_list:
conn.execute(ddl)
conn.commit()
nodes.ID や connections.SRC_ID のように定数経由でカラム名を参照しているため、将来カラム名を変更する場合も定義箇所を1つ直すだけで全SQLに反映されます。
NodeRepository による定義済みDBへのデータの入力
NodeRepository は nodes テーブルに対するCRUD操作をひとまとめにしたクラスです。SQLmanager のインスタンスを受け取り、その _connect() を介してSQLを発行する構成になっているため、テーブル定義側のコードに手を加えずに操作ロジックだけを追加できます。
create() でノード名・パス・UUIDを1件追加し、戻り値として自動採番された id(lastrowid)を受け取ります。この id が、後続の接続情報を登録する際の紐付けキーとして使われます。
class NodeRepository:
def __init__(self, db: SQLmanager):
self.db = db
def create(self, name: str, path: Optional[str] = None,
node_uuid: Optional[str] = None) -> int:
"""
ノードを1件追加する。
Returns:
追加されたノードの id(rowid)
"""
sql = f"""
INSERT INTO {nodes.__name__} (
{nodes.NAME},
{nodes.PATH},
{nodes.UUID}
) VALUES (?, ?, ?)
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (name, path, node_uuid))
conn.commit()
return cur.lastrowid
実際の呼び出しは次のようになります。create() の戻り値(id)を変数に保持しておくと便利です。
node_repo = NodeRepository(db)
cube_id = node_repo.create(name="Cube", path="/Scene/Cube")
sphere_id = node_repo.create(name="Sphere", path="/Scene/Sphere")
print("レコード作成\t{} {}".format(cube_id, sphere_id))
print("全件\t{}".format(node_repo.list_all()))
get_by_id / get_by_uuid / find_by_name など用途別の検索メソッドを分けて用意することで、呼び出し側は「何で検索したいか」だけを意識すればよくなります。
入力済みデータを活用した ConnectionRepository による関連情報の入力
ConnectionRepository は connections テーブルを扱うクラスですが、内部で NodeRepository を保持している点がポイントです。接続を作成する create() では、src_id / dst_id に渡されたIDが実際に nodes テーブルに存在するかを _assert_node_exists() で事前チェックしており、存在しない場合は NodeNotFoundError を投げます。これにより、FOREIGN KEY 制約だけに頼らず、アプリケーション側でも早期に不整合を検知できます。
class ConnectionRepository:
def __init__(self, db: SQLmanager):
self.db = db
self.node_repo = NodeRepository(db)
def create(self, src_id: int, dst_id: int,
src_attr: Optional[str] = None,
dst_attr: Optional[str] = None,
validate_nodes: bool = True) -> int:
if(validate_nodes):
self._assert_node_exists(src_id, role="src_id")
self._assert_node_exists(dst_id, role="dst_id")
sql = f"""
INSERT INTO {connections.__name__} (
{connections.SRC_ID},
{connections.DST_ID},
{connections.SRC_ATTR},
{connections.DST_ATTR}
) VALUES (?, ?, ?, ?)
"""
with self.db._connect() as conn:
cur = conn.execute(sql, (src_id, dst_id, src_attr, dst_attr))
conn.commit()
return cur.lastrowid
def _assert_node_exists(self, node_id: int, role: str) -> None:
"""node_id が nodes テーブルに存在しなければ NodeNotFoundError を投げる。"""
if(self.node_repo.get_by_id(node_id) is None):
raise NodeNotFoundError(
f"{role}={node_id} は nodes テーブルに存在しません。"
)
ここで重要なのは、NodeRepository.create() で取得した id(cube_id / sphere_id)を、そのまま ConnectionRepository.create() の src_id / dst_id に渡している点です。
conn_repo = ConnectionRepository(db)
edge_id = conn_repo.create(
src_id=cube_id, dst_id=sphere_id,
src_attr="output", dst_attr="input"
)
print("接続作成 >> {}".format(edge_id))
print("Cubeを起点とする接続\n\t{}".format(conn_repo.list_by_src(cube_id)))
print("Sphereに関連する接続\n\t{}".format(conn_repo.list_by_node(sphere_id)))
try:
conn_repo.create(src_id=cube_id, dst_id=9999)
except Exception as e:
print("想定済みの例外:", e)
NodeRepository で作ったノードの id を ConnectionRepository に渡すことで、「ノード単体の情報」と「ノード同士のつながり」を別テーブル・別クラスでありながら一連の流れとして扱えます。存在しない id(9999)を指定した最後の例のように、不正な関連付けを早期に検知できる点も、dict管理にはない強みです。
以上を踏まえて
先の実装例からどうしても実装の規模は大きくなりますが最終的な必要なデータの要求や柔軟に定義可能なフォーマットの面ではかなりシンプルになるはずです。
list/dict/dataclassによるデータ管理は手軽ですがデータ量が増えるとネストが深くなり
条件抽出や階層横断検索の処理が複雑化するというデメリットがあります。
SQLはこうした「条件を指定してデータを取り出す」操作そのものが言語に組み込まれているため、データ量や参照パターンが増えても記述がシンプルに保ちやすいというメリットがあります。
また、今回は割愛しましたが膨大なデータから条件に一致したデータを参照するのにWHEREを使用して条件を定義するだけで容易に取得が出来る為
数千や数万と言ったデータを管理/運用する場合かなり運用のしやすい印象です。
とはいえ小規模な物であれば普通にlist、dict管理で良い気がしますので長い目で見て導入するか否かを判断いただければと。
さて、長くなりましたが良いデータ管理あってのシステム作りになりますので本記事がどこかで役に立てばと思います。
またどこかでお会いしましょう。お疲れ様でした。