Source code for matrixone.orm

# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
MatrixOne ORM - SQLAlchemy-like interface for MatrixOne database

This module provides a SQLAlchemy-compatible ORM interface for MatrixOne.
It supports both custom MatrixOne models and full SQLAlchemy integration.

For aggregate functions (COUNT, SUM, AVG, etc.), we recommend using SQLAlchemy's func module:
    from sqlalchemy import func
    query.select(func.count("id"))
    query.select(func.sum("amount"))

This provides better type safety and integration with SQLAlchemy.
"""

import logging
from typing import Any, Dict, List, Optional, TypeVar

# SQLAlchemy compatibility import
try:
    from sqlalchemy.orm import declarative_base
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

logger = logging.getLogger(__name__)

T = TypeVar("T")

# Export declarative_base for direct import
__all__ = ["declarative_base", "Query", "MatrixOneQuery"]


# For SQL functions, we recommend using SQLAlchemy's func module for better integration:
#
# from sqlalchemy import func
#
# # For SQLAlchemy models:
# query.select(func.count(User.id))
# query.select(func.sum(Order.amount))
# query.select(func.avg(Product.price))
#
# # For MatrixOne models, you can use string column names:
# query.select(func.count("id"))
# query.select(func.sum("amount"))
# query.select(func.avg("price"))
#
# This provides better type safety, SQL generation, and integration with SQLAlchemy.


# NOTE: SimpleModel has been removed; use SQLAlchemy declarative models directly.


class Query:
    """
    Legacy query builder for ORM operations with a SQLAlchemy-style interface.

    Builder methods (``select``, ``filter``, ``join`` ...) record their
    arguments on the instance and return ``self`` so calls can be chained.
    SQL text is produced by the helpers in ``.sql_builder`` when the query is
    executed and is sent through ``client.execute(sql, params)``.

    Key Features:

    - Fluent method chaining for building queries
    - SELECT, INSERT, UPDATE and DELETE operations
    - String conditions with '?' placeholders and a separate parameter list
    - Snapshot support for point-in-time queries

    Usage::

        query = Query(User, client)
        rows = query.filter("age > ?", 25).order_by("name").limit(10).all()

        active = Query(User, client).filter_by(status="active").count()

    Note: This is the legacy query builder. For new code, consider using
    MatrixOneQuery which provides enhanced SQLAlchemy compatibility.
    """

    def __init__(self, model_class, client, snapshot_name: Optional[str] = None):
        """
        Create an empty SELECT query bound to a model and a client.

        Args::

            model_class: Model whose table is queried. Its table name is read
                from ``_table_name`` or ``__tablename__``.
            client: Object exposing ``execute(sql, params)``.
            snapshot_name: Optional snapshot to read from.
        """
        self.model_class = model_class
        self.client = client
        self.snapshot_name = snapshot_name
        self._select_columns = []
        self._where_conditions = []
        self._where_params = []
        self._joins = []
        self._group_by_columns = []
        self._having_conditions = []
        self._having_params = []
        self._order_by_columns = []
        self._limit_count = None
        self._offset_count = None
        self._query_type = "SELECT"
        # For INSERT
        self._insert_values = []
        # For UPDATE
        self._update_set_columns = []
        self._update_set_values = []

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _execute(self, sql: str, params: Optional[List[Any]] = None) -> Any:
        """Run ``sql`` with ``params`` through the bound client.

        The execution methods of this class all call ``self._execute``; it
        was previously missing, which made them fail with AttributeError.
        """
        return self.client.execute(sql, params)

    def _resolve_table_name(self) -> str:
        """Return the table name declared by the model.

        ``_table_name`` (custom MatrixOne models) takes precedence;
        ``__tablename__`` (SQLAlchemy declarative models) is the fallback.

        Raises::

            AttributeError: If the model declares neither attribute.
        """
        model = self.model_class
        for attribute in ("_table_name", "__tablename__"):
            name = getattr(model, attribute, None)
            if name:
                return name
        raise AttributeError(f"{model!r} defines neither '_table_name' nor '__tablename__'")

    def _model_column_names(self) -> List[str]:
        """Return the model's column names, or [] when it has no ``__table__``."""
        if hasattr(self.model_class, "__table__"):
            return [col.name for col in self.model_class.__table__.columns]
        return []

    # ------------------------------------------------------------------
    # Builder methods
    # ------------------------------------------------------------------
    def snapshot(self, snapshot_name: str) -> "Query":
        """Set snapshot for this query - SQLAlchemy style chaining"""
        self.snapshot_name = snapshot_name
        return self

    def select(self, *columns) -> "Query":
        """Select specific columns.

        Called without arguments, selects every column of the model's
        ``__table__`` (or nothing explicit when the model has no table).
        Each given column is stored as ``str(column)``.
        """
        if columns:
            self._select_columns = [str(col) for col in columns]
        else:
            self._select_columns = self._model_column_names()
        return self

    def filter(self, condition: str, *params) -> "Query":
        """Add a WHERE condition; ``params`` fill its '?' placeholders."""
        self._where_conditions.append(condition)
        self._where_params.extend(params)
        return self

    def filter_by(self, **kwargs) -> "Query":
        """Add ``column = ?`` WHERE conditions from keyword arguments.

        Only keys naming a column of the model's ``__table__`` are used.
        Other keys are skipped as before, but a warning is now logged so a
        mistyped column name does not silently widen the query.
        """
        known_columns = set(self._model_column_names())
        for key, value in kwargs.items():
            if key in known_columns:
                self._where_conditions.append(f"{key} = ?")
                self._where_params.append(value)
            else:
                logging.getLogger(__name__).warning("filter_by() ignored unknown column %r", key)
        return self

    def join(self, table: str, condition: str) -> "Query":
        """Add a ``JOIN <table> ON <condition>`` clause."""
        self._joins.append(f"JOIN {table} ON {condition}")
        return self

    def group_by(self, *columns) -> "Query":
        """
        Add GROUP BY columns to the query.

        Args::

            *columns: Column names or SQL expressions; each is stored as
                ``str(column)``.

        Returns::

            Query: Self for method chaining.

        Examples::

            query.group_by("department")
            query.group_by("department", "status")
            query.group_by("YEAR(created_at)", "MONTH(created_at)")

        Notes:

        - GROUP BY is typically used with aggregate functions (COUNT, SUM, AVG, etc.)
        - For SQLAlchemy expression support, use MatrixOneQuery instead
        """
        self._group_by_columns.extend(str(col) for col in columns)
        return self

    def having(self, condition: str, *params) -> "Query":
        """
        Add a HAVING condition to the query.

        Args::

            condition (str): The HAVING condition; may contain '?' placeholders.
            *params: Values for the '?' placeholders.

        Returns::

            Query: Self for method chaining.

        Examples::

            query.group_by("department").having("COUNT(*) > ?", 5)
            query.having("AVG(age) > 25")

        Notes:

        - Conditions and parameters are stored as given and handed to the
          SQL builder when the query is built
        - For SQLAlchemy expression support, use MatrixOneQuery instead
        """
        self._having_conditions.append(condition)
        self._having_params.extend(params)
        return self

    def order_by(self, *columns) -> "Query":
        """
        Add ORDER BY columns to the query.

        Args::

            *columns: Column names or expressions, optionally followed by
                ASC/DESC; each is stored as ``str(column)``.

        Returns::

            Query: Self for method chaining.

        Examples::

            query.order_by("name")
            query.order_by("department ASC", "name DESC")
            query.order_by("COUNT(*) DESC")

        Notes:

        - For SQLAlchemy expression support, use MatrixOneQuery instead
        """
        self._order_by_columns.extend(str(col) for col in columns)
        return self

    def limit(self, count: int) -> "Query":
        """Add LIMIT clause"""
        self._limit_count = count
        return self

    def offset(self, count: int) -> "Query":
        """Add OFFSET clause"""
        self._offset_count = count
        return self

    # ------------------------------------------------------------------
    # SELECT execution
    # ------------------------------------------------------------------
    def _build_select_sql(self) -> tuple[str, List[Any]]:
        """Build the SELECT statement and its parameters via MatrixOneSQLBuilder."""
        from .sql_builder import MatrixOneSQLBuilder

        builder = MatrixOneSQLBuilder()

        if self._select_columns:
            builder.select(*self._select_columns)
        else:
            builder.select_all()

        # FROM clause, optionally pinned to a snapshot
        builder.from_table(self._resolve_table_name(), self.snapshot_name)

        # Clauses that are already rendered are handed over as copies so the
        # builder cannot mutate this query's state.
        builder._joins = self._joins.copy()
        builder._where_conditions = self._where_conditions.copy()
        builder._where_params = self._where_params.copy()

        if self._group_by_columns:
            builder.group_by(*self._group_by_columns)

        builder._having_conditions = self._having_conditions.copy()
        builder._having_params = self._having_params.copy()

        if self._order_by_columns:
            builder.order_by(*self._order_by_columns)

        if self._limit_count is not None:
            builder.limit(self._limit_count)
        if self._offset_count is not None:
            builder.offset(self._offset_count)

        return builder.build()

    def all(self) -> List:
        """Execute the query and return one model instance per row.

        Row values are paired positionally with the model's table columns
        (or with the selected columns when the model has no ``__table__``)
        and passed to ``model_class(**values)``.
        """
        sql, params = self._build_select_sql()
        result = self._execute(sql, params)

        # Column order does not depend on the row, so resolve it once.
        if hasattr(self.model_class, "__table__"):
            column_names = self._model_column_names()
        else:
            column_names = self._select_columns or []

        # zip() stops at the shorter sequence, i.e. surplus names or surplus
        # row values are ignored.
        return [self.model_class(**dict(zip(column_names, row))) for row in result.rows]

    def first(self) -> Optional[Any]:
        """Execute the query with LIMIT 1 and return the first result or None.

        The caller's own LIMIT is restored afterwards, so the query object
        can be reused (previously it stayed limited to one row).
        """
        previous_limit = self._limit_count
        self._limit_count = 1
        try:
            results = self.all()
        finally:
            self._limit_count = previous_limit
        return results[0] if results else None

    def one(self):
        """
        Execute query and return exactly one result.

        Returns::

            Model instance: The single result row as a model instance.

        Raises::

            NoResultFound: If no results are found.
            MultipleResultsFound: If more than one result is found.

        Examples::

            user = Query(User, client).filter("id = ?", 1).one()

        Notes::

            - For cases where zero or one result is acceptable, use one_or_none()
            - For cases where multiple results are acceptable, use all() or first()
        """
        results = self.all()
        if not results:
            from sqlalchemy.exc import NoResultFound

            raise NoResultFound("No row was found for one()")
        if len(results) > 1:
            from sqlalchemy.exc import MultipleResultsFound

            raise MultipleResultsFound("Multiple rows were found for one()")
        return results[0]

    def one_or_none(self):
        """
        Execute query and return exactly one result or None.

        Returns::

            Model instance or None: The single result row, or None if no
            results are found.

        Raises::

            MultipleResultsFound: If more than one result is found.

        Examples::

            user = Query(User, client).filter("id = ?", 999).one_or_none()
            if user is None:
                print("User not found")

        Notes::

            - For cases where exactly one result is required, use one()
            - For cases where multiple results are acceptable, use all() or first()
        """
        results = self.all()
        if not results:
            return None
        if len(results) > 1:
            from sqlalchemy.exc import MultipleResultsFound

            raise MultipleResultsFound("Multiple rows were found for one_or_none()")
        return results[0]

    def scalar(self):
        """
        Execute query and return the first column of the first result.

        Returns::

            Any or None: For models with a ``__table__``, the attribute named
            after the table's first column; None if no row is found.

        Examples::

            count = Query(User, client).select(func.count(User.id)).scalar()
            name = Query(User, client).select("name").filter("id = ?", 1).scalar()

        Notes::

            - Row values are mapped positionally onto the model's columns by
              all(), so the first selected value is what gets returned
            - Returns None if no results are found
        """
        result = self.first()
        if result is None:
            return None

        if hasattr(result, "__dict__"):
            if hasattr(self.model_class, "__table__"):
                first_column = list(self.model_class.__table__.columns)[0]
                return getattr(result, first_column.name)
            # No table metadata: fall back to the first public attribute
            public_attrs = [attr for attr in dir(result) if not attr.startswith("_")]
            return getattr(result, public_attrs[0]) if public_attrs else None

        # Raw sequence data: return its first element
        if isinstance(result, (list, tuple)) and len(result) > 0:
            return result[0]
        return result

    def count(self) -> int:
        """Execute ``SELECT COUNT(*)`` with this query's filters and return it.

        JOIN, WHERE, GROUP BY and HAVING state is copied; ORDER BY, LIMIT and
        OFFSET are not. Returns 0 when the statement yields no row. With a
        GROUP BY only the first group's count is returned.
        """
        count_query = Query(self.model_class, self.client, self.snapshot_name)
        count_query._where_conditions = self._where_conditions.copy()
        count_query._where_params = self._where_params.copy()
        count_query._joins = self._joins.copy()
        count_query._group_by_columns = self._group_by_columns.copy()
        count_query._having_conditions = self._having_conditions.copy()
        count_query._having_params = self._having_params.copy()

        sql, params = count_query._build_select_sql()
        # Rewrite only the leading projection; a "SELECT *" inside a
        # subquery condition must stay untouched.
        sql = sql.replace("SELECT *", "SELECT COUNT(*)", 1)

        result = self._execute(sql, params)
        return result.rows[0][0] if result.rows else 0

    # ------------------------------------------------------------------
    # INSERT / UPDATE / DELETE
    # ------------------------------------------------------------------
    def insert(self, **kwargs) -> "Query":
        """Start INSERT operation with one row given as keyword arguments."""
        self._query_type = "INSERT"
        self._insert_values.append(kwargs)
        return self

    def bulk_insert(self, values_list: List[Dict[str, Any]]) -> "Query":
        """Bulk insert multiple records"""
        self._query_type = "INSERT"
        self._insert_values.extend(values_list)
        return self

    def _build_insert_sql(self) -> tuple[str, List[Any]]:
        """Build INSERT SQL query using unified SQL builder.

        Raises::

            ValueError: If no rows were queued with insert()/bulk_insert().
        """
        from .sql_builder import build_insert_query

        if not self._insert_values:
            raise ValueError("No values provided for INSERT")

        return build_insert_query(table_name=self._resolve_table_name(), values=self._insert_values)

    def update(self, **kwargs) -> "Query":
        """Start UPDATE operation; each keyword becomes a ``column = ?`` SET entry."""
        self._query_type = "UPDATE"
        for key, value in kwargs.items():
            self._update_set_columns.append(f"{key} = ?")
            self._update_set_values.append(value)
        return self

    def _build_update_sql(self) -> tuple[str, List[Any]]:
        """Build UPDATE SQL query using unified SQL builder.

        Raises::

            ValueError: If update() was never called with any column.
        """
        from .sql_builder import build_update_query

        if not self._update_set_columns:
            raise ValueError("No SET clauses provided for UPDATE")

        # Entries are stored as "column = ?"; recover the bare column name.
        set_values = {
            clause.split(" = ")[0]: value
            for clause, value in zip(self._update_set_columns, self._update_set_values)
        }

        return build_update_query(
            table_name=self._resolve_table_name(),
            set_values=set_values,
            where_conditions=self._where_conditions,
            where_params=self._where_params,
        )

    def delete(self) -> Any:
        """Execute DELETE operation immediately using the current WHERE state."""
        self._query_type = "DELETE"
        sql, params = self._build_delete_sql()
        return self._execute(sql, params)

    def _build_delete_sql(self) -> tuple[str, List[Any]]:
        """Build DELETE SQL query using unified SQL builder"""
        from .sql_builder import build_delete_query

        return build_delete_query(
            table_name=self._resolve_table_name(),
            where_conditions=self._where_conditions,
            where_params=self._where_params,
        )

    def execute(self) -> Any:
        """Execute the query based on its type.

        Raises::

            ValueError: If the recorded query type is not SELECT, INSERT,
            UPDATE or DELETE.
        """
        if self._query_type == "SELECT":
            return self.all()

        if self._query_type == "INSERT":
            sql, params = self._build_insert_sql()
        elif self._query_type == "UPDATE":
            sql, params = self._build_update_sql()
        elif self._query_type == "DELETE":
            sql, params = self._build_delete_sql()
        else:
            raise ValueError(f"Unknown query type: {self._query_type}")
        return self._execute(sql, params)
