# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MatrixOne Python SDK - PITR Manager
Provides Point-in-Time Recovery functionality for MatrixOne
"""
from datetime import datetime
from typing import List, Optional
from .exceptions import PitrError
from .version import requires_version
class Pitr:
    """PITR (Point-in-Time Recovery) object.

    Plain value holder describing one PITR definition. The constructor stores
    its arguments unchanged; no validation is performed here.
    """

    def __init__(
        self,
        name: str,
        created_time: datetime,
        modified_time: datetime,
        level: str,
        account_name: Optional[str] = None,
        database_name: Optional[str] = None,
        table_name: Optional[str] = None,
        range_value: int = 1,
        range_unit: str = "d",
    ):
        """
        Initialize PITR object

        Args::

            name: PITR name
            created_time: Creation time
            modified_time: Last modification time
            level: PITR level (cluster, account, database, table)
            account_name: Account name (for account/database/table level)
            database_name: Database name (for database/table level)
            table_name: Table name (for table level)
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)
        """
        self.name = name
        self.created_time = created_time
        self.modified_time = modified_time
        self.level = level
        self.account_name = account_name
        self.database_name = database_name
        self.table_name = table_name
        self.range_value = range_value
        self.range_unit = range_unit

    def __repr__(self):
        """Return a short debug representation showing name, level and range."""
        return f"<Pitr(name='{self.name}', level='{self.level}', range={self.range_value}{self.range_unit})>"
class PitrManager:
    """
    Manager for Point-in-Time Recovery (PITR) definitions in MatrixOne.

    This class builds and executes the ``CREATE PITR``, ``SHOW PITR``,
    ``ALTER PITR`` and ``DROP PITR`` statements and converts ``SHOW PITR``
    rows into :class:`Pitr` objects. It manages PITR definitions only; it
    does not contain any restore/recover operation.

    Supported PITR levels:

    - CLUSTER: ``create_cluster_pitr``
    - ACCOUNT: ``create_account_pitr``
    - DATABASE: ``create_database_pitr``
    - TABLE: ``create_table_pitr``

    Usage Examples::

        # Obtain the PITR manager
        pitr = client.pitr

        # Keep one day of history for a database
        db_pitr = pitr.create_database_pitr("db_pitr1", "my_database", 1, "d")

        # Keep two hours of history for a single table
        tab_pitr = pitr.create_table_pitr("tab_pitr1", "my_database", "users", 2, "h")

        # Look up, list, change and remove PITRs
        found = pitr.get("db_pitr1")
        all_db_pitrs = pitr.list(level="database")
        pitr.alter("db_pitr1", 7, "d")
        pitr.delete("tab_pitr1")

    Note: cluster-level creation is gated by ``requires_version`` with
    ``min_version="1.0.0"``. Every failure is reported as ``PitrError``.
    """

    def __init__(self, client, executor=None):
        """
        Initialize PITR manager.

        Args:
            client: MatrixOne client instance
            executor: Optional executor (e.g., session) for executing SQL.
                If None, uses client.execute
        """
        self._client = client
        self.executor = executor

    def _get_executor(self):
        """Get the executor for SQL execution (session or client)"""
        # Compare against None explicitly: an executor object that happens to be
        # falsy (e.g. defines __bool__/__len__) must not be silently replaced.
        return self.executor if self.executor is not None else self._client

    @requires_version(
        min_version="1.0.0",
        feature_name="pitr_cluster_level",
        description="Cluster-level Point-in-Time Recovery functionality",
        alternative="Use snapshot restore instead",
    )
    def create_cluster_pitr(self, name: str, range_value: int = 1, range_unit: str = "d") -> Pitr:
        """
        Create cluster-level PITR

        Args::

            name: PITR name
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails

        Example::

            >>> pitr = client.pitr.create_cluster_pitr("cluster_pitr1", 1, "d")
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            sql = f"CREATE PITR {pitr_ident} FOR CLUSTER RANGE {range_value} '{range_unit}'"
            result = self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create cluster PITR '{name}'")
            # The CREATE statement returns no details; read them back via SHOW PITR.
            return self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create cluster PITR '{name}': {e}") from e

    def create_account_pitr(
        self,
        name: str,
        account_name: Optional[str] = None,
        range_value: int = 1,
        range_unit: str = "d",
    ) -> Pitr:
        """
        Create account-level PITR

        Args::

            name: PITR name
            account_name: Account name (None for current account)
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails

        Example::

            >>> # For current account
            >>> pitr = client.pitr.create_account_pitr("account_pitr1", range_value=2, range_unit="h")
            >>>
            >>> # For specific account (cluster admin only)
            >>> pitr = client.pitr.create_account_pitr("account_pitr1", "acc1", 1, "d")
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            if account_name:
                account_ident = self._client._escape_identifier(account_name)
                sql = f"CREATE PITR {pitr_ident} FOR ACCOUNT {account_ident} RANGE {range_value} '{range_unit}'"
            else:
                # No account given: the statement omits the account name.
                sql = f"CREATE PITR {pitr_ident} FOR ACCOUNT RANGE {range_value} '{range_unit}'"
            result = self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create account PITR '{name}'")
            return self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create account PITR '{name}': {e}") from e

    def create_database_pitr(self, name: str, database_name: str, range_value: int = 1, range_unit: str = "d") -> Pitr:
        """
        Create database-level PITR

        Args::

            name: PITR name
            database_name: Database name
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails

        Example::

            >>> pitr = client.pitr.create_database_pitr("db_pitr1", "db1", 1, "y")
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            database_ident = self._client._escape_identifier(database_name)
            sql = f"CREATE PITR {pitr_ident} FOR DATABASE {database_ident} RANGE {range_value} '{range_unit}'"
            result = self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create database PITR '{name}'")
            return self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create database PITR '{name}': {e}") from e

    def create_table_pitr(
        self,
        name: str,
        database_name: str,
        table_name: str,
        range_value: int = 1,
        range_unit: str = "d",
    ) -> Pitr:
        """
        Create table-level PITR

        Args::

            name: PITR name
            database_name: Database name
            table_name: Table name
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails

        Example::

            >>> pitr = client.pitr.create_table_pitr("tab_pitr1", "db1", "t1", 1, "y")
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            database_ident = self._client._escape_identifier(database_name)
            table_ident = self._client._escape_identifier(table_name)
            sql = (
                f"CREATE PITR {pitr_ident} "
                f"FOR TABLE {database_ident} {table_ident} "
                f"RANGE {range_value} '{range_unit}'"
            )
            result = self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create table PITR '{name}'")
            return self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create table PITR '{name}': {e}") from e

    def get(self, name: str) -> Pitr:
        """
        Get PITR by name

        Args::

            name: PITR name

        Returns::

            Pitr: PITR object built from the first matching SHOW PITR row

        Raises::

            PitrError: If PITR not found or the query fails
        """
        try:
            # NOTE(review): _escape_string is presumably expected to return a
            # quoted SQL literal, since no quotes are added here - confirm.
            sql = f"SHOW PITR WHERE pitr_name = {self._client._escape_string(name)}"
            result = self._get_executor().execute(sql)
            if not result or not result.rows:
                raise PitrError(f"PITR '{name}' not found")
            return self._row_to_pitr(result.rows[0])
        except Exception as e:
            raise PitrError(f"Failed to get PITR '{name}': {e}") from e

    def list(
        self,
        level: Optional[str] = None,
        account_name: Optional[str] = None,
        database_name: Optional[str] = None,
        table_name: Optional[str] = None,
    ) -> List[Pitr]:
        """
        List PITRs with optional filters

        Filters that are None or empty are ignored; the remaining ones are
        combined with AND.

        Args::

            level: Filter by PITR level (cluster, account, database, table)
            account_name: Filter by account name
            database_name: Filter by database name
            table_name: Filter by table name

        Returns::

            List[Pitr]: List of PITR objects (empty if nothing matches)

        Raises::

            PitrError: If the query fails
        """
        try:
            filters = (
                ("pitr_level", level),
                ("account_name", account_name),
                ("database_name", database_name),
                ("table_name", table_name),
            )
            conditions = [f"{column} = {self._client._escape_string(value)}" for column, value in filters if value]
            where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
            sql = f"SHOW PITR{where_clause}"
            result = self._get_executor().execute(sql)
            if not result or not result.rows:
                return []
            return [self._row_to_pitr(row) for row in result.rows]
        except Exception as e:
            raise PitrError(f"Failed to list PITRs: {e}") from e

    def alter(self, name: str, range_value: int, range_unit: str) -> Pitr:
        """
        Alter PITR range

        Args::

            name: PITR name
            range_value: New time range value (1-100)
            range_unit: New time range unit (h, d, mo, y)

        Returns::

            Pitr: Updated PITR object

        Raises::

            PitrError: If PITR alteration fails
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            sql = f"ALTER PITR {pitr_ident} RANGE {range_value} '{range_unit}'"
            result = self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to alter PITR '{name}'")
            return self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to alter PITR '{name}': {e}") from e

    def delete(self, name: str) -> bool:
        """
        Delete PITR

        Args::

            name: PITR name

        Returns::

            bool: True if the executor returned a result for the DROP statement,
            False if it returned None

        Raises::

            PitrError: If PITR deletion fails
        """
        try:
            sql = f"DROP PITR {self._client._escape_identifier(name)}"
            result = self._get_executor().execute(sql)
            return result is not None
        except Exception as e:
            raise PitrError(f"Failed to delete PITR '{name}': {e}") from e

    def _validate_range(self, range_value: int, range_unit: str) -> None:
        """
        Validate PITR range parameters

        Raises::

            PitrError: If range_value is not an integer in 1..100 or range_unit
            is not one of h, d, mo, y
        """
        # Local import keeps this class self-contained without touching the
        # module-level import block.
        from numbers import Integral

        # range_value is interpolated into SQL unquoted, so only real integers are
        # accepted (bool is an int subclass and would render as "True"/"False").
        if isinstance(range_value, bool) or not isinstance(range_value, Integral):
            raise PitrError("Range value must be an integer between 1 and 100")
        if not 1 <= range_value <= 100:
            raise PitrError("Range value must be between 1 and 100")
        valid_units = ("h", "d", "mo", "y")
        if range_unit not in valid_units:
            raise PitrError(f"Range unit must be one of: {', '.join(valid_units)}")

    def _row_to_pitr(self, row: tuple) -> Pitr:
        """Convert database row to Pitr object"""
        # Expected columns: pitr_name, created_time, modified_time, pitr_level,
        # account_name, database_name, table_name, pitr_length, pitr_unit
        # A "*" in the account/database/table columns is reported as None.
        account_name, database_name, table_name = (None if value == "*" else value for value in row[4:7])
        return Pitr(
            name=row[0],
            created_time=row[1],
            modified_time=row[2],
            level=row[3],
            account_name=account_name,
            database_name=database_name,
            table_name=table_name,
            range_value=row[7],
            range_unit=row[8],
        )
