Source code for matrixone.sqlalchemy_ext.vector_index

# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Vector index support for SQLAlchemy integration with MatrixOne.
"""

from typing import List, Optional, Union

from sqlalchemy import Column, Index, text
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import DDLElement
from sqlalchemy.sql.ddl import CreateIndex as SQLAlchemyCreateIndex


def _exec_sql_safe(connection, sql: str):
    """
    Run a raw SQL string without letting SQLAlchemy interpret ``:name`` tokens.

    Statements that embed JSON such as {"a":1} would otherwise have ``:1``
    treated as a bind parameter by ``text()``. When the connection offers
    ``exec_driver_sql()`` the statement is handed straight to the driver;
    otherwise (test doubles, older SQLAlchemy) it is wrapped in ``text()``.

    Args::

        connection: Connection-like object exposing ``exec_driver_sql`` or ``execute``
        sql: SQL statement to run

    Returns::

        Whatever the underlying execute call returns
    """
    if not hasattr(connection, 'exec_driver_sql'):
        # Compatibility path: no driver-level execution available
        return connection.execute(text(sql))

    # The driver applies %-formatting to the statement, so literal percent
    # signs have to be doubled before the statement is passed through.
    return connection.exec_driver_sql(sql.replace('%', '%%'))


class VectorIndexType:
    """Namespace of index algorithm names used after ``USING`` in CREATE INDEX."""

    # Hierarchical Navigable Small World index
    HNSW = "hnsw"
    # Inverted-file (IVF) flat index
    IVFFLAT = "ivfflat"


class VectorOpType:
    """Namespace of ``op_type`` values emitted in vector CREATE INDEX statements."""

    # Cosine-based comparison
    VECTOR_COSINE_OPS = "vector_cosine_ops"
    # Inner-product comparison
    VECTOR_IP_OPS = "vector_ip_ops"
    # L2 distance comparison
    VECTOR_L2_OPS = "vector_l2_ops"


class IVFVectorIndex(Index):
    """
    SQLAlchemy Index for IVFFLAT vector columns with MatrixOne-specific syntax.

    Specialized class for IVFFLAT vector indexes with type safety and clear API.

    Usage Examples

    1. Class Methods (Recommended for one-time operations):

        # Create index using class method
        success = IVFVectorIndex.create_index(
            engine=engine,
            table_name='my_table',
            name='idx_embedding',
            column='embedding',
            lists=100,
            op_type=VectorOpType.VECTOR_L2_OPS
        )

        # Drop index using class method
        success = IVFVectorIndex.drop_index(
            engine=engine,
            table_name='my_table',
            name='idx_embedding'
        )


    2. Instance Methods (Useful for reusable index configurations):

        # Create index object
        index = IVFVectorIndex('idx_embedding', 'embedding', lists=100)

        # Create index using instance method
        success = index.create(engine, 'my_table')

        # Drop index using instance method
        success = index.drop(engine, 'my_table')

    3. SQLAlchemy ORM Integration:

        # In table definition
        class Document(Base):
            __tablename__ = 'documents'
            id = Column(Integer, primary_key=True)
            embedding = create_vector_column(128, "f32")

            # Note: For ORM integration, create table first, then create index separately
            # __table_args__ = (IVFVectorIndex('idx_embedding', 'embedding', lists=100),)

        # Create table first
        Base.metadata.create_all(engine)

        # Then create index separately
        IVFVectorIndex.create_index(engine, 'documents', 'idx_embedding', 'embedding', lists=100)

    4. Client Chain Operations:

        # Using client.vector_index.create_ivf() method
        client.vector_index.create_ivf('my_table', 'idx_embedding', 'embedding', lists=100)

    Parameters:
        name (str): Index name
        column (Union[str, Column]): Vector column to index. When a non-string
            object exposing ``name`` is given, that unqualified name is used in
            the generated SQL.
        lists (int): Number of lists for IVFFLAT (default: 100)
        op_type (str): Vector operation type (default: vector_l2_ops)
        **kwargs: Additional index parameters

    Note:

        - MatrixOne supports only ONE index per vector column
        - Enable IVF indexing before creating IVFFLAT indexes: SET experimental_ivf_index = 1
        - Set probe limit for search: SET probe_limit = 1
        - The table, index and column names are interpolated into the SQL text
          as-is; callers must not pass untrusted identifiers.
    """

    def __init__(
        self,
        name: str,
        column: Union[str, Column],
        lists: int = 100,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        **kwargs,
    ):
        """
        Initialize IVFVectorIndex.

        Args::

            name: Index name
            column: Vector column to index
            lists: Number of lists for IVFFLAT (default: 100)
            op_type: Vector operation type (default: vector_l2_ops)
            **kwargs: Additional index parameters
        """
        self.index_type = VectorIndexType.IVFFLAT
        self.lists = lists
        self.op_type = op_type

        # Remember the bare column name for SQL generation
        self._column_name = self._resolve_column_name(column)

        # Parent constructor runs after our own attributes are in place
        super().__init__(name, column, **kwargs)

        # Set dialect options after initialization to bind to matrixone dialect
        self.dialect_options["matrixone"] = {"length": None, "using": None}
        # Also provide mysql fallback for compatibility
        self.dialect_options["mysql"] = {"length": None, "using": None, "with_parser": None}

    @staticmethod
    def _resolve_column_name(column) -> str:
        """Return the unqualified column name for a string or column-like object."""
        if isinstance(column, str):
            return column
        # str() of a table-bound Column is "table.column", which is not valid
        # inside the index column list, so prefer the plain ``name`` attribute.
        bare_name = getattr(column, "name", None)
        if isinstance(bare_name, str) and bare_name:
            return bare_name
        return str(column)

    @staticmethod
    def _execute_create(engine, sql: str) -> None:
        """Run the session settings and the CREATE INDEX statement in one transaction."""
        with engine.begin() as conn:
            # Enable IVF indexing
            _exec_sql_safe(conn, "SET experimental_ivf_index = 1")
            _exec_sql_safe(conn, "SET probe_limit = 1")

            # Execute CREATE INDEX
            _exec_sql_safe(conn, sql)

    def _create_index_sql(self, table_name: str) -> str:
        """Generate the CREATE INDEX SQL for IVFFLAT vector index."""
        sql_parts = [
            f"CREATE INDEX {self.name} USING {self.index_type} ON {table_name}({self._column_name})",
            f"LISTS {self.lists}",
            f"op_type '{self.op_type}'",
        ]
        return " ".join(sql_parts)

    def create_sql(self, table_name: str) -> str:
        """Generate CREATE INDEX SQL for the given table name."""
        return self._create_index_sql(table_name)

    def drop_sql(self, table_name: str) -> str:
        """Generate DROP INDEX SQL for the given table name."""
        return f"DROP INDEX {self.name} ON {table_name}"

    @classmethod
    def create_index(
        cls,
        engine,
        table_name: str,
        name: str,
        column: Union[str, Column],
        lists: int = 100,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        **kwargs,
    ) -> bool:
        """
        Create an IVFFLAT vector index using ORM-style method.

        Any exception raised while building or executing the statement is
        caught, reported on stdout, and turned into a ``False`` return.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table
            name: Name of the index
            column: Vector column to index
            lists: Number of lists for IVFFLAT (default: 100)
            op_type: Vector operation type (default: vector_l2_ops)
            **kwargs: Additional index parameters

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            index = cls(name, column, lists, op_type, **kwargs)
            sql = index.create_sql(table_name)
            cls._execute_create(engine, sql)
        except Exception as e:
            print(f"Failed to create IVFFLAT vector index: {e}")
            return False

        # Report success only once the transaction block has exited cleanly.
        # Logging is best-effort: the client logger may be unavailable (tests).
        try:
            from ..client import logger

            logger.info(f"✓ | {sql}")
        except Exception:
            pass

        return True

    @classmethod
    def drop_index(cls, engine, table_name: str, name: str) -> bool:
        """
        Drop an IVFFLAT vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table
            name: Name of the index to drop

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            sql = f"DROP INDEX {name} ON {table_name}"
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to drop IVFFLAT vector index: {e}")
            return False

    def create(self, engine, table_name: str) -> bool:
        """
        Create this IVFFLAT vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            self._execute_create(engine, self.create_sql(table_name))
            return True
        except Exception as e:
            print(f"Failed to create IVFFLAT vector index: {e}")
            return False

    def drop(self, engine, table_name: str) -> bool:
        """
        Drop this IVFFLAT vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            sql = self.drop_sql(table_name)
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to drop IVFFLAT vector index: {e}")
            return False


class HnswVectorIndex(Index):
    """
    SQLAlchemy Index for HNSW vector columns with MatrixOne-specific syntax.

    Specialized class for HNSW vector indexes with type safety and clear API.

    Usage Examples

    1. Class Methods (Recommended for one-time operations):

        # Create index using class method
        success = HnswVectorIndex.create_index(
            engine=engine,
            table_name='my_table',
            name='idx_embedding',
            column='embedding',
            m=16,
            ef_construction=200,
            ef_search=50,
            op_type=VectorOpType.VECTOR_L2_OPS
        )

        # Drop index using class method
        success = HnswVectorIndex.drop_index(
            engine=engine,
            table_name='my_table',
            name='idx_embedding'
        )

    2. Instance Methods (Useful for reusable index configurations):

        # Create index object
        index = HnswVectorIndex('idx_embedding', 'embedding', m=16, ef_construction=200, ef_search=50)

        # Create index using instance method
        success = index.create(engine, 'my_table')

        # Drop index using instance method
        success = index.drop(engine, 'my_table')

    3. SQLAlchemy ORM Integration:

        # In table definition (requires BigInteger primary key for HNSW)
        class Document(Base):
            __tablename__ = 'documents'
            id = Column(BigInteger, primary_key=True)  # BigInteger required for HNSW
            embedding = create_vector_column(128, "f32")

            # Note: For ORM integration, create table first, then create index separately
            # __table_args__ = (HnswVectorIndex('idx_embedding', 'embedding', m=16),)

        # Create table first
        Base.metadata.create_all(engine)

        # Then create index separately
        HnswVectorIndex.create_index(engine, 'documents', 'idx_embedding', 'embedding', m=16)

    4. Client Chain Operations:

        # Using client.vector_index.create_hnsw() method
        client.vector_index.create_hnsw('my_table', 'idx_embedding', 'embedding', m=16, ef_construction=200)

    Parameters:
        name (str): Index name
        column (Union[str, Column]): Vector column to index. When a non-string
            object exposing ``name`` is given, that unqualified name is used in
            the generated SQL.
        m (int): Number of bi-directional links for HNSW (default: 16)
        ef_construction (int): Size of dynamic candidate list for HNSW construction (default: 200)
        ef_search (int): Size of dynamic candidate list for HNSW search (default: 50)
        op_type (str): Vector operation type (default: vector_l2_ops)
        **kwargs: Additional index parameters

    Note:

        - MatrixOne supports only ONE index per vector column
        - Enable HNSW indexing before creating HNSW indexes: SET experimental_hnsw_index = 1
        - HNSW indexes require BigInteger primary key in the table
        - Higher M values provide better recall but slower construction
        - Higher ef_construction provides better index quality but slower construction
        - Higher ef_search provides better recall but slower search
        - The table, index and column names are interpolated into the SQL text
          as-is; callers must not pass untrusted identifiers.
    """

    def __init__(
        self,
        name: str,
        column: Union[str, Column],
        m: int = 16,
        ef_construction: int = 200,
        ef_search: int = 50,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        **kwargs,
    ):
        """
        Initialize HnswVectorIndex.

        Args::

            name: Index name
            column: Vector column to index
            m: Number of bi-directional links for HNSW (default: 16)
            ef_construction: Size of dynamic candidate list for HNSW construction (default: 200)
            ef_search: Size of dynamic candidate list for HNSW search (default: 50)
            op_type: Vector operation type (default: vector_l2_ops)
            **kwargs: Additional index parameters
        """
        self.index_type = VectorIndexType.HNSW
        self.m = m
        self.ef_construction = ef_construction
        self.ef_search = ef_search
        self.op_type = op_type

        # Remember the bare column name for SQL generation
        self._column_name = self._resolve_column_name(column)

        # Parent constructor runs after our own attributes are in place
        super().__init__(name, column, **kwargs)

        # Set dialect options after initialization to bind to matrixone dialect
        self.dialect_options["matrixone"] = {"length": None, "using": None}
        # Also provide mysql fallback for compatibility
        self.dialect_options["mysql"] = {"length": None, "using": None, "with_parser": None}

    @staticmethod
    def _resolve_column_name(column) -> str:
        """Return the unqualified column name for a string or column-like object."""
        if isinstance(column, str):
            return column
        # str() of a table-bound Column is "table.column", which is not valid
        # inside the index column list, so prefer the plain ``name`` attribute.
        bare_name = getattr(column, "name", None)
        if isinstance(bare_name, str) and bare_name:
            return bare_name
        return str(column)

    @staticmethod
    def _execute_create(engine, sql: str) -> None:
        """Run the session setting and the CREATE INDEX statement in one transaction."""
        with engine.begin() as conn:
            # Enable HNSW indexing
            _exec_sql_safe(conn, "SET experimental_hnsw_index = 1")

            # Execute CREATE INDEX
            _exec_sql_safe(conn, sql)

    def _create_index_sql(self, table_name: str) -> str:
        """Generate the CREATE INDEX SQL for HNSW vector index."""
        sql_parts = [
            f"CREATE INDEX {self.name} USING {self.index_type} ON {table_name}({self._column_name})",
            f"M {self.m}",
            f"EF_CONSTRUCTION {self.ef_construction}",
            f"EF_SEARCH {self.ef_search}",
            f"op_type '{self.op_type}'",
        ]
        return " ".join(sql_parts)

    def create_sql(self, table_name: str) -> str:
        """Generate CREATE INDEX SQL for the given table name."""
        return self._create_index_sql(table_name)

    def drop_sql(self, table_name: str) -> str:
        """Generate DROP INDEX SQL for the given table name."""
        return f"DROP INDEX {self.name} ON {table_name}"

    @classmethod
    def create_index(
        cls,
        engine,
        table_name: str,
        name: str,
        column: Union[str, Column],
        m: int = 16,
        ef_construction: int = 200,
        ef_search: int = 50,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        **kwargs,
    ) -> bool:
        """
        Create an HNSW vector index using ORM-style method.

        Any exception raised while building or executing the statement is
        caught, reported on stdout, and turned into a ``False`` return.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table
            name: Name of the index
            column: Vector column to index
            m: Number of bi-directional links for HNSW (default: 16)
            ef_construction: Size of dynamic candidate list for HNSW construction (default: 200)
            ef_search: Size of dynamic candidate list for HNSW search (default: 50)
            op_type: Vector operation type (default: vector_l2_ops)
            **kwargs: Additional index parameters

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            index = cls(name, column, m, ef_construction, ef_search, op_type, **kwargs)
            sql = index.create_sql(table_name)
            cls._execute_create(engine, sql)
        except Exception as e:
            print(f"Failed to create HNSW vector index: {e}")
            return False

        # Report success only once the transaction block has exited cleanly.
        # Logging is best-effort: the client logger may be unavailable (tests).
        try:
            from ..client import logger

            logger.info(f"✓ | {sql}")
        except Exception:
            pass

        return True

    @classmethod
    def drop_index(cls, engine, table_name: str, name: str) -> bool:
        """
        Drop an HNSW vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table
            name: Name of the index to drop

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            sql = f"DROP INDEX {name} ON {table_name}"
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to drop HNSW vector index: {e}")
            return False

    def create(self, engine, table_name: str) -> bool:
        """
        Create this HNSW vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            self._execute_create(engine, self.create_sql(table_name))
            return True
        except Exception as e:
            print(f"Failed to create HNSW vector index: {e}")
            return False

    def drop(self, engine, table_name: str) -> bool:
        """
        Drop this HNSW vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            sql = self.drop_sql(table_name)
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to drop HNSW vector index: {e}")
            return False


class VectorIndex(Index):
    """
    SQLAlchemy Index for vector columns with MatrixOne-specific syntax.

    This class provides a generic interface for creating vector indexes with
    various algorithms and operation types. It supports both IVF (Inverted File)
    and HNSW (Hierarchical Navigable Small World) indexing algorithms.

    Key Features:

    - Support for multiple vector indexing algorithms (IVF, HNSW)
    - Configurable operation types (L2 distance, cosine similarity, inner product)
    - Automatic SQL generation for index creation and management
    - Integration with MatrixOne's vector search capabilities
    - Support for both class methods and instance methods

    Supported Index Types:

    - IVF (Inverted File): Good for large datasets, requires training
    - HNSW: Good for high-dimensional vectors, no training required

    Supported Operation Types:

    - VECTOR_L2_OPS: L2 (Euclidean) distance
    - VECTOR_COSINE_OPS: Cosine similarity
    - VECTOR_IP_OPS: Inner product similarity

    Usage Examples::

        # Create IVF index
        index = VectorIndex(
            name='vec_idx_ivf',
            column='embedding',
            index_type=VectorIndexType.IVFFLAT,
            lists=100,
            op_type=VectorOpType.VECTOR_L2_OPS
        )

        # Create HNSW index
        index = VectorIndex(
            name='vec_idx_hnsw',
            column='embedding',
            index_type=VectorIndexType.HNSW,
            m=16,
            ef_construction=200,
            op_type=VectorOpType.VECTOR_COSINE_OPS
        )

    Note:
        This is the legacy generic class. For better type safety and specific
        algorithm features, consider using IVFVectorIndex or HnswVectorIndex instead.

        The table, index and column names are interpolated into the SQL text
        as-is; callers must not pass untrusted identifiers.
    """

    def __init__(
        self,
        name: str,
        column: Union[str, Column],
        index_type: str = VectorIndexType.IVFFLAT,
        lists: Optional[int] = None,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        # HNSW parameters
        m: Optional[int] = None,
        ef_construction: Optional[int] = None,
        ef_search: Optional[int] = None,
        **kwargs,
    ):
        """
        Initialize VectorIndex.

        Args::

            name: Index name
            column: Vector column to index; for a non-string object exposing
                ``name``, that unqualified name is used in generated SQL
            index_type: Type of vector index (ivfflat, hnsw, etc.)
            lists: Number of lists for IVFFLAT (optional)
            op_type: Vector operation type
            m: Number of bi-directional links for HNSW (optional)
            ef_construction: Size of dynamic candidate list for HNSW construction (optional)
            ef_search: Size of dynamic candidate list for HNSW search (optional)
            **kwargs: Additional index parameters
        """
        self.index_type = index_type
        self.lists = lists
        self.op_type = op_type

        # HNSW parameters
        self.m = m
        self.ef_construction = ef_construction
        self.ef_search = ef_search

        # Remember the bare column name for SQL generation
        self._column_name = self._resolve_column_name(column)

        # Parent constructor runs after our own attributes are in place
        super().__init__(name, column, **kwargs)

        # Set dialect options after initialization to bind to matrixone dialect
        self.dialect_options["matrixone"] = {"length": None, "using": None}
        # Also provide mysql fallback for compatibility
        self.dialect_options["mysql"] = {"length": None, "using": None, "with_parser": None}

    @staticmethod
    def _resolve_column_name(column) -> str:
        """Return the unqualified column name for a string or column-like object."""
        if isinstance(column, str):
            return column
        # str() of a table-bound Column is "table.column", which is not valid
        # inside the index column list, so prefer the plain ``name`` attribute.
        bare_name = getattr(column, "name", None)
        if isinstance(bare_name, str) and bare_name:
            return bare_name
        return str(column)

    def _create_index_sql(self, table_name: str) -> str:
        """Generate the CREATE INDEX SQL for vector index."""
        column_name = self._column_name

        sql_parts = [f"CREATE INDEX {self.name} USING {self.index_type} ON {table_name}({column_name})"]

        # Add parameters based on index type
        if self.index_type == VectorIndexType.IVFFLAT and self.lists is not None:
            sql_parts.append(f"lists = {self.lists}")
        elif self.index_type == VectorIndexType.HNSW:
            # Only HNSW parameters that were explicitly provided are emitted
            if self.m is not None:
                sql_parts.append(f"M {self.m}")
            if self.ef_construction is not None:
                sql_parts.append(f"EF_CONSTRUCTION {self.ef_construction}")
            if self.ef_search is not None:
                sql_parts.append(f"EF_SEARCH {self.ef_search}")

        # Add operation type
        sql_parts.append(f"op_type '{self.op_type}'")

        return " ".join(sql_parts)

    def create_sql(self, table_name: str) -> str:
        """Generate CREATE INDEX SQL for the given table name."""
        return self._create_index_sql(table_name)

    def drop_sql(self, table_name: str) -> str:
        """Generate DROP INDEX SQL for the given table name."""
        return f"DROP INDEX {self.name} ON {table_name}"

    @classmethod
    def create_index(
        cls,
        engine,
        table_name: str,
        name: str,
        column: Union[str, Column],
        index_type: str = VectorIndexType.IVFFLAT,
        lists: Optional[int] = None,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        # HNSW parameters
        m: Optional[int] = None,
        ef_construction: Optional[int] = None,
        ef_search: Optional[int] = None,
        **kwargs,
    ) -> bool:
        """
        Create a vector index using ORM-style method.

        Any exception raised while building or executing the statement is
        caught, reported on stdout, and turned into a ``False`` return.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table
            name: Name of the index
            column: Vector column to index
            index_type: Type of vector index (ivfflat, hnsw, etc.)
            lists: Number of lists for IVFFLAT (optional)
            op_type: Vector operation type
            m: Number of bi-directional links for HNSW (optional)
            ef_construction: Size of dynamic candidate list for HNSW construction (optional)
            ef_search: Size of dynamic candidate list for HNSW search (optional)
            **kwargs: Additional index parameters

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            index = cls(name, column, index_type, lists, op_type, m, ef_construction, ef_search, **kwargs)
            sql = index.create_sql(table_name)

            with engine.begin() as conn:
                # Enable appropriate indexing in the same connection
                if index_type == VectorIndexType.IVFFLAT:
                    _exec_sql_safe(conn, "SET experimental_ivf_index = 1")
                    _exec_sql_safe(conn, "SET probe_limit = 1")
                elif index_type == VectorIndexType.HNSW:
                    _exec_sql_safe(conn, "SET experimental_hnsw_index = 1")

                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to create vector index: {e}")
            return False

    @classmethod
    def drop_index(cls, engine, table_name: str, name: str) -> bool:
        """
        Drop a vector index using ORM-style method.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table
            name: Name of the index to drop

        Returns::

            bool: True if successful, False otherwise
        """
        try:
            sql = f"DROP INDEX {name} ON {table_name}"
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to drop vector index: {e}")
            return False

    def create(self, engine, table_name: str) -> bool:
        """
        Create this vector index using ORM-style method.

        Delegates to ``create_index`` with this instance's stored settings.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table

        Returns::

            bool: True if successful, False otherwise
        """
        return self.__class__.create_index(
            engine,
            table_name,
            self.name,
            self._column_name,
            self.index_type,
            self.lists,
            self.op_type,
            self.m,
            self.ef_construction,
            self.ef_search,
        )

    def drop(self, engine, table_name: str) -> bool:
        """
        Drop this vector index using ORM-style method.

        Delegates to ``drop_index`` with this instance's name.

        Args::

            engine: SQLAlchemy engine
            table_name: Name of the table

        Returns::

            bool: True if successful, False otherwise
        """
        return self.__class__.drop_index(engine, table_name, self.name)
