# Source code for matrixone.metadata
# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MatrixOne Metadata Operations
This module provides metadata scanning capabilities for MatrixOne tables,
allowing users to analyze table statistics, column information, and data distribution.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from sqlalchemy.engine import Result
[docs]
class MetadataColumn(Enum):
    """
    Names of the columns exposed by a metadata scan.

    Every member's value is the column name as a plain string, so a member can
    be used wherever a column-name string is accepted (via ``.value``).
    """

    # Identification of the column and of the storage object holding it
    COL_NAME = "col_name"  # column name
    OBJECT_NAME = "object_name"  # object/block name in storage
    IS_HIDDEN = "is_hidden"  # whether the object is hidden
    OBJ_LOC = "obj_loc"  # object storage location

    # Lifecycle timestamps
    CREATE_TS = "create_ts"  # creation timestamp
    DELETE_TS = "delete_ts"  # deletion timestamp

    # Counters and sizes
    ROWS_CNT = "rows_cnt"  # number of rows in the object
    NULL_CNT = "null_cnt"  # number of null values
    COMPRESS_SIZE = "compress_size"  # compressed storage size
    ORIGIN_SIZE = "origin_size"  # original (uncompressed) size

    # Per-column value statistics
    MIN = "min"  # minimum value in the column
    MAX = "max"  # maximum value in the column
    SUM = "sum"  # sum of the values
[docs]
@dataclass
class MetadataRow:
    """
    Structured representation of one row returned by a metadata scan.

    The ten leading fields are required; ``min``, ``max`` and ``sum`` default to
    ``None`` because ``from_sqlalchemy_row`` treats those columns as optional.
    """

    col_name: str
    object_name: str
    is_hidden: bool
    obj_loc: str
    create_ts: str
    delete_ts: str
    rows_cnt: int
    null_cnt: int
    compress_size: int
    origin_size: int
    min: Optional[Any] = None
    max: Optional[Any] = None
    sum: Optional[Any] = None

    @staticmethod
    def _parse_hidden_flag(raw: Any) -> bool:
        """
        Normalise the raw ``is_hidden`` value to a real ``bool``.

        The value may arrive as text (``'true'``/``'false'``), as a native
        boolean or as an integer depending on the driver, so a plain
        ``== 'true'`` comparison would report every non-string truthy value
        as "not hidden".

        Args:
            raw: Value read from the ``is_hidden`` column.

        Returns:
            True for ``'true'``/``'1'`` (case-insensitive, surrounding
            whitespace ignored) and for truthy non-string values; otherwise False.
        """
        if isinstance(raw, (bytes, bytearray)):
            raw = raw.decode("utf-8", "replace")
        if isinstance(raw, str):
            return raw.strip().lower() in ("true", "1")
        return bool(raw)

    @classmethod
    def from_sqlalchemy_row(cls, row) -> "MetadataRow":
        """
        Create a MetadataRow from a SQLAlchemy Row object.

        Args:
            row: Object exposing a ``_mapping`` with the metadata columns.

        Returns:
            A populated MetadataRow.

        Raises:
            KeyError: If one of the ten required columns is missing. ``min``,
                ``max`` and ``sum`` are optional and fall back to ``None``.
        """
        mapping = row._mapping
        return cls(
            col_name=mapping['col_name'],
            object_name=mapping['object_name'],
            is_hidden=cls._parse_hidden_flag(mapping['is_hidden']),
            obj_loc=mapping['obj_loc'],
            create_ts=mapping['create_ts'],
            delete_ts=mapping['delete_ts'],
            rows_cnt=mapping['rows_cnt'],
            null_cnt=mapping['null_cnt'],
            compress_size=mapping['compress_size'],
            origin_size=mapping['origin_size'],
            min=mapping.get('min'),
            max=mapping.get('max'),
            sum=mapping.get('sum'),
        )
