Session Classes

This section documents the Session and AsyncSession classes that provide transaction management capabilities.

Session

class matrixone.session.Session(bind=None, client=None, wrap_session=None, **kwargs)[source]

Bases: Session

MatrixOne Session - Transaction-aware session extending SQLAlchemy Session.

This class provides a comprehensive transaction context for executing multiple database operations atomically. It inherits all SQLAlchemy Session capabilities while adding MatrixOne-specific features like snapshots, clones, vector operations, and fulltext search.

Key Features:

  • Full SQLAlchemy API: All standard SQLAlchemy Session methods (add, delete, query, etc.)

  • Atomic transactions: All operations succeed or fail together

  • Automatic rollback: Errors trigger automatic transaction rollback

  • MatrixOne managers: Access to all MatrixOne-specific operations within transactions

  • Hybrid SQL support: Execute both SQLAlchemy statements and string SQL

  • Query logging: Integrated query logging with performance tracking

  • Context manager: Automatic transaction lifecycle management

Transaction Behavior:

  • Auto-commit on success: Transaction commits when context manager exits normally

  • Auto-rollback on error: Transaction rolls back if any exception occurs

  • Explicit control: Manual commit() and rollback() also supported

  • Isolation: All operations within session are isolated from other transactions

  • ACID compliance: Full ACID guarantees for all operations

Available Managers (all transaction-aware):

  • snapshots: SnapshotManager for creating/managing database snapshots

  • clone: CloneManager for cloning databases and tables

  • restore: RestoreManager for restoring from snapshots

  • pitr: PitrManager for point-in-time recovery operations

  • pubsub: PubSubManager for publish-subscribe operations

  • account: AccountManager for account and user management

  • vector_ops: VectorManager for vector operations and indexing

  • fulltext_index: FulltextIndexManager for fulltext search operations

  • metadata: MetadataManager for table metadata analysis

  • load_data: LoadDataManager for bulk data loading

  • stage: StageManager for external stage management

  • cdc: CDCManager for change data capture task operations

Creating Session:

There are two ways to create a MatrixOne Session:

  1. New Session (Recommended): Create directly from client:

    with client.session() as session:
        # Your operations here
        pass
    
  2. Wrap Existing Session (For Legacy Projects): Wrap existing SQLAlchemy session:

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from matrixone.session import Session as MatrixOneSession
    
    # Your existing SQLAlchemy code
    engine = create_engine('mysql+pymysql://...')
    SessionFactory = sessionmaker(bind=engine)
    sqlalchemy_session = SessionFactory()
    
    # Wrap with MatrixOne features
    mo_session = MatrixOneSession(
        client=mo_client,
        wrap_session=sqlalchemy_session
    )
    # Now you can use both SQLAlchemy and MatrixOne features
    

Usage Examples:

from matrixone import Client
from sqlalchemy import select, insert, update, delete

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ============================================================
# Example 0: Wrapping Existing SQLAlchemy Session (Legacy Projects)
# ============================================================
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone.session import Session as MatrixOneSession

# Your existing SQLAlchemy setup
engine = create_engine('mysql+pymysql://root:111@127.0.0.1:6001/test')
SessionFactory = sessionmaker(bind=engine)
existing_session = SessionFactory()

# Wrap it with MatrixOne features
mo_session = MatrixOneSession(
    client=client,
    wrap_session=existing_session
)

try:
    # Standard SQLAlchemy operations work
    result = mo_session.execute("SELECT * FROM users")

    # MatrixOne features now available
    mo_session.stage.create_s3('backup', bucket='my-backup')
    mo_session.snapshots.create('snapshot1', level='database')
    mo_session.load_data.from_csv('/data/file.csv', 'users')

    mo_session.commit()
finally:
    mo_session.close()

# ============================================================
# Example 1: Basic Transaction with ORM-style Operations
# ============================================================
with client.session() as session:
    # All operations are atomic - succeed or fail together
    from sqlalchemy import insert, update

    # Insert using SQLAlchemy insert()
    session.execute(insert(User).values(name='John', email='john@example.com'))
    session.execute(insert(Order).values(user_id=1, amount=100.0))

    # Update using SQLAlchemy update()
    session.execute(
        update(Account).where(Account.user_id == 1).values(balance=Account.balance - 100)
    )
    # Transaction commits automatically on successful completion