class CreateVectorIndex(DDLElement):
    """DDL element for creating vector indexes."""

    def __init__(self, index: VectorIndex, if_not_exists: bool = False):
        """
        Initialize CreateVectorIndex.

        Args::

            index: VectorIndex to create
            if_not_exists: Emit ``IF NOT EXISTS`` in the generated statement
        """
        self.index = index
        self.if_not_exists = if_not_exists


def _build_create_vector_index_sql(index: VectorIndex, if_not_exists: bool) -> str:
    """
    Render the CREATE INDEX statement for a VectorIndex.

    Shared by every compile hook in this module so the generated SQL is the
    same regardless of which DDL element or dialect triggered compilation.

    Args::

        index: VectorIndex bound to a table (``index.table.name`` is read)
        if_not_exists: Emit ``IF NOT EXISTS`` after ``CREATE INDEX``

    Returns::

        The statement as a single space-joined string
    """
    sql_parts = ["CREATE INDEX"]
    if if_not_exists:
        sql_parts.append("IF NOT EXISTS")

    # Use the stored column name
    sql_parts.append(f"{index.name} USING {index.index_type} ON {index.table.name}({index._column_name})")

    # Add parameters based on index type; unset (None) parameters are omitted
    if index.index_type == VectorIndexType.IVFFLAT and index.lists is not None:
        sql_parts.append(f"lists = {index.lists}")
    elif index.index_type == VectorIndexType.HNSW:
        hnsw_options = (
            ("M", index.m),
            ("EF_CONSTRUCTION", index.ef_construction),
            ("EF_SEARCH", index.ef_search),
        )
        sql_parts.extend(f"{keyword} {value}" for keyword, value in hnsw_options if value is not None)

    # Add operation type
    sql_parts.append(f"op_type '{index.op_type}'")

    return " ".join(sql_parts)