# CTE (Common Table Expression) support
class CTE:
    """Named Common Table Expression wrapping a query object or raw SQL text."""

    def __init__(self, name: str, query, recursive: bool = False):
        """
        Args::

            name: Name under which the CTE is referenced.
            query: Object exposing ``_build_sql()`` or a raw SQL string.
            recursive: Flag stored for callers that emit WITH RECURSIVE.
        """
        self.name = name
        self.query = query
        self.recursive = recursive
        # Filled lazily by _compile()
        self._sql = None
        self._params = None

    def _compile(self):
        """Return ``(sql, params)`` for the wrapped query, compiling it at most once.

        Raises::

            ValueError: If the wrapped query is neither a string nor an
            object with a ``_build_sql`` method.
        """
        if self._sql is not None:
            # Already compiled - reuse the cached result
            return self._sql, self._params

        source = self.query
        if hasattr(source, "_build_sql"):
            # Query-builder object: let it render itself
            self._sql, self._params = source._build_sql()
        elif isinstance(source, str):
            # Raw SQL text carries no bound parameters
            self._sql, self._params = source, []
        else:
            raise ValueError("CTE query must be a BaseMatrixOneQuery object or SQL string")
        return self._sql, self._params

    def as_sql(self) -> tuple[str, list]:
        """Get the compiled SQL and parameters for this CTE"""
        return self._compile()

    def __str__(self):
        return "CTE({})".format(self.name)
# Base Query Builder - SQLAlchemy style
[docs] class BaseMatrixOneQuery: """ Base MatrixOne Query builder that contains common SQL building logic. This base class provides SQLAlchemy-compatible query building with: - Full SQLAlchemy expression support in having(), filter(), and other methods - Automatic SQL generation and parameter binding - Support for both SQLAlchemy expressions and string conditions - Method chaining for fluent query building Key Features: - SQLAlchemy expression support (e.g., func.count(User.id) > 5) - String condition support (e.g., "COUNT(*) > ?", 5) - Automatic column name resolution and SQL generation - Full compatibility with SQLAlchemy 1.4+ and 2.0+ Note: This is a base class. For most use cases, use MatrixOneQuery instead. """
def __init__(self, model_class, client, transaction_wrapper=None, snapshot=None):
    """
    Initialise an empty SELECT query for ``model_class``.

    Args::

        model_class: A SQLAlchemy model, a custom model, a table-name
            string, or None for column-only queries.
        client: Object used to execute SQL when no transaction wrapper is set.
        transaction_wrapper: Optional object whose ``execute`` is preferred
            over the client's.
        snapshot: Optional snapshot name.

    Table name resolution::

        None             -> table name None, no columns
        str              -> the string itself, no columns
        SQLAlchemy model -> ``__tablename__`` and ``__table__.columns``
        anything else    -> ``_table_name`` if present, otherwise the
                            lower-cased ``__name__``; columns from ``_columns``
    """
    self.model_class = model_class
    self.client = client
    self.transaction_wrapper = transaction_wrapper
    self._snapshot_name = snapshot
    self._table_alias = None  # Add table alias support
    self._select_columns = []
    self._joins = []
    self._where_conditions = []
    self._where_params = []
    self._group_by_columns = []
    self._having_conditions = []
    self._having_params = []
    self._order_by_columns = []
    self._limit_count = None
    self._offset_count = None
    self._ctes = []  # List of CTE definitions
    self._query_type = "SELECT"
    # For INSERT
    self._insert_values = []
    # For UPDATE
    self._update_set_columns = []
    self._update_set_values = []

    # Handle None model_class (for column-only queries)
    if model_class is None:
        self._is_sqlalchemy_model = False
        self._table_name = None  # Will be set later by query() method
        self._columns = {}
        return

    # Detect if this is a SQLAlchemy model
    self._is_sqlalchemy_model = self._detect_sqlalchemy_model()

    if isinstance(model_class, str):
        # String table name
        self._table_name = model_class
        self._columns = {}
    elif self._is_sqlalchemy_model:
        self._table_name = model_class.__tablename__
        self._columns = {col.name: col for col in model_class.__table__.columns}
    else:
        # Only fall back to __name__ when _table_name is really missing.
        # Passing the fallback as getattr()'s default evaluated it eagerly
        # and raised AttributeError for objects that have _table_name but
        # no __name__ (e.g. instances).
        try:
            self._table_name = model_class._table_name
        except AttributeError:
            self._table_name = model_class.__name__.lower()
        self._columns = getattr(model_class, "_columns", {})
def _execute(self, sql, params=None):
    """Run ``sql`` through the transaction wrapper when one is set, else the client."""
    runner = self.transaction_wrapper or self.client
    return runner.execute(sql, params)


