# Source code for matrixone.stage

# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Stage Management for MatrixOne

This module provides comprehensive stage management capabilities for MatrixOne,
including creating, modifying, and querying external storage locations (stages).
"""

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class Stage:
    """
    Represents a MatrixOne external stage.

    A stage is a named external storage location (filesystem, S3, cloud storage)
    that can be used to load data from or export data to.

    Attributes:
        name: Stage name
        url: Storage location URL (file://, s3://, etc.)
        credentials: Authentication credentials (for S3, cloud storage)
        status: Stage status (enabled/disabled)
        comment: Optional description
        created_time: When the stage was created
        modified_time: When the stage was last modified
    """

    name: str
    url: str
    credentials: Optional[Dict[str, str]] = None
    status: Optional[str] = None
    comment: Optional[str] = None
    created_time: Optional[datetime] = None
    modified_time: Optional[datetime] = None
    # Client used by the load_*/export_* helpers below; hidden from the dataclass repr.
    _client: Optional[Any] = field(default=None, repr=False)

    def __repr__(self):
        # Only name, url and status are shown; credentials are not part of the output.
        return f"Stage(name='{self.name}', url='{self.url}', status='{self.status}')"

    def _require_client(self, purpose: str) -> Any:
        """
        Return the associated client or raise if this stage is detached.

        Args:
            purpose: Completes the error message ("load data" / "export data").

        Raises:
            ValueError: If no client is associated with this stage.
        """
        if self._client is None:
            raise ValueError(f"Stage object must be associated with a client to {purpose}")
        return self._client

    def load_csv(self, filepath: str, table: Any, **kwargs) -> Any:
        """
        Load CSV file from this stage (pandas-style).

        Args:
            filepath: File path relative to stage location
            table: SQLAlchemy table model or table name
            **kwargs: Additional load options (sep, skiprows, names, etc.)

        Returns:
            ResultSet with load operation results

        Raises:
            ValueError: If the stage is not associated with a client.

        Examples:
            >>> stage = client.stage.get('my_stage')
            >>> result = stage.load_csv('users.csv', User)
            >>> print(f"Loaded {result.affected_rows} rows")
            >>> # With pandas-style options
            >>> result = stage.load_csv('data.csv', User, sep='|', skiprows=1)
        """
        client = self._require_client("load data")
        from .load_data import LoadDataManager

        return LoadDataManager(client).read_csv_stage(self.name, filepath, table, **kwargs)

    def load_tsv(self, filepath: str, table: Any, **kwargs) -> Any:
        """
        Load TSV file from this stage (pandas-style).

        The separator defaults to a tab character. As with pandas ``read_table``,
        an explicit ``sep`` keyword overrides it instead of raising a
        "multiple values for keyword argument" TypeError.

        Args:
            filepath: File path relative to stage location
            table: SQLAlchemy table model or table name
            **kwargs: Additional load options

        Returns:
            ResultSet with load operation results

        Raises:
            ValueError: If the stage is not associated with a client.
        """
        client = self._require_client("load data")
        from .load_data import LoadDataManager

        options = dict(kwargs)
        options.setdefault("sep", "\t")
        return LoadDataManager(client).read_csv_stage(self.name, filepath, table, **options)

    def load_json(self, filepath: str, table: Any, **kwargs) -> Any:
        """
        Load JSON Lines file from this stage (pandas-style).

        Args:
            filepath: File path relative to stage location
            table: SQLAlchemy table model or table name
            **kwargs: Additional load options (orient, compression, etc.)

        Returns:
            ResultSet with load operation results

        Raises:
            ValueError: If the stage is not associated with a client.
        """
        client = self._require_client("load data")
        from .load_data import LoadDataManager

        return LoadDataManager(client).read_json_stage(self.name, filepath, table, **kwargs)

    def load_parquet(self, filepath: str, table: Any, **kwargs) -> Any:
        """
        Load Parquet file from this stage (pandas-style).

        Args:
            filepath: File path relative to stage location
            table: SQLAlchemy table model or table name
            **kwargs: Additional load options

        Returns:
            ResultSet with load operation results

        Raises:
            ValueError: If the stage is not associated with a client.
        """
        client = self._require_client("load data")
        from .load_data import LoadDataManager

        return LoadDataManager(client).read_parquet_stage(self.name, filepath, table, **kwargs)

    def load_files(self, file_mappings: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """
        Load multiple files from this stage in batch.

        The loader is chosen by file extension, matched case-insensitively:
        .csv, .tsv, .jsonl/.json, .parquet/.parq. Any other extension goes to
        the generic ``LoadDataManager.from_stage``.

        This is best-effort. A file whose load raises does not abort the batch.
        Its entry instead holds the string ``"Error: <message>"``.

        Args:
            file_mappings: Dictionary mapping file paths to table models
            **kwargs: Additional parameters passed to every load operation

        Returns:
            Dictionary mapping file paths to ResultSet objects (or error strings)

        Raises:
            ValueError: If the stage is not associated with a client.

        Examples:
            >>> stage = client.stage.get('my_stage')
            >>> results = stage.load_files({
            ...     'users.csv': User,
            ...     'events.jsonl': Event,
            ...     'orders.parq': Order
            ... })
            >>> print(f"Loaded {results['users.csv'].affected_rows} users")
        """
        client = self._require_client("load data")

        # Extension -> loader dispatch; order matters only for overlapping suffixes.
        loaders = (
            ((".csv",), self.load_csv),
            ((".tsv",), self.load_tsv),
            ((".jsonl", ".json"), self.load_json),
            ((".parquet", ".parq"), self.load_parquet),
        )

        results: Dict[str, Any] = {}
        for filepath, table_model in file_mappings.items():
            lowered = filepath.lower()
            try:
                loader = next((fn for suffixes, fn in loaders if lowered.endswith(suffixes)), None)
                if loader is not None:
                    results[filepath] = loader(filepath, table_model, **kwargs)
                else:
                    # Fallback to generic file loading
                    from .load_data import LoadDataManager

                    results[filepath] = LoadDataManager(client).from_stage(
                        self.name, filepath, table_model, **kwargs
                    )
            except Exception as e:
                # Deliberate best-effort: record the failure and continue the batch.
                results[filepath] = f"Error: {e}"
        return results

    def export_to_csv(self, filename: str, query: Any, **kwargs) -> Any:
        """
        Export query results to CSV file in this stage (pandas-style).

        Args:
            filename: Filename to create in the stage
            query: SELECT query (str, SQLAlchemy select(), or MatrixOneQuery)
            **kwargs: Additional export options (sep, quotechar, lineterminator, header)

        Returns:
            ResultSet with export operation results

        Raises:
            ValueError: If the stage is not associated with a client.

        Examples:
            >>> stage = client.stage.get('s3_backup_stage')
            >>> stage.export_to_csv('orders.csv', "SELECT * FROM orders")
            >>> # With pandas-style options
            >>> stage.export_to_csv('data.csv', query, sep='|', header=True)
        """
        client = self._require_client("export data")
        from .export import ExportManager

        return ExportManager(client).to_csv_stage(self.name, filename, query, **kwargs)

    def export_to_jsonl(self, filename: str, query: Any, **kwargs) -> Any:
        """
        Export query results to JSONL file in this stage (pandas-style).

        Args:
            filename: Filename to create in the stage
            query: SELECT query (str, SQLAlchemy select(), or MatrixOneQuery)
            **kwargs: Additional export options

        Returns:
            ResultSet with export operation results

        Raises:
            ValueError: If the stage is not associated with a client.

        Examples:
            >>> stage = client.stage.get('s3_backup_stage')
            >>> stage.export_to_jsonl('events.jsonl', "SELECT * FROM events")
        """
        client = self._require_client("export data")
        from .export import ExportManager

        return ExportManager(client).to_jsonl_stage(self.name, filename, query, **kwargs)
class BaseStageManager:
    """
    Base class for Stage management containing shared SQL building logic.

    This class contains the SQL generation methods and result-processing logic
    shared by the synchronous and asynchronous implementations. Subclasses
    provide ``_get_executor()`` (and may implement ``_execute()``).
    """

    def __init__(self, client):
        """Initialize base stage manager with the client attached to returned Stage objects."""
        self.client = client

    def _execute(self, sql: str):
        """
        Execute SQL statement.

        Subclasses must implement this (sync or async).
        """
        raise NotImplementedError("Subclasses must implement _execute()")

    def _get_executor(self):
        """Get the executor for SQL execution (session or client)"""
        raise NotImplementedError("Subclasses must implement _get_executor()")

    @staticmethod
    def _escape_literal(value: Any) -> str:
        """
        Escape a value for embedding inside a single-quoted SQL string literal.

        Backslashes are doubled first, then single quotes are doubled. This
        assumes MySQL-compatible string literal rules, where a backslash starts
        an escape sequence. Values without quotes or backslashes are returned
        unchanged.
        """
        return str(value).replace("\\", "\\\\").replace("'", "''")

    def _format_credentials(self, credentials: Dict[str, str]) -> str:
        """Render a credentials dict as CREDENTIALS={'k'='v', ...} with escaped keys/values."""
        pairs = ", ".join(
            f"'{self._escape_literal(k)}'='{self._escape_literal(v)}'" for k, v in credentials.items()
        )
        return f"CREDENTIALS={{{pairs}}}"

    def _build_create_stage_sql(
        self,
        stage_name: str,
        url: str,
        credentials: Optional[Dict[str, str]] = None,
        enable: bool = True,
        comment: Optional[str] = None,
        if_not_exists: bool = False,
    ) -> str:
        """Build CREATE STAGE SQL statement"""
        sql_parts = ["CREATE STAGE"]
        if if_not_exists:
            sql_parts.append("IF NOT EXISTS")
        # Stage name is an identifier and is emitted as-is to match existing behavior.
        sql_parts.append(stage_name)
        sql_parts.append(f"URL='{self._escape_literal(url)}'")
        if credentials:
            sql_parts.append(self._format_credentials(credentials))
        if comment:
            sql_parts.append(f"COMMENT='{self._escape_literal(comment)}'")
        # Note: enable is not included in SQL - MatrixOne stages are enabled by default
        return " ".join(sql_parts)

    def _build_drop_stage_sql(self, stage_name: str, if_exists: bool = False) -> str:
        """Build DROP STAGE SQL statement"""
        sql_parts = ["DROP STAGE"]
        if if_exists:
            sql_parts.append("IF EXISTS")
        sql_parts.append(stage_name)  # No escaping to match existing behavior
        return " ".join(sql_parts)

    def _build_alter_stage_sql(
        self,
        stage_name: str,
        set_url: Optional[str] = None,
        set_credentials: Optional[Dict[str, str]] = None,
        set_enable: Optional[bool] = None,
        set_comment: Optional[str] = None,
        if_exists: bool = False,
    ) -> str:
        """
        Build ALTER STAGE SQL statement.

        Raises:
            ValueError: If no SET clause would be produced.
        """
        sql_parts = ["ALTER STAGE"]
        if if_exists:
            sql_parts.append("IF EXISTS")
        sql_parts.append(f"{stage_name} SET")

        set_clauses = []
        if set_url is not None:
            set_clauses.append(f"URL='{self._escape_literal(set_url)}'")
        if set_credentials is not None:
            set_clauses.append(self._format_credentials(set_credentials))
        if set_comment is not None:
            set_clauses.append(f"COMMENT='{self._escape_literal(set_comment)}'")
        if set_enable is not None:
            set_clauses.append(f"ENABLE={'TRUE' if set_enable else 'FALSE'}")

        if not set_clauses:
            raise ValueError("At least one SET clause must be provided for ALTER STAGE")

        # Use space separator for ALTER STAGE SET clauses (MatrixOne syntax)
        sql_parts.append(" ".join(set_clauses))
        return " ".join(sql_parts)

    def _build_list_stages_sql(self) -> str:
        """Build SQL to list stages"""
        return "SELECT stage_name, url, stage_status, comment, created_time FROM mo_catalog.mo_stages"

    def _build_get_stage_sql(self, stage_name: str) -> str:
        """Build SQL to get a specific stage (the name is escaped as a string literal)."""
        return (
            f"SELECT stage_name, url, stage_status, comment, created_time "
            f"FROM mo_catalog.mo_stages WHERE stage_name = '{self._escape_literal(stage_name)}'"
        )

    def _row_to_stage(self, row: tuple) -> "Stage":
        """Convert database row to Stage object (without client or timestamps)."""
        return Stage(
            name=row[0],
            url=row[1],
            status=row[2] if len(row) > 2 else None,
            comment=row[3] if len(row) > 3 else None,
        )

    def _stage_from_row(self, row) -> "Stage":
        """
        Build a client-bound Stage from a (name, url, status, comment, created_time) row.

        Missing trailing columns become None.
        """

        def col(index):
            return row[index] if len(row) > index else None

        return Stage(
            name=row[0],
            url=col(1),
            status=col(2),
            comment=col(3),
            created_time=col(4),
            _client=self.client,
        )

    def _prepare_create(
        self,
        name: str,
        url: str,
        credentials: Optional[Dict[str, str]] = None,
        comment: Optional[str] = None,
        if_not_exists: bool = False,
    ) -> tuple:
        """
        Prepare data for create operation.

        Returns:
            (sql, stage_object)
        """
        sql = self._build_create_stage_sql(
            name, url, credentials, enable=True, comment=comment, if_not_exists=if_not_exists
        )
        stage = Stage(
            name=name,
            url=url,
            credentials=credentials,
            status='enabled',
            comment=comment,
            created_time=datetime.now(),
            _client=self.client,
        )
        return sql, stage

    def _prepare_drop(self, name: str, if_exists: bool = False) -> str:
        """Prepare SQL for drop operation"""
        return self._build_drop_stage_sql(name, if_exists)

    def _prepare_alter(
        self,
        name: str,
        url: Optional[str] = None,
        credentials: Optional[Dict[str, str]] = None,
        comment: Optional[str] = None,
        enable: Optional[bool] = None,
        if_exists: bool = False,
    ) -> str:
        """
        Prepare SQL for alter operation.

        A value counts as "provided" when it is not None, consistent with
        _build_alter_stage_sql. This means comment='' clears the comment
        instead of being rejected.

        Raises:
            ValueError: If url, credentials, comment and enable are all None.
        """
        if url is None and credentials is None and comment is None and enable is None:
            raise ValueError("At least one of url, credentials, comment, or enable must be provided")
        return self._build_alter_stage_sql(name, url, credentials, enable, comment, if_exists)

    def _prepare_list(self) -> str:
        """Prepare SQL for list operation"""
        return self._build_list_stages_sql()

    def _prepare_get(self, name: str) -> str:
        """Prepare SQL for get operation"""
        return self._build_get_stage_sql(name)

    def _process_get_result(self, rows, name: str) -> "Stage":
        """
        Process get result rows into Stage object.

        Raises:
            ValueError: If no rows were returned.
        """
        if not rows:
            raise ValueError(f"Stage '{name}' not found")
        return self._stage_from_row(rows[0])

    def _process_list_result(self, rows) -> List["Stage"]:
        """Process list result rows into Stage objects (empty/None rows -> [])."""
        if not rows:
            return []
        return [self._stage_from_row(row) for row in rows]
class StageManager(BaseStageManager):
    """
    Synchronous stage manager for MatrixOne external storage operations.

    A Stage is a named external storage location (like S3, filesystem, or cloud
    storage) that stores data files. Stages provide centralized and reusable
    access to external data sources for data loading and export operations.

    Key Features:
        - **External storage integration**: Connect to S3, filesystem, and cloud storage
        - **Credential management**: Store access credentials on the stage
        - **Reusable connections**: Create once, use multiple times
        - **Centralized configuration**: Single point of configuration for data sources
        - **Stage hierarchy**: Support for sub-stages and parent-child relationships
        - **Session-aware**: Operations can run through a session executor
        - **Enabled/disabled control**: Enable or disable stages as needed

    Key Concepts
    ============

    What is a Stage?
    ----------------
    A stage is a pointer to an external storage location where:

        - Data files are stored (CSV, JSONLINE, Parquet, etc.)
        - Files can be loaded into tables
        - Query results can be exported
        - Access credentials are kept with the stage definition

    Stage Types
    -----------
    1. **File System Stage**: Local or network filesystem::

           CREATE STAGE fs_stage URL='file:///data/files/'

    2. **S3 Stage**: Amazon S3 or S3-compatible storage (MinIO, etc.)::

           CREATE STAGE s3_stage URL='s3://bucket/path/'
               CREDENTIALS={'AWS_KEY_ID'='xxx', 'AWS_SECRET_KEY'='xxx'}

    3. **Sub-Stage**: Stage based on another stage::

           CREATE STAGE sub_stage URL='stage://parent_stage/subdir/'

    Why Use Stages?
    ---------------
        - **Centralized**: One location for all data files
        - **Reusable**: Create once, use for multiple tables
        - **Secure**: Credentials are defined once on the stage instead of being
          repeated in every load statement
        - **Efficient**: Direct loading from cloud storage without intermediate copies
        - **Organized**: Logical grouping of related data files
        - **Portable**: Easy to move between environments (dev, staging, prod)

    Executor Pattern:
        - If executor is None, uses self.client.execute (default client-level executor)
        - If executor is provided (e.g., session), uses executor.execute
        - Operations can therefore run inside a session's transaction context

    Typical Workflow:
    -----------------
    1. Create a stage pointing to your data location::

           stage_mgr.create('my_stage', 'file:///data/')

    2. Upload files to that location (outside MatrixOne)
    3. Load data from stage::

           client.load_data.from_stage_csv('my_stage', 'users.csv', User)

    4. Reuse the same stage for other files::

           client.load_data.from_stage_csv('my_stage', 'orders.csv', Order)

    Usage Examples::

        from matrixone import Client

        client = Client(host='localhost', port=6001, user='root', password='111', database='test')

        # ========================================
        # Create Stages Using Simple Interfaces
        # ========================================

        # Create local filesystem stage (simple interface)
        client.stage.create_local('local_data', '/data/imports/')

        # Create S3 stage with simple interface
        client.stage.create_s3(
            name='production_s3',
            bucket='my-bucket',
            path='data/',
            aws_key_id='AKIAIOSFODNN7EXAMPLE',
            aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
            aws_region='us-east-1',
            comment='Production data bucket'
        )

        # Create S3-compatible MinIO stage
        client.stage.create_s3(
            name='minio_stage',
            bucket='my-bucket',
            endpoint='http://localhost:9000',
            aws_key_id='minioadmin',
            aws_secret_key='minioadmin'
        )

        # Create sub-stage for organized data (using generic create)
        client.stage.create(
            'sales_data',
            'stage://production_s3/sales/',
            comment='Sales department data files'
        )

        # ========================================
        # Stage Management Operations
        # ========================================

        # List all stages
        stages = client.stage.list()
        for stage in stages:
            print(f"{stage.name}: {stage.url} (status: {stage.status})")

        # Get specific stage details
        stage = client.stage.get('production_s3')
        print(f"Stage URL: {stage.url}")
        print(f"Created: {stage.created_time}")

        # Alter stage (update URL or credentials)
        client.stage.alter(
            'production_s3',
            url='s3://new-bucket/data/',
            credentials={'AWS_KEY_ID': 'new_key_id'}
        )

        # Disable stage temporarily
        client.stage.alter('production_s3', enable=False)

        # Re-enable stage
        client.stage.alter('production_s3', enable=True)

        # Drop stage when no longer needed
        client.stage.drop('old_stage')

        # ========================================
        # Load Data from Stages
        # ========================================

        # Load from stage using ORM model
        client.load_data.from_stage_csv('production_s3', 'users.csv', User)

        # Load from local stage
        client.load_data.from_stage_csv('local_data', 'orders.csv', Order)

        # ========================================
        # Stage Operations in a Session
        # ========================================

        # Operations run on the session's executor
        # (DDL atomicity depends on server semantics)
        with client.session() as session:
            session.stage.create_local('stage1', '/data1/')
            session.stage.create_s3('stage2', 'bucket2', 'path/', 'key', 'secret')
            session.stage.create_local('stage3', '/data3/')

    Common Use Cases:
        - **Cloud data lakes**: Access data stored in S3 or cloud storage
        - **ETL pipelines**: Centralized configuration for data sources
        - **Multi-environment**: Separate stages for dev, staging, production
        - **Data warehousing**: Organize source data by department or domain
        - **Credential handling**: Keep credentials on the stage instead of in every load statement

    See Also:
        - LoadDataManager: For loading data from stages
        - AsyncStageManager: For async stage operations
        - Client.load_data: For data loading operations
    """

    def __init__(self, client, executor=None):
        """
        Initialize StageManager.

        Args:
            client: Client object that provides execute() method
            executor: Optional executor (e.g., session) for executing SQL.
                If None, uses client.execute
        """
        super().__init__(client)
        self.executor = executor

    def _get_executor(self):
        """Get the executor for SQL execution (session or client)"""
        return self.executor if self.executor else self.client

    @staticmethod
    def _rows_of(result):
        """Extract rows from a ResultSet (client, has .rows) or CursorResult (session)."""
        if hasattr(result, 'rows'):
            return result.rows
        return result.fetchall() if result else []

    def create(
        self,
        name: str,
        url: str,
        credentials: Optional[Dict[str, str]] = None,
        comment: Optional[str] = None,
        if_not_exists: bool = False,
    ) -> Stage:
        """
        Create a new external stage.

        Args:
            name (str): Stage name
            url (str): Storage location URL. Supported protocols:

                - file:///path/to/data/ - Local or network filesystem
                - s3://bucket/path/ - Amazon S3
                - stage://parent_stage/subdir/ - Sub-stage

            credentials (dict, optional): Authentication credentials. For S3/cloud storage::

                {
                    'AWS_KEY_ID': 'your_key_id',
                    'AWS_SECRET_KEY': 'your_secret_key',
                    'AWS_REGION': 'us-east-1',
                    'PROVIDER': 'minio'
                }

            comment (str, optional): Stage description
            if_not_exists (bool): If True, don't error if stage already exists

        Returns:
            Stage: Created stage object (built locally; status 'enabled')

        Examples:
            >>> # File system stage
            >>> stage = client.stage.create('local_data', 'file:///var/data/imports/')
            >>> # S3 stage with credentials
            >>> stage = client.stage.create(
            ...     'production_data',
            ...     's3://my-bucket/data/',
            ...     credentials={
            ...         'AWS_KEY_ID': 'AKIAIOSFODNN7EXAMPLE',
            ...         'AWS_SECRET_KEY': 'wJalrXUtnFEMI/...',
            ...         'AWS_REGION': 'us-east-1'
            ...     },
            ...     comment='Production data stage'
            ... )
            >>> # Sub-stage (stage based on another stage)
            >>> stage = client.stage.create('archive_2024', 'stage://production_data/archive/2024/')
        """
        sql, stage = self._prepare_create(name, url, credentials, comment, if_not_exists)
        self._get_executor().execute(sql)
        return stage

    def alter(
        self,
        name: str,
        url: Optional[str] = None,
        credentials: Optional[Dict[str, str]] = None,
        comment: Optional[str] = None,
        enable: Optional[bool] = None,
        if_exists: bool = False,
    ) -> Optional[Stage]:
        """
        Modify an existing stage.

        You can update URL, credentials, comment, or enable/disable the stage.
        At least one of url, credentials, comment, or enable must be provided.

        Args:
            name (str): Stage name to modify
            url (str, optional): New storage location URL
            credentials (dict, optional): New authentication credentials
            comment (str, optional): New comment/description
            enable (bool, optional): Enable (True) or disable (False) the stage
            if_exists (bool): If True, don't error if stage doesn't exist

        Returns:
            Stage: Updated stage object, re-read from mo_catalog.mo_stages.
            None if if_exists=True and the stage does not exist.

        Raises:
            ValueError: If nothing to alter, or the stage is not found and
                if_exists is False.

        Examples:
            >>> client.stage.alter('my_stage', url='s3://new-bucket/data/')
            >>> client.stage.alter('s3_stage', credentials={'AWS_KEY_ID': 'new_key'})
            >>> client.stage.alter('my_stage', comment='Updated description')
            >>> client.stage.alter('my_stage', enable=False)
            >>> client.stage.alter(
            ...     'my_stage',
            ...     url='s3://new-bucket/',
            ...     comment='Migrated to new bucket',
            ...     enable=True
            ... )
        """
        sql = self._prepare_alter(name, url, credentials, comment, enable, if_exists)
        self._get_executor().execute(sql)
        try:
            return self.get(name)
        except ValueError:
            # With IF EXISTS the ALTER is a no-op for a missing stage; honour that here too.
            if if_exists:
                return None
            raise

    def drop(self, name: str, if_exists: bool = False) -> None:
        """
        Delete an external stage.

        Args:
            name (str): Stage name to delete
            if_exists (bool): If True, don't error if stage doesn't exist

        Examples:
            >>> client.stage.drop('my_stage')
            >>> client.stage.drop('my_stage', if_exists=True)
        """
        sql = self._prepare_drop(name, if_exists)
        self._get_executor().execute(sql)

    def show(self, like_pattern: Optional[str] = None) -> List[Stage]:
        """
        Show stages with optional pattern matching.

        Works with both client (ResultSet) and session (CursorResult) executors.

        Args:
            like_pattern (str, optional): SQL LIKE pattern for filtering stages.
                Example: 'my_stage%', '%prod%'. Quotes and backslashes are escaped
                so the pattern reaches LIKE unchanged.

        Returns:
            List[Stage]: List of matching stages

        Examples:
            >>> stages = client.stage.show()
            >>> stages = client.stage.show(like_pattern='prod%')
            >>> for stage in stages:
            ...     print(f"{stage.name}: {stage.url}")
        """
        sql = "SHOW STAGES"
        if like_pattern:
            escaped = like_pattern.replace("\\", "\\\\").replace("'", "''")
            sql += f" LIKE '{escaped}'"
        result = self._get_executor().execute(sql)
        # Assumes SHOW STAGES columns begin with name, url, status, comment, created_time
        # (same positions as the mo_stages query) - TODO confirm against server output.
        return self._process_list_result(self._rows_of(result))

    def list(self) -> List[Stage]:
        """
        List all stages by querying mo_catalog.mo_stages system table.

        Returns:
            List[Stage]: List of all stages

        Examples:
            >>> stages = client.stage.list()
            >>> for stage in stages:
            ...     print(f"{stage.name}: {stage.url}")
        """
        result = self._get_executor().execute(self._prepare_list())
        return self._process_list_result(self._rows_of(result))

    def get(self, name: str) -> Stage:
        """
        Get details of a specific stage.

        Args:
            name (str): Stage name

        Returns:
            Stage: Stage object with details

        Raises:
            ValueError: If stage not found

        Examples:
            >>> stage = client.stage.get('my_stage')
            >>> print(f"URL: {stage.url}")
            >>> print(f"Status: {stage.status}")
        """
        result = self._get_executor().execute(self._prepare_get(name))
        return self._process_get_result(self._rows_of(result), name)

    def exists(self, name: str) -> bool:
        """
        Check if a stage exists.

        Args:
            name (str): Stage name

        Returns:
            bool: True if stage exists, False otherwise

        Examples:
            >>> if client.stage.exists('my_stage'):
            ...     print("Stage exists")
        """
        try:
            self.get(name)
        except ValueError:
            return False
        return True

    def create_local(self, name: str, path: str, comment: Optional[str] = None, if_not_exists: bool = False) -> "Stage":
        """
        Create a local file system stage with simplified path handling.

        The path is ~-expanded, made absolute, given a trailing '/', and
        prefixed with 'file://'.

        Args:
            name: Stage name
            path: Local file system path (supports ~, relative, absolute paths)
            comment: Optional comment
            if_not_exists: Create only if not exists

        Returns:
            Created Stage object

        Examples:
            >>> stage = client.stage.create_local('data', './data/')
            >>> stage = client.stage.create_local('home', '~/data/')
            >>> stage = client.stage.create_local('data', '/var/data/')
        """
        import os

        # expanduser leaves paths without a leading '~' untouched.
        path = os.path.abspath(os.path.expanduser(path))
        if not path.endswith('/'):
            path += '/'
        return self.create(name, f'file://{path}', comment=comment, if_not_exists=if_not_exists)

    def create_s3(
        self,
        name: str,
        bucket: str,
        path: str = "",
        aws_key_id: Optional[str] = None,
        aws_secret_key: Optional[str] = None,
        aws_region: Optional[str] = None,
        endpoint: Optional[str] = None,
        comment: Optional[str] = None,
        if_not_exists: bool = False,
    ) -> "Stage":
        """
        Create an S3 stage with simplified AWS credentials.

        Missing key/secret/region values fall back to the environment variables
        AWS_ACCESS_KEY_ID / AWS_KEY_ID, AWS_SECRET_ACCESS_KEY / AWS_SECRET_KEY, and
        AWS_DEFAULT_REGION / AWS_REGION.

        The key pair is included only when both parts are available. If an
        ``endpoint`` is given, it (and any region) is always included, even
        without a key pair, so an S3-compatible target is never silently
        replaced by AWS.

        Args:
            name: Stage name
            bucket: S3 bucket name (e.g., 'my-bucket')
            path: Path within bucket (default: "")
            aws_key_id: AWS access key ID (optional, can use environment variables)
            aws_secret_key: AWS secret key (optional, can use environment variables)
            aws_region: AWS region (optional)
            endpoint: S3 endpoint URL for S3-compatible services like MinIO
            comment: Optional comment
            if_not_exists: Create only if not exists

        Returns:
            Created Stage object

        Examples:
            >>> stage = client.stage.create_s3(
            ...     'prod', bucket='my-bucket', path='data/',
            ...     aws_key_id='AKIA...', aws_secret_key='...'
            ... )
            >>> stage = client.stage.create_s3(
            ...     'minio', bucket='my-bucket', endpoint='http://localhost:9000',
            ...     aws_key_id='minioadmin', aws_secret_key='minioadmin'
            ... )
        """
        import os

        # Build S3 URL
        bucket_path = f"s3://{bucket}"
        if path:
            bucket_path += f"/{path.strip('/')}"

        # Use provided credentials or fall back to environment variables
        final_key_id = aws_key_id or os.getenv('AWS_ACCESS_KEY_ID') or os.getenv('AWS_KEY_ID')
        final_secret = aws_secret_key or os.getenv('AWS_SECRET_ACCESS_KEY') or os.getenv('AWS_SECRET_KEY')
        final_region = aws_region or os.getenv('AWS_DEFAULT_REGION') or os.getenv('AWS_REGION')

        credentials: Dict[str, str] = {}
        if final_key_id and final_secret:
            credentials['AWS_KEY_ID'] = final_key_id
            credentials['AWS_SECRET_KEY'] = final_secret
        # Region/endpoint accompany a key pair, or stand alone when an endpoint was requested.
        if credentials or endpoint:
            if final_region:
                credentials['AWS_REGION'] = final_region
            if endpoint:
                credentials['ENDPOINT'] = endpoint

        return self.create(
            name, bucket_path, credentials=credentials or None, comment=comment, if_not_exists=if_not_exists
        )
[docs] class AsyncStageManager(BaseStageManager): """ Asynchronous stage manager for MatrixOne external storage operations. Provides the same comprehensive stage management functionality as StageManager but with full async/await support for non-blocking I/O operations. Ideal for high-concurrency applications and async web frameworks requiring stage management. Key Features: - **Non-blocking operations**: All stage operations use async/await - **Async stage management**: Create, alter, drop stages asynchronously - **Concurrent operations**: Manage multiple stages concurrently - **Async external storage**: Non-blocking S3 and cloud storage configuration - **Transaction-aware**: Full integration with async transaction contexts - **Executor pattern**: Works with both async client and async session Executor Pattern: - If executor is None, uses self.client.execute (default async client-level executor) - If executor is provided (e.g., async session), uses executor.execute (async transaction-aware) - All operations are non-blocking and use async/await - Enables concurrent stage management operations Usage Examples:: from matrixone import AsyncClient from matrixone.orm import Column, Integer, String, Float, Base import asyncio # Define ORM models for data loading class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) async def main(): client = AsyncClient() await client.connect(host='localhost', port=6001, user='root', password='111', database='test') # ======================================== # Create Stages Using Simple Interfaces # ======================================== # Create local filesystem stage (simple interface - recommended) stage = await client.stage.create_local('local_data', '/data/imports/') print(f"Local stage created: {stage.name}") # Create S3 stage using simple interface (recommended) s3_stage = await client.stage.create_s3( name='production_s3', bucket='my-bucket', 
path='data/', aws_key_id='AKIAIOSFODNN7EXAMPLE', aws_secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', region='us-east-1', comment='Production data bucket' ) # Create MinIO stage (S3-compatible) minio_stage = await client.stage.create_s3( name='minio_stage', bucket='my-bucket', endpoint='http://localhost:9000', aws_key_id='minioadmin', aws_secret_key='minioadmin' ) # Create sub-stage for organized data await client.stage.create( 'sales_data', 'stage://production_s3/sales/', comment='Sales department data' ) # ======================================== # Stage Management Operations # ======================================== # List all stages asynchronously stages = await client.stage.list() for stage in stages: print(f"{stage.name}: {stage.url}") # Get specific stage details stage = await client.stage.get('production_s3') print(f"Stage URL: {stage.url}") # Alter stage (update credentials) await client.stage.alter( 'production_s3', credentials={'AWS_KEY_ID': 'new_key_id'} ) # Disable/enable stage await client.stage.alter('production_s3', enable=False) await client.stage.alter('production_s3', enable=True) # ======================================== # Concurrent Stage Creation # ======================================== # Create multiple stages concurrently using simple interfaces await asyncio.gather( client.stage.create_local('stage1', '/data1/'), client.stage.create_local('stage2', '/data2/'), client.stage.create_s3('stage3', 'bucket3', 'path/', 'key', 'secret') ) # Concurrent operations stages, stage_details = await asyncio.gather( client.stage.list(), client.stage.get('production_s3') ) # ======================================== # Load Data from Stages (ORM Style) # ======================================== # Load from S3 stage using ORM model await client.load_data.from_stage_csv('production_s3', 'users.csv', User) # Drop stage when no longer needed await client.stage.drop('old_stage') # ======================================== # Transactional Stage Operations # 
======================================== # Using within async transaction async with client.session() as session: # Create multiple stages atomically using simple interfaces await session.stage.create_local('stage1', '/data1/') await session.stage.create_s3('stage2', 'bucket2', '', 'key', 'secret') # Both stages created atomically await client.disconnect() asyncio.run(main()) Performance Benefits: - **Non-blocking I/O**: Don't block the event loop during stage operations - **Concurrent management**: Create and configure multiple stages simultaneously - **Async web frameworks**: Perfect for FastAPI, aiohttp, Sanic, etc. - **High concurrency**: Handle many stage operations concurrently Use Cases: - **Async ETL configuration**: Non-blocking stage setup for data pipelines - **Dynamic stage management**: Create/modify stages in response to events - **Multi-cloud setup**: Configure stages for multiple cloud providers concurrently - **High-throughput applications**: Stage management without blocking - **Real-time data ingestion**: Configure stages on-the-fly See Also: - AsyncClient: For async database operations - AsyncLoadDataManager: For async data loading from stages - StageManager: For synchronous stage operations """
[docs] def __init__(self, client, executor=None): """ Initialize AsyncStageManager. Args: client: Async client object executor: Optional executor (e.g., async session) for executing SQL. If None, uses client.execute """ super().__init__(client) self.executor = executor
def _get_executor(self):
    """
    Return the object that should run SQL statements.

    Returns:
        ``self.executor`` when one was supplied (e.g. an async session),
        otherwise ``self.client``.
    """
    # Compare against None explicitly: an executor object that happens to be
    # falsy (e.g. defines __len__ returning 0) must still be honoured.
    return self.executor if self.executor is not None else self.client
async def create(
    self,
    name: str,
    url: str,
    credentials: Optional[Dict[str, str]] = None,
    comment: Optional[str] = None,
    if_not_exists: bool = False,
) -> Stage:
    """
    Create a stage asynchronously.

    Args:
        name: Stage name.
        url: Storage location URL.
        credentials: Optional credential mapping.
        comment: Optional description.
        if_not_exists: Forwarded to the statement builder.

    Returns:
        The Stage object produced alongside the SQL by ``_prepare_create``.
    """
    statement, created_stage = self._prepare_create(name, url, credentials, comment, if_not_exists)
    runner = self._get_executor()
    await runner.execute(statement)
    return created_stage
async def drop(self, name: str, if_exists: bool = False) -> None:
    """
    Drop a stage asynchronously.

    Args:
        name: Stage name.
        if_exists: Forwarded to the statement builder.
    """
    statement = self._prepare_drop(name, if_exists)
    runner = self._get_executor()
    await runner.execute(statement)
async def alter(
    self,
    name: str,
    url: Optional[str] = None,
    credentials: Optional[Dict[str, str]] = None,
    comment: Optional[str] = None,
    enable: Optional[bool] = None,
    if_exists: bool = False,
) -> Stage:
    """
    Alter a stage asynchronously and return its refreshed definition.

    Args:
        name: Stage name.
        url: New storage URL, if changing.
        credentials: New credentials, if changing.
        comment: New comment, if changing.
        enable: Enable/disable flag, if changing.
        if_exists: Forwarded to the statement builder.

    Returns:
        The stage as re-read via ``get(name)`` after the ALTER runs.
    """
    statement = self._prepare_alter(name, url, credentials, comment, enable, if_exists)
    runner = self._get_executor()
    await runner.execute(statement)
    # NOTE(review): this re-fetch runs even when if_exists=True, so a missing
    # stage presumably surfaces as an error from get() — confirm intended.
    refreshed = await self.get(name)
    return refreshed
async def list(self) -> List[Stage]:
    """
    List all stages asynchronously.

    Returns:
        Stage objects built by ``_process_list_result`` from the result rows.
    """
    statement = self._prepare_list()
    outcome = await self._get_executor().execute(statement)
    # Client results expose ``.rows``; session (CursorResult) results need fetchall().
    if hasattr(outcome, 'rows'):
        fetched = outcome.rows
    elif outcome:
        fetched = outcome.fetchall()
    else:
        fetched = []
    return self._process_list_result(fetched)
async def get(self, name: str) -> Stage:
    """
    Fetch a single stage by name asynchronously.

    Args:
        name: Stage name.

    Returns:
        The Stage built by ``_process_get_result`` from the result rows.
    """
    statement = self._prepare_get(name)
    outcome = await self._get_executor().execute(statement)
    # Client results expose ``.rows``; session (CursorResult) results need fetchall().
    if hasattr(outcome, 'rows'):
        fetched = outcome.rows
    elif outcome:
        fetched = outcome.fetchall()
    else:
        fetched = []
    return self._process_get_result(fetched, name)
async def exists(self, name: str) -> bool:
    """
    Report whether a stage exists (async).

    Args:
        name: Stage name.

    Returns:
        False if ``get(name)`` raises ValueError, True otherwise.
    """
    try:
        await self.get(name)
    except ValueError:
        return False
    return True
async def show(self, like_pattern: Optional[str] = None) -> List[Stage]:
    """
    Show stages, optionally filtered by a SQL LIKE pattern (async).

    Args:
        like_pattern: Optional LIKE pattern (``%`` and ``_`` wildcards). Backslashes
            and single quotes are escaped before the pattern is embedded in the
            statement, so a quote in the pattern cannot terminate the literal.

    Returns:
        Stage objects built positionally from each row: name, url, status,
        comment, created_time. Missing trailing columns become None.
    """
    sql = "SHOW STAGES"
    if like_pattern:
        # Double backslashes first (so existing LIKE escapes such as "\_" keep
        # their meaning), then double single quotes to keep the literal closed.
        escaped = like_pattern.replace("\\", "\\\\").replace("'", "''")
        sql += f" LIKE '{escaped}'"
    result = await self._get_executor().execute(sql)

    # Handle both AsyncResultSet (client) and CursorResult (session)
    if hasattr(result, 'rows'):
        rows = result.rows
    else:
        rows = result.fetchall() if result else []

    stages = []
    for row in rows:
        stage = Stage(
            name=row[0],
            url=row[1] if len(row) > 1 else None,
            status=row[2] if len(row) > 2 else None,
            comment=row[3] if len(row) > 3 else None,
            created_time=row[4] if len(row) > 4 else None,
            _client=self.client,
        )
        stages.append(stage)
    return stages
async def create_local(
    self, name: str, path: str, comment: Optional[str] = None, if_not_exists: bool = False
) -> "Stage":
    """
    Create a stage backed by the local file system (async).

    Args:
        path: Directory path. A leading ``~`` is expanded, the path is made
            absolute, and a trailing ``/`` is appended if missing.
        name, comment, if_not_exists: Forwarded to ``create``.

    Returns:
        The Stage returned by ``create`` for the resulting ``file://`` URL.
    """
    import os

    if path.startswith('~'):
        path = os.path.expanduser(path)
    absolute = os.path.abspath(path)
    directory = absolute if absolute.endswith('/') else absolute + '/'
    return await self.create(name, f'file://{directory}', comment=comment, if_not_exists=if_not_exists)
async def create_s3(
    self,
    name: str,
    bucket: str,
    path: str = "",
    aws_key_id: Optional[str] = None,
    aws_secret_key: Optional[str] = None,
    aws_region: Optional[str] = None,
    endpoint: Optional[str] = None,
    comment: Optional[str] = None,
    if_not_exists: bool = False,
) -> Stage:
    """
    Create an S3 (or S3-compatible) stage asynchronously.

    Args:
        name: Stage name
        bucket: S3 bucket name
        path: Path within bucket (default: ""); surrounding slashes are stripped
        aws_key_id: AWS access key ID; falls back to $AWS_ACCESS_KEY_ID, then $AWS_KEY_ID
        aws_secret_key: AWS secret key; falls back to $AWS_SECRET_ACCESS_KEY, then $AWS_SECRET_KEY
        aws_region: AWS region; falls back to $AWS_DEFAULT_REGION, then $AWS_REGION
        endpoint: S3 endpoint URL for S3-compatible services like MinIO
        comment: Optional comment
        if_not_exists: Create only if not exists

    Returns:
        Created Stage object
    """
    import os

    def resolve(explicit, *env_names):
        # Mirrors ``explicit or getenv(a) or getenv(b)``: first truthy value, else the last one tried.
        value = explicit
        for env_name in env_names:
            if value:
                break
            value = os.getenv(env_name)
        return value

    location = f"s3://{bucket}"
    if path:
        location = f"{location}/{path.strip('/')}"

    key_id = resolve(aws_key_id, 'AWS_ACCESS_KEY_ID', 'AWS_KEY_ID')
    secret = resolve(aws_secret_key, 'AWS_SECRET_ACCESS_KEY', 'AWS_SECRET_KEY')
    region = resolve(aws_region, 'AWS_DEFAULT_REGION', 'AWS_REGION')

    credentials = None
    # NOTE: region and endpoint are only sent when both keys resolved; without
    # keys the stage is created with no credentials (endpoint is not passed).
    if key_id and secret:
        credentials = {'AWS_KEY_ID': key_id, 'AWS_SECRET_KEY': secret}
        extras = (('AWS_REGION', region), ('ENDPOINT', endpoint))
        credentials.update({key: value for key, value in extras if value})

    return await self.create(name, location, credentials=credentials, comment=comment, if_not_exists=if_not_exists)