@compiles(CreateVectorIndex)
def compile_create_vector_index(element: CreateVectorIndex, compiler, **kw):
    """Compile CREATE VECTOR INDEX statement."""
    return _build_create_vector_index_sql(element.index, element.if_not_exists)


@compiles(SQLAlchemyCreateIndex, "matrixone")
def compile_create_vector_index_matrixone(element: SQLAlchemyCreateIndex, compiler, **kw):
    """Compile CREATE INDEX for VectorIndex on MatrixOne dialect."""
    index = element.element

    # Only VectorIndex gets the custom syntax
    if isinstance(index, VectorIndex):
        return _build_create_vector_index_sql(index, element.if_not_exists)

    # Fall back to default compilation
    return compiler.visit_create_index(element, **kw)


@compiles(SQLAlchemyCreateIndex, "mysql")
def compile_create_vector_index_mysql(element: SQLAlchemyCreateIndex, compiler, **kw):
    """Compile CREATE INDEX for VectorIndex on MySQL dialect."""
    index = element.element

    # Only VectorIndex gets the custom syntax
    if isinstance(index, VectorIndex):
        return _build_create_vector_index_sql(index, element.if_not_exists)

    # Fall back to default MySQL index compilation
    return compiler.visit_create_index(element, **kw)


def create_vector_index(
    name: str,
    column: Union[str, Column],
    index_type: str = VectorIndexType.IVFFLAT,
    lists: Optional[int] = None,
    op_type: str = VectorOpType.VECTOR_L2_OPS,
    # HNSW parameters
    m: Optional[int] = None,
    ef_construction: Optional[int] = None,
    ef_search: Optional[int] = None,
    **kwargs,
) -> VectorIndex:
    """
    Create a vector index.

    Args::

        name: Index name
        column: Vector column to index
        index_type: Type of vector index (ivfflat, hnsw, etc.)
        lists: Number of lists for IVFFLAT (optional)
        op_type: Vector operation type
        m: Number of bi-directional links for HNSW (optional)
        ef_construction: Size of dynamic candidate list for HNSW construction (optional)
        ef_search: Size of dynamic candidate list for HNSW search (optional)
        **kwargs: Additional index parameters

    Returns::

        VectorIndex instance

    Example::

        # Create IVFFLAT index with 256 lists
        idx = create_vector_index(
            "idx_vector_l2",
            "embedding",
            index_type="ivfflat",
            lists=256,
            op_type="vector_l2_ops"
        )

        # Create HNSW index with custom parameters
        idx = create_vector_index(
            "idx_vector_hnsw",
            "embedding",
            index_type="hnsw",
            m=48,
            ef_construction=64,
            ef_search=64,
            op_type="vector_l2_ops"
        )
    """
    return VectorIndex(
        name=name,
        column=column,
        index_type=index_type,
        lists=lists,
        op_type=op_type,
        m=m,
        ef_construction=ef_construction,
        ef_search=ef_search,
        **kwargs,
    )


