# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MatrixOne Session and AsyncSession classes.
This module provides Session and AsyncSession classes that extend SQLAlchemy's
Session and AsyncSession with MatrixOne-specific features like snapshots, clones,
vector operations, and fulltext search.
"""
from typing import Any, Optional, Tuple, TYPE_CHECKING
try:
from sqlalchemy.orm import Session as SQLAlchemySession
except ImportError:
SQLAlchemySession = None
try:
from sqlalchemy.ext.asyncio import AsyncSession as SQLAlchemyAsyncSession
except ImportError:
SQLAlchemyAsyncSession = None
if TYPE_CHECKING:
from .client import ResultSet # noqa: F401
from .async_client import AsyncResultSet # noqa: F401
[docs]
class Session(SQLAlchemySession):
"""
MatrixOne Session - Transaction-aware session extending SQLAlchemy Session.
This class provides a comprehensive transaction context for executing multiple database
operations atomically. It inherits all SQLAlchemy Session capabilities while adding
MatrixOne-specific features like snapshots, clones, vector operations, and fulltext search.
Key Features:
- **Full SQLAlchemy API**: All standard SQLAlchemy Session methods (add, delete, query, etc.)
- **Atomic transactions**: All operations succeed or fail together
- **Automatic rollback**: Errors trigger automatic transaction rollback
- **MatrixOne managers**: Access to all MatrixOne-specific operations within transactions
- **Hybrid SQL support**: Execute both SQLAlchemy statements and string SQL
- **Query logging**: Integrated query logging with performance tracking
- **Context manager**: Automatic transaction lifecycle management
Transaction Behavior:
- **Auto-commit on success**: Transaction commits when context manager exits normally
- **Auto-rollback on error**: Transaction rolls back if any exception occurs
- **Explicit control**: Manual commit() and rollback() also supported
- **Isolation**: All operations within session are isolated from other transactions
- **ACID compliance**: Full ACID guarantees for all operations
Available Managers (all transaction-aware):
- **snapshots**: SnapshotManager for creating/managing database snapshots
- **clone**: CloneManager for cloning databases and tables
- **restore**: RestoreManager for restoring from snapshots
- **pitr**: PitrManager for point-in-time recovery operations
- **pubsub**: PubSubManager for publish-subscribe operations
- **account**: AccountManager for account and user management
- **vector_ops**: VectorManager for vector operations and indexing
- **fulltext_index**: FulltextIndexManager for fulltext search operations
- **metadata**: MetadataManager for table metadata analysis
- **load_data**: LoadDataManager for bulk data loading
- **stage**: StageManager for external stage management
- **cdc**: CDCManager for change data capture task operations
Creating Session:
There are two ways to create a MatrixOne Session:
1. New Session (Recommended): Create directly from client::
with client.session() as session:
# Your operations here
pass
2. Wrap Existing Session (For Legacy Projects): Wrap existing SQLAlchemy session::
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone.session import Session as MatrixOneSession
# Your existing SQLAlchemy code
engine = create_engine('mysql+pymysql://...')
SessionFactory = sessionmaker(bind=engine)
sqlalchemy_session = SessionFactory()
# Wrap with MatrixOne features
mo_session = MatrixOneSession(
client=mo_client,
wrap_session=sqlalchemy_session
)
# Now you can use both SQLAlchemy and MatrixOne features
Usage Examples::
from matrixone import Client
from sqlalchemy import select, insert, update, delete
client = Client(host='localhost', port=6001, user='root', password='111', database='test')
# ============================================================
# Example 0: Wrapping Existing SQLAlchemy Session (Legacy Projects)
# ============================================================
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone.session import Session as MatrixOneSession
# Your existing SQLAlchemy setup
engine = create_engine('mysql+pymysql://root:111@127.0.0.1:6001/test')
SessionFactory = sessionmaker(bind=engine)
existing_session = SessionFactory()
# Wrap it with MatrixOne features
mo_session = MatrixOneSession(
client=client,
wrap_session=existing_session
)
try:
# Standard SQLAlchemy operations work
result = mo_session.execute("SELECT * FROM users")
# MatrixOne features now available
mo_session.stage.create_s3('backup', bucket='my-backup')
mo_session.snapshots.create('snapshot1', level='database')
mo_session.load_data.from_csv('/data/file.csv', 'users')
mo_session.commit()
finally:
mo_session.close()
# ============================================================
# Example 1: Basic Transaction with ORM-style Operations
# ============================================================
with client.session() as session:
# All operations are atomic - succeed or fail together
from sqlalchemy import insert, update
# Insert using SQLAlchemy insert()
session.execute(insert(User).values(name='John', email='john@example.com'))
session.execute(insert(Order).values(user_id=1, amount=100.0))
# Update using SQLAlchemy update()
session.execute(
update(Account).where(Account.user_id == 1).values(balance=Account.balance - 100)
)
# Transaction commits automatically on successful completion
# ============================================================
# Example 2: SQLAlchemy ORM Operations
# ============================================================
with client.session() as session:
# Create new objects
user = User(name="Alice", email="alice@example.com")
order = Order(user_id=1, amount=50.0)
# Add to session
session.add(user)
session.add(order)
# Query using ORM
stmt = select(User).where(User.name == "Alice")
result = session.execute(stmt)
users = result.scalars().all()
# Update using ORM
user = session.get(User, 1)
user.email = "newemail@example.com"
# Commit explicitly (or let context manager do it)
session.commit()
# ============================================================
# Example 3: MatrixOne Snapshot Operations in Transaction
# ============================================================
with client.session() as session:
# Create snapshot within transaction
snapshot = session.snapshots.create(
name='daily_backup',
level=SnapshotLevel.DATABASE,
database='production'
)
# Clone database within same transaction
session.clone.clone_database(
target_db='production_copy',
source_db='production',
snapshot_name='daily_backup'
)
# Both operations commit atomically
# ============================================================
# Example 4: Bulk Data Loading in Transaction
# ============================================================
with client.session() as session:
# Load data files atomically
session.load_data.from_csv('/data/users.csv', 'users')
session.load_data.from_csv('/data/orders.csv', 'orders')
# Update statistics after loading
session.execute("ANALYZE TABLE users")
session.execute("ANALYZE TABLE orders")
# All loads and updates commit together
# ============================================================
# Example 5: Error Handling with Automatic Rollback
# ============================================================
try:
with client.session() as session:
session.execute("INSERT INTO users (name) VALUES ('Bob')")
session.execute("INSERT INTO invalid_table (data) VALUES ('test')") # This fails
# Transaction automatically rolls back - Bob is NOT inserted
except Exception as e:
print(f"Transaction failed and rolled back: {e}")
# ============================================================
# Example 6: Manual Transaction Control
# ============================================================
with client.session() as session:
try:
session.execute("INSERT INTO users (name) VALUES ('Charlie')")
session.execute("UPDATE accounts SET balance = balance - 50")
# Verify conditions before committing
result = session.execute("SELECT balance FROM accounts WHERE id = 1")
balance = result.scalar()
if balance >= 0:
session.commit() # Explicit commit
else:
session.rollback() # Explicit rollback
print("Transaction rolled back due to insufficient balance")
except Exception as e:
session.rollback()
raise
# ============================================================
# Example 7: Complex Multi-Manager Transaction
# ============================================================
with client.session() as session:
# Create publication
pub = session.pubsub.create_database_publication(
name='analytics_data',
database='analytics',
account='subscriber_account'
)
# Create stage using simple interface
session.stage.create_local('export_stage', '/exports/')
# Load fresh data
session.load_data.from_csv('/data/latest.csv', Analytics) # Use ORM model
# Create snapshot after load
session.snapshots.create(
name='post_load_snapshot',
level=SnapshotLevel.DATABASE,
database='analytics'
)
# All operations are atomic
# ============================================================
# Example 8: Vector Operations in Transaction
# ============================================================
with client.session() as session:
# Create vector table
session.create_table('documents', {
'id': 'int primary key',
'content': 'text',
'embedding': 'vecf32(384)'
})
# Create vector index
session.vector_ops.create_ivf(
'documents',
name='doc_idx',
column='embedding',
lists=100
)
# Insert vector data
session.insert('documents', {
'id': 1,
'content': 'sample document',
'embedding': [0.1] * 384
})
Best Practices:
1. **Always use context manager**: Use `with client.session()` for automatic cleanup
2. **Keep transactions short**: Long transactions can block other operations
3. **Handle exceptions**: Wrap session code in try-except for proper error handling
4. **Avoid nested transactions**: SQLAlchemy doesn't support true nested transactions
5. **Use explicit commits**: When you need fine-grained control over transaction boundaries
6. **Test rollback behavior**: Ensure your application handles rollbacks correctly
Important Notes:
- Session is created by `Client.session()` context manager
- Don't instantiate Session directly
- All manager operations within session are transaction-aware
- Session automatically manages transaction lifecycle
- Errors trigger automatic rollback
- Normal exit triggers automatic commit
See Also:
- Client.session(): Creates and manages Session instances
- AsyncSession: Async version for async/await workflows
- SQLAlchemy Session: Parent class documentation
"""
[docs]
def __init__(self, bind=None, client=None, wrap_session=None, **kwargs):
"""
Initialize MatrixOne Session.
Args:
bind: SQLAlchemy Engine or Connection to bind to
client: MatrixOne Client instance
wrap_session: Existing SQLAlchemy Session to wrap with MatrixOne features
**kwargs: Additional arguments passed to SQLAlchemy Session
Examples:
# Create new session from engine
session = Session(bind=engine, client=client)
# Wrap existing SQLAlchemy session (for legacy projects)
existing_session = sessionmaker(bind=engine)()
mo_session = Session(client=client, wrap_session=existing_session)
"""
if wrap_session is not None:
# Wrap existing SQLAlchemy session with MatrixOne features
# Copy the session's state to this instance
self.__dict__.update(wrap_session.__dict__)
else:
# Initialize parent SQLAlchemy Session with engine/connection
super().__init__(bind=bind, expire_on_commit=False, **kwargs)
# Store MatrixOne client reference
self.client = client
# Import manager classes dynamically to avoid circular imports
# These are defined in their respective modules
from .snapshot import SnapshotManager
from .clone import CloneManager
from .branch import BranchManager
# These are defined in client.py after the Session class
import sys
client_module = sys.modules.get('matrixone.client')
if client_module:
VectorManager = getattr(client_module, 'VectorManager')
FulltextIndexManager = getattr(client_module, 'FulltextIndexManager')
# These are defined in their respective modules
from .metadata import MetadataManager
from .load_data import LoadDataManager
from .stage import StageManager
from .cdc import CDCManager
from .pitr import PitrManager
from .pubsub import PubSubManager
from .restore import RestoreManager
from .account import AccountManager
from .export import ExportManager
# Create managers that use this session as executor
# The executor pattern allows managers to work in both client and session contexts
self.snapshots = SnapshotManager(client, executor=self)
self.clone = CloneManager(client, executor=self)
self.branch = BranchManager(client, executor=self)
self.restore = RestoreManager(client, executor=self)
self.pitr = PitrManager(client, executor=self)
self.pubsub = PubSubManager(client, executor=self)
self.account = AccountManager(client, executor=self)
self.vector_ops = VectorManager(client, executor=self)
self.fulltext_index = FulltextIndexManager(client, executor=self)
self.metadata = MetadataManager(client, executor=self)
self.load_data = LoadDataManager(client, executor=self)
self.stage = StageManager(client, executor=self)
self.cdc = CDCManager(client, executor=self)
self.export = ExportManager(self)
[docs]
def execute(self, sql_or_stmt, params: Optional[Tuple] = None, **kwargs):
"""
Execute SQL or SQLAlchemy statement within the current session transaction.
This method extends SQLAlchemy's Session.execute() with MatrixOne-specific features:
- Parameter substitution for string SQL using '?' placeholders
- Integrated query logging with performance tracking
- Support for both SQLAlchemy statements and raw SQL strings
All queries executed within the session participate in the current transaction.
Changes are committed when the session context exits normally, or rolled back on error.
Args:
sql_or_stmt (str | SQLAlchemy statement): The SQL query to execute. Can be:
- String SQL with '?' placeholders for parameters
- SQLAlchemy select() statement
- SQLAlchemy insert() statement
- SQLAlchemy update() statement
- SQLAlchemy delete() statement
- SQLAlchemy text() statement
params (Optional[Tuple]): Query parameters for string SQL only. Values are
substituted for '?' placeholders in order. Ignored for SQLAlchemy statements.
**kwargs: Additional keyword arguments:
- log_mode (str): Override SQL logging mode for this query only.
Options: 'off', 'simple', 'full'. If not specified, uses client's
global sql_log_mode setting.
- Other kwargs are passed to SQLAlchemy's execute()
Returns:
sqlalchemy.engine.Result: SQLAlchemy Result object with methods:
- fetchall(): Get all rows as list of tuples
- fetchone(): Get next row as tuple or None
- fetchmany(size): Get next N rows
- scalars(): Get first column of each row
- mappings(): Get rows as dictionaries
- rowcount: Number of rows affected (for INSERT/UPDATE/DELETE)
Raises:
QueryError: If query execution fails or SQL syntax is invalid
ConnectionError: If session is not connected to database
Examples::
from matrixone import Client
from sqlalchemy import select, insert, update, delete, and_, or_
client = Client(host='localhost', port=6001, user='root', password='111', database='test')
# ========================================
# String SQL with Parameters
# ========================================
with client.session() as session:
# Simple INSERT with parameters
result = session.execute(
"INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
("John Doe", "john@example.com", 30)
)
print(f"Inserted {result.rowcount} rows")
# SELECT with parameters
result = session.execute(
"SELECT * FROM users WHERE age > ? AND email LIKE ?",
(25, "%@example.com")
)
for row in result:
print(f"User: {row.name}, Age: {row.age}")
# UPDATE with parameters
result = session.execute(
"UPDATE users SET status = ? WHERE age < ?",
("active", 18)
)
print(f"Updated {result.rowcount} rows")
# DELETE with parameters
result = session.execute(
"DELETE FROM users WHERE status = ?",
("inactive",)
)
print(f"Deleted {result.rowcount} rows")
# ========================================
# SQLAlchemy SELECT Statements
# ========================================
with client.session() as session:
# Basic SELECT
stmt = select(User).where(User.age > 25)
result = session.execute(stmt)
users = result.scalars().all() # Returns list of User objects
# SELECT specific columns
stmt = select(User.name, User.email).where(User.status == 'active')
result = session.execute(stmt)
for name, email in result:
print(f"{name}: {email}")
# Complex WHERE with AND/OR
stmt = select(User).where(
and_(
User.age > 18,
or_(
User.status == 'active',
User.status == 'pending'
)
)
)
result = session.execute(stmt)
# SELECT with JOINs
stmt = select(User, Order).join(Order, User.id == Order.user_id)
result = session.execute(stmt)
for user, order in result:
print(f"{user.name} ordered {order.amount}")
# SELECT with aggregation
from sqlalchemy import func
stmt = select(func.count(User.id), func.avg(User.age))
result = session.execute(stmt)
count, avg_age = result.one()
# ========================================
# SQLAlchemy INSERT Statements
# ========================================
with client.session() as session:
# Single INSERT
stmt = insert(User).values(name='Alice', email='alice@example.com', age=28)
result = session.execute(stmt)
print(f"Inserted row with ID: {result.lastrowid}")
# Bulk INSERT
stmt = insert(User).values([
{'name': 'Bob', 'email': 'bob@example.com', 'age': 35},
{'name': 'Carol', 'email': 'carol@example.com', 'age': 42}
])
result = session.execute(stmt)
print(f"Inserted {result.rowcount} rows")
# ========================================
# SQLAlchemy UPDATE Statements
# ========================================
with client.session() as session:
# Simple UPDATE
stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
result = session.execute(stmt)
print(f"Updated {result.rowcount} rows")
# Conditional UPDATE
stmt = update(User).where(User.age < 18).values(status='minor')
result = session.execute(stmt)
# UPDATE with expressions
stmt = update(Order).values(total=Order.quantity * Order.price)
result = session.execute(stmt)
# ========================================
# SQLAlchemy DELETE Statements
# ========================================
with client.session() as session:
# Simple DELETE
stmt = delete(User).where(User.id == 1)
result = session.execute(stmt)
# Conditional DELETE
stmt = delete(User).where(User.status == 'deleted')
result = session.execute(stmt)
print(f"Deleted {result.rowcount} rows")
# ========================================
# Result Processing
# ========================================
with client.session() as session:
# fetchall() - get all rows
result = session.execute("SELECT * FROM users")
rows = result.fetchall()
for row in rows:
print(f"User: {row.name}")
# fetchone() - get one row at a time
result = session.execute("SELECT * FROM users")
while row := result.fetchone():
print(f"User: {row.name}")
# fetchmany() - get N rows
result = session.execute("SELECT * FROM users")
rows = result.fetchmany(10) # Get 10 rows
# scalars() - get first column
stmt = select(User.name)
result = session.execute(stmt)
names = result.scalars().all() # List of names
# mappings() - get rows as dicts
result = session.execute("SELECT name, email FROM users")
for row in result.mappings():
print(f"{row['name']}: {row['email']}")
# scalar() - get single value
stmt = select(func.count(User.id))
result = session.execute(stmt)
count = result.scalar() # Single integer value
# ========================================
# Query Logging Control
# ========================================
with client.session() as session:
# Disable logging for this query only
result = session.execute(
"SELECT * FROM large_table",
log_mode='off'
)
# Force full SQL logging for debugging
result = session.execute(
"SELECT * FROM users WHERE complex_condition",
log_mode='full'
)
# Simple logging (show operation type only)
result = session.execute(
"UPDATE massive_table SET field = 'value'",
log_mode='simple'
)
Best Practices:
- Use parameters (?-placeholders) to prevent SQL injection
- Use SQLAlchemy statements for complex queries
- Use string SQL for simple, dynamic queries
- Always consume or close result sets
- Use log_mode='off' for frequently executed queries in production
See Also:
- AsyncSession.execute(): Async version
- Client.execute(): Client-level execute without transaction
- SQLAlchemy Result: Result object documentation
"""
import time
start_time = time.time()
# Extract log_mode from kwargs (don't pass it to SQLAlchemy)
log_mode = kwargs.pop('log_mode', None)
try:
# Check if this is a string SQL
if isinstance(sql_or_stmt, str):
# String SQL - apply MatrixOne parameter substitution
final_sql = self.client._substitute_parameters(sql_or_stmt, params)
original_sql = sql_or_stmt
from sqlalchemy import text
# Call parent's execute() with text()
result = super().execute(text(final_sql), **kwargs)
else:
# SQLAlchemy statement - call parent's execute() directly
result = super().execute(sql_or_stmt, params, **kwargs)
original_sql = f"<SQLAlchemy {type(sql_or_stmt).__name__}>"
execution_time = time.time() - start_time
# Log query
if hasattr(result, 'returns_rows') and result.returns_rows:
self.client.logger.log_query(original_sql, execution_time, None, success=True, log_mode=log_mode)
else:
self.client.logger.log_query(
original_sql,
execution_time,
getattr(result, 'rowcount', 0),
success=True,
log_mode=log_mode,
)
return result
except Exception as e:
execution_time = time.time() - start_time
self.client.logger.log_query(
original_sql if 'original_sql' in locals() else str(sql_or_stmt),
execution_time,
success=False,
log_mode=log_mode,
)
self.client.logger.log_error(e, context="Session query execution")
from .client import _classify_db_error
raise _classify_db_error(e, sql_or_stmt) from None
[docs]
def insert(self, table_name: str, data: dict[str, Any]) -> "ResultSet":
"""
Insert data into a table within session.
Args::
table_name: Name of the table
data: Data to insert (dict with column names as keys)
Returns::
ResultSet object
"""
sql = self.client._build_insert_sql(table_name, data)
return self.execute(sql)
[docs]
def batch_insert(self, table_name: str, data_list: list[dict[str, Any]]) -> "ResultSet":
"""
Batch insert data into a table within session.
Args::
table_name: Name of the table
data_list: List of data dictionaries to insert
Returns::
ResultSet object
"""
if not data_list:
from .client import ResultSet
return ResultSet([], [], affected_rows=0)
sql = self.client._build_batch_insert_sql(table_name, data_list)
return self.execute(sql)
[docs]
def query(self, *columns, snapshot: str = None):
"""Get MatrixOne query builder within session - SQLAlchemy style
Args::
*columns: Can be:
- Single model class: query(Article) - returns all columns from model
- Multiple columns: query(Article.id, Article.title) - returns specific columns
- Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries
Returns::
MatrixOneQuery instance configured for the specified columns within session
"""
from .orm import MatrixOneQuery
if len(columns) == 1:
# Traditional single model class usage
column = columns[0]
if isinstance(column, str):
# String table name
return MatrixOneQuery(column, self.client, transaction_wrapper=self, snapshot=snapshot)
elif hasattr(column, '__tablename__'):
# This is a model class
return MatrixOneQuery(column, self.client, transaction_wrapper=self, snapshot=snapshot)
elif hasattr(column, 'name') and hasattr(column, 'as_sql'):
# This is a CTE object
from .orm import CTE
if isinstance(column, CTE):
query = MatrixOneQuery(None, self.client, transaction_wrapper=self, snapshot=snapshot)
query._table_name = column.name
query._select_columns = ["*"] # Default to select all from CTE
query._ctes = [column] # Add the CTE to the query
return query
else:
# This is a single column/expression - need to handle specially
query = MatrixOneQuery(None, self.client, transaction_wrapper=self, snapshot=snapshot)
query._select_columns = [column]
# Try to infer table name from column
if hasattr(column, 'table') and hasattr(column.table, 'name'):
query._table_name = column.table.name
return query
else:
# Multiple columns/expressions
model_class = None
select_columns = []
for column in columns:
if hasattr(column, '__tablename__'):
# This is a model class - use its table
model_class = column
else:
# This is a column or expression
select_columns.append(column)
if model_class:
query = MatrixOneQuery(model_class, self.client, transaction_wrapper=self, snapshot=snapshot)
if select_columns:
# Add additional columns to the model's default columns
query._select_columns = select_columns
return query
else:
# No model class provided, need to infer table from columns
query = MatrixOneQuery(None, self.client, transaction_wrapper=self, snapshot=snapshot)
query._select_columns = select_columns
# Try to infer table name from first column that has table info
for col in select_columns:
if hasattr(col, 'table') and hasattr(col.table, 'name'):
query._table_name = col.table.name
break
elif isinstance(col, str) and '.' in col:
# String column like "table.column" - extract table name
parts = col.split('.')
if len(parts) >= 2:
# For "db.table.column" format, use "db.table"
# For "table.column" format, use "table"
table_name = '.'.join(parts[:-1])
query._table_name = table_name
break
return query
[docs]
def create_table(self, table_name: str, columns: dict, **kwargs) -> "Session":
"""
Create a table within MatrixOne session.
Args::
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
**kwargs: Additional table parameters
Returns::
Session: Self for chaining
"""
from sqlalchemy.schema import CreateTable
from .sqlalchemy_ext import VectorTableBuilder
# Parse primary key from kwargs
primary_key = kwargs.pop('primary_key', None)
# Use VectorTableBuilder to create the table
builder = VectorTableBuilder(table_name)
# Add columns
for col_name, col_type in columns.items():
is_primary = primary_key == col_name if primary_key else False
if is_primary:
builder.add_column(col_name, col_type, primary_key=True)
else:
builder.add_column(col_name, col_type)
# Build the table
table = builder.build_table()
# Generate CREATE TABLE SQL
create_sql = CreateTable(table)
sql = str(create_sql.compile(dialect=self.client.get_sqlalchemy_engine().dialect))
self.execute(sql)
return self
[docs]
def drop_table(self, table_name: str) -> "Session":
"""
Drop a table within MatrixOne session.
Args::
table_name: Name of the table to drop
Returns::
Session: Self for chaining
"""
sql = f"DROP TABLE IF EXISTS {table_name}"
self.execute(sql)
return self
[docs]
def create_table_with_index(self, table_name: str, columns: dict, indexes: list = None, **kwargs) -> "Session":
"""
Create a table with vector indexes within MatrixOne session.
Args::
table_name: Name of the table
columns: Dictionary mapping column names to their types (same format as client.create_table)
indexes: List of index definitions (same format as client.create_table_with_index)
**kwargs: Additional table parameters
Returns::
Session: Self for chaining
"""
from sqlalchemy.schema import CreateTable
from .sqlalchemy_ext import VectorTableBuilder
# First create the table (same as create_table)
primary_key = kwargs.pop('primary_key', None)
builder = VectorTableBuilder(table_name)
for col_name, col_type in columns.items():
is_primary = primary_key == col_name if primary_key else False
if is_primary:
builder.add_column(col_name, col_type, primary_key=True)
else:
builder.add_column(col_name, col_type)
table = builder.build_table()
create_sql = CreateTable(table)
sql = str(create_sql.compile(dialect=self.client.get_sqlalchemy_engine().dialect))
self.execute(sql)
# Then create the indexes if provided
if indexes:
for index_def in indexes:
index_name = index_def.get('name')
column_name = index_def.get('column')
index_type = index_def.get('type', 'ivfflat')
params = index_def.get('params', {})
if index_type == 'ivfflat':
lists = params.get('lists', 100)
distance = params.get('distance', 'l2_distance')
self.vector_ops.create_ivf(
table_name, name=index_name, column=column_name, lists=lists, distance=distance
)
elif index_type == 'hnsw':
m = params.get('M', 16)
ef_construction = params.get('ef_construction', 200)
distance = params.get('distance', 'l2_distance')
self.vector_ops.create_hnsw(
table_name,
name=index_name,
column=column_name,
m=m,
ef_construction=ef_construction,
distance=distance,
)
else:
raise ValueError(f"Unsupported index type: {index_type}")
return self
[docs]
def create_table_orm(self, table_name: str, *columns, **kwargs) -> "Session":
"""
Create a table using SQLAlchemy ORM-style definitions within MatrixOne session.
Args::
table_name: Name of the table
*columns: SQLAlchemy Column objects and Index objects (including VectorIndex)
**kwargs: Additional parameters (like enable_hnsw, enable_ivf)
Returns::
Session: Self for chaining
"""
from sqlalchemy import MetaData, Table
from sqlalchemy.schema import CreateTable
# Create metadata and table
metadata = MetaData()
# Build table with columns
table = Table(table_name, metadata, *columns)
# Generate CREATE TABLE SQL
create_sql = CreateTable(table)
sql = str(create_sql.compile(dialect=self.client.get_sqlalchemy_engine().dialect))
self.execute(sql)
return self
[docs]
class AsyncSession(SQLAlchemyAsyncSession):
"""
MatrixOne Async Session - Asynchronous transaction-aware session extending SQLAlchemy AsyncSession.
This class provides async/await support for all Session functionality with full
SQLAlchemy AsyncSession API compatibility, while adding MatrixOne-specific async
managers for snapshots, clones, vector operations, and more.
Key Features:
- **Full async SQLAlchemy API**: All standard async Session methods (add, delete, execute, etc.)
- **Non-blocking transactions**: Async transaction management with async/await
- **Async MatrixOne managers**: All MatrixOne operations available asynchronously
- **Concurrent operations**: Execute multiple operations concurrently with asyncio.gather
- **Automatic lifecycle**: Context manager handles async transaction lifecycle
- **Query logging**: Integrated async query logging
Transaction Behavior:
- **Auto-commit on success**: Transaction commits when async context manager exits normally
- **Auto-rollback on error**: Transaction rolls back if any exception occurs
- **Non-blocking**: All operations use async/await and don't block the event loop
- **Isolation**: All operations within session are isolated from other transactions
- **ACID compliance**: Full ACID guarantees for all async operations
Available Managers (all async/transaction-aware):
- **snapshots**: AsyncSnapshotManager for async snapshot operations
- **clone**: AsyncCloneManager for async clone operations
- **restore**: AsyncRestoreManager for async restore operations
- **pitr**: AsyncPitrManager for async point-in-time recovery
- **pubsub**: AsyncPubSubManager for async publish-subscribe
- **account**: AsyncAccountManager for async account management
- **vector_ops**: AsyncVectorManager for async vector operations
- **fulltext_index**: AsyncFulltextIndexManager for async fulltext search
- **metadata**: AsyncMetadataManager for async metadata analysis
- **load_data**: AsyncLoadDataManager for async bulk loading
- **stage**: AsyncStageManager for async stage management
- **cdc**: AsyncCDCManager for async change data capture task operations
Creating AsyncSession:
There are two ways to create a MatrixOne AsyncSession:
1. **New AsyncSession (Recommended)**: Create directly from async client::
async with client.session() as session:
# Your async operations here
pass
2. **Wrap Existing AsyncSession (For Legacy Projects)**: Wrap existing SQLAlchemy async session::
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from matrixone.session import AsyncSession as MatrixOneAsyncSession
# Your existing async SQLAlchemy code
async_engine = create_async_engine('mysql+aiomysql://...')
AsyncSessionFactory = async_sessionmaker(bind=async_engine)
sqlalchemy_async_session = AsyncSessionFactory()
# Wrap with MatrixOne features
mo_async_session = MatrixOneAsyncSession(
client=mo_async_client,
wrap_session=sqlalchemy_async_session
)
# Now you can use both async SQLAlchemy and MatrixOne features
Usage Examples::
from matrixone import AsyncClient
from sqlalchemy import select, insert, update, delete
import asyncio
async def main():
client = AsyncClient()
await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
# ========================================
# Example 0: Wrapping Existing Async Session (Legacy Projects)
# ========================================
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from matrixone.session import AsyncSession as MatrixOneAsyncSession
# Your existing async SQLAlchemy setup
async_engine = create_async_engine('mysql+aiomysql://root:111@127.0.0.1:6001/test')
AsyncSessionFactory = async_sessionmaker(bind=async_engine)
existing_async_session = AsyncSessionFactory()
# Wrap it with MatrixOne features
mo_async_session = MatrixOneAsyncSession(
client=client,
wrap_session=existing_async_session
)
try:
# Standard async SQLAlchemy operations work
result = await mo_async_session.execute("SELECT * FROM users")
# MatrixOne async features now available
await mo_async_session.stage.create_s3('backup', bucket='my-backup')
await mo_async_session.snapshots.create('snapshot1', level='database')
await mo_async_session.load_data.from_csv('/data/file.csv', 'users')
await mo_async_session.commit()
finally:
await mo_async_session.close()
# ========================================
# Example 1: Basic Async Transaction
# ========================================
async with client.session() as session:
# All operations are atomic
await session.execute("INSERT INTO users (name) VALUES ('John')")
await session.execute("INSERT INTO orders (user_id, amount) VALUES (1, 100)")
# Transaction commits automatically
# ========================================
# Example 2: Async ORM Operations
# ========================================
async with client.session() as session:
# Add objects
user = User(name="Alice")
session.add(user)
# Query
stmt = select(User).where(User.name == "Alice")
result = await session.execute(stmt)
users = result.scalars().all()
# Commit
await session.commit()
# ========================================
# Example 3: Concurrent Operations
# ========================================
async with client.session() as session:
# Execute multiple queries concurrently
results = await asyncio.gather(
session.execute("SELECT * FROM users"),
session.execute("SELECT * FROM orders"),
session.execute("SELECT * FROM products")
)
users, orders, products = results
# ========================================
# Example 4: Async MatrixOne Managers
# ========================================
async with client.session() as session:
# Create snapshot
snapshot = await session.snapshots.create(
'backup',
SnapshotLevel.DATABASE,
database='production'
)
# Clone database
await session.clone.clone_database('prod_copy', 'production')
# Load data
await session.load_data.from_csv('/data/users.csv', 'users')
# ========================================
# Example 5: Error Handling
# ========================================
try:
async with client.session() as session:
await session.execute("INSERT INTO users (name) VALUES ('Bob')")
await session.execute("INSERT INTO invalid_table (x) VALUES (1)")
# Automatically rolls back on error
except Exception as e:
print(f"Transaction rolled back: {e}")
await client.disconnect()
asyncio.run(main())
Important Notes:
- Use `async with client.session()` for async sessions
- All execute operations must be awaited
- All manager operations must be awaited
- Perfect for FastAPI, aiohttp, and other async frameworks
- Enables high-concurrency database operations
See Also:
- AsyncClient.session(): Creates AsyncSession instances
- Session: Synchronous version
- SQLAlchemy AsyncSession: Parent class documentation
"""
[docs]
def __init__(self, bind=None, client=None, wrap_session=None, **kwargs):
"""
Initialize MatrixOne AsyncSession.
Args:
bind: SQLAlchemy AsyncEngine or AsyncConnection to bind to
client: MatrixOne AsyncClient instance
wrap_session: Existing SQLAlchemy AsyncSession to wrap with MatrixOne features
**kwargs: Additional arguments passed to SQLAlchemy AsyncSession
Examples:
# Create new async session from engine
session = AsyncSession(bind=async_engine, client=async_client)
# Wrap existing SQLAlchemy async session (for legacy projects)
existing_session = async_sessionmaker(bind=async_engine)()
mo_session = AsyncSession(client=async_client, wrap_session=existing_session)
"""
# Store references before calling super().__init__
self.client = client
if wrap_session is not None:
# Wrap existing SQLAlchemy async session with MatrixOne features
# Copy the session's state to this instance
self.__dict__.update(wrap_session.__dict__)
else:
# Initialize parent SQLAlchemy AsyncSession with engine/connection
super().__init__(bind=bind, expire_on_commit=False, **kwargs)
# Import manager classes dynamically to avoid circular imports
# Some are defined in async_client.py after the AsyncSession class
# Import async vector and fulltext managers
from .vector_manager import AsyncVectorManager
from .fulltext_manager import AsyncFulltextIndexManager
# These are defined in their respective modules
from .snapshot import AsyncSnapshotManager
from .clone import AsyncCloneManager
from .branch import AsyncBranchManager
from .restore import AsyncRestoreManager
from .pitr import AsyncPitrManager
from .pubsub import AsyncPubSubManager
from .account import AsyncAccountManager
from .load_data import AsyncLoadDataManager
from .stage import AsyncStageManager
from .cdc import AsyncCDCManager
from .metadata import AsyncMetadataManager
from .export import AsyncExportManager
# Create managers that use this session as executor
# The executor pattern allows managers to work in both client and session contexts
self.snapshots = AsyncSnapshotManager(client, executor=self)
self.clone = AsyncCloneManager(client, executor=self)
self.branch = AsyncBranchManager(client, executor=self)
self.restore = AsyncRestoreManager(client, executor=self)
self.pitr = AsyncPitrManager(client, executor=self)
self.pubsub = AsyncPubSubManager(client, executor=self)
self.account = AsyncAccountManager(client, executor=self)
self.vector_ops = AsyncVectorManager(client, executor=self)
self.fulltext_index = AsyncFulltextIndexManager(client, executor=self)
self.metadata = AsyncMetadataManager(client, executor=self)
self.load_data = AsyncLoadDataManager(client, executor=self)
self.stage = AsyncStageManager(client, executor=self)
self.cdc = AsyncCDCManager(client, executor=self)
self.export = AsyncExportManager(self)
[docs]
async def execute(self, sql_or_stmt, params: Optional[Tuple] = None, **kwargs):
"""
Execute SQL or SQLAlchemy statement within async session.
Supports:
- String SQL with MatrixOne parameter substitution
- SQLAlchemy statements (select, update, delete, insert, text)
- Query logging
Args:
sql_or_stmt: SQL string or SQLAlchemy statement
params: Query parameters (only used for string SQL with '?' placeholders)
**kwargs: Additional execution options (including log_mode for logging control)
Returns:
SQLAlchemy async result object
"""
import time
start_time = time.time()
# Extract log_mode from kwargs (don't pass it to SQLAlchemy)
log_mode = kwargs.pop('log_mode', None)
try:
# Check if this is a string SQL
if isinstance(sql_or_stmt, str):
# String SQL - apply MatrixOne parameter substitution
final_sql = self.client._substitute_parameters(sql_or_stmt, params)
original_sql = sql_or_stmt
from sqlalchemy import text
# Call parent's execute() with text()
result = await super().execute(text(final_sql), **kwargs)
else:
# SQLAlchemy statement - call parent's execute() directly
result = await super().execute(sql_or_stmt, params, **kwargs)
original_sql = f"<SQLAlchemy {type(sql_or_stmt).__name__}>"
execution_time = time.time() - start_time
# Log query
if hasattr(result, 'returns_rows') and result.returns_rows:
self.client.logger.log_query(original_sql, execution_time, None, success=True, log_mode=log_mode)
else:
self.client.logger.log_query(
original_sql,
execution_time,
getattr(result, 'rowcount', 0),
success=True,
log_mode=log_mode,
)
return result
except Exception as e:
execution_time = time.time() - start_time
self.client.logger.log_query(
original_sql if 'original_sql' in locals() else str(sql_or_stmt),
execution_time,
success=False,
log_mode=log_mode,
)
self.client.logger.log_error(e, context="Async session query execution")
from .client import _classify_db_error
raise _classify_db_error(e, sql_or_stmt) from None
[docs]
async def insert(self, table_name_or_model, data: dict):
"""
Insert data into a table within session asynchronously.
Args::
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data: Data to insert (dict with column names as keys)
Returns::
SQLAlchemy async result object
"""
# Handle model class input
if hasattr(table_name_or_model, '__tablename__'):
# It's a model class
table_name = table_name_or_model.__tablename__
else:
# It's a table name string
table_name = table_name_or_model
sql = self.client._build_insert_sql(table_name, data)
return await self.execute(sql)
[docs]
async def batch_insert(self, table_name_or_model, data_list: list):
"""
Batch insert data into a table within session asynchronously.
Args::
table_name_or_model: Either a table name (str) or a SQLAlchemy model class
data_list: List of data dictionaries to insert
Returns::
SQLAlchemy async result object
"""
if not data_list:
# Return empty AsyncResultSet for consistency with client behavior
from .async_client import AsyncResultSet # noqa: F811
return AsyncResultSet([], [], affected_rows=0)
# Handle model class input
if hasattr(table_name_or_model, '__tablename__'):
# It's a model class
table_name = table_name_or_model.__tablename__
else:
# It's a table name string
table_name = table_name_or_model
sql = self.client._build_batch_insert_sql(table_name, data_list)
return await self.execute(sql)
[docs]
def query(self, *columns, snapshot: str = None):
"""Get async MatrixOne query builder within session - SQLAlchemy style
Args::
*columns: Can be:
- Single model class: query(Article) - returns all columns from model
- Multiple columns: query(Article.id, Article.title) - returns specific columns
- Mixed: query(Article, Article.id, some_expression.label('alias')) - model + additional columns
snapshot: Optional snapshot name for snapshot queries
Returns::
AsyncMatrixOneQuery instance configured for the specified columns within session
"""
from .async_orm import AsyncMatrixOneQuery
if len(columns) == 1:
# Traditional single model class usage
column = columns[0]
if isinstance(column, str):
# String table name
return AsyncMatrixOneQuery(column, self.client, None, transaction_wrapper=self, snapshot=snapshot)
elif hasattr(column, '__tablename__'):
# This is a model class
return AsyncMatrixOneQuery(column, self.client, None, transaction_wrapper=self, snapshot=snapshot)
elif hasattr(column, 'name') and hasattr(column, 'as_sql'):
# This is a CTE object
from .orm import CTE
if isinstance(column, CTE):
query = AsyncMatrixOneQuery(None, self.client, None, transaction_wrapper=self, snapshot=snapshot)
query._table_name = column.name
query._select_columns = ["*"] # Default to select all from CTE
query._ctes = [column] # Add the CTE to the query
return query
else:
# This is a single column/expression
query = AsyncMatrixOneQuery(None, self.client, None, transaction_wrapper=self, snapshot=snapshot)
query._select_columns = [column]
# Try to infer table name from column
if hasattr(column, 'table') and hasattr(column.table, 'name'):
query._table_name = column.table.name
return query
else:
# Multiple columns/expressions
model_class = None
select_columns = []
for column in columns:
if hasattr(column, '__tablename__'):
# This is a model class - use its table
model_class = column
else:
# This is a column or expression
select_columns.append(column)
if model_class:
query = AsyncMatrixOneQuery(model_class, self.client, None, transaction_wrapper=self, snapshot=snapshot)
if select_columns:
# Add additional columns to the model's default columns
query._select_columns = select_columns
return query
else:
# No model class provided, need to infer table from columns
query = AsyncMatrixOneQuery(None, self.client, None, transaction_wrapper=self, snapshot=snapshot)
query._select_columns = select_columns
# Try to infer table name from first column that has table info
for col in select_columns:
if hasattr(col, 'table') and hasattr(col.table, 'name'):
query._table_name = col.table.name
break
elif isinstance(col, str) and '.' in col:
# String column like "table.column" - extract table name
parts = col.split('.')
if len(parts) >= 2:
table_name = '.'.join(parts[:-1])
query._table_name = table_name
break
return query
[docs]
async def create_table(self, table_name: str, columns: dict, **kwargs):
"""
Create a table within MatrixOne async session.
Args::
table_name: Name of the table
columns: Dictionary mapping column names to their types
**kwargs: Additional table parameters
Returns::
Self for chaining
"""
from sqlalchemy.schema import CreateTable
from .sqlalchemy_ext import VectorTableBuilder
primary_key = kwargs.pop('primary_key', None)
builder = VectorTableBuilder(table_name)
for col_name, col_type in columns.items():
is_primary = primary_key == col_name if primary_key else False
if is_primary:
builder.add_column(col_name, col_type, primary_key=True)
else:
builder.add_column(col_name, col_type)
table = builder.build_table()
create_sql = CreateTable(table)
sql = str(create_sql.compile(dialect=self.client.get_sqlalchemy_engine().dialect))
await self.execute(sql)
return self
[docs]
async def drop_table(self, table_name: str):
"""
Drop a table within MatrixOne async session.
Args::
table_name: Name of the table to drop
Returns::
Self for chaining
"""
sql = f"DROP TABLE IF EXISTS {table_name}"
await self.execute(sql)
return self