AsyncClient

MatrixOne Async Client - Asynchronous implementation

class matrixone.async_client.AsyncResultSet(columns: List[str], rows: List[Tuple], affected_rows: int = 0)

Bases: object

Async result set wrapper for query results

__init__(columns: List[str], rows: List[Tuple], affected_rows: int = 0)

fetchall() -> List[Tuple]

Fetch all remaining rows

fetchone() -> Tuple | None

Fetch one row

fetchmany(size: int = 1) -> List[Tuple]

Fetch many rows

scalar() -> Any

Get scalar value (first column of first row)

keys()

Get column names
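
The fetch methods read like DB-API cursor methods. The sketch below is a hypothetical stand-in, not the library's implementation: it assumes an internal cursor that advances on every fetch (as "Fetch all remaining rows" suggests) and that scalar() returns None for an empty result. ResultSetSketch and its _cursor attribute are illustrative names.

```python
from typing import Any, List, Optional, Tuple


class ResultSetSketch:
    """Hypothetical stand-in with the same method names as AsyncResultSet."""

    def __init__(self, columns: List[str], rows: List[Tuple], affected_rows: int = 0):
        self.columns = columns
        self.rows = rows
        self.affected_rows = affected_rows
        self._cursor = 0  # assumption: fetches consume rows in order

    def fetchone(self) -> Optional[Tuple]:
        if self._cursor >= len(self.rows):
            return None
        row = self.rows[self._cursor]
        self._cursor += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple]:
        chunk = self.rows[self._cursor:self._cursor + size]
        self._cursor += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple]:
        remaining = self.rows[self._cursor:]
        self._cursor = len(self.rows)
        return remaining

    def scalar(self) -> Any:
        # First column of the first row; None for an empty result (assumption).
        return self.rows[0][0] if self.rows else None

    def keys(self) -> List[str]:
        return list(self.columns)


rs = ResultSetSketch(["id", "name"], [(1, "Ann"), (2, "Bob"), (3, "Cy")])
first = rs.fetchone()        # consumes the first row
next_two = rs.fetchmany(2)   # consumes the next two rows
rest = rs.fetchall()         # nothing is left at this point
```

Under these assumptions, mixing fetchone(), fetchmany(), and fetchall() on one result never returns the same row twice.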

class matrixone.async_client.AsyncSnapshotManager(client)

Bases: object

Async snapshot manager

__init__(client)

async create(name: str, level: str | SnapshotLevel, database: str | None = None, table: str | None = None, description: str | None = None) -> Snapshot

Create snapshot asynchronously

async get(name: str) -> Snapshot

Get snapshot asynchronously

async list() -> List[Snapshot]

List all snapshots asynchronously

async delete(name: str) -> None

Delete snapshot asynchronously

async exists(name: str) -> bool

Check if snapshot exists asynchronously
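
Combining exists(), get(), and create() gives a "create if missing" helper. The sketch below runs against an in-memory stand-in so it needs no database. InMemorySnapshots, ensure_snapshot, and the level string "database" are hypothetical, and the stand-in stores plain dicts rather than Snapshot objects.

```python
import asyncio
from typing import Dict, Optional


class InMemorySnapshots:
    """Hypothetical stand-in exposing exists()/create()/get() like the manager."""

    def __init__(self) -> None:
        self._store: Dict[str, dict] = {}

    async def exists(self, name: str) -> bool:
        return name in self._store

    async def create(self, name: str, level: str, database: Optional[str] = None) -> dict:
        if name in self._store:
            raise ValueError(f"snapshot {name!r} already exists")
        self._store[name] = {"name": name, "level": level, "database": database}
        return self._store[name]

    async def get(self, name: str) -> dict:
        return self._store[name]


async def ensure_snapshot(manager, name: str, level: str, database: Optional[str] = None):
    """Return the existing snapshot, or create it when it is missing."""
    if await manager.exists(name):
        return await manager.get(name)
    return await manager.create(name, level, database=database)


async def demo() -> list:
    manager = InMemorySnapshots()
    a = await ensure_snapshot(manager, "daily_backup", "database", database="production")
    b = await ensure_snapshot(manager, "daily_backup", "database", database="production")
    return [a, b]


results = asyncio.run(demo())
```

The check and the create are two separate calls, so two concurrent callers can both see "missing". Against a real server the caller would still need to handle a failed create().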

class matrixone.async_client.AsyncCloneManager(client)

Bases: object

Async clone manager

__init__(client)

async clone_database(target_db: str, source_db: str, snapshot_name: str | None = None, if_not_exists: bool = False) -> None

Clone database asynchronously

async clone_table(target_table: str, source_table: str, snapshot_name: str | None = None, if_not_exists: bool = False) -> None

Clone table asynchronously

async clone_database_with_snapshot(target_db: str, source_db: str, snapshot_name: str, if_not_exists: bool = False) -> None

Clone database with snapshot asynchronously

async clone_table_with_snapshot(target_table: str, source_table: str, snapshot_name: str, if_not_exists: bool = False) -> None

Clone table with snapshot asynchronously

class matrixone.async_client.AsyncRestoreManager(client)

Bases: object

Async manager for restore operations

__init__(client)

async restore_cluster(snapshot_name: str) -> bool

Restore entire cluster from snapshot asynchronously

async restore_tenant(snapshot_name: str, account_name: str, to_account: str | None = None) -> bool

Restore tenant from snapshot asynchronously

async restore_database(snapshot_name: str, account_name: str, database_name: str, to_account: str | None = None) -> bool

Restore database from snapshot asynchronously

async restore_table(snapshot_name: str, account_name: str, database_name: str, table_name: str, to_account: str | None = None) -> bool

Restore table from snapshot asynchronously

class matrixone.async_client.AsyncMoCtlManager(client)

Bases: object

Async mo_ctl manager

__init__(client)

async flush_table(database: str, table: str) -> Dict[str, Any]

Force flush table asynchronously

async increment_checkpoint() -> Dict[str, Any]

Force incremental checkpoint asynchronously

async global_checkpoint() -> Dict[str, Any]

Force global checkpoint asynchronously

class matrixone.async_client.AsyncClientExecutor(client)

Bases: BaseMatrixOneExecutor

Async client executor that uses AsyncClient’s execute method

__init__(client)

async insert(table_name: str, data: dict)

Async insert method

async batch_insert(table_name: str, data_list: list)

Async batch insert method

class matrixone.async_client.AsyncClient(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)

Bases: BaseMatrixOneClient

MatrixOne Async Client - Asynchronous interface for MatrixOne database operations.

This class provides an asynchronous interface for connecting to and interacting with MatrixOne databases. Every operation follows the async/await pattern, including table creation, data insertion, querying, vector operations, and transaction management.

Key Features:

  • Asynchronous connection management with connection pooling

  • High-level table operations (create_table, drop_table, insert, batch_insert)

  • Query builder interface for complex async queries

  • Vector operations (similarity search, range search, indexing)

  • Async transaction management with context managers

  • Snapshot and restore operations

  • Account and user management

  • Fulltext search capabilities

  • Non-blocking I/O operations

Supported Operations:

  • Async connection and disconnection

  • Async query execution (SELECT, INSERT, UPDATE, DELETE)

  • Async batch operations

  • Async transaction management

  • Async table creation and management

  • Async vector and fulltext operations

  • Async snapshot and restore operations

Usage Examples:

Basic async usage:

    async def main():
        client = AsyncClient()
        # connect() accepts keyword arguments only
        await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

        # Create table using high-level API
        await client.create_table("users", {
            "id": "int primary key",
            "name": "varchar(100)",
            "email": "varchar(255)"
        })

        # Insert data
        await client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"})

        # Query data
        result = await client.query("users").where("id = ?", 1).all()
        print(result.rows)

        await client.disconnect()

Vector operations:

    async def vector_example():
        client = AsyncClient()
        # connect() accepts keyword arguments only
        await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

        # Create vector table
        await client.create_table("documents", {
            "id": "int primary key",
            "content": "text",
            "embedding": "vecf32(384)"
        })

        # Vector similarity search
        results = await client.vector_ops.similarity_search(
            "documents",
            vector_column="embedding",
            query_vector=[0.1, 0.2, 0.3, ...],  # 384-dimensional vector
            limit=10,
            distance_type="l2"
        )

        await client.disconnect()

Async transaction usage:

    async def transaction_example():
        client = AsyncClient()
        # connect() accepts keyword arguments only
        await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

        async with client.transaction() as tx:
            # Both statements must run inside the transaction block
            await tx.execute("INSERT INTO users (name) VALUES (?)", ("John",))
            await tx.execute("INSERT INTO orders (user_id, amount) VALUES (?, ?)", (1, 100.0))
        # Transaction commits automatically on success

Note: This class requires asyncio and async database drivers. Use the synchronous Client class for blocking operations or when async support is not needed.

__init__(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)

Initialize MatrixOne async client

Args:

connection_timeout: Connection timeout in seconds
query_timeout: Query timeout in seconds
auto_commit: Enable auto-commit mode
charset: Character set for connection
logger: Custom logger instance. If None, creates a default logger
sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full')
    - 'off': No SQL logging
    - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default)
    - 'simple': Show operation summary only
    - 'full': Show complete SQL regardless of length
slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0)
max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
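
The four sql_log_mode values can be pictured as a small formatting function. This is a sketch of the documented behavior only, not the library's logging code. The function name format_sql_for_log and the summary format ("OPERATION ... (N chars)") are assumptions.

```python
def format_sql_for_log(sql: str, mode: str = "auto", max_len: int = 500) -> str:
    """Illustrative only: approximate the four documented sql_log_mode values."""
    if mode == "off":
        return ""                      # nothing is logged
    if mode == "full":
        return sql                     # complete SQL regardless of length
    words = sql.split()
    operation = words[0].upper() if words else ""
    summary = f"{operation} ... ({len(sql)} chars)"  # hypothetical summary format
    if mode == "simple":
        return summary                 # operation summary only
    if mode == "auto":
        # Short SQL is shown in full; long SQL is summarized.
        return sql if len(sql) <= max_len else summary
    raise ValueError(f"unknown sql_log_mode: {mode!r}")


short_sql = "SELECT 1"
long_sql = "INSERT INTO t VALUES " + ", ".join(["(1)"] * 200)
auto_short = format_sql_for_log(short_sql)
auto_long = format_sql_for_log(long_sql, max_len=50)
```

In this sketch max_len plays the role of max_sql_display_length and only matters in 'auto' mode.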

async connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None)

Connect to MatrixOne database asynchronously

Args:

host: Database host
port: Database port
user: Username or login info in format "user", "account#user", "account#user#role",
      "account:user", or "account:user:role" (both '#' and ':' separators are supported)
password: Password
database: Database name
account: Optional account name (will be combined with user if user doesn't contain '#' or ':')
role: Optional role name (will be combined with user if user doesn't contain '#' or ':')
charset: Character set for the connection (default: utf8mb4)
connection_timeout: Connection timeout in seconds (default: 30)
auto_commit: Enable autocommit (default: True)
on_connect: Connection hook to execute after successful connection.
           Can be:
           - ConnectionHook instance
           - List of ConnectionAction or string action names
           - Custom callback function (async or sync)
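
The accepted user formats can be made concrete with a small parser. The sketch below only follows the rules stated in the argument list and is not the library's parsing code. parse_login is a hypothetical name, and returning a dict of parts (rather than a combined login string) is an assumption.

```python
from typing import Dict, Optional


def parse_login(user: str, account: Optional[str] = None,
                role: Optional[str] = None) -> Dict[str, Optional[str]]:
    """Split a login string into account, user, and role parts."""
    for sep in ("#", ":"):
        if sep in user:
            parts = user.split(sep)
            if len(parts) == 2:      # "account#user" or "account:user"
                return {"account": parts[0], "user": parts[1], "role": None}
            if len(parts) == 3:      # "account#user#role" or "account:user:role"
                return {"account": parts[0], "user": parts[1], "role": parts[2]}
            raise ValueError(f"unexpected login format: {user!r}")
    # Plain user name: the separate account/role arguments apply.
    return {"account": account, "user": user, "role": role}


plain = parse_login("root")
with_account = parse_login("acme#alice")
with_role = parse_login("acme:alice:admin")
combined = parse_login("alice", account="acme", role="admin")
```

As in the argument descriptions, account and role are used only when user contains neither separator.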

Examples:

# Enable all features after connection (connect() accepts keyword arguments only)
await client.connect(host=host, port=port, user=user, password=password, database=database,
                   on_connect=[ConnectionAction.ENABLE_ALL])

# Enable only vector operations with custom charset
await client.connect(host=host, port=port, user=user, password=password, database=database,
                   charset="utf8mb4",
                   on_connect=[ConnectionAction.ENABLE_VECTOR])

# Custom async callback
async def my_callback(client):
    print(f"Connected to {client._connection_params['host']}")

await client.connect(host=host, port=port, user=user, password=password, database=database,
                   on_connect=my_callback)

classmethod from_engine(engine: AsyncEngine, **kwargs) -> AsyncClient

Create AsyncClient instance from existing SQLAlchemy AsyncEngine

Args:

engine: SQLAlchemy AsyncEngine instance (must use MySQL driver)
**kwargs: Additional client configuration options

Returns:

AsyncClient: Configured async client instance

Raises:

ConnectionError: If engine doesn't use MySQL driver

Examples

Basic usage:

    from sqlalchemy.ext.asyncio import create_async_engine
    from matrixone import AsyncClient

    engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db")
    client = AsyncClient.from_engine(engine)

With custom configuration:

    engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db")
    client = AsyncClient.from_engine(
        engine,
        sql_log_mode='auto',
        slow_query_threshold=0.5
    )
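
Because from_engine() raises ConnectionError for a non-MySQL engine, it can help to check the connection URL before building the engine. How the library performs its own check is not described here. The helper below is a hypothetical pre-flight check that uses only the standard library, and the host, port, and credentials in the example URLs are placeholders.

```python
from urllib.parse import urlsplit


def uses_mysql_dialect(url: str) -> bool:
    """Return True when the URL's dialect part (before '+') is 'mysql'."""
    scheme = urlsplit(url).scheme      # e.g. "mysql+aiomysql"
    dialect = scheme.split("+", 1)[0]  # drop the driver suffix, if any
    return dialect == "mysql"


ok = uses_mysql_dialect("mysql+aiomysql://user:pass@localhost:6001/db")
not_ok = uses_mysql_dialect("postgresql+asyncpg://user:pass@localhost:5432/db")
```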

async disconnect()

Disconnect from MatrixOne database asynchronously

disconnect_sync()

Synchronous disconnect for cleanup when event loop is closed

__del__()

Cleanup when object is garbage collected

get_sqlalchemy_engine() -> AsyncEngine

Get SQLAlchemy async engine

Returns:

SQLAlchemy AsyncEngine

async create_all(base_class=None)

Create all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.

async drop_all(base_class=None)

Drop all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.

async execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) -> AsyncResultSet

Execute SQL query or SQLAlchemy statement asynchronously without transaction isolation.

This method executes queries asynchronously using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use async with client.session() instead.

The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement async operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.

Key Features:

  • Async/await support: Non-blocking execution using async/await patterns

  • ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()

  • Parameter binding: Automatic escaping of parameters to prevent SQL injection

  • Query logging: Integrated async logging with performance tracking

  • Auto-commit: Each statement commits immediately (no transaction isolation)

  • Connection pooling: Efficient async connection reuse from pool

Parameters:
  • sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be:
      - SQLAlchemy select() statement (recommended)
      - SQLAlchemy insert() statement (recommended)
      - SQLAlchemy update() statement (recommended)
      - SQLAlchemy delete() statement (recommended)
      - String SQL with ‘?’ placeholders for parameters
      - SQLAlchemy text() statement

  • params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements.

  • log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging or disabling logs for frequently-executed queries.

Returns:

Async query result object with:
  • columns: List[str] - Column names

  • rows: List[Tuple] - Row data as tuples

  • affected_rows: int - Number of rows affected by DML operations

  • fetchall() -> List[Tuple] - Get all remaining rows as a list

  • fetchone() -> Optional[Tuple] - Get next row or None

  • fetchmany(size) -> List[Tuple] - Get next N rows

Return type:

AsyncResultSet

Raises:
  • ConnectionError – If not connected to database

  • QueryError – If query execution fails or SQL syntax is invalid

Usage Examples:

from matrixone import AsyncClient
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_, func
from sqlalchemy.orm import declarative_base
import asyncio

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)
    # Columns referenced by the "UPDATE with expressions" example below
    quantity = Column(Integer)
    price = Column(Float)
    total = Column(Float)

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # SQLAlchemy SELECT Statements (Recommended)
    # ========================================

    # Basic SELECT with WHERE clause
    stmt = select(User).where(User.age > 25)
    result = await client.execute(stmt)
    for user in result.fetchall():
        print(f"User: {user.name}, Age: {user.age}")

    # SELECT specific columns
    stmt = select(User.name, User.email).where(User.status == 'active')
    result = await client.execute(stmt)
    for name, email in result.fetchall():
        print(f"{name}: {email}")

    # Complex WHERE with AND/OR
    stmt = select(User).where(
        and_(
            User.age > 18,
            or_(
                User.status == 'active',
                User.status == 'pending'
            )
        )
    )
    result = await client.execute(stmt)

    # SELECT with JOIN
    stmt = select(User, Order).join(Order, User.id == Order.user_id)
    result = await client.execute(stmt)
    for user, order in result.fetchall():
        print(f"{user.name} ordered ${order.amount}")

    # SELECT with aggregation
    stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active')
    result = await client.execute(stmt)
    count, avg_age = result.fetchone()
    print(f"Active users: {count}, Average age: {avg_age}")

    # ========================================
    # SQLAlchemy INSERT Statements (Recommended)
    # ========================================

    # Single INSERT
    stmt = insert(User).values(name='John', email='john@example.com', age=30)
    result = await client.execute(stmt)
    print(f"Inserted {result.affected_rows} rows")

    # Bulk INSERT
    stmt = insert(User).values([
        {'name': 'Alice', 'email': 'alice@example.com', 'age': 28},
        {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
        {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
    ])
    result = await client.execute(stmt)
    print(f"Inserted {result.affected_rows} rows")

    # ========================================
    # SQLAlchemy UPDATE Statements (Recommended)
    # ========================================

    # Simple UPDATE
    stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
    result = await client.execute(stmt)
    print(f"Updated {result.affected_rows} rows")

    # Conditional UPDATE
    stmt = update(User).where(User.age < 18).values(status='minor')
    result = await client.execute(stmt)

    # UPDATE with expressions
    stmt = update(Order).values(total=Order.quantity * Order.price)
    result = await client.execute(stmt)

    # ========================================
    # SQLAlchemy DELETE Statements (Recommended)
    # ========================================

    # Simple DELETE
    stmt = delete(User).where(User.id == 1)
    result = await client.execute(stmt)
    print(f"Deleted {result.affected_rows} rows")

    # Conditional DELETE
    stmt = delete(User).where(User.status == 'deleted')
    result = await client.execute(stmt)

    # DELETE with complex condition
    stmt = delete(User).where(
        and_(
            User.age < 18,
            User.status == 'inactive'
        )
    )
    result = await client.execute(stmt)

    # ========================================
    # Concurrent Execution with asyncio.gather
    # ========================================

    # Execute multiple independent queries concurrently
    user_stmt = select(User).where(User.age > 25)
    order_stmt = select(Order).where(Order.amount > 100)

    user_result, order_result = await asyncio.gather(
        client.execute(user_stmt),
        client.execute(order_stmt)
    )

    print(f"Users: {len(user_result.fetchall())}")
    print(f"Orders: {len(order_result.fetchall())}")

    # ========================================
    # String SQL with Parameters (Alternative)
    # ========================================

    # SELECT with parameters
    result = await client.execute(
        "SELECT * FROM users WHERE age > ? AND status = ?",
        (25, 'active')
    )

    # INSERT with parameters
    result = await client.execute(
        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        ('David', 'david@example.com', 28)
    )

    # UPDATE with parameters
    result = await client.execute(
        "UPDATE users SET status = ? WHERE age < ?",
        ('minor', 18)
    )

    # ========================================
    # Query Logging Control
    # ========================================

    # Disable logging for frequently executed query
    result = await client.execute(
        select(User).where(User.id == 1),
        log_mode='off'
    )

    # Force full SQL logging for debugging
    result = await client.execute(
        select(User).where(User.name.like('%test%')),
        log_mode='full'
    )

    await client.disconnect()

asyncio.run(main())

Important Notes:

  • No transaction isolation: Each execute() call commits immediately

  • Use session() for transactions: For atomic multi-statement operations

  • ORM-style preferred: Use SQLAlchemy statements for better type safety

  • Auto-commit behavior: Changes are permanent immediately after execute()

  • Non-blocking: Uses async/await and doesn’t block event loop

  • Concurrent execution: Use asyncio.gather() for parallel queries

Best Practices:

  1. Prefer ORM-style statements: Use select(), insert(), update(), delete()

  2. Use parameters: Always use parameter binding to prevent SQL injection

  3. Session for transactions: Use client.session() for atomic operations

  4. Use asyncio.gather(): For concurrent independent queries

  5. Disable logging in production: Use log_mode='off' for hot paths

  6. Handle exceptions: Wrap execute() in try-except for error handling
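
Practices 4 and 6 can be combined by running independent statements with asyncio.gather() and inspecting each outcome. The sketch below uses a hypothetical StubClient in place of a connected AsyncClient and raises a plain RuntimeError where the real client would raise QueryError. run_independent is an illustrative helper name.

```python
import asyncio
from typing import Any, List


class StubClient:
    """Hypothetical stand-in for a connected client; only execute() is modeled."""

    async def execute(self, sql: str) -> List[tuple]:
        await asyncio.sleep(0.01)  # simulate network latency
        if "missing_table" in sql:
            raise RuntimeError("table does not exist")
        return [(sql,)]


async def run_independent(client: Any, statements: List[str]) -> List[Any]:
    """Run statements concurrently; failures are returned instead of raised."""
    return await asyncio.gather(
        *(client.execute(sql) for sql in statements),
        return_exceptions=True,
    )


async def demo() -> List[Any]:
    return await run_independent(
        StubClient(),
        ["SELECT 1", "SELECT * FROM missing_table", "SELECT 2"],
    )


outcomes = asyncio.run(demo())
failed = [o for o in outcomes if isinstance(o, Exception)]
```

With return_exceptions=True one failing statement does not discard the results of the others. Since each execute() call commits on its own, this pattern only suits statements that do not need to succeed or fail together.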

See also

  • AsyncClient.session(): For transaction-aware async operations

  • AsyncSession.execute(): Execute within async transaction context

  • Client.execute(): Synchronous version

get_login_info() -> dict | None

Get parsed login information

query(*columns, snapshot: str | None = None)

Get async MatrixOne query builder - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Examples

# Traditional model query (all columns)
await client.query(Article).filter(...).all()

# Column-specific query
await client.query(Article.id, Article.title).filter(...).all()

# With fulltext score
await client.query(Article.id, boolean_match("title", "content").must("python").label("score"))

# Snapshot query
await client.query(Article, snapshot="my_snapshot").filter(...).all()

Returns:

AsyncMatrixOneQuery instance configured for the specified columns

snapshot(snapshot_name: str)

Snapshot context manager

Usage

async with client.snapshot("daily_backup") as snapshot_client:
    result = await snapshot_client.execute("SELECT * FROM users")

async insert(table_name_or_model, data: dict) -> AsyncResultSet

Insert data into a table asynchronously.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data: Data to insert (dict with column names as keys)

Returns:

AsyncResultSet object

async batch_insert(table_name_or_model, data_list: list) -> AsyncResultSet

Batch insert data into a table asynchronously.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list: List of data dictionaries to insert

Returns:

AsyncResultSet object
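
For large inputs, one option is to split data_list into fixed-size chunks and call batch_insert() once per chunk. The sketch below uses a hypothetical RecordingClient so it runs without a database. The chunk size of 1000 is an arbitrary choice, not a documented limit, and chunked and insert_in_chunks are illustrative helpers.

```python
import asyncio
from typing import Any, Dict, Iterator, List


def chunked(items: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


class RecordingClient:
    """Hypothetical stand-in that records the size of each batch_insert() call."""

    def __init__(self) -> None:
        self.calls: List[int] = []

    async def batch_insert(self, table_name: str, data_list: list) -> None:
        self.calls.append(len(data_list))


async def insert_in_chunks(client: Any, table: str,
                           rows: List[Dict[str, Any]], size: int = 1000) -> int:
    """Insert rows chunk by chunk and return how many rows were sent."""
    total = 0
    for chunk in chunked(rows, size):
        await client.batch_insert(table, chunk)
        total += len(chunk)
    return total


client = RecordingClient()
rows = [{"id": i, "name": f"user{i}"} for i in range(2500)]
inserted = asyncio.run(insert_in_chunks(client, "users", rows, size=1000))
```

Each chunk is a separate call, so if one chunk fails the earlier ones have presumably already been written. When all rows must land together, running the inserts inside a session is the safer choice.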

session()

Create an async transaction-aware session for atomic database operations.

This method returns an AsyncSession that extends SQLAlchemy AsyncSession with MatrixOne-specific features. All operations within the session are executed atomically using async/await patterns - they either all succeed or all fail together.

The session is an async context manager that automatically handles the transaction lifecycle:

  • Commits the transaction when the context exits normally

  • Rolls back the transaction if any exception occurs

  • Cleans up database resources automatically

  • Enables non-blocking concurrent operations
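
The commit-on-success, rollback-on-exception lifecycle can be illustrated with contextlib.asynccontextmanager. This is a sketch of the pattern, not the library's session implementation. FakeTransaction and session_sketch are hypothetical, and the "database" is just a Python list.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeTransaction:
    """Hypothetical stand-in that buffers statements until commit."""

    def __init__(self, committed: List[str]) -> None:
        self._committed = committed
        self._pending: List[str] = []

    async def execute(self, statement: str) -> None:
        self._pending.append(statement)

    async def commit(self) -> None:
        self._committed.extend(self._pending)
        self._pending.clear()

    async def rollback(self) -> None:
        self._pending.clear()


@asynccontextmanager
async def session_sketch(committed: List[str]) -> AsyncIterator[FakeTransaction]:
    tx = FakeTransaction(committed)
    try:
        yield tx
    except BaseException:
        await tx.rollback()   # any exception discards the pending work
        raise
    else:
        await tx.commit()     # normal exit persists the pending work


async def demo() -> List[str]:
    committed: List[str] = []
    async with session_sketch(committed) as tx:
        await tx.execute("INSERT 1")
    try:
        async with session_sketch(committed) as tx:
            await tx.execute("INSERT 2")
            raise ValueError("boom")
    except ValueError:
        pass
    return committed


committed_statements = asyncio.run(demo())
```

Only the first block's statement survives. The second block's work is dropped and the exception still reaches the caller.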

Key Features:

  • Full async SQLAlchemy ORM: All standard async Session methods with await

  • Atomic transactions: Multiple async operations commit or rollback together

  • Async MatrixOne managers: All MatrixOne operations available asynchronously

  • Concurrent execution: Use asyncio.gather() for parallel operations

  • Non-blocking: All operations use async/await and don’t block event loop

  • ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()

Available Async Managers (transaction-aware):

  • session.snapshots: AsyncSnapshotManager for async snapshot operations

  • session.clone: AsyncCloneManager for async clone operations

  • session.restore: AsyncRestoreManager for async restore operations

  • session.pitr: AsyncPitrManager for async point-in-time recovery

  • session.pubsub: AsyncPubSubManager for async publish-subscribe

  • session.account: AsyncAccountManager for async account management

  • session.vector_ops: AsyncVectorManager for async vector operations

  • session.fulltext_index: AsyncFulltextIndexManager for async fulltext search

  • session.metadata: AsyncMetadataManager for async metadata analysis

  • session.load_data: AsyncLoadDataManager for async bulk loading

  • session.stage: AsyncStageManager for async stage management

Returns:

Async context manager yielding AsyncSession

Return type:

AsyncContextManager[AsyncSession]

Raises:

ConnectionError – If client is not connected to database

Usage Examples:

from matrixone import AsyncClient
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete
from sqlalchemy.orm import declarative_base
import asyncio

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))  # referenced by the update/delete examples below

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Example 1: Basic Async Transaction with ORM-style SQL
    # ========================================
    async with client.session() as session:
        # Insert using SQLAlchemy insert()
        await session.execute(insert(User).values(name='John', email='john@example.com', age=30))

        # Update using SQLAlchemy update()
        await session.execute(update(User).where(User.age < 18).values(status='minor'))

        # Select using SQLAlchemy select()
        stmt = select(User).where(User.age > 25)
        result = await session.execute(stmt)
        for user in result.scalars():
            print(f"User: {user.name}")

        # Delete using SQLAlchemy delete()
        await session.execute(delete(User).where(User.status == 'inactive'))
        # All operations commit atomically

    # ========================================
    # Example 2: Async ORM Operations
    # ========================================
    async with client.session() as session:
        # Create new objects
        user1 = User(name='Alice', email='alice@example.com', age=28)
        user2 = User(name='Bob', email='bob@example.com', age=35)

        # Add to session
        session.add(user1)
        session.add(user2)

        # Query using ORM
        stmt = select(User).where(User.name == 'Alice')
        result = await session.execute(stmt)
        alice = result.scalar_one()

        # Update object
        alice.email = 'newemail@example.com'

        # Commit
        await session.commit()

    # ========================================
    # Example 3: Concurrent Operations with asyncio.gather
    # ========================================
    async with client.session() as session:
        # Execute multiple queries concurrently
        user_task = session.execute(select(User).where(User.age > 25))
        order_task = session.execute(select(Order).where(Order.amount > 100))

        user_result, order_result = await asyncio.gather(user_task, order_task)

        users = user_result.scalars().all()
        orders = order_result.scalars().all()

        print(f"Found {len(users)} users and {len(orders)} orders")

    # ========================================
    # Example 4: Async MatrixOne Managers
    # ========================================
    async with client.session() as session:
        from matrixone import SnapshotLevel

        # Create snapshot asynchronously
        snapshot = await session.snapshots.create(
            name='daily_backup',
            level=SnapshotLevel.DATABASE,
            database='production'
        )

        # Clone database asynchronously
        await session.clone.clone_database(
            target_db='prod_copy',
            source_db='production',
            snapshot_name='daily_backup'
        )
        # Both operations commit atomically

    # ========================================
    # Example 5: Async Data Loading with Stages
    # ========================================
    async with client.session() as session:
        # Create S3 stage using simple interface
        await session.stage.create_s3(
            name='import_stage',
            bucket='my-bucket',
            path='imports/',
            aws_key_id='key',
            aws_secret_key='secret'
        )

        # Load data from stage using ORM model
        await session.load_data.from_stage_csv('import_stage', 'users.csv', User)

        # Update statistics
        await session.execute("ANALYZE TABLE users")
        # All operations are atomic

    # ========================================
    # Example 6: Error Handling and Rollback
    # ========================================
    try:
        async with client.session() as session:
            await session.execute(insert(User).values(name='Charlie', age=40))
            await session.execute(insert(InvalidTable).values(data='test'))  # Fails
            # Transaction automatically rolls back - Charlie is NOT inserted
    except Exception as e:
        print(f"Transaction failed and rolled back: {e}")

    # ========================================
    # Example 7: Complex Multi-Manager Transaction
    # ========================================
    async with client.session() as session:
        # Create publication
        await session.pubsub.create_database_publication(
            name='analytics_pub',
            database='analytics',
            account='subscriber_account'
        )

        # Create local stage
        await session.stage.create_local('export_stage', '/exports/')

        # Load data using ORM model
        await session.load_data.from_csv('/data/latest.csv', Analytics)

        # Create snapshot
        await session.snapshots.create(
            name='post_load_snapshot',
            level=SnapshotLevel.DATABASE,
            database='analytics'
        )
        # All operations commit together

    # ========================================
    # Example 8: High-Performance Concurrent Loading
    # ========================================
    async with client.session() as session:
        # Load multiple files concurrently
        await asyncio.gather(
            session.load_data.from_csv('/data/users.csv', User),
            session.load_data.from_csv('/data/orders.csv', Order),
            session.load_data.from_csv('/data/products.csv', Product)
        )
        # All loads commit atomically

    await client.disconnect()

asyncio.run(main())

Best Practices:

  1. Always use async with: Use async with client.session() for automatic cleanup

  2. Await all operations: All execute/manager operations must be awaited

  3. Use asyncio.gather(): For concurrent operations within session

  4. Keep transactions short: Long transactions can block other operations

  5. Handle exceptions: Wrap session code in try-except for error handling

  6. Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()

See also

  • AsyncSession: The async session class returned by this method

  • Client.session(): Synchronous version

  • AsyncClient.execute(): Non-transactional async query execution

property snapshots: AsyncSnapshotManager

Get async snapshot manager

property clone: AsyncCloneManager

Get async clone manager

property branch: AsyncBranchManager

Get async branch manager for Git-style version control operations.

Provides asynchronous table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.

Returns:

AsyncBranchManager instance for async branch operations

Example:

import asyncio
from matrixone import AsyncClient

async def main():
    client = AsyncClient()
    await client.connect(database='test')

    # Create table branch
    await client.branch.create_table_branch('users_branch', 'users')

    # Create database branch
    await client.branch.create_database_branch('dev_db', 'production')

    # Compare branches
    diffs = await client.branch.diff_table('users_branch', 'users')

    # Merge branches
    await client.branch.merge_table('users_branch', 'users')

    # Delete branches
    await client.branch.delete_table_branch('users_branch')
    await client.branch.delete_database_branch('dev_db')

    await client.disconnect()

asyncio.run(main())

See also

  • branch_guide - Complete branch management guide

property moctl: AsyncMoCtlManager

Get async mo_ctl manager

property restore: AsyncRestoreManager

Get async restore manager

property pitr: AsyncPitrManager

Get async PITR manager

property pubsub: AsyncPubSubManager

Get async publish-subscribe manager

property account: AsyncAccountManager

Get async account manager

connected() bool[source]

Check if client is connected to database

property vector_ops

Get the unified vector operations manager (covers both index and data operations)

get_pinecone_index(table_name_or_model, vector_column: str)[source]

Get a PineconeCompatibleIndex object for vector search operations.

This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
vector_column: Name of the vector column

Returns:

PineconeCompatibleIndex object with Pinecone-compatible API

Example:

index = await client.get_pinecone_index("documents", "embedding")
results = await index.query_async([0.1, 0.2, 0.3], top_k=5)
for match in results.matches:
    print(f"ID: {match.id}, Score: {match.score}")
property fulltext_index

Get fulltext index manager for fulltext index operations

async version() str[source]

Get MatrixOne server version asynchronously

Returns:

str: MatrixOne server version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If version query fails

Example

>>> client = AsyncClient()
>>> await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> version = await client.version()
>>> print(f"MatrixOne version: {version}")
async git_version() str[source]

Get MatrixOne git version information asynchronously

Returns:

str: MatrixOne git version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If git version query fails

Example

>>> client = AsyncClient()
>>> await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
>>> git_version = await client.git_version()
>>> print(f"MatrixOne git version: {git_version}")
async get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]

Get all secondary index table names for a given table (async version).

This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.

Parameters:
  • table_name – Name of the table to get secondary indexes for

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)

Examples:

>>> async with AsyncClient() as client:
...     await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
...     # Use current database
...     index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info')
...     # Or specify database explicitly
...     index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info', 'test')
...     print(index_tables)
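
The returned names can be grouped by the two prefixes mentioned above. The following is a minimal sketch, not part of the MatrixOne SDK: the helper `group_index_tables` is hypothetical, and the suffixes used in the sample names are made up for illustration.

```python
from typing import Dict, List

# Prefixes taken from the description above
SECONDARY_PREFIX = "__mo_index_secondary_"
UNIQUE_PREFIX = "__mo_index_unique_"


def group_index_tables(names: List[str]) -> Dict[str, List[str]]:
    """Hypothetical helper: split index table names by their prefix."""
    groups: Dict[str, List[str]] = {"secondary": [], "unique": [], "other": []}
    for name in names:
        if name.startswith(SECONDARY_PREFIX):
            groups["secondary"].append(name)
        elif name.startswith(UNIQUE_PREFIX):
            groups["unique"].append(name)
        else:
            groups["other"].append(name)
    return groups


# Sample names with invented suffixes, standing in for a real result
sample = ["__mo_index_secondary_0123", "__mo_index_unique_4567"]
grouped = group_index_tables(sample)
```

In real use, the list passed in would be the value returned by `await client.get_secondary_index_tables(...)`.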
async get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]

Get the physical table name of a secondary index by its index name (async version).

Parameters:
  • table_name – Name of the table

  • index_name – Name of the secondary index

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

Physical table name of the secondary index, or None if not found

Examples:

>>> async with AsyncClient() as client:
...     await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
...     # Use current database
...     index_table = await client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id')
...     # Or specify database explicitly
...     index_table = await client.get_secondary_index_table_by_name(
...         'cms_all_content_chunk_info', 'cms_id', 'test'
...     )
...     print(index_table)
async verify_table_index_counts(table_name: str) int[source]

Verify that the main table and all its secondary index tables have the same row count (async version).

This method compares the COUNT(*) of the main table with that of each secondary index table in a single SQL query, so that all counts come from one consistent read. If the counts do not match, it raises a ValueError.

Parameters:

table_name – Name of the table to verify

Returns:

Row count (int) if verification succeeds

Raises:

ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message

Examples:

>>> async with AsyncClient() as client:
...     await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
...     count = await client.verify_table_index_counts('cms_all_content_chunk_info')
...     print(f"✓ Verification passed, row count: {count}")

>>> # If verification fails:
>>> try:
...     count = await client.verify_table_index_counts('some_table')
... except ValueError as e:
...     print(f"Verification failed: {e}")
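
The single-query comparison described above could look roughly like the sketch below. Both helpers (`build_count_query`, `check_counts`) are hypothetical and the SQL shape is an assumption; the statement the client actually issues may differ.

```python
from typing import Dict, List


def build_count_query(main_table: str, index_tables: List[str]) -> str:
    """Hypothetical: one SELECT returning COUNT(*) of every table as a column."""
    tables = [main_table] + index_tables
    columns = [
        f"(SELECT COUNT(*) FROM `{name}`) AS c{i}"
        for i, name in enumerate(tables)
    ]
    return "SELECT " + ", ".join(columns)


def check_counts(counts: Dict[str, int], main_table: str) -> int:
    """Return the shared row count, or raise ValueError listing all counts."""
    expected = counts[main_table]
    if any(count != expected for count in counts.values()):
        raise ValueError(f"Row count mismatch for {main_table}: {counts}")
    return expected


query = build_count_query("t", ["__mo_index_secondary_a"])
verified = check_counts({"t": 3, "__mo_index_secondary_a": 3}, "t")
```

Reading every count in one statement avoids comparing numbers taken at different moments while writes are in flight.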
async create_table(table_name_or_model, columns: dict | None = None, **kwargs) AsyncClient[source]

Create a table asynchronously

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
columns: Dictionary mapping column names to their definitions (required if table_name_or_model is str)
**kwargs: Additional table creation options

Returns:

AsyncClient: Self for chaining

Example

>>> await client.create_table("users", {
...     "id": "int primary key",
...     "name": "varchar(100)",
...     "email": "varchar(255)"
... })
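
To make the shape of the `columns` argument concrete, the sketch below turns such a dictionary into a CREATE TABLE statement. `build_create_table_sql` is a hypothetical helper, and the DDL the client really generates (quoting, extra options from `**kwargs`) is an assumption here.

```python
from typing import Dict


def build_create_table_sql(table_name: str, columns: Dict[str, str]) -> str:
    """Hypothetical: render a column-name -> definition mapping as DDL."""
    if not columns:
        raise ValueError("columns must not be empty")
    # Dicts preserve insertion order, so columns appear as they were declared
    body = ", ".join(f"`{name}` {definition}" for name, definition in columns.items())
    return f"CREATE TABLE `{table_name}` ({body})"


ddl = build_create_table_sql("users", {
    "id": "int primary key",
    "name": "varchar(100)",
})
```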
async drop_table(table_name_or_model) AsyncClient[source]

Drop a table asynchronously

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class

Returns:

AsyncClient: Self for chaining

Example

>>> await client.drop_table("users")
async create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) AsyncClient[source]

Create a table with indexes asynchronously

Args:

table_name: Name of the table to create
columns: Dictionary mapping column names to their definitions
indexes: List of index definitions
**kwargs: Additional table creation options

Returns:

AsyncClient: Self for chaining

Example

>>> await client.create_table_with_index("users", {
...     "id": "int primary key",
...     "name": "varchar(100)",
...     "email": "varchar(255)"
... }, [
...     {"name": "idx_name", "columns": ["name"]},
...     {"name": "idx_email", "columns": ["email"], "unique": True}
... ])
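
The index definitions in the example are plain dictionaries with `name`, `columns`, and an optional `unique` flag. As a rough illustration of how they could map to SQL, assuming a hypothetical helper `build_index_sql` (the statements the client actually emits are not documented here):

```python
from typing import Any, Dict, List


def build_index_sql(table_name: str, indexes: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical: one CREATE INDEX statement per index definition."""
    statements = []
    for index in indexes:
        # "unique" is optional and defaults to a regular index
        kind = "UNIQUE INDEX" if index.get("unique") else "INDEX"
        cols = ", ".join(f"`{col}`" for col in index["columns"])
        statements.append(f"CREATE {kind} `{index['name']}` ON `{table_name}` ({cols})")
    return statements


index_statements = build_index_sql("users", [
    {"name": "idx_name", "columns": ["name"]},
    {"name": "idx_email", "columns": ["email"], "unique": True},
])
```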
async create_table_orm(table_name: str, *columns, **kwargs) AsyncClient[source]

Create a table using SQLAlchemy ORM asynchronously

Args:

table_name: Name of the table to create
*columns: SQLAlchemy column definitions
**kwargs: Additional table creation options

Returns:

AsyncClient: Self for chaining

Example

>>> from sqlalchemy import Column, Integer, String
>>> await client.create_table_orm("users",
...     Column("id", Integer, primary_key=True),
...     Column("name", String(100)),
...     Column("email", String(255))
... )
property metadata: AsyncMetadataManager | None

Get metadata manager for table metadata operations

property load_data: AsyncLoadDataManager | None

Get async load data manager for bulk data loading operations

property stage: AsyncStageManager | None

Get async stage manager for external stage operations

property cdc: AsyncCDCManager | None

Get async CDC manager for change data capture operations.

AsyncClient Class

class matrixone.async_client.AsyncClient(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]

Bases: BaseMatrixOneClient

MatrixOne Async Client - Asynchronous interface for MatrixOne database operations.

This class provides a comprehensive asynchronous interface for connecting to and interacting with MatrixOne databases. It uses async/await throughout and covers table creation, data insertion, querying, vector operations, and transaction management.

Key Features:

  • Asynchronous connection management with connection pooling

  • High-level table operations (create_table, drop_table, insert, batch_insert)

  • Query builder interface for complex async queries

  • Vector operations (similarity search, range search, indexing)

  • Async transaction management with context managers

  • Snapshot and restore operations

  • Account and user management

  • Fulltext search capabilities

  • Non-blocking I/O operations

Supported Operations:

  • Async connection and disconnection

  • Async query execution (SELECT, INSERT, UPDATE, DELETE)

  • Async batch operations

  • Async transaction management

  • Async table creation and management

  • Async vector and fulltext operations

  • Async snapshot and restore operations

Usage Examples:

Basic async usage:

    async def main():
        client = AsyncClient()
        await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

        # Create table using high-level API
        await client.create_table("users", {
            "id": "int primary key",
            "name": "varchar(100)",
            "email": "varchar(255)"
        })

        # Insert data
        await client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"})

        # Query data
        result = await client.query("users").where("id = ?", 1).all()
        print(result.rows)

        await client.disconnect()

Vector operations:

    async def vector_example():
        client = AsyncClient()
        await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

        # Create vector table
        await client.create_table("documents", {
            "id": "int primary key",
            "content": "text",
            "embedding": "vecf32(384)"
        })

        # Vector similarity search
        results = await client.vector_ops.similarity_search(
            "documents",
            vector_column="embedding",
            query_vector=[0.1, 0.2, 0.3, ...],  # 384-dimensional vector
            limit=10,
            distance_type="l2"
        )

        await client.disconnect()

Async transaction usage:

    async def transaction_example():
        client = AsyncClient()
        await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

        async with client.transaction() as tx:
            await tx.execute("INSERT INTO users (name) VALUES (?)", ("John",))
            await tx.execute("INSERT INTO orders (user_id, amount) VALUES (?, ?)", (1, 100.0))
        # Transaction commits automatically on success

        await client.disconnect()

Note: This class requires asyncio and async database drivers. Use the synchronous Client class for blocking operations or when async support is not needed.

__init__(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]

Initialize MatrixOne async client

Args:

connection_timeout: Connection timeout in seconds
query_timeout: Query timeout in seconds
auto_commit: Enable auto-commit mode
charset: Character set for connection
logger: Custom logger instance. If None, creates a default logger
sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full')
    - 'off': No SQL logging
    - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default)
    - 'simple': Show operation summary only
    - 'full': Show complete SQL regardless of length
slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0)
max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
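
The four logging modes can be pictured with a small stand-alone sketch. The helper `format_sql_for_log` is hypothetical and is not part of the MatrixOne SDK; the exact summary text the client logs is an assumption. It only illustrates how 'auto' could show short statements in full and cut long ones at `max_sql_display_length`.

```python
from typing import Optional


def format_sql_for_log(sql: str, mode: str = "auto", max_len: int = 500) -> Optional[str]:
    """Hypothetical: return the text a logger might emit, or None for 'off'."""
    sql = " ".join(sql.split())  # collapse whitespace into a single line
    if mode == "off":
        return None
    if mode == "full":
        return sql
    operation = sql.split(" ", 1)[0].upper() if sql else ""
    if mode == "simple":
        return f"{operation} ({len(sql)} chars)"
    if mode == "auto":
        if len(sql) <= max_len:
            return sql  # short SQL is shown fully
        return f"{sql[:max_len]}... [{operation}, {len(sql)} chars total]"
    raise ValueError(f"unknown sql_log_mode: {mode!r}")


short_line = format_sql_for_log("SELECT 1")
long_line = format_sql_for_log("INSERT INTO t VALUES " + "x" * 100, max_len=10)
```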
async connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None)[source]

Connect to MatrixOne database asynchronously

Args:

host: Database host
port: Database port
user: Username or login info in format "user", "account#user", "account#user#role",
      "account:user", or "account:user:role" (both '#' and ':' separators are supported)
password: Password
database: Database name
account: Optional account name (will be combined with user if user doesn't contain '#' or ':')
role: Optional role name (will be combined with user if user doesn't contain '#' or ':')
charset: Character set for the connection (default: utf8mb4)
connection_timeout: Connection timeout in seconds (default: 30)
auto_commit: Enable autocommit (default: True)
on_connect: Connection hook to execute after successful connection.
           Can be:
           - ConnectionHook instance
           - List of ConnectionAction or string action names
           - Custom callback function (async or sync)

Examples:

# Enable all features after connection
# (connect() accepts keyword arguments only)
await client.connect(host=host, port=port, user=user, password=password, database=database,
                     on_connect=[ConnectionAction.ENABLE_ALL])

# Enable only vector operations with custom charset
await client.connect(host=host, port=port, user=user, password=password, database=database,
                     charset="utf8mb4",
                     on_connect=[ConnectionAction.ENABLE_VECTOR])

# Custom async callback
async def my_callback(client):
    print(f"Connected to {client._connection_params['host']}")

await client.connect(host=host, port=port, user=user, password=password, database=database,
                     on_connect=my_callback)
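
The accepted `user` formats can be illustrated with a small parser. This is a sketch under assumptions: `parse_login` is a hypothetical helper, not the SDK's own parser, and it assumes that the separate `account` and `role` arguments are ignored whenever `user` already contains a separator.

```python
from typing import Optional, Tuple


def parse_login(user: str, account: Optional[str] = None,
                role: Optional[str] = None) -> Tuple[Optional[str], str, Optional[str]]:
    """Hypothetical: split a login string into (account, user, role)."""
    for sep in ("#", ":"):
        if sep in user:
            parts = user.split(sep)
            if len(parts) == 2:
                return parts[0], parts[1], None
            if len(parts) == 3:
                return parts[0], parts[1], parts[2]
            raise ValueError(f"invalid login string: {user!r}")
    # Plain user name: fall back to the separate account/role arguments
    return account, user, role


plain = parse_login("root")
with_account = parse_login("acc#admin")
with_role = parse_login("acc:admin:dba")
combined = parse_login("admin", account="acc", role="dba")
```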
classmethod from_engine(engine: AsyncEngine, **kwargs) AsyncClient[source]

Create AsyncClient instance from existing SQLAlchemy AsyncEngine

Args:

engine: SQLAlchemy AsyncEngine instance (must use MySQL driver)
**kwargs: Additional client configuration options

Returns:

AsyncClient: Configured async client instance

Raises:

ConnectionError: If engine doesn't use MySQL driver

Examples

Basic usage:

    from sqlalchemy.ext.asyncio import create_async_engine
    from matrixone import AsyncClient

    engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db")
    client = AsyncClient.from_engine(engine)

With custom configuration:

    engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db")
    client = AsyncClient.from_engine(
        engine,
        sql_log_mode='auto',
        slow_query_threshold=0.5
    )
async disconnect()[source]

Disconnect from MatrixOne database asynchronously

disconnect_sync()[source]

Synchronous disconnect for cleanup when event loop is closed

__del__()[source]

Cleanup when object is garbage collected

get_sqlalchemy_engine() AsyncEngine[source]

Get SQLAlchemy async engine

Returns:

SQLAlchemy AsyncEngine
async create_all(base_class=None)[source]

Create all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.
async drop_all(base_class=None)[source]

Drop all tables defined in the given base class or default Base.

Args:

base_class: SQLAlchemy declarative base class. If None, uses the default Base.
async execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) AsyncResultSet[source]

Execute SQL query or SQLAlchemy statement asynchronously without transaction isolation.

This method executes queries asynchronously using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use async with client.session() instead.

The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement async operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.

Key Features:

  • Async/await support: Non-blocking execution using async/await patterns

  • ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()

  • Parameter binding: Automatic escaping of parameters to prevent SQL injection

  • Query logging: Integrated async logging with performance tracking

  • Auto-commit: Each statement commits immediately (no transaction isolation)

  • Connection pooling: Efficient async connection reuse from pool

Parameters:
  • sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be:
      - SQLAlchemy select() statement (recommended)
      - SQLAlchemy insert() statement (recommended)
      - SQLAlchemy update() statement (recommended)
      - SQLAlchemy delete() statement (recommended)
      - String SQL with ‘?’ placeholders for parameters
      - SQLAlchemy text() statement

  • params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements.

  • log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging or disabling logs for frequently-executed queries.

Returns:

Async query result object with:
  • columns: List[str] - Column names

  • rows: List[Tuple] - Row data as tuples

  • affected_rows: int - Number of rows affected by DML operations

  • fetchall() -> List[Tuple] - Get all remaining rows as a list

  • fetchone() -> Optional[Tuple] - Get next row or None

  • fetchmany(size) -> List[Tuple] - Get next N rows

Return type:

AsyncResultSet
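
The fetch methods behave like a cursor over rows that are already in memory. The stand-in class below sketches that behaviour; `ResultSetSketch` is hypothetical and only mirrors the documented method names, so details such as how the read position is tracked are assumptions.

```python
from typing import Any, List, Optional, Tuple


class ResultSetSketch:
    """Hypothetical stand-in showing cursor-style fetch semantics."""

    def __init__(self, columns: List[str], rows: List[Tuple], affected_rows: int = 0):
        self.columns = columns
        self.rows = rows
        self.affected_rows = affected_rows
        self._cursor = 0  # index of the next unread row (assumed behaviour)

    def fetchone(self) -> Optional[Tuple]:
        if self._cursor >= len(self.rows):
            return None
        row = self.rows[self._cursor]
        self._cursor += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple]:
        chunk = self.rows[self._cursor:self._cursor + size]
        self._cursor += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple]:
        remaining = self.rows[self._cursor:]
        self._cursor = len(self.rows)
        return remaining

    def scalar(self) -> Any:
        # First column of the first row, or None when there are no rows
        return self.rows[0][0] if self.rows else None


result = ResultSetSketch(["id", "name"], [(1, "a"), (2, "b"), (3, "c")])
first = result.fetchone()
next_two = result.fetchmany(2)
rest = result.fetchall()
```

Because all rows are held in the object, none of these methods needs to be awaited; only the `execute()` call that produces the result is asynchronous.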

Raises:
  • ConnectionError – If not connected to database

  • QueryError – If query execution fails or SQL syntax is invalid

Usage Examples:

import asyncio

from matrixone import AsyncClient
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # SQLAlchemy SELECT Statements (Recommended)
    # ========================================

    # Basic SELECT with WHERE clause
    stmt = select(User).where(User.age > 25)
    result = await client.execute(stmt)
    for user in result.fetchall():
        print(f"User: {user.name}, Age: {user.age}")

    # SELECT specific columns
    stmt = select(User.name, User.email).where(User.status == 'active')
    result = await client.execute(stmt)
    for name, email in result.fetchall():
        print(f"{name}: {email}")

    # Complex WHERE with AND/OR
    stmt = select(User).where(
        and_(
            User.age > 18,
            or_(
                User.status == 'active',
                User.status == 'pending'
            )
        )
    )
    result = await client.execute(stmt)

    # SELECT with JOIN
    stmt = select(User, Order).join(Order, User.id == Order.user_id)
    result = await client.execute(stmt)
    for user, order in result.fetchall():
        print(f"{user.name} ordered ${order.amount}")

    # SELECT with aggregation
    stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active')
    result = await client.execute(stmt)
    count, avg_age = result.fetchone()
    print(f"Active users: {count}, Average age: {avg_age}")

    # ========================================
    # SQLAlchemy INSERT Statements (Recommended)
    # ========================================

    # Single INSERT
    stmt = insert(User).values(name='John', email='john@example.com', age=30)
    result = await client.execute(stmt)
    print(f"Inserted {result.affected_rows} rows")

    # Bulk INSERT
    stmt = insert(User).values([
        {'name': 'Alice', 'email': 'alice@example.com', 'age': 28},
        {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
        {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
    ])
    result = await client.execute(stmt)
    print(f"Inserted {result.affected_rows} rows")

    # ========================================
    # SQLAlchemy UPDATE Statements (Recommended)
    # ========================================

    # Simple UPDATE
    stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
    result = await client.execute(stmt)
    print(f"Updated {result.affected_rows} rows")

    # Conditional UPDATE
    stmt = update(User).where(User.age < 18).values(status='minor')
    result = await client.execute(stmt)

    # UPDATE with expressions (uses only columns defined on the models above)
    stmt = update(User).where(User.status == 'active').values(age=User.age + 1)
    result = await client.execute(stmt)

    # ========================================
    # SQLAlchemy DELETE Statements (Recommended)
    # ========================================

    # Simple DELETE
    stmt = delete(User).where(User.id == 1)
    result = await client.execute(stmt)
    print(f"Deleted {result.affected_rows} rows")

    # Conditional DELETE
    stmt = delete(User).where(User.status == 'deleted')
    result = await client.execute(stmt)

    # DELETE with complex condition
    stmt = delete(User).where(
        and_(
            User.age < 18,
            User.status == 'inactive'
        )
    )
    result = await client.execute(stmt)

    # ========================================
    # Concurrent Execution with asyncio.gather
    # ========================================

    # Execute multiple independent queries concurrently
    user_stmt = select(User).where(User.age > 25)
    order_stmt = select(Order).where(Order.amount > 100)

    user_result, order_result = await asyncio.gather(
        client.execute(user_stmt),
        client.execute(order_stmt)
    )

    print(f"Users: {len(user_result.fetchall())}")
    print(f"Orders: {len(order_result.fetchall())}")

    # ========================================
    # String SQL with Parameters (Alternative)
    # ========================================

    # SELECT with parameters
    result = await client.execute(
        "SELECT * FROM users WHERE age > ? AND status = ?",
        (25, 'active')
    )

    # INSERT with parameters
    result = await client.execute(
        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        ('David', 'david@example.com', 28)
    )

    # UPDATE with parameters
    result = await client.execute(
        "UPDATE users SET status = ? WHERE age < ?",
        ('minor', 18)
    )

    # ========================================
    # Query Logging Control
    # ========================================

    # Disable logging for frequently executed query
    result = await client.execute(
        select(User).where(User.id == 1),
        log_mode='off'
    )

    # Force full SQL logging for debugging
    result = await client.execute(
        select(User).where(User.name.like('%test%')),
        log_mode='full'
    )

    await client.disconnect()

asyncio.run(main())

Important Notes:

  • No transaction isolation: Each execute() call commits immediately

  • Use session() for transactions: For atomic multi-statement operations

  • ORM-style preferred: Use SQLAlchemy statements for better type safety

  • Auto-commit behavior: Changes are permanent immediately after execute()

  • Non-blocking: Uses async/await and doesn’t block event loop

  • Concurrent execution: Use asyncio.gather() for parallel queries

Best Practices:

  1. Prefer ORM-style statements: Use select(), insert(), update(), delete()

  2. Use parameters: Always use parameter binding to prevent SQL injection

  3. Session for transactions: Use client.session() for atomic operations

  4. Use asyncio.gather(): For concurrent independent queries

  5. Disable logging in production: Use log_mode='off' for hot paths

  6. Handle exceptions: Wrap execute() in try-except for error handling
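
Practices 4 and 6 can be combined: `asyncio.gather()` accepts `return_exceptions=True`, which hands failures back as values so each result can be inspected individually. The sketch below uses a stand-in coroutine, `fake_execute`, in place of `client.execute()`; it is not a MatrixOne API.

```python
import asyncio


async def fake_execute(label: str, delay: float, fail: bool = False) -> str:
    """Stand-in for an awaitable query call; not a MatrixOne API."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{label} failed")
    return f"{label} ok"


async def run_queries():
    # return_exceptions=True returns failures as values instead of
    # raising the first one out of gather()
    return await asyncio.gather(
        fake_execute("users", 0.01),
        fake_execute("orders", 0.01, fail=True),
        return_exceptions=True,
    )


results = asyncio.run(run_queries())
for outcome in results:
    if isinstance(outcome, Exception):
        print(f"query failed: {outcome}")
    else:
        print(outcome)
```

Results come back in the order the awaitables were passed in, regardless of which one finishes first.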

See also

  • AsyncClient.session(): For transaction-aware async operations

  • AsyncSession.execute(): Execute within async transaction context

  • Client.execute(): Synchronous version

get_login_info() dict | None[source]

Get parsed login information

query(*columns, snapshot: str | None = None)[source]

Get async MatrixOne query builder - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Examples

# Traditional model query (all columns)
await client.query(Article).filter(…).all()

# Column-specific query
await client.query(Article.id, Article.title).filter(…).all()

# With fulltext score
await client.query(Article.id, boolean_match("title", "content").must("python").label("score"))

# Snapshot query
await client.query(Article, snapshot="my_snapshot").filter(…).all()

Returns:

AsyncMatrixOneQuery instance configured for the specified columns
snapshot(snapshot_name: str)[source]

Snapshot context manager

Usage

async with client.snapshot("daily_backup") as snapshot_client:
    result = await snapshot_client.execute("SELECT * FROM users")

async insert(table_name_or_model, data: dict) AsyncResultSet[source]

Insert data into a table asynchronously.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data: Data to insert (dict with column names as keys)

Returns:

AsyncResultSet object
async batch_insert(table_name_or_model, data_list: list) AsyncResultSet[source]

Batch insert data into a table asynchronously.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list: List of data dictionaries to insert

Returns:

AsyncResultSet object
session()[source]

Create an async transaction-aware session for atomic database operations.

This method returns an AsyncSession that extends SQLAlchemy AsyncSession with MatrixOne-specific features. All operations within the session are executed atomically using async/await patterns - they either all succeed or all fail together.

The session is an async context manager that automatically handles the transaction lifecycle:

  • Commits the transaction when the context exits normally

  • Rolls back the transaction if any exception occurs

  • Cleans up database resources automatically

  • Enables non-blocking concurrent operations
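
The commit-or-rollback lifecycle described above follows a common async context manager pattern. The sketch below shows that pattern with a stand-in `FakeTransaction`; it is an illustration under assumptions, not the actual implementation of `client.session()`.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeTransaction:
    """Stand-in that records lifecycle calls; not a MatrixOne class."""

    def __init__(self):
        self.events = []

    async def commit(self):
        self.events.append("commit")

    async def rollback(self):
        self.events.append("rollback")

    async def close(self):
        self.events.append("close")


@asynccontextmanager
async def session_sketch(tx: FakeTransaction):
    try:
        yield tx
        await tx.commit()      # normal exit: commit
    except Exception:
        await tx.rollback()    # any exception: roll back, then re-raise
        raise
    finally:
        await tx.close()       # resources are released in both cases


async def demo():
    ok = FakeTransaction()
    async with session_sketch(ok):
        pass

    failed = FakeTransaction()
    try:
        async with session_sketch(failed):
            raise ValueError("boom")
    except ValueError:
        pass
    return ok.events, failed.events


ok_events, failed_events = asyncio.run(demo())
```

The exception is re-raised after the rollback, which is why the caller still needs its own try-except around the `async with` block.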

Key Features:

  • Full async SQLAlchemy ORM: All standard async Session methods with await

  • Atomic transactions: Multiple async operations commit or rollback together

  • Async MatrixOne managers: All MatrixOne operations available asynchronously

  • Concurrent execution: Use asyncio.gather() for parallel operations

  • Non-blocking: All operations use async/await and don’t block event loop

  • ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()

Available Async Managers (transaction-aware):

  • session.snapshots: AsyncSnapshotManager for async snapshot operations

  • session.clone: AsyncCloneManager for async clone operations

  • session.restore: AsyncRestoreManager for async restore operations

  • session.pitr: AsyncPitrManager for async point-in-time recovery

  • session.pubsub: AsyncPubSubManager for async publish-subscribe

  • session.account: AsyncAccountManager for async account management

  • session.vector_ops: AsyncVectorManager for async vector operations

  • session.fulltext_index: AsyncFulltextIndexManager for async fulltext search

  • session.metadata: AsyncMetadataManager for async metadata analysis

  • session.load_data: AsyncLoadDataManager for async bulk loading

  • session.stage: AsyncStageManager for async stage management

Returns:

Async context manager yielding AsyncSession

Return type:

AsyncContextManager[AsyncSession]

Raises:

ConnectionError – If client is not connected to database

Usage Examples:

import asyncio

from matrixone import AsyncClient
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy import select, insert, update, delete
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)
    status = Column(String(20))  # referenced by the update/delete examples below

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Example 1: Basic Async Transaction with ORM-style SQL
    # ========================================
    async with client.session() as session:
        # Insert using SQLAlchemy insert()
        await session.execute(insert(User).values(name='John', email='john@example.com', age=30))

        # Update using SQLAlchemy update()
        await session.execute(update(User).where(User.age < 18).values(status='minor'))

        # Select using SQLAlchemy select()
        stmt = select(User).where(User.age > 25)
        result = await session.execute(stmt)
        for user in result.scalars():
            print(f"User: {user.name}")

        # Delete using SQLAlchemy delete()
        await session.execute(delete(User).where(User.status == 'inactive'))
        # All operations commit atomically

    # ========================================
    # Example 2: Async ORM Operations
    # ========================================
    async with client.session() as session:
        # Create new objects
        user1 = User(name='Alice', email='alice@example.com', age=28)
        user2 = User(name='Bob', email='bob@example.com', age=35)

        # Add to session
        session.add(user1)
        session.add(user2)

        # Query using ORM
        stmt = select(User).where(User.name == 'Alice')
        result = await session.execute(stmt)
        alice = result.scalar_one()

        # Update object
        alice.email = 'newemail@example.com'

        # Commit
        await session.commit()

    # ========================================
    # Example 3: Concurrent Operations with asyncio.gather
    # ========================================
    async with client.session() as session:
        # Execute multiple queries concurrently
        user_task = session.execute(select(User).where(User.age > 25))
        order_task = session.execute(select(Order).where(Order.amount > 100))

        user_result, order_result = await asyncio.gather(user_task, order_task)

        users = user_result.scalars().all()
        orders = order_result.scalars().all()

        print(f"Found {len(users)} users and {len(orders)} orders")

    # ========================================
    # Example 4: Async MatrixOne Managers
    # ========================================
    async with client.session() as session:
        from matrixone import SnapshotLevel

        # Create snapshot asynchronously
        snapshot = await session.snapshots.create(
            name='daily_backup',
            level=SnapshotLevel.DATABASE,
            database='production'
        )

        # Clone database asynchronously
        await session.clone.clone_database(
            target_db='prod_copy',
            source_db='production',
            snapshot_name='daily_backup'
        )
        # Both operations commit atomically

    # ========================================
    # Example 5: Async Data Loading with Stages
    # ========================================
    async with client.session() as session:
        # Create S3 stage using simple interface
        await session.stage.create_s3(
            name='import_stage',
            bucket='my-bucket',
            path='imports/',
            aws_key_id='key',
            aws_secret_key='secret'
        )

        # Load data from stage using ORM model
        await session.load_data.from_stage_csv('import_stage', 'users.csv', User)

        # Update statistics
        await session.execute("ANALYZE TABLE users")
        # All operations are atomic

    # ========================================
    # Example 6: Error Handling and Rollback
    # ========================================
    try:
        async with client.session() as session:
            await session.execute(insert(User).values(name='Charlie', age=40))
            await session.execute(insert(InvalidTable).values(data='test'))  # Fails
            # Transaction automatically rolls back - Charlie is NOT inserted
    except Exception as e:
        print(f"Transaction failed and rolled back: {e}")

    # ========================================
    # Example 7: Complex Multi-Manager Transaction
    # ========================================
    async with client.session() as session:
        # Create publication
        await session.pubsub.create_database_publication(
            name='analytics_pub',
            database='analytics',
            account='subscriber_account'
        )

        # Create local stage
        await session.stage.create_local('export_stage', '/exports/')

        # Load data using ORM model
        await session.load_data.from_csv('/data/latest.csv', Analytics)

        # Create snapshot
        await session.snapshots.create(
            name='post_load_snapshot',
            level=SnapshotLevel.DATABASE,
            database='analytics'
        )
        # All operations commit together

    # ========================================
    # Example 8: High-Performance Concurrent Loading
    # ========================================
    async with client.session() as session:
        # Load multiple files concurrently
        await asyncio.gather(
            session.load_data.from_csv('/data/users.csv', User),
            session.load_data.from_csv('/data/orders.csv', Order),
            session.load_data.from_csv('/data/products.csv', Product)
        )
        # All loads commit atomically

    await client.disconnect()

asyncio.run(main())

Best Practices:

  1. Always use async with: Use async with client.session() for automatic cleanup

  2. Await all operations: All execute/manager operations must be awaited

  3. Use asyncio.gather(): Run independent operations concurrently within a session

  4. Keep transactions short: Long transactions can block other operations

  5. Handle exceptions: Wrap session code in try-except for error handling

  6. Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()
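
Practices 1 and 5 work together: the async with block decides between commit and rollback, and the surrounding try-except reports the failure. The sketch below illustrates that flow with a hypothetical stand-in class, FakeSession, so that it runs without a database. It is not the real AsyncSession, and it only assumes the commit-on-success, rollback-on-exception behaviour described in the examples above.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for an async session; it only records events."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("begin")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Commit on a clean exit, roll back if the block raised.
        self.log.append("rollback" if exc_type else "commit")
        return False  # do not swallow the exception

    async def execute(self, statement):
        if statement == "bad":
            raise RuntimeError("statement failed")
        self.log.append(f"execute:{statement}")


async def run(statements):
    log = []
    try:
        async with FakeSession(log) as session:
            for statement in statements:
                await session.execute(statement)
    except RuntimeError as exc:
        # The session has already rolled back by the time we get here.
        log.append(f"handled:{exc}")
    return log


ok_log = asyncio.run(run(["insert"]))
failed_log = asyncio.run(run(["insert", "bad"]))
```

In the failing run the rollback is recorded before the handler executes, which is why the except clause belongs outside the async with block.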

See also

  • AsyncSession: The async session class returned by this method

  • Client.session(): Synchronous version

  • AsyncClient.execute(): Non-transactional async query execution

property snapshots: AsyncSnapshotManager

Get async snapshot manager

property clone: AsyncCloneManager

Get async clone manager

property branch: AsyncBranchManager

Get async branch manager for Git-style version control operations.

Provides asynchronous table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.

Returns:

AsyncBranchManager instance for async branch operations

Example:

import asyncio
from matrixone import AsyncClient

async def main():
    client = AsyncClient()
    await client.connect(database='test')

    # Create table branch
    await client.branch.create_table_branch('users_branch', 'users')

    # Create database branch
    await client.branch.create_database_branch('dev_db', 'production')

    # Compare branches
    diffs = await client.branch.diff_table('users_branch', 'users')

    # Merge branches
    await client.branch.merge_table('users_branch', 'users')

    # Delete branches
    await client.branch.delete_table_branch('users_branch')
    await client.branch.delete_database_branch('dev_db')

    await client.disconnect()

asyncio.run(main())

See also

  • branch_guide - Complete branch management guide
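
Because branch operations require MatrixOne 3.0.5 or higher, it may be worth checking the server version before calling them. The helper below is a minimal sketch: parse_version and supports_branching are hypothetical names, and the assumption that the string returned by client.version() contains a dotted major.minor.patch number has not been verified against the server.

```python
import re


def parse_version(version: str) -> tuple:
    """Extract the first 'major.minor.patch' triple from a version string."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def supports_branching(version: str, minimum=(3, 0, 5)) -> bool:
    """Return True if the version is at least the documented minimum."""
    return parse_version(version) >= minimum
```

A caller could pass the result of await client.version() to supports_branching() and skip the branch calls when it returns False.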

property moctl: AsyncMoCtlManager

Get async mo_ctl manager

property restore: AsyncRestoreManager

Get async restore manager

property pitr: AsyncPitrManager

Get async PITR manager

property pubsub: AsyncPubSubManager

Get async publish-subscribe manager

property account: AsyncAccountManager

Get async account manager

connected() bool[source]

Check if client is connected to database

property vector_ops

Get the unified manager for vector operations (index and data)

get_pinecone_index(table_name_or_model, vector_column: str)[source]

Get a PineconeCompatibleIndex object for vector search operations.

This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.
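
The column mapping described above can be pictured as follows. This is only an illustrative sketch: split_row is a hypothetical helper, and the "id", "values", and "metadata" keys mirror Pinecone's record layout rather than anything confirmed about PineconeCompatibleIndex internals.

```python
def split_row(row: dict, primary_key: str, vector_column: str) -> dict:
    """Split a table row into an id, a vector, and the remaining metadata."""
    metadata = {
        name: value
        for name, value in row.items()
        if name not in (primary_key, vector_column)
    }
    return {
        "id": row[primary_key],
        "values": row[vector_column],
        "metadata": metadata,
    }


record = split_row(
    {"doc_id": 7, "embedding": [0.1, 0.2, 0.3], "title": "Intro", "lang": "en"},
    primary_key="doc_id",
    vector_column="embedding",
)
```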

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
vector_column: Name of the vector column

Returns:

PineconeCompatibleIndex object with Pinecone-compatible API

Example:

index = client.get_pinecone_index("documents", "embedding")
results = await index.query_async([0.1, 0.2, 0.3], top_k=5)
for match in results.matches:
    print(f"ID: {match.id}, Score: {match.score}")

property fulltext_index

Get fulltext index manager for fulltext index operations

async version() str[source]

Get MatrixOne server version asynchronously

Returns:

str: MatrixOne server version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If version query fails

Example

>>> client = AsyncClient()
>>> await client.connect('localhost', 6001, 'root', '111', 'test')
>>> version = await client.version()
>>> print(f"MatrixOne version: {version}")

async git_version() str[source]

Get MatrixOne git version information asynchronously

Returns:

str: MatrixOne git version string

Raises:

ConnectionError: If not connected to MatrixOne
QueryError: If git version query fails

Example

>>> client = AsyncClient()
>>> await client.connect('localhost', 6001, 'root', '111', 'test')
>>> git_version = await client.git_version()
>>> print(f"MatrixOne git version: {git_version}")

async get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]

Get all secondary index table names for a given table (async version).

This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.

Parameters:
  • table_name – Name of the table to get secondary indexes for

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)
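
Since the returned list mixes both kinds of index table, a caller may want to separate them by prefix. The sketch below assumes only the two prefixes named above; group_index_tables is a hypothetical helper, and the table names in the example are made up for illustration.

```python
SECONDARY_PREFIX = "__mo_index_secondary_"
UNIQUE_PREFIX = "__mo_index_unique_"


def group_index_tables(index_tables):
    """Group physical index table names by the prefix that marks their kind."""
    groups = {"secondary": [], "unique": [], "other": []}
    for name in index_tables:
        if name.startswith(SECONDARY_PREFIX):
            groups["secondary"].append(name)
        elif name.startswith(UNIQUE_PREFIX):
            groups["unique"].append(name)
        else:
            groups["other"].append(name)
    return groups


# Made-up names; real suffixes are generated by the server.
example = group_index_tables([
    "__mo_index_secondary_0001",
    "__mo_index_unique_0002",
])
```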

Examples:

>>> async with AsyncClient() as client:
...     await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
...     # Use current database
...     index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info')
...     # Or specify database explicitly
...     index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info', 'test')
...     print(index_tables)

async get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]

Get the physical table name of a secondary index by its index name (async version).

Parameters:
  • table_name – Name of the table

  • index_name – Name of the secondary index

  • database_name – Name of the database (optional). If None, uses the current database.

Returns:

Physical table name of the secondary index, or None if not found

Examples:

>>> async with AsyncClient() as client:
...     await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
...     # Use current database
...     index_table = await client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id')
...     # Or specify database explicitly
...     index_table = await client.get_secondary_index_table_by_name(
...         'cms_all_content_chunk_info', 'cms_id', 'test'
...     )
...     print(index_table)

async verify_table_index_counts(table_name: str) int[source]

Verify that the main table and all its secondary index tables have the same row count (async version).

This method compares the COUNT(*) of the main table with the COUNT(*) of each of its secondary index tables in a single SQL query, for consistency. If the counts do not match, it raises ValueError.

Parameters:

table_name – Name of the table to verify

Returns:

Row count (int) if verification succeeds

Raises:

ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message
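
The comparison itself amounts to the check sketched below. This is not the library's implementation: check_index_counts is a hypothetical function that takes counts already fetched, and the wording of the error message is an assumption.

```python
def check_index_counts(table_name, main_count, index_counts):
    """Return main_count if every index table matches it, else raise ValueError.

    index_counts maps each secondary index table name to its row count.
    """
    mismatched = [name for name, n in index_counts.items() if n != main_count]
    if mismatched:
        # Report every count, not just the mismatched ones.
        details = ", ".join(
            f"{name}={n}" for name, n in sorted(index_counts.items())
        )
        raise ValueError(
            f"index count mismatch for {table_name}: main={main_count}, {details}"
        )
    return main_count
```

Including all counts in the message, as the Raises entry describes, lets a reader see at a glance which index tables diverge and by how much.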

Examples:

>>> async with AsyncClient() as client:
...     await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
...     count = await client.verify_table_index_counts('cms_all_content_chunk_info')
...     print(f"✓ Verification passed, row count: {count}")

>>> # If verification fails:
>>> try:
...     count = await client.verify_table_index_counts('some_table')
... except ValueError as e:
...     print(f"Verification failed: {e}")

async create_table(table_name_or_model, columns: dict | None = None, **kwargs) AsyncClient[source]

Create a table asynchronously

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
columns: Dictionary mapping column names to their definitions (required if table_name_or_model is str)
**kwargs: Additional table creation options

Returns:

AsyncClient: Self for chaining

Example

>>> await client.create_table("users", {
...     "id": "int primary key",
...     "name": "varchar(100)",
...     "email": "varchar(255)"
... })

async drop_table(table_name_or_model) AsyncClient[source]

Drop a table asynchronously

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class

Returns:

AsyncClient: Self for chaining

Example

>>> await client.drop_table("users")

async create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) AsyncClient[source]

Create a table with indexes asynchronously

Args:

table_name: Name of the table to create
columns: Dictionary mapping column names to their definitions
indexes: List of index definitions
**kwargs: Additional table creation options

Returns:

AsyncClient: Self for chaining
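
To make the shape of the columns and indexes arguments concrete, the sketch below turns them into plain SQL strings. It is illustrative only: build_ddl is a hypothetical helper, the "name", "columns", and "unique" keys are taken from the example in this entry, and the statements the client actually generates may differ. No quoting or escaping is done, so it is not suitable for untrusted input.

```python
def build_ddl(table_name, columns, indexes=None):
    """Return a CREATE TABLE statement followed by one statement per index."""
    column_sql = ", ".join(
        f"{name} {definition}" for name, definition in columns.items()
    )
    statements = [f"CREATE TABLE {table_name} ({column_sql})"]
    for index in indexes or []:
        kind = "UNIQUE INDEX" if index.get("unique") else "INDEX"
        index_columns = ", ".join(index["columns"])
        statements.append(
            f"CREATE {kind} {index['name']} ON {table_name} ({index_columns})"
        )
    return statements


ddl = build_ddl(
    "users",
    {"id": "int primary key", "name": "varchar(100)", "email": "varchar(255)"},
    [
        {"name": "idx_name", "columns": ["name"]},
        {"name": "idx_email", "columns": ["email"], "unique": True},
    ],
)
```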

Example

>>> await client.create_table_with_index("users", {
...     "id": "int primary key",
...     "name": "varchar(100)",
...     "email": "varchar(255)"
... }, [
...     {"name": "idx_name", "columns": ["name"]},
...     {"name": "idx_email", "columns": ["email"], "unique": True}
... ])

async create_table_orm(table_name: str, *columns, **kwargs) AsyncClient[source]

Create a table using SQLAlchemy ORM asynchronously

Args:

table_name: Name of the table to create
*columns: SQLAlchemy column definitions
**kwargs: Additional table creation options

Returns:

AsyncClient: Self for chaining

Example

>>> from sqlalchemy import Column, Integer, String
>>> await client.create_table_orm("users",
...     Column("id", Integer, primary_key=True),
...     Column("name", String(100)),
...     Column("email", String(255))
... )

property metadata: AsyncMetadataManager | None

Get metadata manager for table metadata operations

property load_data: AsyncLoadDataManager | None

Get async load data manager for bulk data loading operations

property stage: AsyncStageManager | None

Get async stage manager for external stage operations

property cdc: AsyncCDCManager | None

Get async CDC manager for change data capture operations.

AsyncResultSet Class

class matrixone.async_client.AsyncResultSet(columns: List[str], rows: List[Tuple], affected_rows: int = 0)[source]

Bases: object

Async result set wrapper for query results

__init__(columns: List[str], rows: List[Tuple], affected_rows: int = 0)[source]
fetchall() List[Tuple][source]

Fetch all remaining rows

fetchone() Tuple | None[source]

Fetch one row

fetchmany(size: int = 1) List[Tuple][source]

Fetch many rows

scalar() Any[source]

Get scalar value (first column of first row)

keys()[source]

Get column names
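
The fetch methods read like a forward-only cursor over the rows passed to the constructor. The stand-in below sketches that reading. ResultSetSketch is a hypothetical class, not the library's code, and the cursor behaviour (each fetch consumes rows, scalar() always looks at the first row) is an assumption inferred from the one-line descriptions above.

```python
from typing import Any, List, Optional, Tuple


class ResultSetSketch:
    """Stand-in with the documented constructor signature."""

    def __init__(self, columns: List[str], rows: List[Tuple], affected_rows: int = 0):
        self.columns = columns
        self.rows = rows
        self.affected_rows = affected_rows
        self._cursor = 0  # index of the next unread row (assumed behaviour)

    def fetchone(self) -> Optional[Tuple]:
        """Return the next row, or None when the rows are exhausted."""
        if self._cursor >= len(self.rows):
            return None
        row = self.rows[self._cursor]
        self._cursor += 1
        return row

    def fetchmany(self, size: int = 1) -> List[Tuple]:
        """Return up to `size` unread rows."""
        chunk = self.rows[self._cursor:self._cursor + size]
        self._cursor += len(chunk)
        return chunk

    def fetchall(self) -> List[Tuple]:
        """Return every row that has not been fetched yet."""
        remaining = self.rows[self._cursor:]
        self._cursor = len(self.rows)
        return remaining

    def scalar(self) -> Any:
        """Return the first column of the first row, or None if there is none."""
        return self.rows[0][0] if self.rows and self.rows[0] else None

    def keys(self) -> List[str]:
        return list(self.columns)


rs = ResultSetSketch(["id", "name"], [(1, "a"), (2, "b"), (3, "c")])
first = rs.fetchone()
second = rs.fetchmany(1)
rest = rs.fetchall()
```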