def create_ivfflat_index(
    name: str,
    column: Union[str, Column],
    lists: int = 256,
    op_type: str = VectorOpType.VECTOR_L2_OPS,
    **kwargs,
) -> VectorIndex:
    """
    Create an IVFFLAT vector index.

    Args::

        name: Index name
        column: Vector column to index
        lists: Number of lists (default: 256)
        op_type: Vector operation type (default: vector_l2_ops)
        **kwargs: Additional index parameters

    Returns::

        VectorIndex instance

    Example::

        # Create IVFFLAT index with 256 lists for L2 distance
        idx = create_ivfflat_index("idx_embedding_l2", "embedding", lists=256)

        # Create IVFFLAT index with 128 lists for cosine similarity
        idx = create_ivfflat_index(
            "idx_embedding_cosine",
            "embedding",
            lists=128,
            op_type="vector_cosine_ops"
        )
    """
    return create_vector_index(
        name=name,
        column=column,
        index_type=VectorIndexType.IVFFLAT,
        lists=lists,
        op_type=op_type,
        **kwargs,
    )


def create_hnsw_index(
    name: str,
    column: Union[str, Column],
    m: int = 48,
    ef_construction: int = 64,
    ef_search: int = 64,
    op_type: str = VectorOpType.VECTOR_L2_OPS,
    **kwargs,
) -> VectorIndex:
    """
    Create an HNSW vector index.

    Args::

        name: Index name
        column: Vector column to index
        m: Number of bi-directional links (default: 48)
        ef_construction: Size of dynamic candidate list for construction (default: 64)
        ef_search: Size of dynamic candidate list for search (default: 64)
        op_type: Vector operation type (default: vector_l2_ops)
        **kwargs: Additional index parameters

    Returns::

        VectorIndex instance

    Example::

        # Create HNSW index with default parameters
        idx = create_hnsw_index("idx_embedding_hnsw", "embedding")

        # Create HNSW index with custom parameters
        idx = create_hnsw_index(
            "idx_embedding_hnsw_custom",
            "embedding",
            m=32,
            ef_construction=128,
            ef_search=128,
            op_type="vector_cosine_ops"
        )
    """
    return create_vector_index(
        name=name,
        column=column,
        index_type=VectorIndexType.HNSW,
        m=m,
        ef_construction=ef_construction,
        ef_search=ef_search,
        op_type=op_type,
        **kwargs,
    )