class BaseMetadataManager:
    """
    Base metadata manager with the logic shared by the sync and async managers.

    Everything here is free of I/O: the methods build ``metadata_scan`` SQL
    strings and post-process rows that have already been fetched. Subclasses
    run the SQL on the executor returned by ``_get_executor``.
    """

    # Units used by _format_size, smallest first; each step is a factor of 1024.
    _SIZE_UNITS = ("B", "KB", "MB", "GB", "TB")

    def __init__(self, client, executor=None):
        """
        Initialize base metadata manager.

        Args:
            client: MatrixOne client instance
            executor: Optional executor (e.g. a session). If None, the client
                itself is used as the executor.
        """
        self.client = client
        self.executor = executor if executor is not None else client

    def _get_executor(self):
        """Return the object whose ``execute`` runs the SQL (session or client)."""
        return self.executor

    def _build_metadata_scan_sql(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        distinct_object_name: Optional[bool] = None,
    ) -> str:
        """
        Build a metadata_scan SQL query.

        The scan target is encoded in a single string argument:

        - ``db.table``            plain table
        - ``db.table.#``          tombstone objects
        - ``db.table.?index``     index objects
        - ``db.table.?index.#``   tombstone objects of an index

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: When truthy, target the tombstone objects
            indexname: When non-empty, target this index
            distinct_object_name: When truthy, prefix the projection with
                ``DISTINCT(object_name) as object_name`` (all columns are still
                selected after it).

        Returns:
            SQL query string
        """
        # NOTE(review): the names are interpolated verbatim into a quoted SQL
        # literal, so callers must pass trusted identifiers only.
        table_ref = f"{dbname}.{tablename}"
        if indexname:
            table_ref += f".?{indexname}"
        if is_tombstone:
            table_ref += ".#"

        # The second argument (column selector) is always '*'.
        projection = "DISTINCT(object_name) as object_name, *" if distinct_object_name else "*"
        return f"SELECT {projection} FROM metadata_scan('{table_ref}', '*') g"

    def _process_scan_result(
        self,
        result: "Result",
        columns: "Optional[Union[List[Union[MetadataColumn, str]], str]]" = None,
    ) -> "Union[Result, List[MetadataRow], List[Dict[str, Any]]]":
        """
        Shape a scan result according to the ``columns`` argument.

        Args:
            result: Result object exposing ``fetchall()``
            columns: Controls the output shape:

                - ``None``: ``result`` is returned untouched
                - ``"*"`` or ``[]``: every row becomes a ``MetadataRow``
                - list of names / ``MetadataColumn`` members: every row becomes
                  a dict holding only the requested columns
                - a single name or ``MetadataColumn``: treated as a one-element
                  list

        Returns:
            The raw result, a list of ``MetadataRow`` or a list of dicts.
            Requested names that are not attributes of ``MetadataRow`` are
            silently skipped.
        """
        if columns is None:
            return result

        rows = [MetadataRow.from_sqlalchemy_row(raw) for raw in result.fetchall()]
        if columns == "*" or (isinstance(columns, list) and not columns):
            return rows

        # A bare string would otherwise be iterated character by character.
        if isinstance(columns, (str, MetadataColumn)):
            columns = [columns]

        # Resolve enum members to names once instead of once per row.
        wanted = [col.value if isinstance(col, MetadataColumn) else col for col in columns]
        return [{name: getattr(row, name) for name in wanted if hasattr(row, name)} for row in rows]

    def _format_size(self, size_bytes: int) -> str:
        """
        Format a size in bytes as a human-readable string.

        The value is divided by 1024 until it drops below 1024 or the largest
        unit (TB) is reached, then rendered with at most two decimals and
        without trailing zeros.

        Args:
            size_bytes: Size in bytes; ``None`` and ``0`` both give ``"0 B"``.

        Returns:
            Formatted size string (e.g., "1.5 MB", "500 B")
        """
        if not size_bytes:
            return "0 B"

        units = self._SIZE_UNITS
        last_index = len(units) - 1
        unit_index = 0
        size = float(size_bytes)
        while size >= 1024 and unit_index < last_index:
            size /= 1024
            unit_index += 1

        # Strip the zeros from the number only: doing it after the unit has been
        # appended is a no-op because the string then ends with "B".
        number = f"{size:.2f}".rstrip('0').rstrip('.')
        return f"{number} {units[unit_index]}"

    def _prepare_stats_queries(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> List[tuple]:
        """
        Build the keyed scan queries shared by brief and detailed statistics.

        Keys are the table name for the main query, the literal ``"tombstone"``
        for the tombstone query and the index name for each index query.
        NOTE(review): a table or index that is itself named "tombstone", or an
        index named like the table, would share a key with another entry.

        Returns:
            List of (query_key, sql) tuples, main table first
        """
        queries = [(tablename, self._build_metadata_scan_sql(dbname, tablename, is_tombstone, indexname))]
        if include_tombstone:
            queries.append(("tombstone", self._build_metadata_scan_sql(dbname, tablename, is_tombstone=True)))
        queries.extend(
            (index_name, self._build_metadata_scan_sql(dbname, tablename, indexname=index_name))
            for index_name in include_indexes or ()
        )
        return queries

    def _prepare_brief_stats(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> List[tuple]:
        """
        Prepare SQL queries for brief statistics collection.

        Returns:
            List of (query_key, sql) tuples
        """
        return self._prepare_stats_queries(
            dbname, tablename, is_tombstone, indexname, include_tombstone, include_indexes
        )

    def _process_brief_stats_result(self, rows) -> Dict[str, Any]:
        """
        Aggregate fetched rows into brief statistics.

        Args:
            rows: Fetched rows from a query result (each exposing ``_mapping``)

        Returns:
            Empty dict when there are no rows; otherwise a dict with
            ``total_objects`` (distinct object names), ``original_size`` and
            ``compress_size`` (formatted strings), ``row_cnt`` and ``null_cnt``
            (sums). Missing or NULL numeric values count as 0.
        """
        if not rows:
            return {}

        mappings = [row._mapping for row in rows]

        def total(column: str):
            # ``or 0`` also covers a column that is present but NULL.
            return sum(mapping.get(column, 0) or 0 for mapping in mappings)

        return {
            "total_objects": len({mapping['object_name'] for mapping in mappings}),
            "original_size": self._format_size(total('origin_size')),
            "compress_size": self._format_size(total('compress_size')),
            "row_cnt": total('rows_cnt'),
            "null_cnt": total('null_cnt'),
        }

    def _prepare_detail_stats(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> List[tuple]:
        """
        Prepare SQL queries for detailed statistics collection.

        Returns:
            List of (query_key, sql) tuples
        """
        return self._prepare_stats_queries(
            dbname, tablename, is_tombstone, indexname, include_tombstone, include_indexes
        )

    def _process_detail_stats_result(self, rows) -> List[Dict[str, Any]]:
        """
        Convert fetched rows into one detail dictionary per row.

        Args:
            rows: Fetched rows from a query result (each exposing ``_mapping``)

        Returns:
            List of dicts with ``object_name``, ``create_ts``, ``delete_ts``,
            ``row_cnt``, ``null_cnt`` and the formatted ``original_size`` and
            ``compress_size``.
        """
        details = []
        for row in rows:
            mapping = row._mapping
            details.append(
                {
                    "object_name": mapping['object_name'],
                    "create_ts": mapping['create_ts'],
                    "delete_ts": mapping['delete_ts'],
                    "row_cnt": mapping.get('rows_cnt', 0),
                    "null_cnt": mapping.get('null_cnt', 0),
                    "original_size": self._format_size(mapping.get('origin_size', 0)),
                    "compress_size": self._format_size(mapping.get('compress_size', 0)),
                }
            )
        return details
[docs]
class MetadataManager(BaseMetadataManager):
    """
    Synchronous metadata manager for MatrixOne table metadata operations.

    This class provides metadata scanning capabilities for analyzing table
    statistics, column information, data distribution, and storage details.
    SQL construction and result shaping are inherited from
    ``BaseMetadataManager``; this class only runs the queries.

    Key Features:

    - **Metadata scanning**: Access detailed table and column metadata
    - **Column statistics**: Row counts, null counts, min/max values, sums
    - **Storage analysis**: Object sizes (original and compressed), data distribution
    - **Index metadata**: Scan index-specific metadata
    - **Tombstone inspection**: Analyze deleted data objects
    - **Transaction-aware**: Queries run on the session when one is supplied

    Executor Pattern:

    - If executor is None, uses self.client.execute (default client-level executor)
    - If executor is provided (e.g., session), uses executor.execute (transaction-aware)
    - All operations can participate in transactions when used via session

    Available Metadata Columns:

    - **COL_NAME**: Column name
    - **OBJECT_NAME**: Object/block name in storage
    - **IS_HIDDEN**: Whether object is hidden
    - **OBJ_LOC**: Object storage location
    - **CREATE_TS**: Creation timestamp
    - **DELETE_TS**: Deletion timestamp (for tombstones)
    - **ROWS_CNT**: Number of rows in the object
    - **NULL_CNT**: Number of null values
    - **COMPRESS_SIZE**: Compressed storage size
    - **ORIGIN_SIZE**: Original uncompressed size
    - **MIN**: Minimum value in column
    - **MAX**: Maximum value in column
    - **SUM**: Sum of values (for numeric columns)

    Usage Examples::

        from matrixone import Client
        from matrixone.metadata import MetadataColumn

        client = Client(host='localhost', port=6001, user='root', password='111', database='test')

        # Basic metadata scan (returns SQLAlchemy Result)
        result = client.metadata.scan('test_db', 'users')
        for row in result:
            print(f"Column: {row.col_name}, Rows: {row.rows_cnt}")

        # Scan specific columns (returns a list of dicts)
        rows = client.metadata.scan(
            'test_db', 'users',
            columns=[MetadataColumn.COL_NAME, MetadataColumn.ROWS_CNT, MetadataColumn.NULL_CNT]
        )
        for row in rows:
            print(f"{row['col_name']}: {row['rows_cnt']} rows, {row['null_cnt']} nulls")

        # Get all structured metadata (returns a list of MetadataRow)
        metadata_rows = client.metadata.scan('test_db', 'users', columns='*')
        for row in metadata_rows:
            print(f"{row.col_name}: {row.rows_cnt} rows")

        # Scan index metadata
        index_result = client.metadata.scan(
            'test_db', 'users',
            indexname='idx_email'
        )

        # Scan tombstone (deleted) objects
        tombstone_result = client.metadata.scan(
            'test_db', 'users',
            is_tombstone=True
        )

        # Get brief statistics; the result is keyed by table name
        stats = client.metadata.get_table_brief_stats('test_db', 'users')
        table_stats = stats['users']
        print(f"Objects: {table_stats['total_objects']}")
        print(f"Rows: {table_stats['row_cnt']}, nulls: {table_stats['null_cnt']}")
        print(f"Size: {table_stats['original_size']} -> {table_stats['compress_size']}")

        # Get per-object details including indexes and tombstones; keys are the
        # table name, 'tombstone' and each index name
        detailed_stats = client.metadata.get_table_detail_stats(
            'test_db', 'users',
            include_indexes=['idx_email', 'idx_name'],
            include_tombstone=True
        )
        for key, objects in detailed_stats.items():
            for obj in objects:
                print(f"{key}: {obj['object_name']} {obj['row_cnt']} rows, {obj['original_size']}")

        # Using within a transaction
        with client.session() as session:
            # Scan metadata within transaction context
            result = session.metadata.scan('test_db', 'users')

    Use Cases:

    - **Performance analysis**: Identify tables with high null counts or poor compression
    - **Storage optimization**: Analyze object distribution and sizes
    - **Data quality**: Check for null values and data distribution
    - **Index analysis**: Evaluate index size
    - **Tombstone management**: Monitor deleted data objects
    - **Capacity planning**: Track table growth and storage usage

    See Also:

    - Client.create_table: For creating tables
    - VectorManager: For vector-specific metadata operations
    - LoadDataManager: For bulk data loading
    """

    def scan(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        columns: Optional[Union[List[Union[MetadataColumn, str]], str]] = None,
        distinct_object_name: Optional[bool] = None,
    ) -> Union[Result, List[MetadataRow], List[Dict[str, Any]]]:
        """
        Scan table metadata using the metadata_scan function.

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: When truthy, scan the tombstone objects instead of the data objects
            indexname: Optional index name; when given, the index is scanned
            columns: Controls the shape of the return value. Items can be
                MetadataColumn enum values or strings.
            distinct_object_name: When truthy, the query selects
                ``DISTINCT(object_name)`` ahead of the remaining columns.

        Returns:
            - If columns is None: the SQLAlchemy Result object
            - If columns is ``"*"`` or an empty list: List of MetadataRow
            - If columns is a non-empty list: List of dicts containing only
              the requested columns

        Example::

            # Scan a table (returns SQLAlchemy Result)
            result = client.metadata.scan("test_db", "users")

            # Scan tombstone objects
            result = client.metadata.scan("test_db", "users", is_tombstone=True)

            # Scan a specific index
            result = client.metadata.scan("test_db", "users", indexname="idx_name")

            # Get selected columns as dicts
            from matrixone.metadata import MetadataColumn
            rows = client.metadata.scan("test_db", "users",
                                        columns=[MetadataColumn.COL_NAME,
                                                 MetadataColumn.ROWS_CNT])
            for row in rows:
                print(f"Column: {row['col_name']}, Rows: {row['rows_cnt']}")

            # Get all columns as MetadataRow objects
            rows = client.metadata.scan("test_db", "users", columns="*")
        """
        sql = self._build_metadata_scan_sql(dbname, tablename, is_tombstone, indexname, distinct_object_name)
        result = self._get_executor().execute(sql)
        return self._process_scan_result(result, columns)

    def get_table_brief_stats(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Get brief statistics for a table, tombstone, and indexes.

        One query is executed per requested target, in sequence.

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: Optional tombstone flag applied to the main table query
            indexname: Optional index name applied to the main table query
            include_tombstone: Whether to include tombstone statistics
            include_indexes: List of index names to include

        Returns:
            Dictionary keyed by the table name, ``"tombstone"`` and each index
            name. Each value holds ``total_objects``, ``original_size``,
            ``compress_size``, ``row_cnt`` and ``null_cnt``. Targets whose
            scan returned no rows are omitted.
        """
        queries = self._prepare_brief_stats(dbname, tablename, is_tombstone, indexname, include_tombstone, include_indexes)
        executor = self._get_executor()
        result = {}
        for key, sql in queries:
            stats = self._process_brief_stats_result(executor.execute(sql).fetchall())
            if stats:
                result[key] = stats
        return result

    def get_table_detail_stats(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """
        Get detailed (per-row) statistics for a table, tombstone, and indexes.

        One query is executed per requested target, in sequence.

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: Optional tombstone flag applied to the main table query
            indexname: Optional index name applied to the main table query
            include_tombstone: Whether to include tombstone statistics
            include_indexes: List of index names to include

        Returns:
            Dictionary keyed by the table name, ``"tombstone"`` and each index
            name. Each value is a list of dicts with ``object_name``,
            ``create_ts``, ``delete_ts``, ``row_cnt``, ``null_cnt``,
            ``original_size`` and ``compress_size``. Targets whose scan
            returned no rows are omitted.
        """
        queries = self._prepare_detail_stats(dbname, tablename, is_tombstone, indexname, include_tombstone, include_indexes)
        executor = self._get_executor()
        result = {}
        for key, sql in queries:
            details = self._process_detail_stats_result(executor.execute(sql).fetchall())
            if details:
                result[key] = details
        return result
[docs]
class AsyncMetadataManager(BaseMetadataManager):
    """
    Asynchronous metadata manager for MatrixOne table metadata operations.

    Provides the same metadata scanning functionality as MetadataManager, but
    every query is awaited on the executor. SQL construction and result
    shaping are inherited from ``BaseMetadataManager``.

    Key Features:

    - **Non-blocking operations**: All metadata operations use async/await
    - **Async metadata scanning**: Asynchronously access table and column metadata
    - **Concurrent analysis**: Scan multiple tables concurrently
    - **Async statistics**: Get table statistics without blocking
    - **Transaction-aware**: Queries run on the async session when one is supplied
    - **Executor pattern**: Works with both async client and async session

    Executor Pattern:

    - If executor is None, uses self.client.execute (default async client-level executor)
    - If executor is provided (e.g., async session), uses executor.execute (async transaction-aware)
    - ``execute`` is awaited; the returned result is then read with a plain
      (non-awaited) ``fetchall()``

    Usage Examples::

        from matrixone import AsyncClient
        from matrixone.metadata import MetadataColumn
        import asyncio

        async def main():
            client = AsyncClient()
            await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

            # Basic async metadata scan
            result = await client.metadata.scan('test_db', 'users')
            for row in result.fetchall():
                print(f"Column: {row.col_name}, Rows: {row.rows_cnt}")

            # Scan specific columns (returns a list of dicts)
            rows = await client.metadata.scan(
                'test_db', 'users',
                columns=[MetadataColumn.COL_NAME, MetadataColumn.ROWS_CNT]
            )
            for row in rows:
                print(f"{row['col_name']}: {row['rows_cnt']} rows")

            # Get brief statistics; the result is keyed by table name
            stats = await client.metadata.get_table_brief_stats('test_db', 'users')
            print(f"Total rows: {stats['users']['row_cnt']}")
            print(f"Size: {stats['users']['original_size']} -> {stats['users']['compress_size']}")

            # Concurrent metadata scanning of multiple tables
            results = await asyncio.gather(
                client.metadata.scan('test_db', 'users'),
                client.metadata.scan('test_db', 'orders'),
                client.metadata.scan('test_db', 'products')
            )

            # Concurrent statistics for multiple tables
            stats_list = await asyncio.gather(
                client.metadata.get_table_brief_stats('test_db', 'users'),
                client.metadata.get_table_brief_stats('test_db', 'orders'),
                client.metadata.get_table_brief_stats('test_db', 'products')
            )
            for table_stats in stats_list:
                for name, brief in table_stats.items():
                    print(f"{name}: {brief['row_cnt']} rows")

            # Scan index metadata asynchronously
            index_result = await client.metadata.scan(
                'test_db', 'users',
                indexname='idx_email'
            )

            # Using within async transaction
            async with client.session() as session:
                result = await session.metadata.scan('test_db', 'users')
                stats = await session.metadata.get_table_detail_stats('test_db', 'orders')

            await client.disconnect()

        asyncio.run(main())

    Use Cases:

    - **Async performance monitoring**: Non-blocking metadata collection
    - **Concurrent analysis**: Analyze multiple tables simultaneously
    - **Real-time dashboards**: Update table statistics without blocking
    - **High-throughput applications**: Metadata operations in async web servers
    - **Batch metadata collection**: Gather stats from many tables efficiently

    See Also:

    - AsyncClient: For async database operations
    - AsyncSession: For async transaction management
    - MetadataManager: For synchronous metadata operations
    """

    async def scan(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        columns: Optional[Union[List[Union[MetadataColumn, str]], str]] = None,
        distinct_object_name: Optional[bool] = None,
    ) -> Union[Result, List[MetadataRow], List[Dict[str, Any]]]:
        """
        Scan table metadata using the metadata_scan function (async).

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: When truthy, scan the tombstone objects instead of the data objects
            indexname: Optional index name; when given, the index is scanned
            columns: Controls the shape of the return value. Items can be
                MetadataColumn enum values or strings.
            distinct_object_name: When truthy, the query selects
                ``DISTINCT(object_name)`` ahead of the remaining columns.

        Returns:
            - If columns is None: the result object returned by the executor
            - If columns is ``"*"`` or an empty list: List of MetadataRow
            - If columns is a non-empty list: List of dicts containing only
              the requested columns

        Example::

            # Scan a table
            result = await client.metadata.scan("test_db", "users")

            # Scan tombstone objects
            result = await client.metadata.scan("test_db", "users", is_tombstone=True)

            # Scan a specific index
            result = await client.metadata.scan("test_db", "users", indexname="idx_name")

            # Get all columns as MetadataRow objects
            rows = await client.metadata.scan("test_db", "users", columns="*")
        """
        sql = self._build_metadata_scan_sql(dbname, tablename, is_tombstone, indexname, distinct_object_name)
        result = await self._get_executor().execute(sql)
        return self._process_scan_result(result, columns)

    async def get_table_brief_stats(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Get brief statistics for a table, tombstone, and indexes (async).

        The queries are awaited one after another, in the order table,
        tombstone, indexes.

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: Optional tombstone flag applied to the main table query
            indexname: Optional index name applied to the main table query
            include_tombstone: Whether to include tombstone statistics
            include_indexes: List of index names to include

        Returns:
            Dictionary keyed by the table name, ``"tombstone"`` and each index
            name. Each value holds ``total_objects``, ``original_size``,
            ``compress_size``, ``row_cnt`` and ``null_cnt``. Targets whose
            scan returned no rows are omitted.
        """
        queries = self._prepare_brief_stats(dbname, tablename, is_tombstone, indexname, include_tombstone, include_indexes)
        executor = self._get_executor()
        result = {}
        for key, sql in queries:
            query_result = await executor.execute(sql)
            stats = self._process_brief_stats_result(query_result.fetchall())
            if stats:
                result[key] = stats
        return result

    async def get_table_detail_stats(
        self,
        dbname: str,
        tablename: str,
        is_tombstone: Optional[bool] = None,
        indexname: Optional[str] = None,
        include_tombstone: bool = False,
        include_indexes: Optional[List[str]] = None,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """
        Get detailed (per-row) statistics for a table, tombstone, and indexes (async).

        The queries are awaited one after another, in the order table,
        tombstone, indexes.

        Args:
            dbname: Database name
            tablename: Table name
            is_tombstone: Optional tombstone flag applied to the main table query
            indexname: Optional index name applied to the main table query
            include_tombstone: Whether to include tombstone statistics
            include_indexes: List of index names to include

        Returns:
            Dictionary keyed by the table name, ``"tombstone"`` and each index
            name. Each value is a list of dicts with ``object_name``,
            ``create_ts``, ``delete_ts``, ``row_cnt``, ``null_cnt``,
            ``original_size`` and ``compress_size``. Targets whose scan
            returned no rows are omitted.
        """
        queries = self._prepare_detail_stats(dbname, tablename, is_tombstone, indexname, include_tombstone, include_indexes)
        executor = self._get_executor()
        result = {}
        for key, sql in queries:
            query_result = await executor.execute(sql)
            details = self._process_detail_stats_result(query_result.fetchall())
            if details:
                result[key] = details
        return result