def _detect_sqlalchemy_model(self) -> bool:
    """Return True when the model class carries all SQLAlchemy mapping attributes."""
    marker_attributes = ("__tablename__", "__mapper__", "__table__")
    return all(hasattr(self.model_class, attribute) for attribute in marker_attributes)
def select(self, *columns) -> "BaseMatrixOneQuery":
    """Replace the SELECT column list with ``columns`` and return self for chaining."""
    self._select_columns = [*columns]
    return self
def cte(self, name: str, recursive: bool = False) -> CTE:
    """Wrap this query in a CTE (Common Table Expression) - SQLAlchemy style

    Args::

        name: Name of the CTE
        recursive: Whether this is a recursive CTE

    Returns::

        CTE object holding a reference to this query; the SQL is compiled
        lazily by the CTE itself

    Examples::

        # Create a CTE from a query
        user_stats = client.query(User).filter(User.active == True).cte("user_stats")

        # Use the CTE in another query
        result = client.query(user_stats).all()

        # Recursive CTE example
        hierarchy = client.query(Employee).filter(Employee.manager_id == None).cte("hierarchy", recursive=True)
    """
    wrapped = CTE(name, self, recursive)
    return wrapped
def join(self, target, onclause=None, isouter=False, full=False) -> "BaseMatrixOneQuery":
    """Add JOIN clause - SQLAlchemy style

    Args::

        target: CTE object, SQLAlchemy model, custom model with
            ``_table_name``, or anything else (rendered with ``str()``)
        onclause: ON condition as SQLAlchemy expression or string (optional)
        isouter: If True, creates LEFT OUTER JOIN (default: False for INNER JOIN)
        full: If True, creates FULL OUTER JOIN; takes precedence over isouter

    Returns::

        Self for method chaining

    Examples::

        # Basic inner join with explicit condition
        query.join(Address, User.id == Address.user_id)

        # Inner join with string condition
        query.join('addresses', 'users.id = addresses.user_id')

        # Left outer join
        query.join(Address, User.id == Address.user_id, isouter=True)

    Notes::

        When onclause is omitted the clause is emitted without an ON part;
        this method does not derive a condition itself.
    """
    import re

    keyword = "FULL OUTER JOIN" if full else ("LEFT OUTER JOIN" if isouter else "INNER JOIN")

    # Resolve the name to join against; the order of these checks matters.
    looks_like_cte = hasattr(target, 'name') and hasattr(target, 'as_sql')
    if looks_like_cte:
        joined_name = target.name
    elif hasattr(target, '__tablename__'):
        joined_name = target.__tablename__  # SQLAlchemy model
    elif hasattr(target, '_table_name'):
        joined_name = target._table_name  # custom model
    else:
        joined_name = str(target)  # plain table name

    if onclause is None:
        # No explicit condition: emit the join without an ON part
        self._joins.append(f"{keyword} {joined_name}")
        return self

    if hasattr(onclause, 'compile'):
        # SQLAlchemy expression: inline literal values while compiling
        rendered = str(onclause.compile(compile_kwargs={"literal_binds": True}))
        # Turn func('col') into func(col) for MatrixOne compatibility.
        # Table prefixes are kept on purpose - JOIN conditions need them.
        rendered = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", rendered)
    else:
        rendered = str(onclause)

    self._joins.append(f"{keyword} {joined_name} ON {rendered}")
    return self
def innerjoin(self, target, onclause=None) -> "BaseMatrixOneQuery":
    """Add INNER JOIN clause - shorthand for ``join(..., isouter=False)``."""
    chained = self.join(target, onclause=onclause, isouter=False)
    return chained
def leftjoin(self, target, onclause=None) -> "BaseMatrixOneQuery":
    """Add LEFT OUTER JOIN clause - shorthand for ``join(..., isouter=True)``."""
    chained = self.join(target, onclause=onclause, isouter=True)
    return chained
def rightjoin(self, target, onclause=None) -> "BaseMatrixOneQuery":
    """Add a join for SQLAlchemy-style ``rightjoin`` compatibility.

    Args::

        target: CTE object, SQLAlchemy model, custom model with
            ``_table_name``, or a table-name string
        onclause: ON condition as SQLAlchemy expression or string (optional)

    Returns::

        Self for method chaining

    Notes::

        - The clause is emitted with the LEFT JOIN keyword, as before.
        - NOTE(review): the two sides are not swapped, so the result is not
          a true RIGHT JOIN. The original comment states MatrixOne lacks
          RIGHT JOIN - confirm against the current MatrixOne SQL reference.
        - The target is now resolved to a table name the same way join()
          does; previously a model class was interpolated as its repr.
        - Table prefixes in a compiled ON expression are kept. Stripping
          them turned ``users.id = orders.user_id`` into the ambiguous
          ``id = user_id``.
    """
    import re

    # Resolve the table name (same precedence as join())
    if hasattr(target, "name") and hasattr(target, "as_sql"):
        table_name = target.name  # CTE object
    elif hasattr(target, "__tablename__"):
        table_name = target.__tablename__  # SQLAlchemy model
    elif hasattr(target, "_table_name"):
        table_name = target._table_name  # custom model
    else:
        table_name = str(target)

    if onclause is None:
        join_clause = f"LEFT JOIN {table_name}"
    else:
        if hasattr(onclause, "compile"):
            compiled = onclause.compile(compile_kwargs={"literal_binds": True})
            # Turn func('col') into func(col) for MatrixOne compatibility
            on_condition = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", str(compiled))
        else:
            on_condition = str(onclause)
        join_clause = f"LEFT JOIN {table_name} ON {on_condition}"

    self._joins.append(join_clause)
    return self
def fullouterjoin(self, target, onclause=None) -> "BaseMatrixOneQuery":
    """Add FULL OUTER JOIN clause - shorthand for ``join(..., full=True)``."""
    chained = self.join(target, onclause=onclause, full=True)
    return chained
def outerjoin(self, target, onclause=None) -> "BaseMatrixOneQuery":
    """Add LEFT OUTER JOIN clause - SQLAlchemy-style name that delegates to leftjoin()."""
    chained = self.leftjoin(target, onclause)
    return chained
def group_by(self, *columns) -> "BaseMatrixOneQuery":
    """
    Add GROUP BY columns to the query - SQLAlchemy style compatibility.

    Args::

        *columns: Columns to group by. Can be:
            - String column names, stored unchanged (e.g., "department")
            - Objects with a ``compile`` method (SQLAlchemy columns or
              functions such as func.year(User.created_at)), compiled with
              inlined literals
            - Anything else, stored as ``str(column)``

    Returns::

        BaseMatrixOneQuery: Self for method chaining.

    Examples::

        query.group_by(User.department)
        query.group_by(func.year(User.created_at), User.department)
        query.group_by("department", "status")

    Notes:

    - In compiled expressions ``func('col')`` becomes ``func(col)`` and
      ``table.column`` references are reduced to ``column``
    - Errors raised by ``compile`` propagate to the caller
    """
    import re

    quoted_argument = re.compile(r"(\w+)\('([^']+)'\)")
    table_prefix = re.compile(r"\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b")

    for column in columns:
        if isinstance(column, str):
            rendered = column
        elif hasattr(column, "compile"):
            # SQLAlchemy expression: compile with literal values inlined
            rendered = str(column.compile(compile_kwargs={"literal_binds": True}))
            # func('col') -> func(col) for MatrixOne compatibility
            rendered = quoted_argument.sub(r"\1(\2)", rendered)
            # users.name -> name
            rendered = table_prefix.sub(r"\2", rendered)
        else:
            rendered = str(column)
        self._group_by_columns.append(rendered)
    return self
def having(self, condition, *params) -> "BaseMatrixOneQuery":
    """
    Add a HAVING condition to the query - SQLAlchemy style compatibility.

    Args::

        condition: The HAVING condition. Can be:
            - SQLAlchemy expression (e.g., func.count(User.id) > 5)
            - String condition with placeholders (e.g., "COUNT(*) > ?")
            - String condition without placeholders (e.g., "COUNT(*) > 5")
        *params: Values for the '?' placeholders of a string condition.
            When the string has no placeholder, each parameter is appended
            as ``AND <value>``.

    Returns::

        BaseMatrixOneQuery: Self for method chaining.

    Examples::

        query.group_by(User.department)
        query.having(func.count(User.id) > 5)
        query.having("COUNT(*) > ?", 5)
        query.having("AVG(age) > 25")

    Notes:

    - Values are rendered inline into the stored condition: strings are
      single-quoted with quotes and backslashes escaped, None becomes NULL,
      everything else is rendered with ``str()``
    - Placeholders are filled strictly left to right. A '?' that is part of
      an already substituted value is never treated as a placeholder.
    - Surplus parameters are ignored; surplus placeholders stay as '?'
    - In compiled expressions ``func('col')`` becomes ``func(col)`` and
      ``table.column`` references are reduced to ``column``
    - Errors raised by ``compile`` propagate to the caller
    """
    import re

    def _literal(value):
        # Render a Python value as an inline SQL literal.
        if value is None:
            return "NULL"
        if isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace("'", "''")
            return f"'{escaped}'"
        return str(value)

    if hasattr(condition, "compile") and self._contains_fulltext_filter(condition):
        # SQLAlchemy expression containing FulltextFilter objects
        formatted_condition = self._process_fulltext_expression(condition)
    elif hasattr(condition, "compile"):
        # Plain SQLAlchemy expression (OR, AND, BinaryExpression, etc.)
        compiled = condition.compile(compile_kwargs={"literal_binds": True})
        formatted_condition = str(compiled)
        # func('col') -> func(col) for MatrixOne compatibility
        formatted_condition = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", formatted_condition)
        # users.name -> name
        formatted_condition = re.sub(r"\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b", r"\2", formatted_condition)
    else:
        formatted_condition = str(condition)
        if params and "?" not in formatted_condition:
            # No placeholder to fill: chain every parameter with AND
            for param in params:
                if hasattr(param, "_build_sql"):
                    # Query-like object: embed its SQL
                    sql, _ = param._build_sql()
                    formatted_condition += f" AND {sql}"
                else:
                    formatted_condition += f" AND {_literal(param)}"
        else:
            # Split once on the original placeholders so that text coming
            # from substituted values is never rescanned.
            segments = formatted_condition.split("?")
            remaining = list(params)
            rendered = [segments[0]]
            for segment in segments[1:]:
                rendered.append(_literal(remaining.pop(0)) if remaining else "?")
                rendered.append(segment)
            formatted_condition = "".join(rendered)

    self._having_conditions.append(formatted_condition)
    return self