class VectorIndexBuilder:
    """
    Builder class for creating vector indexes with different configurations.

    Every ``*_index`` / ``ivfflat`` / ``hnsw`` call appends one VectorIndex
    for the builder's column and returns the builder, so calls can be chained.
    """

    def __init__(self, column: Union[str, Column]):
        """
        Initialize VectorIndexBuilder.

        Args::

            column: Vector column to index
        """
        self.column = column
        # Indexes accumulated so far, in the order they were added
        self._indexes = []

    def ivfflat(
        self, name: str, lists: int = 256, op_type: str = VectorOpType.VECTOR_L2_OPS, **kwargs
    ) -> "VectorIndexBuilder":
        """
        Add an IVFFLAT index.

        Args::

            name: Index name
            lists: Number of lists
            op_type: Vector operation type
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        index = create_ivfflat_index(name, self.column, lists, op_type, **kwargs)
        self._indexes.append(index)
        return self

    def l2_index(self, name: str, lists: int = 256, **kwargs) -> "VectorIndexBuilder":
        """
        Add an L2 distance index.

        Args::

            name: Index name
            lists: Number of lists for IVFFLAT
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        return self.ivfflat(name, lists, VectorOpType.VECTOR_L2_OPS, **kwargs)

    def cosine_index(self, name: str, lists: int = 256, **kwargs) -> "VectorIndexBuilder":
        """
        Add a cosine similarity index.

        Args::

            name: Index name
            lists: Number of lists for IVFFLAT
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        return self.ivfflat(name, lists, VectorOpType.VECTOR_COSINE_OPS, **kwargs)

    def ip_index(self, name: str, lists: int = 256, **kwargs) -> "VectorIndexBuilder":
        """
        Add an inner product index.

        Args::

            name: Index name
            lists: Number of lists for IVFFLAT
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        return self.ivfflat(name, lists, VectorOpType.VECTOR_IP_OPS, **kwargs)

    def hnsw(
        self,
        name: str,
        m: int = 48,
        ef_construction: int = 64,
        ef_search: int = 64,
        op_type: str = VectorOpType.VECTOR_L2_OPS,
        **kwargs,
    ) -> "VectorIndexBuilder":
        """
        Add an HNSW index.

        Args::

            name: Index name
            m: Number of bi-directional links
            ef_construction: Size of dynamic candidate list for construction
            ef_search: Size of dynamic candidate list for search
            op_type: Vector operation type
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        index = create_hnsw_index(name, self.column, m, ef_construction, ef_search, op_type, **kwargs)
        self._indexes.append(index)
        return self

    def hnsw_l2_index(
        self,
        name: str,
        m: int = 48,
        ef_construction: int = 64,
        ef_search: int = 64,
        **kwargs,
    ) -> "VectorIndexBuilder":
        """
        Add an HNSW L2 distance index.

        Args::

            name: Index name
            m: Number of bi-directional links
            ef_construction: Size of dynamic candidate list for construction
            ef_search: Size of dynamic candidate list for search
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        return self.hnsw(name, m, ef_construction, ef_search, VectorOpType.VECTOR_L2_OPS, **kwargs)

    def hnsw_cosine_index(
        self,
        name: str,
        m: int = 48,
        ef_construction: int = 64,
        ef_search: int = 64,
        **kwargs,
    ) -> "VectorIndexBuilder":
        """
        Add an HNSW cosine similarity index.

        Args::

            name: Index name
            m: Number of bi-directional links
            ef_construction: Size of dynamic candidate list for construction
            ef_search: Size of dynamic candidate list for search
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        return self.hnsw(name, m, ef_construction, ef_search, VectorOpType.VECTOR_COSINE_OPS, **kwargs)

    def hnsw_ip_index(
        self,
        name: str,
        m: int = 48,
        ef_construction: int = 64,
        ef_search: int = 64,
        **kwargs,
    ) -> "VectorIndexBuilder":
        """
        Add an HNSW inner product index.

        Args::

            name: Index name
            m: Number of bi-directional links
            ef_construction: Size of dynamic candidate list for construction
            ef_search: Size of dynamic candidate list for search
            **kwargs: Additional parameters

        Returns::

            Self for method chaining
        """
        return self.hnsw(name, m, ef_construction, ef_search, VectorOpType.VECTOR_IP_OPS, **kwargs)

    def build(self) -> List[VectorIndex]:
        """
        Build and return the list of vector indexes.

        A shallow copy is returned, so mutating the result does not affect
        the builder's own list.

        Returns::

            List of VectorIndex instances
        """
        return self._indexes.copy()

    def add_to_table(self, table) -> "VectorIndexBuilder":
        """
        Add indexes to a table.

        Each accumulated index gets ``table`` assigned as its ``table``
        attribute and is added to ``table.indexes``.

        Args::

            table: SQLAlchemy Table instance

        Returns::

            Self for method chaining
        """
        for index in self._indexes:
            index.table = table
            table.indexes.add(index)
        return self


def vector_index_builder(column: Union[str, Column]) -> VectorIndexBuilder:
    """
    Create a VectorIndexBuilder for a column.

    Args::

        column: Vector column to index

    Returns::

        VectorIndexBuilder instance

    Example::

        # Create multiple indexes for a vector column
        indexes = (
            vector_index_builder("embedding")
            .l2_index("idx_l2", lists=256)
            .cosine_index("idx_cosine", lists=128)
            .build()
        )
    """
    return VectorIndexBuilder(column)