# ============================================================
# Example 2: SQLAlchemy ORM Operations
# ============================================================
with client.session() as session:
    # Create new objects
    user = User(name="Alice", email="alice@example.com")
    order = Order(user_id=1, amount=50.0)

    # Add to session
        session.add(user)
    session.add(order)

    # Query using ORM
    stmt = select(User).where(User.name == "Alice")
    result = session.execute(stmt)
    users = result.scalars().all()

    # Update using ORM
    user = session.get(User, 1)
    user.email = "newemail@example.com"

    # Commit explicitly (or let context manager do it)
        session.commit()

# ============================================================
# Example 3: MatrixOne Snapshot Operations in Transaction
# ============================================================
    with client.session() as session:
    # Create snapshot within transaction
    snapshot = session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='production'
    )

    # Clone database within same transaction
    session.clone.clone_database(
        target_db='production_copy',
        source_db='production',
        snapshot_name='daily_backup'
    )
    # Both operations commit atomically

# ============================================================
# Example 4: Bulk Data Loading in Transaction
# ============================================================
with client.session() as session:
    # Load data files atomically
    session.load_data.from_csv('/data/users.csv', 'users')
    session.load_data.from_csv('/data/orders.csv', 'orders')

    # Update statistics after loading
    session.execute("ANALYZE TABLE users")
    session.execute("ANALYZE TABLE orders")
    # All loads and updates commit together

# ============================================================
# Example 5: Error Handling with Automatic Rollback
# ============================================================
try:
    with client.session() as session:
        session.execute("INSERT INTO users (name) VALUES ('Bob')")
        session.execute("INSERT INTO invalid_table (data) VALUES ('test')")  # This fails
        # Transaction automatically rolls back - Bob is NOT inserted
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

# ============================================================
# Example 6: Manual Transaction Control
# ============================================================
with client.session() as session:
    try:
        session.execute("INSERT INTO users (name) VALUES ('Charlie')")
        session.execute("UPDATE accounts SET balance = balance - 50")

        # Verify conditions before committing
        result = session.execute("SELECT balance FROM accounts WHERE id = 1")
        balance = result.scalar()

        if balance >= 0:
            session.commit()  # Explicit commit
        else:
            session.rollback()  # Explicit rollback
            print("Transaction rolled back due to insufficient balance")
    except Exception as e:
        session.rollback()
        raise

# ============================================================
# Example 7: Complex Multi-Manager Transaction
# ============================================================
with client.session() as session:
    # Create publication
    pub = session.pubsub.create_database_publication(
        name='analytics_data',
        database='analytics',
        account='subscriber_account'
    )

    # Create stage using simple interface
    session.stage.create_local('export_stage', '/exports/')

    # Load fresh data
    session.load_data.from_csv('/data/latest.csv', Analytics)  # Use ORM model

    # Create snapshot after load
    session.snapshots.create(
        name='post_load_snapshot',
        level=SnapshotLevel.DATABASE,
        database='analytics'
    )
    # All operations are atomic

# ============================================================
# Example 8: Vector Operations in Transaction
# ============================================================
with client.session() as session:
    # Create vector table
    session.create_table('documents', {
        'id': 'int primary key',
        'content': 'text',
        'embedding': 'vecf32(384)'
    })

    # Create vector index
    session.vector_ops.create_ivf(
        'documents',
        name='doc_idx',
        column='embedding',
        lists=100
    )

    # Insert vector data
    session.insert('documents', {
        'id': 1,
        'content': 'sample document',
        'embedding': [0.1] * 384
    })

Best Practices:

  1. Always use context manager: Use with client.session() for automatic cleanup

  2. Keep transactions short: Long transactions can block other operations

  3. Handle exceptions: Wrap session code in try-except for proper error handling

  4. Avoid nested transactions: SQLAlchemy doesn’t support true nested transactions

  5. Use explicit commits: When you need fine-grained control over transaction boundaries

  6. Test rollback behavior: Ensure your application handles rollbacks correctly

Important Notes:

  • Session is created by Client.session() context manager

  • Don’t instantiate Session directly

  • All manager operations within session are transaction-aware

  • Session automatically manages transaction lifecycle

  • Errors trigger automatic rollback

  • Normal exit triggers automatic commit

See also

  • Client.session(): Creates and manages Session instances

  • AsyncSession: Async version for async/await workflows

  • SQLAlchemy Session: Parent class documentation

__init__(bind=None, client=None, wrap_session=None, **kwargs)[source]

Initialize MatrixOne Session.

