AsyncClient
MatrixOne Async Client - Asynchronous implementation
- class matrixone.async_client.AsyncResultSet(columns: List[str], rows: List[Tuple], affected_rows: int = 0)[source]
Bases:
objectAsync result set wrapper for query results
- keys()[source]
Get column names
- class matrixone.async_client.AsyncSnapshotManager(client)[source]
Bases:
objectAsync snapshot manager
- __init__(client)[source]
- class matrixone.async_client.AsyncCloneManager(client)[source]
Bases:
objectAsync clone manager
- __init__(client)[source]
- async clone_database(target_db: str, source_db: str, snapshot_name: str | None = None, if_not_exists: bool = False) None[source]
Clone database asynchronously
- async clone_table(target_table: str, source_table: str, snapshot_name: str | None = None, if_not_exists: bool = False) None[source]
Clone table asynchronously
- class matrixone.async_client.AsyncRestoreManager(client)[source]
Bases:
objectAsync manager for restore operations
- __init__(client)[source]
- async restore_cluster(snapshot_name: str) bool[source]
Restore entire cluster from snapshot asynchronously
- async restore_tenant(snapshot_name: str, account_name: str, to_account: str | None = None) bool[source]
Restore tenant from snapshot asynchronously
- class matrixone.async_client.AsyncMoCtlManager(client)[source]
Bases:
objectAsync mo_ctl manager
- __init__(client)[source]
- class matrixone.async_client.AsyncClientExecutor(client)[source]
Bases:
BaseMatrixOneExecutorAsync client executor that uses AsyncClient’s execute method
- __init__(client)[source]
- class matrixone.async_client.AsyncClient(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Bases:
BaseMatrixOneClientMatrixOne Async Client - Asynchronous interface for MatrixOne database operations.
This class provides a comprehensive asynchronous interface for connecting to and interacting with MatrixOne databases. It supports modern async/await patterns including table creation, data insertion, querying, vector operations, and transaction management.
Key Features:
Asynchronous connection management with connection pooling
High-level table operations (create_table, drop_table, insert, batch_insert)
Query builder interface for complex async queries
Vector operations (similarity search, range search, indexing)
Async transaction management with context managers
Snapshot and restore operations
Account and user management
Fulltext search capabilities
Non-blocking I/O operations
Supported Operations:
Async connection and disconnection
Async query execution (SELECT, INSERT, UPDATE, DELETE)
Async batch operations
Async transaction management
Async table creation and management
Async vector and fulltext operations
Async snapshot and restore operations
Usage Examples:
Basic async usage:: async def main(): client = AsyncClient() await client.connect('localhost', 6001, 'root', '111', 'test') # Create table using high-level API await client.create_table("users", { "id": "int primary key", "name": "varchar(100)", "email": "varchar(255)" }) # Insert data await client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"}) # Query data result = await client.query("users").where("id = ?", 1).all() print(result.rows) await client.disconnect() Vector operations:: async def vector_example(): client = AsyncClient() await client.connect('localhost', 6001, 'root', '111', 'test') # Create vector table await client.create_table("documents", { "id": "int primary key", "content": "text", "embedding": "vecf32(384)" }) # Vector similarity search results = await client.vector_ops.similarity_search( "documents", vector_column="embedding", query_vector=[0.1, 0.2, 0.3, ...], # 384-dimensional vector limit=10, distance_type="l2" ) await client.disconnect() Async transaction usage:: async def transaction_example(): client = AsyncClient() await client.connect('localhost', 6001, 'root', '111', 'test') async with client.transaction() as tx: await tx.execute("INSERT INTO users (name) VALUES (?)", ("John",)) await tx.execute("INSERT INTO orders (user_id, amount) VALUES (?, ?)", (1, 100.0)) # Transaction commits automatically on success
Note: This class requires asyncio and async database drivers. Use the synchronous Client class for blocking operations or when async support is not needed.
- __init__(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Initialize MatrixOne async client
Args:
connection_timeout: Connection timeout in seconds query_timeout: Query timeout in seconds auto_commit: Enable auto-commit mode charset: Character set for connection logger: Custom logger instance. If None, creates a default logger sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full') - 'off': No SQL logging - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default) - 'simple': Show operation summary only - 'full': Show complete SQL regardless of length slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0) max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
- async connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None)[source]
Connect to MatrixOne database asynchronously
Args:
host: Database host port: Database port user: Username or login info in format "user", "account#user", "account#user#role", "account:user", or "account:user:role" (both '#' and ':' separators are supported) password: Password database: Database name account: Optional account name (will be combined with user if user doesn't contain '#' or ':') role: Optional role name (will be combined with user if user doesn't contain '#' or ':') charset: Character set for the connection (default: utf8mb4) connection_timeout: Connection timeout in seconds (default: 30) auto_commit: Enable autocommit (default: True) on_connect: Connection hook to execute after successful connection. Can be: - ConnectionHook instance - List of ConnectionAction or string action names - Custom callback function (async or sync)
Examples:
# Enable all features after connection await client.connect(host, port, user, password, database, on_connect=[ConnectionAction.ENABLE_ALL]) # Enable only vector operations with custom charset await client.connect(host, port, user, password, database, charset="utf8mb4", on_connect=[ConnectionAction.ENABLE_VECTOR]) # Custom async callback async def my_callback(client): print(f"Connected to {client._connection_params['host']}") await client.connect(host, port, user, password, database, on_connect=my_callback)
- classmethod from_engine(engine: AsyncEngine, **kwargs) AsyncClient[source]
Create AsyncClient instance from existing SQLAlchemy AsyncEngine
Args:
engine: SQLAlchemy AsyncEngine instance (must use MySQL driver) **kwargs: Additional client configuration options
Returns:
AsyncClient: Configured async client instance
Raises:
ConnectionError: If engine doesn't use MySQL driver
Examples
Basic usage:
from sqlalchemy.ext.asyncio import create_async_engine from matrixone import AsyncClient engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db") client = AsyncClient.from_engine(engine) With custom configuration:: engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db") client = AsyncClient.from_engine( engine, sql_log_mode='auto', slow_query_threshold=0.5 )
- async disconnect()[source]
Disconnect from MatrixOne database asynchronously
- disconnect_sync()[source]
Synchronous disconnect for cleanup when event loop is closed
- __del__()[source]
Cleanup when object is garbage collected
- get_sqlalchemy_engine() AsyncEngine[source]
Get SQLAlchemy async engine
Returns:
SQLAlchemy AsyncEngine
- async create_all(base_class=None)[source]
Create all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- async drop_all(base_class=None)[source]
Drop all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- async execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) AsyncResultSet[source]
Execute SQL query or SQLAlchemy statement asynchronously without transaction isolation.
This method executes queries asynchronously using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use async with client.session() instead.
The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement async operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.
Key Features:
Async/await support: Non-blocking execution using async/await patterns
ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()
Parameter binding: Automatic escaping of parameters to prevent SQL injection
Query logging: Integrated async logging with performance tracking
Auto-commit: Each statement commits immediately (no transaction isolation)
Connection pooling: Efficient async connection reuse from pool
- Parameters:
sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be: - SQLAlchemy select() statement (recommended) - SQLAlchemy insert() statement (recommended) - SQLAlchemy update() statement (recommended) - SQLAlchemy delete() statement (recommended) - String SQL with ‘?’ placeholders for parameters - SQLAlchemy text() statement
params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements.
log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging or disabling logs for frequently-executed queries.
- Returns:
- Async query result object with:
columns: List[str] - Column names
rows: List[Tuple] - Row data as tuples
affected_rows: int - Number of rows affected by DML operations
fetchall() -> List[Row] - Get all rows as list
fetchone() -> Optional[Row] - Get next row or None
fetchmany(size) -> List[Row] - Get next N rows
- Return type:
- Raises:
ConnectionError – If not connected to database
QueryError – If query execution fails or SQL syntax is invalid
Usage Examples:
from matrixone import AsyncClient from sqlalchemy import select, insert, update, delete, and_, or_, func from sqlalchemy.orm import declarative_base import asyncio Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) age = Column(Integer) status = Column(String(20)) class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) user_id = Column(Integer) amount = Column(Float) async def main(): client = AsyncClient() await client.connect(host='localhost', port=6001, user='root', password='111', database='test') # ======================================== # SQLAlchemy SELECT Statements (Recommended) # ======================================== # Basic SELECT with WHERE clause stmt = select(User).where(User.age > 25) result = await client.execute(stmt) for user in result.fetchall(): print(f"User: {user.name}, Age: {user.age}") # SELECT specific columns stmt = select(User.name, User.email).where(User.status == 'active') result = await client.execute(stmt) for name, email in result.fetchall(): print(f"{name}: {email}") # Complex WHERE with AND/OR stmt = select(User).where( and_( User.age > 18, or_( User.status == 'active', User.status == 'pending' ) ) ) result = await client.execute(stmt) # SELECT with JOIN stmt = select(User, Order).join(Order, User.id == Order.user_id) result = await client.execute(stmt) for user, order in result.fetchall(): print(f"{user.name} ordered ${order.amount}") # SELECT with aggregation stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active') result = await client.execute(stmt) count, avg_age = result.fetchone() print(f"Active users: {count}, Average age: {avg_age}") # ======================================== # SQLAlchemy INSERT Statements (Recommended) # ======================================== # Single INSERT stmt = insert(User).values(name='John', email='john@example.com', age=30) result = await client.execute(stmt) print(f"Inserted {result.affected_rows} rows") # Bulk INSERT stmt = insert(User).values([ {'name': 'Alice', 'email': 'alice@example.com', 'age': 28}, {'name': 'Bob', 'email': 'bob@example.com', 'age': 35}, {'name': 'Carol', 'email': 'carol@example.com', 'age': 42} ]) result = await client.execute(stmt) print(f"Inserted {result.affected_rows} rows") # ======================================== # SQLAlchemy UPDATE Statements (Recommended) # ======================================== # Simple UPDATE stmt = update(User).where(User.id == 1).values(email='newemail@example.com') result = await client.execute(stmt) print(f"Updated {result.affected_rows} rows") # Conditional UPDATE stmt = update(User).where(User.age < 18).values(status='minor') result = await client.execute(stmt) # UPDATE with expressions stmt = update(Order).values(total=Order.quantity * Order.price) result = await client.execute(stmt) # ======================================== # SQLAlchemy DELETE Statements (Recommended) # ======================================== # Simple DELETE stmt = delete(User).where(User.id == 1) result = await client.execute(stmt) print(f"Deleted {result.affected_rows} rows") # Conditional DELETE stmt = delete(User).where(User.status == 'deleted') result = await client.execute(stmt) # DELETE with complex condition stmt = delete(User).where( and_( User.age < 18, User.status == 'inactive' ) ) result = await client.execute(stmt) # ======================================== # Concurrent Execution with asyncio.gather # ======================================== # Execute multiple independent queries concurrently user_stmt = select(User).where(User.age > 25) order_stmt = select(Order).where(Order.amount > 100) user_result, order_result = await asyncio.gather( client.execute(user_stmt), client.execute(order_stmt) ) print(f"Users: {len(user_result.fetchall())}") print(f"Orders: {len(order_result.fetchall())}") # ======================================== # String SQL with Parameters (Alternative) # ======================================== # SELECT with parameters result = await client.execute( "SELECT * FROM users WHERE age > ? AND status = ?", (25, 'active') ) # INSERT with parameters result = await client.execute( "INSERT INTO users (name, email, age) VALUES (?, ?, ?)", ('David', 'david@example.com', 28) ) # UPDATE with parameters result = await client.execute( "UPDATE users SET status = ? WHERE age < ?", ('minor', 18) ) # ======================================== # Query Logging Control # ======================================== # Disable logging for frequently executed query result = await client.execute( select(User).where(User.id == 1), log_mode='off' ) # Force full SQL logging for debugging result = await client.execute( select(User).where(User.name.like('%test%')), log_mode='full' ) await client.disconnect() asyncio.run(main())
Important Notes:
No transaction isolation: Each execute() call commits immediately
Use session() for transactions: For atomic multi-statement operations
ORM-style preferred: Use SQLAlchemy statements for better type safety
Auto-commit behavior: Changes are permanent immediately after execute()
Non-blocking: Uses async/await and doesn’t block event loop
Concurrent execution: Use asyncio.gather() for parallel queries
Best Practices:
Prefer ORM-style statements: Use select(), insert(), update(), delete()
Use parameters: Always use parameter binding to prevent SQL injection
Session for transactions: Use client.session() for atomic operations
Use asyncio.gather(): For concurrent independent queries
Disable logging in production: Use log_mode=’off’ for hot paths
Handle exceptions: Wrap execute() in try-except for error handling
See also
AsyncClient.session(): For transaction-aware async operations
AsyncSession.execute(): Execute within async transaction context
Client.execute(): Synchronous version
- query(*columns, snapshot: str | None = None)[source]
Get async MatrixOne query builder - SQLAlchemy style
Args:
*columns: Can be: - Single model class: query(Article) - returns all columns from model - Multiple columns: query(Article.id, Article.title) - returns specific columns - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns snapshot: Optional snapshot name for snapshot queries
Examples
- # Traditional model query (all columns)
await client.query(Article).filter(…).all()
# Column-specific query await client.query(Article.id, Article.title).filter(…).all()
# With fulltext score await client.query(Article.id, boolean_match(“title”, “content”).must(“python”).label(“score”))
# Snapshot query await client.query(Article, snapshot=”my_snapshot”).filter(…).all()
Returns:
AsyncMatrixOneQuery instance configured for the specified columns
- snapshot(snapshot_name: str)[source]
Snapshot context manager
Usage
async with client.snapshot(“daily_backup”) as snapshot_client: result = await snapshot_client.execute(“SELECT * FROM users”)
- async insert(table_name_or_model, data: dict) AsyncResultSet[source]
Insert data into a table asynchronously.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class data: Data to insert (dict with column names as keys)
Returns:
AsyncResultSet object
- async batch_insert(table_name_or_model, data_list: list) AsyncResultSet[source]
Batch insert data into a table asynchronously.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class data_list: List of data dictionaries to insert
Returns:
AsyncResultSet object
- session()[source]
Create an async transaction-aware session for atomic database operations.
This method returns an AsyncSession that extends SQLAlchemy AsyncSession with MatrixOne-specific features. All operations within the session are executed atomically using async/await patterns - they either all succeed or all fail together.
The session is an async context manager that automatically handles transaction lifecycle: - Commits the transaction when the context exits normally - Rolls back the transaction if any exception occurs - Cleans up database resources automatically - Enables non-blocking concurrent operations
Key Features:
Full async SQLAlchemy ORM: All standard async Session methods with await
Atomic transactions: Multiple async operations commit or rollback together
Async MatrixOne managers: All MatrixOne operations available asynchronously
Concurrent execution: Use asyncio.gather() for parallel operations
Non-blocking: All operations use async/await and don’t block event loop
ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()
Available Async Managers (transaction-aware):
session.snapshots: AsyncSnapshotManager for async snapshot operations
session.clone: AsyncCloneManager for async clone operations
session.restore: AsyncRestoreManager for async restore operations
session.pitr: AsyncPitrManager for async point-in-time recovery
session.pubsub: AsyncPubSubManager for async publish-subscribe
session.account: AsyncAccountManager for async account management
session.vector_ops: AsyncVectorManager for async vector operations
session.fulltext_index: AsyncFulltextIndexManager for async fulltext search
session.metadata: AsyncMetadataManager for async metadata analysis
session.load_data: AsyncLoadDataManager for async bulk loading
session.stage: AsyncStageManager for async stage management
- Returns:
Async context manager yielding AsyncSession
- Return type:
AsyncContextManager[AsyncSession]
- Raises:
ConnectionError – If client is not connected to database
Usage Examples:
from matrixone import AsyncClient from sqlalchemy import select, insert, update, delete from sqlalchemy.orm import declarative_base import asyncio Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) age = Column(Integer) class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) user_id = Column(Integer) amount = Column(Float) async def main(): client = AsyncClient() await client.connect(host='localhost', port=6001, user='root', password='111', database='test') # ======================================== # Example 1: Basic Async Transaction with ORM-style SQL # ======================================== async with client.session() as session: # Insert using SQLAlchemy insert() await session.execute(insert(User).values(name='John', email='john@example.com', age=30)) # Update using SQLAlchemy update() await session.execute(update(User).where(User.age < 18).values(status='minor')) # Select using SQLAlchemy select() stmt = select(User).where(User.age > 25) result = await session.execute(stmt) for user in result.scalars(): print(f"User: {user.name}") # Delete using SQLAlchemy delete() await session.execute(delete(User).where(User.status == 'inactive')) # All operations commit atomically # ======================================== # Example 2: Async ORM Operations # ======================================== async with client.session() as session: # Create new objects user1 = User(name='Alice', email='alice@example.com', age=28) user2 = User(name='Bob', email='bob@example.com', age=35) # Add to session session.add(user1) session.add(user2) # Query using ORM stmt = select(User).where(User.name == 'Alice') result = await session.execute(stmt) alice = result.scalar_one() # Update object alice.email = 'newemail@example.com' # Commit await session.commit() # ======================================== # Example 3: Concurrent Operations with asyncio.gather # ======================================== async with client.session() as session: # Execute multiple queries concurrently user_task = session.execute(select(User).where(User.age > 25)) order_task = session.execute(select(Order).where(Order.amount > 100)) user_result, order_result = await asyncio.gather(user_task, order_task) users = user_result.scalars().all() orders = order_result.scalars().all() print(f"Found {len(users)} users and {len(orders)} orders") # ======================================== # Example 4: Async MatrixOne Managers # ======================================== async with client.session() as session: from matrixone import SnapshotLevel # Create snapshot asynchronously snapshot = await session.snapshots.create( name='daily_backup', level=SnapshotLevel.DATABASE, database='production' ) # Clone database asynchronously await session.clone.clone_database( target_db='prod_copy', source_db='production', snapshot_name='daily_backup' ) # Both operations commit atomically # ======================================== # Example 5: Async Data Loading with Stages # ======================================== async with client.session() as session: # Create S3 stage using simple interface await session.stage.create_s3( name='import_stage', bucket='my-bucket', path='imports/', aws_key_id='key', aws_secret_key='secret' ) # Load data from stage using ORM model await session.load_data.from_stage_csv('import_stage', 'users.csv', User) # Update statistics await session.execute("ANALYZE TABLE users") # All operations are atomic # ======================================== # Example 6: Error Handling and Rollback # ======================================== try: async with client.session() as session: await session.execute(insert(User).values(name='Charlie', age=40)) await session.execute(insert(InvalidTable).values(data='test')) # Fails # Transaction automatically rolls back - Charlie is NOT inserted except Exception as e: print(f"Transaction failed and rolled back: {e}") # ======================================== # Example 7: Complex Multi-Manager Transaction # ======================================== async with client.session() as session: # Create publication await session.pubsub.create_database_publication( name='analytics_pub', database='analytics', account='subscriber_account' ) # Create local stage await session.stage.create_local('export_stage', '/exports/') # Load data using ORM model await session.load_data.from_csv('/data/latest.csv', Analytics) # Create snapshot await session.snapshots.create( name='post_load_snapshot', level=SnapshotLevel.DATABASE, database='analytics' ) # All operations commit together # ======================================== # Example 8: High-Performance Concurrent Loading # ======================================== async with client.session() as session: # Load multiple files concurrently await asyncio.gather( session.load_data.from_csv('/data/users.csv', User), session.load_data.from_csv('/data/orders.csv', Order), session.load_data.from_csv('/data/products.csv', Product) ) # All loads commit atomically await client.disconnect() asyncio.run(main())
Best Practices:
Always use async with: Use async with client.session() for automatic cleanup
Await all operations: All execute/manager operations must be awaited
Use asyncio.gather(): For concurrent operations within session
Keep transactions short: Long transactions can block other operations
Handle exceptions: Wrap session code in try-except for error handling
Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()
See also
AsyncSession: The async session class returned by this method
Client.session(): Synchronous version
AsyncClient.execute(): Non-transactional async query execution
- property snapshots: AsyncSnapshotManager
Get async snapshot manager
- property clone: AsyncCloneManager
Get async clone manager
- property branch: AsyncBranchManager
Get async branch manager for Git-style version control operations.
Provides asynchronous table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.
- Returns:
AsyncBranchManager instance for async branch operations
Example:
import asyncio from matrixone import AsyncClient async def main(): client = AsyncClient() await client.connect(database='test') # Create table branch await client.branch.create_table_branch('users_branch', 'users') # Create database branch await client.branch.create_database_branch('dev_db', 'production') # Compare branches diffs = await client.branch.diff_table('users_branch', 'users') # Merge branches await client.branch.merge_table('users_branch', 'users') # Delete branches await client.branch.delete_table_branch('users_branch') await client.branch.delete_database_branch('dev_db') await client.disconnect() asyncio.run(main())
See also
branch_guide - Complete branch management guide
- property moctl: AsyncMoCtlManager
Get async mo_ctl manager
- property restore: AsyncRestoreManager
Get async restore manager
- property pitr: AsyncPitrManager
Get async PITR manager
- property pubsub: AsyncPubSubManager
Get async publish-subscribe manager
- property account: AsyncAccountManager
Get async account manager
- property vector_ops
Get unified vector operations manager for vector operations (index and data)
- get_pinecone_index(table_name_or_model, vector_column: str)[source]
Get a PineconeCompatibleIndex object for vector search operations.
This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class vector_column: Name of the vector column
Returns:
PineconeCompatibleIndex object with Pinecone-compatible API
Example:
index = await client.get_pinecone_index("documents", "embedding") results = await index.query_async([0.1, 0.2, 0.3], top_k=5) for match in results.matches: print(f"ID: {match.id}, Score: {match.score}")
- property fulltext_index
Get fulltext index manager for fulltext index operations
- async version() str[source]
Get MatrixOne server version asynchronously
Returns:
str: MatrixOne server version string
Raises:
ConnectionError: If not connected to MatrixOne QueryError: If version query fails
Example
>>> client = AsyncClient() >>> await client.connect('localhost', 6001, 'root', '111', 'test') >>> version = await client.version() >>> print(f"MatrixOne version: {version}")
- async git_version() str[source]
Get MatrixOne git version information asynchronously
Returns:
str: MatrixOne git version string
Raises:
ConnectionError: If not connected to MatrixOne QueryError: If git version query fails
Example
>>> client = AsyncClient() >>> await client.connect('localhost', 6001, 'root', '111', 'test') >>> git_version = await client.git_version() >>> print(f"MatrixOne git version: {git_version}")
- async get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]
Get all secondary index table names for a given table (async version).
This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.
- Parameters:
table_name – Name of the table to get secondary indexes for
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)
Examples:
>>> async with AsyncClient() as client: ... await client.connect(host='localhost', port=6001, user='root', password='111', database='test') ... # Use current database ... index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info') ... # Or specify database explicitly ... index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info', 'test') ... print(index_tables)
- async get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]
Get the physical table name of a secondary index by its index name (async version).
- Parameters:
table_name – Name of the table
index_name – Name of the secondary index
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
Physical table name of the secondary index, or None if not found
Examples:
>>> async with AsyncClient() as client: ... await client.connect(host='localhost', port=6001, user='root', password='111', database='test') ... # Use current database ... index_table = await client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id') ... # Or specify database explicitly ... index_table = await client.get_secondary_index_table_by_name( ... 'cms_all_content_chunk_info', 'cms_id', 'test' ... ) ... print(index_table)
- async verify_table_index_counts(table_name: str) int[source]
Verify that the main table and all its secondary index tables have the same row count (async version).
This method compares the COUNT(*) of the main table with all its secondary index tables in a single SQL query for consistency. If counts don’t match, raises an exception.
- Parameters:
table_name – Name of the table to verify
- Returns:
Row count (int) if verification succeeds
- Raises:
ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message
Examples:
>>> async with AsyncClient() as client: ... await client.connect(host='localhost', port=6001, user='root', password='111', database='test') ... count = await client.verify_table_index_counts('cms_all_content_chunk_info') ... print(f"✓ Verification passed, row count: {count}") >>> # If verification fails: >>> try: ... count = await client.verify_table_index_counts('some_table') ... except ValueError as e: ... print(f"Verification failed: {e}")
- async create_table(table_name_or_model, columns: dict | None = None, **kwargs) AsyncClient[source]
Create a table asynchronously
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class columns: Dictionary mapping column names to their definitions (required if table_name_or_model is str) **kwargs: Additional table creation options
Returns:
AsyncClient: Self for chaining
Example
>>> await client.create_table("users", { ... "id": "int primary key", ... "name": "varchar(100)", ... "email": "varchar(255)" ... })
- async drop_table(table_name_or_model) AsyncClient[source]
Drop a table asynchronously
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
Returns:
AsyncClient: Self for chaining
Example
>>> await client.drop_table("users")
- async create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) AsyncClient[source]
Create a table with indexes asynchronously
Args:
table_name: Name of the table to create columns: Dictionary mapping column names to their definitions indexes: List of index definitions **kwargs: Additional table creation options
Returns:
AsyncClient: Self for chaining
Example
>>> await client.create_table_with_index("users", { ... "id": "int primary key", ... "name": "varchar(100)", ... "email": "varchar(255)" ... }, [ ... {"name": "idx_name", "columns": ["name"]}, ... {"name": "idx_email", "columns": ["email"], "unique": True} ... ])
- async create_table_orm(table_name: str, *columns, **kwargs) AsyncClient[source]
Create a table using SQLAlchemy ORM asynchronously
Args:
table_name: Name of the table to create *columns: SQLAlchemy column definitions **kwargs: Additional table creation options
Returns:
AsyncClient: Self for chaining
Example
>>> from sqlalchemy import Column, Integer, String >>> await client.create_table_orm("users", ... Column("id", Integer, primary_key=True), ... Column("name", String(100)), ... Column("email", String(255)) ... )
- property metadata: AsyncMetadataManager | None
Get metadata manager for table metadata operations
- property load_data: AsyncLoadDataManager | None
Get async load data manager for bulk data loading operations
- property stage: AsyncStageManager | None
Get async stage manager for external stage operations
- property cdc: AsyncCDCManager | None
Get async CDC manager for change data capture operations.
AsyncClient Class
- class matrixone.async_client.AsyncClient(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Bases:
BaseMatrixOneClientMatrixOne Async Client - Asynchronous interface for MatrixOne database operations.
This class provides a comprehensive asynchronous interface for connecting to and interacting with MatrixOne databases. It supports modern async/await patterns including table creation, data insertion, querying, vector operations, and transaction management.
Key Features:
Asynchronous connection management with connection pooling
High-level table operations (create_table, drop_table, insert, batch_insert)
Query builder interface for complex async queries
Vector operations (similarity search, range search, indexing)
Async transaction management with context managers
Snapshot and restore operations
Account and user management
Fulltext search capabilities
Non-blocking I/O operations
Supported Operations:
Async connection and disconnection
Async query execution (SELECT, INSERT, UPDATE, DELETE)
Async batch operations
Async transaction management
Async table creation and management
Async vector and fulltext operations
Async snapshot and restore operations
Usage Examples:
Basic async usage:: async def main(): client = AsyncClient() await client.connect('localhost', 6001, 'root', '111', 'test') # Create table using high-level API await client.create_table("users", { "id": "int primary key", "name": "varchar(100)", "email": "varchar(255)" }) # Insert data await client.insert("users", {"id": 1, "name": "John", "email": "john@example.com"}) # Query data result = await client.query("users").where("id = ?", 1).all() print(result.rows) await client.disconnect() Vector operations:: async def vector_example(): client = AsyncClient() await client.connect('localhost', 6001, 'root', '111', 'test') # Create vector table await client.create_table("documents", { "id": "int primary key", "content": "text", "embedding": "vecf32(384)" }) # Vector similarity search results = await client.vector_ops.similarity_search( "documents", vector_column="embedding", query_vector=[0.1, 0.2, 0.3, ...], # 384-dimensional vector limit=10, distance_type="l2" ) await client.disconnect() Async transaction usage:: async def transaction_example(): client = AsyncClient() await client.connect('localhost', 6001, 'root', '111', 'test') async with client.transaction() as tx: await tx.execute("INSERT INTO users (name) VALUES (?)", ("John",)) await tx.execute("INSERT INTO orders (user_id, amount) VALUES (?, ?)", (1, 100.0)) # Transaction commits automatically on success
Note: This class requires asyncio and async database drivers. Use the synchronous Client class for blocking operations or when async support is not needed.
- __init__(connection_timeout: int = 30, query_timeout: int = 300, auto_commit: bool = True, charset: str = 'utf8mb4', logger: MatrixOneLogger | None = None, sql_log_mode: str = 'auto', slow_query_threshold: float = 1.0, max_sql_display_length: int = 500)[source]
Initialize MatrixOne async client
Args:
connection_timeout: Connection timeout in seconds query_timeout: Query timeout in seconds auto_commit: Enable auto-commit mode charset: Character set for connection logger: Custom logger instance. If None, creates a default logger sql_log_mode: SQL logging mode ('off', 'auto', 'simple', 'full') - 'off': No SQL logging - 'auto': Smart logging - short SQL shown fully, long SQL summarized (default) - 'simple': Show operation summary only - 'full': Show complete SQL regardless of length slow_query_threshold: Threshold in seconds for slow query warnings (default: 1.0) max_sql_display_length: Maximum SQL length in auto mode before summarizing (default: 500)
- async connect(*, host: str = 'localhost', port: int = 6001, user: str = 'root', password: str = '111', database: str, account: str | None = None, role: str | None = None, charset: str = 'utf8mb4', connection_timeout: int = 30, auto_commit: bool = True, on_connect: ConnectionHook | List[ConnectionAction | str] | Callable | None = None)[source]
Connect to MatrixOne database asynchronously
Args:
host: Database host port: Database port user: Username or login info in format "user", "account#user", "account#user#role", "account:user", or "account:user:role" (both '#' and ':' separators are supported) password: Password database: Database name account: Optional account name (will be combined with user if user doesn't contain '#' or ':') role: Optional role name (will be combined with user if user doesn't contain '#' or ':') charset: Character set for the connection (default: utf8mb4) connection_timeout: Connection timeout in seconds (default: 30) auto_commit: Enable autocommit (default: True) on_connect: Connection hook to execute after successful connection. Can be: - ConnectionHook instance - List of ConnectionAction or string action names - Custom callback function (async or sync)
Examples:
# Enable all features after connection await client.connect(host, port, user, password, database, on_connect=[ConnectionAction.ENABLE_ALL]) # Enable only vector operations with custom charset await client.connect(host, port, user, password, database, charset="utf8mb4", on_connect=[ConnectionAction.ENABLE_VECTOR]) # Custom async callback async def my_callback(client): print(f"Connected to {client._connection_params['host']}") await client.connect(host, port, user, password, database, on_connect=my_callback)
- classmethod from_engine(engine: AsyncEngine, **kwargs) AsyncClient[source]
Create AsyncClient instance from existing SQLAlchemy AsyncEngine
Args:
engine: SQLAlchemy AsyncEngine instance (must use MySQL driver) **kwargs: Additional client configuration options
Returns:
AsyncClient: Configured async client instance
Raises:
ConnectionError: If engine doesn't use MySQL driver
Examples
Basic usage:
from sqlalchemy.ext.asyncio import create_async_engine from matrixone import AsyncClient engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db") client = AsyncClient.from_engine(engine) With custom configuration:: engine = create_async_engine("mysql+aiomysql://user:pass@host:port/db") client = AsyncClient.from_engine( engine, sql_log_mode='auto', slow_query_threshold=0.5 )
- get_sqlalchemy_engine() AsyncEngine[source]
Get SQLAlchemy async engine
Returns:
SQLAlchemy AsyncEngine
- async create_all(base_class=None)[source]
Create all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- async drop_all(base_class=None)[source]
Drop all tables defined in the given base class or default Base.
Args:
base_class: SQLAlchemy declarative base class. If None, uses the default Base.
- async execute(sql_or_stmt, params: Tuple | None = None, log_mode: str | None = None) AsyncResultSet[source]
Execute SQL query or SQLAlchemy statement asynchronously without transaction isolation.
This method executes queries asynchronously using the connection pool, without wrapping them in a transaction. Each statement executes independently with auto-commit enabled. For atomic multi-statement operations, use async with client.session() instead.
The method supports both SQLAlchemy ORM-style statements (recommended) and string SQL with parameter binding. It’s ideal for single-statement async operations like SELECT queries, simple INSERT/UPDATE/DELETE, or DDL statements.
Key Features:
Async/await support: Non-blocking execution using async/await patterns
ORM-style statements: Full support for SQLAlchemy select(), insert(), update(), delete()
Parameter binding: Automatic escaping of parameters to prevent SQL injection
Query logging: Integrated async logging with performance tracking
Auto-commit: Each statement commits immediately (no transaction isolation)
Connection pooling: Efficient async connection reuse from pool
- Parameters:
sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be: - SQLAlchemy select() statement (recommended) - SQLAlchemy insert() statement (recommended) - SQLAlchemy update() statement (recommended) - SQLAlchemy delete() statement (recommended) - String SQL with ‘?’ placeholders for parameters - SQLAlchemy text() statement
params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Automatically escaped to prevent SQL injection. Ignored for SQLAlchemy statements.
log_mode (Optional[str]) – Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If None, uses client’s global sql_log_mode setting. Useful for debugging or disabling logs for frequently-executed queries.
- Returns:
- Async query result object with:
columns: List[str] - Column names
rows: List[Tuple] - Row data as tuples
affected_rows: int - Number of rows affected by DML operations
fetchall() -> List[Row] - Get all rows as list
fetchone() -> Optional[Row] - Get next row or None
fetchmany(size) -> List[Row] - Get next N rows
- Return type:
- Raises:
ConnectionError – If not connected to database
QueryError – If query execution fails or SQL syntax is invalid
Usage Examples:
from matrixone import AsyncClient from sqlalchemy import select, insert, update, delete, and_, or_, func from sqlalchemy.orm import declarative_base import asyncio Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) age = Column(Integer) status = Column(String(20)) class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) user_id = Column(Integer) amount = Column(Float) async def main(): client = AsyncClient() await client.connect(host='localhost', port=6001, user='root', password='111', database='test') # ======================================== # SQLAlchemy SELECT Statements (Recommended) # ======================================== # Basic SELECT with WHERE clause stmt = select(User).where(User.age > 25) result = await client.execute(stmt) for user in result.fetchall(): print(f"User: {user.name}, Age: {user.age}") # SELECT specific columns stmt = select(User.name, User.email).where(User.status == 'active') result = await client.execute(stmt) for name, email in result.fetchall(): print(f"{name}: {email}") # Complex WHERE with AND/OR stmt = select(User).where( and_( User.age > 18, or_( User.status == 'active', User.status == 'pending' ) ) ) result = await client.execute(stmt) # SELECT with JOIN stmt = select(User, Order).join(Order, User.id == Order.user_id) result = await client.execute(stmt) for user, order in result.fetchall(): print(f"{user.name} ordered ${order.amount}") # SELECT with aggregation stmt = select(func.count(User.id), func.avg(User.age)).where(User.status == 'active') result = await client.execute(stmt) count, avg_age = result.fetchone() print(f"Active users: {count}, Average age: {avg_age}") # ======================================== # SQLAlchemy INSERT Statements (Recommended) # ======================================== # Single INSERT stmt = insert(User).values(name='John', email='john@example.com', age=30) result = await client.execute(stmt) print(f"Inserted {result.affected_rows} rows") # Bulk INSERT stmt = insert(User).values([ {'name': 'Alice', 'email': 'alice@example.com', 'age': 28}, {'name': 'Bob', 'email': 'bob@example.com', 'age': 35}, {'name': 'Carol', 'email': 'carol@example.com', 'age': 42} ]) result = await client.execute(stmt) print(f"Inserted {result.affected_rows} rows") # ======================================== # SQLAlchemy UPDATE Statements (Recommended) # ======================================== # Simple UPDATE stmt = update(User).where(User.id == 1).values(email='newemail@example.com') result = await client.execute(stmt) print(f"Updated {result.affected_rows} rows") # Conditional UPDATE stmt = update(User).where(User.age < 18).values(status='minor') result = await client.execute(stmt) # UPDATE with expressions stmt = update(Order).values(total=Order.quantity * Order.price) result = await client.execute(stmt) # ======================================== # SQLAlchemy DELETE Statements (Recommended) # ======================================== # Simple DELETE stmt = delete(User).where(User.id == 1) result = await client.execute(stmt) print(f"Deleted {result.affected_rows} rows") # Conditional DELETE stmt = delete(User).where(User.status == 'deleted') result = await client.execute(stmt) # DELETE with complex condition stmt = delete(User).where( and_( User.age < 18, User.status == 'inactive' ) ) result = await client.execute(stmt) # ======================================== # Concurrent Execution with asyncio.gather # ======================================== # Execute multiple independent queries concurrently user_stmt = select(User).where(User.age > 25) order_stmt = select(Order).where(Order.amount > 100) user_result, order_result = await asyncio.gather( client.execute(user_stmt), client.execute(order_stmt) ) print(f"Users: {len(user_result.fetchall())}") print(f"Orders: {len(order_result.fetchall())}") # ======================================== # String SQL with Parameters (Alternative) # ======================================== # SELECT with parameters result = await client.execute( "SELECT * FROM users WHERE age > ? AND status = ?", (25, 'active') ) # INSERT with parameters result = await client.execute( "INSERT INTO users (name, email, age) VALUES (?, ?, ?)", ('David', 'david@example.com', 28) ) # UPDATE with parameters result = await client.execute( "UPDATE users SET status = ? WHERE age < ?", ('minor', 18) ) # ======================================== # Query Logging Control # ======================================== # Disable logging for frequently executed query result = await client.execute( select(User).where(User.id == 1), log_mode='off' ) # Force full SQL logging for debugging result = await client.execute( select(User).where(User.name.like('%test%')), log_mode='full' ) await client.disconnect() asyncio.run(main())
Important Notes:
No transaction isolation: Each execute() call commits immediately
Use session() for transactions: For atomic multi-statement operations
ORM-style preferred: Use SQLAlchemy statements for better type safety
Auto-commit behavior: Changes are permanent immediately after execute()
Non-blocking: Uses async/await and doesn’t block event loop
Concurrent execution: Use asyncio.gather() for parallel queries
Best Practices:
Prefer ORM-style statements: Use select(), insert(), update(), delete()
Use parameters: Always use parameter binding to prevent SQL injection
Session for transactions: Use client.session() for atomic operations
Use asyncio.gather(): For concurrent independent queries
Disable logging in production: Use log_mode=’off’ for hot paths
Handle exceptions: Wrap execute() in try-except for error handling
See also
AsyncClient.session(): For transaction-aware async operations
AsyncSession.execute(): Execute within async transaction context
Client.execute(): Synchronous version
- query(*columns, snapshot: str | None = None)[source]
Get async MatrixOne query builder - SQLAlchemy style
Args:
*columns: Can be: - Single model class: query(Article) - returns all columns from model - Multiple columns: query(Article.id, Article.title) - returns specific columns - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns snapshot: Optional snapshot name for snapshot queries
Examples
- # Traditional model query (all columns)
await client.query(Article).filter(…).all()
# Column-specific query await client.query(Article.id, Article.title).filter(…).all()
# With fulltext score await client.query(Article.id, boolean_match(“title”, “content”).must(“python”).label(“score”))
# Snapshot query await client.query(Article, snapshot=”my_snapshot”).filter(…).all()
Returns:
AsyncMatrixOneQuery instance configured for the specified columns
- snapshot(snapshot_name: str)[source]
Snapshot context manager
Usage
async with client.snapshot(“daily_backup”) as snapshot_client: result = await snapshot_client.execute(“SELECT * FROM users”)
- async insert(table_name_or_model, data: dict) AsyncResultSet[source]
Insert data into a table asynchronously.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class data: Data to insert (dict with column names as keys)
Returns:
AsyncResultSet object
- async batch_insert(table_name_or_model, data_list: list) AsyncResultSet[source]
Batch insert data into a table asynchronously.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class data_list: List of data dictionaries to insert
Returns:
AsyncResultSet object
- session()[source]
Create an async transaction-aware session for atomic database operations.
This method returns an AsyncSession that extends SQLAlchemy AsyncSession with MatrixOne-specific features. All operations within the session are executed atomically using async/await patterns - they either all succeed or all fail together.
The session is an async context manager that automatically handles transaction lifecycle: - Commits the transaction when the context exits normally - Rolls back the transaction if any exception occurs - Cleans up database resources automatically - Enables non-blocking concurrent operations
Key Features:
Full async SQLAlchemy ORM: All standard async Session methods with await
Atomic transactions: Multiple async operations commit or rollback together
Async MatrixOne managers: All MatrixOne operations available asynchronously
Concurrent execution: Use asyncio.gather() for parallel operations
Non-blocking: All operations use async/await and don’t block event loop
ORM-style operations: Use SQLAlchemy select(), insert(), update(), delete()
Available Async Managers (transaction-aware):
session.snapshots: AsyncSnapshotManager for async snapshot operations
session.clone: AsyncCloneManager for async clone operations
session.restore: AsyncRestoreManager for async restore operations
session.pitr: AsyncPitrManager for async point-in-time recovery
session.pubsub: AsyncPubSubManager for async publish-subscribe
session.account: AsyncAccountManager for async account management
session.vector_ops: AsyncVectorManager for async vector operations
session.fulltext_index: AsyncFulltextIndexManager for async fulltext search
session.metadata: AsyncMetadataManager for async metadata analysis
session.load_data: AsyncLoadDataManager for async bulk loading
session.stage: AsyncStageManager for async stage management
- Returns:
Async context manager yielding AsyncSession
- Return type:
AsyncContextManager[AsyncSession]
- Raises:
ConnectionError – If client is not connected to database
Usage Examples:
from matrixone import AsyncClient from sqlalchemy import select, insert, update, delete from sqlalchemy.orm import declarative_base import asyncio Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) age = Column(Integer) class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) user_id = Column(Integer) amount = Column(Float) async def main(): client = AsyncClient() await client.connect(host='localhost', port=6001, user='root', password='111', database='test') # ======================================== # Example 1: Basic Async Transaction with ORM-style SQL # ======================================== async with client.session() as session: # Insert using SQLAlchemy insert() await session.execute(insert(User).values(name='John', email='john@example.com', age=30)) # Update using SQLAlchemy update() await session.execute(update(User).where(User.age < 18).values(status='minor')) # Select using SQLAlchemy select() stmt = select(User).where(User.age > 25) result = await session.execute(stmt) for user in result.scalars(): print(f"User: {user.name}") # Delete using SQLAlchemy delete() await session.execute(delete(User).where(User.status == 'inactive')) # All operations commit atomically # ======================================== # Example 2: Async ORM Operations # ======================================== async with client.session() as session: # Create new objects user1 = User(name='Alice', email='alice@example.com', age=28) user2 = User(name='Bob', email='bob@example.com', age=35) # Add to session session.add(user1) session.add(user2) # Query using ORM stmt = select(User).where(User.name == 'Alice') result = await session.execute(stmt) alice = result.scalar_one() # Update object alice.email = 'newemail@example.com' # Commit await session.commit() # ======================================== # Example 3: Concurrent Operations with asyncio.gather # ======================================== async with client.session() as session: # Execute multiple queries concurrently user_task = session.execute(select(User).where(User.age > 25)) order_task = session.execute(select(Order).where(Order.amount > 100)) user_result, order_result = await asyncio.gather(user_task, order_task) users = user_result.scalars().all() orders = order_result.scalars().all() print(f"Found {len(users)} users and {len(orders)} orders") # ======================================== # Example 4: Async MatrixOne Managers # ======================================== async with client.session() as session: from matrixone import SnapshotLevel # Create snapshot asynchronously snapshot = await session.snapshots.create( name='daily_backup', level=SnapshotLevel.DATABASE, database='production' ) # Clone database asynchronously await session.clone.clone_database( target_db='prod_copy', source_db='production', snapshot_name='daily_backup' ) # Both operations commit atomically # ======================================== # Example 5: Async Data Loading with Stages # ======================================== async with client.session() as session: # Create S3 stage using simple interface await session.stage.create_s3( name='import_stage', bucket='my-bucket', path='imports/', aws_key_id='key', aws_secret_key='secret' ) # Load data from stage using ORM model await session.load_data.from_stage_csv('import_stage', 'users.csv', User) # Update statistics await session.execute("ANALYZE TABLE users") # All operations are atomic # ======================================== # Example 6: Error Handling and Rollback # ======================================== try: async with client.session() as session: await session.execute(insert(User).values(name='Charlie', age=40)) await session.execute(insert(InvalidTable).values(data='test')) # Fails # Transaction automatically rolls back - Charlie is NOT inserted except Exception as e: print(f"Transaction failed and rolled back: {e}") # ======================================== # Example 7: Complex Multi-Manager Transaction # ======================================== async with client.session() as session: # Create publication await session.pubsub.create_database_publication( name='analytics_pub', database='analytics', account='subscriber_account' ) # Create local stage await session.stage.create_local('export_stage', '/exports/') # Load data using ORM model await session.load_data.from_csv('/data/latest.csv', Analytics) # Create snapshot await session.snapshots.create( name='post_load_snapshot', level=SnapshotLevel.DATABASE, database='analytics' ) # All operations commit together # ======================================== # Example 8: High-Performance Concurrent Loading # ======================================== async with client.session() as session: # Load multiple files concurrently await asyncio.gather( session.load_data.from_csv('/data/users.csv', User), session.load_data.from_csv('/data/orders.csv', Order), session.load_data.from_csv('/data/products.csv', Product) ) # All loads commit atomically await client.disconnect() asyncio.run(main())
Best Practices:
Always use async with: Use async with client.session() for automatic cleanup
Await all operations: All execute/manager operations must be awaited
Use asyncio.gather(): For concurrent operations within session
Keep transactions short: Long transactions can block other operations
Handle exceptions: Wrap session code in try-except for error handling
Use ORM-style SQL: Prefer SQLAlchemy insert(), update(), select(), delete()
See also
AsyncSession: The async session class returned by this method
Client.session(): Synchronous version
AsyncClient.execute(): Non-transactional async query execution
- property snapshots: AsyncSnapshotManager
Get async snapshot manager
- property clone: AsyncCloneManager
Get async clone manager
- property branch: AsyncBranchManager
Get async branch manager for Git-style version control operations.
Provides asynchronous table and database branching, diffing, and merging capabilities. Requires MatrixOne 3.0.5 or higher.
- Returns:
AsyncBranchManager instance for async branch operations
Example:
import asyncio from matrixone import AsyncClient async def main(): client = AsyncClient() await client.connect(database='test') # Create table branch await client.branch.create_table_branch('users_branch', 'users') # Create database branch await client.branch.create_database_branch('dev_db', 'production') # Compare branches diffs = await client.branch.diff_table('users_branch', 'users') # Merge branches await client.branch.merge_table('users_branch', 'users') # Delete branches await client.branch.delete_table_branch('users_branch') await client.branch.delete_database_branch('dev_db') await client.disconnect() asyncio.run(main())
See also
branch_guide - Complete branch management guide
- property moctl: AsyncMoCtlManager
Get async mo_ctl manager
- property restore: AsyncRestoreManager
Get async restore manager
- property pitr: AsyncPitrManager
Get async PITR manager
- property pubsub: AsyncPubSubManager
Get async publish-subscribe manager
- property account: AsyncAccountManager
Get async account manager
- property vector_ops
Get unified vector operations manager for vector operations (index and data)
- get_pinecone_index(table_name_or_model, vector_column: str)[source]
Get a PineconeCompatibleIndex object for vector search operations.
This method creates a Pinecone-compatible vector search interface that automatically parses the table schema and vector index configuration. The primary key column is automatically detected, and all other columns except the vector column will be included as metadata.
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class vector_column: Name of the vector column
Returns:
PineconeCompatibleIndex object with Pinecone-compatible API
Example:
index = await client.get_pinecone_index("documents", "embedding") results = await index.query_async([0.1, 0.2, 0.3], top_k=5) for match in results.matches: print(f"ID: {match.id}, Score: {match.score}")
- property fulltext_index
Get fulltext index manager for fulltext index operations
- async version() str[source]
Get MatrixOne server version asynchronously
Returns:
str: MatrixOne server version string
Raises:
ConnectionError: If not connected to MatrixOne QueryError: If version query fails
Example
>>> client = AsyncClient() >>> await client.connect('localhost', 6001, 'root', '111', 'test') >>> version = await client.version() >>> print(f"MatrixOne version: {version}")
- async git_version() str[source]
Get MatrixOne git version information asynchronously
Returns:
str: MatrixOne git version string
Raises:
ConnectionError: If not connected to MatrixOne QueryError: If git version query fails
Example
>>> client = AsyncClient() >>> await client.connect('localhost', 6001, 'root', '111', 'test') >>> git_version = await client.git_version() >>> print(f"MatrixOne git version: {git_version}")
- async get_secondary_index_tables(table_name: str, database_name: str | None = None) List[str][source]
Get all secondary index table names for a given table (async version).
This includes both regular secondary indexes (MULTIPLE type) and UNIQUE indexes.
- Parameters:
table_name – Name of the table to get secondary indexes for
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
List of secondary index table names (includes both __mo_index_secondary_… and __mo_index_unique_… tables)
Examples:
>>> async with AsyncClient() as client: ... await client.connect(host='localhost', port=6001, user='root', password='111', database='test') ... # Use current database ... index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info') ... # Or specify database explicitly ... index_tables = await client.get_secondary_index_tables('cms_all_content_chunk_info', 'test') ... print(index_tables)
- async get_secondary_index_table_by_name(table_name: str, index_name: str, database_name: str | None = None) str | None[source]
Get the physical table name of a secondary index by its index name (async version).
- Parameters:
table_name – Name of the table
index_name – Name of the secondary index
database_name – Name of the database (optional). If None, uses the current database.
- Returns:
Physical table name of the secondary index, or None if not found
Examples:
>>> async with AsyncClient() as client: ... await client.connect(host='localhost', port=6001, user='root', password='111', database='test') ... # Use current database ... index_table = await client.get_secondary_index_table_by_name('cms_all_content_chunk_info', 'cms_id') ... # Or specify database explicitly ... index_table = await client.get_secondary_index_table_by_name( ... 'cms_all_content_chunk_info', 'cms_id', 'test' ... ) ... print(index_table)
- async verify_table_index_counts(table_name: str) int[source]
Verify that the main table and all its secondary index tables have the same row count (async version).
This method compares the COUNT(*) of the main table with all its secondary index tables in a single SQL query for consistency. If counts don’t match, raises an exception.
- Parameters:
table_name – Name of the table to verify
- Returns:
Row count (int) if verification succeeds
- Raises:
ValueError – If any secondary index table has a different count than the main table, with details about all counts in the error message
Examples:
>>> async with AsyncClient() as client: ... await client.connect(host='localhost', port=6001, user='root', password='111', database='test') ... count = await client.verify_table_index_counts('cms_all_content_chunk_info') ... print(f"✓ Verification passed, row count: {count}") >>> # If verification fails: >>> try: ... count = await client.verify_table_index_counts('some_table') ... except ValueError as e: ... print(f"Verification failed: {e}")
- async create_table(table_name_or_model, columns: dict | None = None, **kwargs) AsyncClient[source]
Create a table asynchronously
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class columns: Dictionary mapping column names to their definitions (required if table_name_or_model is str) **kwargs: Additional table creation options
Returns:
AsyncClient: Self for chaining
Example
>>> await client.create_table("users", { ... "id": "int primary key", ... "name": "varchar(100)", ... "email": "varchar(255)" ... })
- async drop_table(table_name_or_model) AsyncClient[source]
Drop a table asynchronously
Args:
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
Returns:
AsyncClient: Self for chaining
Example
>>> await client.drop_table("users")
- async create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) AsyncClient[source]
Create a table with indexes asynchronously
Args:
table_name: Name of the table to create columns: Dictionary mapping column names to their definitions indexes: List of index definitions **kwargs: Additional table creation options
Returns:
AsyncClient: Self for chaining
Example
>>> await client.create_table_with_index("users", { ... "id": "int primary key", ... "name": "varchar(100)", ... "email": "varchar(255)" ... }, [ ... {"name": "idx_name", "columns": ["name"]}, ... {"name": "idx_email", "columns": ["email"], "unique": True} ... ])
- async create_table_orm(table_name: str, *columns, **kwargs) AsyncClient[source]
Create a table using SQLAlchemy ORM asynchronously
Args:
table_name: Name of the table to create *columns: SQLAlchemy column definitions **kwargs: Additional table creation options
Returns:
AsyncClient: Self for chaining
Example
>>> from sqlalchemy import Column, Integer, String >>> await client.create_table_orm("users", ... Column("id", Integer, primary_key=True), ... Column("name", String(100)), ... Column("email", String(255)) ... )
- property metadata: AsyncMetadataManager | None
Get metadata manager for table metadata operations
- property load_data: AsyncLoadDataManager | None
Get async load data manager for bulk data loading operations
- property stage: AsyncStageManager | None
Get async stage manager for external stage operations