# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Optimized Vector Manager for MatrixOne Vector Operations.
This module provides VectorManager and AsyncVectorManager classes with
shared SQL generation logic to eliminate code duplication.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, Union
if TYPE_CHECKING:
from .sqlalchemy_ext import VectorOpType
from .ivf_rank import IVFRankMode
from ._utils import get_table_name as _extract_table_name
from .exceptions import QueryError
class _VectorManagerBase:
"""
Base class with shared SQL generation logic.
All SQL building logic is centralized here, eliminating duplication
between sync and async implementations.
"""
@staticmethod
def _build_create_ivf_sql(table_name: str, name: str, column: str, lists: int, op_type: "VectorOpType") -> tuple:
"""Build SQL for creating IVF index."""
from .sqlalchemy_ext import IVFVectorIndex, VectorOpType
if op_type is None:
op_type = VectorOpType.VECTOR_L2_OPS
index = IVFVectorIndex(name, column, lists, op_type)
sql = index.create_sql(table_name)
setup_sqls = ["SET experimental_ivf_index = 1", "SET probe_limit = 1"]
return setup_sqls, sql
@staticmethod
def _build_create_hnsw_sql(
table_name: str, name: str, column: str, m: int, ef_construction: int, ef_search: int, op_type: "VectorOpType"
) -> str:
"""Build SQL for creating HNSW index."""
from .sqlalchemy_ext import HnswVectorIndex, VectorOpType
if op_type is None:
op_type = VectorOpType.VECTOR_L2_OPS
index = HnswVectorIndex(name, column, m=m, ef_construction=ef_construction, ef_search=ef_search, op_type=op_type)
return index.create_sql(table_name)
@staticmethod
def _build_drop_sql(table_name: str, name: str) -> str:
"""Build SQL for dropping index."""
return f"DROP INDEX {name} ON {table_name}"
@staticmethod
def _build_enable_ivf_sql(probe_limit: int) -> list:
"""Build SQL for enabling IVF."""
return ["SET experimental_ivf_index = 1", f"SET probe_limit = {probe_limit}"]
@staticmethod
def _build_disable_ivf_sql() -> str:
"""Build SQL for disabling IVF."""
return "SET experimental_ivf_index = 0"
@staticmethod
def _build_enable_hnsw_sql() -> str:
"""Build SQL for enabling HNSW."""
return "SET experimental_hnsw_index = 1"
@staticmethod
def _build_disable_hnsw_sql() -> str:
"""Build SQL for disabling HNSW."""
return "SET experimental_hnsw_index = 0"
@staticmethod
def _build_insert_sql(table_name: str, data: Dict[str, Any]) -> str:
"""Build SQL for inserting vector data."""
columns = ", ".join(data.keys())
values = []
for v in data.values():
if isinstance(v, list):
vector_str = "[" + ",".join(str(x) for x in v) + "]"
values.append(f"'{vector_str}'")
elif isinstance(v, str):
values.append(f"'{v}'")
else:
values.append(str(v))
values_str = ", ".join(values)
return f"INSERT INTO {table_name} ({columns}) VALUES ({values_str})"
@staticmethod
def _build_similarity_search_sql(
table_name: str,
vector_column: str,
query_vector: List[float],
limit: int,
select_columns: List[str],
where_clause: str,
distance_type: str,
) -> str:
"""Build SQL for similarity search."""
columns = ", ".join(select_columns) if select_columns else "*"
vector_str = "[" + ",".join(str(v) for v in query_vector) + "]"
# Select distance function
if distance_type == "l2":
distance_func = "l2_distance"
elif distance_type == "cosine":
distance_func = "cosine_distance"
elif distance_type == "inner_product":
distance_func = "inner_product"
else:
distance_func = "l2_distance"
sql = f"SELECT {columns}, {distance_func}({vector_column}, '{vector_str}') as distance FROM {table_name}"
if where_clause:
sql += f" WHERE {where_clause}"
sql += f" ORDER BY distance LIMIT {limit}"
return sql
@staticmethod
def _build_range_search_sql(
table_name: str,
vector_column: str,
query_vector: List[float],
max_distance: float,
select_columns: List[str],
where_clause: str,
) -> str:
"""Build SQL for range search."""
columns = ", ".join(select_columns) if select_columns else "*"
vector_str = "[" + ",".join(str(v) for v in query_vector) + "]"
sql = f"SELECT {columns}, l2_distance({vector_column}, '{vector_str}') as distance FROM {table_name}"
distance_condition = f"l2_distance({vector_column}, '{vector_str}') <= {max_distance}"
if where_clause:
sql += f" WHERE {where_clause} AND {distance_condition}"
else:
sql += f" WHERE {distance_condition}"
sql += " ORDER BY distance"
return sql
@staticmethod
def _build_column_inference_sql(table_name: str, database: str) -> str:
"""Build SQL for inferring vector column name."""
return (
f"SELECT column_name, data_type "
f"FROM information_schema.columns "
f"WHERE table_schema = '{database}' "
f"AND table_name = '{table_name}' "
f"AND (data_type LIKE '%VEC%' OR data_type LIKE '%vec%')"
)
@staticmethod
def _build_index_tables_sql(table_name: str, column_name: str, database: str) -> str:
"""Build SQL for getting IVF index table names."""
return (
f"SELECT i.algo_table_type, i.index_table_name "
f"FROM `mo_catalog`.`mo_indexes` AS i "
f"JOIN `mo_catalog`.`mo_tables` AS t ON i.table_id = t.rel_id "
f"AND i.column_name = '{column_name}' "
f"AND t.relname = '{table_name}' "
f"AND t.reldatabase = '{database}' "
f"AND i.algo='ivfflat'"
)
@staticmethod
def _build_distribution_sql(database: str, entries_table: str) -> str:
"""Build SQL for getting bucket distribution."""
return (
f"SELECT "
f" COUNT(*) AS centroid_count, "
f" __mo_index_centroid_fk_id AS centroid_id, "
f" __mo_index_centroid_fk_version AS centroid_version "
f"FROM `{database}`.`{entries_table}` "
f"GROUP BY `__mo_index_centroid_fk_id`, `__mo_index_centroid_fk_version`"
)
[docs]
class VectorManager(_VectorManagerBase):
"""
Synchronous vector manager for MatrixOne vector operations.
Supports two usage modes:
1. Client mode: Each operation is an independent auto-commit transaction
2. Session mode: Operations execute within the session's transaction context
"""
[docs]
def __init__(self, client, executor=None):
"""Initialize VectorManager."""
self.client = client
self.executor = executor if executor is not None else client
[docs]
def create_ivf(
self,
table_name: Union[str, type],
name: str,
column: str,
lists: int = 100,
op_type: "VectorOpType" = None,
) -> "VectorManager":
"""Create an IVFFLAT vector index."""
try:
table_name = _extract_table_name(table_name)
setup_sqls, create_sql = self._build_create_ivf_sql(table_name, name, column, lists, op_type)
for sql in setup_sqls:
self.executor.execute(sql)
self.executor.execute(create_sql)
return self
except Exception as e:
raise QueryError(f"Failed to create IVFFLAT vector index {name} on table {table_name}: {e}")
[docs]
def create_hnsw(
self,
table_name: Union[str, type],
name: str,
column: str,
m: int = 16,
ef_construction: int = 200,
ef_search: int = 50,
op_type: "VectorOpType" = None,
) -> "VectorManager":
"""Create an HNSW vector index."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_create_hnsw_sql(table_name, name, column, m, ef_construction, ef_search, op_type)
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to create HNSW vector index {name} on table {table_name}: {e}")
[docs]
def drop(self, table_name: Union[str, type], name: str) -> "VectorManager":
"""Drop a vector index."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_drop_sql(table_name, name)
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to drop vector index {name} on table {table_name}: {e}")
[docs]
def enable_ivf(self, probe_limit: int = 1) -> "VectorManager":
"""Enable IVF indexing."""
try:
sqls = self._build_enable_ivf_sql(probe_limit)
for sql in sqls:
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"IVF indexing is not supported: {e}")
[docs]
def disable_ivf(self) -> "VectorManager":
"""Disable IVF indexing."""
try:
sql = self._build_disable_ivf_sql()
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to disable IVF indexing: {e}")
[docs]
def enable_hnsw(self) -> "VectorManager":
"""Enable HNSW indexing."""
try:
sql = self._build_enable_hnsw_sql()
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to enable HNSW indexing: {e}")
[docs]
def disable_hnsw(self) -> "VectorManager":
"""Disable HNSW indexing."""
try:
sql = self._build_disable_hnsw_sql()
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to disable HNSW indexing: {e}")
[docs]
def insert(self, table_name: Union[str, type], data: Dict[str, Any]) -> "VectorManager":
"""Insert vector data into table."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_insert_sql(table_name, data)
self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to insert vector data into table {table_name}: {e}")
[docs]
def batch_insert(self, table_name: Union[str, type], data_list: List[Dict[str, Any]]) -> "VectorManager":
"""Batch insert vector data."""
try:
for record in data_list:
self.insert(table_name, record)
return self
except Exception as e:
raise QueryError(f"Failed to batch insert vector data into table {table_name}: {e}")
[docs]
def similarity_search(
self,
table_name: Union[str, type],
vector_column: str,
query_vector: List[float],
limit: int = 10,
select_columns: List[str] = None,
where_clause: str = None,
distance_type: str = "l2",
log_mode: str = None,
) -> List[Dict[str, Any]]:
"""Perform vector similarity search."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_similarity_search_sql(
table_name, vector_column, query_vector, limit, select_columns, where_clause, distance_type
)
result = self.executor.execute(sql, log_mode=log_mode)
return [dict(row._mapping) for row in result]
except Exception as e:
raise QueryError(f"Failed to perform similarity search on table {table_name}: {e}")
[docs]
def range_search(
self,
table_name: Union[str, type],
vector_column: str,
query_vector: List[float],
max_distance: float,
select_columns: List[str] = None,
where_clause: str = None,
log_mode: str = None,
) -> List[Dict[str, Any]]:
"""Perform vector range search."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_range_search_sql(
table_name, vector_column, query_vector, max_distance, select_columns, where_clause
)
result = self.executor.execute(sql, log_mode=log_mode)
return [dict(row._mapping) for row in result]
except Exception as e:
raise QueryError(f"Failed to perform range search on table {table_name}: {e}")
[docs]
def search_with_rank(
self,
table_name: Union[str, type],
vector_column: str,
query_vector: List[float],
limit: int = 10,
select_columns: List[str] = None,
where_clause: str = None,
distance_type: str = "l2",
rank_mode: Union[str, "IVFRankMode"] = None,
log_mode: str = None,
) -> List[Dict[str, Any]]:
"""
Perform vector similarity search with IVF LIMIT BY RANK support.
This method executes vector search using MatrixOne's IVF index with
fine-grained control over ranking strategy via the LIMIT BY RANK
WITH OPTION clause.
Args:
table_name: Name of the table (string) or SQLAlchemy Model class.
vector_column: Name of the vector column to search.
query_vector: Query vector as list of floats.
limit: Maximum number of results to return. Defaults to 10.
select_columns: List of columns to select. If None, selects all.
where_clause: Optional WHERE clause for filtering before search.
distance_type: Distance metric - "l2", "cosine", or "inner_product".
Defaults to "l2".
rank_mode: IVF ranking mode - "pre", "post", or "force".
- "pre": Fast approximate search (default)
- "post": Slower but more accurate search
- "force": Force index usage with strict ranking
If None, defaults to "post".
log_mode: Internal logging mode parameter.
Returns:
List of dictionaries containing search results with distance column.
Raises:
Exception: If search fails or invalid parameters provided.
Example:
>>> # Complete runnable example
>>> from matrixone import Client, IVFRankMode
>>> client = Client()
>>> client.connect(database="test")
>>>
>>> # Create table
>>> client.execute('''
... CREATE TABLE IF NOT EXISTS documents (
... id INT PRIMARY KEY,
... title VARCHAR(200),
... embedding VECF32(4)
... )
... ''')
>>>
>>> # Insert data
>>> client.execute("INSERT INTO documents VALUES (1, 'Doc 1', '[0.1,0.2,0.3,0.4]')")
>>> client.execute("INSERT INTO documents VALUES (2, 'Doc 2', '[0.2,0.3,0.4,0.5]')")
>>>
>>> # Create IVF index
>>> client.vector_ops.create_ivf('documents', 'idx_emb', 'embedding', lists=2)
>>>
>>> # Search with rank
>>> results = client.vector_ops.search_with_rank(
... table_name="documents",
... vector_column="embedding",
... query_vector=[0.15, 0.25, 0.35, 0.45],
... limit=2
... )
>>> len(results)
2
>>>
>>> # Cleanup
>>> client.execute("DROP TABLE documents")
>>> client.disconnect()
"""
from .ivf_rank import IVFRankMode
try:
table_name = _extract_table_name(table_name)
# Parse rank mode
if rank_mode is None:
rank_mode = IVFRankMode.POST
elif isinstance(rank_mode, str):
rank_mode = IVFRankMode(rank_mode.lower())
elif not isinstance(rank_mode, IVFRankMode):
raise ValueError(f"rank_mode must be IVFRankMode or string, got {type(rank_mode).__name__}")
# Build base similarity search SQL
columns = ", ".join(select_columns) if select_columns else "*"
vector_str = "[" + ",".join(str(v) for v in query_vector) + "]"
# Select distance function
if distance_type == "l2":
distance_func = "l2_distance"
elif distance_type == "cosine":
distance_func = "cosine_distance"
elif distance_type == "inner_product":
distance_func = "inner_product"
else:
distance_func = "l2_distance"
sql = f"SELECT {columns}, {distance_func}({vector_column}, '{vector_str}') as distance FROM {table_name}"
if where_clause:
sql += f" WHERE {where_clause}"
# Add LIMIT BY RANK WITH OPTION clause
sql += f" ORDER BY distance LIMIT {limit} BY RANK WITH OPTION 'mode={rank_mode.value}'"
result = self.executor.execute(sql, log_mode=log_mode)
return [dict(row._mapping) for row in result]
except Exception as e:
raise QueryError(f"Failed to perform vector search with rank on table {table_name}: {e}") from e
[docs]
def get_ivf_stats(
self,
table_name: Union[str, type],
column_name: str = None,
database: str = None,
) -> Dict[str, Any]:
"""
Get IVF index statistics.
Args:
table_name: Name of the table (string) or SQLAlchemy Model class
column_name: Optional vector column name (auto-inferred if not provided)
database: Optional database name (uses current if not provided)
Returns:
Dictionary containing index statistics and distribution
"""
table_name = _extract_table_name(table_name)
if database is None:
# Try to get database from connection params first
if hasattr(self.client, '_connection_params') and self.client._connection_params:
database = self.client._connection_params.get('database')
if not database:
database = getattr(self.client, 'database', 'test')
# Auto-infer column name if not provided
if not column_name:
schema_sql = self._build_column_inference_sql(table_name, database)
result = self.executor.execute(schema_sql)
vector_columns = result.fetchall()
if not vector_columns:
raise QueryError(f"No vector columns found in table {table_name}")
elif len(vector_columns) == 1:
column_name = vector_columns[0][0]
else:
column_names = [col[0] for col in vector_columns]
raise QueryError(
f"Multiple vector columns found in table {table_name}: {column_names}. "
f"Please specify the column_name parameter."
)
# Get IVF index table names
index_sql = self._build_index_tables_sql(table_name, column_name, database)
result = self.executor.execute(index_sql)
index_tables = {row[0]: row[1] for row in result}
if not index_tables:
raise QueryError(f"No IVF index found for table {table_name}, column {column_name}")
# Get the entries table name for distribution analysis
entries_table = index_tables.get('entries')
if not entries_table:
raise QueryError("No entries table found in IVF index")
# Get bucket distribution
dist_sql = self._build_distribution_sql(database, entries_table)
result = self.executor.execute(dist_sql)
rows = result.fetchall()
distribution = {
"centroid_count": [row[0] for row in rows],
"centroid_id": [row[1] for row in rows],
"centroid_version": [row[2] for row in rows],
}
return {
'index_tables': index_tables,
'distribution': distribution,
'database': database,
'table_name': table_name,
'column_name': column_name,
}
class AsyncVectorManager(_VectorManagerBase):
"""
Asynchronous vector manager for MatrixOne vector operations.
Supports two usage modes:
1. AsyncClient mode: Each operation is an independent auto-commit transaction
2. AsyncSession mode: Operations execute within the session's transaction context
"""
def __init__(self, client, executor=None):
"""Initialize AsyncVectorManager."""
self.client = client
self.executor = executor if executor is not None else client
async def create_ivf(
self,
table_name: Union[str, type],
name: str,
column: str,
lists: int = 100,
op_type: "VectorOpType" = None,
) -> "AsyncVectorManager":
"""Create an IVFFLAT vector index."""
try:
table_name = _extract_table_name(table_name)
setup_sqls, create_sql = self._build_create_ivf_sql(table_name, name, column, lists, op_type)
for sql in setup_sqls:
await self.executor.execute(sql)
await self.executor.execute(create_sql)
return self
except Exception as e:
raise QueryError(f"Failed to create IVFFLAT vector index {name} on table {table_name}: {e}")
async def create_hnsw(
self,
table_name: Union[str, type],
name: str,
column: str,
m: int = 16,
ef_construction: int = 200,
ef_search: int = 50,
op_type: "VectorOpType" = None,
) -> "AsyncVectorManager":
"""Create an HNSW vector index."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_create_hnsw_sql(table_name, name, column, m, ef_construction, ef_search, op_type)
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to create HNSW vector index {name} on table {table_name}: {e}")
async def drop(self, table_name: Union[str, type], name: str) -> "AsyncVectorManager":
"""Drop a vector index."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_drop_sql(table_name, name)
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to drop vector index {name} on table {table_name}: {e}")
async def enable_ivf(self, probe_limit: int = 1) -> "AsyncVectorManager":
"""Enable IVF indexing."""
try:
sqls = self._build_enable_ivf_sql(probe_limit)
for sql in sqls:
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"IVF indexing is not supported: {e}")
async def disable_ivf(self) -> "AsyncVectorManager":
"""Disable IVF indexing."""
try:
sql = self._build_disable_ivf_sql()
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to disable IVF indexing: {e}")
async def enable_hnsw(self) -> "AsyncVectorManager":
"""Enable HNSW indexing."""
try:
sql = self._build_enable_hnsw_sql()
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to enable HNSW indexing: {e}")
async def disable_hnsw(self) -> "AsyncVectorManager":
"""Disable HNSW indexing."""
try:
sql = self._build_disable_hnsw_sql()
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to disable HNSW indexing: {e}")
async def insert(self, table_name: Union[str, type], data: Dict[str, Any]) -> "AsyncVectorManager":
"""Insert vector data into table."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_insert_sql(table_name, data)
await self.executor.execute(sql)
return self
except Exception as e:
raise QueryError(f"Failed to insert vector data into table {table_name}: {e}")
async def batch_insert(self, table_name: Union[str, type], data_list: List[Dict[str, Any]]) -> "AsyncVectorManager":
"""Batch insert vector data."""
try:
for record in data_list:
await self.insert(table_name, record)
return self
except Exception as e:
raise QueryError(f"Failed to batch insert vector data into table {table_name}: {e}")
async def similarity_search(
self,
table_name: Union[str, type],
vector_column: str,
query_vector: List[float],
limit: int = 10,
select_columns: List[str] = None,
where_clause: str = None,
distance_type: str = "l2",
log_mode: str = None,
) -> List[Dict[str, Any]]:
"""Perform vector similarity search."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_similarity_search_sql(
table_name, vector_column, query_vector, limit, select_columns, where_clause, distance_type
)
result = await self.executor.execute(sql, log_mode=log_mode)
return [dict(row._mapping) for row in result]
except Exception as e:
raise QueryError(f"Failed to perform similarity search on table {table_name}: {e}")
async def range_search(
self,
table_name: Union[str, type],
vector_column: str,
query_vector: List[float],
max_distance: float,
select_columns: List[str] = None,
where_clause: str = None,
log_mode: str = None,
) -> List[Dict[str, Any]]:
"""Perform vector range search."""
try:
table_name = _extract_table_name(table_name)
sql = self._build_range_search_sql(
table_name, vector_column, query_vector, max_distance, select_columns, where_clause
)
result = await self.executor.execute(sql, log_mode=log_mode)
return [dict(row._mapping) for row in result]
except Exception as e:
raise QueryError(f"Failed to perform range search on table {table_name}: {e}")
async def search_with_rank(
self,
table_name: Union[str, type],
vector_column: str,
query_vector: List[float],
limit: int = 10,
select_columns: List[str] = None,
where_clause: str = None,
distance_type: str = "l2",
rank_mode: Union[str, "IVFRankMode"] = None,
log_mode: str = None,
) -> List[Dict[str, Any]]:
"""
Perform vector similarity search with IVF LIMIT BY RANK support (async).
This method executes vector search using MatrixOne's IVF index with
fine-grained control over ranking strategy via the LIMIT BY RANK
WITH OPTION clause.
Args:
table_name: Name of the table (string) or SQLAlchemy Model class.
vector_column: Name of the vector column to search.
query_vector: Query vector as list of floats.
limit: Maximum number of results to return. Defaults to 10.
select_columns: List of columns to select. If None, selects all.
where_clause: Optional WHERE clause for filtering before search.
distance_type: Distance metric - "l2", "cosine", or "inner_product".
Defaults to "l2".
rank_mode: IVF ranking mode - "pre", "post", or "force".
- "pre": Fast approximate search (default)
- "post": Slower but more accurate search
- "force": Force index usage with strict ranking
If None, defaults to "post".
log_mode: Internal logging mode parameter.
Returns:
List of dictionaries containing search results with distance column.
Raises:
Exception: If search fails or invalid parameters provided.
Example:
>>> # Complete runnable example
>>> import asyncio
>>> from matrixone import AsyncClient, IVFRankMode
>>>
>>> async def example():
... client = AsyncClient()
... await client.connect(database="test")
...
... # Create table
... await client.execute('''
... CREATE TABLE IF NOT EXISTS documents (
... id INT PRIMARY KEY,
... title VARCHAR(200),
... embedding VECF32(4)
... )
... ''')
...
... # Insert data
... await client.execute("INSERT INTO documents VALUES (1, 'Doc 1', '[0.1,0.2,0.3,0.4]')")
... await client.execute("INSERT INTO documents VALUES (2, 'Doc 2', '[0.2,0.3,0.4,0.5]')")
...
... # Create IVF index
... await client.vector_ops.create_ivf('documents', 'idx_emb', 'embedding', lists=2)
...
... # Search with rank
... results = await client.vector_ops.search_with_rank(
... table_name="documents",
... vector_column="embedding",
... query_vector=[0.15, 0.25, 0.35, 0.45],
... limit=2
... )
... print(f"Found {len(results)} results")
...
... # Cleanup
... await client.execute("DROP TABLE documents")
... await client.disconnect()
>>>
>>> asyncio.run(example())
"""
from .ivf_rank import IVFRankMode
try:
table_name = _extract_table_name(table_name)
# Parse rank mode
if rank_mode is None:
rank_mode = IVFRankMode.POST
elif isinstance(rank_mode, str):
rank_mode = IVFRankMode(rank_mode.lower())
elif not isinstance(rank_mode, IVFRankMode):
raise ValueError(f"rank_mode must be IVFRankMode or string, got {type(rank_mode).__name__}")
# Build base similarity search SQL
columns = ", ".join(select_columns) if select_columns else "*"
vector_str = "[" + ",".join(str(v) for v in query_vector) + "]"
# Select distance function
if distance_type == "l2":
distance_func = "l2_distance"
elif distance_type == "cosine":
distance_func = "cosine_distance"
elif distance_type == "inner_product":
distance_func = "inner_product"
else:
distance_func = "l2_distance"
sql = f"SELECT {columns}, {distance_func}({vector_column}, '{vector_str}') as distance FROM {table_name}"
if where_clause:
sql += f" WHERE {where_clause}"
# Add LIMIT BY RANK WITH OPTION clause
sql += f" ORDER BY distance LIMIT {limit} BY RANK WITH OPTION 'mode={rank_mode.value}'"
result = await self.executor.execute(sql, log_mode=log_mode)
return [dict(row._mapping) for row in result]
except Exception as e:
raise QueryError(f"Failed to perform vector search with rank on table {table_name}: {e}") from e
async def get_ivf_stats(
self,
table_name: Union[str, type],
column_name: str = None,
database: str = None,
) -> Dict[str, Any]:
"""
Get IVF index statistics.
Args:
table_name: Name of the table (string) or SQLAlchemy Model class
column_name: Optional vector column name (auto-inferred if not provided)
database: Optional database name (uses current if not provided)
Returns:
Dictionary containing index statistics and distribution
"""
table_name = _extract_table_name(table_name)
if database is None:
# Try to get database from connection params first
if hasattr(self.client, '_connection_params') and self.client._connection_params:
database = self.client._connection_params.get('database')
if not database:
database = getattr(self.client, 'database', 'test')
# Auto-infer column name if not provided
if not column_name:
schema_sql = self._build_column_inference_sql(table_name, database)
result = await self.executor.execute(schema_sql)
vector_columns = result.fetchall()
if not vector_columns:
raise QueryError(f"No vector columns found in table {table_name}")
elif len(vector_columns) == 1:
column_name = vector_columns[0][0]
else:
column_names = [col[0] for col in vector_columns]
raise QueryError(
f"Multiple vector columns found in table {table_name}: {column_names}. "
f"Please specify the column_name parameter."
)
# Get IVF index table names
index_sql = self._build_index_tables_sql(table_name, column_name, database)
result = await self.executor.execute(index_sql)
index_tables = {row[0]: row[1] for row in result}
if not index_tables:
raise QueryError(f"No IVF index found for table {table_name}, column {column_name}")
# Get the entries table name for distribution analysis
entries_table = index_tables.get('entries')
if not entries_table:
raise QueryError("No entries table found in IVF index")
# Get bucket distribution
dist_sql = self._build_distribution_sql(database, entries_table)
result = await self.executor.execute(dist_sql)
rows = result.fetchall()
distribution = {
"centroid_count": [row[0] for row in rows],
"centroid_id": [row[1] for row in rows],
"centroid_version": [row[2] for row in rows],
}
return {
'index_tables': index_tables,
'distribution': distribution,
'database': database,
'table_name': table_name,
'column_name': column_name,
}