class AsyncPitrManager:
    """
    Asynchronous PITR management for MatrixOne.

    Provides the same functionality as PitrManager but with async/await support.
    Uses the same executor pattern to support both client and session contexts.
    Every failure is reported as ``PitrError`` chained to the original exception.
    """

    def __init__(self, client, executor=None):
        """
        Initialize async PITR manager.

        Args:
            client: MatrixOne async client instance
            executor: Optional executor (e.g., async session) for executing SQL.
                If None, uses client.execute
        """
        self._client = client
        self.executor = executor

    def _get_executor(self):
        """Get the executor for SQL execution (session or client)"""
        # Compare against None explicitly: an executor object that happens to be
        # falsy (e.g. defines __bool__/__len__) must not be silently replaced.
        return self.executor if self.executor is not None else self._client

    def _validate_range(self, range_value: int, range_unit: str) -> None:
        """
        Validate PITR range parameters

        Raises::

            PitrError: If range_value is not an integer in 1..100 or range_unit
            is not one of h, d, mo, y
        """
        # Local import keeps this class self-contained without touching the
        # module-level import block.
        from numbers import Integral

        # range_value is interpolated into SQL unquoted, so only real integers are
        # accepted (bool is an int subclass and would render as "True"/"False").
        if isinstance(range_value, bool) or not isinstance(range_value, Integral):
            raise PitrError("Range value must be an integer between 1 and 100")
        if not 1 <= range_value <= 100:
            raise PitrError("Range value must be between 1 and 100")
        valid_units = ("h", "d", "mo", "y")
        if range_unit not in valid_units:
            raise PitrError(f"Range unit must be one of: {', '.join(valid_units)}")

    def _row_to_pitr(self, row: tuple) -> Pitr:
        """Convert database row to Pitr object"""
        # Expected columns: pitr_name, created_time, modified_time, pitr_level,
        # account_name, database_name, table_name, pitr_length, pitr_unit
        # A "*" in the account/database/table columns is reported as None so that
        # both managers return identical Pitr objects for the same row.
        account_name, database_name, table_name = (None if value == "*" else value for value in row[4:7])
        return Pitr(
            name=row[0],
            created_time=row[1],
            modified_time=row[2],
            level=row[3],
            account_name=account_name,
            database_name=database_name,
            table_name=table_name,
            range_value=row[7],
            range_unit=row[8],
        )

    @requires_version(
        min_version="1.0.0",
        feature_name="pitr_cluster_level",
        description="Cluster-level Point-in-Time Recovery functionality",
        alternative="Use snapshot restore instead",
    )
    async def create_cluster_pitr(self, name: str, range_value: int = 1, range_unit: str = "d") -> Pitr:
        """
        Create cluster-level PITR asynchronously

        Args::

            name: PITR name
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            sql = f"CREATE PITR {pitr_ident} FOR CLUSTER RANGE {range_value} '{range_unit}'"
            result = await self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create cluster PITR '{name}'")
            # The CREATE statement returns no details; read them back via SHOW PITR.
            return await self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create cluster PITR '{name}': {e}") from e

    async def create_account_pitr(
        self,
        name: str,
        account_name: Optional[str] = None,
        range_value: int = 1,
        range_unit: str = "d",
    ) -> Pitr:
        """
        Create account-level PITR asynchronously

        Args::

            name: PITR name
            account_name: Account name (None for current account)
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            if account_name:
                account_ident = self._client._escape_identifier(account_name)
                sql = f"CREATE PITR {pitr_ident} FOR ACCOUNT {account_ident} RANGE {range_value} '{range_unit}'"
            else:
                # No account given: the statement omits the account name.
                sql = f"CREATE PITR {pitr_ident} FOR ACCOUNT RANGE {range_value} '{range_unit}'"
            result = await self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create account PITR '{name}'")
            return await self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create account PITR '{name}': {e}") from e

    async def create_database_pitr(self, name: str, database_name: str, range_value: int = 1, range_unit: str = "d") -> Pitr:
        """
        Create database-level PITR asynchronously

        Args::

            name: PITR name
            database_name: Database name
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            database_ident = self._client._escape_identifier(database_name)
            sql = f"CREATE PITR {pitr_ident} FOR DATABASE {database_ident} RANGE {range_value} '{range_unit}'"
            result = await self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create database PITR '{name}'")
            return await self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create database PITR '{name}': {e}") from e

    async def create_table_pitr(
        self,
        name: str,
        database_name: str,
        table_name: str,
        range_value: int = 1,
        range_unit: str = "d",
    ) -> Pitr:
        """
        Create table-level PITR asynchronously

        Args::

            name: PITR name
            database_name: Database name
            table_name: Table name
            range_value: Time range value (1-100)
            range_unit: Time range unit (h, d, mo, y)

        Returns::

            Pitr: Created PITR object

        Raises::

            PitrError: If PITR creation fails
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            database_ident = self._client._escape_identifier(database_name)
            table_ident = self._client._escape_identifier(table_name)
            sql = (
                f"CREATE PITR {pitr_ident} "
                f"FOR TABLE {database_ident} {table_ident} "
                f"RANGE {range_value} '{range_unit}'"
            )
            result = await self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to create table PITR '{name}'")
            return await self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to create table PITR '{name}': {e}") from e

    async def get(self, name: str) -> Pitr:
        """
        Get PITR by name asynchronously

        Args::

            name: PITR name

        Returns::

            Pitr: PITR object built from the first matching SHOW PITR row

        Raises::

            PitrError: If PITR not found or the query fails
        """
        try:
            # NOTE(review): _escape_string is presumably expected to return a
            # quoted SQL literal, since no quotes are added here - confirm.
            sql = f"SHOW PITR WHERE pitr_name = {self._client._escape_string(name)}"
            result = await self._get_executor().execute(sql)
            if not result or not result.rows:
                raise PitrError(f"PITR '{name}' not found")
            return self._row_to_pitr(result.rows[0])
        except Exception as e:
            raise PitrError(f"Failed to get PITR '{name}': {e}") from e

    async def list(
        self,
        level: Optional[str] = None,
        account_name: Optional[str] = None,
        database_name: Optional[str] = None,
        table_name: Optional[str] = None,
    ) -> List[Pitr]:
        """
        List PITRs asynchronously

        Filters that are None or empty are ignored; the remaining ones are
        combined with AND.

        Args::

            level: Filter by PITR level (cluster, account, database, table)
            account_name: Filter by account name
            database_name: Filter by database name
            table_name: Filter by table name

        Returns::

            List[Pitr]: List of PITR objects (empty if nothing matches)

        Raises::

            PitrError: If the query fails
        """
        try:
            filters = (
                ("pitr_level", level),
                ("account_name", account_name),
                ("database_name", database_name),
                ("table_name", table_name),
            )
            conditions = [f"{column} = {self._client._escape_string(value)}" for column, value in filters if value]
            where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
            sql = f"SHOW PITR{where_clause}"
            result = await self._get_executor().execute(sql)
            if not result or not result.rows:
                return []
            return [self._row_to_pitr(row) for row in result.rows]
        except Exception as e:
            raise PitrError(f"Failed to list PITRs: {e}") from e

    async def alter(self, name: str, range_value: int, range_unit: str) -> Pitr:
        """
        Alter PITR configuration asynchronously

        Args::

            name: PITR name
            range_value: New time range value (1-100)
            range_unit: New time range unit (h, d, mo, y)

        Returns::

            Pitr: Updated PITR object

        Raises::

            PitrError: If PITR alteration fails
        """
        try:
            self._validate_range(range_value, range_unit)
            pitr_ident = self._client._escape_identifier(name)
            sql = f"ALTER PITR {pitr_ident} RANGE {range_value} '{range_unit}'"
            result = await self._get_executor().execute(sql)
            if result is None:
                raise PitrError(f"Failed to alter PITR '{name}'")
            return await self.get(name)
        except Exception as e:
            raise PitrError(f"Failed to alter PITR '{name}': {e}") from e

    async def delete(self, name: str) -> bool:
        """
        Delete PITR asynchronously

        Args::

            name: PITR name

        Returns::

            bool: True if the executor returned a result for the DROP statement,
            False if it returned None

        Raises::

            PitrError: If PITR deletion fails
        """
        try:
            sql = f"DROP PITR {self._client._escape_identifier(name)}"
            result = await self._get_executor().execute(sql)
            return result is not None
        except Exception as e:
            raise PitrError(f"Failed to delete PITR '{name}': {e}") from e