# Source code for matrixone.sqlalchemy_ext.fulltext_index

# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Fulltext index support for SQLAlchemy integration with MatrixOne.
"""

from typing import Any, List, Union

from sqlalchemy import Index, text
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import CreateIndex


def _exec_sql_safe(connection, sql: str):
    """
    Run a raw SQL string without SQLAlchemy's bind-parameter parsing.

    A statement containing JSON such as {"a":1} would otherwise have ``:1``
    picked up as a bind parameter. When the connection offers
    ``exec_driver_sql()`` the string is handed to it directly; otherwise the
    statement is wrapped in ``text()`` and sent through ``execute()``.

    Args:
        connection: Object exposing ``exec_driver_sql()`` or ``execute()``.
        sql: The SQL statement to run.

    Returns:
        Whatever the underlying ``exec_driver_sql()`` / ``execute()`` call returns.
    """
    if not hasattr(connection, 'exec_driver_sql'):
        # Fallback for testing or older SQLAlchemy versions
        return connection.execute(text(sql))

    # Literal % must be doubled because of pymysql's format string handling
    driver_sql = sql.replace('%', '%%')
    return connection.exec_driver_sql(driver_sql)


class FulltextAlgorithmType:
    """
    Namespace of string constants naming the fulltext relevancy algorithms.

    Attributes:
        TF_IDF (str): Term Frequency-Inverse Document Frequency

            * Traditional information retrieval algorithm
            * Formula: TF(term) × IDF(term)
            * Use case: Academic search, technical documentation

        BM25 (str): Best Matching 25 (Okapi BM25)

            * Modern probabilistic ranking algorithm
            * Handles document length normalization better than TF-IDF
            * Use case: General-purpose search, modern applications
            * Recommended as default for new applications

    Note:
        The algorithm is set at runtime via SQL command, not in the index DDL.

    Examples::

        # Set algorithm to BM25
        client.execute('SET ft_relevancy_algorithm = "BM25"')

        # Create index with BM25 reference
        index = FulltextIndex("ftidx_content", ["title", "content"],
                              algorithm=FulltextAlgorithmType.BM25)

        # Perform searches with BM25 scoring
        result = client.query(Article).filter(
            boolean_match(Article.content).must("search term")
        ).execute()
    """

    TF_IDF = "TF-IDF"
    BM25 = "BM25"
class FulltextParserType:
    """
    Namespace of string constants naming the fulltext parsers.

    The value is emitted verbatim after ``WITH PARSER`` in the index DDL.

    Attributes:
        JSON (str): Parser for JSON documents

            * Indexes JSON values (not keys)
            * Suitable for text/varchar/json columns containing JSON data
            * Example SQL: CREATE FULLTEXT INDEX idx ON table (col) WITH PARSER json

        NGRAM (str): Parser for Chinese and Asian languages

            * N-gram based tokenization for languages without word delimiters
            * Use case: Chinese articles, mixed language content
            * Example SQL: CREATE FULLTEXT INDEX idx ON table (col) WITH PARSER ngram

    Examples::

        # Using JSON parser in ORM
        class Product(Base):
            __tablename__ = "products"
            details = Column(Text)
            __table_args__ = (
                FulltextIndex("ftidx_json", "details", parser=FulltextParserType.JSON),
            )

        # Using NGRAM parser for Chinese content
        class ChineseArticle(Base):
            __tablename__ = "chinese_articles"
            title = Column(String(200))
            body = Column(Text)
            __table_args__ = (
                FulltextIndex("ftidx_chinese", ["title", "body"],
                              parser=FulltextParserType.NGRAM),
            )
    """

    JSON = "json"
    NGRAM = "ngram"
class FulltextModeType:
    """Namespace of string constants naming the fulltext search modes."""

    NATURAL_LANGUAGE = "natural language mode"
    BOOLEAN = "boolean mode"
    QUERY_EXPANSION = "query expansion mode"
class FulltextIndex(Index):
    """
    SQLAlchemy Index for fulltext columns with MatrixOne-specific syntax.

    Usage Examples

    1. Class methods (one-off operations on an existing connection)::

        with engine.begin() as conn:
            FulltextIndex.create_index(
                conn, 'my_table', 'ftidx_content', ['title', 'content'],
                algorithm=FulltextAlgorithmType.BM25,
            )
            FulltextIndex.drop_index(conn, 'my_table', 'ftidx_content')

    2. Instance methods (reusable index configuration, take an engine)::

        index = FulltextIndex('ftidx_content', ['title', 'content'],
                              algorithm=FulltextAlgorithmType.BM25)
        success = index.create(engine, 'my_table')
        success = index.drop(engine, 'my_table')

    3. SQLAlchemy ORM integration::

        class Document(Base):
            __tablename__ = 'documents'
            id = Column(Integer, primary_key=True)
            title = Column(String)
            content = Column(Text)

            __table_args__ = (FulltextIndex('ftidx_doc', ['title', 'content']),)

    4. Using client.fulltext_index manager (recommended)::

        # Auto-commit mode
        client.fulltext_index.create(
            'my_table', 'ftidx_content', ['title', 'content'],
            algorithm=FulltextAlgorithmType.BM25
        )

        # Transaction context mode
        with client.session() as session:
            session.fulltext_index.create(
                'my_table', 'ftidx_content', ['title', 'content']
            )
            session.commit()
    """

    def __init__(
        self,
        name: str,
        columns: Union[str, List[str]],
        algorithm: str = FulltextAlgorithmType.TF_IDF,
        parser: Union[str, None] = None,
    ):
        """
        Initialize FulltextIndex.

        Args:
            name: Index name (e.g., 'ftidx_content').
            columns: A single column name or a sequence of column names.
            algorithm: Fulltext algorithm type. Stored on the instance only;
                it is not part of the generated DDL.
            parser: Optional parser name appended as ``WITH PARSER <parser>``
                (e.g. FulltextParserType.JSON, FulltextParserType.NGRAM).

        Raises:
            ValueError: If no column is given.
            TypeError: If any column is not a string.
        """
        columns = self._normalize_columns(columns)

        # Set before Index.__init__ runs, matching the original ordering.
        self.algorithm = algorithm
        self.parser = parser
        self._column_names = list(columns)  # private copy for get_columns()
        super().__init__(name, *columns)

    @staticmethod
    def _normalize_columns(columns) -> List[str]:
        """Return ``columns`` as a validated, freshly allocated list of names."""
        if isinstance(columns, str):
            normalized = [columns]
        else:
            # list() accepts tuples and other iterables, unlike .copy()
            normalized = list(columns) if columns is not None else []

        if not normalized:
            raise ValueError("FulltextIndex requires at least one column")
        if not all(isinstance(col, str) for col in normalized):
            raise TypeError("All columns must be strings")
        return normalized

    @staticmethod
    def _build_create_sql(name: str, table_name: str, columns: List[str], parser: Union[str, None] = None) -> str:
        """Render the CREATE FULLTEXT INDEX statement shared by all create paths."""
        columns_str = ", ".join(columns)
        sql = f"CREATE FULLTEXT INDEX {name} ON {table_name} ({columns_str})"
        if parser:
            sql += f" WITH PARSER {parser}"
        return sql

    def get_columns(self):
        """Get column names as a list (a copy; mutating it does not affect the index)."""
        return list(self._column_names)

    def _create_index_sql(self, table_name: str) -> str:
        """Generate the CREATE INDEX SQL for this fulltext index on ``table_name``."""
        return self._build_create_sql(self.name, table_name, self._column_names, self.parser)

    @classmethod
    def create_index(
        cls,
        bind,
        table_name: str,
        name: str,
        columns: Union[str, List[str]],
        algorithm: str = FulltextAlgorithmType.TF_IDF,
        parser: Union[str, None] = None,
    ) -> bool:
        """
        Create a fulltext index on an existing connection.

        Args:
            bind: SQLAlchemy connection (from session.connection() or engine.begin()).
            table_name: Target table name.
            name: Index name.
            columns: A single column name or a sequence of column names.
            algorithm: Accepted for API symmetry; not used in the DDL.
            parser: Optional parser name (``WITH PARSER <parser>``).

        Returns:
            bool: True if the statement executed, False if anything raised
            (the error is printed).

        Examples::

            with client.session() as session:
                conn = session.connection()
                FulltextIndex.create_index(conn, 'articles', 'ftidx_content', 'content')
                session.commit()
        """
        try:
            # Same validation as __init__, so an empty column list is rejected
            # here instead of being sent to the server as invalid DDL.
            sql = cls._build_create_sql(name, table_name, cls._normalize_columns(columns), parser)
            _exec_sql_safe(bind, sql)
            return True
        except Exception as e:
            print(f"Failed to create fulltext index: {e}")
            return False

    @classmethod
    def drop_index(cls, bind, table_name: str, name: str) -> bool:
        """
        Drop a fulltext index on an existing connection.

        Args:
            bind: SQLAlchemy connection (from session.connection() or engine.begin()).
            table_name: Target table name.
            name: Index name.

        Returns:
            bool: True if the statement executed, False if anything raised
            (the error is printed).
        """
        try:
            _exec_sql_safe(bind, f"DROP INDEX {name} ON {table_name}")
            return True
        except Exception as e:
            print(f"Failed to drop fulltext index: {e}")
            return False

    def create(self, engine, table_name: str) -> bool:
        """
        Create this fulltext index inside a transaction opened on ``engine``.

        Args:
            engine: SQLAlchemy engine (``engine.begin()`` is used).
            table_name: Target table name.

        Returns:
            bool: True if successful, False otherwise (the error is printed).
        """
        try:
            sql = self._create_index_sql(table_name)
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to create fulltext index: {e}")
            return False

    def drop(self, engine, table_name: str) -> bool:
        """
        Drop this fulltext index inside a transaction opened on ``engine``.

        Args:
            engine: SQLAlchemy engine (``engine.begin()`` is used).
            table_name: Target table name.

        Returns:
            bool: True if successful, False otherwise (the error is printed).
        """
        try:
            sql = f"DROP INDEX {self.name} ON {table_name}"
            with engine.begin() as conn:
                _exec_sql_safe(conn, sql)
            return True
        except Exception as e:
            print(f"Failed to drop fulltext index: {e}")
            return False
class FulltextSearchBuilder:
    """
    Builder class for fulltext search queries.

    Provides a fluent interface for building MATCH...AGAINST queries; every
    setter returns ``self`` so calls can be chained.
    """

    def __init__(self, table_name: str, columns: Union[str, List[str]]):
        """
        Initialize FulltextSearchBuilder.

        Args:
            table_name: Table to search in.
            columns: Column(s) to search in; a single name is wrapped in a list.
        """
        self.table_name = table_name
        if isinstance(columns, str):
            columns = [columns]
        self.columns = columns
        self.search_term = None
        self.search_mode = FulltextModeType.NATURAL_LANGUAGE
        self.include_score = False
        self.where_conditions = []
        self.order_clause = None
        self.limit_value = None
        self.offset_value = None

    @property
    def with_score(self):
        """Get the with_score setting for backward compatibility"""
        return self.include_score

    @property
    def mode(self):
        """Get the search mode for backward compatibility"""
        return self.search_mode

    @property
    def order_by(self):
        """Get the order by clause for backward compatibility"""
        return self.order_clause

    def search(self, term: str) -> "FulltextSearchBuilder":
        """Set the search term and return self for chaining."""
        self.search_term = term
        return self

    def set_mode(self, mode: str) -> "FulltextSearchBuilder":
        """Set the search mode (see FulltextModeType) and return self for chaining."""
        self.search_mode = mode
        return self

    def set_with_score(self, include_score: bool = True) -> "FulltextSearchBuilder":
        """Choose whether the relevance score is selected; return self for chaining."""
        self.include_score = include_score
        return self

    def where(self, condition: str) -> "FulltextSearchBuilder":
        """Append a raw WHERE condition (inserted verbatim); return self for chaining."""
        self.where_conditions.append(condition)
        return self

    def set_order_by(self, column: str, direction: str = "DESC") -> "FulltextSearchBuilder":
        """Set the ORDER BY clause to ``"<column> <direction>"``; return self for chaining."""
        self.order_clause = f"{column} {direction}"
        return self

    def limit(self, count: int) -> "FulltextSearchBuilder":
        """Set the LIMIT value; return self for chaining."""
        self.limit_value = count
        return self

    def offset(self, count: int) -> "FulltextSearchBuilder":
        """Set the OFFSET value; return self for chaining."""
        self.offset_value = count
        return self

    @staticmethod
    def _escape_term(term) -> str:
        """
        Escape a search term for embedding inside a single-quoted SQL literal.

        Backslashes are doubled first, then single quotes are doubled, so the
        term cannot terminate the literal early.
        NOTE(review): assumes MatrixOne follows MySQL string-literal rules
        (backslash escapes, '' for a quote) — confirm against the server.
        """
        return str(term).replace("\\", "\\\\").replace("'", "''")

    def build_sql(self) -> str:
        """
        Build the SQL query using the unified SQL builder.

        Returns:
            str: SQL query string.

        Raises:
            ValueError: If no (non-empty) search term has been set.
        """
        if not self.search_term:
            raise ValueError("Search term is required")

        from ..sql_builder import MatrixOneSQLBuilder

        builder = MatrixOneSQLBuilder()

        # MatrixOne doesn't support "IN NATURAL LANGUAGE" syntax, so natural
        # language mode (and any unrecognized mode) uses the bare AGAINST form.
        if self.search_mode == FulltextModeType.BOOLEAN:
            modifier = " IN BOOLEAN MODE"
        elif self.search_mode == FulltextModeType.QUERY_EXPANSION:
            modifier = " WITH QUERY EXPANSION"
        else:
            modifier = ""

        columns_str = ", ".join(self.columns)
        literal = self._escape_term(self.search_term)
        match_clause = f"MATCH({columns_str}) AGAINST('{literal}'{modifier})"

        if self.include_score:
            builder.select("*", f"{match_clause} AS score")
        else:
            builder.select_all()

        builder.from_table(self.table_name)

        # The MATCH clause is always the first WHERE condition.
        builder.where(match_clause)
        for condition in self.where_conditions:
            builder.where(condition)

        # An explicit ordering wins; otherwise rank by score when it is selected.
        if self.order_clause:
            builder.order_by(self.order_clause)
        elif self.include_score:
            builder.order_by("score DESC")

        # `is not None` so that an explicit limit(0) is not silently dropped.
        if self.limit_value is not None:
            builder.limit(self.limit_value)
        if self.offset_value:
            builder.offset(self.offset_value)

        return builder.build_with_parameter_substitution()

    def execute(self, connection) -> Any:
        """
        Build the query and execute it on ``connection``.

        Args:
            connection: Database connection.

        Returns:
            Query result as returned by the connection.
        """
        return _exec_sql_safe(connection, self.build_sql())
# Convenience functions
def create_fulltext_index(
    engine,
    table_name: str,
    name: str,
    columns: Union[str, List[str]],
    algorithm: str = FulltextAlgorithmType.TF_IDF,
    parser: Union[str, None] = None,
) -> bool:
    """
    Convenience function to create a fulltext index.

    Args:
        engine: SQLAlchemy engine, or an already-open connection.
        table_name: Target table name
        name: Index name
        columns: Column(s) to index
        algorithm: Fulltext algorithm type
        parser: Parser type for fulltext index (json, ngram, or None)

    Returns:
        bool: True if successful, False otherwise
    """
    from sqlalchemy.engine import Engine

    if isinstance(engine, Engine):
        # FulltextIndex.create_index() executes on a connection; an Engine has
        # no execute()/exec_driver_sql() in SQLAlchemy 2.0, so open one here.
        try:
            with engine.begin() as conn:
                return FulltextIndex.create_index(conn, table_name, name, columns, algorithm, parser)
        except Exception as e:
            print(f"Failed to create fulltext index: {e}")
            return False

    return FulltextIndex.create_index(engine, table_name, name, columns, algorithm, parser)


def fulltext_search_builder(table_name: str, columns: Union[str, List[str]]) -> FulltextSearchBuilder:
    """
    Convenience function to create a fulltext search builder.

    Args:
        table_name: Table to search in
        columns: Column(s) to search in

    Returns:
        FulltextSearchBuilder: Search builder instance
    """
    return FulltextSearchBuilder(table_name, columns)


def _fulltext_index_ddl(index) -> str:
    """Render CREATE FULLTEXT INDEX DDL for a table-bound FulltextIndex."""
    if index.table is None:
        raise ValueError(
            f"FulltextIndex '{index.name}' is not bound to a table. "
            "Ensure the index is defined in __table_args__ or bound to a table."
        )

    columns_str = ", ".join(col.name for col in index.columns)
    sql = f"CREATE FULLTEXT INDEX {index.name} ON {index.table.name} ({columns_str})"
    parser = getattr(index, 'parser', None)
    if parser:
        sql += f" WITH PARSER {parser}"
    return sql


# Register SQLAlchemy compiler for FulltextIndex to generate FULLTEXT DDL
@compiles(CreateIndex)
def compile_create_index(element, compiler, **kw):
    """
    Custom compiler for CREATE INDEX that handles FulltextIndex specially.

    FulltextIndex instances get the FULLTEXT keyword and optional parser
    clause; every other index falls through to the compiler's own
    ``visit_create_index``.

    Raises:
        ValueError: If a FulltextIndex is not bound to a table.
    """
    index = element.element
    if isinstance(index, FulltextIndex):
        return _fulltext_index_ddl(index)

    # Default behavior for regular indexes
    return compiler.visit_create_index(element, **kw)


# Monkey-patch MySQLDDLCompiler to handle FulltextIndex at dialect level
# This ensures create_all() generates correct FULLTEXT INDEX DDL
try:
    from sqlalchemy.dialects.mysql.base import MySQLDDLCompiler

    _original_visit_create_index = MySQLDDLCompiler.visit_create_index

    def _patched_visit_create_index(self, create, **kw):
        """Patched visit_create_index to handle FulltextIndex."""
        index = create.element
        if isinstance(index, FulltextIndex):
            # Same rendering (and unbound-table check) as compile_create_index.
            return _fulltext_index_ddl(index)
        return _original_visit_create_index(self, create, **kw)

    MySQLDDLCompiler.visit_create_index = _patched_visit_create_index
except ImportError:
    # MySQL dialect not available
    pass