Stage Manager

StageManager

class matrixone.stage.StageManager(client, executor=None)

Bases: BaseStageManager

Synchronous stage manager for MatrixOne external storage operations.

A Stage is a named external storage location (like S3, filesystem, or cloud storage) that stores data files. Stages provide centralized, secure, and reusable access to external data sources for data loading and export operations.

Key Features:

  • External storage integration: Connect to S3, filesystem, and cloud storage

  • Credential management: Securely store and manage access credentials

  • Reusable connections: Create once, use multiple times

  • Centralized configuration: Single point of configuration for data sources

  • Stage hierarchy: Support for sub-stages and parent-child relationships

  • Transaction-aware: Full integration with transaction contexts

  • Enabled/disabled control: Enable or disable stages as needed

Key Concepts:

What is a Stage?

A stage is a pointer to an external storage location where:

  • Data files are stored (CSV, JSONLINE, Parquet, etc.)

  • Files can be loaded into tables

  • Query results can be exported

  • Access credentials are securely managed

Stage Types:

  1. File System Stage: Local or network filesystem
     >>> CREATE STAGE fs_stage URL='file:///data/files/'

  2. S3 Stage: Amazon S3 or S3-compatible storage (MinIO, etc.)
     >>> CREATE STAGE s3_stage URL='s3://bucket/path/'
     ...     CREDENTIALS={'AWS_KEY_ID'='xxx', 'AWS_SECRET_KEY'='xxx'}

  3. Sub-Stage: Stage based on another stage
     >>> CREATE STAGE sub_stage URL='stage://parent_stage/subdir/'
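Sub-stage URLs are passed to MatrixOne as-is in the examples here, so resolution presumably happens on the server. Purely as a mental model of how a sub-stage points into its parent's location, the sketch below follows `stage://` references through a hypothetical in-memory `STAGES` mapping until it reaches a `file://` or `s3://` URL. `STAGES`, `stage_kind`, and `resolve` are illustrative names, not part of the SDK, and the server's actual resolution rules may differ.

```python
from typing import Dict
from urllib.parse import urlparse

# Hypothetical registry of stage name -> URL (not read from MatrixOne).
STAGES: Dict[str, str] = {
    "production_data": "s3://my-bucket/data/",
    "archive_2024": "stage://production_data/archive/2024/",
}


def stage_kind(url: str) -> str:
    """Classify a stage URL by scheme: 'file', 's3', or 'stage'."""
    scheme = urlparse(url).scheme
    if scheme not in ("file", "s3", "stage"):
        raise ValueError(f"unsupported stage URL scheme: {scheme!r}")
    return scheme


def resolve(url: str, stages: Dict[str, str], max_depth: int = 10) -> str:
    """Follow stage:// references until a file:// or s3:// URL is reached."""
    for _ in range(max_depth):
        if stage_kind(url) != "stage":
            return url
        parsed = urlparse(url)
        parent_url = stages[parsed.netloc]  # netloc holds the parent stage name
        # Append the sub-stage's path to the parent's location.
        url = parent_url.rstrip("/") + "/" + parsed.path.lstrip("/")
    raise ValueError("stage chain too deep (possible cycle)")


print(resolve("stage://archive_2024/jan.csv", STAGES))
# s3://my-bucket/data/archive/2024/jan.csv
```

The depth limit guards against a stage that (directly or indirectly) refers to itself.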

Why Use Stages?

  • Centralized: One location for all data files

  • Reusable: Create once, use for multiple tables

  • Secure: Credentials are defined once at the stage level instead of being repeated in every load or export statement

  • Efficient: Direct loading from cloud storage without intermediate copies

  • Organized: Logical grouping of related data files

  • Portable: Easy to move between environments (dev, staging, prod)

Executor Pattern:

  • If executor is None, uses self.client.execute (default client-level executor)

  • If executor is provided (e.g., session), uses executor.execute (transaction-aware)

  • All operations can participate in transactions when used via session
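The executor rule above amounts to one branch: use the session if one was supplied, otherwise the client. A minimal sketch, using a hypothetical `RecordingExecutor` that records SQL instead of running it and a stripped-down `MiniStageManager`; neither is the SDK's class, and the `DROP STAGE` text is only illustrative.

```python
from typing import Any, List, Optional


class RecordingExecutor:
    """Stand-in for a client or session: records SQL instead of running it."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.statements: List[str] = []

    def execute(self, sql: str) -> str:
        self.statements.append(sql)
        return f"{self.label}: {sql}"


class MiniStageManager:
    """Hypothetical manager that only demonstrates executor selection."""

    def __init__(self, client: Any, executor: Optional[Any] = None) -> None:
        self.client = client
        self.executor = executor

    def _execute(self, sql: str) -> Any:
        # A session (transaction-aware) executor wins; otherwise use the client.
        target = self.executor if self.executor is not None else self.client
        return target.execute(sql)

    def drop(self, name: str, if_exists: bool = False) -> None:
        clause = "IF EXISTS " if if_exists else ""
        self._execute(f"DROP STAGE {clause}{name}")


client, session = RecordingExecutor("client"), RecordingExecutor("session")
MiniStageManager(client).drop("old_stage")
MiniStageManager(client, executor=session).drop("tmp_stage", if_exists=True)
print(client.statements)   # ['DROP STAGE old_stage']
print(session.statements)  # ['DROP STAGE IF EXISTS tmp_stage']
```

Routing every statement through one `_execute` method is what lets the same manager code run either standalone or inside a transaction.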

Typical Workflow:

  1. Create a stage pointing to your data location:
     >>> client.stage.create('my_stage', 'file:///data/')

  2. Upload files to that location (outside MatrixOne)

  3. Load data from the stage:
     >>> client.load_data.from_stage_csv('my_stage', 'users.csv', User)

  4. Reuse the same stage for other files:
     >>> client.load_data.from_stage_csv('my_stage', 'orders.csv', Order)

Usage Examples:

from matrixone import Client

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# ========================================
# Create Stages Using Simple Interfaces
# ========================================

# Create local filesystem stage (simple interface)
client.stage.create_local('local_data', '/data/imports/')

# Create S3 stage with simple interface
client.stage.create_s3(
    name='production_s3',
    bucket='my-bucket',
    path='data/',
    aws_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
    aws_region='us-east-1',
    comment='Production data bucket'
)

# Create S3-compatible MinIO stage
client.stage.create_s3(
    name='minio_stage',
    bucket='my-bucket',
    endpoint='http://localhost:9000',
    aws_key_id='minioadmin',
    aws_secret_key='minioadmin'
)

# Create sub-stage for organized data (using generic create)
client.stage.create(
    'sales_data',
    'stage://production_s3/sales/',
    comment='Sales department data files'
)

# ========================================
# Stage Management Operations
# ========================================

# List all stages
stages = client.stage.list()
for stage in stages:
    print(f"{stage.name}: {stage.url} (status: {stage.status})")

# Get specific stage details
stage = client.stage.get('production_s3')
print(f"Stage URL: {stage.url}")
print(f"Created: {stage.created_time}")

# Alter stage (update URL or credentials)
client.stage.alter(
    'production_s3',
    url='s3://new-bucket/data/',
    credentials={'AWS_KEY_ID': 'new_key_id'}
)

# Disable stage temporarily
client.stage.alter('production_s3', enable=False)

# Re-enable stage
client.stage.alter('production_s3', enable=True)

# Drop stage when no longer needed
client.stage.drop('old_stage', if_exists=True)

# ========================================
# Load Data from Stages
# ========================================

# Load from stages using ORM models (User and Order are ORM model classes defined elsewhere)
client.load_data.from_stage_csv('production_s3', 'users.csv', User)

# Load from local stage
client.load_data.from_stage_csv('local_data', 'orders.csv', Order)

# ========================================
# Transactional Stage Operations
# ========================================

# Using within a transaction (all operations atomic)
with client.session() as session:
    # Create multiple stages atomically using simple interfaces
    session.stage.create_local('stage1', '/data1/')
    session.stage.create_s3('stage2', 'bucket2', 'path/', 'key', 'secret')
    session.stage.create_local('stage3', '/data3/')

Common Use Cases:

  • Cloud data lakes: Access data stored in S3 or cloud storage

  • ETL pipelines: Centralized configuration for data sources

  • Multi-environment: Separate stages for dev, staging, production

  • Data warehousing: Organize source data by department or domain

  • Secure access: Manage credentials separately from the statements that load or export data

See also

  • LoadDataManager: For loading data from stages

  • AsyncStageManager: For async stage operations

  • Client.load_data: For data loading operations

__init__(client, executor=None)

Initialize StageManager.

Parameters:
  • client – Client object that provides execute() method

  • executor – Optional executor (e.g., session) for executing SQL. If None, uses client.execute

create(name: str, url: str, credentials: Dict[str, str] | None = None, comment: str | None = None, if_not_exists: bool = False) -> Stage

Create a new external stage.

Parameters:
  • name (str) – Stage name

  • url (str) –

    Storage location URL. Supported protocols:

    • file:///path/to/data/ - Local or network filesystem

    • s3://bucket/path/ - Amazon S3

    • stage://parent_stage/subdir/ - Sub-stage

  • credentials (dict, optional) –

    Authentication credentials. For S3/cloud storage:

    {
        'AWS_KEY_ID': 'your_key_id',
        'AWS_SECRET_KEY': 'your_secret_key',
        'AWS_REGION': 'us-east-1',
        'PROVIDER': 'minio'
    }
    

  • comment (str, optional) – Stage description

  • if_not_exists (bool) – If True, don’t error if stage already exists

Returns:

Created stage object

Return type:

Stage

Examples

>>> # File system stage
>>> stage = client.stage.create(
...     'local_data',
...     'file:///var/data/imports/'
... )
>>> # S3 stage with credentials
>>> stage = client.stage.create(
...     'production_data',
...     's3://my-bucket/data/',
...     credentials={
...         'AWS_KEY_ID': 'AKIAIOSFODNN7EXAMPLE',
...         'AWS_SECRET_KEY': 'wJalrXUtnFEMI/...',
...         'AWS_REGION': 'us-east-1'
...     },
...     comment='Production data stage'
... )
>>> # Sub-stage (stage based on another stage)
>>> stage = client.stage.create(
...     'archive_2024',
...     'stage://production_data/archive/2024/'
... )
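As a rough picture of what `create()` sends to the server, the hypothetical helper below assembles a `CREATE STAGE` statement in the shape shown under Stage Types above. It assumes single-quoted literals with embedded quotes doubled, does not validate or quote the stage name, and is not the SDK's own implementation; the exact text the SDK emits may differ.

```python
from typing import Dict, Optional


def quote(value: str) -> str:
    """Wrap a value in single quotes, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_create_stage(
    name: str,
    url: str,
    credentials: Optional[Dict[str, str]] = None,
    comment: Optional[str] = None,
    if_not_exists: bool = False,
) -> str:
    """Hypothetical CREATE STAGE builder mirroring create()'s parameters."""
    parts = ["CREATE STAGE"]
    if if_not_exists:
        parts.append("IF NOT EXISTS")
    parts.append(name)  # not escaped: assumes a plain identifier
    parts.append(f"URL={quote(url)}")
    if credentials:
        pairs = ", ".join(f"{quote(k)}={quote(v)}" for k, v in credentials.items())
        parts.append(f"CREDENTIALS={{{pairs}}}")
    if comment:
        parts.append(f"COMMENT={quote(comment)}")
    return " ".join(parts)


print(build_create_stage("fs_stage", "file:///data/files/", if_not_exists=True))
# CREATE STAGE IF NOT EXISTS fs_stage URL='file:///data/files/'
```

Credentials are emitted in dictionary insertion order, which keeps the statement predictable for logging and tests.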

alter(name: str, url: str | None = None, credentials: Dict[str, str] | None = None, comment: str | None = None, enable: bool | None = None, if_exists: bool = False) -> Stage

Modify an existing stage.

You can update URL, credentials, comment, or enable/disable the stage. At least one of url, credentials, comment, or enable must be provided.

Parameters:
  • name (str) – Stage name to modify

  • url (str, optional) – New storage location URL

  • credentials (dict, optional) – New authentication credentials

  • comment (str, optional) – New comment/description

  • enable (bool, optional) – Enable (True) or disable (False) the stage

  • if_exists (bool) – If True, don’t error if stage doesn’t exist

Returns:

Updated stage object

Return type:

Stage

Examples

>>> # Update URL
>>> client.stage.alter('my_stage', url='s3://new-bucket/data/')
>>> # Update credentials
>>> client.stage.alter(
...     's3_stage',
...     credentials={'AWS_KEY_ID': 'new_key'}
... )
>>> # Update comment
>>> client.stage.alter('my_stage', comment='Updated description')
>>> # Disable stage
>>> client.stage.alter('my_stage', enable=False)
>>> # Update multiple properties
>>> client.stage.alter(
...     'my_stage',
...     url='s3://new-bucket/',
...     comment='Migrated to new bucket',
...     enable=True
... )
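`alter()` requires at least one of `url`, `credentials`, `comment`, or `enable`. The sketch below shows one way to enforce that, using a hypothetical `build_alter_stage` helper and an assumed `ALTER STAGE ... SET ...` statement shape. Note the `is not None` checks: `enable=False` is a real change and must not be dropped as falsy.

```python
from typing import Dict, Optional


def quote(value: str) -> str:
    """Wrap a value in single quotes, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_alter_stage(
    name: str,
    url: Optional[str] = None,
    credentials: Optional[Dict[str, str]] = None,
    comment: Optional[str] = None,
    enable: Optional[bool] = None,
    if_exists: bool = False,
) -> str:
    """Hypothetical ALTER STAGE builder; the SET syntax is an assumption."""
    settings = []
    if url is not None:
        settings.append(f"URL={quote(url)}")
    if credentials is not None:
        pairs = ", ".join(f"{quote(k)}={quote(v)}" for k, v in credentials.items())
        settings.append(f"CREDENTIALS={{{pairs}}}")
    if enable is not None:
        # enable=False must still be emitted, so test for None, not truthiness.
        settings.append(f"ENABLE={'TRUE' if enable else 'FALSE'}")
    if comment is not None:
        settings.append(f"COMMENT={quote(comment)}")
    if not settings:
        raise ValueError("provide at least one of url, credentials, comment, enable")
    head = "ALTER STAGE IF EXISTS" if if_exists else "ALTER STAGE"
    return f"{head} {name} SET " + " ".join(settings)


print(build_alter_stage("my_stage", enable=False))
# ALTER STAGE my_stage SET ENABLE=FALSE
```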

drop(name: str, if_exists: bool = False) -> None

Delete an external stage.

Parameters:
  • name (str) – Stage name to delete

  • if_exists (bool) – If True, don’t error if stage doesn’t exist

Examples

>>> # Drop stage
>>> client.stage.drop('my_stage')
>>> # Drop with IF EXISTS
>>> client.stage.drop('my_stage', if_exists=True)

show(like_pattern: str | None = None) -> List[Stage]

Show stages with optional pattern matching.

Parameters:

like_pattern (str, optional) – SQL LIKE pattern for filtering stages, e.g. 'my_stage%' or '%prod%'

Returns:

List of matching stages

Return type:

List[Stage]

Examples

>>> # Show all stages
>>> stages = client.stage.show()
>>> # Show stages matching pattern
>>> stages = client.stage.show(like_pattern='prod%')
>>> for stage in stages:
...     print(f"{stage.name}: {stage.url}")
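`like_pattern` follows SQL `LIKE` rules, where `%` matches any run of characters and `_` matches exactly one. The filtering is presumably done by the server; the sketch below only imitates those two wildcards in Python so the patterns in the examples are easier to reason about. It ignores `ESCAPE` clauses and treats matching as case-sensitive, which may not match the server's collation; `like_to_regex` and `filter_like` are hypothetical names.

```python
import re
from typing import Iterable, List


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern: % = any run of chars, _ = one char."""
    out = []
    for ch in pattern:
        if ch == "%":
            out.append(".*")
        elif ch == "_":
            out.append(".")
        else:
            out.append(re.escape(ch))  # everything else matches literally
    return re.compile("".join(out), re.DOTALL)


def filter_like(names: Iterable[str], pattern: str) -> List[str]:
    """Keep names that match the whole LIKE pattern."""
    rx = like_to_regex(pattern)
    return [n for n in names if rx.fullmatch(n)]


names = ["prod_s3", "dev_s3", "my_stage", "my_stage_2024"]
print(filter_like(names, "prod%"))      # ['prod_s3']
print(filter_like(names, "my_stage%"))  # ['my_stage', 'my_stage_2024']
```

Note that the `_` in `'my_stage%'` is itself a single-character wildcard, so the pattern would also match a name such as `myXstage`.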

list() -> List[Stage]

List all stages by querying the mo_catalog.mo_stages system table.

Returns:

List of all stages

Return type:

List[Stage]

Examples

>>> # Get all stages
>>> stages = client.stage.list()
>>> for stage in stages:
...     print(f"{stage.name}: {stage.url}")

get(name: str) -> Stage

Get details of a specific stage.

Parameters:

name (str) – Stage name

Returns:

Stage object with details

Return type:

Stage

Raises:

ValueError – If stage not found

Examples

>>> stage = client.stage.get('my_stage')
>>> print(f"URL: {stage.url}")
>>> print(f"Status: {stage.status}")

exists(name: str) -> bool

Check if a stage exists.

Parameters:

name (str) – Stage name

Returns:

True if stage exists, False otherwise

Return type:

bool

Examples

>>> if client.stage.exists('my_stage'):
...     print("Stage exists")
... else:
...     print("Stage does not exist")
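`get()` raises `ValueError` for an unknown stage while `exists()` returns a boolean. One natural way to relate the two is for `exists()` to catch the error from `get()`, as in this sketch over a hypothetical in-memory registry (`InMemoryStages` and `StageRecord` are illustrative stand-ins). Whether the SDK's `exists()` is written this way or queries the catalog directly is not stated here.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StageRecord:
    """Illustrative subset of the Stage fields."""
    name: str
    url: str
    status: Optional[str] = None


class InMemoryStages:
    """Hypothetical registry used only to show get()/exists() semantics."""

    def __init__(self) -> None:
        self._stages: Dict[str, StageRecord] = {}

    def add(self, stage: StageRecord) -> None:
        self._stages[stage.name] = stage

    def get(self, name: str) -> StageRecord:
        try:
            return self._stages[name]
        except KeyError:
            # Mirror the documented contract: unknown stage -> ValueError.
            raise ValueError(f"Stage '{name}' not found") from None

    def exists(self, name: str) -> bool:
        # Turn get()'s "not found" error into a boolean.
        try:
            self.get(name)
            return True
        except ValueError:
            return False


reg = InMemoryStages()
reg.add(StageRecord("my_stage", "file:///data/", "enabled"))
print(reg.exists("my_stage"), reg.exists("missing"))  # True False
```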

create_local(name: str, path: str, comment: str | None = None, if_not_exists: bool = False) -> Stage

Create a local file system stage with simplified path handling.

Parameters:
  • name – Stage name

  • path – Local file system path (supports ~, relative, absolute paths)

  • comment – Optional comment

  • if_not_exists – If True, don’t error if the stage already exists

Returns:

Created Stage object

Examples

>>> # Relative path
>>> stage = client.stage.create_local('data', './data/')
>>> # User home directory
>>> stage = client.stage.create_local('home', '~/data/')
>>> # Absolute path
>>> stage = client.stage.create_local('var_data', '/var/data/')
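`create_local()` accepts `~`, relative, and absolute paths and produces a `file://` stage. A minimal sketch of that normalization, assuming a POSIX filesystem and assuming the URL should end in `/` like the other examples; `to_file_url` is a hypothetical name, not the SDK's helper. Relative paths are resolved against the working directory of the Python process, and the resulting path presumably has to be readable by the MatrixOne server, which may run on a different machine.

```python
import os


def to_file_url(path: str) -> str:
    """Expand ~, make the path absolute, and return a file:// URL ending in '/'."""
    full = os.path.abspath(os.path.expanduser(path))
    if not full.endswith("/"):
        full += "/"  # abspath strips trailing slashes except for the root
    return "file://" + full


print(to_file_url("/var/data/"))  # file:///var/data/
print(to_file_url("./data/"))     # file://<current working directory>/data/
```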

create_s3(name: str, bucket: str, path: str = '', aws_key_id: str | None = None, aws_secret_key: str | None = None, aws_region: str | None = None, endpoint: str | None = None, comment: str | None = None, if_not_exists: bool = False) -> Stage

Create an S3 stage with simplified AWS credentials.

Parameters:
  • name – Stage name

  • bucket – S3 bucket name (e.g., ‘my-bucket’)

  • path – Path within bucket (default: '')

  • aws_key_id – AWS access key ID (optional, can use environment variables)

  • aws_secret_key – AWS secret key (optional, can use environment variables)

  • aws_region – AWS region (optional)

  • endpoint – S3 endpoint URL for S3-compatible services like MinIO

  • comment – Optional comment

  • if_not_exists – If True, don’t error if the stage already exists

Returns:

Created Stage object

Examples

>>> # Using explicit credentials
>>> stage = client.stage.create_s3(
...     'prod',
...     bucket='my-bucket',
...     path='data/',
...     aws_key_id='AKIA...',
...     aws_secret_key='...'
... )
>>> # MinIO (S3-compatible)
>>> stage = client.stage.create_s3(
...     'minio',
...     bucket='my-bucket',
...     endpoint='http://localhost:9000',
...     aws_key_id='minioadmin',
...     aws_secret_key='minioadmin'
... )
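`create_s3()` is a convenience over `create()`: it assembles an `s3://` URL from `bucket` and `path` and a credentials dictionary from the AWS arguments. The sketch below shows that mapping with a hypothetical `s3_stage_params` helper. The `AWS_KEY_ID`, `AWS_SECRET_KEY`, and `AWS_REGION` keys come from the `create()` documentation above; the `ENDPOINT` key for MinIO-style services is an assumption, and the `PROVIDER` key shown in that documentation is not set here.

```python
from typing import Dict, Optional, Tuple


def s3_stage_params(
    bucket: str,
    path: str = "",
    aws_key_id: Optional[str] = None,
    aws_secret_key: Optional[str] = None,
    aws_region: Optional[str] = None,
    endpoint: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Map create_s3()-style arguments onto a URL plus a CREDENTIALS dict."""
    url = f"s3://{bucket}/"
    if path:
        url += path.strip("/") + "/"
    creds: Dict[str, str] = {}
    if aws_key_id:
        creds["AWS_KEY_ID"] = aws_key_id
    if aws_secret_key:
        creds["AWS_SECRET_KEY"] = aws_secret_key
    if aws_region:
        creds["AWS_REGION"] = aws_region
    if endpoint:
        creds["ENDPOINT"] = endpoint  # assumed key name for S3-compatible services
    return url, creds


print(s3_stage_params("my-bucket", "data/", "AKIA_EXAMPLE", "secret", "us-east-1"))
```

Omitting unset keys keeps the statement minimal when credentials are supplied some other way.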

AsyncStageManager

class matrixone.stage.AsyncStageManager(client, executor=None)

Bases: BaseStageManager

Asynchronous stage manager for MatrixOne external storage operations.

Provides the same comprehensive stage management functionality as StageManager but with full async/await support for non-blocking I/O operations. Ideal for high-concurrency applications and async web frameworks requiring stage management.

Key Features:

  • Non-blocking operations: All stage operations use async/await

  • Async stage management: Create, alter, drop stages asynchronously

  • Concurrent operations: Manage multiple stages concurrently

  • Async external storage: Non-blocking S3 and cloud storage configuration

  • Transaction-aware: Full integration with async transaction contexts

  • Executor pattern: Works with both async client and async session

Executor Pattern:

  • If executor is None, uses self.client.execute (default async client-level executor)

  • If executor is provided (e.g., async session), uses executor.execute (async transaction-aware)

  • All operations are non-blocking and use async/await

  • Enables concurrent stage management operations
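Because every `AsyncStageManager` method is a coroutine, independent calls can be scheduled together with `asyncio.gather`. The sketch uses a hypothetical `FakeAsyncExecutor` whose `execute` awaits `asyncio.sleep` in place of a network round trip, plus a stripped-down `MiniAsyncStageManager` that applies the same executor rule as the synchronous class. Whether real statements actually overlap depends on the client: a single database connection generally cannot run two statements at once, so the SDK may serialize them or use a pool.

```python
import asyncio
from typing import Any, List, Optional


class FakeAsyncExecutor:
    """Stand-in async executor that mimics a network round trip."""

    def __init__(self) -> None:
        self.log: List[str] = []

    async def execute(self, sql: str) -> str:
        await asyncio.sleep(0.01)  # yields to the event loop instead of blocking
        self.log.append(sql)
        return sql


class MiniAsyncStageManager:
    """Hypothetical async manager showing executor selection only."""

    def __init__(self, client: Any, executor: Optional[Any] = None) -> None:
        self.client = client
        self.executor = executor

    async def _execute(self, sql: str) -> Any:
        target = self.executor if self.executor is not None else self.client
        return await target.execute(sql)

    async def create_local(self, name: str, path: str) -> str:
        return await self._execute(f"CREATE STAGE {name} URL='file://{path}'")


async def main() -> List[str]:
    mgr = MiniAsyncStageManager(FakeAsyncExecutor())
    # gather() runs the coroutines concurrently; results keep argument order.
    return await asyncio.gather(
        mgr.create_local("stage1", "/data1/"),
        mgr.create_local("stage2", "/data2/"),
    )


results = asyncio.run(main())
print(results)
```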

Usage Examples:

from matrixone import AsyncClient
from matrixone.orm import Column, Integer, String, Float, Base
import asyncio

# Define ORM models for data loading
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Create Stages Using Simple Interfaces
    # ========================================

    # Create local filesystem stage (simple interface - recommended)
    stage = await client.stage.create_local('local_data', '/data/imports/')
    print(f"Local stage created: {stage.name}")

    # Create S3 stage using simple interface (recommended)
    s3_stage = await client.stage.create_s3(
        name='production_s3',
        bucket='my-bucket',
        path='data/',
        aws_key_id='AKIAIOSFODNN7EXAMPLE',
        aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        aws_region='us-east-1',
        comment='Production data bucket'
    )

    # Create MinIO stage (S3-compatible)
    minio_stage = await client.stage.create_s3(
        name='minio_stage',
        bucket='my-bucket',
        endpoint='http://localhost:9000',
        aws_key_id='minioadmin',
        aws_secret_key='minioadmin'
    )

    # Create sub-stage for organized data
    await client.stage.create(
        'sales_data',
        'stage://production_s3/sales/',
        comment='Sales department data'
    )

    # ========================================
    # Stage Management Operations
    # ========================================

    # List all stages asynchronously
    stages = await client.stage.list()
    for stage in stages:
        print(f"{stage.name}: {stage.url}")

    # Get specific stage details
    stage = await client.stage.get('production_s3')
    print(f"Stage URL: {stage.url}")

    # Alter stage (update credentials)
    await client.stage.alter(
        'production_s3',
        credentials={'AWS_KEY_ID': 'new_key_id'}
    )

    # Disable/enable stage
    await client.stage.alter('production_s3', enable=False)
    await client.stage.alter('production_s3', enable=True)

    # ========================================
    # Concurrent Stage Creation
    # ========================================

    # Create multiple stages concurrently using simple interfaces
    await asyncio.gather(
        client.stage.create_local('stage1', '/data1/'),
        client.stage.create_local('stage2', '/data2/'),
        client.stage.create_s3('stage3', 'bucket3', 'path/', 'key', 'secret')
    )

    # Concurrent operations
    stages, stage_details = await asyncio.gather(
        client.stage.list(),
        client.stage.get('production_s3')
    )

    # ========================================
    # Load Data from Stages (ORM Style)
    # ========================================

    # Load from S3 stage using ORM model
    await client.load_data.from_stage_csv('production_s3', 'users.csv', User)

    # Drop stage when no longer needed
    await client.stage.drop('old_stage', if_exists=True)

    # ========================================
    # Transactional Stage Operations
    # ========================================

    # Using within async transaction
    async with client.session() as session:
        # Create multiple stages atomically using simple interfaces
        await session.stage.create_local('tx_stage1', '/data1/')
        await session.stage.create_s3('tx_stage2', 'bucket2', '', 'key', 'secret')
        # Both stages created atomically

    await client.disconnect()

asyncio.run(main())

Performance Benefits:

  • Non-blocking I/O: Don’t block the event loop during stage operations

  • Concurrent management: Create and configure multiple stages simultaneously

  • Async web frameworks: Well suited to FastAPI, aiohttp, Sanic, and similar frameworks

  • High concurrency: Handle many stage operations concurrently

Use Cases:

  • Async ETL configuration: Non-blocking stage setup for data pipelines

  • Dynamic stage management: Create/modify stages in response to events

  • Multi-cloud setup: Configure stages for multiple cloud providers concurrently

  • High-throughput applications: Stage management without blocking

  • Real-time data ingestion: Configure stages on-the-fly

See also

  • AsyncClient: For async database operations

  • AsyncLoadDataManager: For async data loading from stages

  • StageManager: For synchronous stage operations

__init__(client, executor=None)

Initialize AsyncStageManager.

Parameters:
  • client – Async client object

  • executor – Optional executor (e.g., async session) for executing SQL. If None, uses client.execute

async create(name: str, url: str, credentials: Dict[str, str] | None = None, comment: str | None = None, if_not_exists: bool = False) -> Stage

Create a stage asynchronously.

async drop(name: str, if_exists: bool = False) -> None

Drop a stage asynchronously.

async alter(name: str, url: str | None = None, credentials: Dict[str, str] | None = None, comment: str | None = None, enable: bool | None = None, if_exists: bool = False) -> Stage

Alter a stage asynchronously.

async list() -> List[Stage]

List all stages asynchronously.

async get(name: str) -> Stage

Get a specific stage by name asynchronously.

async exists(name: str) -> bool

Check if a stage exists asynchronously.

async show(like_pattern: str | None = None) -> List[Stage]

Show stages with optional pattern matching (async).

async create_local(name: str, path: str, comment: str | None = None, if_not_exists: bool = False) -> Stage

Create a local file system stage (async).

async create_s3(name: str, bucket: str, path: str = '', aws_key_id: str | None = None, aws_secret_key: str | None = None, aws_region: str | None = None, endpoint: str | None = None, comment: str | None = None, if_not_exists: bool = False) -> Stage

Create an S3 stage asynchronously.

Parameters:
  • name – Stage name

  • bucket – S3 bucket name

  • path – Path within bucket (default: '')

  • aws_key_id – AWS access key ID

  • aws_secret_key – AWS secret key

  • aws_region – AWS region

  • endpoint – S3 endpoint URL for S3-compatible services like MinIO

  • comment – Optional comment

  • if_not_exists – If True, don’t error if the stage already exists

Returns:

Created Stage object

Stage

class matrixone.stage.Stage(name: str, url: str, credentials: Dict[str, str] | None = None, status: str | None = None, comment: str | None = None, created_time: datetime | None = None, modified_time: datetime | None = None, _client: Any | None = None)

Bases: object

Represents a MatrixOne external stage.

A stage is a named external storage location (filesystem, S3, cloud storage) that can be used to load data from or export data to.

name

Stage name

Type:

str

url

Storage location URL (file://, s3://, etc.)

Type:

str

credentials

Authentication credentials (for S3, cloud storage)

Type:

Dict[str, str] | None

status

Stage status (enabled/disabled)

Type:

str | None

comment

Optional description

Type:

str | None

created_time

When the stage was created

Type:

datetime.datetime | None

modified_time

When the stage was last modified

Type:

datetime.datetime | None

name: str
url: str
credentials: Dict[str, str] | None = None
status: str | None = None
comment: str | None = None
created_time: datetime | None = None
modified_time: datetime | None = None

load_csv(filepath: str, table: Any, **kwargs) -> Any

Load CSV file from this stage (pandas-style).

Parameters:
  • filepath – File path relative to stage location

  • table – SQLAlchemy table model or table name

  • **kwargs – Additional load options (sep, skiprows, names, etc.)

Returns:

ResultSet with load operation results

Examples

>>> stage = client.stage.get('my_stage')
>>> result = stage.load_csv('users.csv', User)
>>> print(f"Loaded {result.affected_rows} rows")
>>> # With pandas-style options
>>> result = stage.load_csv('data.csv', User, sep='|', skiprows=1)
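`load_csv()` accepts pandas-style keyword arguments. A plausible translation of `sep` and `skiprows` into `LOAD DATA` clauses is sketched below; the `LOAD DATA INFILE 'stage://<stage>/<file>'` form, the clause names, and the `build_load_csv` and `table_name` helpers are assumptions for illustration, not the SDK's code. Only ORM models exposing `__tablename__` or plain table names are handled, and `sep` is not escaped.

```python
from typing import Any


def table_name(table: Any) -> str:
    # Accept an ORM model (anything with __tablename__) or a plain table name.
    return getattr(table, "__tablename__", table)


def build_load_csv(stage: str, filepath: str, table: Any,
                   sep: str = ",", skiprows: int = 0) -> str:
    """Translate pandas-style sep/skiprows into LOAD DATA clauses (assumed syntax)."""
    sql = (f"LOAD DATA INFILE 'stage://{stage}/{filepath}' "
           f"INTO TABLE {table_name(table)} FIELDS TERMINATED BY '{sep}'")
    if skiprows:
        sql += f" IGNORE {skiprows} LINES"  # e.g. skip a header row
    return sql


class User:  # stand-in for an ORM model
    __tablename__ = "users"


print(build_load_csv("my_stage", "data.csv", User, sep="|", skiprows=1))
```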

load_tsv(filepath: str, table: Any, **kwargs) -> Any

Load TSV file from this stage (pandas-style).

Parameters:
  • filepath – File path relative to stage location

  • table – SQLAlchemy table model or table name

  • **kwargs – Additional load options

Returns:

ResultSet with load operation results

load_json(filepath: str, table: Any, **kwargs) -> Any

Load JSON Lines file from this stage (pandas-style).

Parameters:
  • filepath – File path relative to stage location

  • table – SQLAlchemy table model or table name

  • **kwargs – Additional load options (orient, compression, etc.)

Returns:

ResultSet with load operation results

load_parquet(filepath: str, table: Any, **kwargs) -> Any

Load Parquet file from this stage (pandas-style).

Parameters:
  • filepath – File path relative to stage location

  • table – SQLAlchemy table model or table name

  • **kwargs – Additional load options

Returns:

ResultSet with load operation results

load_files(file_mappings: Dict[str, Any], **kwargs) -> Dict[str, Any]

Load multiple files from this stage in batch.

Parameters:
  • file_mappings – Dictionary mapping file paths to table models

  • **kwargs – Additional parameters for load operations

Returns:

Dictionary mapping file paths to ResultSet objects

Examples

>>> stage = client.stage.get('my_stage')
>>> results = stage.load_files({
...     'users.csv': User,
...     'events.jsonl': Event,
...     'orders.parq': Order
... })
>>> print(f"Loaded {results['users.csv'].affected_rows} users")
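`load_files()` takes a mapping of file paths to tables. One way to route each file to the matching `load_*` method is by extension, as in this sketch; the extension table (including `.parq` from the example above) and the `EchoStage` stand-in, whose loaders just report which method was called, are hypothetical. The SDK's actual format detection is not documented here.

```python
import os
from typing import Any, Callable, Dict

# Hypothetical extension -> loader-method mapping.
LOADERS: Dict[str, str] = {
    ".csv": "load_csv",
    ".tsv": "load_tsv",
    ".jsonl": "load_json",
    ".parquet": "load_parquet",
    ".parq": "load_parquet",
}


def load_files(stage: Any, file_mappings: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    """Dispatch each file to a load_* method chosen by its extension."""
    results: Dict[str, Any] = {}
    for filepath, table in file_mappings.items():
        ext = os.path.splitext(filepath)[1].lower()
        method = LOADERS.get(ext)
        if method is None:
            raise ValueError(f"cannot infer format for {filepath!r}")
        results[filepath] = getattr(stage, method)(filepath, table, **kwargs)
    return results


class EchoStage:
    """Stand-in whose load_* methods return (method, filepath, table)."""

    def __getattr__(self, method: str) -> Callable[..., Any]:
        return lambda filepath, table, **kw: (method, filepath, table)


print(load_files(EchoStage(), {"users.csv": "users", "orders.parq": "orders"}))
```

Failing fast on an unknown extension avoids loading part of a batch before discovering a file that cannot be handled.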

export_to_csv(filename: str, query: Any, **kwargs) -> Any

Export query results to CSV file in this stage (pandas-style).

Parameters:
  • filename – Filename to create in the stage

  • query – SELECT query (str, SQLAlchemy select(), or MatrixOneQuery)

  • **kwargs – Additional export options (sep, quotechar, lineterminator, header)

Returns:

ResultSet with export operation results

Examples

>>> stage = client.stage.get('s3_backup_stage')
>>> stage.export_to_csv('orders.csv', "SELECT * FROM orders")
>>> # With pandas-style options
>>> stage.export_to_csv('data.csv', "SELECT * FROM orders", sep='|', header=True)
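For `export_to_csv()`, the pandas-style options map naturally onto MySQL-style `SELECT ... INTO OUTFILE` clauses. The sketch below assumes that MatrixOne accepts a `stage://<stage>/<file>` target in `INTO OUTFILE` and uses the hypothetical helpers `sql_literal` and `build_export_csv`; it leaves out `header`, whose SQL spelling is not covered here.

```python
def sql_literal(value: str) -> str:
    """Single-quote a value, escaping backslashes, quotes, newlines, and tabs."""
    escaped = (value.replace("\\", "\\\\").replace("'", "''")
               .replace("\n", "\\n").replace("\t", "\\t"))
    return "'" + escaped + "'"


def build_export_csv(stage: str, filename: str, query: str, sep: str = ",",
                     quotechar: str = '"', lineterminator: str = "\n") -> str:
    """Append an INTO OUTFILE clause targeting stage://<stage>/<filename>."""
    select = query.rstrip().rstrip(";")  # drop a trailing ';' so the clause can be appended
    target = sql_literal(f"stage://{stage}/{filename}")
    return (f"{select} INTO OUTFILE {target}"
            f" FIELDS TERMINATED BY {sql_literal(sep)}"
            f" ENCLOSED BY {sql_literal(quotechar)}"
            f" LINES TERMINATED BY {sql_literal(lineterminator)}")


print(build_export_csv("s3_backup_stage", "orders.csv", "SELECT * FROM orders"))
```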

export_to_jsonl(filename: str, query: Any, **kwargs) -> Any

Export query results to JSONL file in this stage (pandas-style).

Parameters:
  • filename – Filename to create in the stage

  • query – SELECT query (str, SQLAlchemy select(), or MatrixOneQuery)

  • **kwargs – Additional export options

Returns:

ResultSet with export operation results

Examples

>>> stage = client.stage.get('s3_backup_stage')
>>> stage.export_to_jsonl('events.jsonl', "SELECT * FROM events")

__init__(name: str, url: str, credentials: Dict[str, str] | None = None, status: str | None = None, comment: str | None = None, created_time: datetime | None = None, modified_time: datetime | None = None, _client: Any | None = None) -> None