Client
MatrixOne Client - Basic implementation
- class matrixone.client.ClientExecutor(client)[source]
Bases: BaseMatrixOneExecutor
Client executor that uses Client’s execute method
- __init__(client)[source]
- class matrixone.client.Client(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Bases: BaseMatrixOneClient
MatrixOne Client - High-level interface for MatrixOne database operations.
This class provides a comprehensive interface for connecting to and interacting with MatrixOne databases. It supports modern API patterns including table creation, data insertion, querying, vector operations, and transaction management.
Key Features:
High-level table operations (create_table, drop_table, insert, batch_insert)
Query builder interface for complex queries
Vector operations (similarity search, range search, indexing)
Transaction management with context managers
Snapshot and restore operations
Account and user management
Fulltext search capabilities
Connection pooling and SSL support
Examples:
    from matrixone import Client

    # Basic usage
    client = Client(
        host='localhost',
        port=6001,
        user='root',
        password='111',
        database='test'
    )

    # Create table
    client.create_table("users", {
        "id": "int primary key",
        "name": "varchar(100)",
        "email": "varchar(255)"
    })

    # Insert and query data
    client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"})
    result = client.query("users").where("id = ?", 1).all()

    # Vector operations
    client.create_table("documents", {
        "id": "int primary key",
        "content": "text",
        "embedding": "vecf32(384)"
    })
    results = client.vector_ops.similarity_search(
        "documents",
        vector_column="embedding",
        query_vector=[0.1, 0.2, 0.3, ...],
        limit=10
    )

    # Transaction
    with client.transaction() as tx:
        tx.execute("INSERT INTO users (name) VALUES ('John')")
Attributes:
engine (Engine): SQLAlchemy engine instance
connected (bool): Connection status
backend_version (str): Detected backend version
vector_ops (VectorManager): Vector operations manager
snapshots (SnapshotManager): Snapshot operations manager
query (QueryBuilder): Query builder for complex queries
- __init__(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Initialize MatrixOne client
Args:
host: Database host (optional, can be set later via connect)
port: Database port (optional, can be set later via connect)
user: Username (optional, can be set later via connect)
password: Password (optional, can be set later via connect)
database: Database name (optional, can be set later via connect)
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name
role: Optional role name
pool_size: Connection pool size
max_overflow: Maximum overflow connections
pool_timeout: Pool timeout in seconds
pool_recycle: Connection recycle time in seconds
connection_timeout: Connection timeout in seconds
query_timeout: Query timeout in seconds
auto_commit: Enable auto-commit mode
charset: Character set for connection
logger: Custom logger instance. If None, creates a default logger
sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full')
    - 'off': No SQL logging
    - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default)
    - 'simple': Show operation summary only
    - 'full': Show complete SQL regardless of length
slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0)
max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
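The four sql_log_mode values can be pictured with a small stand-alone sketch. The helper name format_sql_for_log and the exact summary text are assumptions made for illustration; the source only states that 'auto' shows short SQL fully and summarizes SQL longer than max_sql_display_length.

```python
from typing import Optional


def format_sql_for_log(sql: str, mode: str = "auto", max_len: int = 500) -> Optional[str]:
    """Hypothetical helper: return the text a logger would emit, or None."""
    if mode == "off":
        return None  # no SQL logging at all
    if mode == "full":
        return sql  # complete SQL regardless of length

    # Operation summary: first keyword plus the statement length.
    stripped = sql.strip()
    operation = stripped.split(None, 1)[0].upper() if stripped else "EMPTY"
    summary = f"{operation} statement ({len(sql)} chars)"

    if mode == "simple":
        return summary
    if mode == "auto":
        if len(sql) <= max_len:
            return sql  # short SQL is shown fully
        # Long SQL: keep a prefix and append the summary (assumed format).
        return f"{sql[:max_len]}... [{summary}]"
    raise ValueError(f"unknown sql_log_mode: {mode!r}")
```

Under these assumptions, a bulk INSERT of several thousand characters is logged as its first max_len characters followed by the summary, while a short SELECT is logged unchanged.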
- connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None) None[source]
Connect to MatrixOne database using SQLAlchemy engine
Args:
host: Database host
port: Database port
user: Username or login info in format "user", "account#user", "account#user#role", "account:user", or "account:user:role" (both '#' and ':' separators are supported)
password: Password
database: Database name
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name (will be combined with user if user doesn't contain '#' or ':')
role: Optional role name (will be combined with user if user doesn't contain '#' or ':')
charset: Character set for the connection (default: utf8mb4)
connection_timeout: Connection timeout in seconds (default: 30)
auto_commit: Enable autocommit (default: True)
on_connect: Connection hook to execute after successful connection. Can be:
    - ConnectionHook instance
    - List of ConnectionAction or string action names
    - Custom callback function
Examples:
    # connect() accepts keyword arguments only

    # Enable all features after connection
    client.connect(host=host, port=port, user=user, password=password,
                   database=database,
                   on_connect=[ConnectionAction.ENABLE_ALL])

    # Enable only vector operations with custom charset
    client.connect(host=host, port=port, user=user, password=password,
                   database=database, charset="utf8mb4",
                   on_connect=[ConnectionAction.ENABLE_VECTOR])

    # Custom callback
    def my_callback(client):
        print(f"Connected to {client._connection_params['host']}")

    client.connect(host=host, port=port, user=user, password=password,
                   database=database, on_connect=my_callback)
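The login formats accepted by the user argument can be illustrated with a stand-alone parser. This is a sketch only: parse_login is a hypothetical name, and ignoring explicit account/role values when user already contains a separator is an assumption drawn from the argument descriptions above, not a confirmed implementation detail.

```python
from typing import Dict, Optional


def parse_login(user: str,
                account: Optional[str] = None,
                role: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Hypothetical parser for 'user', 'account#user[#role]', 'account:user[:role]'."""
    separator = "#" if "#" in user else (":" if ":" in user else None)
    if separator is None:
        # Plain username: combine with the explicit account/role arguments.
        return {"account": account, "user": user, "role": role}

    parts = user.split(separator)
    if len(parts) == 2:
        parsed_account, parsed_user = parts
        parsed_role = None
    elif len(parts) == 3:
        parsed_account, parsed_user, parsed_role = parts
    else:
        raise ValueError(f"unsupported login format: {user!r}")
    # Assumption: explicit account/role are ignored once user carries them.
    return {"account": parsed_account, "user": parsed_user, "role": parsed_role}
```

For example, parse_login("acme#alice#admin") and parse_login("alice", account="acme", role="admin") describe the same login under these assumptions.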
- classmethod from_engine(engine: Engine, **kwargs) Client[source]
Create Client instance from existing SQLAlchemy Engine
Args:
engine: SQLAlchemy Engine instance (must use MySQL driver)
**kwargs: Additional client configuration options
Returns:
Client: Configured client instance
Raises:
ConnectionError: If engine doesn't use MySQL driver
Examples
Basic usage:
    from sqlalchemy import create_engine
    from matrixone import Client

    engine = create_engine("mysql+pymysql://user:pass@host:port/db")
    client = Client.from_engine(engine)

With custom configuration:

    engine = create_engine("mysql+pymysql://user:pass@host:port/db")
    client = Client.from_engine(
        engine,
        sql_log_mode='auto',
        slow_query_threshold=0.5
    )
- disconnect() None[source]
Disconnect from MatrixOne database and dispose engine.
This method properly closes all database connections and disposes of the SQLAlchemy engine. It should be called when the client is no longer needed to free up resources.
After calling this method, the client will need to be reconnected using the connect() method before any database operations can be performed.
Raises:
Exception: If disconnection fails (logged but re-raised)
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> client.connect()
>>> # ... perform database operations ...
>>> client.disconnect()  # Clean up resources
- get_login_info() dict | None[source]
Get parsed login information used for database connection.
Returns the login information dictionary that was used to establish the database connection. This includes user, account, role, and other authentication details.
Returns:
Optional[dict]: Dictionary containing login information with keys:
- user: Username
- account: Account name (if specified)
- role: Role name (if specified)
- host: Database host
- port: Database port
- database: Database name
Returns None if not connected or no login info available.
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> client.connect()
>>> login_info = client.get_login_info()
>>> print(f"Connected as {login_info['user']} to {login_info['database']}")
- execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) ResultSet[source]
Execute SQL query or SQLAlchemy statement without transaction isolation.
This method executes queries directly using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use client.session() instead.
The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.
Key Features:
ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()
Parameter binding: Automatic escaping of parameters to prevent SQL injection
Query logging: Integrated logging with performance tracking
Auto-commit: Each statement commits immediately (no transaction isolation)
Connection pooling: Efficient connection reuse from pool
Type safety: ResultSet with structured access to query results
- Parameters:
sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be:
    - SQLAlchemy select() statement (recommended)
    - SQLAlchemy insert() statement (recommended)
    - SQLAlchemy update() statement (recommended)
    - SQLAlchemy delete() statement (recommended)
    - String SQL with '?' placeholders for parameters
    - SQLAlchemy text() statement
params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for '?' placeholders in order and are escaped automatically to prevent SQL injection. Ignored for SQLAlchemy statements (use .values() or .where() with bound parameters instead).
log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: 'off', 'simple', 'full'. If None, uses the client’s global sql_log_mode setting. Useful for debugging specific queries or disabling logs for frequently executed statements.
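Why positional '?' binding matters can be shown without a MatrixOne server. The sketch below uses Python's standard sqlite3 module purely as a stand-in, because it also accepts '?' placeholders; it does not call the MatrixOne client, and the table and values are invented for the demonstration.

```python
import sqlite3

# In-memory stand-in database; not MatrixOne.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO users (name, status) VALUES (?, ?)",
    [("Alice", "active"), ("Bob", "inactive")],
)

# A hostile value that tries to widen the WHERE clause.
hostile = "active' OR '1'='1"

# Bound parameter: the value is treated as data, so nothing matches.
bound = conn.execute(
    "SELECT name FROM users WHERE status = ?", (hostile,)
).fetchall()

# String interpolation (unsafe): the value becomes part of the SQL text.
interpolated = conn.execute(
    f"SELECT name FROM users WHERE status = '{hostile}'"
).fetchall()

print(len(bound), len(interpolated))
```

The bound query returns no rows, while the interpolated one returns every row in the table, which is the failure mode parameter binding is meant to prevent.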
- Returns:
- Query result object with:
columns: List[str] - Column names
rows: List[Tuple] - Row data as tuples
affected_rows: int - Number of rows affected by DML operations
fetchall() -> List[Row] - Get all rows as list
fetchone() -> Optional[Row] - Get next row or None
fetchmany(size) -> List[Row] - Get next N rows
- Return type:
ResultSet
- Raises:
ConnectionError – If not connected to database
QueryError – If query execution fails or SQL syntax is invalid
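The shape of the result object listed above can be approximated with a small stand-in class. SketchResultSet is hypothetical and is not the library's ResultSet; in particular, having fetchall() return only the rows not yet consumed follows DB-API cursor conventions and is an assumption here.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class SketchResultSet:
    """Hypothetical stand-in mirroring the documented attributes and methods."""
    columns: List[str]
    rows: List[Tuple[Any, ...]]
    affected_rows: int = 0
    _cursor: int = field(default=0, repr=False)  # index of the next unread row

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        # Next row, or None once every row has been consumed.
        if self._cursor >= len(self.rows):
            return None
        row = self.rows[self._cursor]
        self._cursor += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple[Any, ...]]:
        # Up to `size` rows starting at the cursor.
        chunk = self.rows[self._cursor:self._cursor + size]
        self._cursor += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple[Any, ...]]:
        # All rows not yet consumed (assumed cursor-style behavior).
        remaining = self.rows[self._cursor:]
        self._cursor = len(self.rows)
        return remaining
```

A caller that mixes fetchone() with fetchall() should therefore not expect the first row to appear twice under this model.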
Usage Examples:
    from matrixone import Client
    from sqlalchemy import Column, Float, Integer, String
    from sqlalchemy import select, insert, update, delete, and_, or_, func
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(100))
        email = Column(String(255))
        age = Column(Integer)
        status = Column(String(20))

    class Order(Base):
        __tablename__ = 'orders'
        id = Column(Integer, primary_key=True)
        user_id = Column(Integer)
        amount = Column(Float)
        # Columns used by the "UPDATE with expressions" example below
        quantity = Column(Integer)
        price = Column(Float)
        total = Column(Float)

    client = Client(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # SQLAlchemy SELECT Statements (Recommended)
    # ========================================

    # Basic SELECT with WHERE clause
    stmt = select(User).where(User.age > 25)
    result = client.execute(stmt)
    for user in result.fetchall():
        print(f"User: {user.name}, Age: {user.age}")

    # SELECT specific columns
    stmt = select(User.name, User.email).where(User.status == 'active')
    result = client.execute(stmt)
    for name, email in result.fetchall():
        print(f"{name}: {email}")

    # Complex WHERE with AND/OR
    stmt = select(User).where(
        and_(
            User.age > 18,
            or_(
                User.status == 'active',
                User.status == 'pending'
            )
        )
    )
    result = client.execute(stmt)

    # SELECT with JOIN
    stmt = select(User, Order).join(Order, User.id == Order.user_id)
    result = client.execute(stmt)
    for user, order in result.fetchall():
        print(f"{user.name} ordered ${order.amount}")

    # SELECT with aggregation
    stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active')
    result = client.execute(stmt)
    count, avg_age = result.fetchone()
    print(f"Active users: {count}, Average age: {avg_age}")

    # SELECT with ORDER BY and LIMIT
    stmt = select(User).where(User.age > 25).order_by(User.age.desc()).limit(10)
    result = client.execute(stmt)

    # ========================================
    # SQLAlchemy INSERT Statements (Recommended)
    # ========================================

    # Single INSERT
    stmt = insert(User).values(name='John', email='john@example.com', age=30)
    result = client.execute(stmt)
    print(f"Inserted {result.affected_rows} rows")

    # Bulk INSERT
    stmt = insert(User).values([
        {'name': 'Alice', 'email': 'alice@example.com', 'age': 28},
        {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
        {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
    ])
    result = client.execute(stmt)
    print(f"Inserted {result.affected_rows} rows")

    # ========================================
    # SQLAlchemy UPDATE Statements (Recommended)
    # ========================================

    # Simple UPDATE
    stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
    result = client.execute(stmt)
    print(f"Updated {result.affected_rows} rows")

    # Conditional UPDATE
    stmt = update(User).where(User.age < 18).values(status='minor')
    result = client.execute(stmt)

    # UPDATE with expressions
    stmt = update(Order).values(total=Order.quantity * Order.price)
    result = client.execute(stmt)

    # UPDATE multiple columns
    stmt = update(User).where(User.id == 1).values(
        name='Updated Name',
        email='updated@example.com',
        status='active'
    )
    result = client.execute(stmt)

    # ========================================
    # SQLAlchemy DELETE Statements (Recommended)
    # ========================================

    # Simple DELETE
    stmt = delete(User).where(User.id == 1)
    result = client.execute(stmt)
    print(f"Deleted {result.affected_rows} rows")

    # Conditional DELETE
    stmt = delete(User).where(User.status == 'deleted')
    result = client.execute(stmt)

    # DELETE with complex condition
    stmt = delete(User).where(
        and_(
            User.age < 18,
            User.status == 'inactive'
        )
    )
    result = client.execute(stmt)

    # ========================================
    # String SQL with Parameters (Alternative)
    # ========================================

    # SELECT with parameters
    result = client.execute(
        "SELECT * FROM users WHERE age > ? AND status = ?",
        (25, 'active')
    )

    # INSERT with parameters
    result = client.execute(
        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        ('David', 'david@example.com', 28)
    )

    # UPDATE with parameters
    result = client.execute(
        "UPDATE users SET status = ? WHERE age < ?",
        ('minor', 18)
    )

    # DELETE with parameters
    result = client.execute(
        "DELETE FROM users WHERE status = ?",
        ('inactive',)
    )

    # ========================================
    # Query Logging Control
    # ========================================

    # Disable logging for frequently executed query
    result = client.execute(
        select(User).where(User.id == 1),
        log_mode='off'
    )

    # Force full SQL logging for debugging
    result = client.execute(
        select(User).where(User.name.like('%test%')),
        log_mode='full'
    )

    # Simple logging (operation type only)
    result = client.execute(
        update(User).values(status='processed'),
        log_mode='simple'
    )
Important Notes:
No transaction isolation: Each execute() call commits immediately
Use session() for transactions: For atomic multi-statement operations
ORM-style preferred: Use SQLAlchemy statements for better type safety
Auto-commit behavior: Changes are permanent immediately after execute()
Thread-safe: Uses connection pooling for concurrent access
Best Practices:
Prefer ORM-style statements: Use select(), insert(), update(), delete()
Use parameters: Always use parameter binding to prevent SQL injection
Session for transactions: Use client.session() for atomic operations
Disable logging in production: Use log_mode='off' for hot paths
Handle exceptions: Wrap execute() in try-except for error handling
See also
Client.session(): For transaction-aware operations
Session.execute(): Execute within transaction context
AsyncClient.execute(): Async version for async/await workflows
- insert(table_name_or_model, data: dict) ResultSet[source]
Insert a single row of data into a table.
This method provides a convenient way to insert data using a dictionary where keys are column names and values are the data to insert. The method automatically handles SQL generation and parameter binding.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data (dict): Dictionary mapping column names to values. Example: {'name': 'John', 'age': 30, 'email': 'john@example.com'}
Returns:
ResultSet: Object containing insertion results with:
- affected_rows: Number of rows inserted (should be 1)
- columns: Empty list (no columns returned for INSERT)
- rows: Empty list (no rows returned for INSERT)
Raises:
ConnectionError: If not connected to database
QueryError: If insertion fails
Examples
# Insert a single user using table name
>>> result = client.insert('users', {
...     'name': 'John Doe',
...     'age': 30,
...     'email': 'john@example.com'
... })
>>> print(f"Inserted {result.affected_rows} row")

# Insert using model class
>>> from sqlalchemy import Column, Integer, String
>>> from matrixone.orm import declarative_base
>>> Base = declarative_base()
>>> class User(Base):
...     __tablename__ = 'users'
...     id = Column(Integer, primary_key=True)
...     name = Column(String(50))
...     age = Column(Integer)
>>> result = client.insert(User, {
...     'name': 'Jane Doe',
...     'age': 25
... })

# Insert with NULL values
>>> result = client.insert('products', {
...     'name': 'Product A',
...     'price': 99.99,
...     'description': None  # NULL value
... })
- batch_insert(table_name_or_model, data_list: list) ResultSet[source]
Batch insert multiple rows of data into a table.
This method efficiently inserts multiple rows in a single operation, which is much faster than calling insert() multiple times. All rows must have the same column structure.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list (list): List of dictionaries, where each dictionary represents a row to insert. All dictionaries must have the same keys. Example:
    [
        {'name': 'John', 'age': 30},
        {'name': 'Jane', 'age': 25},
        {'name': 'Bob', 'age': 35}
    ]
Returns:
ResultSet: Object containing insertion results with:
- affected_rows: Number of rows inserted
- columns: Empty list (no columns returned for INSERT)
- rows: Empty list (no rows returned for INSERT)
Raises:
ConnectionError: If not connected to database
QueryError: If batch insertion fails
ValueError: If data_list is empty or has inconsistent column structure
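The two ValueError conditions, and the single multi-row statement that makes batching faster than repeated insert() calls, can be sketched as follows. build_batch_insert is a hypothetical helper written for illustration; the SQL the library actually generates is not shown in this reference.

```python
from typing import Any, Dict, List, Tuple


def build_batch_insert(table: str, data_list: List[Dict[str, Any]]) -> Tuple[str, List[Any]]:
    """Hypothetical sketch: validate rows, then build one multi-row INSERT."""
    if not data_list:
        raise ValueError("data_list is empty")

    # Column order is taken from the first row.
    columns = list(data_list[0].keys())
    params: List[Any] = []
    for index, row in enumerate(data_list):
        if set(row.keys()) != set(columns):
            raise ValueError(
                f"row {index} has columns {sorted(row)}; expected {sorted(columns)}"
            )
        # Flatten values in a fixed column order, whatever the dict order was.
        params.extend(row[column] for column in columns)

    # Table and column names are interpolated, so they must come from trusted code.
    row_placeholders = "(" + ", ".join("?" for _ in columns) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join(row_placeholders for _ in data_list)
    )
    return sql, params
```

Values travel as bound parameters, so only the identifiers need to be trusted.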
Examples
# Insert multiple users
>>> users = [
...     {'name': 'John Doe', 'age': 30, 'email': 'john@example.com'},
...     {'name': 'Jane Smith', 'age': 25, 'email': 'jane@example.com'},
...     {'name': 'Bob Johnson', 'age': 35, 'email': 'bob@example.com'}
... ]
>>> result = client.batch_insert('users', users)
>>> print(f"Inserted {result.affected_rows} rows")

# Insert with some NULL values
>>> products = [
...     {'name': 'Product A', 'price': 99.99, 'description': 'Great product'},
...     {'name': 'Product B', 'price': 149.99, 'description': None}
... ]
>>> result = client.batch_insert('products', products)
- query(*columns, snapshot: str | None = None)[source]
Get MatrixOne query builder - SQLAlchemy style
Args:
*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries
Examples
    # Traditional model query (all columns)
    client.query(Article).filter(...).all()

    # Column-specific query
    client.query(Article.id, Article.title).filter(...).all()

    # With fulltext score
    client.query(Article.id, boolean_match("title", "content").must("python").label("score"))

    # Snapshot query
    client.query(Article, snapshot="my_snapshot").filter(...).all()
Returns:
MatrixOneQuery instance configured for the specified columns
- snapshot(snapshot_name: str) Generator[SnapshotClient, None, None][source]
Snapshot context manager
Usage
    with client.snapshot("daily_backup") as snapshot_client:
        result = snapshot_client.execute("SELECT * FROM users")
- session() Generator[Session, None, None][source]
Create a transaction-aware session for atomic database operations.
This method returns a MatrixOne Session that extends SQLAlchemy Session with MatrixOne-specific features. All operations within the session are executed atomically - they either all succeed or all fail together.
The session is a context manager that automatically handles the transaction lifecycle:
- Commits the transaction when the context exits normally
- Rolls back the transaction if any exception occurs
- Cleans up database resources automatically
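The commit-on-success, rollback-on-exception lifecycle described above can be reproduced in miniature. The sketch uses the standard sqlite3 module as a stand-in database and a hypothetical sketch_session context manager; it shows the pattern only and is not how the MatrixOne Session is implemented.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def sketch_session(conn: sqlite3.Connection):
    """Hypothetical stand-in: commit on normal exit, roll back on any exception."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.commit()

# Normal exit: the insert is committed.
with sketch_session(conn) as session:
    session.execute("INSERT INTO users VALUES ('Alice')")

# Exception inside the block: both statements are rolled back together.
try:
    with sketch_session(conn) as session:
        session.execute("INSERT INTO users VALUES ('Charlie')")
        session.execute("INSERT INTO missing_table VALUES (1)")  # fails
except sqlite3.OperationalError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users")]
print(names)
```

Only the row from the first block survives, because the failed second block undoes its earlier insert as well.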
Key Features:
Full SQLAlchemy ORM: All standard SQLAlchemy Session methods (add, delete, query, etc.)
Atomic transactions: Multiple operations commit or rollback together
MatrixOne managers: Access to snapshots, clones, vector ops, etc.
ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()
Automatic cleanup: Transaction and connection resources managed automatically
Query logging: Integrated query logging with performance tracking
Available Managers (transaction-aware):
session.snapshots: SnapshotManager for creating/managing snapshots
session.clone: CloneManager for cloning databases and tables
session.restore: RestoreManager for restoring from snapshots
session.pitr: PitrManager for point-in-time recovery
session.pubsub: PubSubManager for publish-subscribe operations
session.account: AccountManager for account/user management
session.vector_ops: VectorManager for vector operations and indexing
session.fulltext_index: FulltextIndexManager for fulltext search
session.metadata: MetadataManager for table metadata analysis
session.load_data: LoadDataManager for bulk data loading
session.stage: StageManager for external stage management
- Returns:
Context manager that yields a MatrixOne Session
- Return type:
Generator[Session, None, None]
- Raises:
ConnectionError – If client is not connected to database
Usage Examples:
    from matrixone import Client
    from sqlalchemy import Column, Integer, String
    from sqlalchemy import select, insert, update, delete
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(100))
        email = Column(String(255))
        age = Column(Integer)
        status = Column(String(20))  # used by the update/delete examples below

    # Account, Analytics and InvalidTable in the examples below are assumed
    # to be other mapped models defined elsewhere.

    client = Client(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Example 1: Basic Transaction with ORM-style SQL
    # ========================================
    with client.session() as session:
        # Insert using SQLAlchemy insert()
        session.execute(insert(User).values(name='John', email='john@example.com', age=30))

        # Update using SQLAlchemy update()
        session.execute(update(User).where(User.age < 18).values(status='minor'))

        # Select using SQLAlchemy select()
        stmt = select(User).where(User.age > 25)
        result = session.execute(stmt)
        for user in result.scalars():
            print(f"User: {user.name}")

        # Delete using SQLAlchemy delete()
        session.execute(delete(User).where(User.status == 'inactive'))
        # All operations commit atomically

    # ========================================
    # Example 2: SQLAlchemy ORM Operations
    # ========================================
    with client.session() as session:
        # Create new objects
        user1 = User(name='Alice', email='alice@example.com', age=28)
        user2 = User(name='Bob', email='bob@example.com', age=35)

        # Add to session
        session.add(user1)
        session.add(user2)

        # Query using ORM
        stmt = select(User).where(User.name == 'Alice')
        result = session.execute(stmt)
        alice = result.scalar_one()

        # Update object
        alice.email = 'newemail@example.com'

        # Commit (or let context manager do it)
        session.commit()

    # ========================================
    # Example 3: MatrixOne Snapshot Operations
    # ========================================
    with client.session() as session:
        # Create snapshot within transaction
        from matrixone import SnapshotLevel
        snapshot = session.snapshots.create(
            name='daily_backup',
            level=SnapshotLevel.DATABASE,
            database='production'
        )

        # Clone database using snapshot
        session.clone.clone_database(
            target_db='prod_copy',
            source_db='production',
            snapshot_name='daily_backup'
        )
        # Both operations commit atomically

    # ========================================
    # Example 4: Bulk Data Loading with Stages
    # ========================================
    with client.session() as session:
        # Create stage using simple interface
        session.stage.create_local('import_stage', '/data/imports/')

        # Load data from stage using ORM model
        session.load_data.from_stage_csv('import_stage', 'users.csv', User)

        # Update statistics after loading
        session.execute("ANALYZE TABLE users")
        # All operations are atomic

    # ========================================
    # Example 5: Error Handling and Rollback
    # ========================================
    try:
        with client.session() as session:
            session.execute(insert(User).values(name='Charlie', age=40))
            session.execute(insert(InvalidTable).values(data='test'))  # This fails
            # Transaction automatically rolls back - Charlie is NOT inserted
    except Exception as e:
        print(f"Transaction failed and rolled back: {e}")

    # ========================================
    # Example 6: Manual Transaction Control
    # ========================================
    with client.session() as session:
        try:
            session.execute(insert(User).values(name='David', age=25))
            session.execute(update(Account).values(balance=Account.balance - 100))

            # Verify before committing
            stmt = select(Account.balance)
            balance = session.execute(stmt).scalar()
            if balance >= 0:
                session.commit()  # Explicit commit
            else:
                session.rollback()  # Explicit rollback
                print("Insufficient balance")
        except Exception as e:
            session.rollback()
            raise

    # ========================================
    # Example 7: Complex Multi-Manager Transaction
    # ========================================
    with client.session() as session:
        # Create publication
        session.pubsub.create_database_publication(
            name='analytics_pub',
            database='analytics',
            account='subscriber_account'
        )

        # Create S3 stage for exports
        session.stage.create_s3(
            name='export_stage',
            bucket='my-bucket',
            path='exports/',
            aws_key_id='key',
            aws_secret_key='secret'
        )

        # Load fresh data using ORM model
        session.load_data.from_csv('/data/latest.csv', Analytics)

        # Create snapshot after load
        session.snapshots.create(
            name='post_load_snapshot',
            level=SnapshotLevel.DATABASE,
            database='analytics'
        )
        # All operations commit together
Best Practices:
Always use context manager: Use with client.session() for automatic cleanup
Keep transactions short: Long transactions can block other operations
Handle exceptions: Wrap session code in try-except for error handling
Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()
Avoid nested sessions: SQLAlchemy doesn’t support true nested transactions
Test rollback behavior: Ensure your application handles rollbacks correctly
See also
Session: The session class returned by this method
AsyncClient.session(): Async version for async/await workflows
Client.execute(): Non-transactional query execution
- property snapshots: SnapshotManager | None
Get snapshot manager
- property clone: CloneManager | None
Get clone manager
- property branch: BranchManager | None
Get branch manager for Git-style version control operations.
Provides table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.
- Returns:
BranchManager instance for branch operations
Example:
    client = Client()
    client.connect(database='test')

    # Create table branch
    client.branch.create_table_branch('users_branch', 'users')

    # Create database branch
    client.branch.create_database_branch('dev_db', 'production')

    # Compare branches
    diffs = client.branch.diff_table('users_branch', 'users')

    # Merge branches
    client.branch.merge_table('users_branch', 'users')

    # Delete branches
    client.branch.delete_table_branch('users_branch')
    client.branch.delete_database_branch('dev_db')
See also
branch_guide - Complete branch management guide
- property moctl: MoCtlManager | None
Get mo_ctl manager
- property restore: RestoreManager | None
Get restore manager
- property pitr: PitrManager | None
Get PITR manager
- property pubsub: PubSubManager | None
Get publish-subscribe manager
- property account: AccountManager | None
Get account manager
- property load_data: LoadDataManager | None
Get load data manager
- property stage: StageManager | None
Get stage manager for external stage operations
- property cdc: CDCManager | None
Get CDC manager for change data capture operations.
- property export
Get export manager for data export operations (INTO OUTFILE, INTO STAGE)
- property vector_ops: VectorManager | None
Get unified vector operations manager for vector operations (index and data)
- get_pinecone_index(table_name_or_model, vector_column: str)[source]
Get a PineconeCompatibleIndex object for vector search operations.
This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
vector_column: Name of the vector column
Returns:
PineconeCompatibleIndex object with Pinecone-compatible API
Example
>>> index = client.get_pinecone_index("documents", "embedding")
>>> index = client.get_pinecone_index(DocumentModel, "embedding")
>>> results = index.query([0.1, 0.2, 0.3], top_k=5)
>>> for match in results.matches:
...     print(f"ID: {match.id}, Score: {match.score}")
...     print(f"Metadata: {match.metadata}")
- property fulltext_index: FulltextIndexManager | None
Get fulltext index manager for fulltext index operations
- property metadata: MetadataManager | None
Get metadata manager for table metadata operations
- version() str[source]
Get MatrixOne server version
Returns:
str: MatrixOne server version string
Raises:
ConnectionError: If not connected to MatrixOne
QueryError: If version query fails
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> version = client.version()
>>> print(f"MatrixOne version: {version}")
- git_version() str[source]
Get MatrixOne git version information
Returns:
str: MatrixOne git version string
Raises:
ConnectionError: If not connected to MatrixOne
QueryError: If git version query fails
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> git_version = client.git_version()
>>> print(f"MatrixOne git version: {git_version}")
- set_backend_version(version: str) None[source]
Manually set the backend version
Args:
version: Version string in format "major.minor.patch" (e.g., "3.0.1")
- get_backend_version() str | None[source]
Get current backend version
Returns:
Version string or None if not set
- is_feature_available(feature_name: str) bool[source]
Check if a feature is available in current backend version
Args:
feature_name: Name of the feature to check
Returns:
True if feature is available, False otherwise
- get_feature_info(feature_name: str) Dict[str, Any] | None[source]
Get feature requirement information
Args:
feature_name: Name of the feature
Returns:
Feature information dictionary or None if not found
- check_version_compatibility(required_version: str, operator: str = '>=') bool[source]
Check if current backend version is compatible with required version
Args:
required_version: Required version string (e.g., "3.0.1")
operator: Comparison operator (">=", ">", "<=", "<", "==", "!=")
Returns:
True if compatible, False otherwise
- get_version_hint(feature_name: str, error_context: str = '') str[source]
Get helpful hint message for version-related errors
Args:
feature_name: Name of the feature
error_context: Additional context for the error
Returns:
Helpful hint message
- is_development_version() bool[source]
Check if current backend is a development version
Returns:
True if backend is development version (999.x.x), False otherwise
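The version checks above can be pictured as tuple comparisons. The following is a minimal sketch, assuming versions are strictly "major.minor.patch"; the helper names `parse_version`, `is_compatible`, and `is_development` are hypothetical and are not the SDK's internals.

```python
from __future__ import annotations

import operator

# Hypothetical mapping from the documented operator strings to comparisons.
_OPERATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}


def parse_version(version: str) -> tuple[int, int, int]:
    """Parse "major.minor.patch" into a tuple of three integers."""
    parts = version.strip().split(".")
    if len(parts) != 3:
        raise ValueError(f"expected major.minor.patch, got {version!r}")
    major, minor, patch = (int(p) for p in parts)
    return major, minor, patch


def is_compatible(current: str, required: str, op: str = ">=") -> bool:
    """Return True if `current <op> required` holds for the parsed tuples."""
    if op not in _OPERATORS:
        raise ValueError(f"unsupported operator: {op!r}")
    return _OPERATORS[op](parse_version(current), parse_version(required))


def is_development(version: str) -> bool:
    """Treat major version 999 (999.x.x) as a development build."""
    return parse_version(version)[0] == 999
```

Comparing integer tuples rather than raw strings matters for cases such as "3.0.10" versus "3.0.9", where a plain string comparison would give the wrong order.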
- create_table(table_name_or_model, columns: dict | None = None, **kwargs) Client[source]
Create a table with a simplified interface.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
columns: Dictionary mapping column names to their types (required if table_name_or_model is str). Supported formats:
- 'id': 'bigint' (with primary_key=True if needed)
- 'name': 'varchar(100)'
- 'embedding': 'vecf32(128)' or 'vecf64(128)'
- 'score': 'float'
- 'created_at': 'datetime'
- 'is_active': 'boolean'
**kwargs: Additional table parameters
Returns:
Client: Self for chaining
Example
client.create_table("users", {
    'id': 'bigint',
    'name': 'varchar(100)',
    'email': 'varchar(255)',
    'embedding': 'vecf32(128)',
    'score': 'float',
    'created_at': 'datetime',
    'is_active': 'boolean'
}, primary_key='id')
- drop_table(table_name_or_model) Client[source]
Drop a table.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
Returns:
Client: Self for chaining
Example
# Drop table by name
client.drop_table("users")

# Drop table by model class
client.drop_table(UserModel)
- create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Client[source]
Create a table with vector indexes using a simplified interface.
Args:
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as create_table)
indexes: List of index definitions, each containing:
- 'name': Index name
- 'column': Column name to index
- 'type': Index type ('ivfflat' or 'hnsw')
- 'params': Dictionary of index-specific parameters
**kwargs: Additional table parameters
Returns:
Client: Self for chaining
Example
client.create_table_with_index("vector_docs", {
    'id': 'bigint',
    'title': 'varchar(200)',
    'embedding': 'vector(128,f32)'
}, indexes=[
    {
        'name': 'idx_hnsw',
        'column': 'embedding',
        'type': 'hnsw',
        'params': {'m': 48, 'ef_construction': 64, 'ef_search': 64}
    }
], primary_key='id')
- create_table_orm(table_name: str, *columns, **kwargs) Client[source]
Create a table using SQLAlchemy ORM-style column definitions. Similar to SQLAlchemy Table() constructor but without metadata.
Args:
table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)
Returns:
Client: Self for chaining
Example:
from sqlalchemy import Column, BigInteger, Integer
from matrixone.sqlalchemy_ext import Vectorf32, VectorIndex, VectorIndexType, VectorOpType

client.create_table_orm(
    'vector_docs_hnsw_demo',
    Column('a', BigInteger, primary_key=True),
    Column('b', Vectorf32(128)),
    Column('c', Integer),
    VectorIndex('idx_hnsw', 'b', index_type=VectorIndexType.HNSW,
                m=48, ef_construction=64, ef_search=64,
                op_type=VectorOpType.VECTOR_L2_OPS)
)
- create_all(base_class=None)[source]
Create all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- drop_all(base_class=None)[source]
Drop all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]
Get all secondary index table names for a given table.
This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.
- Parameters:
table_name – Name of the table to get secondary indexes for
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info')
>>> # Or specify database explicitly
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info', 'test')
>>> print(index_tables)
['__mo_index_secondary_..._cms_id', '__mo_index_unique_..._email']
- get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]
Get the physical table name of a secondary index by its index name.
- Parameters:
table_name – Name of the table
index_name – Name of the secondary index
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
Physical table name of the secondary index, or None if not found
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id')
>>> # Or specify database explicitly
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id', 'test')
>>> print(index_table)
'__mo_index_secondary_018cfbda-bde1-7c3e-805c-3f8e71769f75_cms_id'
- get_table_indexes_detail(table_name: str, database_name: str | None = None) List[dict][source]
Get detailed information about all indexes for a table, including IVF, HNSW, Fulltext, and regular indexes.
This method returns comprehensive information about each index physical table, including:
- Index name
- Index type (MULTIPLE, PRIMARY, UNIQUE, etc.)
- Algorithm type (ivfflat, hnsw, fulltext, etc.)
- Algorithm table type (metadata, centroids, entries, etc.)
- Physical table name
- Column names
- Algorithm parameters
- Parameters:
table_name – Name of the table to get indexes for
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
List of dictionaries, each containing:
index_name: Name of the index
index_type: Type of index (MULTIPLE, PRIMARY, UNIQUE, etc.)
algo: Algorithm type (ivfflat, hnsw, fulltext, or None for regular indexes)
algo_table_type: Algorithm table type (metadata, centroids, entries, etc., or None)
physical_table_name: Physical table name
columns: List of column names
algo_params: Algorithm parameters (or None)
- Return type:
List[dict]
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Get all index details for a table
>>> indexes = client.get_table_indexes_detail('ivf_health_demo_docs')
>>> for idx in indexes:
...     print(f"{idx['index_name']} ({idx['algo']}) - {idx['algo_table_type']}: {idx['physical_table_name']}")
idx_embedding_ivf (ivfflat) - metadata: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - centroids: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - entries: __mo_index_secondary_...
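Because an algorithm-backed index produces one entry per physical table, it can be convenient to regroup the list by index name. The sketch below assumes only the dictionary keys documented above; the sample rows and the physical table names in them are hypothetical placeholders, and `group_by_index` is not part of the SDK.

```python
from collections import defaultdict

# Hypothetical rows shaped like the documented return value.
sample_indexes = [
    {"index_name": "idx_embedding_ivf", "index_type": "MULTIPLE", "algo": "ivfflat",
     "algo_table_type": "metadata", "physical_table_name": "phys_meta",
     "columns": ["embedding"], "algo_params": None},
    {"index_name": "idx_embedding_ivf", "index_type": "MULTIPLE", "algo": "ivfflat",
     "algo_table_type": "centroids", "physical_table_name": "phys_centroids",
     "columns": ["embedding"], "algo_params": None},
    {"index_name": "idx_embedding_ivf", "index_type": "MULTIPLE", "algo": "ivfflat",
     "algo_table_type": "entries", "physical_table_name": "phys_entries",
     "columns": ["embedding"], "algo_params": None},
    {"index_name": "idx_email", "index_type": "UNIQUE", "algo": None,
     "algo_table_type": None, "physical_table_name": "phys_email",
     "columns": ["email"], "algo_params": None},
]


def group_by_index(rows):
    """Map index name -> {table role -> physical table name}."""
    grouped = defaultdict(dict)
    for row in rows:
        # Regular indexes have no algorithm table type; label them "table".
        role = row["algo_table_type"] or "table"
        grouped[row["index_name"]][role] = row["physical_table_name"]
    return dict(grouped)


grouped = group_by_index(sample_indexes)
```

With real data, `sample_indexes` would be replaced by the list returned from `client.get_table_indexes_detail(...)`.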
- verify_table_index_counts(table_name: str) int[source]
Verify that the main table and all its secondary index tables have the same row count.
This method compares the COUNT(*) of the main table with all its secondary index tables in a single SQL query for consistency. If counts don’t match, raises an exception.
- Parameters:
table_name – Name of the table to verify
- Returns:
Row count (int) if verification succeeds
- Raises:
ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> count = client.verify_table_index_counts('cms_all_content_chunk_info')
>>> print(f"✓ Verification passed, row count: {count}")
>>> # If verification fails:
>>> try:
...     count = client.verify_table_index_counts('some_table')
... except ValueError as e:
...     print(f"Verification failed: {e}")
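The idea of reading every count in a single statement can be illustrated without a MatrixOne server. This is a rough sketch using Python's built-in `sqlite3` as a stand-in; the function `verify_counts` and the tables `docs` and `docs_idx` are hypothetical, and the SDK's actual query may be built differently.

```python
import sqlite3


def verify_counts(conn, main_table, index_tables):
    """Compare COUNT(*) of the main table and its index tables in one SELECT."""
    tables = [main_table, *index_tables]
    # One scalar subquery per table, so all counts come from the same statement.
    # Table names are interpolated, so they must come from a trusted source.
    subqueries = ", ".join(f'(SELECT COUNT(*) FROM "{t}")' for t in tables)
    row = conn.execute(f"SELECT {subqueries}").fetchone()
    counts = dict(zip(tables, row))
    main_count = counts[main_table]
    if any(c != main_count for c in counts.values()):
        raise ValueError(f"row count mismatch: {counts}")
    return main_count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE docs_idx (id INTEGER)")
conn.executemany("INSERT INTO docs VALUES (?)", [(1,), (2,)])
conn.executemany("INSERT INTO docs_idx VALUES (?)", [(1,), (2,)])
```

Putting all counts into the error message mirrors the documented behavior of reporting every table's count when verification fails.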
- class matrixone.client.ResultSet(columns: List[str], rows: List[Tuple[Any, ...]], affected_rows: int = 0)[source]
Bases: object
Result set wrapper for query results from MatrixOne database operations.
This class provides a convenient interface for accessing query results with methods similar to database cursor objects. It supports both SELECT queries (returning data) and DML operations (returning affected row counts).
Key Features:
Iterator interface for row-by-row access
Bulk data access methods (fetchall, fetchmany)
Column name access and metadata
Affected row count for DML operations
Cursor-like positioning for result navigation
Attributes:
columns (List[str]): List of column names in the result set
rows (List[Tuple[Any, ...]]): List of tuples containing row data
affected_rows (int): Number of rows affected by DML operations
Usage Examples:
# SELECT query results
>>> result = client.execute("SELECT id, name, age FROM users WHERE age > ?", (25,))
>>> print(f"Found {len(result.rows)} users")
>>> for row in result.fetchall():
...     print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")

# Access by column name
>>> for row in result.rows:
...     user_id = row[result.columns.index('id')]
...     user_name = row[result.columns.index('name')]

# DML operation results
>>> result = client.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("John", 30))
>>> print(f"Inserted {result.affected_rows} rows")

# Iterator interface
>>> for row in result:
...     print(row)
Note: This class is automatically created by the Client’s execute() method and provides a consistent interface for all query results.
- keys()[source]
Get column names
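To make the cursor-like behavior concrete, here is a small stand-in class built only from the attributes and method names listed above. It is a hypothetical sketch, not the SDK's `ResultSet`; in particular, having `fetchall()` return only the rows not yet consumed is an assumption borrowed from DB-API cursors.

```python
from typing import Any, Iterator, List, Optional, Tuple


class MiniResultSet:
    """Hypothetical stand-in that mimics the documented ResultSet surface."""

    def __init__(self, columns: List[str], rows: List[Tuple[Any, ...]],
                 affected_rows: int = 0) -> None:
        self.columns = columns
        self.rows = rows
        self.affected_rows = affected_rows
        self._pos = 0  # cursor position for fetchone/fetchmany/fetchall

    def keys(self) -> List[str]:
        """Return the column names."""
        return list(self.columns)

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        """Return the next row, or None when the rows are exhausted."""
        if self._pos >= len(self.rows):
            return None
        row = self.rows[self._pos]
        self._pos += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple[Any, ...]]:
        """Return up to `size` rows starting at the cursor position."""
        chunk = self.rows[self._pos:self._pos + size]
        self._pos += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple[Any, ...]]:
        """Return all rows after the cursor position (assumed semantics)."""
        remaining = self.rows[self._pos:]
        self._pos = len(self.rows)
        return remaining

    def __iter__(self) -> Iterator[Tuple[Any, ...]]:
        """Iterate over every row, independent of the cursor position."""
        return iter(self.rows)
```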
- class matrixone.client.SnapshotClient(client, snapshot_name: str)[source]
Bases: object
Snapshot client wrapper for executing queries with snapshot
- class matrixone.client.Session(bind=None, client=None, wrap_session=None, **kwargs)[source]
Bases: Session
MatrixOne Session - extends SQLAlchemy Session with MatrixOne features.
This class inherits from SQLAlchemy Session and provides a transaction context for executing multiple database operations atomically, while adding MatrixOne-specific capabilities like snapshots, clones, vector operations, and fulltext search.
Key Features:
Full SQLAlchemy Session API - All inherited methods work as expected
Atomic transaction execution with automatic rollback on errors
Access to all MatrixOne managers within session context
Support for both SQLAlchemy statements and string SQL
Automatic commit/rollback handling
Available Managers:
- snapshots: SnapshotManager for snapshot operations
- clone: CloneManager for clone operations
- restore: RestoreManager for restore operations
- pitr: PitrManager for point-in-time recovery
- pubsub: PubSubManager for pub/sub operations
- account: AccountManager for account operations
- vector_ops: TransactionVectorIndexManager for vector operations
- fulltext_index: TransactionFulltextIndexManager for fulltext operations
Usage Examples
# Standard SQLAlchemy usage
from sqlalchemy import select

with client.session() as session:
    # Execute SQLAlchemy statements
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()  # Returns ORM objects

    # ORM operations
    user = User(name="John", age=30)
    session.add(user)
    session.commit()

# MatrixOne features
with client.session() as session:
    # Snapshot operations
    session.snapshots.create("backup", SnapshotLevel.DATABASE, database="mydb")

    # Clone operations
    session.clone.clone_database("new_db", "source_db")
Note: This class is automatically created by the Client’s session() context manager and should not be instantiated directly.
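The commit-on-success, rollback-on-error pattern described for the session context can be seen in miniature with the standard library. The sketch below uses `sqlite3` purely as an analogy; it does not use the MatrixOne `Session`, and the `users` table and `add_users` helper are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")


def add_users(connection, names):
    """Insert all names atomically: either every row is stored or none."""
    # `with connection:` commits if the block succeeds and rolls back if it raises.
    with connection:
        for name in names:
            connection.execute("INSERT INTO users (name) VALUES (?)", (name,))


add_users(conn, ["Alice", "Bob"])

try:
    # The second insert violates NOT NULL, so the first one is rolled back too.
    add_users(conn, ["Carol", None])
except sqlite3.IntegrityError:
    pass

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

After the failed batch, only the two rows from the first call remain, which is the atomicity guarantee a transaction context is meant to provide.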
- __init__(bind=None, client=None, wrap_session=None, **kwargs)[source]
Initialize MatrixOne Session.
- Parameters:
bind – SQLAlchemy Engine or Connection to bind to
client – MatrixOne Client instance
wrap_session – Existing SQLAlchemy Session to wrap with MatrixOne features
**kwargs – Additional arguments passed to SQLAlchemy Session
- execute(sql_or_stmt, params: Tuple | None = None, **kwargs)[source]
Execute SQL or SQLAlchemy statement within session.
Overrides SQLAlchemy Session.execute() to add:
- Support for string SQL with MatrixOne parameter substitution
- Query logging with optional per-operation log mode override
- Parameters:
sql_or_stmt – SQL string or SQLAlchemy statement (select, update, delete, insert, text)
params – Query parameters (only used for string SQL with ‘?’ placeholders)
**kwargs –
Additional arguments passed to parent execute(). Supports special parameter:
log_mode (str): Override SQL logging mode for this operation only. Options: ‘off’, ‘simple’, ‘full’. Useful for debugging specific operations without changing global settings.
- Returns:
SQLAlchemy Result object
Examples:
# SQLAlchemy statements
from sqlalchemy import select, update

with client.session() as session:
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()

# String SQL (MatrixOne extension)
with client.session() as session:
    result = session.execute("INSERT INTO users (name) VALUES (?)", ("John",))
    print(f"Inserted {result.rowcount} rows")

# Debugging with temporary logging override
with client.session() as session:
    # Enable full logging for this query only
    result = session.execute("SELECT * FROM large_table", log_mode='full')
- get_connection()[source]
Get the underlying SQLAlchemy connection for direct use
Returns:
SQLAlchemy Connection instance bound to this session
- insert(table_name: str, data: dict[str, Any]) ResultSet[source]
Insert data into a table within transaction.
Args:
table_name: Name of the table
data: Data to insert (dict with column names as keys)
Returns:
ResultSet object
- batch_insert(table_name: str, data_list: list[dict[str, Any]]) ResultSet[source]
Batch insert data into a table within transaction.
Args:
table_name: Name of the table
data_list: List of data dictionaries to insert
Returns:
ResultSet object
- query(*columns, snapshot: str | None = None)[source]
Get MatrixOne query builder within transaction - SQLAlchemy style
Args:
*columns: Can be:
- Single model class: query(Article) - returns all columns from model
- Multiple columns: query(Article.id, Article.title) - returns specific columns
- Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries
Returns:
MatrixOneQuery instance configured for the specified columns within transaction
- create_table(table_name: str, columns: dict, **kwargs) Session[source]
Create a table within MatrixOne session.
Args:
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
**kwargs: Additional table parameters
Returns:
Session: Self for chaining
- drop_table(table_name: str) Session[source]
Drop a table within MatrixOne session.
Args:
table_name: Name of the table to drop
Returns:
Session: Self for chaining
- create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Session[source]
Create a table with vector indexes within MatrixOne session.
Args:
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
indexes: List of index definitions (same format as client.create_table_with_index)
**kwargs: Additional table parameters
Returns:
Session: Self for chaining
- create_table_orm(table_name: str, *columns, **kwargs) Session[source]
Create a table using SQLAlchemy ORM-style definitions within MatrixOne session.
Args:
table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)
Returns:
Session: Self for chaining
Client Class
- class matrixone.client.Client(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Bases: BaseMatrixOneClient
MatrixOne Client - High-level interface for MatrixOne database operations.
This class provides a comprehensive interface for connecting to and interacting with MatrixOne databases. It supports modern API patterns including table creation, data insertion, querying, vector operations, and transaction management.
Key Features:
High-level table operations (create_table, drop_table, insert, batch_insert)
Query builder interface for complex queries
Vector operations (similarity search, range search, indexing)
Transaction management with context managers
Snapshot and restore operations
Account and user management
Fulltext search capabilities
Connection pooling and SSL support
Examples:
from matrixone import Client

# Basic usage
client = Client(
    host='localhost',
    port=6001,
    user='root',
    password='111',
    database='test'
)

# Create table
client.create_table("users", {
    "id": "int primary key",
    "name": "varchar(100)",
    "email": "varchar(255)"
})

# Insert and query data
client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"})
result = client.query("users").where("id = ?", 1).all()

# Vector operations
client.create_table("documents", {
    "id": "int primary key",
    "content": "text",
    "embedding": "vecf32(384)"
})

results = client.vector_ops.similarity_search(
    "documents",
    vector_column="embedding",
    query_vector=[0.1, 0.2, 0.3, ...],
    limit=10
)

# Transaction
with client.transaction() as tx:
    tx.execute("INSERT INTO users (name) VALUES ('John')")
Attributes:
engine (Engine): SQLAlchemy engine instance
connected (bool): Connection status
backend_version (str): Detected backend version
vector_ops (VectorManager): Vector operations manager
snapshots (SnapshotManager): Snapshot operations manager
query (QueryBuilder): Query builder for complex queries
- __init__(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Initialize MatrixOne client
Args:
host: Database host (optional, can be set later via connect)
port: Database port (optional, can be set later via connect)
user: Username (optional, can be set later via connect)
password: Password (optional, can be set later via connect)
database: Database name (optional, can be set later via connect)
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name
role: Optional role name
pool_size: Connection pool size
max_overflow: Maximum overflow connections
pool_timeout: Pool timeout in seconds
pool_recycle: Connection recycle time in seconds
connection_timeout: Connection timeout in seconds
query_timeout: Query timeout in seconds
auto_commit: Enable auto-commit mode
charset: Character set for connection
logger: Custom logger instance. If None, creates a default logger
sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full')
- 'off': No SQL logging
- 'auto': Smart logging - short SQL shown fully, long SQL summarized (default)
- 'simple': Show operation summary only
- 'full': Show complete SQL regardless of length
slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0)
max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
- connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None) None[source]
Connect to MatrixOne database using SQLAlchemy engine
Args:
host: Database host
port: Database port
user: Username or login info in format "user", "account#user", "account#user#role", "account:user", or "account:user:role" (both '#' and ':' separators are supported)
password: Password
database: Database name
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name (will be combined with user if user doesn't contain '#' or ':')
role: Optional role name (will be combined with user if user doesn't contain '#' or ':')
charset: Character set for the connection (default: utf8mb4)
connection_timeout: Connection timeout in seconds (default: 30)
auto_commit: Enable autocommit (default: True)
on_connect: Connection hook to execute after successful connection. Can be:
- ConnectionHook instance
- List of ConnectionAction or string action names
- Custom callback function
Examples:
# All arguments are keyword-only, as the signature's leading "*" indicates.

# Enable all features after connection
client.connect(host=host, port=port, user=user, password=password, database=database,
               on_connect=[ConnectionAction.ENABLE_ALL])

# Enable only vector operations with custom charset
client.connect(host=host, port=port, user=user, password=password, database=database,
               charset="utf8mb4",
               on_connect=[ConnectionAction.ENABLE_VECTOR])

# Custom callback
def my_callback(client):
    print(f"Connected to {client._connection_params['host']}")

client.connect(host=host, port=port, user=user, password=password, database=database,
               on_connect=my_callback)
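The accepted `user` formats can be pictured with a small parser. This is a sketch of the rules as stated in the argument descriptions, not the SDK's own parsing code; `parse_login` is a hypothetical name, and the choice to ignore `account` and `role` whenever `user` already contains a separator follows the wording above.

```python
from typing import Optional, Tuple


def parse_login(user: str,
                account: Optional[str] = None,
                role: Optional[str] = None
                ) -> Tuple[Optional[str], str, Optional[str]]:
    """Return (account, user, role) from a login string.

    Accepts "user", "account#user", "account#user#role",
    "account:user", and "account:user:role".
    """
    if "#" in user:
        separator = "#"
    elif ":" in user:
        separator = ":"
    else:
        # Plain user name: combine it with the separate account/role arguments.
        return account, user, role

    parts = user.split(separator)
    if len(parts) == 2:
        return parts[0], parts[1], None
    if len(parts) == 3:
        return parts[0], parts[1], parts[2]
    raise ValueError(f"unrecognized login format: {user!r}")
```

For example, `parse_login("acme#admin#dba")` and `parse_login("admin", account="acme", role="dba")` describe the same login under these rules.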
- classmethod from_engine(engine: Engine, **kwargs) Client[source]
Create Client instance from existing SQLAlchemy Engine
Args:
engine: SQLAlchemy Engine instance (must use MySQL driver)
**kwargs: Additional client configuration options
Returns:
Client: Configured client instance
Raises:
ConnectionError: If engine doesn't use MySQL driver
Examples
Basic usage:
from sqlalchemy import create_engine
from matrixone import Client

engine = create_engine("mysql+pymysql://user:pass@host:port/db")
client = Client.from_engine(engine)

With custom configuration:

engine = create_engine("mysql+pymysql://user:pass@host:port/db")
client = Client.from_engine(
    engine,
    sql_log_mode='auto',
    slow_query_threshold=0.5
)
- disconnect() None[source]
Disconnect from MatrixOne database and dispose engine.
This method properly closes all database connections and disposes of the SQLAlchemy engine. It should be called when the client is no longer needed to free up resources.
After calling this method, the client will need to be reconnected using the connect() method before any database operations can be performed.
Raises:
Exception: If disconnection fails (logged but re-raised)
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> client.connect()
>>> # ... perform database operations ...
>>> client.disconnect()  # Clean up resources
- get_login_info() dict | None[source]
Get parsed login information used for database connection.
Returns the login information dictionary that was used to establish the database connection. This includes user, account, role, and other authentication details.
Returns:
Optional[dict]: Dictionary containing login information with keys:
- user: Username
- account: Account name (if specified)
- role: Role name (if specified)
- host: Database host
- port: Database port
- database: Database name
Returns None if not connected or no login info available.
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> client.connect()
>>> login_info = client.get_login_info()
>>> print(f"Connected as {login_info['user']} to {login_info['database']}")
- execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) ResultSet[source]
Execute SQL query or SQLAlchemy statement without transaction isolation.
This method executes queries directly using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use client.session() instead.
The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.
Key Features:
ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()
Parameter binding: Automatic escaping of parameters to prevent SQL injection
Query logging: Integrated logging with performance tracking
Auto-commit: Each statement commits immediately (no transaction isolation)
Connection pooling: Efficient connection reuse from pool
Type safety: ResultSet with structured access to query results
- Parameters:
sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be:
- SQLAlchemy select() statement (recommended)
- SQLAlchemy insert() statement (recommended)
- SQLAlchemy update() statement (recommended)
- SQLAlchemy delete() statement (recommended)
- String SQL with '?' placeholders for parameters
- SQLAlchemy text() statement
params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements (use .values() or .where() with bound parameters instead).
log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging specific queries or disabling logs for frequently-executed statements.
- Returns:
Query result object with:
columns: List[str] - Column names
rows: List[Tuple] - Row data as tuples
affected_rows: int - Number of rows affected by DML operations
fetchall() -> List[Row] - Get all rows as list
fetchone() -> Optional[Row] - Get next row or None
fetchmany(size) -> List[Row] - Get next N rows
- Return type:
ResultSet
- Raises:
ConnectionError – If not connected to database
QueryError – If query execution fails or SQL syntax is invalid
Usage Examples:
from matrixone import Client
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)
    # Columns used by the "UPDATE with expressions" example below
    quantity = Column(Integer)
    price = Column(Float)
    total = Column(Float)

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# SQLAlchemy SELECT Statements (Recommended)
# ========================================

# Basic SELECT with WHERE clause
stmt = select(User).where(User.age > 25)
result = client.execute(stmt)
for user in result.fetchall():
    print(f"User: {user.name}, Age: {user.age}")

# SELECT specific columns
stmt = select(User.name, User.email).where(User.status == 'active')
result = client.execute(stmt)
for name, email in result.fetchall():
    print(f"{name}: {email}")

# Complex WHERE with AND/OR
stmt = select(User).where(
    and_(
        User.age > 18,
        or_(
            User.status == 'active',
            User.status == 'pending'
        )
    )
)
result = client.execute(stmt)

# SELECT with JOIN
stmt = select(User, Order).join(Order, User.id == Order.user_id)
result = client.execute(stmt)
for user, order in result.fetchall():
    print(f"{user.name} ordered ${order.amount}")

# SELECT with aggregation
stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active')
result = client.execute(stmt)
count, avg_age = result.fetchone()
print(f"Active users: {count}, Average age: {avg_age}")

# SELECT with ORDER BY and LIMIT
stmt = select(User).where(User.age > 25).order_by(User.age.desc()).limit(10)
result = client.execute(stmt)

# ========================================
# SQLAlchemy INSERT Statements (Recommended)
# ========================================

# Single INSERT
stmt = insert(User).values(name='John', email='john@example.com', age=30)
result = client.execute(stmt)
print(f"Inserted {result.affected_rows} rows")

# Bulk INSERT
stmt = insert(User).values([
    {'name': 'Alice', 'email': 'alice@example.com', 'age': 28},
    {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
    {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
])
result = client.execute(stmt)
print(f"Inserted {result.affected_rows} rows")

# ========================================
# SQLAlchemy UPDATE Statements (Recommended)
# ========================================

# Simple UPDATE
stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
result = client.execute(stmt)
print(f"Updated {result.affected_rows} rows")

# Conditional UPDATE
stmt = update(User).where(User.age < 18).values(status='minor')
result = client.execute(stmt)

# UPDATE with expressions
stmt = update(Order).values(total=Order.quantity * Order.price)
result = client.execute(stmt)

# UPDATE multiple columns
stmt = update(User).where(User.id == 1).values(
    name='Updated Name',
    email='updated@example.com',
    status='active'
)
result = client.execute(stmt)

# ========================================
# SQLAlchemy DELETE Statements (Recommended)
# ========================================

# Simple DELETE
stmt = delete(User).where(User.id == 1)
result = client.execute(stmt)
print(f"Deleted {result.affected_rows} rows")

# Conditional DELETE
stmt = delete(User).where(User.status == 'deleted')
result = client.execute(stmt)

# DELETE with complex condition
stmt = delete(User).where(
    and_(
        User.age < 18,
        User.status == 'inactive'
    )
)
result = client.execute(stmt)

# ========================================
# String SQL with Parameters (Alternative)
# ========================================

# SELECT with parameters
result = client.execute(
    "SELECT * FROM users WHERE age > ? AND status = ?",
    (25, 'active')
)

# INSERT with parameters
result = client.execute(
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
    ('David', 'david@example.com', 28)
)

# UPDATE with parameters
result = client.execute(
    "UPDATE users SET status = ? WHERE age < ?",
    ('minor', 18)
)

# DELETE with parameters
result = client.execute(
    "DELETE FROM users WHERE status = ?",
    ('inactive',)
)

# ========================================
# Query Logging Control
# ========================================

# Disable logging for frequently executed query
result = client.execute(
    select(User).where(User.id == 1),
    log_mode='off'
)

# Force full SQL logging for debugging
result = client.execute(
    select(User).where(User.name.like('%test%')),
    log_mode='full'
)

# Simple logging (operation type only)
result = client.execute(
    update(User).values(status='processed'),
    log_mode='simple'
)
Important Notes:
No transaction isolation: Each execute() call commits immediately
Use session() for transactions: For atomic multi-statement operations
ORM-style preferred: Use SQLAlchemy statements for better type safety
Auto-commit behavior: Changes are permanent immediately after execute()
Thread-safe: Uses connection pooling for concurrent access
Best Practices:
Prefer ORM-style statements: Use select(), insert(), update(), delete()
Use parameters: Always use parameter binding to prevent SQL injection
Session for transactions: Use client.session() for atomic operations
Disable logging in production: Use log_mode='off' for hot paths
Handle exceptions: Wrap execute() in try-except for error handling
See also
Client.session(): For transaction-aware operations
Session.execute(): Execute within transaction context
AsyncClient.execute(): Async version for async/await workflows
- insert(table_name_or_model, data: dict) ResultSet[source]
Insert a single row of data into a table.
This method provides a convenient way to insert data using a dictionary where keys are column names and values are the data to insert. The method automatically handles SQL generation and parameter binding.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data (dict): Dictionary mapping column names to values. Example: {'name': 'John', 'age': 30, 'email': 'john@example.com'}
Returns:
ResultSet: Object containing insertion results with:
- affected_rows: Number of rows inserted (should be 1)
- columns: Empty list (no columns returned for INSERT)
- rows: Empty list (no rows returned for INSERT)
Raises:
ConnectionError: If not connected to database
QueryError: If insertion fails
Examples
# Insert a single user using table name
>>> result = client.insert('users', {
...     'name': 'John Doe',
...     'age': 30,
...     'email': 'john@example.com'
... })
>>> print(f"Inserted {result.affected_rows} row")

# Insert using model class
>>> from sqlalchemy import Column, Integer, String
>>> from matrixone.orm import declarative_base
>>> Base = declarative_base()
>>> class User(Base):
...     __tablename__ = 'users'
...     id = Column(Integer, primary_key=True)
...     name = Column(String(50))
...     age = Column(Integer)
>>> result = client.insert(User, {
...     'name': 'Jane Doe',
...     'age': 25
... })

# Insert with NULL values
>>> result = client.insert('products', {
...     'name': 'Product A',
...     'price': 99.99,
...     'description': None  # NULL value
... })
- batch_insert(table_name_or_model, data_list: list) ResultSet[source]
Batch insert multiple rows of data into a table.
This method efficiently inserts multiple rows in a single operation, which is much faster than calling insert() multiple times. All rows must have the same column structure.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list (list): List of dictionaries, where each dictionary represents a row to insert. All dictionaries must have the same keys. Example:
[
    {'name': 'John', 'age': 30},
    {'name': 'Jane', 'age': 25},
    {'name': 'Bob', 'age': 35}
]
Returns:
ResultSet: Object containing insertion results with:
- affected_rows: Number of rows inserted
- columns: Empty list (no columns returned for INSERT)
- rows: Empty list (no rows returned for INSERT)
Raises:
ConnectionError: If not connected to database
QueryError: If batch insertion fails
ValueError: If data_list is empty or has inconsistent column structure
Examples
# Insert multiple users
>>> users = [
...     {'name': 'John Doe', 'age': 30, 'email': 'john@example.com'},
...     {'name': 'Jane Smith', 'age': 25, 'email': 'jane@example.com'},
...     {'name': 'Bob Johnson', 'age': 35, 'email': 'bob@example.com'}
... ]
>>> result = client.batch_insert('users', users)
>>> print(f"Inserted {result.affected_rows} rows")

# Insert with some NULL values
>>> products = [
...     {'name': 'Product A', 'price': 99.99, 'description': 'Great product'},
...     {'name': 'Product B', 'price': 149.99, 'description': None}
... ]
>>> result = client.batch_insert('products', products)
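Since batch_insert() is documented to raise ValueError for an empty data_list or for rows whose keys differ, it can be useful to check the rows before sending them. The following is a minimal sketch in plain Python, not part of the MatrixOne SDK: `check_rows` and `chunked` are hypothetical helper names, and the chunk size of 2 is an arbitrary choice for illustration.

```python
from typing import Any, Dict, Iterator, List


def check_rows(data_list: List[Dict[str, Any]]) -> List[str]:
    """Return the shared column names, or raise ValueError (hypothetical helper)."""
    if not data_list:
        raise ValueError("data_list is empty")
    expected = set(data_list[0])
    for position, row in enumerate(data_list):
        if set(row) != expected:
            raise ValueError(
                f"row {position} has columns {sorted(row)}, expected {sorted(expected)}"
            )
    return sorted(expected)


def chunked(data_list: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` rows (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(data_list), size):
        yield data_list[start:start + size]


users = [
    {'name': 'John Doe', 'age': 30, 'email': 'john@example.com'},
    {'name': 'Jane Smith', 'age': 25, 'email': 'jane@example.com'},
    {'name': 'Bob Johnson', 'age': 35, 'email': 'bob@example.com'},
]
columns = check_rows(users)
batches = list(chunked(users, 2))

# With a connected client, each batch could then be passed on:
# for batch in batches:
#     client.batch_insert('users', batch)
```

Splitting very large lists into batches is a design choice rather than a documented requirement; whether it helps depends on row size and server limits.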
- query(*columns, snapshot: str | None = None)[source]
Get MatrixOne query builder - SQLAlchemy style
Args:
*columns: Can be:
- Single model class: query(Article) - returns all columns from model
- Multiple columns: query(Article.id, Article.title) - returns specific columns
- Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries
Examples
# Traditional model query (all columns)
client.query(Article).filter(...).all()

# Column-specific query
client.query(Article.id, Article.title).filter(...).all()

# With fulltext score
client.query(Article.id, boolean_match("title", "content").must("python").label("score"))

# Snapshot query
client.query(Article, snapshot="my_snapshot").filter(...).all()
Returns:
MatrixOneQuery instance configured for the specified columns
- snapshot(snapshot_name: str) Generator[SnapshotClient, None, None][source]
Snapshot context manager
Usage
with client.snapshot("daily_backup") as snapshot_client:
    result = snapshot_client.execute("SELECT * FROM users")
- session() Generator[Session, None, None][source]
Create a transaction-aware session for atomic database operations.
This method returns a MatrixOne Session that extends SQLAlchemy Session with MatrixOne-specific features. All operations within the session are executed atomically - they either all succeed or all fail together.
The session is a context manager that automatically handles the transaction lifecycle:
- Commits the transaction when the context exits normally
- Rolls back the transaction if any exception occurs
- Cleans up database resources automatically
Key Features:
Full SQLAlchemy ORM: All standard SQLAlchemy Session methods (add, delete, query, etc.)
Atomic transactions: Multiple operations commit or rollback together
MatrixOne managers: Access to snapshots, clones, vector ops, etc.
ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()
Automatic cleanup: Transaction and connection resources managed automatically
Query logging: Integrated query logging with performance tracking
Available Managers (transaction-aware):
session.snapshots: SnapshotManager for creating/managing snapshots
session.clone: CloneManager for cloning databases and tables
session.restore: RestoreManager for restoring from snapshots
session.pitr: PitrManager for point-in-time recovery
session.pubsub: PubSubManager for publish-subscribe operations
session.account: AccountManager for account/user management
session.vector_ops: VectorManager for vector operations and indexing
session.fulltext_index: FulltextIndexManager for fulltext search
session.metadata: MetadataManager for table metadata analysis
session.load_data: LoadDataManager for bulk data loading
session.stage: StageManager for external stage management
- Returns:
Context manager that yields a MatrixOne Session
- Return type:
Generator[Session, None, None]
- Raises:
ConnectionError – If client is not connected to database
Usage Examples:
from matrixone import Client
from sqlalchemy import Column, Integer, String, select, insert, update, delete
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))  # referenced by the status filters below

# Account, Analytics and InvalidTable are assumed to be models defined elsewhere.

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# Example 1: Basic Transaction with ORM-style SQL
# ========================================
with client.session() as session:
    # Insert using SQLAlchemy insert()
    session.execute(insert(User).values(name='John', email='john@example.com', age=30))

    # Update using SQLAlchemy update()
    session.execute(update(User).where(User.age < 18).values(status='minor'))

    # Select using SQLAlchemy select()
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    for user in result.scalars():
        print(f"User: {user.name}")

    # Delete using SQLAlchemy delete()
    session.execute(delete(User).where(User.status == 'inactive'))
    # All operations commit atomically

# ========================================
# Example 2: SQLAlchemy ORM Operations
# ========================================
with client.session() as session:
    # Create new objects
    user1 = User(name='Alice', email='alice@example.com', age=28)
    user2 = User(name='Bob', email='bob@example.com', age=35)

    # Add to session
    session.add(user1)
    session.add(user2)

    # Query using ORM
    stmt = select(User).where(User.name == 'Alice')
    result = session.execute(stmt)
    alice = result.scalar_one()

    # Update object
    alice.email = 'newemail@example.com'

    # Commit (or let context manager do it)
    session.commit()

# ========================================
# Example 3: MatrixOne Snapshot Operations
# ========================================
with client.session() as session:
    # Create snapshot within transaction
    from matrixone import SnapshotLevel
    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone database using snapshot
    session.clone.clone_database(
        target_db='prod_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit atomically

# ========================================
# Example 4: Bulk Data Loading with Stages
# ========================================
with client.session() as session:
    # Create stage using simple interface
    session.stage.create_local('import_stage', '/data/imports/')

    # Load data from stage using ORM model
    session.load_data.from_stage_csv('import_stage', 'users.csv', User)

    # Update statistics after loading
    session.execute("ANALYZE TABLE users")
    # All operations are atomic

# ========================================
# Example 5: Error Handling and Rollback
# ========================================
try:
    with client.session() as session:
        session.execute(insert(User).values(name='Charlie', age=40))
        session.execute(insert(InvalidTable).values(data='test'))  # This fails
        # Transaction automatically rolls back - Charlie is NOT inserted
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

# ========================================
# Example 6: Manual Transaction Control
# ========================================
with client.session() as session:
    try:
        session.execute(insert(User).values(name='David', age=25))
        session.execute(update(Account).values(balance=Account.balance - 100))

        # Verify before committing
        stmt = select(Account.balance)
        balance = session.execute(stmt).scalar()
        if balance >= 0:
            session.commit()  # Explicit commit
        else:
            session.rollback()  # Explicit rollback
            print("Insufficient balance")
    except Exception as e:
        session.rollback()
        raise

# ========================================
# Example 7: Complex Multi-Manager Transaction
# ========================================
with client.session() as session:
    # Create publication
    session.pubsub.create_database_publication(
        name='analytics_pub',
        database='analytics',
        account='subscriber_account'
    )

    # Create S3 stage for exports
    session.stage.create_s3(
        name='export_stage',
        bucket='my-bucket',
        path='exports/',
        aws_key_id='key',
        aws_secret_key='secret'
    )

    # Load fresh data using ORM model
    session.load_data.from_csv('/data/latest.csv', Analytics)

    # Create snapshot after load
    session.snapshots.create(
        name='post_load_snapshot',
        level=SnapshotLevel.DATABASE,
        database='analytics'
    )
    # All operations commit together
Best Practices:
Always use context manager: Use with client.session() for automatic cleanup
Keep transactions short: Long transactions can block other operations
Handle exceptions: Wrap session code in try-except for error handling
Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()
Avoid nested sessions: SQLAlchemy doesn’t support true nested transactions
Test rollback behavior: Ensure your application handles rollbacks correctly
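The commit-on-exit and rollback-on-error lifecycle described for session() can be pictured with a small stand-alone sketch. It uses a hypothetical `FakeSession` that only records calls and a hypothetical `session_scope` wrapper; neither is the SDK's implementation, and no database is involved.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeSession:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def execute(self, statement: str) -> None:
        self.events.append(f"execute: {statement}")

    def commit(self) -> None:
        self.events.append("commit")

    def rollback(self) -> None:
        self.events.append("rollback")

    def close(self) -> None:
        self.events.append("close")


@contextmanager
def session_scope(session: FakeSession) -> Iterator[FakeSession]:
    """Commit on normal exit, roll back on any exception, always close."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Normal exit: the work is committed.
ok = FakeSession()
with session_scope(ok) as s:
    s.execute("INSERT ...")

# Exception inside the block: the work is rolled back and the error propagates.
failed = FakeSession()
try:
    with session_scope(failed) as s:
        s.execute("INSERT ...")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
```

Re-raising after the rollback keeps the failure visible to the caller, which is why the usage examples wrap the `with` block in try-except.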
See also
Session: The session class returned by this method
AsyncClient.session(): Async version for async/await workflows
Client.execute(): Non-transactional query execution
- property snapshots: SnapshotManager | None
Get snapshot manager
- property clone: CloneManager | None
Get clone manager
- property branch: BranchManager | None
Get branch manager for Git-style version control operations.
Provides table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.
- Returns:
BranchManager instance for branch operations
Example:
client = Client()
client.connect(database='test')

# Create table branch
client.branch.create_table_branch('users_branch', 'users')

# Create database branch
client.branch.create_database_branch('dev_db', 'production')

# Compare branches
diffs = client.branch.diff_table('users_branch', 'users')

# Merge branches
client.branch.merge_table('users_branch', 'users')

# Delete branches
client.branch.delete_table_branch('users_branch')
client.branch.delete_database_branch('dev_db')
See also
branch_guide - Complete branch management guide
- property moctl: MoCtlManager | None
Get mo_ctl manager
- property restore: RestoreManager | None
Get restore manager
- property pitr: PitrManager | None
Get PITR manager
- property pubsub: PubSubManager | None
Get publish-subscribe manager
- property account: AccountManager | None
Get account manager
- property load_data: LoadDataManager | None
Get load data manager
- property stage: StageManager | None
Get stage manager for external stage operations
- property export
Get export manager for data export operations (INTO OUTFILE, INTO STAGE)
- property vector_ops: VectorManager | None
Get unified vector operations manager for vector operations (index and data)
- get_pinecone_index(table_name_or_model, vector_column: str)[source]
Get a PineconeCompatibleIndex object for vector search operations.
This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
vector_column: Name of the vector column
Returns:
PineconeCompatibleIndex object with Pinecone-compatible API
Example
>>> index = client.get_pinecone_index("documents", "embedding")
>>> index = client.get_pinecone_index(DocumentModel, "embedding")
>>> results = index.query([0.1, 0.2, 0.3], top_k=5)
>>> for match in results.matches:
...     print(f"ID: {match.id}, Score: {match.score}")
...     print(f"Metadata: {match.metadata}")
- property fulltext_index: FulltextIndexManager | None
Get fulltext index manager for fulltext index operations
- property metadata: MetadataManager | None
Get metadata manager for table metadata operations
- version() str[source]
Get MatrixOne server version
Returns:
str: MatrixOne server version string
Raises:
ConnectionError: If not connected to MatrixOne
QueryError: If version query fails
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> version = client.version()
>>> print(f"MatrixOne version: {version}")
- git_version() str[source]
Get MatrixOne git version information
Returns:
str: MatrixOne git version string
Raises:
ConnectionError: If not connected to MatrixOne
QueryError: If git version query fails
Example
>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> git_version = client.git_version()
>>> print(f"MatrixOne git version: {git_version}")
- set_backend_version(version: str) None[source]
Manually set the backend version
Args:
version: Version string in format "major.minor.patch" (e.g., "3.0.1")
- get_backend_version() str | None[source]
Get current backend version
Returns:
Version string or None if not set
- is_feature_available(feature_name: str) bool[source]
Check if a feature is available in current backend version
Args:
feature_name: Name of the feature to check
Returns:
True if feature is available, False otherwise
- get_feature_info(feature_name: str) Dict[str, Any] | None[source]
Get feature requirement information
Args:
feature_name: Name of the feature
Returns:
Feature information dictionary or None if not found
- check_version_compatibility(required_version: str, operator: str = '>=') bool[source]
Check if current backend version is compatible with required version
Args:
required_version: Required version string (e.g., "3.0.1")
operator: Comparison operator (">=", ">", "<=", "<", "==", "!=")
Returns:
True if compatible, False otherwise
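To make the operator argument concrete, here is a minimal sketch of how a "major.minor.patch" comparison of this kind can be done. It is an illustration under stated assumptions, not the SDK's own code: `parse_version` and `is_compatible` are hypothetical names, and only plain three-part numeric versions are handled.

```python
import operator
from typing import Tuple

# Map the documented operator strings onto Python comparison functions.
_COMPARATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}


def parse_version(version: str) -> Tuple[int, int, int]:
    """Turn 'major.minor.patch' into a tuple of ints (hypothetical helper)."""
    parts = version.strip().split(".")
    if len(parts) != 3:
        raise ValueError(f"expected 'major.minor.patch', got {version!r}")
    major, minor, patch = (int(part) for part in parts)
    return major, minor, patch


def is_compatible(current: str, required: str, op: str = ">=") -> bool:
    """Compare two version strings numerically using the given operator."""
    if op not in _COMPARATORS:
        raise ValueError(f"unsupported operator: {op!r}")
    return _COMPARATORS[op](parse_version(current), parse_version(required))


newer = is_compatible("3.0.5", "3.0.1")          # default operator is ">="
older = is_compatible("3.0.1", "3.0.5")
numeric = is_compatible("3.0.10", "3.0.9", ">")  # compared as numbers, not text
```

Comparing integer tuples rather than raw strings avoids the lexical trap where "3.0.10" would sort before "3.0.9".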
- get_version_hint(feature_name: str, error_context: str = '') str[source]
Get helpful hint message for version-related errors
Args:
feature_name: Name of the feature
error_context: Additional context for the error
Returns:
Helpful hint message
- is_development_version() bool[source]
Check if current backend is a development version
Returns:
True if backend is development version (999.x.x), False otherwise
- create_table(table_name_or_model, columns: dict | None = None, **kwargs) Client[source]
Create a table with a simplified interface.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
columns: Dictionary mapping column names to their types (required if table_name_or_model is str). Supported formats:
- 'id': 'bigint' (with primary_key=True if needed)
- 'name': 'varchar(100)'
- 'embedding': 'vecf32(128)' or 'vecf64(128)'
- 'score': 'float'
- 'created_at': 'datetime'
- 'is_active': 'boolean'
**kwargs: Additional table parameters
Returns:
Client: Self for chaining
Example
client.create_table("users", {
    'id': 'bigint',
    'name': 'varchar(100)',
    'email': 'varchar(255)',
    'embedding': 'vecf32(128)',
    'score': 'float',
    'created_at': 'datetime',
    'is_active': 'boolean'
}, primary_key='id')
- drop_table(table_name_or_model) Client[source]
Drop a table.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
Returns:
Client: Self for chaining
Example
# Drop table by name
client.drop_table("users")

# Drop table by model class
client.drop_table(UserModel)
- create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Client[source]
Create a table with vector indexes using a simplified interface.
Args:
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as create_table)
indexes: List of index definitions, each containing:
- 'name': Index name
- 'column': Column name to index
- 'type': Index type ('ivfflat' or 'hnsw')
- 'params': Dictionary of index-specific parameters
**kwargs: Additional table parameters
Returns:
Client: Self for chaining
Example
client.create_table_with_index("vector_docs", {
    'id': 'bigint',
    'title': 'varchar(200)',
    'embedding': 'vector(128,f32)'
}, indexes=[
    {
        'name': 'idx_hnsw',
        'column': 'embedding',
        'type': 'hnsw',
        'params': {'m': 48, 'ef_construction': 64, 'ef_search': 64}
    }
], primary_key='id')
- create_table_orm(table_name: str, *columns, **kwargs) Client[source]
Create a table using SQLAlchemy ORM-style column definitions. Similar to SQLAlchemy Table() constructor but without metadata.
Args:
table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)
Returns:
Client: Self for chaining
Example:
from sqlalchemy import Column, BigInteger, Integer
from matrixone.sqlalchemy_ext import Vectorf32, VectorIndex, VectorIndexType, VectorOpType

client.create_table_orm(
    'vector_docs_hnsw_demo',
    Column('a', BigInteger, primary_key=True),
    Column('b', Vectorf32(128)),
    Column('c', Integer),
    VectorIndex('idx_hnsw', 'b', index_type=VectorIndexType.HNSW,
                m=48, ef_construction=64, ef_search=64,
                op_type=VectorOpType.VECTOR_L2_OPS)
)
- create_all(base_class=None)[source]
Create all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- drop_all(base_class=None)[source]
Drop all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]
Get all secondary index table names for a given table.
This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.
- Parameters:
table_name – Name of the table to get secondary indexes for
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info')
>>> # Or specify database explicitly
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info', 'test')
>>> print(index_tables)
['__mo_index_secondary_..._cms_id', '__mo_index_unique_..._email']
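Because the returned list mixes `__mo_index_secondary_…` and `__mo_index_unique_…` tables, callers may want to separate the two kinds. A minimal sketch follows, assuming only the two name prefixes stated above; `split_index_tables` is a hypothetical helper, and the sample names are the elided ones from the example output rather than real table names.

```python
from typing import Dict, List

SECONDARY_PREFIX = '__mo_index_secondary_'
UNIQUE_PREFIX = '__mo_index_unique_'


def split_index_tables(names: List[str]) -> Dict[str, List[str]]:
    """Group index table names by documented prefix (hypothetical helper)."""
    return {
        'secondary': [n for n in names if n.startswith(SECONDARY_PREFIX)],
        'unique': [n for n in names if n.startswith(UNIQUE_PREFIX)],
    }


# Elided names copied from the example output above; not real table names.
index_tables = ['__mo_index_secondary_..._cms_id', '__mo_index_unique_..._email']
grouped = split_index_tables(index_tables)
```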
- get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]
Get the physical table name of a secondary index by its index name.
- Parameters:
table_name – Name of the table
index_name – Name of the secondary index
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
Physical table name of the secondary index, or None if not found
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id')
>>> # Or specify database explicitly
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id', 'test')
>>> print(index_table)
'__mo_index_secondary_018cfbda-bde1-7c3e-805c-3f8e71769f75_cms_id'
- get_table_indexes_detail(table_name: str, database_name: str | None = None) List[dict][source]
Get detailed information about all indexes for a table, including IVF, HNSW, Fulltext, and regular indexes.
This method returns comprehensive information about each index physical table, including:
- Index name
- Index type (MULTIPLE, PRIMARY, UNIQUE, etc.)
- Algorithm type (ivfflat, hnsw, fulltext, etc.)
- Algorithm table type (metadata, centroids, entries, etc.)
- Physical table name
- Column names
- Algorithm parameters
- Parameters:
table_name – Name of the table to get indexes for
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
List of dictionaries, each containing:
index_name: Name of the index
index_type: Type of index (MULTIPLE, PRIMARY, UNIQUE, etc.)
algo: Algorithm type (ivfflat, hnsw, fulltext, or None for regular indexes)
algo_table_type: Algorithm table type (metadata, centroids, entries, etc., or None)
physical_table_name: Physical table name
columns: List of column names
algo_params: Algorithm parameters (or None)
- Return type:
List[dict]
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Get all index details for a table
>>> indexes = client.get_table_indexes_detail('ivf_health_demo_docs')
>>> for idx in indexes:
...     print(f"{idx['index_name']} ({idx['algo']}) - {idx['algo_table_type']}: {idx['physical_table_name']}")
idx_embedding_ivf (ivfflat) - metadata: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - centroids: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - entries: __mo_index_secondary_...
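Since one logical index such as an IVF index maps to several physical tables, it is often handy to group the returned dictionaries by index name. The sketch below works on a hand-written list shaped like the documented return value; `physical_tables_by_index` is a hypothetical helper and the physical table names are placeholders, not real output.

```python
from collections import defaultdict
from typing import Any, Dict, List


def physical_tables_by_index(indexes: List[Dict[str, Any]]) -> Dict[str, Dict[str, str]]:
    """Map index_name -> {algo_table_type: physical_table_name} (hypothetical helper)."""
    grouped: Dict[str, Dict[str, str]] = defaultdict(dict)
    for entry in indexes:
        # Regular indexes are documented to have algo_table_type None.
        role = entry['algo_table_type'] or 'index'
        grouped[entry['index_name']][role] = entry['physical_table_name']
    return dict(grouped)


# Placeholder data using the documented keys; the values are made up.
sample = [
    {'index_name': 'idx_embedding_ivf', 'algo': 'ivfflat',
     'algo_table_type': 'metadata', 'physical_table_name': 'placeholder_meta'},
    {'index_name': 'idx_embedding_ivf', 'algo': 'ivfflat',
     'algo_table_type': 'centroids', 'physical_table_name': 'placeholder_centroids'},
    {'index_name': 'idx_embedding_ivf', 'algo': 'ivfflat',
     'algo_table_type': 'entries', 'physical_table_name': 'placeholder_entries'},
    {'index_name': 'idx_name', 'algo': None,
     'algo_table_type': None, 'physical_table_name': 'placeholder_name'},
]
tables = physical_tables_by_index(sample)
```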
- verify_table_index_counts(table_name: str) int[source]
Verify that the main table and all its secondary index tables have the same row count.
This method compares the COUNT(*) of the main table with all its secondary index tables in a single SQL query for consistency. If counts don’t match, raises an exception.
- Parameters:
table_name – Name of the table to verify
- Returns:
Row count (int) if verification succeeds
- Raises:
ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message
Examples:
>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> count = client.verify_table_index_counts('cms_all_content_chunk_info')
>>> print(f"✓ Verification passed, row count: {count}")

>>> # If verification fails:
>>> try:
...     count = client.verify_table_index_counts('some_table')
... except ValueError as e:
...     print(f"Verification failed: {e}")
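The check itself amounts to comparing one count against several others and reporting every count on mismatch. The sketch below shows that comparison on plain numbers; it is not the SDK's implementation, `check_counts` is a hypothetical helper, and the table names and counts are invented for illustration.

```python
from typing import Dict


def check_counts(main_table: str, main_count: int, index_counts: Dict[str, int]) -> int:
    """Return main_count if every index table matches it, else raise ValueError."""
    mismatched = {name: n for name, n in index_counts.items() if n != main_count}
    if mismatched:
        details = ", ".join(f"{name}={n}" for name, n in sorted(index_counts.items()))
        raise ValueError(
            f"Row count mismatch for {main_table}: main={main_count}, {details}"
        )
    return main_count


# Invented counts: both index tables agree with the main table.
verified = check_counts('some_table', 100, {'index_a': 100, 'index_b': 100})

# Invented counts: one index table is short, so all counts are reported.
try:
    check_counts('some_table', 100, {'index_a': 100, 'index_b': 97})
    message = ""
except ValueError as exc:
    message = str(exc)
```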
ResultSet Class
- class matrixone.client.ResultSet(columns: List[str], rows: List[Tuple[Any, ...]], affected_rows: int = 0)[source]
Bases:
object
Result set wrapper for query results from MatrixOne database operations.
This class provides a convenient interface for accessing query results with methods similar to database cursor objects. It supports both SELECT queries (returning data) and DML operations (returning affected row counts).
Key Features:
Iterator interface for row-by-row access
Bulk data access methods (fetchall, fetchmany)
Column name access and metadata
Affected row count for DML operations
Cursor-like positioning for result navigation
Attributes:
columns (List[str]): List of column names in the result set
rows (List[Tuple[Any, ...]]): List of tuples containing row data
affected_rows (int): Number of rows affected by DML operations
Usage Examples:
# SELECT query results
>>> result = client.execute("SELECT id, name, age FROM users WHERE age > ?", (25,))
>>> print(f"Found {len(result.rows)} users")
>>> for row in result.fetchall():
...     print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")

# Access by column name
>>> for row in result.rows:
...     user_id = row[result.columns.index('id')]
...     user_name = row[result.columns.index('name')]

# DML operation results
>>> result = client.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("John", 30))
>>> print(f"Inserted {result.affected_rows} rows")

# Iterator interface
>>> for row in result:
...     print(row)
Note: This class is automatically created by the Client’s execute() method and provides a consistent interface for all query results.
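Looking up `result.columns.index(...)` for every field gets repetitive, so a common alternative is to pair each row with the column names once. This is a minimal sketch on plain lists that stand in for the documented `columns` and `rows` attributes; `rows_as_dicts` is a hypothetical helper, not a ResultSet method.

```python
from typing import Any, Dict, List, Sequence, Tuple


def rows_as_dicts(
    columns: Sequence[str], rows: Sequence[Tuple[Any, ...]]
) -> List[Dict[str, Any]]:
    """Pair each row tuple with the column names (hypothetical helper)."""
    return [dict(zip(columns, row)) for row in rows]


# Stand-ins for result.columns and result.rows from a SELECT.
columns = ['id', 'name', 'age']
rows = [(1, 'John', 30), (2, 'Jane', 25)]

records = rows_as_dicts(columns, rows)
names = [record['name'] for record in records]
```

With a real result, the call would be `rows_as_dicts(result.columns, result.rows)`.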
Session Class
- class matrixone.session.Session(bind=None, client=None, wrap_session=None, **kwargs)[source]
Bases:
Session
MatrixOne Session - Transaction-aware session extending SQLAlchemy Session.
This class provides a comprehensive transaction context for executing multiple database operations atomically. It inherits all SQLAlchemy Session capabilities while adding MatrixOne-specific features like snapshots, clones, vector operations, and fulltext search.
Key Features:
Full SQLAlchemy API: All standard SQLAlchemy Session methods (add, delete, query, etc.)
Atomic transactions: All operations succeed or fail together
Automatic rollback: Errors trigger automatic transaction rollback
MatrixOne managers: Access to all MatrixOne-specific operations within transactions
Hybrid SQL support: Execute both SQLAlchemy statements and string SQL
Query logging: Integrated query logging with performance tracking
Context manager: Automatic transaction lifecycle management
Transaction Behavior:
Auto-commit on success: Transaction commits when context manager exits normally
Auto-rollback on error: Transaction rolls back if any exception occurs
Explicit control: Manual commit() and rollback() also supported
Isolation: All operations within session are isolated from other transactions
ACID compliance: Full ACID guarantees for all operations
Available Managers (all transaction-aware):
snapshots: SnapshotManager for creating/managing database snapshots
clone: CloneManager for cloning databases and tables
restore: RestoreManager for restoring from snapshots
pitr: PitrManager for point-in-time recovery operations
pubsub: PubSubManager for publish-subscribe operations
account: AccountManager for account and user management
vector_ops: VectorManager for vector operations and indexing
fulltext_index: FulltextIndexManager for fulltext search operations
metadata: MetadataManager for table metadata analysis
load_data: LoadDataManager for bulk data loading
stage: StageManager for external stage management
cdc: CDCManager for change data capture task operations
Creating Session:
There are two ways to create a MatrixOne Session:
New Session (Recommended): Create directly from client:
with client.session() as session:
    # Your operations here
    pass
Wrap Existing Session (For Legacy Projects): Wrap existing SQLAlchemy session:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone.session import Session as MatrixOneSession

# Your existing SQLAlchemy code
engine = create_engine('mysql+pymysql://...')
SessionFactory = sessionmaker(bind=engine)
sqlalchemy_session = SessionFactory()

# Wrap with MatrixOne features
mo_session = MatrixOneSession(
    client=mo_client,
    wrap_session=sqlalchemy_session
)
# Now you can use both SQLAlchemy and MatrixOne features
Usage Examples:
from matrixone import Client, SnapshotLevel
from sqlalchemy import select, insert, update, delete

# User, Order, Account and Analytics are assumed to be ORM models defined elsewhere.

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ============================================================
# Example 0: Wrapping Existing SQLAlchemy Session (Legacy Projects)
# ============================================================
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone.session import Session as MatrixOneSession

# Your existing SQLAlchemy setup
engine = create_engine('mysql+pymysql://root:111@127.0.0.1:6001/test')
SessionFactory = sessionmaker(bind=engine)
existing_session = SessionFactory()

# Wrap it with MatrixOne features
mo_session = MatrixOneSession(
    client=client,
    wrap_session=existing_session
)

try:
    # Standard SQLAlchemy operations work
    result = mo_session.execute("SELECT * FROM users")

    # MatrixOne features now available
    mo_session.stage.create_s3('backup', bucket='my-backup')
    mo_session.snapshots.create('snapshot1', level='database')
    mo_session.load_data.from_csv('/data/file.csv', 'users')

    mo_session.commit()
finally:
    mo_session.close()

# ============================================================
# Example 1: Basic Transaction with ORM-style Operations
# ============================================================
with client.session() as session:
    # All operations are atomic - succeed or fail together
    from sqlalchemy import insert, update

    # Insert using SQLAlchemy insert()
    session.execute(insert(User).values(name='John', email='john@example.com'))
    session.execute(insert(Order).values(user_id=1, amount=100.0))

    # Update using SQLAlchemy update()
    session.execute(
        update(Account).where(Account.user_id == 1).values(balance=Account.balance - 100)
    )
    # Transaction commits automatically on successful completion

# ============================================================
# Example 2: SQLAlchemy ORM Operations
# ============================================================
with client.session() as session:
    # Create new objects
    user = User(name="Alice", email="alice@example.com")
    order = Order(user_id=1, amount=50.0)

    # Add to session
    session.add(user)
    session.add(order)

    # Query using ORM
    stmt = select(User).where(User.name == "Alice")
    result = session.execute(stmt)
    users = result.scalars().all()

    # Update using ORM
    user = session.get(User, 1)
    user.email = "newemail@example.com"

    # Commit explicitly (or let context manager do it)
    session.commit()

# ============================================================
# Example 3: MatrixOne Snapshot Operations in Transaction
# ============================================================
with client.session() as session:
    # Create snapshot within transaction
    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone database within same transaction
    session.clone.clone_database(
        target_db='production_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit atomically

# ============================================================
# Example 4: Bulk Data Loading in Transaction
# ============================================================
with client.session() as session:
    # Load data files atomically
    session.load_data.from_csv('/data/users.csv', 'users')
    session.load_data.from_csv('/data/orders.csv', 'orders')

    # Update statistics after loading
    session.execute("ANALYZE TABLE users")
    session.execute("ANALYZE TABLE orders")
    # All loads and updates commit together

# ============================================================
# Example 5: Error Handling with Automatic Rollback
# ============================================================
try:
    with client.session() as session:
        session.execute("INSERT INTO users (name) VALUES ('Bob')")
        session.execute("INSERT INTO invalid_table (data) VALUES ('test')")  # This fails
        # Transaction automatically rolls back - Bob is NOT inserted
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

# ============================================================
# Example 6: Manual Transaction Control
# ============================================================
with client.session() as session:
    try:
        session.execute("INSERT INTO users (name) VALUES ('Charlie')")
        session.execute("UPDATE accounts SET balance = balance - 50")

        # Verify conditions before committing
        result = session.execute("SELECT balance FROM accounts WHERE id = 1")
        balance = result.scalar()

        if balance >= 0:
            session.commit()  # Explicit commit
        else:
            session.rollback()  # Explicit rollback
            print("Transaction rolled back due to insufficient balance")
    except Exception as e:
        session.rollback()
        raise

# ============================================================
# Example 7: Complex Multi-Manager Transaction
# ============================================================
with client.session() as session:
    # Create publication
    pub = session.pubsub.create_database_publication(
        name='analytics_data',
        database='analytics',
        account='subscriber_account'
    )

    # Create stage using simple interface
    session.stage.create_local('export_stage', '/exports/')

    # Load fresh data
    session.load_data.from_csv('/data/latest.csv', Analytics)  # Use ORM model

    # Create snapshot after load
    session.snapshots.create(
        name='post_load_snapshot',
        level=SnapshotLevel.DATABASE,
        database='analytics'
    )
    # All operations are atomic

# ============================================================
# Example 8: Vector Operations in Transaction
# ============================================================
with client.session() as session:
    # Create vector table
    session.create_table('documents', {
        'id': 'int primary key',
        'content': 'text',
        'embedding': 'vecf32(384)'
    })

    # Create vector index
    session.vector_ops.create_ivf(
        'documents',
        name='doc_idx',
        column='embedding',
        lists=100
    )

    # Insert vector data
    session.insert('documents', {
        'id': 1,
        'content': 'sample document',
        'embedding': [0.1] * 384
    })
Best Practices:
Always use context manager: Use with client.session() for automatic cleanup
Keep transactions short: Long transactions can block other operations
Handle exceptions: Wrap session code in try-except for proper error handling
Avoid nested transactions: SQLAlchemy doesn’t support true nested transactions
Use explicit commits: When you need fine-grained control over transaction boundaries
Test rollback behavior: Ensure your application handles rollbacks correctly
Important Notes:
Session is created by Client.session() context manager
Don’t instantiate Session directly
All manager operations within session are transaction-aware
Session automatically manages transaction lifecycle
Errors trigger automatic rollback
Normal exit triggers automatic commit
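The commit-on-exit and rollback-on-error lifecycle described above can be sketched with the standard library alone. This is a minimal illustration, not the SDK's implementation: session_scope is a hypothetical helper, and an in-memory SQLite connection stands in for a MatrixOne session.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def session_scope(conn):
    """Hypothetical stand-in for client.session(): commit on normal exit, roll back on error."""
    try:
        yield conn
        conn.commit()      # normal exit -> commit
    except Exception:
        conn.rollback()    # any error -> roll back, then re-raise to the caller
        raise


# SQLite is used here only as a stand-in database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.commit()

# Normal exit: the insert is committed.
with session_scope(conn) as s:
    s.execute("INSERT INTO users (name) VALUES ('Alice')")

# Error inside the block: Bob's insert is rolled back.
error_message = None
try:
    with session_scope(conn) as s:
        s.execute("INSERT INTO users (name) VALUES ('Bob')")
        s.execute("INSERT INTO invalid_table (data) VALUES ('test')")  # fails: no such table
except sqlite3.OperationalError as exc:
    error_message = str(exc)

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY name")]
print(names)
```

Only the row written in the block that exited normally survives, which is the behavior the notes above attribute to Session.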
See also
Client.session(): Creates and manages Session instances
AsyncSession: Async version for async/await workflows
SQLAlchemy Session: Parent class documentation
- __init__(bind=None, client=None, wrap_session=None, **kwargs)[source]
Initialize MatrixOne Session.
- Parameters:
bind – SQLAlchemy Engine or Connection to bind to
client – MatrixOne Client instance
wrap_session – Existing SQLAlchemy Session to wrap with MatrixOne features
**kwargs – Additional arguments passed to SQLAlchemy Session
Examples
# Create new session from engine
session = Session(bind=engine, client=client)

# Wrap existing SQLAlchemy session (for legacy projects)
existing_session = sessionmaker(bind=engine)()
mo_session = Session(client=client, wrap_session=existing_session)
- execute(sql_or_stmt, params: Tuple | None = None, **kwargs)[source]
Execute SQL or SQLAlchemy statement within the current session transaction.
This method extends SQLAlchemy’s Session.execute() with MatrixOne-specific features:
Parameter substitution for string SQL using ‘?’ placeholders
Integrated query logging with performance tracking
Support for both SQLAlchemy statements and raw SQL strings
All queries executed within the session participate in the current transaction. Changes are committed when the session context exits normally, or rolled back on error.
- Parameters:
sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be:
  - String SQL with ‘?’ placeholders for parameters
  - SQLAlchemy select() statement
  - SQLAlchemy insert() statement
  - SQLAlchemy update() statement
  - SQLAlchemy delete() statement
  - SQLAlchemy text() statement
params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Ignored for SQLAlchemy statements.
**kwargs –
Additional keyword arguments:
log_mode (str): Override SQL logging mode for this query only. Options: 'off', 'simple', 'full'. If not specified, uses the client’s global sql_log_mode setting.
Other kwargs are passed to SQLAlchemy’s execute()
- Returns:
SQLAlchemy Result object with methods:
fetchall(): Get all rows as list of tuples
fetchone(): Get next row as tuple or None
fetchmany(size): Get next N rows
scalars(): Get first column of each row
mappings(): Get rows as dictionaries
rowcount: Number of rows affected (for INSERT/UPDATE/DELETE)
- Return type:
SQLAlchemy Result
- Raises:
QueryError – If query execution fails or SQL syntax is invalid
ConnectionError – If session is not connected to database
Examples:
from matrixone import Client
from sqlalchemy import select, insert, update, delete, and_, or_

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# String SQL with Parameters
# ========================================
with client.session() as session:
    # Simple INSERT with parameters
    result = session.execute(
        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        ("John Doe", "john@example.com", 30)
    )
    print(f"Inserted {result.rowcount} rows")

    # SELECT with parameters
    result = session.execute(
        "SELECT * FROM users WHERE age > ? AND email LIKE ?",
        (25, "%@example.com")
    )
    for row in result:
        print(f"User: {row.name}, Age: {row.age}")

    # UPDATE with parameters
    result = session.execute(
        "UPDATE users SET status = ? WHERE age < ?",
        ("active", 18)
    )
    print(f"Updated {result.rowcount} rows")

    # DELETE with parameters
    result = session.execute(
        "DELETE FROM users WHERE status = ?",
        ("inactive",)
    )
    print(f"Deleted {result.rowcount} rows")

# ========================================
# SQLAlchemy SELECT Statements
# ========================================
with client.session() as session:
    # Basic SELECT
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()  # Returns list of User objects

    # SELECT specific columns
    stmt = select(User.name, User.email).where(User.status == 'active')
    result = session.execute(stmt)
    for name, email in result:
        print(f"{name}: {email}")

    # Complex WHERE with AND/OR
    stmt = select(User).where(
        and_(
            User.age > 18,
            or_(
                User.status == 'active',
                User.status == 'pending'
            )
        )
    )
    result = session.execute(stmt)

    # SELECT with JOINs
    stmt = select(User, Order).join(Order, User.id == Order.user_id)
    result = session.execute(stmt)
    for user, order in result:
        print(f"{user.name} ordered {order.amount}")

    # SELECT with aggregation
    from sqlalchemy import func
    stmt = select(func.count(User.id), func.avg(User.age))
    result = session.execute(stmt)
    count, avg_age = result.one()

# ========================================
# SQLAlchemy INSERT Statements
# ========================================
with client.session() as session:
    # Single INSERT
    stmt = insert(User).values(name='Alice', email='alice@example.com', age=28)
    result = session.execute(stmt)
    print(f"Inserted row with ID: {result.lastrowid}")

    # Bulk INSERT
    stmt = insert(User).values([
        {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
        {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
    ])
    result = session.execute(stmt)
    print(f"Inserted {result.rowcount} rows")

# ========================================
# SQLAlchemy UPDATE Statements
# ========================================
with client.session() as session:
    # Simple UPDATE
    stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
    result = session.execute(stmt)
    print(f"Updated {result.rowcount} rows")

    # Conditional UPDATE
    stmt = update(User).where(User.age < 18).values(status='minor')
    result = session.execute(stmt)

    # UPDATE with expressions
    stmt = update(Order).values(total=Order.quantity * Order.price)
    result = session.execute(stmt)

# ========================================
# SQLAlchemy DELETE Statements
# ========================================
with client.session() as session:
    # Simple DELETE
    stmt = delete(User).where(User.id == 1)
    result = session.execute(stmt)

    # Conditional DELETE
    stmt = delete(User).where(User.status == 'deleted')
    result = session.execute(stmt)
    print(f"Deleted {result.rowcount} rows")

# ========================================
# Result Processing
# ========================================
with client.session() as session:
    # fetchall() - get all rows
    result = session.execute("SELECT * FROM users")
    rows = result.fetchall()
    for row in rows:
        print(f"User: {row.name}")

    # fetchone() - get one row at a time
    result = session.execute("SELECT * FROM users")
    while row := result.fetchone():
        print(f"User: {row.name}")

    # fetchmany() - get N rows
    result = session.execute("SELECT * FROM users")
    rows = result.fetchmany(10)  # Get 10 rows

    # scalars() - get first column
    stmt = select(User.name)
    result = session.execute(stmt)
    names = result.scalars().all()  # List of names

    # mappings() - get rows as dicts
    result = session.execute("SELECT name, email FROM users")
    for row in result.mappings():
        print(f"{row['name']}: {row['email']}")

    # scalar() - get single value
    stmt = select(func.count(User.id))
    result = session.execute(stmt)
    count = result.scalar()  # Single integer value

# ========================================
# Query Logging Control
# ========================================
with client.session() as session:
    # Disable logging for this query only
    result = session.execute(
        "SELECT * FROM large_table",
        log_mode='off'
    )

    # Force full SQL logging for debugging
    result = session.execute(
        "SELECT * FROM users WHERE complex_condition",
        log_mode='full'
    )

    # Simple logging (show operation type only)
    result = session.execute(
        "UPDATE massive_table SET field = 'value'",
        log_mode='simple'
    )
- Best Practices:
Use parameters (?-placeholders) to prevent SQL injection
Use SQLAlchemy statements for complex queries
Use string SQL for simple, dynamic queries
Always consume or close result sets
Use log_mode='off' for frequently executed queries in production
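Why the first practice matters can be shown with a small, self-contained comparison. The sketch below uses Python's built-in sqlite3 module, which also accepts '?' placeholders, purely as a stand-in. How Session.execute() performs its substitution internally is not stated here, so treat this as an illustration of the principle rather than of MatrixOne's mechanism.

```python
import sqlite3

# In-memory SQLite database used only as a stand-in.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO users (name, status) VALUES (?, ?)",
    [("alice", "active"), ("bob", "inactive")],
)

malicious = "nobody' OR '1'='1"

# Unsafe: the input is spliced into the SQL text and changes the query's meaning.
unsafe_sql = f"SELECT name FROM users WHERE name = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the input travels as a bound value and is compared as a plain string.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (malicious,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))
```

The interpolated query matches every row because the injected OR clause is always true, while the parameterized query matches none.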
See also
AsyncSession.execute(): Async version
Client.execute(): Client-level execute without transaction
SQLAlchemy Result: Result object documentation
- insert(table_name: str, data: dict[str, Any]) → ResultSet[source]
Insert data into a table within the session.
Args:
table_name: Name of the table
data: Data to insert (dict with column names as keys)
Returns:
ResultSet object
- batch_insert(table_name: str, data_list: list[dict[str, Any]]) → ResultSet[source]
Batch insert data into a table within the session.
Args:
table_name: Name of the table
data_list: List of data dictionaries to insert
Returns:
ResultSet object
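One plausible way a list of row dictionaries maps onto a single parameterized statement is sketched below. The batch_insert function here is a hypothetical helper written against sqlite3 as a stand-in. It is not the SDK method, and it returns a row count rather than a ResultSet.

```python
import sqlite3


def batch_insert(conn, table_name, data_list):
    """Hypothetical helper: insert a list of row dicts using one parameterized statement.

    table_name and the dict keys are interpolated as identifiers, so they must
    come from trusted code; only the values are bound as parameters.
    """
    if not data_list:
        return 0
    columns = list(data_list[0])
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
    # Read every dict in the same column order so values line up with placeholders.
    rows = [tuple(row[col] for col in columns) for row in data_list]
    cursor = conn.executemany(sql, rows)
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
inserted = batch_insert(
    conn,
    "users",
    [{"id": 1, "name": "Bob"}, {"id": 2, "name": "Carol"}],
)
stored = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
print(inserted, stored)
```

The sketch assumes every dictionary has the same keys as the first one; a row with a missing key raises KeyError instead of inserting a partial row.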
- query(*columns, snapshot: str | None = None)[source]
Return a SQLAlchemy-style MatrixOne query builder bound to this session.
Args:
*columns: Can be:
  - Single model class: query(Article) - returns all columns from model
  - Multiple columns: query(Article.id, Article.title) - returns specific columns
  - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries
Returns:
MatrixOneQuery instance configured for the specified columns within session
- create_table(table_name: str, columns: dict, **kwargs) → Session[source]
Create a table within the MatrixOne session.
Args:
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
**kwargs: Additional table parameters
Returns:
Session: Self for chaining
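To make the columns format concrete, the sketch below renders a column-to-type dictionary as a CREATE TABLE statement. build_create_table_sql is a hypothetical helper, not part of the SDK, and the SQL that create_table actually emits may differ. SQLite is used only to check that the generated statement is well-formed.

```python
import sqlite3


def build_create_table_sql(table_name, columns):
    """Hypothetical helper: render a {column: type} mapping as a CREATE TABLE statement."""
    column_defs = ", ".join(f"{name} {col_type}" for name, col_type in columns.items())
    return f"CREATE TABLE {table_name} ({column_defs})"


ddl = build_create_table_sql("users", {
    "id": "int primary key",
    "name": "varchar(100)",
    "email": "varchar(255)",
})
print(ddl)

# Run the statement against an in-memory SQLite database as a syntax check only.
conn = sqlite3.connect(":memory:")
conn.execute(ddl)
column_names = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
print(column_names)
```

Dictionary insertion order determines column order in this sketch, which is why the columns come back as id, name, email.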
- drop_table(table_name: str) → Session[source]
Drop a table within the MatrixOne session.
Args:
table_name: Name of the table to drop
Returns:
Session: Self for chaining
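Because create_table and drop_table are documented as returning the session itself, calls can be chained. The stand-in class below only records the statements it would issue. FakeSession is hypothetical and exists solely to show the return-self pattern without a database.

```python
class FakeSession:
    """Hypothetical stand-in that only records calls; not the real Session class."""

    def __init__(self):
        self.statements = []

    def create_table(self, table_name, columns, **kwargs):
        cols = ", ".join(f"{name} {col_type}" for name, col_type in columns.items())
        self.statements.append(f"CREATE TABLE {table_name} ({cols})")
        return self  # returning self is what makes chaining possible

    def drop_table(self, table_name):
        self.statements.append(f"DROP TABLE {table_name}")
        return self


session = FakeSession()
chained = (
    session.create_table("tmp_a", {"id": "int primary key"})
    .create_table("tmp_b", {"id": "int primary key"})
    .drop_table("tmp_a")
)
print(session.statements)
```

Each call hands back the same object, so the chain is equivalent to three separate statements on one session.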
- create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) → Session[source]
Create a table with vector indexes within the MatrixOne session.
Args:
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
indexes: List of index definitions (same format as client.create_table_with_index)
**kwargs: Additional table parameters
Returns:
Session: Self for chaining
- create_table_orm(table_name: str, *columns, **kwargs) → Session[source]
Create a table using SQLAlchemy ORM-style definitions within the MatrixOne session.
Args:
table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (such as enable_hnsw, enable_ivf)
Returns:
Session: Self for chaining