def snapshot(self, snapshot_name: str) -> "BaseMatrixOneQuery":
    """Record the snapshot name this query should read from and return self for chaining."""
    setattr(self, "_snapshot_name", snapshot_name)
    return self
def alias(self, alias_name: str) -> "BaseMatrixOneQuery":
    """Record the table alias for this query and return self for chaining."""
    setattr(self, "_table_alias", alias_name)
    return self
def subquery(self, alias_name: str = None) -> str:
    """Render this query as a parenthesised subquery string.

    Args::

        alias_name: Optional alias; when truthy, `` AS <alias>`` is appended.

    Returns::

        str: ``(<sql>)`` or ``(<sql>) AS <alias>``. Parameters returned by
        ``_build_sql()`` are not included in the result.
    """
    sql_text, _ = self._build_sql()
    wrapped = f"({sql_text})"
    return f"{wrapped} AS {alias_name}" if alias_name else wrapped
def filter(self, condition, *params) -> "BaseMatrixOneQuery":
    """Add a WHERE condition - SQLAlchemy style unified interface.

    Args::

        condition: One of
            - an object with ``compile``, ``column`` and ``values``
              (LogicalIn); its ``compile()`` result is used as is
            - a SQLAlchemy expression, possibly containing FulltextFilter
              objects
            - a string, optionally with '?' placeholders
        *params: Values for a string condition. With placeholders they are
            substituted left to right; without placeholders each one is
            appended after a space (subquery objects in parentheses).

    Returns::

        BaseMatrixOneQuery: Self for method chaining.

    Notes::

        - Substituted strings are single-quoted with quotes and backslashes
          escaped, None becomes NULL, objects exposing ``_build_sql`` are
          embedded as ``(<sql>)``, everything else uses ``str()``.
        - Placeholders are filled only from this call's own parameters.
          They used to be popped from the front of the shared
          ``_where_params`` list, which could consume values queued by an
          earlier ``filter_by`` call.
        - Placeholders are matched on the original condition text, so a '?'
          inside a substituted value is never replaced.
        - Parameters left over after all placeholders are filled are
          appended to ``_where_params`` (unchanged behaviour).
    """
    import re

    def _literal(value):
        # Render one parameter as inline SQL.
        if isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace("'", "''")
            return f"'{escaped}'"
        if hasattr(value, "_build_sql"):
            # Subquery object
            subquery_sql, _ = value._build_sql()
            return f"({subquery_sql})"
        if value is None:
            return "NULL"
        return str(value)

    if hasattr(condition, "compile") and hasattr(condition, "column") and hasattr(condition, "values"):
        # LogicalIn object: it renders complete SQL including its values
        formatted_condition = condition.compile()
    elif hasattr(condition, "compile") and self._contains_fulltext_filter(condition):
        # SQLAlchemy expression containing FulltextFilter objects
        formatted_condition = self._process_fulltext_expression(condition)
    elif hasattr(condition, "compile"):
        # Plain SQLAlchemy expression (OR, AND, BinaryExpression, etc.)
        compiled = condition.compile(compile_kwargs={"literal_binds": True})
        formatted_condition = str(compiled)
        # func('col') -> func(col) for MatrixOne compatibility
        formatted_condition = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", formatted_condition)
        # users.name -> name
        formatted_condition = re.sub(r"\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b", r"\2", formatted_condition)
    else:
        formatted_condition = str(condition)
        if params and "?" not in formatted_condition:
            # No placeholder: append the parameters verbatim after the condition
            for param in params:
                if hasattr(param, "_build_sql"):
                    subquery_sql, _ = param._build_sql()
                    formatted_condition += f" ({subquery_sql})"
                else:
                    formatted_condition += f" {param}"
        else:
            segments = formatted_condition.split("?")
            remaining = list(params)
            rendered = [segments[0]]
            for segment in segments[1:]:
                rendered.append(_literal(remaining.pop(0)) if remaining else "?")
                rendered.append(segment)
            formatted_condition = "".join(rendered)
            # Keep parameters that found no placeholder, as before
            self._where_params.extend(remaining)

    self._where_conditions.append(formatted_condition)
    return self
def _contains_fulltext_filter(self, condition) -> bool:
    """
    Check whether a SQLAlchemy expression contains FulltextFilter objects.

    The expression itself is tested first, then every entry of its ``clauses``
    attribute (when present) is checked recursively.
    """
    from .sqlalchemy_ext.fulltext_search import FulltextFilter

    if isinstance(condition, FulltextFilter):
        return True

    # Check nested clauses
    for clause in getattr(condition, "clauses", ()):
        if self._contains_fulltext_filter(clause):
            return True
    return False

def _process_fulltext_expression(self, condition) -> str:
    """
    Render a SQLAlchemy expression that contains FulltextFilter objects to SQL.

    FulltextFilter objects are rendered through their own ``compile()``. Other
    clauses are compiled with literal binds and cleaned up for MatrixOne.
    Nested clause lists that themselves contain a FulltextFilter are rendered
    recursively.

    Returns::

        str: The SQL text for the condition. Clause lists are wrapped in
        parentheses and joined by their operator.
    """
    import re

    from sqlalchemy.sql import operators

    from .sqlalchemy_ext.fulltext_search import FulltextFilter

    if isinstance(condition, FulltextFilter):
        return condition.compile()

    # Handle and_() / or_() style clause lists
    if hasattr(condition, "clauses") and hasattr(condition, "operator"):
        parts = []
        for clause in condition.clauses:
            if isinstance(clause, FulltextFilter):
                parts.append(clause.compile())
            elif hasattr(clause, "compile") and self._contains_fulltext_filter(clause):
                # Nested clause list with a fulltext filter inside: recurse so the
                # filter is rendered by its own compile().
                parts.append(self._process_fulltext_expression(clause))
            elif hasattr(clause, "compile"):
                # Regular SQLAlchemy expression
                formatted = str(clause.compile(compile_kwargs={"literal_binds": True}))
                # Convert func('column') to func(column)
                formatted = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", formatted)
                # Strip table prefixes from identifiers only; quoted literals and
                # decimal numbers (e.g. 1.5) are left untouched.
                formatted = re.sub(
                    r"'(?:[^']|'')*'|\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b",
                    lambda m: m.group(2) if m.group(2) else m.group(0),
                    formatted,
                )
                parts.append(formatted)
            else:
                parts.append(str(clause))

        # Determine operator - check against SQLAlchemy operator objects
        if condition.operator is operators.and_:
            return f"({' AND '.join(parts)})"
        if condition.operator is operators.or_:
            return f"({' OR '.join(parts)})"
        # Fallback: use the operator's name
        op_name = getattr(condition.operator, "__name__", str(condition.operator))
        return f"({f' {op_name} '.join(parts)})"

    # Fallback for other types
    return str(condition)
def filter_by(self, **kwargs) -> "BaseMatrixOneQuery":
    """
    Add equality WHERE conditions from keyword arguments - SQLAlchemy style.

    Args::

        **kwargs: Column name / value pairs. Keys that are not present in
            ``self._columns`` are ignored.

    Returns::

        BaseMatrixOneQuery: Self for method chaining.

    Notes:
        - ``None`` produces ``<column> IS NULL``.
        - Strings are rendered as quoted literals with embedded quotes and
          backslashes escaped.
        - ``int`` / ``float`` values are rendered inline.
        - Any other value is bound through a ``?`` placeholder.
    """
    for key, value in kwargs.items():
        if key not in self._columns:
            continue

        if value is None:
            # "= NULL" never matches in SQL; NULL needs the IS operator.
            self._where_conditions.append(f"{key} IS NULL")
        elif isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace("'", "''")
            self._where_conditions.append(f"{key} = '{escaped}'")
        elif isinstance(value, (int, float)):
            self._where_conditions.append(f"{key} = {value}")
        else:
            # For other types, use parameterized query
            self._where_conditions.append(f"{key} = ?")
            self._where_params.append(value)
    return self
def where(self, condition: str, *params) -> "BaseMatrixOneQuery":
    """
    Add a WHERE condition.

    Alias for ``filter``: the condition and parameters are forwarded unchanged
    and whatever ``filter`` returns is returned.
    """
    chained = self.filter(condition, *params)
    return chained
def logical_in(self, column, values) -> "BaseMatrixOneQuery":
    """
    Add an IN condition with support for various value types.

    Args::

        column: Column to check (a string, or any object with a ``name`` attribute
            such as a SQLAlchemy column)
        values: Values to check against. Can be:
            - List, tuple, set or frozenset of values: [1, 2, 3, "a", "b"]
            - SQLAlchemy expression: func.count(User.id)
            - FulltextFilter object: boolean_match("title", "content").must("python")
            - Subquery object: client.query(User).select(User.id)
            - Any other single value

    Returns::

        BaseMatrixOneQuery: Self for method chaining.

    Examples::

        # List of values
        query.logical_in("city", ["北京", "上海", "广州"])
        query.logical_in(User.id, [1, 2, 3, 4])

        # SQLAlchemy expression
        query.logical_in("id", func.count(User.id))

        # FulltextFilter
        query.logical_in("id", boolean_match("title", "content").must("python"))

        # Subquery
        subquery = client.query(User).select(User.id).filter(User.active == True)
        query.logical_in("author_id", subquery)

    Notes:
        - FulltextFilter objects become ``SELECT id FROM <table> WHERE <fulltext>``
        - SQLAlchemy expressions are compiled to SQL with literal binds
        - Collections produce one ``?`` placeholder per element; the elements are
          queued on ``_where_params``
        - An empty collection produces the always-false condition ``1=0``
    """
    import re

    # Handle column name
    column_name = column.name if hasattr(column, "name") else str(column)

    if hasattr(values, "compile") and hasattr(values, "columns"):
        if hasattr(values, "query_builder"):
            # FulltextFilter object: wrap the fulltext predicate in an id subquery
            fulltext_sql = values.compile()
            table_name = self._table_name or "table"
            subquery_sql = f"SELECT id FROM {table_name} WHERE {fulltext_sql}"
            self._where_conditions.append(f"{column_name} IN ({subquery_sql})")
        else:
            # Other SQLAlchemy expressions
            sql_str = str(values.compile(compile_kwargs={"literal_binds": True}))
            # Convert func('column') to func(column) for MatrixOne compatibility
            sql_str = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", sql_str)
            # Strip table prefixes from identifiers only; quoted literals are
            # matched first and kept verbatim.
            sql_str = re.sub(
                r"'(?:[^']|'')*'|\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b",
                lambda m: m.group(2) if m.group(2) else m.group(0),
                sql_str,
            )
            self._where_conditions.append(f"{column_name} IN ({sql_str})")
    elif hasattr(values, "_build_sql"):
        # Subquery object
        subquery_sql, subquery_params = values._build_sql()
        self._where_conditions.append(f"{column_name} IN ({subquery_sql})")
        self._where_params.extend(subquery_params)
    elif isinstance(values, (list, tuple, set, frozenset)):
        items = list(values)
        if not items:
            # Empty collection means no matches
            self._where_conditions.append("1=0")
        else:
            placeholders = ",".join("?" for _ in items)
            self._where_conditions.append(f"{column_name} IN ({placeholders})")
            self._where_params.extend(items)
    else:
        # Single value
        self._where_conditions.append(f"{column_name} IN (?)")
        self._where_params.append(values)

    return self
