Stage Management Guide
This guide covers external stage management in MatrixOne, including creating stages, loading data from stages, and using stages in transactions.
Overview
Stages provide centralized configuration for external data sources, making it easy to load data from:
S3 and S3-compatible storage (AWS S3, MinIO, etc.)
Local filesystem paths
Cloud storage services
Stages enable:
✅ Centralized data source configuration - define once, use everywhere
✅ Simplified data loading - load data with simple stage references
✅ Transaction support - atomic stage operations with
session()✅ Security - store credentials separately from queries
Creating Stages with Simple Interfaces (Recommended)
Use create_s3() and create_local() for simplified stage creation:
S3 Stage Creation
from matrixone import Client
client = Client()
client.connect(database='test')
# Create S3 stage with simple interface (recommended)
client.stage.create_s3(
name='production_s3',
bucket='my-bucket',
path='data/',
aws_key_id='AKIAIOSFODNN7EXAMPLE',
aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
region='us-east-1',
comment='Production data bucket'
)
# Create S3-compatible MinIO stage
client.stage.create_s3(
name='minio_stage',
bucket='my-bucket',
endpoint='http://localhost:9000',
aws_key_id='minioadmin',
aws_secret_key='minioadmin',
comment='Local MinIO instance'
)
client.disconnect()
Local Filesystem Stage
from matrixone import Client
client = Client()
client.connect(database='test')
# Create local filesystem stage
client.stage.create_local(
name='local_imports',
path='/data/imports/',
comment='Local data import directory'
)
client.disconnect()
Stage Management Operations
List, Get, Alter, and Drop Stages
from matrixone import Client
client = Client()
client.connect(database='test')
# List all stages
stages = client.stage.list()
for stage in stages:
print(f"Stage: {stage.name}")
print(f" URL: {stage.url}")
print(f" Enabled: {stage.enabled}")
print(f" Created: {stage.created_time}")
# Get specific stage details
stage = client.stage.get('production_s3')
print(f"Stage URL: {stage.url}")
print(f"Comment: {stage.comment}")
# Alter stage (update credentials or URL)
client.stage.alter(
'production_s3',
credentials={
'AWS_KEY_ID': 'new_key_id',
'AWS_SECRET_KEY': 'new_secret_key'
}
)
# Disable stage temporarily
client.stage.alter('production_s3', enable=False)
# Re-enable stage
client.stage.alter('production_s3', enable=True)
# Drop stage when no longer needed
client.stage.drop('old_stage')
client.disconnect()
Loading Data from Stages with ORM Models (Recommended)
Use ORM models for type-safe data loading from stages:
Load from S3 Stage
from matrixone import Client
from matrixone.orm import Base, Column, Integer, String, Float
from sqlalchemy import select
# Define ORM model
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
age = Column(Integer)
client = Client()
client.connect(database='test')
# Create table
client.create_table(User)
# Load data from S3 stage using ORM model (recommended)
client.load_data.read_csv_stage('production_s3', 'users.csv', table=User)
# Verify data loaded
stmt = select(User)
result = client.execute(stmt)
for user in result.scalars():
print(f"User: {user.name}, Age: {user.age}")
client.disconnect()
Load from Local Stage
from matrixone import Client
client = Client()
client.connect(database='test')
# Load from local filesystem stage
client.load_data.read_csv_stage('local_imports', 'orders.csv', table=Order)
# Load with custom options
client.load_data.read_csv_stage(
'local_imports',
'products.csv',
Product,
ignore_lines=1, # Skip header
delimiter=','
)
client.disconnect()
Transactional Stage Operations (Recommended)
Use session() for atomic stage operations:
Atomic Stage Creation and Data Loading
from matrixone import Client
from matrixone.orm import Base, Column, Integer, String
from sqlalchemy import insert, select
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
user_id = Column(Integer)
amount = Column(Float)
client = Client()
client.connect(database='test')
# Atomic multi-stage operations
with client.session() as session:
# Create stages
session.stage.create_local('import_stage', '/data/imports/')
session.stage.create_s3('export_stage', 'backup-bucket', 'exports/', 'key', 'secret')
# Load data from stages atomically
session.load_data.read_csv_stage('import_stage', 'users.csv', table=User)
session.load_data.read_csv_stage('import_stage', 'orders.csv', table=Order)
# Insert additional data in same transaction
session.execute(insert(User).values(name='Admin', email='admin@example.com'))
# All operations commit together or rollback on error
client.disconnect()
Complex Transaction with Multiple Stages
from matrixone import Client, SnapshotLevel
from sqlalchemy import select, func
client = Client()
client.connect(database='test')
# Complex atomic operation
with client.session() as session:
# Create import stage
session.stage.create_local('daily_import', '/data/daily/')
# Load data
session.load_data.read_csv_stage('daily_import', 'users.csv', table=User)
session.load_data.read_csv_stage('daily_import', 'orders.csv', table=Order)
# Verify data loaded correctly
stmt = select(func.count(User.id))
user_count = session.execute(stmt).scalar()
stmt = select(func.count(Order.id))
order_count = session.execute(stmt).scalar()
print(f"Loaded {user_count} users and {order_count} orders")
# Create snapshot after successful load
session.snapshots.create(
name='post_import_snapshot',
level=SnapshotLevel.DATABASE,
database='test'
)
# All operations succeed or fail together
client.disconnect()
Async Stage Operations
Full async/await support for non-blocking stage management:
Async Stage Creation and Loading
import asyncio
from matrixone import AsyncClient
from sqlalchemy import select
async def async_stage_example():
client = AsyncClient()
await client.connect(database='test')
# Create S3 stage asynchronously
await client.stage.create_s3(
name='async_s3',
bucket='my-bucket',
path='data/',
aws_key_id='key',
aws_secret_key='secret'
)
# Load data asynchronously
await client.load_data.read_csv_stage('async_s3', 'users.csv', table=User)
# Query loaded data
stmt = select(User).where(User.age > 25)
result = await client.execute(stmt)
users = result.scalars().all()
await client.disconnect()
asyncio.run(async_stage_example())
Concurrent Async Operations
import asyncio
from matrixone import AsyncClient
async def concurrent_stage_ops():
client = AsyncClient()
await client.connect(database='test')
# Create multiple stages concurrently
await asyncio.gather(
client.stage.create_local('stage1', '/data1/'),
client.stage.create_local('stage2', '/data2/'),
client.stage.create_s3('stage3', 'bucket3', 'path/', 'key', 'secret')
)
# Load from multiple stages concurrently
await asyncio.gather(
client.load_data.read_csv_stage('stage1', 'users.csv', table=User),
client.load_data.read_csv_stage('stage2', 'orders.csv', table=Order),
client.load_data.read_csv_stage('stage3', 'products.csv', table=Product)
)
await client.disconnect()
asyncio.run(concurrent_stage_ops())
Async Transaction with Stages
import asyncio
from matrixone import AsyncClient
from sqlalchemy import insert, select
async def async_transaction_example():
client = AsyncClient()
await client.connect(database='test')
# Async transaction
async with client.session() as session:
# Create stage
await session.stage.create_local('import_stage', '/data/')
# Load data
await session.load_data.read_csv_stage('import_stage', 'users.csv', table=User)
# Insert additional data
await session.execute(insert(User).values(name='Admin', email='admin@example.com'))
# Query within transaction
stmt = select(User)
result = await session.execute(stmt)
users = result.scalars().all()
# All operations commit atomically
await client.disconnect()
asyncio.run(async_transaction_example())
Best Practices
Use Simple Interfaces
Prefer
create_s3()andcreate_local()over genericcreate()Use ORM Models
Use ORM models for type-safe data loading
Use Sessions for Transactions
Use
session()for atomic multi-stage operationsSecure Credentials
Store credentials in environment variables or secrets management
Test Stage Connectivity
Test stage access before production use
Monitor Load Operations
Track load performance and error rates
Common Use Cases
ETL Pipelines
with client.session() as session:
# Extract from S3
session.stage.create_s3('source_stage', 'data-lake', 'raw/', 'key', 'secret')
session.load_data.read_csv_stage('source_stage', 'data.csv', table=RawData)
# Transform
session.execute(
insert(ProcessedData).from_select(
['id', 'value'],
select(RawData.id, func.upper(RawData.value))
)
)
# Load complete - atomic commit
Backup and Restore
with client.session() as session:
# Create backup stage
session.stage.create_s3('backup_stage', 'backups', 'daily/', 'key', 'secret')
# Create snapshot
session.snapshots.create(name='daily_backup', level=SnapshotLevel.DATABASE, database='prod')
# Both operations atomic
Multi-Source Data Integration
with client.session() as session:
# Multiple sources
session.stage.create_s3('source_a', 'bucket-a', 'data/', 'key', 'secret')
session.stage.create_local('source_b', '/local/data/')
# Load from all sources atomically
session.load_data.read_csv_stage('source_a', 'users.csv', table=User)
session.load_data.read_csv_stage('source_b', 'orders.csv', table=Order)
See Also
Data Loading Guide - Data loading operations
Snapshot and Restore Guide - Snapshot and restore operations
Quick Start - Quick start guide