# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MatrixOne Clone Management
Unified clone management supporting sync/async and client/session executors.
"""
from typing import Optional
from .exceptions import CloneError, ConnectionError
from .version import requires_version
class BaseCloneManager:
"""
Base clone manager with shared logic for sync and async implementations.
This base class contains all the SQL building logic that is shared between
sync and async implementations. Subclasses only need to implement the
execution methods.
"""
def __init__(self, client, executor=None):
"""
Initialize base clone manager.
Args:
client: MatrixOne client instance
executor: Optional executor (e.g., session) for executing SQL.
If None, uses client as executor
"""
self.client = client
self.executor = executor
def _get_executor(self):
"""Get the executor for SQL execution (session or client)"""
return self.executor if self.executor else self.client
def _build_clone_database_sql(
self,
target_db: str,
source_db: str,
snapshot_name: Optional[str] = None,
if_not_exists: bool = False,
) -> str:
"""Build CLONE DATABASE SQL statement"""
if_not_exists_clause = "IF NOT EXISTS " if if_not_exists else ""
if snapshot_name:
return (
f"CREATE DATABASE {if_not_exists_clause}{target_db} " f"CLONE {source_db} {{snapshot = \"{snapshot_name}\"}}"
)
else:
return f"CREATE DATABASE {if_not_exists_clause}{target_db} CLONE {source_db}"
def _build_clone_table_sql(
self,
target_table: str,
source_table: str,
snapshot_name: Optional[str] = None,
if_not_exists: bool = False,
) -> str:
"""Build CLONE TABLE SQL statement"""
if_not_exists_clause = "IF NOT EXISTS " if if_not_exists else ""
if snapshot_name:
return (
f"CREATE TABLE {if_not_exists_clause}{target_table} "
f"CLONE {source_table} {{snapshot = \"{snapshot_name}\"}}"
)
else:
return f"CREATE TABLE {if_not_exists_clause}{target_table} CLONE {source_table}"
[docs]
class CloneManager(BaseCloneManager):
"""
Synchronous clone management for MatrixOne database operations.
This class provides comprehensive database and table cloning functionality, enabling
efficient data replication, testing environments, and data distribution scenarios.
Cloning operations can work with current data or point-in-time snapshots.
Key Features:
- **Database cloning**: Clone entire databases with all tables and data
- **Table-level cloning**: Clone specific tables for targeted data replication
- **Snapshot-based cloning**: Clone from specific point-in-time snapshots
- **Efficient native operations**: Uses MatrixOne's native CLONE functionality for performance
- **Transaction-aware**: Full integration with transaction contexts via executor pattern
- **Flexible naming**: Support for IF NOT EXISTS clause to prevent conflicts
- **Cross-database cloning**: Clone tables between different databases
Executor Pattern:
- If executor is None, uses self.client.execute (default client-level executor)
- If executor is provided (e.g., session), uses executor.execute (transaction-aware)
- This allows the same logic to work in both client and session contexts
- All operations can participate in transactions when used via session
Usage Examples::
from matrixone import Client
client = Client(host='localhost', port=6001, user='root', password='111', database='test')
# Clone entire database (current state)
client.clone.clone_database(
target_db='production_backup',
source_db='production'
)
# Clone database with IF NOT EXISTS
client.clone.clone_database(
target_db='dev_environment',
source_db='production',
if_not_exists=True
)
# Clone database from a specific snapshot
client.clone.clone_database(
target_db='test_environment',
source_db='production',
snapshot_name='daily_backup_2024_01_01'
)
# Clone specific table
client.clone.clone_table(
target_table='users_backup',
source_table='users'
)
# Clone table across databases
client.clone.clone_table(
target_table='dev_db.users',
source_table='prod_db.users'
)
# Clone table from snapshot with validation
client.clone.clone_table_with_snapshot(
target_table='users_202401',
source_table='users',
snapshot_name='monthly_backup',
if_not_exists=True
)
# Using within a transaction (all operations are atomic)
with client.session() as session:
# Clone multiple databases atomically
session.clone.clone_database('backup_db1', 'source_db1')
session.clone.clone_database('backup_db2', 'source_db2')
# Both clones succeed or fail together
Version Requirements:
Clone functionality requires MatrixOne version 1.0.0 or higher.
Earlier versions do not support native cloning operations.
See Also:
- SnapshotManager: For creating snapshots before cloning
- RestoreManager: For restoring databases from snapshots
- MetadataManager: For managing database metadata
"""
[docs]
@requires_version(
min_version="1.0.0",
feature_name="database_cloning",
description="Database cloning functionality",
alternative="Use CREATE DATABASE and data migration instead",
)
def clone_database(
self,
target_db: str,
source_db: str,
snapshot_name: Optional[str] = None,
if_not_exists: bool = False,
) -> None:
"""
Clone a database.
Args:
target_db: Target database name
source_db: Source database name
snapshot_name: Optional snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails
"""
if not self.client._engine:
raise ConnectionError("Not connected to database")
sql = self._build_clone_database_sql(target_db, source_db, snapshot_name, if_not_exists)
try:
self._get_executor().execute(sql)
except Exception as e:
raise CloneError(f"Failed to clone database: {e}") from e
[docs]
def clone_table(
self,
target_table: str,
source_table: str,
snapshot_name: Optional[str] = None,
if_not_exists: bool = False,
) -> None:
"""
Clone a table.
Args:
target_table: Target table name (can include database: db.table)
source_table: Source table name (can include database: db.table)
snapshot_name: Optional snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails
"""
if not self.client._engine:
raise ConnectionError("Not connected to database")
sql = self._build_clone_table_sql(target_table, source_table, snapshot_name, if_not_exists)
try:
self._get_executor().execute(sql)
except Exception as e:
raise CloneError(f"Failed to clone table: {e}") from e
[docs]
def clone_database_with_snapshot(
self,
target_db: str,
source_db: str,
snapshot_name: str,
if_not_exists: bool = False,
) -> None:
"""
Clone a database using a specific snapshot.
Args:
target_db: Target database name
source_db: Source database name
snapshot_name: Snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails or snapshot doesn't exist
"""
# Verify snapshot exists using snapshot manager
if not self.client.snapshots.exists(snapshot_name):
raise CloneError(f"Snapshot '{snapshot_name}' does not exist")
self.clone_database(target_db, source_db, snapshot_name, if_not_exists)
[docs]
def clone_table_with_snapshot(
self,
target_table: str,
source_table: str,
snapshot_name: str,
if_not_exists: bool = False,
) -> None:
"""
Clone a table using a specific snapshot.
Args:
target_table: Target table name (can include database: db.table)
source_table: Source table name (can include database: db.table)
snapshot_name: Snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails or snapshot doesn't exist
"""
# Verify snapshot exists using snapshot manager
if not self.client.snapshots.exists(snapshot_name):
raise CloneError(f"Snapshot '{snapshot_name}' does not exist")
self.clone_table(target_table, source_table, snapshot_name, if_not_exists)
class AsyncCloneManager(BaseCloneManager):
"""
Asynchronous clone management for MatrixOne database operations.
Provides the same comprehensive cloning functionality as CloneManager but with
full async/await support for non-blocking I/O operations. Ideal for high-concurrency
applications and async web frameworks.
Key Features:
- **Non-blocking operations**: All clone operations use async/await
- **Database cloning**: Asynchronously clone entire databases
- **Table-level cloning**: Asynchronously clone specific tables
- **Snapshot-based cloning**: Async cloning from point-in-time snapshots
- **Concurrent operations**: Clone multiple databases/tables concurrently
- **Transaction-aware**: Full integration with async transaction contexts
- **Executor pattern**: Works with both async client and async session
Executor Pattern:
- If executor is None, uses self.client.execute (default async client-level executor)
- If executor is provided (e.g., async session), uses executor.execute (async transaction-aware)
- All operations are non-blocking and use async/await
- Enables concurrent cloning operations when not in a transaction
Usage Examples::
from matrixone import AsyncClient
import asyncio
async def main():
client = AsyncClient()
await client.connect(host='localhost', port=6001, user='root', password='111', database='test')
# Clone entire database asynchronously
await client.clone.clone_database(
target_db='production_backup',
source_db='production'
)
# Clone database from snapshot
await client.clone.clone_database(
target_db='test_environment',
source_db='production',
snapshot_name='daily_backup'
)
# Clone specific table asynchronously
await client.clone.clone_table(
target_table='users_backup',
source_table='users'
)
# Concurrent cloning of multiple databases
await asyncio.gather(
client.clone.clone_database('backup1', 'source1'),
client.clone.clone_database('backup2', 'source2'),
client.clone.clone_database('backup3', 'source3')
)
# Using within async transaction
async with client.session() as session:
await session.clone.clone_database('backup_db', 'source_db')
await session.clone.clone_table('backup_table', 'source_table')
# Both operations commit atomically
await client.disconnect()
asyncio.run(main())
Version Requirements:
Clone functionality requires MatrixOne version 1.0.0 or higher.
Requires async database drivers (aiomysql or asyncmy).
See Also:
- AsyncSnapshotManager: For async snapshot operations
- AsyncRestoreManager: For async restore operations
- AsyncSession: For async transaction management
"""
@requires_version(
min_version="1.0.0",
feature_name="database_cloning",
description="Database cloning functionality",
alternative="Use CREATE DATABASE and data migration instead",
)
async def clone_database(
self,
target_db: str,
source_db: str,
snapshot_name: Optional[str] = None,
if_not_exists: bool = False,
) -> None:
"""
Clone a database asynchronously.
Args:
target_db: Target database name
source_db: Source database name
snapshot_name: Optional snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails
"""
if not self.client._engine:
raise ConnectionError("Not connected to database")
sql = self._build_clone_database_sql(target_db, source_db, snapshot_name, if_not_exists)
try:
await self._get_executor().execute(sql)
except Exception as e:
raise CloneError(f"Failed to clone database: {e}") from e
async def clone_table(
self,
target_table: str,
source_table: str,
snapshot_name: Optional[str] = None,
if_not_exists: bool = False,
) -> None:
"""
Clone a table asynchronously.
Args:
target_table: Target table name (can include database: db.table)
source_table: Source table name (can include database: db.table)
snapshot_name: Optional snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails
"""
if not self.client._engine:
raise ConnectionError("Not connected to database")
sql = self._build_clone_table_sql(target_table, source_table, snapshot_name, if_not_exists)
try:
await self._get_executor().execute(sql)
except Exception as e:
raise CloneError(f"Failed to clone table: {e}") from e
async def clone_database_with_snapshot(
self,
target_db: str,
source_db: str,
snapshot_name: str,
if_not_exists: bool = False,
) -> None:
"""
Clone a database using a specific snapshot asynchronously.
Args:
target_db: Target database name
source_db: Source database name
snapshot_name: Snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails or snapshot doesn't exist
"""
# Verify snapshot exists using snapshot manager
# Note: exists() needs to be awaited for async version
if not await self.client.snapshots.exists(snapshot_name):
raise CloneError(f"Snapshot '{snapshot_name}' does not exist")
await self.clone_database(target_db, source_db, snapshot_name, if_not_exists)
async def clone_table_with_snapshot(
self,
target_table: str,
source_table: str,
snapshot_name: str,
if_not_exists: bool = False,
) -> None:
"""
Clone a table using a specific snapshot asynchronously.
Args:
target_table: Target table name (can include database: db.table)
source_table: Source table name (can include database: db.table)
snapshot_name: Snapshot name for point-in-time clone
if_not_exists: Use IF NOT EXISTS clause
Raises:
ConnectionError: If not connected to database
CloneError: If clone operation fails or snapshot doesn't exist
"""
# Verify snapshot exists using snapshot manager
# Note: exists() needs to be awaited for async version
if not await self.client.snapshots.exists(snapshot_name):
raise CloneError(f"Snapshot '{snapshot_name}' does not exist")
await self.clone_table(target_table, source_table, snapshot_name, if_not_exists)