Stage Management Guide

This guide covers external stage management in MatrixOne, including creating stages, loading data from stages, and using stages in transactions.

Overview

Stages provide centralized configuration for external data sources, making it easy to load data from:

  • S3 and S3-compatible storage (AWS S3, MinIO, etc.)

  • Local filesystem paths

  • Cloud storage services

Stages enable:

  • Centralized data source configuration - define once, use everywhere

  • Simplified data loading - load data with simple stage references

  • Transaction support - atomic stage operations with session()

  • Security - store credentials separately from queries

Stage Management Operations

List, Get, Alter, and Drop Stages

import os

from matrixone import Client

client = Client()
client.connect(database='test')

try:
    # List all stages
    stages = client.stage.list()
    for stage in stages:
        print(f"Stage: {stage.name}")
        print(f"  URL: {stage.url}")
        print(f"  Enabled: {stage.enabled}")
        print(f"  Created: {stage.created_time}")

    # Get specific stage details
    stage = client.stage.get('production_s3')
    print(f"Stage URL: {stage.url}")
    print(f"Comment: {stage.comment}")

    # Alter stage (update credentials or URL).
    # The new credentials come from environment variables rather than being
    # hard-coded in source, following the "Secure Credentials" best practice.
    client.stage.alter(
        'production_s3',
        credentials={
            'AWS_KEY_ID': os.environ['AWS_ACCESS_KEY_ID'],
            'AWS_SECRET_KEY': os.environ['AWS_SECRET_ACCESS_KEY'],
        },
    )

    # Disable stage temporarily
    client.stage.alter('production_s3', enable=False)

    # Re-enable stage
    client.stage.alter('production_s3', enable=True)

    # Drop stage when no longer needed
    client.stage.drop('old_stage')
finally:
    # Release the connection even if one of the calls above raises
    client.disconnect()

Async Stage Operations

AsyncClient provides full async/await support for non-blocking stage management:

Async Stage Creation and Loading

import asyncio
from matrixone import AsyncClient
from sqlalchemy import select

# NOTE: `User` is assumed to be an ORM model mapped to the target table and
# defined elsewhere; it is not part of this snippet.

async def async_stage_example():
    """Create an S3 stage, load a CSV file from it, then query the loaded rows."""
    client = AsyncClient()
    await client.connect(database='test')

    try:
        # Create S3 stage asynchronously
        await client.stage.create_s3(
            name='async_s3',
            bucket='my-bucket',
            path='data/',
            aws_key_id='key',
            aws_secret_key='secret'
        )

        # Load data asynchronously
        await client.load_data.read_csv_stage('async_s3', 'users.csv', table=User)

        # Query loaded data
        stmt = select(User).where(User.age > 25)
        result = await client.execute(stmt)
        users = result.scalars().all()
        print(f"Found {len(users)} users older than 25")
    finally:
        # Release the connection even if stage creation or loading fails
        await client.disconnect()

asyncio.run(async_stage_example())

Concurrent Async Operations

import asyncio
from matrixone import AsyncClient

# NOTE: `User`, `Order`, and `Product` are assumed to be ORM models defined
# elsewhere and mapped to the tables being loaded.

async def concurrent_stage_ops():
    """Create three stages concurrently, then load one CSV file from each."""
    client = AsyncClient()
    await client.connect(database='test')

    try:
        # Create multiple stages concurrently
        await asyncio.gather(
            client.stage.create_local('stage1', '/data1/'),
            client.stage.create_local('stage2', '/data2/'),
            client.stage.create_s3(
                name='stage3',
                bucket='bucket3',
                path='path/',
                aws_key_id='key',
                aws_secret_key='secret',
            ),
        )

        # Load from multiple stages concurrently. The first gather() is awaited
        # before this one starts, so every stage exists before any load begins.
        await asyncio.gather(
            client.load_data.read_csv_stage('stage1', 'users.csv', table=User),
            client.load_data.read_csv_stage('stage2', 'orders.csv', table=Order),
            client.load_data.read_csv_stage('stage3', 'products.csv', table=Product)
        )
    finally:
        # Release the connection even if any concurrent operation fails
        await client.disconnect()

asyncio.run(concurrent_stage_ops())

Async Transaction with Stages

import asyncio
from matrixone import AsyncClient
from sqlalchemy import insert, select

# NOTE: `User` is assumed to be an ORM model defined elsewhere.

async def async_transaction_example():
    """Create a stage, load from it, and insert/query rows inside one session."""
    client = AsyncClient()
    await client.connect(database='test')

    try:
        # Async transaction
        async with client.session() as session:
            # Create stage
            await session.stage.create_local('import_stage', '/data/')

            # Load data
            await session.load_data.read_csv_stage('import_stage', 'users.csv', table=User)

            # Insert additional data
            await session.execute(insert(User).values(name='Admin', email='admin@example.com'))

            # Query within transaction
            stmt = select(User)
            result = await session.execute(stmt)
            users = result.scalars().all()
            print(f"{len(users)} users visible inside the transaction")

            # All operations commit atomically
    finally:
        # Release the connection even if the transaction fails
        await client.disconnect()

asyncio.run(async_transaction_example())

Best Practices

  1. Use Simple Interfaces

    Prefer create_s3() and create_local() over generic create()

  2. Use ORM Models

    Use ORM models for type-safe data loading

  3. Use Sessions for Transactions

    Use session() to run several stage and data-loading operations as one atomic unit

  4. Secure Credentials

    Store credentials in environment variables or secrets management

  5. Test Stage Connectivity

    Test stage access before production use

  6. Monitor Load Operations

    Track load performance and error rates

Common Use Cases

ETL Pipelines

from sqlalchemy import func, insert, select

# Assumes `client` is an already-connected Client and that `RawData` and
# `ProcessedData` are ORM models defined elsewhere.
with client.session() as session:
    # Extract from S3
    session.stage.create_s3(
        name='source_stage',
        bucket='data-lake',
        path='raw/',
        aws_key_id='key',
        aws_secret_key='secret',
    )
    session.load_data.read_csv_stage('source_stage', 'data.csv', table=RawData)

    # Transform: copy each raw row into ProcessedData with `value` upper-cased
    session.execute(
        insert(ProcessedData).from_select(
            ['id', 'value'],
            select(RawData.id, func.upper(RawData.value))
        )
    )

    # Load complete - atomic commit

Backup and Restore

with client.session() as session:
    # Stage pointing at the S3 location that holds daily backups
    session.stage.create_s3(
        name='backup_stage',
        bucket='backups',
        path='daily/',
        aws_key_id='key',
        aws_secret_key='secret',
    )

    # Database-level snapshot of `prod`.
    # NOTE(review): this call does not reference `backup_stage`; the two
    # operations are independent. SnapshotLevel must be imported from the SDK.
    session.snapshots.create(
        name='daily_backup',
        level=SnapshotLevel.DATABASE,
        database='prod',
    )

    # Both run inside the same session and are committed together

Multi-Source Data Integration

with client.session() as session:
    # Register one S3 source and one local-filesystem source
    session.stage.create_s3(
        name='source_a',
        bucket='bucket-a',
        path='data/',
        aws_key_id='key',
        aws_secret_key='secret',
    )
    session.stage.create_local('source_b', '/local/data/')

    # Pull each file into its table within the same session
    session.load_data.read_csv_stage('source_a', 'users.csv', table=User)
    session.load_data.read_csv_stage('source_b', 'orders.csv', table=Order)

See Also