def order_by(self, *columns) -> "BaseMatrixOneQuery":
    """
    Add ORDER BY clause to the query - SQLAlchemy style compatibility.

    Args::

        *columns: Columns to order by. Can be:
            - SQLAlchemy column expressions (e.g., User.name, User.created_at.desc())
            - String column names (e.g., "name", "created_at DESC")
            - SQLAlchemy function expressions (e.g., func.count(User.id))
            - SQLAlchemy desc/asc expressions (e.g., desc(User.name), asc(User.age))

    Returns::

        BaseMatrixOneQuery: Self for method chaining.

    Examples::

        # SQLAlchemy column expressions (recommended)
        query.order_by(User.name)
        query.order_by(User.created_at.desc())
        query.order_by(User.department, User.name.asc())

        # String column names
        query.order_by("name")
        query.order_by("department ASC", "name DESC")

        # SQLAlchemy desc/asc functions
        from sqlalchemy import desc, asc
        query.order_by(desc(User.created_at))

        # Function expressions
        query.order_by(func.count(User.id).desc())

        # Mixed expressions
        query.order_by(User.department, "name DESC")

    Notes:
        - Strings are used verbatim
        - Objects with a ``compile`` method are compiled with literal binds, then
          ``func('col')`` is rewritten to ``func(col)`` and table prefixes are
          removed ("users.name" -> "name"); quoted literals are left untouched
        - Any other object is converted with ``str()``
        - Multiple columns are ordered from left to right
    """
    import re

    for col in columns:
        if isinstance(col, str):
            self._order_by_columns.append(col)
        elif hasattr(col, "compile"):
            # SQLAlchemy expression: compile to a SQL string
            sql_str = str(col.compile(compile_kwargs={"literal_binds": True}))
            # Convert func('column') to func(column) for MatrixOne compatibility
            sql_str = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", sql_str)
            # Strip table prefixes from identifiers only; quoted literals are
            # matched first and returned unchanged.
            sql_str = re.sub(
                r"'(?:[^']|'')*'|\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b",
                lambda m: m.group(2) if m.group(2) else m.group(0),
                sql_str,
            )
            self._order_by_columns.append(sql_str)
        else:
            self._order_by_columns.append(str(col))

    return self
def limit(self, count: int) -> "BaseMatrixOneQuery":
    """
    Set the LIMIT clause - SQLAlchemy style.

    Stores ``count`` on the query, replacing any previous limit.

    Returns::

        BaseMatrixOneQuery: Self for method chaining.
    """
    row_limit = count
    self._limit_count = row_limit
    return self
def offset(self, count: int) -> "BaseMatrixOneQuery":
    """
    Set the OFFSET clause - SQLAlchemy style.

    Stores ``count`` on the query, replacing any previous offset.

    Returns::

        BaseMatrixOneQuery: Self for method chaining.
    """
    rows_to_skip = count
    self._offset_count = rows_to_skip
    return self
def _build_sql(self) -> tuple[str, List[Any]]:
    """
    Build the SELECT statement for this query using the unified SQL builder.

    Copies the query state (CTEs, select columns, FROM table/alias/snapshot,
    joins, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET) onto a fresh
    ``MatrixOneSQLBuilder`` and returns what ``builder.build()`` produces.

    Returns::

        Tuple of (SQL string, parameters list)

    Raises::

        ValueError: If no table name is set on the query
    """
    from .sql_builder import MatrixOneSQLBuilder

    builder = MatrixOneSQLBuilder()

    # Build CTE clause if any CTEs are defined
    if self._ctes:
        for cte in self._ctes:
            if isinstance(cte, CTE):
                cte_sql, cte_params = cte.as_sql()
                builder._ctes.append({'name': cte.name, 'sql': cte_sql, 'params': cte_params})
            else:
                # Handle legacy CTE format (passed to the builder unchanged)
                builder._ctes.append(cte)

    # Build SELECT clause with SQLAlchemy function support
    if self._select_columns:
        # Convert SQLAlchemy function objects to strings
        select_parts = []
        for col in self._select_columns:
            if hasattr(col, "compile"):  # SQLAlchemy function object
                # Check if this is a FulltextLabel (which already has AS in compile())
                if hasattr(col, "_compiler_dispatch") and hasattr(col, "name") and "FulltextLabel" in str(type(col)):
                    # For FulltextLabel, use compile() which already includes AS
                    sql_str = col.compile(compile_kwargs={"literal_binds": True})
                else:
                    # For regular SQLAlchemy objects
                    compiled = col.compile(compile_kwargs={"literal_binds": True})
                    sql_str = str(compiled)
                    # Fix SQLAlchemy's quoted column names for MatrixOne compatibility
                    # Convert avg('column') to avg(column)
                    import re

                    sql_str = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", sql_str)

                    # Handle SQLAlchemy label() method - add AS alias if present
                    # But avoid using SQL reserved keywords as aliases
                    # NOTE(review): assumed to apply only to the non-FulltextLabel
                    # branch, since FulltextLabel already renders its own AS - confirm
                    if hasattr(col, "name") and col.name and col.name.upper() not in ["DISTINCT"]:
                        sql_str = f"{sql_str} AS {col.name}"

                select_parts.append(sql_str)
            else:
                select_parts.append(str(col))
        builder._select_columns = select_parts
    else:
        builder.select_all()

    # Build FROM clause with optional table alias and snapshot
    if not self._table_name:
        raise ValueError("Table name is required. Provide a model class or set table name manually.")

    if self._table_alias:
        builder._from_table = f"{self._table_name} AS {self._table_alias}"
    else:
        builder._from_table = self._table_name

    builder._from_snapshot = self._snapshot_name

    # Add JOIN clauses (copied so the builder cannot mutate the query's list)
    builder._joins = self._joins.copy()

    # Add WHERE conditions and parameters
    builder._where_conditions = self._where_conditions.copy()
    builder._where_params = self._where_params.copy()

    # Add GROUP BY columns
    if self._group_by_columns:
        builder.group_by(*self._group_by_columns)

    # Add HAVING conditions and parameters
    builder._having_conditions = self._having_conditions.copy()
    builder._having_params = self._having_params.copy()

    # Add ORDER BY columns
    if self._order_by_columns:
        builder.order_by(*self._order_by_columns)

    # Add LIMIT and OFFSET
    if self._limit_count is not None:
        builder.limit(self._limit_count)
    if self._offset_count is not None:
        builder.offset(self._offset_count)

    # Build the final SQL and combine parameters
    sql, params = builder.build()

    # Parameters are already combined in builder.build() since we added CTEs to builder._ctes
    return sql, params

def _create_row_data(self, row, select_cols):
    """
    Create a RowData object for queries with custom select columns.

    Each name in ``select_cols`` becomes an attribute holding the value at the
    same position in ``row`` (spaces in the name are replaced by underscores).
    Names beyond the length of ``row`` are skipped. The object also supports
    positional indexing and ``len()`` over the raw row values.
    """

    class RowData:
        def __init__(self, values, columns):
            for i, col in enumerate(columns):
                if i < len(values):
                    # Replace spaces with underscores for valid Python attribute names
                    attr_name = col.replace(" ", "_")
                    setattr(self, attr_name, values[i])
            # Also support indexing for backward compatibility
            self._values = values

        def __getitem__(self, index):
            return self._values[index]

        def __len__(self):
            return len(self._values)

    return RowData(row, select_cols)

def _extract_select_columns(self):
    """
    Derive one result attribute name per entry of ``self._select_columns``.

    Naming rules, in the order they are tried:
        - object with ``compile`` and a non-empty ``name``: the ``name`` is used,
          except when it equals "DISTINCT" (case-insensitive) and the compiled SQL
          has parentheses, which gives ``DISTINCT_<column>``
        - text containing " as " (case-insensitive): the alias after it
        - text with parentheses, e.g. "COUNT(id)": ``<FUNC>_<column>`` with the
          function name upper-cased (``DISTINCT_<column>`` for DISTINCT)
        - dotted text not starting with "(": the part after the last dot
        - anything else: the text as-is

    Returns::

        List of attribute names, one per select column
    """
    select_cols = []
    for col in self._select_columns:
        # Check if this is a SQLAlchemy function with label
        if hasattr(col, "compile") and hasattr(col, "name") and col.name:
            # Special handling for DISTINCT function to avoid reserved keyword issues
            if hasattr(col, "name") and col.name.upper() == "DISTINCT":
                # For DISTINCT functions, use the original logic to extract column name
                col_str = str(col.compile(compile_kwargs={"literal_binds": True}))
                if "(" in col_str and ")" in col_str:
                    func_name = col_str.split("(")[0].strip()
                    col_name = col_str.split("(")[1].split(")")[0].strip()
                    col_name = col_name.strip("'\"")
                    select_cols.append(f"DISTINCT_{col_name}")
                else:
                    select_cols.append(col.name)
            else:
                # SQLAlchemy function with label - use the label name
                select_cols.append(col.name)
        else:
            # Convert SQLAlchemy function objects to strings first
            if hasattr(col, "compile"):
                col_str = str(col.compile(compile_kwargs={"literal_binds": True}))
            else:
                col_str = str(col)

            if " as " in col_str.lower():
                # Handle "column as alias" syntax - use case-insensitive split
                parts = col_str.lower().split(" as ")
                if len(parts) == 2:
                    # Find the actual alias in the original string
                    # (keeps the alias's original casing)
                    as_index = col_str.lower().find(" as ")
                    alias = col_str[as_index + 4 :].strip()
                    # Remove quotes from alias if present
                    alias = alias.strip("'\"")
                    select_cols.append(alias)
                else:
                    # Fallback to original logic
                    alias = col_str.split(" as ")[-1].strip()
                    alias = alias.strip("'\"")
                    select_cols.append(alias)
            else:
                # Handle function calls like "COUNT(id)" or "DISTINCT category"
                if "(" in col_str and ")" in col_str:
                    # Extract the function name and column
                    func_name = col_str.split("(")[0].strip()
                    col_name = col_str.split("(")[1].split(")")[0].strip()
                    # Remove quotes from column name
                    col_name = col_name.strip("'\"")

                    # Handle special cases
                    if func_name.upper() == "DISTINCT":
                        attr_name = f"DISTINCT_{col_name}"
                    else:
                        attr_name = f"{func_name.upper()}_{col_name}"

                    select_cols.append(attr_name)
                else:
                    # Handle table aliases in column names (e.g., "u.name" -> "name")
                    if "." in col_str and not col_str.startswith("("):
                        # Extract column name after the dot for attribute access
                        col_name = col_str.split(".")[-1]
                        select_cols.append(col_name)
                    else:
                        # For simple column names, use as-is
                        select_cols.append(col_str)
    return select_cols
