PG Vector 클라이언트 직접 만들기
Engineering VectorDB Python AI
PG Vector 클라이언트 직접 만들기
LangChain에서 PGVector 클라이언트를 제공하긴 하지만, DBA들이 관리하기 좋은 형태로 데이터를 저장하지 않는다.
vector_store = PGVector(
embeddings=embeddings,
collection_name=collection_name,
connection=connection,
use_jsonb=True,
)
docs = [
Document(
page_content="there are cats in the pond",
metadata={"id": 1, "location": "pond", "topic": "animals"},
)
]
vector_store.add_documents(docs, ids=[doc.metadata["id"] for doc in docs])
저장된 데이터
langchain_pg_collection
- uuid: uuid-A
- name: collection_name
langchain_pg_embedding
- id: 1
- collection_id: uuid-A
- embedding: ...
- document: "there are cats in the pond"
- cmetadata: {"id": 1, "location": "pond", "topic": "animals"}
- NoSQL처럼 데이터를 저장하고 사용하기에, DBA가 관리하기 어렵다.
따라서 직접 구현하기로 결정하였다.
구현 목표
- DBA들이 관리할 수 있는 형태로, RDB처럼 데이터를 저장할 것
- 메타데이터들이 컬럼으로 저장될 것
- 임베딩 된 벡터값 역시 또 하나의 컬럼으로 저장될 것
- 어떤 엔티티든 사용할 수 있도록, 확장 가능한 구조로 만들기
vector entity
VectorEntityBase
PG Vector에 저장하고 싶은 엔티티는, VectorEntityBase를 상속받아야 한다.
자식 엔티티는 벡터 DB 데이터로서 사용되기 위해 아래 3가지 요소를 정의해야 한다.
content- agent가 참고할 context에 해당
chunk_for_embedding- 임베딩 할 텍스트
content의 부분집합본문 텍스트가 너무 긴 경우, 벡터 DB내에서 유사도 검색을 할 때 정확도가 떨어진다. 이러한 이유로, 본문 텍스트보다는 좀 더 작은 단위로 임베딩 할 청크를 나누고 싶을 수 있다.
chunk_for_embedding은 이런 상황에 사용한다.
chunk_for_embedding으로 지정된 텍스트만 임베딩의 대상이 된다.
content만 agent에게 건네주는 context가 된다.
to_metadata- 메타 데이터 반환 메서드
from abc import abstractmethod
from typing import Any
from model.entity.base import TableBase
from pgvector.sqlalchemy import Vector
from sqlalchemy.orm import Mapped, mapped_column
class VectorEntityBase(TableBase):
__abstract__ = True
embedding_value: Mapped[list[float]] = mapped_column(Vector(1024))
@property
@abstractmethod
def content(self) -> str:
raise NotImplementedError("Subclass must implement content property")
@property
@abstractmethod
def chunk_for_embedding(self) -> str:
raise NotImplementedError(
"Subclass must implement chunk_for_embedding property"
)
@content.setter
@abstractmethod
def content(self, value: str) -> None:
raise NotImplementedError("Subclass must implement content setter")
@chunk_for_embedding.setter
@abstractmethod
def chunk_for_embedding(self, value: str) -> None:
raise NotImplementedError("Subclass must implement chunk_for_embedding setter")
@abstractmethod
def to_metadata(self, score: float) -> dict[str, Any]:
raise NotImplementedError("Subclass must implement metadata property")
vector entity 구현 예시
예를 들어, TempBoard 데이터를 벡터 DB에 넣고 싶은 개발자는 아래와 같이 엔티티를 구현할 수 있다.
from datetime import datetime
from typing import Any
from model.vector_entity.base import VectorEntityBase
from overrides import override
from sqlalchemy import (
Column,
DateTime,
Integer,
PrimaryKeyConstraint,
String,
text,
)
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import Text
class TempBoardEntity(VectorEntityBase):
__tablename__ = "temp_board"
__table_args__ = (
PrimaryKeyConstraint("tenant_sid", "user_sid"),
{"schema": "ai"},
)
tenant_sid = Column(String(100), nullable=False)
user_sid = Column(String(100), nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)
content_chunk: Mapped[str] = mapped_column(Text, nullable=False)
write_dtm = Column(DateTime, default=datetime.now)
update_dtm = Column(DateTime, default=datetime.now, onupdate=datetime.now)
@property
@override
def content(self) -> str:
return self.content
@property
@override
def chunk_for_embedding(self) -> str:
return self.content_chunk
@content.setter
@override
def content(self, value: str) -> None:
self.content = value
@chunk_for_embedding.setter
@override
def chunk_for_embedding(self, value: str) -> None:
self.content_chunk = value
@override
def to_metadata(self, score: float) -> dict[str, Any]:
return {
"tenant_sid": self.tenant_sid,
"user_sid": self.user_sid,
"content": self.content,
"write_dtm": self.write_dtm.isoformat(),
"update_dtm": self.update_dtm.isoformat(),
"score": score,
}
vector store
PGVectorStore
LangChain의 VectorStore와 대응되는 PGVector 클라이언트 클래스로, 2가지 기능을 구현하였다.
- 문서 임베딩 후 저장
- query를 바탕으로 문서 유사도 검색
from typing import Self, Sequence, Type, TypeVar
from core.persist.transaction import Tx
from core.types.type import Document
from model.vector_entity.base import VectorEntityBase
from langchain_core.embeddings import Embeddings
from sqlalchemy import ColumnElement, desc, select
T = TypeVar("T", bound=VectorEntityBase)
class PGVectorStore:
def __init__(
self,
embedding_model: Embeddings,
):
self._embedding_model = embedding_model
async def add_documents(self, tx: Tx, documents: Sequence[T]) -> None:
embeddings = self._embedding_model.embed_documents(
[doc.chunk_for_embedding for doc in documents]
)
for doc, embedding in zip(documents, embeddings):
doc.embedding_value = embedding
async for _tx in tx.require():
session = _tx.get_session()
session.add_all(documents)
async def similarity_search_with_score(
self,
tx: Tx,
query: str,
k: int,
threshold: float,
entity_class: Type[T],
additional_where_clause: list[ColumnElement[bool]] = [],
) -> list[Document]:
query_embedding = self._embedding_model.embed_query(query)
similarity_column = (
1 - entity_class.embedding_value.cosine_distance(query_embedding)
).label("similarity")
result = []
async for _tx in tx.require():
stmt = (
select(
entity_class,
similarity_column,
)
.where(similarity_column > threshold)
.where(*additional_where_clause)
.order_by(desc(similarity_column))
.limit(k)
)
query_result = (await _tx.get_session().execute(stmt)).all()
result = [
Document(
content=row._mapping[entity_class.__name__].content,
metadata=row._mapping[entity_class.__name__].to_metadata(
row.similarity
),
)
for row in query_result
]
return result
문서 저장
VectorEntityBase를 구현한 엔티티의 컬렉션만 저장할 수 있도록 하였다.
엔티티만 저장할 수 있다는 제약을 통해, RDB 방식으로 PG Vector를 사용해야 한다는 인상을 주고 싶었다.
문서 유사도 검색
where절을 optional 파라미터로 받음으로써 유사도 검색이후 데이터를 추가적으로 필터링 할 수 있도록 구현하였다. 유사도 검색의 반환값은 Document 타입으로 통일되어 있다.
class Document(TypedDict):
content: str
metadata: Any
Document의 content엔 agent에게 넘길 context가 들어있고, metadata엔 엔티티에서 구현한 to_metadata 반환값이 들어간다.
어떤 엔티티를 바탕으로 조회하던, 같은 타입으로 문서를 반환하도록 하여 클라이언트 코드가 조회한 문서를 일관적으로 사용할 수 있도록 하였다.
문서를 검색하는 클라이언트 코드는 아래와 같이 작성할 수 있다.
async def similarity_search_by(
vector_store: PGVectorStore,
tx: Tx,
query: str,
k: int,
threshold: float,
tenant_sid: str,
user_sid: str
) -> list[Document]:
additional_where_clause = [
TempBoardEntity.tenant_sid == tenant_sid,
TempBoardEntity.user_sid == user_sid,
]
return await vector_store.similarity_search_with_score(
tx,
query,
k,
threshold,
TempBoardEntity,
additional_where_cl링use,
)
유사도 검색 후 tenant code와 user ID로 데이터 필터링
문서 삭제
PGVector는 기본적으로 RDB이기 때문에 삭제 로직은 쿼리를 바탕으로 다양한 방식으로 이뤄질 수 있다. 따라서 베이스 클래스인 PGVectorStore에선 삭제를 구현하지 않았다.
쿼리를 바탕으로 한 다양한 단건 삭제, 다건 삭제 로직은 PGVectorStore를 상속한 자식 클래스에서 구현해야 한다.