Stage Manager
StageManager
- class matrixone.stage.StageManager(client, executor=None)
Bases: BaseStageManager
Synchronous stage manager for MatrixOne external storage operations.
A Stage is a named external storage location (like S3, filesystem, or cloud storage) that stores data files. Stages provide centralized, secure, and reusable access to external data sources for data loading and export operations.
Key Features:
External storage integration: Connect to S3, filesystem, and cloud storage
Credential management: Securely store and manage access credentials
Reusable connections: Create once, use multiple times
Centralized configuration: Single point of configuration for data sources
Stage hierarchy: Support for sub-stages and parent-child relationships
Transaction-aware: Full integration with transaction contexts
Enabled/disabled control: Enable or disable stages as needed
Key Concepts:
What is a Stage?
A stage is a named pointer to an external storage location where:
- Data files are stored (CSV, JSONLINE, Parquet, etc.)
- Files can be loaded into tables
- Query results can be exported
- Access credentials are securely managed
Stage Types:
File System Stage: Local or network filesystem
    CREATE STAGE fs_stage URL='file:///data/files/';
S3 Stage: Amazon S3 or S3-compatible storage (MinIO, etc.)
    CREATE STAGE s3_stage URL='s3://bucket/path/' CREDENTIALS={'AWS_KEY_ID'='xxx', 'AWS_SECRET_KEY'='xxx'};
Sub-Stage: Stage based on another stage
    CREATE STAGE sub_stage URL='stage://parent_stage/subdir/';
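The three URL forms above differ only in their scheme. A minimal sketch of telling them apart with the standard library's `urllib.parse.urlparse`; `classify_stage_url` and its type labels are hypothetical names for illustration, not part of the matrixone package, and the function only inspects the string without contacting any storage.

```python
from typing import Tuple
from urllib.parse import urlparse

# Hypothetical helper, not part of the matrixone SDK: maps the URL scheme
# of a stage location to the stage types listed above.
STAGE_TYPES = {
    "file": "filesystem",
    "s3": "s3",
    "stage": "sub-stage",
}


def classify_stage_url(url: str) -> Tuple[str, str]:
    """Return (stage_type, location) for a stage URL.

    For s3:// URLs the location starts with the bucket name; for stage://
    URLs it starts with the parent stage name.
    """
    parsed = urlparse(url)
    kind = STAGE_TYPES.get(parsed.scheme)
    if kind is None:
        raise ValueError(f"unsupported stage URL scheme: {parsed.scheme!r}")
    # file:///data/ has an empty netloc, so only the path remains.
    return kind, parsed.netloc + parsed.path


print(classify_stage_url("s3://bucket/path/"))  # ('s3', 'bucket/path/')
```

Note that for a sub-stage the "location" is relative to the parent stage, so resolving it to real storage requires looking up the parent's URL.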
Why Use Stages?
Centralized: One location for all data files
Reusable: Create once, use for multiple tables
Secure: Credentials managed at stage level, not in SQL
Efficient: Direct loading from cloud storage without intermediate copies
Organized: Logical grouping of related data files
Portable: Easy to move between environments (dev, staging, prod)
Executor Pattern:
If executor is None, uses self.client.execute (default client-level executor)
If executor is provided (e.g., session), uses executor.execute (transaction-aware)
All operations can participate in transactions when used via session
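A minimal sketch of the executor rule, assuming only that any object with an `execute(sql)` method can serve as client or executor. `ExecutorPatternSketch` and `RecordingClient` are hypothetical illustration classes, not the SDK's internals, and the `DROP STAGE [IF EXISTS]` string is just a sample statement.

```python
from typing import Any, List, Optional


class RecordingClient:
    """Stand-in for a client or session: records SQL instead of running it."""

    def __init__(self) -> None:
        self.statements: List[str] = []

    def execute(self, sql: str) -> None:
        self.statements.append(sql)


class ExecutorPatternSketch:
    """Hypothetical manager illustrating the executor fallback rule."""

    def __init__(self, client: Any, executor: Optional[Any] = None) -> None:
        self.client = client
        self.executor = executor

    def _execute(self, sql: str) -> Any:
        # Use the explicit executor (e.g. a session) when one was given, so the
        # statement joins that session's transaction; otherwise use the client.
        target = self.executor if self.executor is not None else self.client
        return target.execute(sql)

    def drop(self, name: str, if_exists: bool = False) -> None:
        self._execute(f"DROP STAGE {'IF EXISTS ' if if_exists else ''}{name}")


client, session = RecordingClient(), RecordingClient()
ExecutorPatternSketch(client).drop("old_stage")
ExecutorPatternSketch(client, executor=session).drop("tmp_stage", if_exists=True)
print(client.statements)   # ['DROP STAGE old_stage']
print(session.statements)  # ['DROP STAGE IF EXISTS tmp_stage']
```

In this sketch every statement goes through one `_execute` method, so the choice between autocommit and session execution is made in a single place.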
Typical Workflow:
1. Create a stage pointing to your data location:
   >>> client.stage.create('my_stage', 'file:///data/')
2. Upload files to that location (outside MatrixOne).
3. Load data from the stage:
   >>> client.load_data.from_stage_csv('my_stage', 'users.csv', User)
4. Reuse the same stage for other files:
   >>> client.load_data.from_stage_csv('my_stage', 'orders.csv', Order)
Usage Examples:
from matrixone import Client

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# Create Stages Using Simple Interfaces
# ========================================

# Create local filesystem stage (simple interface)
client.stage.create_local('local_data', '/data/imports/')

# Create S3 stage with simple interface
client.stage.create_s3(
    name='production_s3',
    bucket='my-bucket',
    path='data/',
    aws_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
    aws_region='us-east-1',
    comment='Production data bucket'
)

# Create S3-compatible MinIO stage
client.stage.create_s3(
    name='minio_stage',
    bucket='my-bucket',
    endpoint='http://localhost:9000',
    aws_key_id='minioadmin',
    aws_secret_key='minioadmin'
)

# Create sub-stage for organized data (using generic create)
client.stage.create(
    'sales_data',
    'stage://production_s3/sales/',
    comment='Sales department data files'
)

# ========================================
# Stage Management Operations
# ========================================

# List all stages
stages = client.stage.list()
for stage in stages:
    print(f"{stage.name}: {stage.url} (status: {stage.status})")

# Get specific stage details
stage = client.stage.get('production_s3')
print(f"Stage URL: {stage.url}")
print(f"Created: {stage.created_time}")

# Alter stage (update URL or credentials)
client.stage.alter(
    'production_s3',
    url='s3://new-bucket/data/',
    credentials={'AWS_KEY_ID': 'new_key_id'}
)

# Disable stage temporarily
client.stage.alter('production_s3', enable=False)

# Re-enable stage
client.stage.alter('production_s3', enable=True)

# Drop stage when no longer needed (if_exists avoids an error if it is absent)
client.stage.drop('old_stage', if_exists=True)

# ========================================
# Load Data from Stages
# ========================================

# User and Order are ORM models defined elsewhere in your application
# Load from stage using ORM model
client.load_data.from_stage_csv('production_s3', 'users.csv', User)

# Load from local stage
client.load_data.from_stage_csv('local_data', 'orders.csv', Order)

# ========================================
# Transactional Stage Operations
# ========================================

# Using within a transaction (all operations atomic)
with client.session() as session:
    # Create multiple stages atomically using simple interfaces
    session.stage.create_local('stage1', '/data1/')
    session.stage.create_s3('stage2', 'bucket2', 'path/', 'key', 'secret')
    session.stage.create_local('stage3', '/data3/')
Common Use Cases:
Cloud data lakes: Access data stored in S3 or cloud storage
ETL pipelines: Centralized configuration for data sources
Multi-environment: Separate stages for dev, staging, production
Data warehousing: Organize source data by department or domain
Secure access: Manage credentials separately from SQL code
See also
LoadDataManager: For loading data from stages
AsyncStageManager: For async stage operations
Client.load_data: For data loading operations
- __init__(client, executor=None)
Initialize StageManager.
- Parameters:
client – Client object that provides execute() method
executor – Optional executor (e.g., session) for executing SQL. If None, uses client.execute
- create(name: str, url: str, credentials: Dict[str, str] | None = None, comment: str | None = None, if_not_exists: bool = False) → Stage
Create a new external stage.
- Parameters:
name (str) – Stage name
url (str) –
Storage location URL. Supported protocols:
file:///path/to/data/ - Local or network filesystem
s3://bucket/path/ - Amazon S3
stage://parent_stage/subdir/ - Sub-stage
credentials (dict, optional) –
Authentication credentials. For S3/cloud storage:
{ 'AWS_KEY_ID': 'your_key_id', 'AWS_SECRET_KEY': 'your_secret_key', 'AWS_REGION': 'us-east-1', 'PROVIDER': 'minio' }
comment (str, optional) – Stage description
if_not_exists (bool) – If True, don’t error if stage already exists
- Returns:
Created stage object
- Return type:
Stage
Examples
>>> # File system stage
>>> stage = client.stage.create(
...     'local_data',
...     'file:///var/data/imports/'
... )

>>> # S3 stage with credentials
>>> stage = client.stage.create(
...     'production_data',
...     's3://my-bucket/data/',
...     credentials={
...         'AWS_KEY_ID': 'AKIAIOSFODNN7EXAMPLE',
...         'AWS_SECRET_KEY': 'wJalrXUtnFEMI/...',
...         'AWS_REGION': 'us-east-1'
...     },
...     comment='Production data stage'
... )

>>> # Sub-stage (stage based on another stage)
>>> stage = client.stage.create(
...     'archive_2024',
...     'stage://production_data/archive/2024/'
... )
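To see how the `create()` parameters line up with the SQL forms shown under Stage Types, here is a minimal sketch that renders them as a `CREATE STAGE` statement. `build_create_stage_sql` and `quote` are hypothetical helpers, not the SDK's implementation: quoting simply doubles single quotes, and the stage name is inserted unquoted, so it must be a plain identifier. In real code, let `client.stage.create()` build and send the statement.

```python
from typing import Dict, Optional


def quote(value: str) -> str:
    # SQL string literal: wrap in single quotes and double any embedded quote.
    return "'" + value.replace("'", "''") + "'"


def build_create_stage_sql(
    name: str,
    url: str,
    credentials: Optional[Dict[str, str]] = None,
    comment: Optional[str] = None,
    if_not_exists: bool = False,
) -> str:
    """Hypothetical: render create() arguments as a CREATE STAGE statement."""
    parts = ["CREATE STAGE"]
    if if_not_exists:
        parts.append("IF NOT EXISTS")
    parts.append(name)
    parts.append(f"URL={quote(url)}")
    if credentials:
        pairs = ", ".join(f"{quote(k)}={quote(v)}" for k, v in credentials.items())
        parts.append(f"CREDENTIALS={{{pairs}}}")
    if comment is not None:
        parts.append(f"COMMENT={quote(comment)}")
    return " ".join(parts)


print(build_create_stage_sql("archive_2024", "stage://production_data/archive/2024/"))
# CREATE STAGE archive_2024 URL='stage://production_data/archive/2024/'
```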
- alter(name: str, url: str | None = None, credentials: Dict[str, str] | None = None, comment: str | None = None, enable: bool | None = None, if_exists: bool = False) → Stage
Modify an existing stage.
You can update URL, credentials, comment, or enable/disable the stage. At least one of url, credentials, comment, or enable must be provided.
- Parameters:
name (str) – Stage name to modify
url (str, optional) – New storage location URL
credentials (dict, optional) – New authentication credentials
comment (str, optional) – New comment/description
enable (bool, optional) – Enable (True) or disable (False) the stage
if_exists (bool) – If True, don’t error if stage doesn’t exist
- Returns:
Updated stage object
- Return type:
Stage
Examples
>>> # Update URL
>>> client.stage.alter('my_stage', url='s3://new-bucket/data/')

>>> # Update credentials
>>> client.stage.alter(
...     's3_stage',
...     credentials={'AWS_KEY_ID': 'new_key'}
... )

>>> # Update comment
>>> client.stage.alter('my_stage', comment='Updated description')

>>> # Disable stage
>>> client.stage.alter('my_stage', enable=False)

>>> # Update multiple properties
>>> client.stage.alter(
...     'my_stage',
...     url='s3://new-bucket/',
...     comment='Migrated to new bucket',
...     enable=True
... )
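The rule that at least one of `url`, `credentials`, `comment`, or `enable` must be supplied can be checked before any SQL is sent. A minimal sketch, using a hypothetical `build_alter_stage_sql` helper that enforces the rule and emits an `ALTER STAGE ... SET ...` string; the clause order and the `ENABLE=TRUE|FALSE` spelling are assumptions to verify against the MatrixOne SQL reference, not a description of what the SDK sends.

```python
from typing import Dict, Optional


def quote(value: str) -> str:
    # SQL string literal: wrap in single quotes and double any embedded quote.
    return "'" + value.replace("'", "''") + "'"


def build_alter_stage_sql(
    name: str,
    url: Optional[str] = None,
    credentials: Optional[Dict[str, str]] = None,
    comment: Optional[str] = None,
    enable: Optional[bool] = None,
    if_exists: bool = False,
) -> str:
    """Hypothetical sketch of an ALTER STAGE statement (syntax is an assumption)."""
    if url is None and credentials is None and comment is None and enable is None:
        raise ValueError(
            "at least one of url, credentials, comment, or enable must be provided"
        )
    clauses = []
    if url is not None:
        clauses.append(f"URL={quote(url)}")
    if credentials is not None:
        pairs = ", ".join(f"{quote(k)}={quote(v)}" for k, v in credentials.items())
        clauses.append(f"CREDENTIALS={{{pairs}}}")
    if enable is not None:
        clauses.append("ENABLE=TRUE" if enable else "ENABLE=FALSE")
    if comment is not None:
        clauses.append(f"COMMENT={quote(comment)}")
    head = "ALTER STAGE IF EXISTS" if if_exists else "ALTER STAGE"
    return f"{head} {name} SET " + " ".join(clauses)


print(build_alter_stage_sql("my_stage", enable=False))
# ALTER STAGE my_stage SET ENABLE=FALSE
```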
- drop(name: str, if_exists: bool = False) → None
Delete an external stage.
- Parameters:
name (str) – Stage name to drop
if_exists (bool) – If True, don’t error if stage doesn’t exist
Examples
>>> # Drop stage
>>> client.stage.drop('my_stage')

>>> # Drop with IF EXISTS
>>> client.stage.drop('my_stage', if_exists=True)
- show(like_pattern: str | None = None) → List[Stage]
Show stages with optional pattern matching.
- Parameters:
like_pattern (str, optional) – SQL LIKE pattern for filtering stages, e.g. 'my_stage%' or '%prod%'
- Returns:
List of matching stages
- Return type:
List[Stage]
Examples
>>> # Show all stages
>>> stages = client.stage.show()

>>> # Show stages matching pattern
>>> stages = client.stage.show(like_pattern='prod%')
>>> for stage in stages:
...     print(f"{stage.name}: {stage.url}")
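Because `like_pattern` is a SQL LIKE pattern, `_` is a single-character wildcard just like `%` is a multi-character one, so `'my_stage%'` also matches a name such as `myXstage2`. The sketch below is a local re-implementation of LIKE matching with Python's `re` module (the helpers `like_to_regex` and `filter_stage_names` are hypothetical, not SDK calls) that shows which names each pattern selects; on the server, case sensitivity additionally depends on the collation.

```python
import re
from typing import Iterable, List


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern into a regular expression.

    '%' matches any run of characters, '_' matches exactly one character,
    and a backslash makes the next character literal. Matching here is
    case-sensitive.
    """
    parts: List[str] = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "\\" and i + 1 < len(pattern):
            parts.append(re.escape(pattern[i + 1]))  # escaped literal
            i += 2
            continue
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
        i += 1
    return re.compile("".join(parts), re.DOTALL)


def filter_stage_names(names: Iterable[str], pattern: str) -> List[str]:
    # fullmatch() anchors both ends, as LIKE does.
    regex = like_to_regex(pattern)
    return [name for name in names if regex.fullmatch(name)]


names = ["my_stage1", "myXstage2", "prod_s3", "dev_prod"]
print(filter_stage_names(names, "my_stage%"))    # ['my_stage1', 'myXstage2']
print(filter_stage_names(names, "my\\_stage%"))  # ['my_stage1']
print(filter_stage_names(names, "%prod%"))       # ['prod_s3', 'dev_prod']
```

Escaping the underscore (`'my\_stage%'` in SQL) is the way to match it literally.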
- list() → List[Stage]
List all stages by querying the mo_catalog.mo_stages system table.
- Returns:
List of all stages
- Return type:
List[Stage]
Examples
>>> # Get all stages
>>> stages = client.stage.list()
>>> for stage in stages:
...     print(f"{stage.name}: {stage.url}")
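A rough sketch of what listing from the system table involves: select rows from `mo_catalog.mo_stages` and map each one onto an object with the documented `Stage` fields. The column names in `LIST_STAGES_SQL` are assumptions (check them with `DESC mo_catalog.mo_stages;` on your server), and `StageRow` and `rows_to_stages` are hypothetical stand-ins rather than the SDK's own classes; the rows here are fake tuples, so no connection is needed.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional, Tuple

# Assumed column names; verify against your server before relying on them.
LIST_STAGES_SQL = (
    "SELECT stage_name, url, stage_status, comment, created_time "
    "FROM mo_catalog.mo_stages"
)


@dataclass
class StageRow:
    """Hypothetical stand-in mirroring a subset of the documented Stage fields."""

    name: str
    url: str
    status: Optional[str] = None
    comment: Optional[str] = None
    created_time: Optional[datetime] = None


Row = Tuple[str, str, Optional[str], Optional[str], Optional[datetime]]


def rows_to_stages(rows: Iterable[Row]) -> List[StageRow]:
    # Each row follows the column order of LIST_STAGES_SQL.
    return [StageRow(*row) for row in rows]


fake_rows = [
    ("local_data", "file:///data/imports/", "enabled", None, None),
    ("production_s3", "s3://my-bucket/data/", "disabled", "Production", datetime(2024, 1, 1)),
]
for stage in rows_to_stages(fake_rows):
    print(f"{stage.name}: {stage.url} ({stage.status})")
```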
- get(name: str) → Stage
Get details of a specific stage.
- Parameters:
name (str) – Stage name
- Returns:
Stage object with details
- Return type:
Stage
- Raises:
ValueError – If stage not found
Examples
>>> stage = client.stage.get('my_stage')
>>> print(f"URL: {stage.url}")
>>> print(f"Status: {stage.status}")
- exists(name: str) → bool
Check if a stage exists.
- Parameters:
name (str) – Stage name
- Returns:
True if stage exists, False otherwise
- Return type:
bool
Examples
>>> if client.stage.exists('my_stage'):
...     print("Stage exists")
... else:
...     print("Stage does not exist")
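The contracts of `get()` (raises `ValueError` when the stage is missing) and `exists()` (returns a bool) fit together naturally. A minimal in-memory sketch, assuming nothing about the SDK beyond those two documented behaviours; `InMemoryStageRegistry` and `ensure_stage` are hypothetical names, and the registry stores only URLs instead of full `Stage` objects.

```python
from typing import Dict


class InMemoryStageRegistry:
    """Hypothetical stand-in mimicking the documented get()/exists() contract."""

    def __init__(self) -> None:
        self._urls: Dict[str, str] = {}

    def create(self, name: str, url: str) -> None:
        self._urls[name] = url

    def get(self, name: str) -> str:
        try:
            return self._urls[name]
        except KeyError:
            raise ValueError(f"Stage '{name}' not found") from None

    def exists(self, name: str) -> bool:
        # exists() expressed through get(): a missing stage is not an error here.
        try:
            self.get(name)
        except ValueError:
            return False
        return True


def ensure_stage(registry: InMemoryStageRegistry, name: str, url: str) -> str:
    # Client-side "create if missing" guard. With the real SDK,
    # create(..., if_not_exists=True) is the documented way to get this effect.
    if not registry.exists(name):
        registry.create(name, url)
    return registry.get(name)
```

A check-then-create guard like `ensure_stage` can race with another client creating the same stage, which is one reason to prefer `if_not_exists=True` against a real server.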
- create_local(name: str, path: str, comment: str | None = None, if_not_exists: bool = False) → Stage
Create a local file system stage with simplified path handling.
- Parameters:
name – Stage name
path – Local file system path (supports ~ expansion, relative paths, and absolute paths)
comment – Optional comment
if_not_exists – If True, don’t error if the stage already exists
- Returns:
Created Stage object
Examples
>>> # Relative path
>>> stage = client.stage.create_local('data', './data/')

>>> # User home directory
>>> stage = client.stage.create_local('home', '~/data/')

>>> # Absolute path
>>> stage = client.stage.create_local('data', '/var/data/')
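One plausible way to turn the three path styles above into a `file://` stage URL is sketched below. `local_path_to_stage_url` is a hypothetical helper; the exact normalisation `create_local()` performs is an assumption here, and the sketch targets POSIX paths only. Note that a relative path is resolved on the machine running the Python code, which is only meaningful if the MatrixOne server sees the same filesystem.

```python
import os


def local_path_to_stage_url(path: str) -> str:
    """Hypothetical: normalise a local path into a file:// stage URL (POSIX)."""
    expanded = os.path.expanduser(path)   # "~/data/" -> "<home>/data/"
    absolute = os.path.abspath(expanded)  # resolves "./data/" against the CWD
    # abspath() drops the trailing separator; restore it so the URL names a directory.
    if not absolute.endswith(os.sep):
        absolute += os.sep
    return "file://" + absolute


print(local_path_to_stage_url("/var/data"))  # file:///var/data/
```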
- create_s3(name: str, bucket: str, path: str = '', aws_key_id: str | None = None, aws_secret_key: str | None = None, aws_region: str | None = None, endpoint: str | None = None, comment: str | None = None, if_not_exists: bool = False) → Stage
Create an S3 stage with simplified AWS credentials.
- Parameters:
name – Stage name
bucket – S3 bucket name (e.g., ‘my-bucket’)
path – Path within bucket (default: “”)
aws_key_id – AWS access key ID (optional, can use environment variables)
aws_secret_key – AWS secret key (optional, can use environment variables)
aws_region – AWS region (optional)
endpoint – S3 endpoint URL for S3-compatible services like MinIO
comment – Optional comment
if_not_exists – If True, don’t error if the stage already exists
- Returns:
Created Stage object
Examples
>>> # Using explicit credentials
>>> stage = client.stage.create_s3(
...     'prod',
...     bucket='my-bucket',
...     path='data/',
...     aws_key_id='AKIA...',
...     aws_secret_key='...'
... )

>>> # MinIO (S3-compatible)
>>> stage = client.stage.create_s3(
...     'minio',
...     bucket='my-bucket',
...     endpoint='http://localhost:9000',
...     aws_key_id='minioadmin',
...     aws_secret_key='minioadmin'
... )
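`create_s3()` is a convenience over `create()`: its arguments can be folded into an `s3://` URL plus a credentials dict of the shape documented for `create()`. A minimal sketch of that folding; `build_s3_stage_params` is a hypothetical helper, and the `ENDPOINT` key used for S3-compatible services is an assumption, since the documented credential example only lists `AWS_KEY_ID`, `AWS_SECRET_KEY`, `AWS_REGION`, and `PROVIDER`.

```python
from typing import Dict, Optional, Tuple


def build_s3_stage_params(
    bucket: str,
    path: str = "",
    aws_key_id: Optional[str] = None,
    aws_secret_key: Optional[str] = None,
    aws_region: Optional[str] = None,
    endpoint: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Hypothetical: turn create_s3()-style arguments into (url, credentials)."""
    # Normalise slashes so 'data', '/data' and 'data/' all give s3://bucket/data/
    clean = path.strip("/")
    url = f"s3://{bucket}/{clean}/" if clean else f"s3://{bucket}/"
    credentials: Dict[str, str] = {}
    if aws_key_id:
        credentials["AWS_KEY_ID"] = aws_key_id
    if aws_secret_key:
        credentials["AWS_SECRET_KEY"] = aws_secret_key
    if aws_region:
        credentials["AWS_REGION"] = aws_region
    if endpoint:
        # Assumed key name for S3-compatible endpoints such as MinIO.
        credentials["ENDPOINT"] = endpoint
    return url, credentials


url, creds = build_s3_stage_params("my-bucket", "data", "minioadmin", "minioadmin",
                                   endpoint="http://localhost:9000")
print(url)  # s3://my-bucket/data/
```

The resulting pair could then be passed to `client.stage.create(name, url, credentials=creds)`.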
AsyncStageManager
- class matrixone.stage.AsyncStageManager(client, executor=None)
Bases: BaseStageManager
Asynchronous stage manager for MatrixOne external storage operations.
Provides the same comprehensive stage management functionality as StageManager but with full async/await support for non-blocking I/O operations. Ideal for high-concurrency applications and async web frameworks requiring stage management.
Key Features:
Non-blocking operations: All stage operations use async/await
Async stage management: Create, alter, drop stages asynchronously
Concurrent operations: Manage multiple stages concurrently
Async external storage: Non-blocking S3 and cloud storage configuration
Transaction-aware: Full integration with async transaction contexts
Executor pattern: Works with both async client and async session
Executor Pattern:
If executor is None, uses self.client.execute (default async client-level executor)
If executor is provided (e.g., async session), uses executor.execute (async transaction-aware)
All operations are non-blocking and use async/await
Enables concurrent stage management operations
Usage Examples:
from matrixone import AsyncClient
from matrixone.orm import Column, Integer, String, Float, Base
import asyncio

# Define ORM models for data loading
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Create Stages Using Simple Interfaces
    # ========================================

    # Create local filesystem stage (simple interface - recommended)
    stage = await client.stage.create_local('local_data', '/data/imports/')
    print(f"Local stage created: {stage.name}")

    # Create S3 stage using simple interface (recommended)
    s3_stage = await client.stage.create_s3(
        name='production_s3',
        bucket='my-bucket',
        path='data/',
        aws_key_id='AKIAIOSFODNN7EXAMPLE',
        aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        aws_region='us-east-1',
        comment='Production data bucket'
    )

    # Create MinIO stage (S3-compatible)
    minio_stage = await client.stage.create_s3(
        name='minio_stage',
        bucket='my-bucket',
        endpoint='http://localhost:9000',
        aws_key_id='minioadmin',
        aws_secret_key='minioadmin'
    )

    # Create sub-stage for organized data
    await client.stage.create(
        'sales_data',
        'stage://production_s3/sales/',
        comment='Sales department data'
    )

    # ========================================
    # Stage Management Operations
    # ========================================

    # List all stages asynchronously
    stages = await client.stage.list()
    for stage in stages:
        print(f"{stage.name}: {stage.url}")

    # Get specific stage details
    stage = await client.stage.get('production_s3')
    print(f"Stage URL: {stage.url}")

    # Alter stage (update credentials)
    await client.stage.alter(
        'production_s3',
        credentials={'AWS_KEY_ID': 'new_key_id'}
    )

    # Disable/enable stage
    await client.stage.alter('production_s3', enable=False)
    await client.stage.alter('production_s3', enable=True)

    # ========================================
    # Concurrent Stage Creation
    # ========================================

    # Create multiple stages concurrently using simple interfaces
    await asyncio.gather(
        client.stage.create_local('stage1', '/data1/'),
        client.stage.create_local('stage2', '/data2/'),
        client.stage.create_s3('stage3', 'bucket3', 'path/', 'key', 'secret')
    )

    # Concurrent operations
    stages, stage_details = await asyncio.gather(
        client.stage.list(),
        client.stage.get('production_s3')
    )

    # ========================================
    # Load Data from Stages (ORM Style)
    # ========================================

    # Load from S3 stage using ORM model
    await client.load_data.from_stage_csv('production_s3', 'users.csv', User)

    # Drop stage when no longer needed (if_exists avoids an error if it is absent)
    await client.stage.drop('old_stage', if_exists=True)

    # ========================================
    # Transactional Stage Operations
    # ========================================

    # Using within async transaction; new names are used because
    # stage1-stage3 were already created above
    async with client.session() as session:
        # Create multiple stages atomically using simple interfaces
        await session.stage.create_local('stage4', '/data4/')
        await session.stage.create_s3('stage5', 'bucket5', '', 'key', 'secret')
        # Both stages created atomically

    await client.disconnect()

asyncio.run(main())
Performance Benefits:
Non-blocking I/O: Don’t block the event loop during stage operations
Concurrent management: Create and configure multiple stages simultaneously
Async web frameworks: Well suited to FastAPI, aiohttp, Sanic, and similar frameworks
High concurrency: Handle many stage operations concurrently
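The concurrency benefit comes from `asyncio.gather`, which runs awaitables concurrently and returns their results in argument order. A minimal, self-contained sketch: `FakeAsyncStageManager` is a hypothetical stand-in that sleeps to simulate a server round trip instead of talking to MatrixOne, so three creations take roughly one delay rather than three.

```python
import asyncio
from typing import List, Tuple


class FakeAsyncStageManager:
    """Stand-in for an async stage manager; sleeps instead of calling MatrixOne."""

    def __init__(self, delay: float = 0.1) -> None:
        self.delay = delay
        self.created: List[str] = []

    async def create_local(self, name: str, path: str) -> str:
        await asyncio.sleep(self.delay)  # simulated server round trip
        self.created.append(name)
        return name


async def create_concurrently(
    manager: FakeAsyncStageManager, specs: List[Tuple[str, str]]
) -> List[str]:
    # gather() runs the coroutines concurrently and returns results in argument order.
    return list(await asyncio.gather(*(manager.create_local(n, p) for n, p in specs)))


async def main() -> Tuple[List[str], float]:
    manager = FakeAsyncStageManager(delay=0.1)
    loop = asyncio.get_running_loop()
    start = loop.time()
    names = await create_concurrently(
        manager, [("stage1", "/data1/"), ("stage2", "/data2/"), ("stage3", "/data3/")]
    )
    return names, loop.time() - start


if __name__ == "__main__":
    names, elapsed = asyncio.run(main())
    print(names, f"{elapsed:.2f}s")  # roughly 0.1s rather than 0.3s
```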
Use Cases:
Async ETL configuration: Non-blocking stage setup for data pipelines
Dynamic stage management: Create/modify stages in response to events
Multi-cloud setup: Configure stages for multiple cloud providers concurrently
High-throughput applications: Stage management without blocking
Real-time data ingestion: Configure stages on-the-fly
See also
AsyncClient: For async database operations
AsyncLoadDataManager: For async data loading from stages
StageManager: For synchronous stage operations
- __init__(client, executor=None)
Initialize AsyncStageManager.
- Parameters:
client – Async client object
executor – Optional executor (e.g., async session) for executing SQL. If None, uses client.execute
- async create(name: str, url: str, credentials: Dict[str, str] | None = None, comment: str | None = None, if_not_exists: bool = False) → Stage
Create a stage asynchronously.
- async alter(name: str, url: str | None = None, credentials: Dict[str, str] | None = None, comment: str | None = None, enable: bool | None = None, if_exists: bool = False) → Stage
Alter a stage asynchronously.
- async show(like_pattern: str | None = None) → List[Stage]
Show stages with optional pattern matching (async).
- async create_local(name: str, path: str, comment: str | None = None, if_not_exists: bool = False) → Stage
Create a local file system stage (async).
- async create_s3(name: str, bucket: str, path: str = '', aws_key_id: str | None = None, aws_secret_key: str | None = None, aws_region: str | None = None, endpoint: str | None = None, comment: str | None = None, if_not_exists: bool = False) → Stage
Create an S3 stage asynchronously.
- Parameters:
name – Stage name
bucket – S3 bucket name
path – Path within bucket (default: “”)
aws_key_id – AWS access key ID
aws_secret_key – AWS secret key
aws_region – AWS region
endpoint – S3 endpoint URL for S3-compatible services like MinIO
comment – Optional comment
if_not_exists – If True, don’t error if the stage already exists
- Returns:
Created Stage object
Stage
- class matrixone.stage.Stage(name: str, url: str, credentials: Dict[str, str] | None = None, status: str | None = None, comment: str | None = None, created_time: datetime | None = None, modified_time: datetime | None = None, _client: Any | None = None)
Bases: object
Represents a MatrixOne external stage.
A stage is a named external storage location (filesystem, S3, cloud storage) that can be used to load data from or export data to.
- name
Stage name
- Type:
str
- status
Stage status (enabled/disabled)
- Type:
str | None
- comment
Optional description
- Type:
str | None
- created_time
When the stage was created
- Type:
datetime.datetime | None
- modified_time
When the stage was last modified
- Type:
datetime.datetime | None
- name: str
- url: str
- load_csv(filepath: str, table: Any, **kwargs) → Any
Load CSV file from this stage (pandas-style).
- Parameters:
filepath – File path relative to stage location
table – SQLAlchemy table model or table name
**kwargs – Additional load options (sep, skiprows, names, etc.)
- Returns:
ResultSet with load operation results
Examples
>>> stage = client.stage.get('my_stage')
>>> result = stage.load_csv('users.csv', User)
>>> print(f"Loaded {result.affected_rows} rows")

>>> # With pandas-style options
>>> result = stage.load_csv('data.csv', User, sep='|', skiprows=1)
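The pandas-style options map naturally onto clauses of a MySQL-style `LOAD DATA` statement: `sep` to `FIELDS TERMINATED BY`, `quotechar` to `ENCLOSED BY`, and `skiprows` to `IGNORE n LINES`. A minimal sketch of that mapping; `build_load_csv_sql` is a hypothetical helper, and both the `stage://stage_name/file` source form and the exact statement the SDK generates are assumptions, not documented behaviour.

```python
def quote(value: str) -> str:
    # Single-quoted SQL literal with inner quotes doubled.
    return "'" + value.replace("'", "''") + "'"


def build_load_csv_sql(
    stage: str,
    filepath: str,
    table: str,
    sep: str = ",",
    skiprows: int = 0,
    quotechar: str = "",
) -> str:
    """Hypothetical mapping of pandas-style options onto a LOAD DATA statement."""
    source = f"stage://{stage}/{filepath.lstrip('/')}"
    parts = [
        f"LOAD DATA INFILE {quote(source)} INTO TABLE {table}",
        f"FIELDS TERMINATED BY {quote(sep)}",  # sep -> field delimiter
    ]
    if quotechar:
        parts.append(f"ENCLOSED BY {quote(quotechar)}")  # quotechar -> enclosure
    if skiprows:
        parts.append(f"IGNORE {int(skiprows)} LINES")  # skiprows -> leading lines skipped
    return " ".join(parts)


print(build_load_csv_sql("my_stage", "data.csv", "users", sep="|", skiprows=1))
# LOAD DATA INFILE 'stage://my_stage/data.csv' INTO TABLE users FIELDS TERMINATED BY '|' IGNORE 1 LINES
```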
- load_tsv(filepath: str, table: Any, **kwargs) → Any
Load TSV file from this stage (pandas-style).
- Parameters:
filepath – File path relative to stage location
table – SQLAlchemy table model or table name
**kwargs – Additional load options
- Returns:
ResultSet with load operation results
- load_json(filepath: str, table: Any, **kwargs) → Any
Load JSON Lines file from this stage (pandas-style).
- Parameters:
filepath – File path relative to stage location
table – SQLAlchemy table model or table name
**kwargs – Additional load options (orient, compression, etc.)
- Returns:
ResultSet with load operation results
- load_parquet(filepath: str, table: Any, **kwargs) → Any
Load Parquet file from this stage (pandas-style).
- Parameters:
filepath – File path relative to stage location
table – SQLAlchemy table model or table name
**kwargs – Additional load options
- Returns:
ResultSet with load operation results
- load_files(file_mappings: Dict[str, Any], **kwargs) → Dict[str, Any]
Load multiple files from this stage in batch.
- Parameters:
file_mappings – Dictionary mapping file paths to table models
**kwargs – Additional parameters for load operations
- Returns:
Dictionary mapping file paths to ResultSet objects
Examples
>>> stage = client.stage.get('my_stage')
>>> results = stage.load_files({
...     'users.csv': User,
...     'events.jsonl': Event,
...     'orders.parq': Order
... })
>>> print(f"Loaded {results['users.csv'].affected_rows} users")
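The batch example mixes CSV, JSON Lines, and Parquet files, so a batch loader has to pick a per-file loader somehow. A minimal sketch that dispatches on file extension to the `load_csv`/`load_tsv`/`load_json`/`load_parquet` methods documented here; the extension table, `pick_loader`, and `load_files_sketch` are hypothetical, and the SDK may detect formats differently. `RecordingStage` is a stand-in that reports which loader would run.

```python
import os
from typing import Any, Callable, Dict, Tuple

# Assumed extension-to-method mapping; the SDK may detect formats differently.
LOADERS = {
    ".csv": "load_csv",
    ".tsv": "load_tsv",
    ".jsonl": "load_json",
    ".json": "load_json",
    ".parquet": "load_parquet",
    ".parq": "load_parquet",
}


def pick_loader(stage: Any, filepath: str) -> Callable[..., Any]:
    ext = os.path.splitext(filepath)[1].lower()
    if ext not in LOADERS:
        raise ValueError(f"no loader for {filepath!r} (extension {ext!r})")
    return getattr(stage, LOADERS[ext])


def load_files_sketch(stage: Any, file_mappings: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    # One load call per file; results keyed by file path, like load_files().
    return {
        path: pick_loader(stage, path)(path, table, **kwargs)
        for path, table in file_mappings.items()
    }


class RecordingStage:
    """Stand-in for a Stage object: reports which loader would run."""

    def load_csv(self, filepath: str, table: Any, **kwargs: Any) -> Tuple[str, str, Any]:
        return ("load_csv", filepath, table)

    def load_tsv(self, filepath: str, table: Any, **kwargs: Any) -> Tuple[str, str, Any]:
        return ("load_tsv", filepath, table)

    def load_json(self, filepath: str, table: Any, **kwargs: Any) -> Tuple[str, str, Any]:
        return ("load_json", filepath, table)

    def load_parquet(self, filepath: str, table: Any, **kwargs: Any) -> Tuple[str, str, Any]:
        return ("load_parquet", filepath, table)


print(load_files_sketch(RecordingStage(), {"users.csv": "users", "orders.parq": "orders"}))
```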
- export_to_csv(filename: str, query: Any, **kwargs) → Any
Export query results to CSV file in this stage (pandas-style).
- Parameters:
filename – Filename to create in the stage
query – SELECT query (str, SQLAlchemy select(), or MatrixOneQuery)
**kwargs – Additional export options (sep, quotechar, lineterminator, header)
- Returns:
ResultSet with export operation results
Examples
>>> stage = client.stage.get('s3_backup_stage')
>>> stage.export_to_csv('orders.csv', "SELECT * FROM orders")

>>> # With pandas-style options
>>> stage.export_to_csv('data.csv', query, sep='|', header=True)
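Exporting is the mirror image of loading: a MySQL-style `SELECT ... INTO OUTFILE` with `FIELDS TERMINATED BY`, `ENCLOSED BY`, and `LINES TERMINATED BY` clauses matches the `sep`, `quotechar`, and `lineterminator` options. A minimal sketch for plain-string queries only (a SQLAlchemy `select()` would first need compiling to SQL); `build_export_csv_sql` and `sql_literal` are hypothetical helpers, and the `stage://` target form and exact generated SQL are assumptions. The `header` option is left out because its SQL spelling is not documented here.

```python
def sql_literal(value: str) -> str:
    # MySQL-style literal: escape backslashes and control characters, double quotes.
    escaped = (
        value.replace("\\", "\\\\")
        .replace("'", "''")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )
    return "'" + escaped + "'"


def build_export_csv_sql(
    stage: str,
    filename: str,
    query: str,
    sep: str = ",",
    quotechar: str = '"',
    lineterminator: str = "\n",
) -> str:
    """Hypothetical: append an INTO OUTFILE clause targeting a stage to a SELECT."""
    target = f"stage://{stage}/{filename.lstrip('/')}"
    select = query.strip().rstrip(";")  # a trailing semicolon would end the statement early
    return (
        f"{select} INTO OUTFILE {sql_literal(target)} "
        f"FIELDS TERMINATED BY {sql_literal(sep)} ENCLOSED BY {sql_literal(quotechar)} "
        f"LINES TERMINATED BY {sql_literal(lineterminator)}"
    )


print(build_export_csv_sql("s3_backup_stage", "orders.csv", "SELECT * FROM orders;"))
```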
- export_to_jsonl(filename: str, query: Any, **kwargs) → Any
Export query results to JSONL file in this stage (pandas-style).
- Parameters:
filename – Filename to create in the stage
query – SELECT query (str, SQLAlchemy select(), or MatrixOneQuery)
**kwargs – Additional export options
- Returns:
ResultSet with export operation results
Examples
>>> stage = client.stage.get('s3_backup_stage')
>>> stage.export_to_jsonl('events.jsonl', "SELECT * FROM events")