def update(self, **kwargs) -> "BaseMatrixOneQuery":
    """
    Start UPDATE operation - SQLAlchemy style

    Marks the query as an UPDATE and records one SET assignment per keyword
    argument. Values exposing ``compile`` (SQLAlchemy expressions) are compiled
    to inline SQL; every other value is recorded as a ``?`` placeholder with the
    value queued separately.

    Args::

        **kwargs: Column names and their new values to set

    Returns::

        Self for method chaining

    Examples::

        # Update with simple key-value pairs
        query = client.query(User)
        query.update(name="New Name", email="new@example.com").filter(User.id == 1).execute()

        # Update with SQLAlchemy expressions
        from sqlalchemy import func
        query = client.query(User)
        query.update(
            last_login=func.now(),
            login_count=User.login_count + 1
        ).filter(User.id == 1).execute()

        # Update multiple records with conditions
        query = client.query(User)
        query.update(status="inactive").filter(User.last_login < "2023-01-01").execute()
    """
    import re

    self._query_type = "UPDATE"

    for column, new_value in kwargs.items():
        if not hasattr(new_value, "compile"):
            # Simple value: bound through a placeholder
            self._update_set_columns.append(f"{column} = ?")
            self._update_set_values.append(new_value)
            continue

        # SQLAlchemy expression: render inline
        rendered = str(new_value.compile(compile_kwargs={"literal_binds": True}))
        # Convert func('column') to func(column) for MatrixOne compatibility
        rendered = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", rendered)
        self._update_set_columns.append(f"{column} = {rendered}")

    return self
def _build_update_sql(self) -> tuple[str, List[Any]]: """ Build UPDATE SQL query directly to handle SQLAlchemy expressions Returns:: Tuple of (SQL string, parameters list) Raises:: ValueError: If no SET clauses are provided for UPDATE """ if not self._update_set_columns: raise ValueError("No SET clauses provided for UPDATE") # Build SET clause set_clauses = [] params = [] value_index = 0 for col in self._update_set_columns: if " = ?" in col: # Simple value assignment col_name = col.split(" = ?")[0] set_clauses.append(f"{col_name} = ?") params.append(self._update_set_values[value_index]) value_index += 1 else: # SQLAlchemy expression (already compiled to SQL) set_clauses.append(col) # Build WHERE clause where_clause = "" if self._where_conditions: where_clause = " WHERE " + " AND ".join(self._where_conditions) params.extend(self._where_params) # Build final SQL sql = f"UPDATE {self._table_name} SET {', '.join(set_clauses)}{where_clause}" return sql, params
def delete(self) -> Any:
    """
    Execute DELETE operation.

    Marks the query as a DELETE, builds the statement from the current WHERE
    state and runs it immediately, returning whatever ``_execute`` returns.
    """
    self._query_type = "DELETE"
    delete_sql, delete_params = self._build_delete_sql()
    outcome = self._execute(delete_sql, delete_params)
    return outcome
def _build_delete_sql(self) -> tuple[str, List[Any]]:
    """
    Build DELETE SQL query using unified SQL builder.

    Passes the table name, WHERE conditions and WHERE parameters of this query
    to ``build_delete_query`` and returns its result.
    """
    from .sql_builder import build_delete_query

    delete_args = {
        "table_name": self._table_name,
        "where_conditions": self._where_conditions,
        "where_params": self._where_params,
    }
    return build_delete_query(**delete_args)
# MatrixOne Snapshot Query Builder - SQLAlchemy style
class MatrixOneQuery(BaseMatrixOneQuery):
    """
    MatrixOne Query builder that mimics SQLAlchemy Query interface.

    This class provides full SQLAlchemy compatibility including:

    - SQLAlchemy expression support in having(), filter(), and other methods
    - Type-safe column references and function calls
    - Automatic SQL generation and parameter binding
    - Full integration with SQLAlchemy models and functions

    Key Features:

    - Supports SQLAlchemy expressions (e.g., func.count(User.id) > 5)
    - Supports traditional string conditions (e.g., "COUNT(*) > ?", 5)
    - Automatic column name resolution and SQL generation
    - Method chaining for fluent query building
    - Full compatibility with SQLAlchemy 1.4+ and 2.0+

    Examples::

        # SQLAlchemy expression syntax (recommended)
        query = client.query(User)
        query.group_by(User.department)
        query.having(func.count(User.id) > 5)
        query.having(func.avg(User.age) > 25)

        # String-based syntax (also supported)
        query.having("COUNT(*) > ?", 5)
        query.having("AVG(age) > ?", 25)

        # Mixed syntax
        query.having(func.count(User.id) > 5)  # Expression
        query.having("AVG(age) > ?", 25)       # String
    """
def __init__(self, model_class, client, transaction_wrapper=None, snapshot=None):
    """
    Initialize the query by forwarding all arguments to the base class.

    Args::

        model_class: Passed through to the base initializer
            (presumably the model the query targets - confirm against the base class)
        client: Passed through to the base initializer
        transaction_wrapper: Optional, passed through to the base initializer
        snapshot: Optional, passed through to the base initializer
    """
    # No state of its own: everything is set up by the base initializer.
    super().__init__(model_class, client, transaction_wrapper, snapshot)
def with_cte(self, *ctes) -> "MatrixOneQuery":
    """
    Add CTEs to this query - SQLAlchemy style

    Args::

        *ctes: CTE objects to add to the query

    Returns::

        Self for method chaining

    Raises::

        ValueError: If any argument is not a CTE object. In that case none of
            the arguments is added to the query.

    Examples::

        # Add a single CTE
        user_stats = client.query(User).filter(User.active == True).cte("user_stats")
        result = client.query(Article).with_cte(user_stats).join(user_stats, Article.user_id == user_stats.id).all()

        # Add multiple CTEs
        result = client.query(Article).with_cte(user_stats, category_stats).all()
    """
    # Validate everything before mutating so a rejected call leaves the query
    # exactly as it was.
    for cte in ctes:
        if not isinstance(cte, CTE):
            raise ValueError("All arguments must be CTE objects")

    self._ctes.extend(ctes)
    return self
def all(self) -> List:
    """
    Execute query and return all results - SQLAlchemy style

    Returns::

        List: One entry per result row.
            - With custom select columns: row-data objects created by
              ``_create_row_data`` (attribute and index access)
            - Otherwise: ``model_class`` instances built from the row values,
              matched positionally against ``self._columns``
    """
    sql, params = self._build_sql()
    result = self._execute(sql, params)

    # ResultSet from client.execute() exposes .rows; a SQLAlchemy Result from
    # session.execute() needs fetchall().
    rows = result.rows if hasattr(result, "rows") else result.fetchall()

    if self._select_columns:
        # The attribute names depend only on the select list, so derive them
        # once instead of once per row.
        select_cols = self._extract_select_columns() if rows else []
        return [self._create_row_data(row, select_cols) for row in rows]

    # SQLAlchemy and non-SQLAlchemy models are instantiated the same way:
    # keyword arguments keyed by column name. zip() stops at the shorter of the
    # two, so columns missing from a short row are simply not passed.
    column_names = list(self._columns.keys())
    return [self.model_class(**dict(zip(column_names, row))) for row in rows]
def first(self) -> Optional:
    """
    Execute query and return first result - SQLAlchemy style

    The query is executed with a temporary ``LIMIT 1``. The limit that was set
    before the call is restored afterwards, so the query object can be reused
    (e.g. a later ``all()`` is not silently truncated to one row).

    Returns::

        The first result, or None if the query returns no rows.
    """
    previous_limit = self._limit_count
    self._limit_count = 1
    try:
        results = self.all()
    finally:
        # Restore even when execution fails.
        self._limit_count = previous_limit
    return results[0] if results else None
def one(self):
    """
    Execute query and return exactly one result - SQLAlchemy style.

    Runs the query through ``all()`` and requires the result to contain exactly
    one entry.

    Returns::

        Model instance: The single result row.

    Raises::

        NoResultFound: If no results are found.
        MultipleResultsFound: If more than one result is found.

    Examples::

        # Get a user by unique ID using SQLAlchemy expressions
        from sqlalchemy import and_
        user = client.query(User).filter(and_(User.id == 1, User.active == True)).one()

        # Get a user by unique email
        user = client.query(User).filter(User.email == "admin@example.com").one()

    Notes::

        - Use this method when you expect exactly one result
        - For cases where zero or one result is acceptable, use one_or_none()
        - For cases where multiple results are acceptable, use all() or first()
    """
    matches = self.all()
    found = len(matches)

    if found == 1:
        return matches[0]

    if found == 0:
        from sqlalchemy.exc import NoResultFound

        raise NoResultFound("No row was found for one()")

    from sqlalchemy.exc import MultipleResultsFound

    raise MultipleResultsFound("Multiple rows were found for one()")
def one_or_none(self):
    """
    Execute query and return exactly one result or None - SQLAlchemy style.

    Runs the query through ``all()``. An empty result gives None, a single
    entry is returned, and anything more is an error.

    Returns::

        Model instance or None: The single result row, or None if no results
        are found.

    Raises::

        MultipleResultsFound: If more than one result is found.

    Examples::

        # Get a user by ID, return None if not found
        user = client.query(User).filter(User.id == 999).one_or_none()
        if user:
            print(f"Found user: {user.name}")

    Notes::

        - Use this method when zero or one result is acceptable
        - For cases where exactly one result is required, use one()
        - For cases where multiple results are acceptable, use all() or first()
    """
    matches = self.all()
    found = len(matches)

    if found == 0:
        return None

    if found > 1:
        from sqlalchemy.exc import MultipleResultsFound

        raise MultipleResultsFound("Multiple rows were found for one_or_none()")

    return matches[0]