Parameters:
  • bind – SQLAlchemy Engine or Connection to bind to

  • client – MatrixOne Client instance

  • wrap_session – Existing SQLAlchemy Session to wrap with MatrixOne features

  • **kwargs – Additional arguments passed to SQLAlchemy Session

Examples

# Create new session from engine session = Session(bind=engine, client=client)

# Wrap existing SQLAlchemy session (for legacy projects) existing_session = sessionmaker(bind=engine)() mo_session = Session(client=client, wrap_session=existing_session)

execute(sql_or_stmt, params: Tuple | None = None, **kwargs)[source]

Execute SQL or SQLAlchemy statement within the current session transaction.

This method extends SQLAlchemy’s Session.execute() with MatrixOne-specific features:

  • Parameter substitution for string SQL using ‘?’ placeholders

  • Integrated query logging with performance tracking

  • Support for both SQLAlchemy statements and raw SQL strings

All queries executed within the session participate in the current transaction. Changes are committed when the session context exits normally, or rolled back on error.

Parameters:
  • sql_or_stmt (str | SQLAlchemy statement) – The SQL query to execute. Can be: - String SQL with ‘?’ placeholders for parameters - SQLAlchemy select() statement - SQLAlchemy insert() statement - SQLAlchemy update() statement - SQLAlchemy delete() statement - SQLAlchemy text() statement

  • params (Optional[Tuple]) – Query parameters for string SQL only. Values are substituted for ‘?’ placeholders in order. Ignored for SQLAlchemy statements.

  • **kwargs

    Additional keyword arguments:

    • log_mode (str): Override SQL logging mode for this query only. Options: ‘off’, ‘simple’, ‘full’. If not specified, uses client’s global sql_log_mode setting.

    • Other kwargs are passed to SQLAlchemy’s execute()

Returns:

SQLAlchemy Result object with methods:
  • fetchall(): Get all rows as list of tuples

  • fetchone(): Get next row as tuple or None

  • fetchmany(size): Get next N rows

  • scalars(): Get first column of each row

  • mappings(): Get rows as dictionaries

  • rowcount: Number of rows affected (for INSERT/UPDATE/DELETE)

Return type:

sqlalchemy.engine.Result

Raises:
  • QueryError – If query execution fails or SQL syntax is invalid

  • ConnectionError – If session is not connected to database

Examples:

from matrixone import Client
from sqlalchemy import select, insert, update, delete, and_, or_

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# String SQL with Parameters
# ========================================
with client.session() as session:
    # Simple INSERT with parameters
    result = session.execute(
        "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
        ("John Doe", "john@example.com", 30)
    )
    print(f"Inserted {result.rowcount} rows")

    # SELECT with parameters
    result = session.execute(
        "SELECT * FROM users WHERE age > ? AND email LIKE ?",
        (25, "%@example.com")
    )
    for row in result:
        print(f"User: {row.name}, Age: {row.age}")

    # UPDATE with parameters
    result = session.execute(
        "UPDATE users SET status = ? WHERE age < ?",
        ("active", 18)
    )
    print(f"Updated {result.rowcount} rows")

    # DELETE with parameters
    result = session.execute(
        "DELETE FROM users WHERE status = ?",
        ("inactive",)
    )
    print(f"Deleted {result.rowcount} rows")

# ========================================
# SQLAlchemy SELECT Statements
# ========================================
with client.session() as session:
    # Basic SELECT
    stmt = select(User).where(User.age > 25)
    result = session.execute(stmt)
    users = result.scalars().all()  # Returns list of User objects

    # SELECT specific columns
    stmt = select(User.name, User.email).where(User.status == 'active')
    result = session.execute(stmt)
    for name, email in result:
        print(f"{name}: {email}")

    # Complex WHERE with AND/OR
    stmt = select(User).where(
        and_(
            User.age > 18,
            or_(
                User.status == 'active',
                User.status == 'pending'
            )
        )
    )
    result = session.execute(stmt)

    # SELECT with JOINs
    stmt = select(User, Order).join(Order, User.id == Order.user_id)
    result = session.execute(stmt)
    for user, order in result:
        print(f"{user.name} ordered {order.amount}")

    # SELECT with aggregation
    from sqlalchemy import func
    stmt = select(func.count(User.id), func.avg(User.age))
    result = session.execute(stmt)
    count, avg_age = result.one()

