Client

MatrixOne Client - Basic implementation

class matrixone.client.ClientExecutor(client)[source]

Bases: BaseMatrixOneExecutor

Client executor that uses Client’s execute method

__init__(client)[source]
class matrixone.client.Client(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]

Bases: BaseMatrixOneClient

MatrixOne Client - High-level interface for MatrixOne database operations.

This class provides a comprehensive interface for connecting to and interacting with MatrixOne databases. It supports modern API patterns including table creation, data insertion, querying, vector operations, and transaction management.

Key Features:

  • High-level table operations (create_table, drop_table, insert, batch_insert)

  • Query builder interface for complex queries

  • Vector operations (similarity search, range search, indexing)

  • Transaction management with context managers

  • Snapshot and restore operations

  • Account and user management

  • Fulltext search capabilities

  • Connection pooling and SSL support

Examples:

from matrixone import Client

# Basic usage
client = Client(
    host='localhost',
    port=6001,
    user='root',
    password='111',
    database='test'
)

# Create table
client.create_table("users", {
    "id": "int primary key",
    "name": "varchar(100)",
    "email": "varchar(255)"
})

# Insert and query data
client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"})
result = client.query("users").where("id = ?", 1).all()

# Vector operations
client.create_table("documents", {
    "id": "int primary key",
    "content": "text",
    "embedding": "vecf32(384)"
})

results = client.vector_ops.similarity_search(
    "documents",
    vector_column="embedding",
    query_vector=[0.1, 0.2, 0.3, ...],
    limit=10
)

# Transaction
with client.session() as session:
    session.execute("INSERT INTO users (name) VALUES ('John')")

Attributes:

engine (Engine): SQLAlchemy engine instance
connected (bool): Connection status
backend_version (str): Detected backend version
vector_ops (VectorManager): Vector operations manager
snapshots (SnapshotManager): Snapshot operations manager
query (QueryBuilder): Query builder for complex queries
__init__(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]

Initialize MatrixOne client

Args:

host: Database host (optional, can be set later via connect)
port: Database port (optional, can be set later via connect)
user: Username (optional, can be set later via connect)
password: Password (optional, can be set later via connect)
database: Database name (optional, can be set later via connect)
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name
role: Optional role name
pool_size: Connection pool size
max_overflow: Maximum overflow connections
pool_timeout: Pool timeout in seconds
pool_recycle: Connection recycle time in seconds
connection_timeout: Connection timeout in seconds
query_timeout: Query timeout in seconds
auto_commit: Enable auto-commit mode
charset: Character set for connection
logger: Custom logger instance. If None, creates a default logger
sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full')
    - 'off': No SQL logging
    - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default)
    - 'simple': Show operation summary only
    - 'full': Show complete SQL regardless of length
slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0)
max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
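
The four `sql_log_mode` values can be pictured with a small, self-contained sketch. `format_sql_for_log` is a hypothetical helper written for illustration only; the exact text the client emits in 'simple' mode or when summarizing long SQL is an assumption, not the library's actual output.

```python
from typing import Optional


def format_sql_for_log(sql: str, mode: str = "auto", max_len: int = 500) -> Optional[str]:
    """Hypothetical helper: return the text a logger might emit, or None when logging is off."""
    sql = " ".join(sql.split())  # collapse whitespace so the log entry stays on one line
    operation = sql.split(" ", 1)[0].upper() if sql else ""
    if mode == "off":
        return None  # no SQL logging
    if mode == "full":
        return sql  # complete SQL regardless of length
    if mode == "simple":
        return f"{operation} ({len(sql)} chars)"  # operation summary only (assumed format)
    if mode == "auto":
        if len(sql) <= max_len:
            return sql  # short SQL is shown fully
        # long SQL is summarized (assumed format)
        return f"{sql[:max_len]}... [truncated, {len(sql)} chars total]"
    raise ValueError(f"unknown sql_log_mode: {mode!r}")
```

In this sketch `max_len` plays the role of `max_sql_display_length`; only the 'auto' branch consults it.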
connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None) None[source]

Connect to MatrixOne database using SQLAlchemy engine

Args:

host: Database host
port: Database port
user: Username or login info in format "user", "account#user", "account#user#role",
      "account:user", or "account:user:role" (both '#' and ':' separators are supported)
password: Password
database: Database name
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name (will be combined with user if user doesn't contain '#' or ':')
role: Optional role name (will be combined with user if user doesn't contain '#' or ':')
charset: Character set for the connection (default: utf8mb4)
connection_timeout: Connection timeout in seconds (default: 30)
auto_commit: Enable autocommit (default: True)
on_connect: Connection hook to execute after successful connection.
           Can be:
           - ConnectionHook instance
           - List of ConnectionAction or string action names
           - Custom callback function
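
The login formats accepted by `user` can be sketched as a small parser. `parse_login` is a hypothetical function, not part of the client; it assumes that separate `account` and `role` arguments are ignored whenever `user` already contains a '#' or ':' separator, which is how the argument descriptions above read.

```python
from typing import Dict, Optional


def parse_login(
    user: str,
    account: Optional[str] = None,
    role: Optional[str] = None,
) -> Dict[str, Optional[str]]:
    """Hypothetical parser for "user", "account#user[#role]" and "account:user[:role]"."""
    separator = "#" if "#" in user else (":" if ":" in user else None)
    if separator is None:
        # Plain user name: fall back to the separate account/role arguments.
        return {"account": account, "user": user, "role": role}
    parts = user.split(separator)
    if not 2 <= len(parts) <= 3 or not all(parts):
        raise ValueError(f"unrecognized login format: {user!r}")
    return {
        "account": parts[0],
        "user": parts[1],
        "role": parts[2] if len(parts) == 3 else None,
    }
```

For example, `parse_login("acme#alice#admin")` and `parse_login("alice", account="acme", role="admin")` produce the same dictionary in this sketch.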

Examples:

# Enable all features after connection
# (connect() accepts keyword arguments only)
client.connect(host=host, port=port, user=user, password=password,
               database=database,
               on_connect=[ConnectionAction.ENABLE_ALL])

# Enable only vector operations with custom charset
client.connect(host=host, port=port, user=user, password=password,
               database=database,
               charset="utf8mb4",
               on_connect=[ConnectionAction.ENABLE_VECTOR])

# Custom callback
def my_callback(client):
    print(f"Connected to {client._connection_params['host']}")

client.connect(host=host, port=port, user=user, password=password,
               database=database,
               on_connect=my_callback)
classmethod from_engine(engine: Engine, **kwargs) Client[source]

Create Client instance from existing SQLAlchemy Engine

Args:

engine: SQLAlchemy Engine instance (must use MySQL driver)
**kwargs: Additional client configuration options

Returns:

Client: Configured client instance

Raises:

ConnectionError: If engine doesn't use MySQL driver

Examples

Basic usage:

    from sqlalchemy import create_engine
    from matrixone import Client

    engine = create_engine("mysql+pymysql://user:pass@host:port/db")
    client = Client.from_engine(engine)

With custom configuration:

    engine = create_engine("mysql+pymysql://user:pass@host:port/db")
    client = Client.from_engine(
        engine,
        sql_log_mode='auto',
        slow_query_threshold=0.5
    )
disconnect() None[source]

Disconnect from MatrixOne database and dispose engine.

This method properly closes all database connections and disposes of the SQLAlchemy engine. It should be called when the client is no longer needed to free up resources.

After calling this method, the client will need to be reconnected using the connect() method before any database operations can be performed.

Raises:

Exception: If disconnection fails (the error is logged, then re-raised)

Example

>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> client.connect()
>>> # ... perform database operations ...
>>> client.disconnect()  # Clean up resources
get_login_info() dict | None[source]

Get parsed login information used for database connection.

Returns the login information dictionary that was used to establish the database connection. This includes user, account, role, and other authentication details.

Returns:

Optional[dict]: Dictionary containing login information with keys:
    - user: Username
    - account: Account name (if specified)
    - role: Role name (if specified)
    - host: Database host
    - port: Database port
    - database: Database name
Returns None if not connected or no login info available.

Example

>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> client.connect()
>>> login_info = client.get_login_info()
>>> print(f"Connected as {login_info['user']} to {login_info['database']}")
execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) ResultSet[source]

Execute SQL query or SQLAlchemy statement without transaction isolation.

This method executes queries directly using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use client.session() instead.

The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.

Key Features:

  • ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()

  • Parameter binding: Automatic escaping of parameters to prevent SQL injection

  • Query logging: Integrated logging with performance tracking

  • Auto-commit: Each statement commits immediately (no transaction isolation)

  • Connection pooling: Efficient connection reuse from pool

  • Type safety: ResultSet with structured access to query results

Parameters:
  • sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be:
      - SQLAlchemy select() statement (recommended)
      - SQLAlchemy insert() statement (recommended)
      - SQLAlchemy update() statement (recommended)
      - SQLAlchemy delete() statement (recommended)
      - String SQL with ‘?’ placeholders for parameters
      - SQLAlchemy text() statement

  • params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements (use .values() or .where() with bound parameters instead).

  • log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging specific queries or disabling logs for frequently-executed statements.

Returns:

Query result object with:
  • columns: List[str] - Column names

  • rows: List[Tuple] - Row data as tuples

  • affected_rows: int - Number of rows affected by DML operations

  • fetchall() -> List[Row] - Get all rows as list

  • fetchone() -> Optional[Row] - Get next row or None

  • fetchmany(size) -> List[Row] - Get next N rows

Return type:

ResultSet
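
To make the listed attributes and fetch methods concrete, here is a minimal stand-in written for illustration. `SketchResultSet` is a hypothetical class, not the library's `ResultSet`; in particular, the cursor-style behavior (each fetch consumes rows, and `fetchall()` returns whatever remains) is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class SketchResultSet:
    """Hypothetical stand-in mirroring the documented ResultSet surface."""

    columns: List[str]
    rows: List[Tuple[Any, ...]]
    affected_rows: int = 0
    _cursor: int = field(default=0, init=False, repr=False)  # index of the next unread row

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        """Return the next row, or None when the rows are exhausted."""
        if self._cursor >= len(self.rows):
            return None
        row = self.rows[self._cursor]
        self._cursor += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple[Any, ...]]:
        """Return up to `size` unread rows."""
        chunk = self.rows[self._cursor:self._cursor + size]
        self._cursor += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple[Any, ...]]:
        """Return every unread row (assumed cursor-style semantics)."""
        remaining = self.rows[self._cursor:]
        self._cursor = len(self.rows)
        return remaining
```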

Raises:
  • ConnectionError – If not connected to database

  • QueryError – If query execution fails or SQL syntax is invalid

Usage Examples:

from matrixone import Client
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))

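class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)
    # Columns referenced by the "UPDATE with expressions" example
    quantity = Column(Integer)
    price = Column(Float)
    total = Column(Float)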

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# SQLAlchemy SELECT Statements (Recommended)
# ========================================

# Basic SELECT with WHERE clause
stmt = select(User).where(User.age > 25)
result = client.execute(stmt)
for user in result.fetchall():
    print(f"User: {user.name}, Age: {user.age}")

# SELECT specific columns
stmt = select(User.name, User.email).where(User.status == 'active')
result = client.execute(stmt)
for name, email in result.fetchall():
    print(f"{name}: {email}")

# Complex WHERE with AND/OR
stmt = select(User).where(
    and_(
        User.age > 18,
        or_(
            User.status == 'active',
            User.status == 'pending'
        )
    )
)
result = client.execute(stmt)

# SELECT with JOIN
stmt = select(User, Order).join(Order, User.id == Order.user_id)
result = client.execute(stmt)
for user, order in result.fetchall():
    print(f"{user.name} ordered ${order.amount}")

# SELECT with aggregation
stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active')
result = client.execute(stmt)
count, avg_age = result.fetchone()
print(f"Active users: {count}, Average age: {avg_age}")

# SELECT with ORDER BY and LIMIT
stmt = select(User).where(User.age > 25).order_by(User.age.desc()).limit(10)
result = client.execute(stmt)

# ========================================
# SQLAlchemy INSERT Statements (Recommended)
# ========================================

# Single INSERT
stmt = insert(User).values(name='John', email='john@example.com', age=30)
result = client.execute(stmt)
print(f"Inserted {result.affected_rows} rows")

# Bulk INSERT
stmt = insert(User).values([
    {'name': 'Alice', 'email': 'alice@example.com', 'age': 28},
    {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
    {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
])
result = client.execute(stmt)
print(f"Inserted {result.affected_rows} rows")

# ========================================
# SQLAlchemy UPDATE Statements (Recommended)
# ========================================

# Simple UPDATE
stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
result = client.execute(stmt)
print(f"Updated {result.affected_rows} rows")

# Conditional UPDATE
stmt = update(User).where(User.age < 18).values(status='minor')
result = client.execute(stmt)

# UPDATE with expressions
stmt = update(Order).values(total=Order.quantity * Order.price)
result = client.execute(stmt)

# UPDATE multiple columns
stmt = update(User).where(User.id == 1).values(
    name='Updated Name',
    email='updated@example.com',
    status='active'
)
result = client.execute(stmt)

# ========================================
# SQLAlchemy DELETE Statements (Recommended)
# ========================================

# Simple DELETE
stmt = delete(User).where(User.id == 1)
result = client.execute(stmt)
print(f"Deleted {result.affected_rows} rows")

# Conditional DELETE
stmt = delete(User).where(User.status == 'deleted')
result = client.execute(stmt)

# DELETE with complex condition
stmt = delete(User).where(
    and_(
        User.age < 18,
        User.status == 'inactive'
    )
)
result = client.execute(stmt)

# ========================================
# String SQL with Parameters (Alternative)
# ========================================

# SELECT with parameters
result = client.execute(
    "SELECT * FROM users WHERE age > ? AND status = ?",
    (25, 'active')
)

# INSERT with parameters
result = client.execute(
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
    ('David', 'david@example.com', 28)
)

# UPDATE with parameters
result = client.execute(
    "UPDATE users SET status = ? WHERE age < ?",
    ('minor', 18)
)

# DELETE with parameters
result = client.execute(
    "DELETE FROM users WHERE status = ?",
    ('inactive',)
)

# ========================================
# Query Logging Control
# ========================================

# Disable logging for frequently executed query
result = client.execute(
    select(User).where(User.id == 1),
    log_mode='off'
)

# Force full SQL logging for debugging
result = client.execute(
    select(User).where(User.name.like('%test%')),
    log_mode='full'
)

# Simple logging (operation type only)
result = client.execute(
    update(User).values(status='processed'),
    log_mode='simple'
)

Important Notes:

  • No transaction isolation: Each execute() call commits immediately

  • Use session() for transactions: For atomic multi-statement operations

  • ORM-style preferred: Use SQLAlchemy statements for better type safety

  • Auto-commit behavior: Changes are permanent immediately after execute()

  • Thread-safe: Uses connection pooling for concurrent access

Best Practices:

  1. Prefer ORM-style statements: Use select(), insert(), update(), delete()

  2. Use parameters: Always use parameter binding to prevent SQL injection

  3. Session for transactions: Use client.session() for atomic operations

  4. Disable logging in production: Use log_mode='off' for hot paths

  5. Handle exceptions: Wrap execute() in try-except for error handling
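
Why parameter binding matters can be shown without a MatrixOne server. The sketch below uses Python's standard-library `sqlite3` purely as a stand-in, since it accepts the same ‘?’ placeholder style; the table and the hostile input are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO users (name, status) VALUES (?, ?)",
    [("Alice", "active"), ("Bob", "inactive")],
)

# Input crafted to change the meaning of the WHERE clause.
hostile = "active' OR '1'='1"

# Bound parameter: the whole string is compared as a value, so nothing matches.
bound = conn.execute(
    "SELECT name FROM users WHERE status = ?", (hostile,)
).fetchall()

# String interpolation: the quote characters become SQL and every row matches.
unsafe_sql = f"SELECT name FROM users WHERE status = '{hostile}'"
leaked = conn.execute(unsafe_sql).fetchall()

print(bound)   # no rows
print(leaked)  # both rows
```

The bound query treats the input as data, while the interpolated query lets the input rewrite the statement.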

See also

  • Client.session(): For transaction-aware operations

  • Session.execute(): Execute within transaction context

  • AsyncClient.execute(): Async version for async/await workflows

insert(table_name_or_model, data: dict) ResultSet[source]

Insert a single row of data into a table.

This method provides a convenient way to insert data using a dictionary where keys are column names and values are the data to insert. The method automatically handles SQL generation and parameter binding.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data (dict): Dictionary mapping column names to values. Example:

            {'name': 'John', 'age': 30, 'email': 'john@example.com'}

Returns:

ResultSet: Object containing insertion results with:
    - affected_rows: Number of rows inserted (should be 1)
    - columns: Empty list (no columns returned for INSERT)
    - rows: Empty list (no rows returned for INSERT)

Raises:

ConnectionError: If not connected to database
QueryError: If insertion fails

Examples

# Insert a single user using table name
>>> result = client.insert('users', {
...     'name': 'John Doe',
...     'age': 30,
...     'email': 'john@example.com'
... })
>>> print(f"Inserted {result.affected_rows} row")

# Insert using model class
>>> from sqlalchemy import Column, Integer, String
>>> from matrixone.orm import declarative_base
>>> Base = declarative_base()
>>> class User(Base):
...     __tablename__ = 'users'
...     id = Column(Integer, primary_key=True)
...     name = Column(String(50))
...     age = Column(Integer)
>>> result = client.insert(User, {
...     'name': 'Jane Doe',
...     'age': 25
... })

# Insert with NULL values
>>> result = client.insert('products', {
...     'name': 'Product A',
...     'price': 99.99,
...     'description': None  # NULL value
... })

batch_insert(table_name_or_model, data_list: list) ResultSet[source]

Batch insert multiple rows of data into a table.

This method efficiently inserts multiple rows in a single operation, which is much faster than calling insert() multiple times. All rows must have the same column structure.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list (list): List of dictionaries, where each dictionary represents
    a row to insert. All dictionaries must have the same keys.
    Example: [
        {'name': 'John', 'age': 30},
        {'name': 'Jane', 'age': 25},
        {'name': 'Bob', 'age': 35}
    ]

Returns:

ResultSet: Object containing insertion results with:
    - affected_rows: Number of rows inserted
    - columns: Empty list (no columns returned for INSERT)
    - rows: Empty list (no rows returned for INSERT)

Raises:

ConnectionError: If not connected to database
QueryError: If batch insertion fails
ValueError: If data_list is empty or has inconsistent column structure
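
The column-consistency requirement and the single-statement insert can be sketched as follows. `validate_batch` and `build_batch_insert` are hypothetical helpers; the SQL that the client actually generates is not documented here, so the multi-row `VALUES` form is an assumption.

```python
from typing import Any, Dict, List, Tuple


def validate_batch(data_list: List[Dict[str, Any]]) -> Tuple[str, ...]:
    """Return the shared column names, or raise ValueError as batch_insert is documented to."""
    if not data_list:
        raise ValueError("data_list is empty")
    columns = tuple(data_list[0].keys())
    expected = set(columns)
    for index, row in enumerate(data_list):
        if set(row.keys()) != expected:
            raise ValueError(
                f"row {index} has columns {sorted(row)}; expected {sorted(expected)}"
            )
    return columns


def build_batch_insert(table: str, data_list: List[Dict[str, Any]]) -> Tuple[str, Tuple[Any, ...]]:
    """Build one multi-row INSERT with '?' placeholders plus its flat parameter tuple."""
    columns = validate_batch(data_list)
    one_row = "(" + ", ".join("?" for _ in columns) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join(one_row for _ in data_list)
    )
    params = tuple(row[column] for row in data_list for column in columns)
    return sql, params
```

Sending one statement with all rows avoids a round trip per row, which is the usual reason a batch insert outperforms repeated single inserts.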

Examples

# Insert multiple users
>>> users = [
...     {'name': 'John Doe', 'age': 30, 'email': 'john@example.com'},
...     {'name': 'Jane Smith', 'age': 25, 'email': 'jane@example.com'},
...     {'name': 'Bob Johnson', 'age': 35, 'email': 'bob@example.com'}
... ]
>>> result = client.batch_insert('users', users)
>>> print(f"Inserted {result.affected_rows} rows")

# Insert with some NULL values
>>> products = [
...     {'name': 'Product A', 'price': 99.99, 'description': 'Great product'},
...     {'name': 'Product B', 'price': 149.99, 'description': None}
... ]
>>> result = client.batch_insert('products', products)

get_sqlalchemy_engine() Engine[source]

Get SQLAlchemy engine

Returns:

SQLAlchemy Engine
query(*columns, snapshot: str | None = None)[source]

Get MatrixOne query builder - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Examples

# Traditional model query (all columns)
client.query(Article).filter(...).all()

# Column-specific query
client.query(Article.id, Article.title).filter(...).all()

# With fulltext score
client.query(Article.id, boolean_match("title", "content").must("python").label("score"))

# Snapshot query
client.query(Article, snapshot="my_snapshot").filter(...).all()

Returns:

MatrixOneQuery instance configured for the specified columns
snapshot(snapshot_name: str) Generator[SnapshotClient, None, None][source]

Snapshot context manager

Usage

with client.snapshot("daily_backup") as snapshot_client:
    result = snapshot_client.execute("SELECT * FROM users")

session() Generator[Session, None, None][source]

Create a transaction-aware session for atomic database operations.

This method returns a MatrixOne Session that extends SQLAlchemy Session with MatrixOne-specific features. All operations within the session are executed atomically - they either all succeed or all fail together.

The session is a context manager that automatically handles the transaction lifecycle:

  • Commits the transaction when the context exits normally

  • Rolls back the transaction if any exception occurs

  • Cleans up database resources automatically
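
The commit-on-success, rollback-on-exception lifecycle can be demonstrated with a generator-based context manager. This is a sketch only: it uses the standard-library `sqlite3` as a stand-in database, and `sketch_session` is a hypothetical name, not the MatrixOne `Session` implementation.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def sketch_session(conn: sqlite3.Connection):
    """Yield the connection; commit if the block succeeds, roll back if it raises."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.commit()

# Normal exit: the insert is committed.
with sketch_session(conn) as s:
    s.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))

# Failure: the second insert violates NOT NULL, so the first one is rolled back too.
try:
    with sketch_session(conn) as s:
        s.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
        s.execute("INSERT INTO users (name) VALUES (?)", (None,))
except sqlite3.IntegrityError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY id")]
print(names)
```

Only the row from the block that completed normally survives; both inserts from the failed block are discarded together.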

Key Features:

  • Full SQLAlchemy ORM: All standard SQLAlchemy Session methods (add, delete, query, etc.)

  • Atomic transactions: Multiple operations commit or rollback together

  • MatrixOne managers: Access to snapshots, clones, vector ops, etc.

  • ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()

  • Automatic cleanup: Transaction and connection resources managed automatically

  • Query logging: Integrated query logging with performance tracking

Available Managers (transaction-aware):

  • session.snapshots: SnapshotManager for creating/managing snapshots

  • session.clone: CloneManager for cloning databases and tables

  • session.restore: RestoreManager for restoring from snapshots

  • session.pitr: PitrManager for point-in-time recovery

  • session.pubsub: PubSubManager for publish-subscribe operations

  • session.account: AccountManager for account/user management

  • session.vector_ops: VectorManager for vector operations and indexing

  • session.fulltext_index: FulltextIndexManager for fulltext search

  • session.metadata: MetadataManager for table metadata analysis

  • session.load_data: LoadDataManager for bulk data loading

  • session.stage: StageManager for external stage management

Returns:

Context manager that yields a MatrixOne Session

Return type:

Generator[Session, None, None]

Raises:

ConnectionError – If client is not connected to database

Usage Examples:

from matrixone import Client
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, insert, update, delete
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))  # referenced by the update/delete examples below

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# Example 1: Basic Transaction with ORM-style SQL
# ========================================
with client.session() as session:
    # Insert using SQLAlchemy insert()
    session.execute(insert(User).values(name='John', email='john@example.com', age=30))

    # Update using SQLAlchemy update()
    session.execute(update(User).where(User.age < 18).values(status='minor'))

    # Select using SQLAlchemy select()
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    for user in result.scalars():
        print(f"User: {user.name}")

    # Delete using SQLAlchemy delete()
    session.execute(delete(User).where(User.status == 'inactive'))
    # All operations commit atomically

# ========================================
# Example 2: SQLAlchemy ORM Operations
# ========================================
with client.session() as session:
    # Create new objects
    user1 = User(name='Alice', email='alice@example.com', age=28)
    user2 = User(name='Bob', email='bob@example.com', age=35)

    # Add to session
    session.add(user1)
    session.add(user2)

    # Query using ORM
    stmt = select(User).where(User.name == 'Alice')
    result = session.execute(stmt)
    alice = result.scalar_one()

    # Update object
    alice.email = 'newemail@example.com'

    # Commit (or let context manager do it)
    session.commit()

# ========================================
# Example 3: MatrixOne Snapshot Operations
# ========================================
with client.session() as session:
    # Create snapshot within transaction
    from matrixone import SnapshotLevel

    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone database using snapshot
    session.clone.clone_database(
        target_db='prod_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit atomically

# ========================================
# Example 4: Bulk Data Loading with Stages
# ========================================
with client.session() as session:
    # Create stage using simple interface
    session.stage.create_local('import_stage', '/data/imports/')

    # Load data from stage using ORM model
    session.load_data.from_stage_csv('import_stage', 'users.csv', User)

    # Update statistics after loading
    session.execute("ANALYZE TABLE users")
    # All operations are atomic

# ========================================
# Example 5: Error Handling and Rollback
# ========================================
try:
    with client.session() as session:
        session.execute(insert(User).values(name='Charlie', age=40))
        session.execute(insert(InvalidTable).values(data='test'))  # This fails
        # Transaction automatically rolls back - Charlie is NOT inserted
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

# ========================================
# Example 6: Manual Transaction Control
# ========================================
with client.session() as session:
    try:
        session.execute(insert(User).values(name='David', age=25))
        session.execute(update(Account).values(balance=Account.balance - 100))

        # Verify before committing
        stmt = select(Account.balance)
        balance = session.execute(stmt).scalar()

        if balance >= 0:
            session.commit()  # Explicit commit
        else:
            session.rollback()  # Explicit rollback
            print("Insufficient balance")
    except Exception as e:
        session.rollback()
        raise

# ========================================
# Example 7: Complex Multi-Manager Transaction
# ========================================
with client.session() as session:
    # Create publication
    session.pubsub.create_database_publication(
        name='analytics_pub',
        database='analytics',
        account='subscriber_account'
    )

    # Create S3 stage for exports
    session.stage.create_s3(
        name='export_stage',
        bucket='my-bucket',
        path='exports/',
        aws_key_id='key',
        aws_secret_key='secret'
    )

    # Load fresh data using ORM model
    session.load_data.from_csv('/data/latest.csv', Analytics)

    # Create snapshot after load
    session.snapshots.create(
        name='post_load_snapshot',
        level=SnapshotLevel.DATABASE,
        database='analytics'
    )
    # All operations commit together

Best Practices:

  1. Always use context manager: Use with client.session() for automatic cleanup

  2. Keep transactions short: Long transactions can block other operations

  3. Handle exceptions: Wrap session code in try-except for error handling

  4. Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()

  5. Avoid nested sessions: SQLAlchemy doesn’t support true nested transactions

  6. Test rollback behavior: Ensure your application handles rollbacks correctly

See also

  • Session: The session class returned by this method

  • AsyncClient.session(): Async version for async/await workflows

  • Client.execute(): Non-transactional query execution

property snapshots: SnapshotManager | None

Get snapshot manager

property clone: CloneManager | None

Get clone manager

property branch: BranchManager | None

Get branch manager for Git-style version control operations.

Provides table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.

Returns:

BranchManager instance for branch operations

Example:

client = Client()
client.connect(database='test')

# Create table branch
client.branch.create_table_branch('users_branch', 'users')

# Create database branch
client.branch.create_database_branch('dev_db', 'production')

# Compare branches
diffs = client.branch.diff_table('users_branch', 'users')

# Merge branches
client.branch.merge_table('users_branch', 'users')

# Delete branches
client.branch.delete_table_branch('users_branch')
client.branch.delete_database_branch('dev_db')

See also

  • branch_guide - Complete branch management guide

property moctl: MoCtlManager | None

Get mo_ctl manager

property restore: RestoreManager | None

Get restore manager

property pitr: PitrManager | None

Get PITR manager

property pubsub: PubSubManager | None

Get publish-subscribe manager

property account: AccountManager | None

Get account manager

property load_data: LoadDataManager | None

Get load data manager

property stage: StageManager | None

Get stage manager for external stage operations

property cdc: CDCManager | None

Get CDC manager for change data capture operations.

property export

Get export manager for data export operations (INTO OUTFILE, INTO STAGE)

property vector_ops: VectorManager | None

Get unified vector operations manager for vector operations (index and data)

get_pinecone_index(table_name_or_model, vector_column: str)[source]

Get a PineconeCompatibleIndex object for vector search operations.

This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.
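
The column split described above (primary key as the ID, vector column excluded, everything else as metadata) can be illustrated with a short sketch. `to_match` is a hypothetical helper and the plain-dictionary result shape is an assumption; the real index returns match objects with `id`, `score`, and `metadata` attributes.

```python
from typing import Any, Dict


def to_match(
    row: Dict[str, Any],
    primary_key: str,
    vector_column: str,
    score: float,
) -> Dict[str, Any]:
    """Hypothetical helper: shape one table row as a Pinecone-style match."""
    metadata = {
        name: value
        for name, value in row.items()
        if name not in (primary_key, vector_column)  # all remaining columns become metadata
    }
    return {"id": row[primary_key], "score": score, "metadata": metadata}
```

A row such as `{"id": 7, "content": "hi", "lang": "en", "embedding": [...]}` would yield `content` and `lang` as metadata in this sketch.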

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
vector_column: Name of the vector column

Returns:

PineconeCompatibleIndex object with Pinecone-compatible API

Example

>>> index = client.get_pinecone_index("documents", "embedding")
>>> index = client.get_pinecone_index(DocumentModel, "embedding")
>>> results = index.query([0.1, 0.2, 0.3], top_k=5)
>>> for match in results.matches:
...     print(f"ID: {match.id}, Score: {match.score}")
...     print(f"Metadata: {match.metadata}")
property fulltext_index: FulltextIndexManager | None

Get fulltext index manager for fulltext index operations

property metadata: MetadataManager | None

Get metadata manager for table metadata operations

connected() bool[source]

Check if client is connected to database

version() str[source]

Get MatrixOne server version

Returns:

str: MatrixOne server version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If version query fails

Example

>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> version = client.version()
>>> print(f"MatrixOne version: {version}")

git_version() str[source]

Get MatrixOne git version information

Returns:

str: MatrixOne git version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If git version query fails

Example

>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> git_version = client.git_version()
>>> print(f"MatrixOne git version: {git_version}")

set_backend_version(version: str) None[source]

Manually set the backend version

Args:

version: Version string in format "major.minor.patch" (e.g., "3.0.1")
get_backend_version() str | None[source]

Get current backend version

Returns:

Version string or None if not set
is_feature_available(feature_name: str) bool[source]

Check if a feature is available in current backend version

Args:

feature_name: Name of the feature to check

Returns:

True if feature is available, False otherwise
get_feature_info(feature_name: str) Dict[str, Any] | None[source]

Get feature requirement information

Args:

feature_name: Name of the feature

Returns:

Feature information dictionary or None if not found
check_version_compatibility(required_version: str, operator: str = '>=') bool[source]

Check if current backend version is compatible with required version

Args:

required_version: Required version string (e.g., "3.0.1")
operator: Comparison operator (">=", ">", "<=", "<", "==", "!=")

Returns:

True if compatible, False otherwise
get_version_hint(feature_name: str, error_context: str = '') str[source]

Get helpful hint message for version-related errors

Args:

feature_name: Name of the feature
error_context: Additional context for the error

Returns:

Helpful hint message
is_development_version() bool[source]

Check if current backend is a development version

Returns:

True if backend is development version (999.x.x), False otherwise
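
The version helpers above all work with "major.minor.patch" strings and the comparison operators listed for check_version_compatibility. The sketch below shows one way such a comparison can be done. It is independent of the MatrixOne SDK: parse_version, compare_versions and looks_like_dev_version are hypothetical names used only for illustration, and the SDK's own parsing rules may differ.

```python
import operator

# Hypothetical helpers for illustration; not part of the matrixone package.
_OPERATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}


def parse_version(version: str) -> tuple:
    """Turn "3.0.1" into (3, 0, 1) so versions compare numerically."""
    parts = version.strip().split(".")
    if len(parts) != 3:
        raise ValueError(f"expected major.minor.patch, got {version!r}")
    return tuple(int(part) for part in parts)


def compare_versions(current: str, required: str, op: str = ">=") -> bool:
    """Apply one of the six comparison operators to two version strings."""
    if op not in _OPERATORS:
        raise ValueError(f"unsupported operator: {op!r}")
    return _OPERATORS[op](parse_version(current), parse_version(required))


def looks_like_dev_version(version: str) -> bool:
    """Treat 999.x.x as a development build, as described above."""
    return parse_version(version)[0] == 999
```

Comparing integer tuples rather than raw strings matters here: as plain strings, "3.0.10" sorts before "3.0.9", which would give the wrong answer for patch releases above 9.
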
create_table(table_name_or_model, columns: dict | None = None, **kwargs) Client[source]

Create a table with a simplified interface.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
columns: Dictionary mapping column names to their types (required if table_name_or_model is str)
        Supported formats:
        - 'id': 'bigint' (with primary_key=True if needed)
        - 'name': 'varchar(100)'
        - 'embedding': 'vecf32(128)' or 'vecf64(128)'
        - 'score': 'float'
        - 'created_at': 'datetime'
        - 'is_active': 'boolean'
**kwargs: Additional table parameters

Returns:

Client: Self for chaining

Example

client.create_table("users", {
    'id': 'bigint',
    'name': 'varchar(100)',
    'email': 'varchar(255)',
    'embedding': 'vecf32(128)',
    'score': 'float',
    'created_at': 'datetime',
    'is_active': 'boolean'
}, primary_key='id')

drop_table(table_name_or_model) Client[source]

Drop a table.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class

Returns:

Client: Self for chaining

Example

# Drop table by name
client.drop_table("users")

# Drop table by model class
client.drop_table(UserModel)

create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Client[source]

Create a table with vector indexes using a simplified interface.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as create_table)
indexes: List of index definitions, each containing:
        - 'name': Index name
        - 'column': Column name to index
        - 'type': Index type ('ivfflat' or 'hnsw')
        - 'params': Dictionary of index-specific parameters
**kwargs: Additional table parameters

Returns:

Client: Self for chaining

Example

client.create_table_with_index("vector_docs", {
    'id': 'bigint',
    'title': 'varchar(200)',
    'embedding': 'vector(128,f32)'
}, indexes=[
    {
        'name': 'idx_hnsw',
        'column': 'embedding',
        'type': 'hnsw',
        'params': {'m': 48, 'ef_construction': 64, 'ef_search': 64}
    }
], primary_key='id')

create_table_orm(table_name: str, *columns, **kwargs) Client[source]

Create a table using SQLAlchemy ORM-style column definitions. Similar to SQLAlchemy Table() constructor but without metadata.

Args:

table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)

Returns:

Client: Self for chaining

Example:

from sqlalchemy import Column, BigInteger, Integer
from matrixone.sqlalchemy_ext import Vectorf32, VectorIndex, VectorIndexType, VectorOpType

client.create_table_orm(
    'vector_docs_hnsw_demo',
    Column('a', BigInteger, primary_key=True),
    Column('b', Vectorf32(128)),
    Column('c', Integer),
    VectorIndex('idx_hnsw', 'b', index_type=VectorIndexType.HNSW,
           m=48, ef_construction=64, ef_search=64,
           op_type=VectorOpType.VECTOR_L2_OPS)
)
create_all(base_class=None)[source]

Create all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.
drop_all(base_class=None)[source]

Drop all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.
get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]

Get all secondary index table names for a given table.

This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.

Parameters:
  • table_name – Name of the table to get secondary indexes for

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info')
>>> # Or specify database explicitly
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info', 'test')
>>> print(index_tables)
['__mo_index_secondary_..._cms_id', '__mo_index_unique_..._email']
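
The returned names carry either the __mo_index_secondary_ or the __mo_index_unique_ prefix noted above, so a caller can separate the two kinds without another query. A minimal sketch, assuming only that prefix convention; split_index_tables and the sample names are made up for illustration and are not part of the SDK.

```python
from typing import Dict, List

SECONDARY_PREFIX = "__mo_index_secondary_"
UNIQUE_PREFIX = "__mo_index_unique_"


def split_index_tables(names: List[str]) -> Dict[str, List[str]]:
    """Bucket physical index table names by their documented prefix."""
    groups: Dict[str, List[str]] = {"secondary": [], "unique": [], "other": []}
    for name in names:
        if name.startswith(SECONDARY_PREFIX):
            groups["secondary"].append(name)
        elif name.startswith(UNIQUE_PREFIX):
            groups["unique"].append(name)
        else:
            groups["other"].append(name)
    return groups


# Made-up names shaped like the documented output.
sample = ["__mo_index_secondary_0001_cms_id", "__mo_index_unique_0002_email"]
groups = split_index_tables(sample)
```
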
get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]

Get the physical table name of a secondary index by its index name.

Parameters:
  • table_name – Name of the table

  • index_name – Name of the secondary index

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

Physical table name of the secondary index, or None if not found

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id')
>>> # Or specify database explicitly
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id', 'test')
>>> print(index_table)
'__mo_index_secondary_018cfbda-bde1-7c3e-805c-3f8e71769f75_cms_id'
get_table_indexes_detail(table_name: str, database_name: str | None = None) List[dict][source]

Get detailed information about all indexes for a table, including IVF, HNSW, Fulltext, and regular indexes.

This method returns comprehensive information about each index physical table, including:

  • Index name

  • Index type (MULTIPLE, PRIMARY, UNIQUE, etc.)

  • Algorithm type (ivfflat, hnsw, fulltext, etc.)

  • Algorithm table type (metadata, centroids, entries, etc.)

  • Physical table name

  • Column names

  • Algorithm parameters

Parameters:
  • table_name – Name of the table to get indexes for

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

List of dictionaries, each containing:

  • index_name: Name of the index

  • index_type: Type of index (MULTIPLE, PRIMARY, UNIQUE, etc.)

  • algo: Algorithm type (ivfflat, hnsw, fulltext, or None for regular indexes)

  • algo_table_type: Algorithm table type (metadata, centroids, entries, etc., or None)

  • physical_table_name: Physical table name

  • columns: List of column names

  • algo_params: Algorithm parameters (or None)

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Get all index details for a table
>>> indexes = client.get_table_indexes_detail('ivf_health_demo_docs')
>>> for idx in indexes:
...     print(f"{idx['index_name']} ({idx['algo']}) - {idx['algo_table_type']}: {idx['physical_table_name']}")
idx_embedding_ivf (ivfflat) - metadata: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - centroids: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - entries: __mo_index_secondary_...
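
Because one logical index can map to several physical tables (metadata, centroids and entries for an IVF index), it is often handy to regroup the flat list by index name. The sketch below does that on made-up rows shaped like the dictionaries described above; group_physical_tables and the "index" fallback label are assumptions for illustration, not SDK behaviour.

```python
from collections import defaultdict
from typing import Any, Dict, List

# Made-up rows shaped like the dictionaries described above.
details = [
    {"index_name": "idx_embedding_ivf", "algo": "ivfflat",
     "algo_table_type": "metadata", "physical_table_name": "phys_meta"},
    {"index_name": "idx_embedding_ivf", "algo": "ivfflat",
     "algo_table_type": "centroids", "physical_table_name": "phys_centroids"},
    {"index_name": "idx_embedding_ivf", "algo": "ivfflat",
     "algo_table_type": "entries", "physical_table_name": "phys_entries"},
    {"index_name": "idx_title", "algo": None,
     "algo_table_type": None, "physical_table_name": "phys_title"},
]


def group_physical_tables(rows: List[Dict[str, Any]]) -> Dict[str, Dict[str, str]]:
    """Map index name -> {table role: physical table name}."""
    grouped: Dict[str, Dict[str, str]] = defaultdict(dict)
    for row in rows:
        # Regular indexes have no algorithm table type; label them "index".
        role = row["algo_table_type"] or "index"
        grouped[row["index_name"]][role] = row["physical_table_name"]
    return dict(grouped)


tables = group_physical_tables(details)
```
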
verify_table_index_counts(table_name: str) int[source]

Verify that the main table and all its secondary index tables have the same row count.

This method compares the COUNT(*) of the main table with all its secondary index tables in a single SQL query for consistency. If counts don’t match, raises an exception.

Parameters:

table_name – Name of the table to verify

Returns:

Row count (int) if verification succeeds

Raises:

ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> count = client.verify_table_index_counts('cms_all_content_chunk_info')
>>> print(f"✓ Verification passed, row count: {count}")

>>> # If verification fails:
>>> try:
...     count = client.verify_table_index_counts('some_table')
... except ValueError as e:
...     print(f"Verification failed: {e}")
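
The check described above boils down to comparing one row count per table and reporting every count when they disagree. A minimal sketch of that comparison on already-collected counts; check_counts is a hypothetical helper and does not reproduce the single SQL query the method is documented to run.

```python
from typing import Dict


def check_counts(main_table: str, counts: Dict[str, int]) -> int:
    """Return the shared row count, or raise ValueError listing every count."""
    expected = counts[main_table]
    mismatched = {table: n for table, n in counts.items() if n != expected}
    if mismatched:
        details = ", ".join(f"{table}={n}" for table, n in sorted(counts.items()))
        raise ValueError(f"row counts differ for {main_table}: {details}")
    return expected


# Matching counts return the count itself.
ok_count = check_counts("docs", {"docs": 3, "idx_a": 3, "idx_b": 3})

# A mismatch raises, and the message names all tables and counts.
try:
    check_counts("docs", {"docs": 3, "idx_a": 2})
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```
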
class matrixone.client.ResultSet(columns: List[str], rows: List[Tuple[Any, ...]], affected_rows: int = 0)[source]

Bases: object

Result set wrapper for query results from MatrixOne database operations.

This class provides a convenient interface for accessing query results with methods similar to database cursor objects. It supports both SELECT queries (returning data) and DML operations (returning affected row counts).

Key Features:

  • Iterator interface for row-by-row access

  • Bulk data access methods (fetchall, fetchmany)

  • Column name access and metadata

  • Affected row count for DML operations

  • Cursor-like positioning for result navigation

Attributes:

columns (List[str]): List of column names in the result set
rows (List[Tuple[Any, ...]]): List of tuples containing row data
affected_rows (int): Number of rows affected by DML operations

Usage Examples:

>>> # SELECT query results
>>> result = client.execute("SELECT id, name, age FROM users WHERE age > ?", (25,))
>>> print(f"Found {len(result.rows)} users")
>>> for row in result.fetchall():
...     print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")

>>> # Access by column name
>>> for row in result.rows:
...     user_id = row[result.columns.index('id')]
...     user_name = row[result.columns.index('name')]

>>> # DML operation results
>>> result = client.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("John", 30))
>>> print(f"Inserted {result.affected_rows} rows")

>>> # Iterator interface
>>> for row in result:
...     print(row)

Note: This class is automatically created by the Client’s execute() method and provides a consistent interface for all query results.

__init__(columns: List[str], rows: List[Tuple[Any, ...]], affected_rows: int = 0)[source]
fetchall() List[Tuple[Any, ...]][source]

Fetch all remaining rows

fetchone() Tuple[Any, ...] | None[source]

Fetch one row

fetchmany(size: int = 1) List[Tuple[Any, ...]][source]

Fetch many rows

scalar() Any[source]

Get scalar value (first column of first row)

keys()[source]

Get column names
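
The fetch methods above follow the usual cursor convention: each call consumes rows from the current position, so a later fetchall() returns only what is left. The stand-in class below illustrates that convention. MiniResultSet is hypothetical and written for this example; the real ResultSet may differ in detail (for instance in how scalar() treats an empty result).

```python
from typing import Any, List, Optional, Tuple


class MiniResultSet:
    """Illustrative stand-in; not the matrixone ResultSet implementation."""

    def __init__(self, columns: List[str], rows: List[Tuple[Any, ...]]):
        self.columns = columns
        self.rows = rows
        self._pos = 0  # index of the next unread row

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        if self._pos >= len(self.rows):
            return None
        row = self.rows[self._pos]
        self._pos += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple[Any, ...]]:
        chunk = self.rows[self._pos:self._pos + size]
        self._pos += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple[Any, ...]]:
        remaining = self.rows[self._pos:]
        self._pos = len(self.rows)
        return remaining

    def scalar(self) -> Any:
        # First column of the first row; None here is an assumption for empty results.
        return self.rows[0][0] if self.rows else None


rs = MiniResultSet(["id", "name"], [(1, "Ann"), (2, "Bob"), (3, "Cy")])
first = rs.fetchone()        # consumes row 1
next_two = rs.fetchmany(2)   # consumes rows 2 and 3
rest = rs.fetchall()         # nothing left to return
```
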

class matrixone.client.SnapshotClient(client, snapshot_name: str)[source]

Bases: object

Snapshot client wrapper for executing queries with snapshot

__init__(client, snapshot_name: str)[source]
execute(sql: str, params: Tuple | None = None) ResultSet[source]

Execute SQL with snapshot

class matrixone.client.Session(bind=None, client=None, wrap_session=None, **kwargs)[source]

Bases: Session

MatrixOne Session - extends SQLAlchemy Session with MatrixOne features.

This class inherits from SQLAlchemy Session and provides a transaction context for executing multiple database operations atomically, while adding MatrixOne-specific capabilities like snapshots, clones, vector operations, and fulltext search.

Key Features:

  • Full SQLAlchemy Session API - All inherited methods work as expected

  • Atomic transaction execution with automatic rollback on errors

  • Access to all MatrixOne managers within session context

  • Support for both SQLAlchemy statements and string SQL

  • Automatic commit/rollback handling

Available Managers:

  • snapshots: SnapshotManager for snapshot operations

  • clone: CloneManager for clone operations

  • restore: RestoreManager for restore operations

  • pitr: PitrManager for point-in-time recovery

  • pubsub: PubSubManager for pub/sub operations

  • account: AccountManager for account operations

  • vector_ops: TransactionVectorIndexManager for vector operations

  • fulltext_index: TransactionFulltextIndexManager for fulltext operations

Usage Examples

# Standard SQLAlchemy usage
from sqlalchemy import select
with client.session() as session:
    # Execute SQLAlchemy statements
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()  # Returns ORM objects

    # ORM operations
    user = User(name="John", age=30)
    session.add(user)
    session.commit()

# MatrixOne features
with client.session() as session:
    # Snapshot operations
    session.snapshots.create("backup", SnapshotLevel.DATABASE, database="mydb")

    # Clone operations
    session.clone.clone_database("new_db", "source_db")

Note: This class is automatically created by the Client’s session() context manager and should not be instantiated directly.
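
The "atomic execution with automatic rollback" behaviour described above is the standard transaction-scope pattern: every statement in the block commits together, or none does. The sketch below shows the pattern with Python's built-in sqlite3 module as a stand-in, since it runs without a MatrixOne server. It illustrates the idea only and says nothing about how Session implements it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.commit()

try:
    with conn:  # commits on success, rolls back if the block raises
        conn.execute("INSERT INTO users (name) VALUES (?)", ("John",))
        # Violates NOT NULL, so the whole block is rolled back.
        conn.execute("INSERT INTO users (name) VALUES (?)", (None,))
except sqlite3.IntegrityError:
    pass

# The first insert did not survive, because it shared a transaction with the failure.
count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```
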

__init__(bind=None, client=None, wrap_session=None, **kwargs)[source]

Initialize MatrixOne Session.

Parameters:
  • bind – SQLAlchemy Engine or Connection to bind to

  • client – MatrixOne Client instance

  • wrap_session – Existing SQLAlchemy Session to wrap with MatrixOne features

  • **kwargs – Additional arguments passed to SQLAlchemy Session

execute(sql_or_stmt, params: Tuple | None = None, **kwargs)[source]

Execute SQL or SQLAlchemy statement within session.

Overrides SQLAlchemy Session.execute() to add:

  • Support for string SQL with MatrixOne parameter substitution

  • Query logging with optional per-operation log mode override

Parameters:
  • sql_or_stmt – SQL string or SQLAlchemy statement (select, update, delete, insert, text)

  • params – Query parameters (only used for string SQL with ‘?’ placeholders)

  • **kwargs

    Additional arguments passed to parent execute(). Supports special parameter:

    • log_mode (str): Override SQL logging mode for this operation only. Options: ‘off’, ‘simple’, ‘full’. Useful for debugging specific operations without changing global settings.

Returns:

SQLAlchemy Result object

Return type:

sqlalchemy.engine.Result

Examples:

# SQLAlchemy statements
from sqlalchemy import select, update
with client.session() as session:
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()

# String SQL (MatrixOne extension)
with client.session() as session:
    result = session.execute("INSERT INTO users (name) VALUES (?)", ("John",))
    print(f"Inserted {result.rowcount} rows")

# Debugging with temporary logging override
with client.session() as session:
    # Enable full logging for this query only
    result = session.execute("SELECT * FROM large_table", log_mode='full')
get_connection()[source]

Get the underlying SQLAlchemy connection for direct use

Returns:

SQLAlchemy Connection instance bound to this session
insert(table_name: str, data: dict[str, Any]) ResultSet[source]

Insert data into a table within transaction.

Args:

table_name: Name of the table
data: Data to insert (dict with column names as keys)

Returns:

ResultSet object
batch_insert(table_name: str, data_list: list[dict[str, Any]]) ResultSet[source]

Batch insert data into a table within transaction.

Args:

table_name: Name of the table
data_list: List of data dictionaries to insert

Returns:

ResultSet object
query(*columns, snapshot: str | None = None)[source]

Get MatrixOne query builder within transaction - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Returns:

MatrixOneQuery instance configured for the specified columns within transaction
create_table(table_name: str, columns: dict, **kwargs) Session[source]

Create a table within MatrixOne session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
**kwargs: Additional table parameters

Returns:

Session: Self for chaining
drop_table(table_name: str) Session[source]

Drop a table within MatrixOne session.

Args:

table_name: Name of the table to drop

Returns:

Session: Self for chaining
create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Session[source]

Create a table with vector indexes within MatrixOne session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
indexes: List of index definitions (same format as client.create_table_with_index)
**kwargs: Additional table parameters

Returns:

Session: Self for chaining
create_table_orm(table_name: str, *columns, **kwargs) Session[source]

Create a table using SQLAlchemy ORM-style definitions within MatrixOne session.

Args:

table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)

Returns:

Session: Self for chaining

Client Class

class matrixone.client.Client(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]

Bases: BaseMatrixOneClient

MatrixOne Client - High-level interface for MatrixOne database operations.

This class provides a comprehensive interface for connecting to and interacting with MatrixOne databases. It supports modern API patterns including table creation, data insertion, querying, vector operations, and transaction management.

Key Features:

  • High-level table operations (create_table, drop_table, insert, batch_insert)

  • Query builder interface for complex queries

  • Vector operations (similarity search, range search, indexing)

  • Transaction management with context managers

  • Snapshot and restore operations

  • Account and user management

  • Fulltext search capabilities

  • Connection pooling and SSL support

Examples:

from matrixone import Client

# Basic usage
client = Client(
    host='localhost',
    port=6001,
    user='root',
    password='111',
    database='test'
)

# Create table
client.create_table("users", {
    "id": "int primary key",
    "name": "varchar(100)",
    "email": "varchar(255)"
})

# Insert and query data
client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"})
result = client.query("users").where("id = ?", 1).all()

# Vector operations
client.create_table("documents", {
    "id": "int primary key",
    "content": "text",
    "embedding": "vecf32(384)"
})

results = client.vector_ops.similarity_search(
    "documents",
    vector_column="embedding",
    query_vector=[0.1, 0.2, 0.3, ...],
    limit=10
)

# Transaction
with client.transaction() as tx:
    tx.execute("INSERT INTO users (name) VALUES ('John')")

Attributes:

engine (Engine): SQLAlchemy engine instance
connected (bool): Connection status
backend_version (str): Detected backend version
vector_ops (VectorManager): Vector operations manager
snapshots (SnapshotManager): Snapshot operations manager
query (QueryBuilder): Query builder for complex queries
__init__(host: str | None = None, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, pool_size: int = 10, max_overflow: int = 20, pool_timeout: int = 30, pool_recycle: int = 3600, connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]

Initialize MatrixOne client

Args:

host: Database host (optional, can be set later via connect)
port: Database port (optional, can be set later via connect)
user: Username (optional, can be set later via connect)
password: Password (optional, can be set later via connect)
database: Database name (optional, can be set later via connect)
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name
role: Optional role name
pool_size: Connection pool size
max_overflow: Maximum overflow connections
pool_timeout: Pool timeout in seconds
pool_recycle: Connection recycle time in seconds
connection_timeout: Connection timeout in seconds
query_timeout: Query timeout in seconds
auto_commit: Enable auto-commit mode
charset: Character set for connection
logger: Custom logger instance. If None, creates a default logger
sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full')
    - 'off': No SQL logging
    - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default)
    - 'simple': Show operation summary only
    - 'full': Show complete SQL regardless of length
slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0)
max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
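
The four sql_log_mode values differ only in how much of each statement reaches the log. The sketch below mimics that decision. It is a guess at the general shape, not the SDK's formatter: format_sql_for_log is a hypothetical function, and the exact summary text the SDK produces is not documented here.

```python
def format_sql_for_log(sql: str, mode: str = "auto", max_len: int = 500) -> str:
    """Hypothetical formatter; the SDK's real summaries may look different."""
    if mode == "off":
        return ""
    flat = " ".join(sql.split())  # collapse whitespace for one-line logs
    operation = flat.split(" ", 1)[0].upper() if flat else ""
    if mode == "full":
        return flat
    if mode == "simple":
        return f"{operation} ({len(flat)} chars)"
    if mode == "auto":
        if len(flat) <= max_len:
            return flat  # short SQL is shown in full
        return f"{flat[:max_len]}... [{operation}, {len(flat)} chars total]"
    raise ValueError(f"unknown sql_log_mode: {mode!r}")
```

In this sketch max_len plays the role of max_sql_display_length: it only matters in 'auto' mode, where it is the cut-over point between full output and a summary.
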
connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, ssl_mode: str = 'preferred', ssl_ca: str | None = None, ssl_cert: str | None = None, ssl_key: str | None = None, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None) None[source]

Connect to MatrixOne database using SQLAlchemy engine

Args:

host: Database host
port: Database port
user: Username or login info in format "user", "account#user", "account#user#role",
      "account:user", or "account:user:role" (both '#' and ':' separators are supported)
password: Password
database: Database name
ssl_mode: SSL mode (disabled, preferred, required)
ssl_ca: SSL CA certificate path
ssl_cert: SSL client certificate path
ssl_key: SSL client key path
account: Optional account name (will be combined with user if user doesn't contain '#' or ':')
role: Optional role name (will be combined with user if user doesn't contain '#' or ':')
charset: Character set for the connection (default: utf8mb4)
connection_timeout: Connection timeout in seconds (default: 30)
auto_commit: Enable autocommit (default: True)
on_connect: Connection hook to execute after successful connection.
           Can be:
           - ConnectionHook instance
           - List of ConnectionAction or string action names
           - Custom callback function

Examples:

# Enable all features after connection
# connect() accepts keyword arguments only
client.connect(host=host, port=port, user=user, password=password, database=database,
               on_connect=[ConnectionAction.ENABLE_ALL])

# Enable only vector operations with custom charset
client.connect(host=host, port=port, user=user, password=password, database=database,
               charset="utf8mb4",
               on_connect=[ConnectionAction.ENABLE_VECTOR])

# Custom callback
def my_callback(client):
    print(f"Connected to {client._connection_params['host']}")

client.connect(host=host, port=port, user=user, password=password, database=database,
               on_connect=my_callback)
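
The user argument accepts the combined login formats listed above ("account#user#role" or the ':' equivalent), with the separate account and role arguments used only when user has no separator. A minimal sketch of that rule; parse_login is a hypothetical helper written for illustration and is not the SDK's parser.

```python
from typing import Optional, Tuple


def parse_login(user: str, account: Optional[str] = None,
                role: Optional[str] = None) -> Tuple[Optional[str], str, Optional[str]]:
    """Return (account, user, role). Hypothetical helper, not the SDK's parser."""
    for sep in ("#", ":"):
        if sep in user:
            parts = user.split(sep)
            if len(parts) == 2:
                return parts[0], parts[1], None
            if len(parts) == 3:
                return parts[0], parts[1], parts[2]
            raise ValueError(f"unrecognized login format: {user!r}")
    # Plain username: fall back to the separate account/role arguments.
    return account, user, role
```
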
classmethod from_engine(engine: Engine, **kwargs) Client[source]

Create Client instance from existing SQLAlchemy Engine

Args:

engine: SQLAlchemy Engine instance (must use MySQL driver)
**kwargs: Additional client configuration options

Returns:

Client: Configured client instance

Raises:

ConnectionError: If engine doesn't use MySQL driver

Examples

Basic usage:

    from sqlalchemy import create_engine
    from matrixone import Client

    engine = create_engine("mysql+pymysql://user:pass@host:port/db")
    client = Client.from_engine(engine)

With custom configuration:

    engine = create_engine("mysql+pymysql://user:pass@host:port/db")
    client = Client.from_engine(
        engine,
        sql_log_mode='auto',
        slow_query_threshold=0.5
    )
disconnect() None[source]

Disconnect from MatrixOne database and dispose engine.

This method properly closes all database connections and disposes of the SQLAlchemy engine. It should be called when the client is no longer needed to free up resources.

After calling this method, the client will need to be reconnected using the connect() method before any database operations can be performed.

Raises:

Exception: If disconnection fails (logged but re-raised)

Example

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # ... perform database operations ...
>>> client.disconnect()  # Clean up resources

get_login_info() dict | None[source]

Get parsed login information used for database connection.

Returns the login information dictionary that was used to establish the database connection. This includes user, account, role, and other authentication details.

Returns:

Optional[dict]: Dictionary containing login information with keys:
    - user: Username
    - account: Account name (if specified)
    - role: Role name (if specified)
    - host: Database host
    - port: Database port
    - database: Database name
Returns None if not connected or no login info available.

Example

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> login_info = client.get_login_info()
>>> print(f"Connected as {login_info['user']} to {login_info['database']}")

execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) ResultSet[source]

Execute SQL query or SQLAlchemy statement without transaction isolation.

This method executes queries directly using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use client.session() instead.

The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.

Key Features:

  • ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()

  • Parameter binding: Automatic escaping of parameters to prevent SQL injection

  • Query logging: Integrated logging with performance tracking

  • Auto-commit: Each statement commits immediately (no transaction isolation)

  • Connection pooling: Efficient connection reuse from pool

  • Type safety: ResultSet with structured access to query results

Parameters:
  • sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be: - SQLAlchemy select() statement (recommended) - SQLAlchemy insert() statement (recommended) - SQLAlchemy update() statement (recommended) - SQLAlchemy delete() statement (recommended) - String SQL with ‘?’ placeholders for parameters - SQLAlchemy text() statement

  • params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements (use .values() or .where() with bound parameters instead).

  • log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging specific queries or disabling logs for frequently-executed statements.

Returns:

Query result object with:
  • columns: List[str] - Column names

  • rows: List[Tuple] - Row data as tuples

  • affected_rows: int - Number of rows affected by DML operations

  • fetchall() -> List[Row] - Get all rows as list

  • fetchone() -> Optional[Row] - Get next row or None

  • fetchmany(size) -> List[Row] - Get next N rows

Return type:

ResultSet

Raises:
  • ConnectionError – If not connected to database

  • QueryError – If query execution fails or SQL syntax is invalid
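
The string-SQL form passes values separately from the SQL text and matches them to '?' placeholders in order. Python's built-in sqlite3 module uses the same positional '?' convention, so it serves here as a runnable stand-in. The mechanism differs (sqlite3 binds values natively, whereas the description above speaks of substitution with escaping), but the calling pattern and the protection against injected SQL are the point of the sketch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")

# Values travel separately from the SQL text, matched to '?' in order.
hostile_name = "x'); DROP TABLE users; --"
conn.execute("INSERT INTO users (name, age) VALUES (?, ?)", (hostile_name, 30))

# The hostile string is stored as data; it never runs as SQL.
stored = conn.execute("SELECT name FROM users WHERE age > ?", (25,)).fetchone()[0]
row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```
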

Usage Examples:

from matrixone import Client
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))

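class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)
    # Columns referenced by the "UPDATE with expressions" example below
    quantity = Column(Integer)
    price = Column(Float)
    total = Column(Float)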

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# SQLAlchemy SELECT Statements (Recommended)
# ========================================

# Basic SELECT with WHERE clause
stmt = select(User).where(User.age > 25)
result = client.execute(stmt)
for user in result.fetchall():
    print(f"User: {user.name}, Age: {user.age}")

# SELECT specific columns
stmt = select(User.name, User.email).where(User.status == 'active')
result = client.execute(stmt)
for name, email in result.fetchall():
    print(f"{name}: {email}")

# Complex WHERE with AND/OR
stmt = select(User).where(
    and_(
        User.age > 18,
        or_(
            User.status == 'active',
            User.status == 'pending'
        )
    )
)
result = client.execute(stmt)

# SELECT with JOIN
stmt = select(User, Order).join(Order, User.id == Order.user_id)
result = client.execute(stmt)
for user, order in result.fetchall():
    print(f"{user.name} ordered ${order.amount}")

# SELECT with aggregation
stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active')
result = client.execute(stmt)
count, avg_age = result.fetchone()
print(f"Active users: {count}, Average age: {avg_age}")

# SELECT with ORDER BY and LIMIT
stmt = select(User).where(User.age > 25).order_by(User.age.desc()).limit(10)
result = client.execute(stmt)

# ========================================
# SQLAlchemy INSERT Statements (Recommended)
# ========================================

# Single INSERT
stmt = insert(User).values(name='John', email='john@example.com', age=30)
result = client.execute(stmt)
print(f"Inserted {result.affected_rows} rows")

# Bulk INSERT
stmt = insert(User).values([
    {'name': 'Alice', 'email': 'alice@example.com', 'age': 28},
    {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
    {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
])
result = client.execute(stmt)
print(f"Inserted {result.affected_rows} rows")

# ========================================
# SQLAlchemy UPDATE Statements (Recommended)
# ========================================

# Simple UPDATE
stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
result = client.execute(stmt)
print(f"Updated {result.affected_rows} rows")

# Conditional UPDATE
stmt = update(User).where(User.age < 18).values(status='minor')
result = client.execute(stmt)

# UPDATE with expressions
stmt = update(Order).values(total=Order.quantity * Order.price)
result = client.execute(stmt)

# UPDATE multiple columns
stmt = update(User).where(User.id == 1).values(
    name='Updated Name',
    email='updated@example.com',
    status='active'
)
result = client.execute(stmt)

# ========================================
# SQLAlchemy DELETE Statements (Recommended)
# ========================================

# Simple DELETE
stmt = delete(User).where(User.id == 1)
result = client.execute(stmt)
print(f"Deleted {result.affected_rows} rows")

# Conditional DELETE
stmt = delete(User).where(User.status == 'deleted')
result = client.execute(stmt)

# DELETE with complex condition
stmt = delete(User).where(
    and_(
        User.age < 18,
        User.status == 'inactive'
    )
)
result = client.execute(stmt)

# ========================================
# String SQL with Parameters (Alternative)
# ========================================

# SELECT with parameters
result = client.execute(
    "SELECT * FROM users WHERE age > ? AND status = ?",
    (25, 'active')
)

# INSERT with parameters
result = client.execute(
    "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
    ('David', 'david@example.com', 28)
)

# UPDATE with parameters
result = client.execute(
    "UPDATE users SET status = ? WHERE age < ?",
    ('minor', 18)
)

# DELETE with parameters
result = client.execute(
    "DELETE FROM users WHERE status = ?",
    ('inactive',)
)

# ========================================
# Query Logging Control
# ========================================

# Disable logging for frequently executed query
result = client.execute(
    select(User).where(User.id == 1),
    log_mode='off'
)

# Force full SQL logging for debugging
result = client.execute(
    select(User).where(User.name.like('%test%')),
    log_mode='full'
)

# Simple logging (operation type only)
result = client.execute(
    update(User).values(status='processed'),
    log_mode='simple'
)

Important Notes:

  • No transaction isolation: Each execute() call commits immediately

  • Use session() for transactions: For atomic multi-statement operations

  • ORM-style preferred: Use SQLAlchemy statements for better type safety

  • Auto-commit behavior: Changes are permanent immediately after execute()

  • Thread-safe: Uses connection pooling for concurrent access

Best Practices:

  1. Prefer ORM-style statements: Use select(), insert(), update(), delete()

  2. Use parameters: Always use parameter binding to prevent SQL injection

  3. Session for transactions: Use client.session() for atomic operations

  4. Disable logging in production: Use log_mode='off' for hot paths

  5. Handle exceptions: Wrap execute() in try-except for error handling
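
To illustrate why parameter binding matters (practice 2), here is a minimal, self-contained sketch. It uses Python's built-in sqlite3 module purely as a stand-in, because it needs no server and accepts the same ? placeholder style as the string SQL examples above. The table, data, and malicious input are hypothetical, and nothing here touches MatrixOne.

```python
import sqlite3

# Stand-in database: sqlite3 is used only because it needs no server.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO users (name, status) VALUES (?, ?)",
    [("alice", "active"), ("bob", "inactive")],
)

malicious = "nobody' OR '1'='1"

# Unsafe: the input is spliced into the SQL text and changes the query logic.
unsafe_sql = f"SELECT name FROM users WHERE name = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the input is bound as a value and is only ever compared as a string.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (malicious,)
).fetchall()

print(len(unsafe_rows))  # every row matches the injected condition
print(len(safe_rows))    # no user has that literal name
conn.close()
```

The same reasoning applies to client.execute(): pass values in the params tuple, or build the statement with SQLAlchemy constructs, rather than formatting them into the SQL string.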

See also

  • Client.session(): For transaction-aware operations

  • Session.execute(): Execute within transaction context

  • AsyncClient.execute(): Async version for async/await workflows

insert(table_name_or_model, data: dict) ResultSet[source]

Insert a single row of data into a table.

This method provides a convenient way to insert data using a dictionary where keys are column names and values are the data to insert. The method automatically handles SQL generation and parameter binding.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data (dict): Dictionary mapping column names to values. Example:

            {'name': 'John', 'age': 30, 'email': 'john@example.com'}

Returns:

ResultSet: Object containing insertion results with:
    - affected_rows: Number of rows inserted (should be 1)
    - columns: Empty list (no columns returned for INSERT)
    - rows: Empty list (no rows returned for INSERT)

Raises:

ConnectionError: If not connected to database
QueryError: If insertion fails

Examples

# Insert a single user using table name
>>> result = client.insert('users', {
...     'name': 'John Doe',
...     'age': 30,
...     'email': 'john@example.com'
... })
>>> print(f"Inserted {result.affected_rows} row")

# Insert using model class
>>> from sqlalchemy import Column, Integer, String
>>> from matrixone.orm import declarative_base
>>> Base = declarative_base()
>>> class User(Base):
...     __tablename__ = 'users'
...     id = Column(Integer, primary_key=True)
...     name = Column(String(50))
...     age = Column(Integer)
>>> result = client.insert(User, {
...     'name': 'Jane Doe',
...     'age': 25
... })

# Insert with NULL values
>>> result = client.insert('products', {
...     'name': 'Product A',
...     'price': 99.99,
...     'description': None  # NULL value
... })
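
The "SQL generation and parameter binding" step mentioned above can be pictured roughly as follows. This is only a sketch: build_insert is a hypothetical helper, not part of the MatrixOne SDK, and the statement the library actually emits may differ. It assumes the table and column names come from trusted code, since identifiers cannot be bound as parameters.

```python
from typing import Any, Dict, Tuple


def build_insert(table: str, data: Dict[str, Any]) -> Tuple[str, Tuple[Any, ...]]:
    """Hypothetical helper: turn a column -> value dict into SQL plus bound values."""
    if not data:
        raise ValueError("data must contain at least one column")
    columns = ", ".join(data)                    # dicts preserve insertion order
    placeholders = ", ".join("?" for _ in data)  # one placeholder per value
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    return sql, tuple(data.values())


sql, params = build_insert(
    "products",
    {"name": "Product A", "price": 99.99, "description": None},
)
print(sql)     # INSERT INTO products (name, price, description) VALUES (?, ?, ?)
print(params)  # ('Product A', 99.99, None)
```

Because None travels as a bound value rather than as SQL text, the driver can translate it to NULL without any special casing in the statement.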

batch_insert(table_name_or_model, data_list: list) ResultSet[source]

Batch insert multiple rows of data into a table.

This method inserts multiple rows in a single operation, which is generally faster than calling insert() once per row. All rows must have the same column structure.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list (list): List of dictionaries, where each dictionary represents
    a row to insert. All dictionaries must have the same keys.
    Example: [
        {'name': 'John', 'age': 30},
        {'name': 'Jane', 'age': 25},
        {'name': 'Bob', 'age': 35}
    ]

Returns:

ResultSet: Object containing insertion results with:
    - affected_rows: Number of rows inserted
    - columns: Empty list (no columns returned for INSERT)
    - rows: Empty list (no rows returned for INSERT)

Raises:

ConnectionError: If not connected to database
QueryError: If batch insertion fails
ValueError: If data_list is empty or has inconsistent column structure

Examples

# Insert multiple users
>>> users = [
...     {'name': 'John Doe', 'age': 30, 'email': 'john@example.com'},
...     {'name': 'Jane Smith', 'age': 25, 'email': 'jane@example.com'},
...     {'name': 'Bob Johnson', 'age': 35, 'email': 'bob@example.com'}
... ]
>>> result = client.batch_insert('users', users)
>>> print(f"Inserted {result.affected_rows} rows")

# Insert with some NULL values
>>> products = [
...     {'name': 'Product A', 'price': 99.99, 'description': 'Great product'},
...     {'name': 'Product B', 'price': 149.99, 'description': None}
... ]
>>> result = client.batch_insert('products', products)
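
Since batch_insert() is documented to raise ValueError for an empty list or inconsistent columns, it can be useful to run the same kind of check before building a large batch. The sketch below shows one way to do that. check_batch is a hypothetical helper written for this illustration, and the library's own validation may behave differently.

```python
from typing import Any, Dict, List


def check_batch(data_list: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical pre-check: return the shared column names or raise ValueError."""
    if not data_list:
        raise ValueError("data_list is empty")
    expected = list(data_list[0])
    for i, row in enumerate(data_list[1:], start=1):
        if set(row) != set(expected):
            raise ValueError(
                f"row {i} has columns {sorted(row)}, expected {sorted(expected)}"
            )
    return expected


good = [{"name": "John", "age": 30}, {"name": "Jane", "age": 25}]
bad = [{"name": "John", "age": 30}, {"name": "Bob"}]  # second row lacks 'age'

columns = check_batch(good)
try:
    check_batch(bad)
    bad_rejected = False
except ValueError as exc:
    bad_rejected = True
    print(f"Rejected: {exc}")
```

A row that should store NULL still needs the key present with a value of None, as in the products example above.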

get_sqlalchemy_engine() Engine[source]

Get SQLAlchemy engine

Returns:

SQLAlchemy Engine
query(*columns, snapshot: str | None = None)[source]

Get MatrixOne query builder - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Examples

# Traditional model query (all columns)
client.query(Article).filter(...).all()

# Column-specific query
client.query(Article.id, Article.title).filter(...).all()

# With fulltext score
client.query(Article.id, boolean_match("title", "content").must("python").label("score"))

# Snapshot query
client.query(Article, snapshot="my_snapshot").filter(...).all()

Returns:

MatrixOneQuery instance configured for the specified columns
snapshot(snapshot_name: str) Generator[SnapshotClient, None, None][source]

Snapshot context manager

Usage

with client.snapshot("daily_backup") as snapshot_client:
    result = snapshot_client.execute("SELECT * FROM users")

session() Generator[Session, None, None][source]

Create a transaction-aware session for atomic database operations.

This method returns a MatrixOne Session that extends SQLAlchemy Session with MatrixOne-specific features. All operations within the session are executed atomically - they either all succeed or all fail together.

The session is a context manager that automatically handles the transaction lifecycle:

  • Commits the transaction when the context exits normally

  • Rolls back the transaction if any exception occurs

  • Cleans up database resources automatically

Key Features:

  • Full SQLAlchemy ORM: All standard SQLAlchemy Session methods (add, delete, query, etc.)

  • Atomic transactions: Multiple operations commit or rollback together

  • MatrixOne managers: Access to snapshots, clones, vector ops, etc.

  • ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()

  • Automatic cleanup: Transaction and connection resources managed automatically

  • Query logging: Integrated query logging with performance tracking

Available Managers (transaction-aware):

  • session.snapshots: SnapshotManager for creating/managing snapshots

  • session.clone: CloneManager for cloning databases and tables

  • session.restore: RestoreManager for restoring from snapshots

  • session.pitr: PitrManager for point-in-time recovery

  • session.pubsub: PubSubManager for publish-subscribe operations

  • session.account: AccountManager for account/user management

  • session.vector_ops: VectorManager for vector operations and indexing

  • session.fulltext_index: FulltextIndexManager for fulltext search

  • session.metadata: MetadataManager for table metadata analysis

  • session.load_data: LoadDataManager for bulk data loading

  • session.stage: StageManager for external stage management

Returns:

Context manager that yields a MatrixOne Session

Return type:

Generator[Session, None, None]

Raises:

ConnectionError – If client is not connected to database

Usage Examples:

from matrixone import Client
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, insert, update, delete
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# Example 1: Basic Transaction with ORM-style SQL
# ========================================
with client.session() as session:
    # Insert using SQLAlchemy insert()
    session.execute(insert(User).values(name='John', email='john@example.com', age=30))

    # Update using SQLAlchemy update()
    session.execute(update(User).where(User.age < 18).values(status='minor'))

    # Select using SQLAlchemy select()
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    for user in result.scalars():
        print(f"User: {user.name}")

    # Delete using SQLAlchemy delete()
    session.execute(delete(User).where(User.status == 'inactive'))
    # All operations commit atomically

# ========================================
# Example 2: SQLAlchemy ORM Operations
# ========================================
with client.session() as session:
    # Create new objects
    user1 = User(name='Alice', email='alice@example.com', age=28)
    user2 = User(name='Bob', email='bob@example.com', age=35)

    # Add to session
    session.add(user1)
    session.add(user2)

    # Query using ORM
    stmt = select(User).where(User.name == 'Alice')
    result = session.execute(stmt)
    alice = result.scalar_one()

    # Update object
    alice.email = 'newemail@example.com'

    # Commit (or let context manager do it)
    session.commit()

# ========================================
# Example 3: MatrixOne Snapshot Operations
# ========================================
with client.session() as session:
    # Create snapshot within transaction
    from matrixone import SnapshotLevel

    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone database using snapshot
    session.clone.clone_database(
        target_db='prod_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit atomically

# ========================================
# Example 4: Bulk Data Loading with Stages
# ========================================
with client.session() as session:
    # Create stage using simple interface
    session.stage.create_local('import_stage', '/data/imports/')

    # Load data from stage using ORM model
    session.load_data.from_stage_csv('import_stage', 'users.csv', User)

    # Update statistics after loading
    session.execute("ANALYZE TABLE users")
    # All operations are atomic

# ========================================
# Example 5: Error Handling and Rollback
# ========================================
try:
    with client.session() as session:
        session.execute(insert(User).values(name='Charlie', age=40))
        session.execute(insert(InvalidTable).values(data='test'))  # This fails
        # Transaction automatically rolls back - Charlie is NOT inserted
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

# ========================================
# Example 6: Manual Transaction Control
# ========================================
with client.session() as session:
    try:
        session.execute(insert(User).values(name='David', age=25))
        # Account is assumed to be another mapped model (not defined in this example)
        session.execute(update(Account).values(balance=Account.balance - 100))

        # Verify before committing
        stmt = select(Account.balance)
        balance = session.execute(stmt).scalar()

        if balance >= 0:
            session.commit()  # Explicit commit
        else:
            session.rollback()  # Explicit rollback
            print("Insufficient balance")
    except Exception as e:
        session.rollback()
        raise

# ========================================
# Example 7: Complex Multi-Manager Transaction
# ========================================
with client.session() as session:
    # Create publication
    session.pubsub.create_database_publication(
        name='analytics_pub',
        database='analytics',
        account='subscriber_account'
    )

    # Create S3 stage for exports
    session.stage.create_s3(
        name='export_stage',
        bucket='my-bucket',
        path='exports/',
        aws_key_id='key',
        aws_secret_key='secret'
    )

    # Load fresh data using an ORM model (Analytics is assumed to be defined elsewhere)
    session.load_data.from_csv('/data/latest.csv', Analytics)

    # Create snapshot after load
    session.snapshots.create(
        name='post_load_snapshot',
        level=SnapshotLevel.DATABASE,
        database='analytics'
    )
    # All operations commit together

Best Practices:

  1. Always use context manager: Use with client.session() for automatic cleanup

  2. Keep transactions short: Long transactions can block other operations

  3. Handle exceptions: Wrap session code in try-except for error handling

  4. Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()

  5. Avoid nested sessions: SQLAlchemy doesn’t support true nested transactions

  6. Test rollback behavior: Ensure your application handles rollbacks correctly
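
The commit-on-success, rollback-on-error lifecycle described for session() can be tried out without a MatrixOne server. The sketch below reproduces the pattern with contextlib and the built-in sqlite3 module as a stand-in. The transaction helper and the users table are hypothetical, and this is not how the SDK implements session(). It only shows the behavior to expect and one way to test rollback handling (practice 6).

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Hypothetical stand-in for the commit/rollback lifecycle of a session."""
    try:
        yield conn
        conn.commit()      # normal exit: make all changes permanent
    except Exception:
        conn.rollback()    # any exception: undo everything done in the block
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT NOT NULL)")
conn.commit()

# Succeeds: the insert is committed when the block exits.
with transaction(conn) as tx:
    tx.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))

# Fails: the second insert violates NOT NULL, so the first one is undone too.
try:
    with transaction(conn) as tx:
        tx.execute("INSERT INTO users (name) VALUES (?)", ("Charlie",))
        tx.execute("INSERT INTO users (name) VALUES (?)", (None,))
except sqlite3.IntegrityError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users")]
print(names)  # only the committed row remains
conn.close()
```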

See also

  • Session: The session class returned by this method

  • AsyncClient.session(): Async version for async/await workflows

  • Client.execute(): Non-transactional query execution

property snapshots: SnapshotManager | None

Get snapshot manager

property clone: CloneManager | None

Get clone manager

property branch: BranchManager | None

Get branch manager for Git-style version control operations.

Provides table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.

Returns:

BranchManager instance for branch operations

Example:

client = Client()
client.connect(database='test')

# Create table branch
client.branch.create_table_branch('users_branch', 'users')

# Create database branch
client.branch.create_database_branch('dev_db', 'production')

# Compare branches
diffs = client.branch.diff_table('users_branch', 'users')

# Merge branches
client.branch.merge_table('users_branch', 'users')

# Delete branches
client.branch.delete_table_branch('users_branch')
client.branch.delete_database_branch('dev_db')

See also

  • branch_guide - Complete branch management guide

property moctl: MoCtlManager | None

Get mo_ctl manager

property restore: RestoreManager | None

Get restore manager

property pitr: PitrManager | None

Get PITR manager

property pubsub: PubSubManager | None

Get publish-subscribe manager

property account: AccountManager | None

Get account manager

property load_data: LoadDataManager | None

Get load data manager

property stage: StageManager | None

Get stage manager for external stage operations

property cdc: CDCManager | None

Get CDC manager for change data capture operations.

property export

Get export manager for data export operations (INTO OUTFILE, INTO STAGE)

property vector_ops: VectorManager | None

Get unified vector operations manager for vector operations (index and data)

get_pinecone_index(table_name_or_model, vector_column: str)[source]

Get a PineconeCompatibleIndex object for vector search operations.

This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
vector_column: Name of the vector column

Returns:

PineconeCompatibleIndex object with Pinecone-compatible API

Example

>>> # By table name
>>> index = client.get_pinecone_index("documents", "embedding")
>>> # Or by model class
>>> index = client.get_pinecone_index(DocumentModel, "embedding")
>>> results = index.query([0.1, 0.2, 0.3], top_k=5)
>>> for match in results.matches:
...     print(f"ID: {match.id}, Score: {match.score}")
...     print(f"Metadata: {match.metadata}")
property fulltext_index: FulltextIndexManager | None

Get fulltext index manager for fulltext index operations

property metadata: MetadataManager | None

Get metadata manager for table metadata operations

connected() bool[source]

Check if client is connected to database

version() str[source]

Get MatrixOne server version

Returns:

str: MatrixOne server version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If version query fails

Example

>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> version = client.version()
>>> print(f"MatrixOne version: {version}")
git_version() str[source]

Get MatrixOne git version information

Returns:

str: MatrixOne git version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If git version query fails

Example

>>> client = Client('localhost', 6001, 'root', '111', 'test')
>>> git_version = client.git_version()
>>> print(f"MatrixOne git version: {git_version}")
set_backend_version(version: str) None[source]

Manually set the backend version

Args:

version: Version string in format "major.minor.patch" (e.g., "3.0.1")
get_backend_version() str | None[source]

Get current backend version

Returns:

Version string or None if not set
is_feature_available(feature_name: str) bool[source]

Check if a feature is available in current backend version

Args:

feature_name: Name of the feature to check

Returns:

True if feature is available, False otherwise
get_feature_info(feature_name: str) Dict[str, Any] | None[source]

Get feature requirement information

Args:

feature_name: Name of the feature

Returns:

Feature information dictionary or None if not found
check_version_compatibility(required_version: str, operator: str = '>=') bool[source]

Check if current backend version is compatible with required version

Args:

required_version: Required version string (e.g., "3.0.1")
operator: Comparison operator (">=", ">", "<=", "<", "==", "!=")

Returns:

True if compatible, False otherwise
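
To make the comparison semantics concrete, the sketch below shows one plausible way a "major.minor.patch" check with an operator argument can work. parse_version and is_compatible are hypothetical names for this illustration. The SDK's actual parsing rules (for example, how it treats suffixes or development builds) are not shown in this reference and may differ.

```python
import operator
from typing import Tuple

# Map the documented operator strings to comparison functions.
_OPS = {
    ">=": operator.ge, ">": operator.gt,
    "<=": operator.le, "<": operator.lt,
    "==": operator.eq, "!=": operator.ne,
}


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split 'major.minor.patch' into integers so that 3.0.10 > 3.0.9."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_compatible(current: str, required: str, op: str = ">=") -> bool:
    """Hypothetical check: compare two version strings with the given operator."""
    if op not in _OPS:
        raise ValueError(f"unsupported operator: {op}")
    return _OPS[op](parse_version(current), parse_version(required))


print(is_compatible("3.0.5", "3.0.1"))       # True
print(is_compatible("3.0.10", "3.0.9"))      # True (numeric, not string, comparison)
print(is_compatible("3.0.1", "3.0.5", "<"))  # True
```

Comparing integer tuples instead of raw strings avoids the classic pitfall where "3.0.10" sorts before "3.0.9".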
get_version_hint(feature_name: str, error_context: str = '') str[source]

Get helpful hint message for version-related errors

Args:

feature_name: Name of the feature
error_context: Additional context for the error

Returns:

Helpful hint message
is_development_version() bool[source]

Check if current backend is a development version

Returns:

True if backend is development version (999.x.x), False otherwise
create_table(table_name_or_model, columns: dict | None = None, **kwargs) Client[source]

Create a table with a simplified interface.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
columns: Dictionary mapping column names to their types (required if table_name_or_model is str)
        Supported formats:
        - 'id': 'bigint' (with primary_key=True if needed)
        - 'name': 'varchar(100)'
        - 'embedding': 'vecf32(128)' or 'vecf64(128)'
        - 'score': 'float'
        - 'created_at': 'datetime'
        - 'is_active': 'boolean'
**kwargs: Additional table parameters

Returns:

Client: Self for chaining

Example

client.create_table("users", {
    'id': 'bigint',
    'name': 'varchar(100)',
    'email': 'varchar(255)',
    'embedding': 'vecf32(128)',
    'score': 'float',
    'created_at': 'datetime',
    'is_active': 'boolean'
}, primary_key='id')
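
As a rough mental model, a columns dictionary plus a primary_key argument maps onto a CREATE TABLE statement along the lines sketched below. build_create_table is a hypothetical helper written for this illustration. The DDL that create_table() really generates (quoting, type normalization, extra options passed through **kwargs) is not documented here and may look different.

```python
from typing import Dict, Optional


def build_create_table(
    table: str,
    columns: Dict[str, str],
    primary_key: Optional[str] = None,
) -> str:
    """Hypothetical helper: render a column -> type dict as CREATE TABLE DDL."""
    if not columns:
        raise ValueError("columns must not be empty")
    if primary_key is not None and primary_key not in columns:
        raise ValueError(f"primary key {primary_key!r} is not a defined column")
    parts = [f"{name} {col_type}" for name, col_type in columns.items()]
    if primary_key is not None:
        parts.append(f"PRIMARY KEY ({primary_key})")
    return f"CREATE TABLE {table} ({', '.join(parts)})"


ddl = build_create_table(
    "users",
    {"id": "bigint", "name": "varchar(100)", "embedding": "vecf32(128)"},
    primary_key="id",
)
print(ddl)
```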

drop_table(table_name_or_model) Client[source]

Drop a table.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class

Returns:

Client: Self for chaining

Example

# Drop table by name
client.drop_table("users")

# Drop table by model class
client.drop_table(UserModel)

create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Client[source]

Create a table with vector indexes using a simplified interface.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as create_table)
indexes: List of index definitions, each containing:
        - 'name': Index name
        - 'column': Column name to index
        - 'type': Index type ('ivfflat' or 'hnsw')
        - 'params': Dictionary of index-specific parameters
**kwargs: Additional table parameters

Returns:

Client: Self for chaining

Example

client.create_table_with_index("vector_docs", {
    'id': 'bigint',
    'title': 'varchar(200)',
    'embedding': 'vector(128,f32)'
}, indexes=[
    {
        'name': 'idx_hnsw',
        'column': 'embedding',
        'type': 'hnsw',
        'params': {'m': 48, 'ef_construction': 64, 'ef_search': 64}
    }
], primary_key='id')

create_table_orm(table_name: str, *columns, **kwargs) Client[source]

Create a table using SQLAlchemy ORM-style column definitions. Similar to SQLAlchemy Table() constructor but without metadata.

Args:

table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)

Returns:

Client: Self for chaining

Example:

from sqlalchemy import Column, BigInteger, Integer
from matrixone.sqlalchemy_ext import Vectorf32, VectorIndex, VectorIndexType, VectorOpType

client.create_table_orm(
    'vector_docs_hnsw_demo',
    Column('a', BigInteger, primary_key=True),
    Column('b', Vectorf32(128)),
    Column('c', Integer),
    VectorIndex('idx_hnsw', 'b', index_type=VectorIndexType.HNSW,
           m=48, ef_construction=64, ef_search=64,
           op_type=VectorOpType.VECTOR_L2_OPS)
)
create_all(base_class=None)[source]

Create all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.
drop_all(base_class=None)[source]

Drop all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.
get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]

Get all secondary index table names for a given table.

This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.

Parameters:
  • table_name – Name of the table to get secondary indexes for

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info')
>>> # Or specify database explicitly
>>> index_tables = client.get_secondary_index_tables('cms_all_content_chunk_info', 'test')
>>> print(index_tables)
['__mo_index_secondary_..._cms_id', '__mo_index_unique_..._email']
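
Because the returned list mixes regular and UNIQUE index tables, callers may want to separate them. The sketch below does so using only the two name prefixes stated above. split_by_kind is a hypothetical helper, and the sample names are the abbreviated ones from the example output rather than real physical table names.

```python
from typing import Dict, List

SECONDARY_PREFIX = "__mo_index_secondary_"
UNIQUE_PREFIX = "__mo_index_unique_"


def split_by_kind(index_tables: List[str]) -> Dict[str, List[str]]:
    """Hypothetical helper: bucket index table names by their documented prefix."""
    kinds: Dict[str, List[str]] = {"secondary": [], "unique": [], "other": []}
    for name in index_tables:
        if name.startswith(SECONDARY_PREFIX):
            kinds["secondary"].append(name)
        elif name.startswith(UNIQUE_PREFIX):
            kinds["unique"].append(name)
        else:
            kinds["other"].append(name)  # anything not matching either prefix
    return kinds


sample = ["__mo_index_secondary_..._cms_id", "__mo_index_unique_..._email"]
kinds = split_by_kind(sample)
print(kinds["secondary"])
print(kinds["unique"])
```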
get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]

Get the physical table name of a secondary index by its index name.

Parameters:
  • table_name – Name of the table

  • index_name – Name of the secondary index

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

Physical table name of the secondary index, or None if not found

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Use current database
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id')
>>> # Or specify database explicitly
>>> index_table = client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id', 'test')
>>> print(index_table)
'__mo_index_secondary_018cfbda-bde1-7c3e-805c-3f8e71769f75_cms_id'
get_table_indexes_detail(table_name: str, database_name: str | None = None) List[dict][source]

Get detailed information about all indexes for a table, including IVF, HNSW, Fulltext, and regular indexes.

This method returns comprehensive information about each index physical table, including:

  • Index name

  • Index type (MULTIPLE, PRIMARY, UNIQUE, etc.)

  • Algorithm type (ivfflat, hnsw, fulltext, etc.)

  • Algorithm table type (metadata, centroids, entries, etc.)

  • Physical table name

  • Column names

  • Algorithm parameters

Parameters:
  • table_name – Name of the table to get indexes for

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

List of dictionaries, each containing:

  • index_name: Name of the index

  • index_type: Type of index (MULTIPLE, PRIMARY, UNIQUE, etc.)

  • algo: Algorithm type (ivfflat, hnsw, fulltext, or None for regular indexes)

  • algo_table_type: Algorithm table type (metadata, centroids, entries, etc., or None)

  • physical_table_name: Physical table name

  • columns: List of column names

  • algo_params: Algorithm parameters (or None)

Return type:

List[dict]

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> # Get all index details for a table
>>> indexes = client.get_table_indexes_detail('ivf_health_demo_docs')
>>> for idx in indexes:
...     print(f"{idx['index_name']} ({idx['algo']}) - {idx['algo_table_type']}: {idx['physical_table_name']}")
idx_embedding_ivf (ivfflat) - metadata: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - centroids: __mo_index_secondary_...
idx_embedding_ivf (ivfflat) - entries: __mo_index_secondary_...
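
As the output above shows, one logical index can span several physical tables, so it is often handy to regroup the flat list by index name. The following sketch does that with plain dictionaries shaped like the documented return value. group_by_index, the sample rows, the physical table names, and the "table" fallback label for regular indexes are all made up for illustration.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_by_index(details: List[Dict[str, Any]]) -> Dict[str, Dict[str, str]]:
    """Hypothetical helper: map index name -> {table role: physical table name}."""
    grouped: Dict[str, Dict[str, str]] = defaultdict(dict)
    for entry in details:
        # Regular indexes have no algorithm table type; label them "table".
        role = entry["algo_table_type"] or "table"
        grouped[entry["index_name"]][role] = entry["physical_table_name"]
    return dict(grouped)


# Sample data with placeholder physical table names.
sample = [
    {"index_name": "idx_embedding_ivf", "algo": "ivfflat",
     "algo_table_type": "metadata", "physical_table_name": "meta_tbl"},
    {"index_name": "idx_embedding_ivf", "algo": "ivfflat",
     "algo_table_type": "centroids", "physical_table_name": "centroids_tbl"},
    {"index_name": "idx_embedding_ivf", "algo": "ivfflat",
     "algo_table_type": "entries", "physical_table_name": "entries_tbl"},
    {"index_name": "idx_title", "algo": None,
     "algo_table_type": None, "physical_table_name": "title_tbl"},
]

grouped = group_by_index(sample)
for name, tables in grouped.items():
    print(name, tables)
```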
verify_table_index_counts(table_name: str) int[source]

Verify that the main table and all its secondary index tables have the same row count.

This method compares the COUNT(*) of the main table with that of each of its secondary index tables, reading all counts in a single SQL query so that they reflect the same state. If any count differs, a ValueError is raised.

Parameters:

table_name – Name of the table to verify

Returns:

Row count (int) if verification succeeds

Raises:

ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message

Examples:

>>> client = Client()
>>> client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> count = client.verify_table_index_counts('cms_all_content_chunk_info')
>>> print(f"✓ Verification passed, row count: {count}")

>>> # If verification fails:
>>> try:
...     count = client.verify_table_index_counts('some_table')
... except ValueError as e:
...     print(f"Verification failed: {e}")
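
The pass/fail rule itself is simple and can be sketched independently of the database. In the illustration below, verify_counts is a hypothetical function that takes counts already fetched by the caller. The real method gathers them with SQL, and its error message format is not specified in this reference.

```python
from typing import Dict


def verify_counts(main_count: int, index_counts: Dict[str, int]) -> int:
    """Hypothetical check: return the row count if every index table matches it."""
    mismatched = {
        name: count for name, count in index_counts.items() if count != main_count
    }
    if mismatched:
        raise ValueError(
            f"main table has {main_count} rows, but index tables differ: "
            f"{mismatched}; all counts: {index_counts}"
        )
    return main_count


count = verify_counts(100, {"idx_a": 100, "idx_b": 100})
print(f"Verification passed, row count: {count}")

try:
    verify_counts(100, {"idx_a": 100, "idx_b": 99})
    error = None
except ValueError as exc:
    error = str(exc)
    print(f"Verification failed: {error}")
```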

ResultSet Class

class matrixone.client.ResultSet(columns: List[str], rows: List[Tuple[Any, ...]], affected_rows: int = 0)[source]

Bases: object

Result set wrapper for query results from MatrixOne database operations.

This class provides a convenient interface for accessing query results with methods similar to database cursor objects. It supports both SELECT queries (returning data) and DML operations (returning affected row counts).

Key Features:

  • Iterator interface for row-by-row access

  • Bulk data access methods (fetchall, fetchmany)

  • Column name access and metadata

  • Affected row count for DML operations

  • Cursor-like positioning for result navigation

Attributes:

columns (List[str]): List of column names in the result set
rows (List[Tuple[Any, ...]]): List of tuples containing row data
affected_rows (int): Number of rows affected by DML operations

Usage Examples:

# SELECT query results
>>> result = client.execute("SELECT id, name, age FROM users WHERE age > ?", (25,))
>>> print(f"Found {len(result.rows)} users")
>>> for row in result.fetchall():
...     print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")

# Access by column name
>>> for row in result.rows:
...     user_id = row[result.columns.index('id')]
...     user_name = row[result.columns.index('name')]

# DML operation results
>>> result = client.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("John", 30))
>>> print(f"Inserted {result.affected_rows} rows")

# Iterator interface
>>> for row in result:
...     print(row)

Note: This class is automatically created by the Client’s execute() method and provides a consistent interface for all query results.
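
When column-name access is needed for many rows, pairing the documented columns and rows attributes once is usually tidier than calling columns.index() per field. The sketch below shows this with a small stand-in object. FakeResult and rows_as_dicts are hypothetical names used so the example runs without a database, and the helper relies only on the columns and rows attributes described above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class FakeResult:
    """Stand-in exposing only the documented columns/rows attributes."""
    columns: List[str]
    rows: List[Tuple[Any, ...]]


def rows_as_dicts(result) -> List[Dict[str, Any]]:
    """Pair each row tuple with the column names to get one dict per row."""
    return [dict(zip(result.columns, row)) for row in result.rows]


result = FakeResult(
    columns=["id", "name", "age"],
    rows=[(1, "John", 30), (2, "Jane", 25)],
)
records = rows_as_dicts(result)
print(records[0]["name"])  # John
print(records[1]["age"])   # 25
```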

__init__(columns: List[str], rows: List[Tuple[Any, ...]], affected_rows: int = 0)[source]
fetchall() List[Tuple[Any, ...]][source]

Fetch all remaining rows

fetchone() Tuple[Any, ...] | None[source]

Fetch the next row, or None if no rows remain

fetchmany(size: int = 1) List[Tuple[Any, ...]][source]

Fetch the next size rows (fewer if not enough rows remain)

scalar() Any[source]

Get scalar value (first column of first row)

keys()[source]

Get column names

Session Class

class matrixone.session.Session(bind=None, client=None, wrap_session=None, **kwargs)[source]

Bases: Session

MatrixOne Session - Transaction-aware session extending SQLAlchemy Session.

This class provides a comprehensive transaction context for executing multiple database operations atomically. It inherits all SQLAlchemy Session capabilities while adding MatrixOne-specific features like snapshots, clones, vector operations, and fulltext search.

Key Features:

  • Full SQLAlchemy API: All standard SQLAlchemy Session methods (add, delete, query, etc.)

  • Atomic transactions: All operations succeed or fail together

  • Automatic rollback: Errors trigger automatic transaction rollback

  • MatrixOne managers: Access to all MatrixOne-specific operations within transactions

  • Hybrid SQL support: Execute both SQLAlchemy statements and string SQL

  • Query logging: Integrated query logging with performance tracking

  • Context manager: Automatic transaction lifecycle management

Transaction Behavior:

  • Auto-commit on success: Transaction commits when context manager exits normally

  • Auto-rollback on error: Transaction rolls back if any exception occurs

  • Explicit control: Manual commit() and rollback() also supported

  • Isolation: All operations within session are isolated from other transactions

  • ACID compliance: Full ACID guarantees for all operations
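
The commit-on-success and rollback-on-error behavior listed above can be tried without a MatrixOne server. The sketch below uses Python's built-in sqlite3 module as a stand-in database and a hypothetical transaction() helper; it shows the general context-manager pattern, not how Session is implemented.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn: sqlite3.Connection):
    """Commit on normal exit, roll back on any exception (hypothetical helper)."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.commit()

# Normal exit: the insert is committed
with transaction(conn) as tx:
    tx.execute("INSERT INTO users (name) VALUES ('Alice')")

# Error inside the block: Bob is rolled back together with the failing statement
try:
    with transaction(conn) as tx:
        tx.execute("INSERT INTO users (name) VALUES ('Bob')")
        tx.execute("INSERT INTO invalid_table (data) VALUES ('test')")
except sqlite3.OperationalError as exc:
    print(f"Rolled back: {exc}")

names = [row[0] for row in conn.execute("SELECT name FROM users")]
```

A check like the final query is a cheap way to test rollback handling in application code: only the rows from transactions that exited normally should be present.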

Available Managers (all transaction-aware):

  • snapshots: SnapshotManager for creating/managing database snapshots

  • clone: CloneManager for cloning databases and tables

  • restore: RestoreManager for restoring from snapshots

  • pitr: PitrManager for point-in-time recovery operations

  • pubsub: PubSubManager for publish-subscribe operations

  • account: AccountManager for account and user management

  • vector_ops: VectorManager for vector operations and indexing

  • fulltext_index: FulltextIndexManager for fulltext search operations

  • metadata: MetadataManager for table metadata analysis

  • load_data: LoadDataManager for bulk data loading

  • stage: StageManager for external stage management

  • cdc: CDCManager for change data capture task operations

Creating Session:

There are two ways to create a MatrixOne Session:

  1. New Session (Recommended): Create directly from client:

    with client.session() as session:
        # Your operations here
        pass
    
  2. Wrap Existing Session (For Legacy Projects): Wrap existing SQLAlchemy session:

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from matrixone.session import Session as MatrixOneSession
    
    # Your existing SQLAlchemy code
    engine = create_engine('mysql+pymysql://...')
    SessionFactory = sessionmaker(bind=engine)
    sqlalchemy_session = SessionFactory()
    
    # Wrap with MatrixOne features
    mo_session = MatrixOneSession(
        client=mo_client,
        wrap_session=sqlalchemy_session
    )
    # Now you can use both SQLAlchemy and MatrixOne features
    

Usage Examples:

from matrixone import Client
from sqlalchemy import select, insert, update, delete

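client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# User, Order, Account and Analytics below are assumed to be SQLAlchemy ORM models
# defined elsewhere; SnapshotLevel is assumed to be imported from the MatrixOne SDK.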

# ============================================================
# Example 0: Wrapping Existing SQLAlchemy Session (Legacy Projects)
# ============================================================
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone.session import Session as MatrixOneSession

# Your existing SQLAlchemy setup
engine = create_engine('mysql+pymysql://root:111@127.0.0.1:6001/test')
SessionFactory = sessionmaker(bind=engine)
existing_session = SessionFactory()

# Wrap it with MatrixOne features
mo_session = MatrixOneSession(
    client=client,
    wrap_session=existing_session
)

try:
    # Standard SQLAlchemy operations work
    result = mo_session.execute("SELECT * FROM users")

    # MatrixOne features now available
    mo_session.stage.create_s3('backup', bucket='my-backup')
    mo_session.snapshots.create('snapshot1', level='database')
    mo_session.load_data.from_csv('/data/file.csv', 'users')

    mo_session.commit()
finally:
    mo_session.close()

# ============================================================
# Example 1: Basic Transaction with ORM-style Operations
# ============================================================
with client.session() as session:
    # All operations are atomic - succeed or fail together

    # Insert using SQLAlchemy insert()
    session.execute(insert(User).values(name='John', email='john@example.com'))
    session.execute(insert(Order).values(user_id=1, amount=100.0))

    # Update using SQLAlchemy update()
    session.execute(
        update(Account).where(Account.user_id == 1).values(balance=Account.balance - 100)
    )
    # Transaction commits automatically on successful completion

# ============================================================
# Example 2: SQLAlchemy ORM Operations
# ============================================================
with client.session() as session:
    # Create new objects
    user = User(name="Alice", email="alice@example.com")
    order = Order(user_id=1, amount=50.0)

    # Add to session
    session.add(user)
    session.add(order)

    # Query using ORM
    stmt = select(User).where(User.name == "Alice")
    result = session.execute(stmt)
    users = result.scalars().all()

    # Update using ORM
    user = session.get(User, 1)
    user.email = "newemail@example.com"

    # Commit explicitly (or let context manager do it)
    session.commit()

# ============================================================
# Example 3: MatrixOne Snapshot Operations in Transaction
# ============================================================
with client.session() as session:
    # Create snapshot within transaction
    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone database within same transaction
    session.clone.clone_database(
        target_db='production_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit atomically

# ============================================================
# Example 4: Bulk Data Loading in Transaction
# ============================================================
with client.session() as session:
    # Load data files atomically
    session.load_data.from_csv('/data/users.csv', 'users')
    session.load_data.from_csv('/data/orders.csv', 'orders')

    # Update statistics after loading
    session.execute("ANALYZE TABLE users")
    session.execute("ANALYZE TABLE orders")
    # All loads and updates commit together

# ============================================================
# Example 5: Error Handling with Automatic Rollback
# ============================================================
try:
    with client.session() as session:
        session.execute("INSERT INTO users (name) VALUES ('Bob')")
        session.execute("INSERT INTO invalid_table (data) VALUES ('test')")  # This fails
        # Transaction automatically rolls back - Bob is NOT inserted
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

# ============================================================
# Example 6: Manual Transaction Control
# ============================================================
with client.session() as session:
    try:
        session.execute("INSERT INTO users (name) VALUES ('Charlie')")
        session.execute("UPDATE accounts SET balance = balance - 50")

        # Verify conditions before committing
        result = session.execute("SELECT balance FROM accounts WHERE id = 1")
        balance = result.scalar()

        if balance >= 0:
            session.commit()  # Explicit commit
        else:
            session.rollback()  # Explicit rollback
            print("Transaction rolled back due to insufficient balance")
    except Exception:
        session.rollback()
        raise

# ============================================================
# Example 7: Complex Multi-Manager Transaction
# ============================================================
with client.session() as session:
    # Create publication
    pub = session.pubsub.create_database_publication(
        name='analytics_data',
        database='analytics',
        account='subscriber_account'
    )

    # Create stage using simple interface
    session.stage.create_local('export_stage', '/exports/')

    # Load fresh data
    session.load_data.from_csv('/data/latest.csv', Analytics)  # Use ORM model

    # Create snapshot after load
    session.snapshots.create(
        name='post_load_snapshot',
        level=SnapshotLevel.DATABASE,
        database='analytics'
    )
    # All operations are atomic

# ============================================================
# Example 8: Vector Operations in Transaction
# ============================================================
with client.session() as session:
    # Create vector table
    session.create_table('documents', {
        'id': 'int primary key',
        'content': 'text',
        'embedding': 'vecf32(384)'
    })

    # Create vector index
    session.vector_ops.create_ivf(
        'documents',
        name='doc_idx',
        column='embedding',
        lists=100
    )

    # Insert vector data
    session.insert('documents', {
        'id': 1,
        'content': 'sample document',
        'embedding': [0.1] * 384
    })

Best Practices:

  1. Always use context manager: Use with client.session() for automatic cleanup

  2. Keep transactions short: Long transactions can block other operations

  3. Handle exceptions: Wrap session code in try-except for proper error handling

  4. Avoid nested transactions: SQLAlchemy only emulates nesting through SAVEPOINTs (begin_nested()), which the database must support

  5. Use explicit commits: When you need fine-grained control over transaction boundaries

  6. Test rollback behavior: Ensure your application handles rollbacks correctly

Important Notes:

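  • Session is normally created by the Client.session() context manager

  • Prefer Client.session() over instantiating Session directly; direct construction is intended for wrapping an existing SQLAlchemy session (wrap_session)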

  • All manager operations within session are transaction-aware

  • Session automatically manages transaction lifecycle

  • Errors trigger automatic rollback

  • Normal exit triggers automatic commit

See also

  • Client.session(): Creates and manages Session instances

  • AsyncSession: Async version for async/await workflows

  • SQLAlchemy Session: Parent class documentation

__init__(bind=None, client=None, wrap_session=None, **kwargs)[source]

Initialize MatrixOne Session.

Parameters:
  • bind – SQLAlchemy Engine or Connection to bind to

  • client – MatrixOne Client instance

  • wrap_session – Existing SQLAlchemy Session to wrap with MatrixOne features

  • **kwargs – Additional arguments passed to SQLAlchemy Session

Examples

# Create new session from engine
session = Session(bind=engine, client=client)

# Wrap existing SQLAlchemy session (for legacy projects)
existing_session = sessionmaker(bind=engine)()
mo_session = Session(client=client, wrap_session=existing_session)

execute(sql_or_stmt, params: Tuple | None = None, **kwargs)[source]

Execute SQL or SQLAlchemy statement within the current session transaction.

This method extends SQLAlchemy’s Session.execute() with MatrixOne-specific features:

  • Parameter substitution for string SQL using ‘?’ placeholders

  • Integrated query logging with performance tracking

  • Support for both SQLAlchemy statements and raw SQL strings

All queries executed within the session participate in the current transaction. Changes are committed when the session context exits normally, or rolled back on error.

Parameters:
  • sql_or_stmt (str | SQLAlchemy statement) –

    The SQL query to execute. Can be:

    • String SQL with '?' placeholders for parameters

    • SQLAlchemy select() statement

    • SQLAlchemy insert() statement

    • SQLAlchemy update() statement

    • SQLAlchemy delete() statement

    • SQLAlchemy text() statement

  • params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Ignored for SQLAlchemy statements.

  • **kwargs

    Additional keyword arguments:

    • log_mode (str): Override SQL logging mode for this query only. Options: 'off', 'simple', 'full'. If not specified, uses the client's global sql_log_mode setting.

    • Other kwargs are passed to SQLAlchemy’s execute()
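
The kwargs handling described above (take log_mode for this call, fall back to the global setting, pass everything else through) can be sketched as follows. resolve_log_mode is a hypothetical helper written for illustration, the validation step is an assumption, and the timeout key is a made-up example of a pass-through argument.

```python
from typing import Any, Dict, Tuple

VALID_OVERRIDES = {"off", "simple", "full"}  # per-query options listed above


def resolve_log_mode(global_mode: str, kwargs: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Split log_mode from kwargs; fall back to the global mode (hypothetical helper)."""
    remaining = dict(kwargs)  # copy so the caller's dict is not mutated
    override = remaining.pop("log_mode", None)
    if override is None:
        return global_mode, remaining
    if override not in VALID_OVERRIDES:
        raise ValueError(f"unsupported log_mode: {override!r}")
    return override, remaining


# Per-call override wins; the other keyword argument is passed through untouched
mode, passthrough = resolve_log_mode("auto", {"log_mode": "off", "timeout": 5})

# No override: the global setting applies
default_mode, _ = resolve_log_mode("auto", {})
```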

Returns:

SQLAlchemy Result object with methods:
  • fetchall(): Get all rows as list of tuples

  • fetchone(): Get next row as tuple or None

  • fetchmany(size): Get next N rows

  • scalars(): Get first column of each row

  • mappings(): Get rows as dictionaries

  • rowcount: Number of rows affected (for INSERT/UPDATE/DELETE)

Return type:

sqlalchemy.engine.Result

Raises:
  • QueryError – If query execution fails or SQL syntax is invalid

  • ConnectionError – If session is not connected to database

Examples:

from matrixone import Client
from sqlalchemy import select, insert, update, delete, and_, or_

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# String SQL with Parameters
# ========================================
with client.session() as session:
    # Simple INSERT with parameters
    result = session.execute(
        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        ("John Doe", "john@example.com", 30)
    )
    print(f"Inserted {result.rowcount} rows")

    # SELECT with parameters
    result = session.execute(
        "SELECT * FROM users WHERE age > ? AND email LIKE ?",
        (25, "%@example.com")
    )
    for row in result:
        print(f"User: {row.name}, Age: {row.age}")

    # UPDATE with parameters
    result = session.execute(
        "UPDATE users SET status = ? WHERE age < ?",
        ("active", 18)
    )
    print(f"Updated {result.rowcount} rows")

    # DELETE with parameters
    result = session.execute(
        "DELETE FROM users WHERE status = ?",
        ("inactive",)
    )
    print(f"Deleted {result.rowcount} rows")

# ========================================
# SQLAlchemy SELECT Statements
# ========================================
with client.session() as session:
    # Basic SELECT
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()  # Returns list of User objects

    # SELECT specific columns
    stmt = select(User.name, User.email).where(User.status == 'active')
    result = session.execute(stmt)
    for name, email in result:
        print(f"{name}: {email}")

    # Complex WHERE with AND/OR
    stmt = select(User).where(
        and_(
            User.age > 18,
            or_(
                User.status == 'active',
                User.status == 'pending'
            )
        )
    )
    result = session.execute(stmt)

    # SELECT with JOINs
    stmt = select(User, Order).join(Order, User.id == Order.user_id)
    result = session.execute(stmt)
    for user, order in result:
        print(f"{user.name} ordered {order.amount}")

    # SELECT with aggregation
    from sqlalchemy import func
    stmt = select(func.count(User.id), func.avg(User.age))
    result = session.execute(stmt)
    count, avg_age = result.one()

# ========================================
# SQLAlchemy INSERT Statements
# ========================================
with client.session() as session:
    # Single INSERT
    stmt = insert(User).values(name='Alice', email='alice@example.com', age=28)
    result = session.execute(stmt)
    print(f"Inserted row with ID: {result.lastrowid}")

    # Bulk INSERT
    stmt = insert(User).values([
        {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
        {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
    ])
    result = session.execute(stmt)
    print(f"Inserted {result.rowcount} rows")

# ========================================
# SQLAlchemy UPDATE Statements
# ========================================
with client.session() as session:
    # Simple UPDATE
    stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
    result = session.execute(stmt)
    print(f"Updated {result.rowcount} rows")

    # Conditional UPDATE
    stmt = update(User).where(User.age < 18).values(status='minor')
    result = session.execute(stmt)

    # UPDATE with expressions
    stmt = update(Order).values(total=Order.quantity * Order.price)
    result = session.execute(stmt)

# ========================================
# SQLAlchemy DELETE Statements
# ========================================
with client.session() as session:
    # Simple DELETE
    stmt = delete(User).where(User.id == 1)
    result = session.execute(stmt)

    # Conditional DELETE
    stmt = delete(User).where(User.status == 'deleted')
    result = session.execute(stmt)
    print(f"Deleted {result.rowcount} rows")

# ========================================
# Result Processing
# ========================================
with client.session() as session:
    # fetchall() - get all rows
    result = session.execute("SELECT * FROM users")
    rows = result.fetchall()
    for row in rows:
        print(f"User: {row.name}")

    # fetchone() - get one row at a time
    result = session.execute("SELECT * FROM users")
    while row := result.fetchone():
        print(f"User: {row.name}")

    # fetchmany() - get N rows
    result = session.execute("SELECT * FROM users")
    rows = result.fetchmany(10)  # Get 10 rows

    # scalars() - get first column
    stmt = select(User.name)
    result = session.execute(stmt)
    names = result.scalars().all()  # List of names

    # mappings() - get rows as dicts
    result = session.execute("SELECT name, email FROM users")
    for row in result.mappings():
        print(f"{row['name']}: {row['email']}")

    # scalar() - get single value
    stmt = select(func.count(User.id))
    result = session.execute(stmt)
    count = result.scalar()  # Single integer value

# ========================================
# Query Logging Control
# ========================================
with client.session() as session:
    # Disable logging for this query only
    result = session.execute(
        "SELECT * FROM large_table",
        log_mode='off'
    )

    # Force full SQL logging for debugging
    result = session.execute(
        "SELECT * FROM users WHERE complex_condition",
        log_mode='full'
    )

    # Simple logging (show operation type only)
    result = session.execute(
        "UPDATE massive_table SET field = 'value'",
        log_mode='simple'
    )

Best Practices:
  • Use parameters (?-placeholders) to prevent SQL injection

  • Use SQLAlchemy statements for complex queries

  • Use string SQL for simple, dynamic queries

  • Always consume or close result sets

  • Use log_mode='off' for frequently executed queries in production
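
To make the first practice concrete, the sketch below contrasts string formatting with a '?' placeholder. It runs against Python's built-in sqlite3 module, which accepts the same placeholder style, purely as a stand-in; how MatrixOne performs the substitution internally may differ, and the table and data are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO users (name, status) VALUES (?, ?)",
    [("John", "active"), ("Eve", "inactive")],
)

# Hostile input that changes the query if it is pasted into the SQL string
user_input = "nobody' OR '1'='1"

# Unsafe: string formatting lets the input rewrite the WHERE clause
unsafe_sql = f"SELECT name FROM users WHERE name = '{user_input}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the '?' placeholder binds the input as a plain value
safe_rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (user_input,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))
```

The formatted query matches every row because the injected OR clause is always true, while the parameterized query matches none because no user has that literal name.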

See also

  • AsyncSession.execute(): Async version

  • Client.execute(): Client-level execute without transaction

  • SQLAlchemy Result: Result object documentation

insert(table_name: str, data: dict[str, Any]) → ResultSet[source]

Insert data into a table within session.

Args:

table_name: Name of the table
data: Data to insert (dict with column names as keys)

Returns:

ResultSet object

batch_insert(table_name: str, data_list: list[dict[str, Any]]) → ResultSet[source]

Batch insert data into a table within session.

Args:

table_name: Name of the table
data_list: List of data dictionaries to insert

Returns:

ResultSet object
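
One plausible way a column-to-value dictionary maps onto SQL is a parameterized INSERT, sketched below. build_insert and build_batch_insert are hypothetical helpers, not MatrixOne functions, and the assumption that every dictionary in a batch shares the first row's keys is mine. sqlite3 serves only as a stand-in database to show the statements run.

```python
import sqlite3
from typing import Any, Dict, List, Tuple


def build_insert(table_name: str, data: Dict[str, Any]) -> Tuple[str, Tuple[Any, ...]]:
    """Turn a column->value dict into a parameterized INSERT (hypothetical helper)."""
    columns = list(data)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, tuple(data[c] for c in columns)


def build_batch_insert(
    table_name: str, data_list: List[Dict[str, Any]]
) -> Tuple[str, List[Tuple[Any, ...]]]:
    """Reuse the first row's columns for every row in the batch (assumed behavior)."""
    if not data_list:
        raise ValueError("data_list must not be empty")
    columns = list(data_list[0])
    sql, _ = build_insert(table_name, data_list[0])
    return sql, [tuple(row[c] for c in columns) for row in data_list]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

sql, params = build_insert("users", {"id": 1, "name": "John"})
conn.execute(sql, params)

batch_sql, batch_params = build_batch_insert(
    "users", [{"id": 2, "name": "Alice"}, {"id": 3, "name": "Bob"}]
)
cursor = conn.executemany(batch_sql, batch_params)
inserted = cursor.rowcount
total = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

Values travel as parameters, but table and column names are interpolated into the string, so a helper like this should only receive identifiers from trusted code.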

query(*columns, snapshot: str | None = None)[source]

Get MatrixOne query builder within session - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Returns:

MatrixOneQuery instance configured for the specified columns within session

create_table(table_name: str, columns: dict, **kwargs) → Session[source]

Create a table within MatrixOne session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
**kwargs: Additional table parameters

Returns:

Session: Self for chaining

drop_table(table_name: str) → Session[source]

Drop a table within MatrixOne session.

Args:

table_name: Name of the table to drop

Returns:

Session: Self for chaining
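
Both methods return the session itself, which allows calls to be chained. The sketch below shows the pattern with a hypothetical SketchSession that only records the DDL it would issue; the way the columns dictionary is turned into a CREATE TABLE statement is an assumption for illustration, not the library's code.

```python
from typing import Dict, List


class SketchSession:
    """Hypothetical stand-in that records DDL instead of sending it to a database."""

    def __init__(self) -> None:
        self.statements: List[str] = []

    def create_table(self, table_name: str, columns: Dict[str, str]) -> "SketchSession":
        column_sql = ", ".join(f"{name} {col_type}" for name, col_type in columns.items())
        self.statements.append(f"CREATE TABLE {table_name} ({column_sql})")
        return self  # returning self is what enables chaining

    def drop_table(self, table_name: str) -> "SketchSession":
        self.statements.append(f"DROP TABLE {table_name}")
        return self


session = SketchSession()
session.create_table(
    "tmp_docs", {"id": "int primary key", "content": "text"}
).drop_table("tmp_docs")

for statement in session.statements:
    print(statement)
```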

create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) → Session[source]

Create a table with vector indexes within MatrixOne session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
indexes: List of index definitions (same format as client.create_table_with_index)
**kwargs: Additional table parameters

Returns:

Session: Self for chaining

create_table_orm(table_name: str, *columns, **kwargs) → Session[source]

Create a table using SQLAlchemy ORM-style definitions within MatrixOne session.

Args:

table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)

Returns:

Session: Self for chaining