def scalar(self):
    """
    Execute query and return the first column of the first result - SQLAlchemy style.

    Returns::

        Any or None: The value of the first column from the first row, or None
        if no results are found.

    Examples::

        # Get the count of all users using SQLAlchemy functions
        from sqlalchemy import func
        count = client.query(User).select(func.count(User.id)).scalar()

        # Get the maximum age with conditions
        max_age = client.query(User).select(func.max(User.age)).filter(User.active == True).scalar()

        # Get a specific user's name by ID
        name = client.query(User).select(User.name).filter(User.id == 1).scalar()

    Notes::

        - For custom select queries, the first selected value is read by
          position, so it does not depend on how the column is named
        - For SQLAlchemy models, the first column of ``__table__`` is returned
        - For other models, the first column of ``self._columns`` is returned
        - Returns None if no results are found
    """
    result = self.first()
    if result is None:
        return None

    # Raw data without attributes: return the first element, or the value itself.
    if not hasattr(result, "__dict__"):
        if isinstance(result, (list, tuple)) and len(result) > 0:
            return result[0]
        return result

    # Custom select: rows support positional access. Reading by position avoids
    # rebuilding the attribute name, which fails for names containing spaces
    # (e.g. "DISTINCT category" is stored as DISTINCT_category).
    if self._select_columns and hasattr(result, "__getitem__"):
        return result[0] if len(result) > 0 else None

    # SQLAlchemy model: first column of the mapped table.
    if hasattr(self.model_class, "__table__"):
        first_column = list(self.model_class.__table__.columns)[0]
        return getattr(result, first_column.name)

    # Other models: use the declared column order rather than dir(), which is
    # alphabetical and also lists methods.
    declared = getattr(self, "_columns", None) or {}
    first_name = next(iter(declared), None)
    if first_name is not None and hasattr(result, first_name):
        return getattr(result, first_name)

    # Last resort for objects with no declared columns: first public attribute.
    attrs = [attr for attr in dir(result) if not attr.startswith("_")]
    if attrs:
        return getattr(result, attrs[0])
    return None
def count(self) -> int:
    """
    Execute query and return count of results - SQLAlchemy style

    Returns the number of rows the query would return.

    Returns::

        int: The row count, or 0 when the server returns no row.

    Notes:
        - A plain ``SELECT * ...`` query (no GROUP BY, LIMIT or OFFSET) is
          counted by rewriting only its leading ``SELECT *``; any ``SELECT *``
          inside subqueries is left untouched
        - Every other query (custom select columns, CTEs, GROUP BY, LIMIT,
          OFFSET) is counted by wrapping it:
          ``SELECT COUNT(*) FROM (<query>) AS count_subquery``
    """
    sql, params = self._build_sql()

    select_star = "SELECT *"
    is_plain = (
        sql.startswith(select_star)
        and getattr(self, "_limit_count", None) is None
        and getattr(self, "_offset_count", None) is None
        and not getattr(self, "_group_by_columns", None)
    )

    if is_plain:
        count_sql = "SELECT COUNT(*)" + sql[len(select_star):]
    else:
        # Rewriting in place would give wrong results here (first column of a
        # custom select, per-group counts, or no row at all with OFFSET).
        count_sql = f"SELECT COUNT(*) FROM ({sql}) AS count_subquery"

    result = self._execute(count_sql, params)

    # ResultSet from client.execute() exposes .rows; a SQLAlchemy Result from
    # session.execute() needs fetchall().
    rows = result.rows if hasattr(result, "rows") else result.fetchall()
    return rows[0][0] if rows else 0
def execute(self) -> Any:
    """
    Execute the query based on its type.

    Picks the SQL builder matching ``self._query_type`` (SELECT, INSERT, UPDATE
    or DELETE), builds the statement and runs it.

    Raises::

        ValueError: If the query type is not one of the four known types
    """
    dispatch = (
        ("SELECT", "_build_sql"),
        ("INSERT", "_build_insert_sql"),
        ("UPDATE", "_build_update_sql"),
        ("DELETE", "_build_delete_sql"),
    )

    for query_type, builder_name in dispatch:
        if self._query_type == query_type:
            # Resolve the builder lazily so only the needed one is looked up.
            sql, params = getattr(self, builder_name)()
            return self._execute(sql, params)

    raise ValueError(f"Unknown query type: {self._query_type}")
def to_sql(self) -> str:
    """
    Generate the complete SQL statement for this query.

    Builds the statement for the current query type and substitutes the bound
    parameters into its ``?`` placeholders for better readability.

    Returns::

        str: The complete SQL statement as a string.

    Examples::

        query = client.query(User).filter(User.age > 25).order_by(User.name)
        sql = query.to_sql()
        print(sql)  # "SELECT * FROM users WHERE age > 25 ORDER BY name"

        query = client.query(User).update(name="New Name").filter(User.id == 1)
        sql = query.to_sql()
        print(sql)  # "UPDATE users SET name = 'New Name' WHERE id = 1"

    Notes:
        - Placeholders are filled left to right; a ``?`` that appears inside a
          quoted string literal is not treated as a placeholder
        - String parameters are quoted, with embedded quotes and backslashes
          escaped; other parameters are rendered with ``str()``
        - Placeholders without a matching parameter are left as ``?``
        - Use this for debugging or logging purposes
    """
    import re

    # Build SQL based on query type
    if self._query_type == "UPDATE":
        sql, params = self._build_update_sql()
    elif self._query_type == "INSERT":
        sql, params = self._build_insert_sql()
    elif self._query_type == "DELETE":
        sql, params = self._build_delete_sql()
    else:
        sql, params = self._build_sql()

    if not params:
        return sql

    pending = list(params)

    def _substitute(match):
        token = match.group(0)
        # Quoted literals are matched only to skip over them.
        if token != "?" or not pending:
            return token
        param = pending.pop(0)
        if isinstance(param, str):
            escaped = param.replace("\\", "\\\\").replace("'", "''")
            return f"'{escaped}'"
        return str(param)

    # One left-to-right pass: substituted text is never rescanned, so a '?'
    # inside a parameter value cannot swallow the next parameter.
    return re.sub(r"'(?:[^'\\]|''|\\.)*'|\?", _substitute, sql)
def explain(self, verbose: bool = False) -> Any:
    """
    Execute EXPLAIN statement for this query.

    Builds the SELECT statement for this query, prefixes it with ``EXPLAIN``
    (or ``EXPLAIN VERBOSE``) and runs it with the query's parameters.

    Args::

        verbose (bool): Whether to include verbose output. Defaults to False.

    Returns::

        Any: The result set containing the execution plan.

    Examples::

        # Basic EXPLAIN
        plan = client.query(User).filter(User.age > 25).explain()

        # EXPLAIN with verbose output
        plan = client.query(User).filter(User.age > 25).explain(verbose=True)

        # EXPLAIN for complex queries
        plan = (client.query(User)
               .filter(User.department == 'Engineering')
               .order_by(User.salary.desc())
               .explain(verbose=True))
    """
    statement, bound = self._build_sql()
    prefix = "EXPLAIN VERBOSE" if verbose else "EXPLAIN"
    return self._execute(f"{prefix} {statement}", bound)
def explain_analyze(self, verbose: bool = False) -> Any:
    """
    Execute EXPLAIN ANALYZE statement for this query.

    Builds the SELECT statement for this query, prefixes it with
    ``EXPLAIN ANALYZE`` (or ``EXPLAIN ANALYZE VERBOSE``) and runs it with the
    query's parameters.

    Args::

        verbose (bool): Whether to include verbose output. Defaults to False.

    Returns::

        Any: The result set containing the execution plan and statistics.

    Examples::

        # Basic EXPLAIN ANALYZE
        result = client.query(User).filter(User.age > 25).explain_analyze()

        # EXPLAIN ANALYZE with verbose output
        result = client.query(User).filter(User.age > 25).explain_analyze(verbose=True)

    Notes:
        - EXPLAIN ANALYZE actually executes the query and shows statistics
        - Use with caution on large datasets as it executes the full query
    """
    statement, bound = self._build_sql()
    prefix = "EXPLAIN ANALYZE VERBOSE" if verbose else "EXPLAIN ANALYZE"
    return self._execute(f"{prefix} {statement}", bound)
def _get_export_manager(self):
    """
    Build an ExportManager bound to the current execution context.

    The manager wraps ``self.transaction_wrapper`` when that attribute is
    truthy, and falls back to ``self.client`` otherwise.
    """
    # Imported lazily, at call time, rather than at module import.
    from .export import ExportManager

    executor = self.transaction_wrapper
    if not executor:
        executor = self.client
    return ExportManager(executor)


def _parse_export_format(self, format: str):
    """
    Translate a format name into an ExportFormat member.

    The comparison is case-insensitive. ``'csv'`` maps to
    ``ExportFormat.CSV``; every other value maps to
    ``ExportFormat.JSONLINE`` (unknown names are not rejected).
    """
    from .export import ExportFormat

    if format.lower() == 'csv':
        return ExportFormat.CSV
    return ExportFormat.JSONLINE
def export_to_file(
    self,
    filepath: str,
    format: str = 'csv',
    fields_terminated_by: Optional[str] = None,
    fields_enclosed_by: Optional[str] = None,
    lines_terminated_by: Optional[str] = None,
    header: bool = False,
    max_file_size: Optional[int] = None,
    force_quote: Optional[list] = None,
) -> Any:
    """
    Export this query's results to a file (SELECT ... INTO OUTFILE).

    Builds the SELECT statement for the query, resolves the export manager
    and export format for the current context, and delegates the export to
    the manager's ``to_file()``.

    Args:
        filepath: Absolute path on the server filesystem where the file
            will be created
        format: Export file format ('csv' or 'jsonline')
        fields_terminated_by: Field delimiter (default: ',' for CSV)
        fields_enclosed_by: Field enclosure character (default: '"' for CSV)
        lines_terminated_by: Line terminator (default: '\\n')
        header: Whether to include column headers (default: False)
        max_file_size: Maximum size per file in bytes (for splitting large
            exports)
        force_quote: List of column names/indices to always quote

    Returns:
        The value returned by the export manager's ``to_file()`` (the
        export operation results).

    Examples:
        >>> # Export filtered users to CSV with headers
        >>> client.query(User).filter(User.age > 25).export_to_file(
        ...     filepath="/tmp/users_over_25.csv",
        ...     format='csv',
        ...     header=True
        ... )

    Note:
        - The filepath must be on the MatrixOne server's filesystem
        - The MatrixOne server process must have write permissions
        - For more control, use client.export.to_file() with the query's SQL
    """
    # NOTE(review): only the SQL text is forwarded; the parameters returned
    # by _build_sql() are discarded here. Confirm the generated SQL never
    # relies on separately bound parameters.
    select_sql, _ = self._build_sql()

    manager = self._get_export_manager()
    export_options = {
        "query": select_sql,
        "filepath": filepath,
        "format": self._parse_export_format(format),
        "fields_terminated_by": fields_terminated_by,
        "fields_enclosed_by": fields_enclosed_by,
        "lines_terminated_by": lines_terminated_by,
        "header": header,
        "max_file_size": max_file_size,
        "force_quote": force_quote,
    }
    return manager.to_file(**export_options)