# ========================================
# SQLAlchemy INSERT Statements
# ========================================
with client.session() as session:
    # Single INSERT
    stmt = insert(User).values(name='Alice', email='alice@example.com', age=28)
    result = session.execute(stmt)
    print(f"Inserted row with ID: {result.lastrowid}")

    # Bulk INSERT
    stmt = insert(User).values([
        {'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
        {'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
    ])
    result = session.execute(stmt)
    print(f"Inserted {result.rowcount} rows")

# ========================================
# SQLAlchemy UPDATE Statements
# ========================================
with client.session() as session:
    # Simple UPDATE
    stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
    result = session.execute(stmt)
    print(f"Updated {result.rowcount} rows")

    # Conditional UPDATE
    stmt = update(User).where(User.age < 18).values(status='minor')
    result = session.execute(stmt)

    # UPDATE with expressions
    stmt = update(Order).values(total=Order.quantity * Order.price)
    result = session.execute(stmt)

# ========================================
# SQLAlchemy DELETE Statements
# ========================================
with client.session() as session:
    # Simple DELETE
    stmt = delete(User).where(User.id == 1)
    result = session.execute(stmt)

    # Conditional DELETE
    stmt = delete(User).where(User.status == 'deleted')
    result = session.execute(stmt)
    print(f"Deleted {result.rowcount} rows")

# ========================================
# Result Processing
# ========================================
with client.session() as session:
    # fetchall() - get all rows
    result = session.execute("SELECT * FROM users")
    rows = result.fetchall()
    for row in rows:
        print(f"User: {row.name}")

    # fetchone() - get one row at a time
    result = session.execute("SELECT * FROM users")
    while row := result.fetchone():
        print(f"User: {row.name}")

    # fetchmany() - get N rows
    result = session.execute("SELECT * FROM users")
    rows = result.fetchmany(10)  # Get 10 rows

    # scalars() - get first column
    stmt = select(User.name)
    result = session.execute(stmt)
    names = result.scalars().all()  # List of names

    # mappings() - get rows as dicts
    result = session.execute("SELECT name, email FROM users")
    for row in result.mappings():
        print(f"{row['name']}: {row['email']}")

    # scalar() - get single value
    stmt = select(func.count(User.id))
    result = session.execute(stmt)
    count = result.scalar()  # Single integer value

# ========================================
# Query Logging Control
# ========================================
with client.session() as session:
    # Disable logging for this query only
    result = session.execute(
        "SELECT * FROM large_table",
        log_mode='off'
    )

    # Force full SQL logging for debugging
    result = session.execute(
        "SELECT * FROM users WHERE complex_condition",
        log_mode='full'
    )

    # Simple logging (show operation type only)
    result = session.execute(
        "UPDATE massive_table SET field = 'value'",
        log_mode='simple'
    )
Best Practices:
  • Use parameters (?-placeholders) to prevent SQL injection

  • Use SQLAlchemy statements for complex queries

  • Use string SQL for simple, dynamic queries

  • Always consume or close result sets

  • Use log_mode=’off’ for frequently executed queries in production

See also

  • AsyncSession.execute(): Async version

  • Client.execute(): Client-level execute without transaction

  • SQLAlchemy Result: Result object documentation

insert(table_name: str, data: dict[str, Any]) ResultSet[source]

Insert data into a table within session.

Args:

table_name: Name of the table
data: Data to insert (dict with column names as keys)

Returns:

ResultSet object
batch_insert(table_name: str, data_list: list[dict[str, Any]]) ResultSet[source]

Batch insert data into a table within session.

Args:

table_name: Name of the table
data_list: List of data dictionaries to insert

Returns:

ResultSet object
query(*columns, snapshot: str | None = None)[source]

Get MatrixOne query builder within session - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Returns:

MatrixOneQuery instance configured for the specified columns within session
create_table(table_name: str, columns: dict, **kwargs) Session[source]

Create a table within MatrixOne session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
**kwargs: Additional table parameters

Returns:

Session: Self for chaining
drop_table(table_name: str) Session[source]

Drop a table within MatrixOne session.

Args:

table_name: Name of the table to drop

Returns:

Session: Self for chaining
create_table_with_index(table_name: str, columns: dict, indexes: list | None = None, **kwargs) Session[source]

Create a table with vector indexes within MatrixOne session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
indexes: List of index definitions (same format as client.create_table_with_index)
**kwargs: Additional table parameters

Returns:

Session: Self for chaining
create_table_orm(table_name: str, *columns, **kwargs) Session[source]

Create a table using SQLAlchemy ORM-style definitions within MatrixOne session.

Args:

table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)

Returns:

Session: Self for chaining

AsyncSession

class matrixone.session.AsyncSession(bind=None, client=None, wrap_session=None, **kwargs)[source]

Bases: AsyncSession

MatrixOne Async Session - Asynchronous transaction-aware session extending SQLAlchemy AsyncSession.

This class provides async/await support for all Session functionality with full SQLAlchemy AsyncSession API compatibility, while adding MatrixOne-specific async managers for snapshots, clones, vector operations, and more.

Key Features:

  • Full async SQLAlchemy API: All standard async Session methods (add, delete, execute, etc.)

  • Non-blocking transactions: Async transaction management with async/await

  • Async MatrixOne managers: All MatrixOne operations available asynchronously

  • Concurrent operations: Execute multiple operations concurrently with asyncio.gather

  • Automatic lifecycle: Context manager handles async transaction lifecycle

  • Query logging: Integrated async query logging

Transaction Behavior:

  • Auto-commit on success: Transaction commits when async context manager exits normally

  • Auto-rollback on error: Transaction rolls back if any exception occurs

  • Non-blocking: All operations use async/await and don’t block the event loop

  • Isolation: All operations within session are isolated from other transactions

  • ACID compliance: Full ACID guarantees for all async operations

Available Managers (all async/transaction-aware):

  • snapshots: AsyncSnapshotManager for async snapshot operations

  • clone: AsyncCloneManager for async clone operations

  • restore: AsyncRestoreManager for async restore operations

  • pitr: AsyncPitrManager for async point-in-time recovery

  • pubsub: AsyncPubSubManager for async publish-subscribe

  • account: AsyncAccountManager for async account management

  • vector_ops: AsyncVectorManager for async vector operations

  • fulltext_index: AsyncFulltextIndexManager for async fulltext search

  • metadata: AsyncMetadataManager for async metadata analysis

  • load_data: AsyncLoadDataManager for async bulk loading

  • stage: AsyncStageManager for async stage management

  • cdc: AsyncCDCManager for async change data capture task operations

Creating AsyncSession:

There are two ways to create a MatrixOne AsyncSession:

  1. New AsyncSession (Recommended): Create directly from async client:

    async with client.session() as session:
        # Your async operations here
        pass
    
  2. Wrap Existing AsyncSession (For Legacy Projects): Wrap existing SQLAlchemy async session:

    from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
    from matrixone.session import AsyncSession as MatrixOneAsyncSession
    
    # Your existing async SQLAlchemy code
    async_engine = create_async_engine('mysql+aiomysql://...')
    AsyncSessionFactory = async_sessionmaker(bind=async_engine)
    sqlalchemy_async_session = AsyncSessionFactory()
    
    # Wrap with MatrixOne features
    mo_async_session = MatrixOneAsyncSession(
        client=mo_async_client,
        wrap_session=sqlalchemy_async_session
    )
    # Now you can use both async SQLAlchemy and MatrixOne features
    

Usage Examples:

from matrixone import AsyncClient
from sqlalchemy import select, insert, update, delete
import asyncio

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Example 0: Wrapping Existing Async Session (Legacy Projects)
    # ========================================
    from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
    from matrixone.session import AsyncSession as MatrixOneAsyncSession

    # Your existing async SQLAlchemy setup
    async_engine = create_async_engine('mysql+aiomysql://root:111@127.0.0.1:6001/test')
    AsyncSessionFactory = async_sessionmaker(bind=async_engine)
    existing_async_session = AsyncSessionFactory()

    # Wrap it with MatrixOne features
    mo_async_session = MatrixOneAsyncSession(
        client=client,
        wrap_session=existing_async_session
    )

    try:
        # Standard async SQLAlchemy operations work
        result = await mo_async_session.execute("SELECT * FROM users")

        # MatrixOne async features now available
        await mo_async_session.stage.create_s3('backup', bucket='my-backup')
        await mo_async_session.snapshots.create('snapshot1', level='database')
        await mo_async_session.load_data.from_csv('/data/file.csv', 'users')

        await mo_async_session.commit()
    finally:
        await mo_async_session.close()

    # ========================================
    # Example 1: Basic Async Transaction
    # ========================================
    async with client.session() as session:
        # All operations are atomic
        await session.execute("INSERT INTO users (name) VALUES ('John')")
        await session.execute("INSERT INTO orders (user_id, amount) VALUES (1, 100)")
        # Transaction commits automatically

    # ========================================
    # Example 2: Async ORM Operations
    # ========================================
    async with client.session() as session:
        # Add objects
        user = User(name="Alice")
        session.add(user)

        # Query
        stmt = select(User).where(User.name == "Alice")
        result = await session.execute(stmt)
        users = result.scalars().all()

        # Commit
        await session.commit()

    # ========================================
    # Example 3: Concurrent Operations
    # ========================================
    async with client.session() as session:
        # Execute multiple queries concurrently
        results = await asyncio.gather(
            session.execute("SELECT * FROM users"),
            session.execute("SELECT * FROM orders"),
            session.execute("SELECT * FROM products")
        )
        users, orders, products = results

    # ========================================
    # Example 4: Async MatrixOne Managers
    # ========================================
    async with client.session() as session:
        # Create snapshot
        snapshot = await session.snapshots.create(
            'backup',
            SnapshotLevel.DATABASE,
            database='production'
        )

        # Clone database
        await session.clone.clone_database('prod_copy', 'production')

        # Load data
        await session.load_data.from_csv('/data/users.csv', 'users')

    # ========================================
    # Example 5: Error Handling
    # ========================================
    try:
        async with client.session() as session:
            await session.execute("INSERT INTO users (name) VALUES ('Bob')")
            await session.execute("INSERT INTO invalid_table (x) VALUES (1)")
            # Automatically rolls back on error
    except Exception as e:
        print(f"Transaction rolled back: {e}")

    await client.disconnect()

asyncio.run(main())

Important Notes:

  • Use async with client.session() for async sessions

  • All execute operations must be awaited

  • All manager operations must be awaited

  • Perfect for FastAPI, aiohttp, and other async frameworks

  • Enables high-concurrency database operations

See also

  • AsyncClient.session(): Creates AsyncSession instances

  • Session: Synchronous version

  • SQLAlchemy AsyncSession: Parent class documentation

__init__(bind=None, client=None, wrap_session=None, **kwargs)[source]

Initialize MatrixOne AsyncSession.

Parameters:
  • bind – SQLAlchemy AsyncEngine or AsyncConnection to bind to

  • client – MatrixOne AsyncClient instance

  • wrap_session – Existing SQLAlchemy AsyncSession to wrap with MatrixOne features

  • **kwargs – Additional arguments passed to SQLAlchemy AsyncSession

Examples

# Create new async session from engine session = AsyncSession(bind=async_engine, client=async_client)

# Wrap existing SQLAlchemy async session (for legacy projects) existing_session = async_sessionmaker(bind=async_engine)() mo_session = AsyncSession(client=async_client, wrap_session=existing_session)

async execute(sql_or_stmt, params: Tuple | None = None, **kwargs)[source]

Execute SQL or SQLAlchemy statement within async session.

Supports: - String SQL with MatrixOne parameter substitution - SQLAlchemy statements (select, update, delete, insert, text) - Query logging

Parameters:
  • sql_or_stmt – SQL string or SQLAlchemy statement

  • params – Query parameters (only used for string SQL with ‘?’ placeholders)

  • **kwargs – Additional execution options (including log_mode for logging control)

Returns:

SQLAlchemy async result object

async insert(table_name_or_model, data: dict)[source]

Insert data into a table within session asynchronously.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data: Data to insert (dict with column names as keys)

Returns:

SQLAlchemy async result object
async batch_insert(table_name_or_model, data_list: list)[source]

Batch insert data into a table within session asynchronously.

Args:

table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list: List of data dictionaries to insert

Returns:

SQLAlchemy async result object
query(*columns, snapshot: str | None = None)[source]

Get async MatrixOne query builder within session - SQLAlchemy style

Args:

*columns: Can be:
    - Single model class: query(Article) - returns all columns from model
    - Multiple columns: query(Article.id, Article.title) - returns specific columns
    - Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries

Returns:

AsyncMatrixOneQuery instance configured for the specified columns within session
async create_table(table_name: str, columns: dict, **kwargs)[source]

Create a table within MatrixOne async session.

Args:

table_name: Name of the table
columns: Dictionary mapping column names to their types
**kwargs: Additional table parameters

Returns:

Self for chaining
async drop_table(table_name: str)[source]

Drop a table within MatrixOne async session.

Args:

table_name: Name of the table to drop

Returns:

Self for chaining