def export_to_stage(
    self,
    stage_name: str,
    filename: str,
    format: str = 'csv',
    fields_terminated_by: Optional[str] = None,
    fields_enclosed_by: Optional[str] = None,
    lines_terminated_by: Optional[str] = None,
    header: bool = False,
    max_file_size: Optional[int] = None,
    force_quote: Optional[list] = None,
    compression: Optional[str] = None,
) -> Any:
    """
    Export this query's results to a stage (SELECT ... INTO STAGE).

    This is the recommended export path. It builds the SELECT statement for
    the query, resolves the export manager and export format for the
    current context, and delegates the export to the manager's
    ``to_stage()``.

    Args:
        stage_name: Name of the target stage
        filename: Filename to create in the stage
        format: Export file format ('csv' or 'jsonline')
        fields_terminated_by: Field delimiter (default: ',' for CSV)
        fields_enclosed_by: Field enclosure character (default: '"' for CSV)
        lines_terminated_by: Line terminator (default: '\\n')
        header: Whether to include column headers (default: False)
        max_file_size: Maximum size per file in bytes (for splitting large
            exports)
        force_quote: List of column names/indices to always quote
        compression: Compression format (e.g., 'gzip', 'bzip2')

    Returns:
        The value returned by the export manager's ``to_stage()`` (the
        export operation results).

    Examples:
        >>> # Export users to an S3 stage with compression
        >>> client.query(User).filter(User.active == True).export_to_stage(
        ...     stage_name='s3_backup',
        ...     filename='active_users.csv',
        ...     format='csv',
        ...     header=True,
        ...     compression='gzip'
        ... )

        >>> # Export to JSONLINE format
        >>> client.query(Event).filter(Event.timestamp > '2025-01-01').export_to_stage(
        ...     stage_name='event_archive',
        ...     filename='events_2025.jsonl',
        ...     format='jsonline'
        ... )

    Note:
        - The stage must exist before exporting
        - The stage must have appropriate write permissions
        - Use compression for large exports to save storage and transfer costs
        - This is more flexible than export_to_file() as it supports cloud storage
    """
    # NOTE(review): only the SQL text is forwarded; the parameters returned
    # by _build_sql() are discarded here. Confirm the generated SQL never
    # relies on separately bound parameters.
    select_sql, _ = self._build_sql()

    manager = self._get_export_manager()
    export_options = {
        "query": select_sql,
        "stage_name": stage_name,
        "filename": filename,
        "format": self._parse_export_format(format),
        "fields_terminated_by": fields_terminated_by,
        "fields_enclosed_by": fields_enclosed_by,
        "lines_terminated_by": lines_terminated_by,
        "header": header,
        "max_file_size": max_file_size,
        "force_quote": force_quote,
        "compression": compression,
    }
    return manager.to_stage(**export_options)
def export_to(self, stage_or_filepath: str, filename: Optional[str] = None, **kwargs) -> Any:
    """
    Export query results, choosing file or stage export from the arguments.

    When ``filename`` is truthy the call is forwarded to
    ``export_to_stage(stage_or_filepath, filename, **kwargs)``; otherwise
    (``None`` or an empty string) it is forwarded to
    ``export_to_file(stage_or_filepath, **kwargs)``.

    Args:
        stage_or_filepath: Stage name (when filename is given) or file path
            (when filename is not given)
        filename: Optional filename for stage export. If provided, exports
            to a stage.
        **kwargs: Additional export options (format, header, compression,
            etc.), passed through unchanged

    Returns:
        The value returned by the selected export method.

    Examples:
        >>> # Export to stage (recommended)
        >>> client.query(User).filter(User.age > 25).export_to(
        ...     'my_stage',
        ...     'users_over_25.csv',
        ...     format='csv',
        ...     header=True
        ... )

        >>> # Export to file
        >>> client.query(Order).filter(Order.status == 'completed').export_to(
        ...     '/tmp/completed_orders.csv',
        ...     format='csv',
        ...     header=True
        ... )

    Note:
        - For better clarity, consider calling export_to_stage() or
          export_to_file() directly
    """
    if not filename:
        # No filename: the first argument is a path on the server.
        return self.export_to_file(stage_or_filepath, **kwargs)

    # A filename was supplied: the first argument names a stage.
    return self.export_to_stage(stage_or_filepath, filename, **kwargs)
# Helper functions for ORDER BY
def desc(column: str) -> str:
    """Return an ORDER BY fragment sorting *column* in descending order."""
    direction = "DESC"
    return f"{column} {direction}"


def asc(column: str) -> str:
    """Return an ORDER BY fragment sorting *column* in ascending order."""
    direction = "ASC"
    return f"{column} {direction}"
class LogicalIn:
    """
    Helper for building ``column IN (...)`` conditions for ``filter()``.

    The right-hand side may be a list/tuple of values, a single value, an
    object exposing ``_build_sql()`` (a subquery), or an object exposing
    ``compile()`` and ``columns`` (a SQLAlchemy-style expression or a
    fulltext filter). ``compile()`` renders the condition as a SQL string
    with the values written inline.

    Usage::

        # List of values
        query.filter(logical_in("city", ["Beijing", "Shanghai"]))
        query.filter(logical_in(User.id, [1, 2, 3, 4]))

        # FulltextFilter
        query.filter(logical_in("id", boolean_match("title", "content").must("python")))

        # Subquery
        subquery = client.query(User).select(User.id).filter(User.active == True)
        query.filter(logical_in("author_id", subquery))
    """

    def __init__(self, column, values):
        """Store the column (name or object with ``.name``) and the values."""
        self.column = column
        self.values = values

    @staticmethod
    def _format_literal(value):
        """
        Render one Python value as an inline SQL literal.

        ``None`` becomes ``NULL``. Strings are wrapped in single quotes with
        embedded backslashes and single quotes escaped, so a value cannot
        terminate the literal early. Any other value is rendered with
        ``str()``.
        """
        if value is None:
            return "NULL"
        if isinstance(value, str):
            # Escape backslashes first so the quote escaping is not doubled.
            escaped = value.replace("\\", "\\\\").replace("'", "''")
            return f"'{escaped}'"
        return str(value)

    def compile(self, compile_kwargs=None):
        """
        Compile to a SQL expression string for use in ``filter()``.

        Args:
            compile_kwargs: Accepted for interface compatibility; not used
                by this implementation.

        Returns:
            str: ``"<column> IN (...)"``, or ``"1=0"`` when ``values`` is an
            empty list/tuple (an empty IN list can never match).
        """
        # Column objects contribute their bare name; anything else is
        # stringified as-is.
        if hasattr(self.column, "name"):
            column_name = self.column.name
        else:
            column_name = str(self.column)

        values = self.values

        if hasattr(values, "compile") and hasattr(values, "columns"):
            if hasattr(values, "query_builder"):
                # Presumably a FulltextFilter: wrap its compiled predicate
                # in a subquery selecting ids.
                fulltext_sql = values.compile()
                # NOTE(review): the table name is a hard-coded placeholder;
                # confirm the caller rewrites it from the query context.
                table_name = "table"
                subquery_sql = f"SELECT id FROM {table_name} WHERE {fulltext_sql}"
                return f"{column_name} IN ({subquery_sql})"

            # Other SQLAlchemy-style expressions: compile with literal values
            # inlined, then adjust the text for MatrixOne.
            import re

            compiled = values.compile(compile_kwargs={"literal_binds": True})
            sql_str = str(compiled)
            # func('col') -> func(col): drop quotes around a lone argument.
            sql_str = re.sub(r"(\w+)\('([^']+)'\)", r"\1(\2)", sql_str)
            # table.col -> col: drop the table qualifier.
            sql_str = re.sub(r"\b([a-zA-Z_]\w*)\.([a-zA-Z_]\w*)\b", r"\2", sql_str)
            return f"{column_name} IN ({sql_str})"

        if hasattr(values, "_build_sql"):
            # Subquery object. NOTE(review): its parameters are discarded;
            # confirm the subquery SQL does not rely on bound parameters.
            subquery_sql, _ = values._build_sql()
            return f"{column_name} IN ({subquery_sql})"

        if isinstance(values, (list, tuple)):
            if not values:
                # Empty list means no matches: emit an always-false predicate.
                return "1=0"
            rendered = ",".join(self._format_literal(item) for item in values)
            return f"{column_name} IN ({rendered})"

        # Single scalar value (including None, rendered as NULL).
        return f"{column_name} IN ({self._format_literal(values)})"
def logical_in(column, values):
    """
    Create a logical IN condition for use in the filter() method.

    This is a thin factory around ``LogicalIn``: it stores ``column`` and
    ``values`` and returns the resulting object. The SQL text is produced
    later by ``LogicalIn.compile()``, which picks a rendering based on the
    type of ``values``.

    Args::

        column: Column to check against. Either a string column name
            ("user_id") or an object with a ``name`` attribute such as a
            SQLAlchemy column (User.id).
        values: Values to check against. Can be:
            - List or tuple of values: [1, 2, 3, "a", "b"]
            - Single value: 42 or "test"
            - SQLAlchemy expression
            - FulltextFilter object: boolean_match("title", "content").must("python")
            - Subquery object: client.query(User).select(User.id)

    Returns::

        LogicalIn: A condition object that can be passed to filter().

    Examples::

        # List of values - generates: city IN ('a','b')
        query.filter(logical_in("city", ["a", "b"]))
        query.filter(logical_in(User.id, [1, 2, 3, 4]))

        # Empty list - generates the always-false predicate: 1=0
        query.filter(logical_in("id", []))

        # Single value - generates: id IN (42)
        query.filter(logical_in("id", 42))

        # FulltextFilter
        query.filter(logical_in(User.id, boolean_match("title", "content").must("python")))

        # Subquery - generates: user_id IN (<subquery SQL>)
        active_user_ids = client.query(User).select(User.id).filter(User.active == True)
        query.filter(logical_in("user_id", active_user_ids))

    Note:
        Values are written inline into the SQL text by LogicalIn.compile()
        rather than sent as bound parameters, so validate any untrusted
        input before passing it here.
    """
    condition = LogicalIn(column, values)
    return condition