#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MatrixOne Interactive Diagnostic Tool
A command-line tool for diagnosing and inspecting MatrixOne database objects,
especially secondary indexes and vector indexes.
"""
import cmd
import datetime
import getpass
import json
import logging
import math
import string
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

from .client import Client
from .cdc import CDCSinkType, build_mysql_uri, _parse_watermark_timestamp
# Set default logging level to ERROR to keep output clean
# Users can override this with --log-level parameter
logging.getLogger('matrixone').setLevel(logging.ERROR)
# Try to import prompt_toolkit for better input experience.
# PROMPT_TOOLKIT_AVAILABLE records whether the import succeeded. The real
# completer is only defined when it is True, and MatrixOneCLI checks it before
# creating a PromptSession, before using the session in _prompt, and in cmdloop.
try:
    from prompt_toolkit import PromptSession
    from prompt_toolkit.completion import Completer, Completion
    from prompt_toolkit.formatted_text import HTML
    from prompt_toolkit.history import FileHistory
    from prompt_toolkit.styles import Style

    PROMPT_TOOLKIT_AVAILABLE = True
except ImportError:
    PROMPT_TOOLKIT_AVAILABLE = False

    # Provide stub classes for documentation generation.
    # Only Completer is stubbed here; PromptSession, Completion, HTML,
    # FileHistory and Style remain undefined, so any code that uses them must
    # first check PROMPT_TOOLKIT_AVAILABLE.
    class Completer:
        """Stub Completer class when prompt_toolkit is not available"""

        pass
# Custom completer for mo-diag commands
if PROMPT_TOOLKIT_AVAILABLE:

    class MODiagCompleter(Completer):
        """Smart completer for mo-diag commands.

        While the first word is being typed it completes command names. After
        that it offers table names, database names, CDC task names or command
        options, depending on the command. Lookups that query the database are
        best-effort and simply yield no candidates on failure.
        """

        # Commands offered when completing the first word.
        COMMANDS = (
            'show_indexes',
            'show_all_indexes',
            'verify_counts',
            'show_ivf_status',
            'show_table_stats',
            'flush_table',
            'tables',
            'databases',
            'sql',
            'use',
            'connect',
            'history',
            'help',
            'cdc_health',
            'cdc_tasks',
            'cdc_create',
            'cdc_drop',
            'cdc_task',
            'exit',
        )
        # Commands whose first argument is a table and second a database.
        TABLE_COMMANDS = ('show_indexes', 'verify_counts', 'show_table_stats', 'flush_table')
        # Commands that only take options; options are offered at any position.
        OPTION_ONLY_COMMANDS = {
            'cdc_health': ('--details', '--task=', '--threshold=', '--strict'),
            'cdc_tasks': ('--details',),
            'cdc_create': ('--table-level', '--database-level'),
        }
        # Commands taking a CDC task name first, then the listed options.
        TASK_COMMAND_OPTIONS = {
            'cdc_drop': ('--force',),
            'cdc_task': (
                '--details',
                '--no-watermarks',
                '--watermarks-only',
                '--pause',
                '--resume',
                '--restart',
                '--threshold=',
                '--table=',
                '--strict',
            ),
        }

        def __init__(self, cli_instance):
            """Remember the CLI so completions can reach its client and current database."""
            self.cli = cli_instance

        def get_completions(self, document, complete_event):
            """Yield completions for the text before the cursor.

            Args:
                document: prompt_toolkit ``Document`` being edited.
                complete_event: prompt_toolkit completion event (unused).

            Yields:
                ``Completion`` objects whose negative ``start_position`` replaces
                the partially typed word.
            """
            text = document.text_before_cursor
            words = text.split()
            # A trailing space means the previous word is finished and a new,
            # still-empty word is being started.
            starting_new_word = text.endswith(' ')

            if not words:
                yield from self._matching(self.COMMANDS, '')
                return
            if len(words) == 1 and not starting_new_word:
                yield from self._matching(self.COMMANDS, words[0])
                return

            command = words[0]
            partial = '' if starting_new_word else words[-1]
            # 1-based position of the argument under the cursor (command is 0).
            arg_index = len(words) if starting_new_word else len(words) - 1

            if command in self.TABLE_COMMANDS:
                if arg_index == 1:
                    yield from self._matching(self._get_tables(), partial)
                elif arg_index == 2:
                    yield from self._matching(self._get_databases(), partial)
            elif command == 'use':
                if arg_index == 1:
                    yield from self._matching(self._get_databases(), partial)
            elif command == 'show_ivf_status':
                if '-t' in words:
                    # Complete only the word *after* "-t". Comparing arg_index
                    # (not len(words)) avoids offering tables while "-t" itself
                    # is still under the cursor, which would glue them onto it.
                    if arg_index == words.index('-t') + 1:
                        yield from self._matching(self._get_tables(), partial)
                elif arg_index == 1:
                    yield from self._matching(self._get_databases(), partial)
            elif command in self.OPTION_ONLY_COMMANDS:
                yield from self._matching(self.OPTION_ONLY_COMMANDS[command], partial)
            elif command in self.TASK_COMMAND_OPTIONS:
                if arg_index == 1:
                    yield from self._matching(self._get_cdc_tasks(), partial)
                else:
                    yield from self._matching(self.TASK_COMMAND_OPTIONS[command], partial)

        @staticmethod
        def _matching(candidates, partial):
            """Yield a ``Completion`` replacing *partial* for each candidate that starts with it."""
            for candidate in candidates:
                if candidate.startswith(partial):
                    yield Completion(candidate, start_position=-len(partial))

        def _get_tables(self):
            """Get list of tables in current database ([] when unavailable)."""
            if not self.cli.client or not self.cli.current_database:
                return []
            try:
                result = self.cli.client.execute("SHOW TABLES")
                return [row[0] for row in result.fetchall()]
            except Exception:
                return []

        def _get_databases(self):
            """Get list of all databases ([] when unavailable)."""
            if not self.cli.client:
                return []
            try:
                result = self.cli.client.execute("SHOW DATABASES")
                return [row[0] for row in result.fetchall()]
            except Exception:
                return []

        def _get_cdc_tasks(self) -> List[str]:
            """Return CDC task names for completion ([] when unavailable)."""
            if not self.cli.client:
                return []
            try:
                return [task.task_name for task in self.cli.client.cdc.list()]
            except Exception:
                return []

else:
    # Stub class for documentation generation when prompt_toolkit is not available

    class MODiagCompleter:
        """Smart completer for mo-diag commands (stub when prompt_toolkit unavailable).

        It only stores the CLI instance; it provides no completion methods.
        """

        def __init__(self, cli_instance):
            self.cli = cli_instance
# ANSI Color codes for terminal output
[docs]
class Colors:
    """ANSI escape sequences used to colorize terminal output.

    The attributes are plain strings meant to be interpolated into f-strings;
    :meth:`disable` blanks all of them so output carries no escape codes.
    """

    RESET = '\033[0m'
    BOLD = '\033[1m'
    # Foreground colors
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    # Background colors
    BG_RED = '\033[101m'
    BG_GREEN = '\033[102m'
    BG_YELLOW = '\033[103m'

    @staticmethod
    def disable():
        """Blank every color code (for non-terminal output)."""
        for attr_name in (
            'RESET',
            'BOLD',
            'RED',
            'GREEN',
            'YELLOW',
            'BLUE',
            'MAGENTA',
            'CYAN',
            'WHITE',
            'BG_RED',
            'BG_GREEN',
            'BG_YELLOW',
        ):
            setattr(Colors, attr_name, '')


# Piped or redirected output should not contain raw ANSI escape codes.
if not sys.stdout.isatty():
    Colors.disable()
[docs]
def success(msg):
    """Return *msg* wrapped in green color codes (not printed)."""
    return "{}{}{}".format(Colors.GREEN, msg, Colors.RESET)


def error(msg):
    """Return *msg* wrapped in red color codes (not printed)."""
    return "{}{}{}".format(Colors.RED, msg, Colors.RESET)


def warning(msg):
    """Return *msg* wrapped in yellow color codes (not printed)."""
    return "{}{}{}".format(Colors.YELLOW, msg, Colors.RESET)


def info(msg):
    """Return *msg* wrapped in cyan color codes (not printed)."""
    return "{}{}{}".format(Colors.CYAN, msg, Colors.RESET)


def bold(msg):
    """Return *msg* wrapped in bold codes (not printed)."""
    return "{}{}{}".format(Colors.BOLD, msg, Colors.RESET)
_HEX_CHARS = set(string.hexdigits)
def _decode_hex_string(value: str) -> str:
stripped = value.strip()
if not stripped or len(stripped) % 2 != 0:
return value
if not all(ch in _HEX_CHARS for ch in stripped):
return value
try:
decoded = bytes.fromhex(stripped).decode("utf-8")
except (ValueError, UnicodeDecodeError):
return value
return decoded
def _format_json_if_possible(value: str) -> str:
try:
parsed = json.loads(value)
except (TypeError, ValueError):
return value
return json.dumps(parsed, indent=2, ensure_ascii=False, default=str)
def _format_cdc_value(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, (dict, list)):
return json.dumps(value, indent=2, ensure_ascii=False, default=str)
if isinstance(value, bytes):
try:
value = value.decode("utf-8")
except UnicodeDecodeError:
value = value.decode("utf-8", errors="replace")
text = value if isinstance(value, str) else str(value)
decoded = _decode_hex_string(text)
formatted = _format_json_if_possible(decoded)
return formatted
def _parse_duration(value: str) -> datetime.timedelta:
text = value.strip().lower()
if not text:
raise ValueError("Duration cannot be empty.")
units = {"s": "seconds", "m": "minutes", "h": "hours"}
unit = None
number = text
if text[-1] in units:
unit = text[-1]
number = text[:-1]
try:
amount = float(number)
except ValueError:
raise ValueError(f"Unable to parse duration '{value}'. Use formats like 5m, 30s, 1.5h.") from None
if amount < 0:
raise ValueError("Duration must be non-negative.")
if unit is None:
return datetime.timedelta(minutes=amount)
kwargs = {units[unit]: amount}
return datetime.timedelta(**kwargs)
def _format_duration(delta: datetime.timedelta) -> str:
total_seconds = delta.total_seconds()
if total_seconds == 0:
return "0s"
if total_seconds % 3600 == 0:
hours = total_seconds / 3600
return f"{hours:g}h"
if total_seconds % 60 == 0:
minutes = total_seconds / 60
return f"{minutes:g}m"
if total_seconds < 60:
return f"{total_seconds:g}s"
minutes = total_seconds / 60
return f"{minutes:.1f}m"
def _format_timestamp(value: datetime.datetime) -> str:
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
value = value.astimezone(datetime.timezone.utc)
formatted = value.strftime("%Y-%m-%d %H:%M:%S.%f %Z")
formatted = formatted.rstrip("0").rstrip(".")
return formatted
def _parse_options_string(raw: str) -> Optional[Dict[str, Any]]:
options: Dict[str, Any] = {}
for segment in raw.split(","):
segment = segment.strip()
if not segment:
continue
if "=" not in segment:
raise ValueError(f"Invalid option format: '{segment}'. Expected key=value.")
key, value = segment.split("=", 1)
key = key.strip()
value = value.strip()
if not key:
raise ValueError("Option key cannot be empty.")
lower = value.lower()
if lower in {"true", "false"}:
options[key] = lower == "true"
else:
options[key] = value
return options
def _parse_table_mapping(line: str) -> Dict[str, str]:
if ":" in line:
source_part, sink_part = line.split(":", 1)
else:
source_part, sink_part = line, line
source_part = source_part.strip()
sink_part = sink_part.strip()
if "." not in source_part:
raise ValueError("Source mapping must include database.table (e.g. src_db.orders)")
if "." not in sink_part:
raise ValueError("Sink mapping must include database.table (e.g. dest_db.orders)")
src_db, src_table = source_part.split(".", 1)
dest_db, dest_table = sink_part.split(".", 1)
if not src_db or not src_table or not dest_db or not dest_table:
raise ValueError("Incomplete mapping. Provide both database and table names.")
return {
"source_db": src_db,
"source_table": src_table,
"sink_db": dest_db,
"sink_table": dest_table,
}
def _print_cdc_field(label: str, value: Any, indent: str = " ") -> None:
    """Print a labelled CDC field, skipping it entirely when the value is ``None``.

    Single-line values are printed after the label on the same line.
    Multi-line values (e.g. pretty-printed JSON) go on the following lines,
    each prefixed with *indent* plus one space.
    """
    rendered = _format_cdc_value(value)
    if rendered is None:
        return
    heading = f"{indent}{info(label)}"
    if "\n" not in rendered:
        print(f"{heading} {rendered}")
        return
    body_lines = [f"{indent} {row}" for row in rendered.splitlines()]
    print(heading + "\n" + "\n".join(body_lines))
[docs]
class MatrixOneCLI(cmd.Cmd):
"""Interactive CLI for MatrixOne diagnostics"""
intro = """
╔══════════════════════════════════════════════════════════════╗
║ MatrixOne Interactive Diagnostic Tool ║
║ ║
║ Type help or ? to list commands. ║
║ Type help <command> for detailed help on a command. ║
║ ║
║ Tips: ║
║ • Press Tab for auto-completion (tables/databases) ║
║ • Use ↑/↓ arrows to browse command history ║
║ • Press Ctrl+R for history search ║
╚══════════════════════════════════════════════════════════════╝
"""
prompt = f'{Colors.BOLD}{Colors.GREEN}MO-DIAG>{Colors.RESET} '
[docs]
def __init__(self, client: Optional[Client] = None):
    """
    Initialize the CLI tool.

    Args:
        client: Optional, already-created MatrixOne client. When omitted, use
            the ``connect`` command to open a connection later.

    When prompt_toolkit is installed, a ``PromptSession`` is created with
    history stored in ``~/.mo_diag_history`` and Tab-only completion;
    otherwise ``self.session`` is ``None``. If the client has
    ``_connection_params``, its ``database`` entry becomes the current
    database and is shown in the prompt.
    """
    super().__init__()
    self.client = client
    self.current_database = None

    self.session = None
    if PROMPT_TOOLKIT_AVAILABLE:
        history_path = Path.home() / '.mo_diag_history'
        prompt_style = Style.from_dict(
            {
                'prompt': 'bold ansigreen',
                'database': 'bold ansiyellow',
            }
        )
        self.session = PromptSession(
            history=FileHistory(str(history_path)),
            completer=MODiagCompleter(self),
            complete_while_typing=False,  # completion is triggered by Tab only
            style=prompt_style,
        )

    if self.client and hasattr(self.client, '_connection_params'):
        self.current_database = self.client._connection_params.get('database')

    if self.current_database:
        self.prompt = (
            f'{Colors.BOLD}{Colors.GREEN}MO-DIAG{Colors.RESET}{Colors.BOLD}'
            f'[{Colors.YELLOW}{self.current_database}{Colors.RESET}{Colors.BOLD}]'
            f'{Colors.GREEN}>{Colors.RESET} '
        )
def _prompt(self, message: str, default: Optional[str] = None, *, is_password: bool = False) -> str:
    """Ask the user for one value and return it.

    Args:
        message: Prompt text; ``": "`` is appended.
        default: Returned when the user enters nothing. Shown as ``[default]``
            unless it is empty or the prompt is a password prompt.
        is_password: Hide the typed input.

    Returns:
        The entered text, *default* for empty input, or ``""`` when the user
        presses Ctrl+C / Ctrl+D.
    """
    suffix = f" [{default}]" if default and not is_password else ""
    prompt_message = message + suffix + ": "
    use_toolkit = PROMPT_TOOLKIT_AVAILABLE and getattr(self, "session", None)
    try:
        if use_toolkit:
            response = self.session.prompt(prompt_message, is_password=is_password)
        elif is_password:
            response = getpass.getpass(prompt_message)
        else:
            response = input(prompt_message)
    except (EOFError, KeyboardInterrupt):
        print()
        return ""
    if not response and default is not None:
        response = default
    return "" if response is None else response
[docs]
def cmdloop(self, intro=None):
    """
    Run the interactive command loop.

    When a prompt_toolkit session exists, it is used for input (history,
    Tab completion, colored prompt showing the current database). Otherwise
    this defers to :meth:`cmd.Cmd.cmdloop`. Ctrl+C discards the current line;
    Ctrl+D (EOF) ends the loop.

    Args:
        intro: Optional banner that replaces ``self.intro`` when given.
    """
    if not PROMPT_TOOLKIT_AVAILABLE or not self.session:
        # Fall back to standard cmdloop
        return super().cmdloop(intro)

    self.preloop()
    if intro is not None:
        self.intro = intro
    if self.intro:
        self.stdout.write(str(self.intro) + "\n")

    stop = None
    while not stop:
        try:
            if self.current_database:
                # HTML.format() escapes its arguments, so a database name with
                # markup characters such as '<' or '&' cannot break the prompt.
                prompt_text = HTML(
                    '<prompt>MO-DIAG</prompt>[<database>{}</database>]<prompt>></prompt> '
                ).format(self.current_database)
            else:
                prompt_text = HTML('<prompt>MO-DIAG></prompt> ')
            line = self.session.prompt(prompt_text)
            line = self.precmd(line)
            stop = self.onecmd(line)
            stop = self.postcmd(stop, line)
        except KeyboardInterrupt:
            print("^C")
        except EOFError:
            print()
            break
    self.postloop()
[docs]
def do_connect(self, arg):
    """
    Connect to MatrixOne database.

    Usage: connect <host> <port> <user> <password> [database]
    Example: connect localhost 6001 root 111 test

    A trailing ';' on the database name is ignored, as in other commands.
    An invalid port is reported instead of aborting the shell.
    """
    args = arg.split()
    if len(args) < 4:
        print("❌ Usage: connect <host> <port> <user> <password> [database]")
        return

    host, port_text, user, password = args[:4]
    database = (args[4].rstrip(';') or None) if len(args) > 4 else None

    # Parse the port here: a ValueError escaping a do_* handler would end cmdloop.
    try:
        port = int(port_text)
    except ValueError:
        port = None
    if port is None or not 0 < port < 65536:
        print(f"{Colors.RED}❌ Invalid port: {port_text} (expected an integer between 1 and 65535){Colors.RESET}")
        return

    try:
        if not self.client:
            self.client = Client()
        self.client.connect(host=host, port=port, user=user, password=password, database=database)
        self.current_database = database
        if database:
            self.prompt = (
                f'{Colors.BOLD}{Colors.GREEN}MO-DIAG{Colors.RESET}{Colors.BOLD}'
                f'[{Colors.YELLOW}{database}{Colors.RESET}{Colors.BOLD}]'
                f'{Colors.GREEN}>{Colors.RESET} '
            )
        else:
            self.prompt = f'{Colors.BOLD}{Colors.GREEN}MO-DIAG>{Colors.RESET} '
        print(
            f"{Colors.GREEN}✓ Connected to {host}:{port}"
            + (f" (database: {database})" if database else "")
            + Colors.RESET
        )
    except Exception as e:
        print(f"{Colors.RED}❌ Connection failed: {e}{Colors.RESET}")
[docs]
def do_use(self, arg):
    """
    Switch to a different database.

    Usage: use <database>
    Example: use test

    A trailing ';' and surrounding backticks are accepted. The name is always
    sent backtick-quoted, so names such as my-db work.
    """
    # Normalize "use `db`;", "use db;" and "use db" to the bare name.
    database = arg.strip().rstrip(';').strip().strip('`')
    if not database:
        print("❌ Usage: use <database>")
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return
    # Double embedded backticks so the name cannot escape its quoting.
    quoted_name = database.replace('`', '``')
    try:
        self.client.execute(f"USE `{quoted_name}`")
        self.current_database = database
        self.prompt = (
            f'{Colors.BOLD}{Colors.GREEN}MO-DIAG{Colors.RESET}{Colors.BOLD}'
            f'[{Colors.YELLOW}{database}{Colors.RESET}{Colors.BOLD}]'
            f'{Colors.GREEN}>{Colors.RESET} '
        )
        print(f"{Colors.GREEN}✓ Switched to database: {database}{Colors.RESET}")
    except Exception as e:
        print(f"❌ Failed to switch database: {e}")
[docs]
def do_show_indexes(self, arg):
    """
    Show all secondary indexes for a table, including IVF, HNSW, Fulltext, and regular indexes.

    Output is vertical (one block per index, like MySQL \\G) so long physical
    table names stay readable. For IVF/HNSW/Fulltext indexes, object statistics
    of the physical index table are appended when they can be fetched.

    Usage: show_indexes <table_name> [database]
    Example:
        show_indexes cms_all_content_chunk_info
        show_indexes cms_all_content_chunk_info repro3
    """

    def human_size(num_bytes):
        # Format a byte count as B, or as KB / MB with two decimals.
        mib = 1024 * 1024
        if num_bytes >= mib:
            return f"{num_bytes / mib:.2f} MB"
        if num_bytes >= 1024:
            return f"{num_bytes / 1024:.2f} KB"
        return f"{num_bytes} B"

    tokens = arg.strip().rstrip(';').split()
    if not tokens:
        print("❌ Usage: show_indexes <table_name> [database]")
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return

    table_name = tokens[0]
    database = tokens[1] if len(tokens) > 1 else self.current_database
    special_algos = ('ivfflat', 'hnsw', 'fulltext')

    try:
        indexes = self.client.get_table_indexes_detail(table_name, database)
        if not indexes:
            print(f"⚠️ No secondary indexes found for table '{table_name}' in database '{database}'")
            return

        table_info = f"'{database}.{table_name}'"
        print(f"\n{header('📊 Secondary Indexes for')} {bold(table_info)}\n")

        # Highlight color per algorithm (read now, since Colors may be disabled).
        algo_colors = {'ivfflat': Colors.CYAN, 'hnsw': Colors.GREEN, 'fulltext': Colors.YELLOW}

        for row_number, idx in enumerate(indexes, start=1):
            index_name = idx['index_name']
            algo = idx['algo'] or 'regular'
            table_type = idx['algo_table_type'] or '-'
            physical_table = idx['physical_table_name']
            # Hide internal helper columns such as __mo_alias___mo_cpkey_col.
            visible_columns = [c for c in idx['columns'] if not c.startswith('__mo_alias_')]
            columns = ', '.join(visible_columns) if visible_columns else 'N/A'
            algo_display = f"{algo_colors[algo]}{algo}{Colors.RESET}" if algo in algo_colors else algo

            print(info(f"{'*' * 27} {row_number}. row {'*' * 27}"))
            print(f" {bold('Index Name')}: {Colors.CYAN}{index_name}{Colors.RESET}")
            print(f" {bold('Algorithm')}: {algo_display}")
            print(f" {bold('Table Type')}: {table_type}")
            print(f" {bold('Physical Table')}: {physical_table}")
            print(f" {bold('Columns')}: {columns}")

            if algo not in special_algos:
                continue
            try:
                # Non-tombstone object metadata of the physical index table.
                stats = self.client.metadata.scan(database, physical_table, is_tombstone=False, columns="*")
                if not stats:
                    continue
                seen_objects = set()
                object_count = total_rows = total_compress = total_origin = 0
                for obj in stats:
                    # Count each named object once (same dedup as show_table_stats).
                    object_name = getattr(obj, 'object_name', None)
                    if not object_name or object_name in seen_objects:
                        continue
                    seen_objects.add(object_name)
                    object_count += 1
                    total_rows += int(getattr(obj, 'rows_cnt', 0) or 0)
                    total_compress += int(getattr(obj, 'compress_size', 0) or 0)
                    total_origin += int(getattr(obj, 'origin_size', 0) or 0)
                compress_size_str = human_size(total_compress)
                origin_size_str = human_size(total_origin)
                print(f" {bold('Statistics')}:")
                print(f" - Objects: {object_count}")
                print(f" - Rows: {total_rows:,}")
                print(f" - Compressed Size: {compress_size_str}")
                print(f" - Original Size: {origin_size_str}")
            except Exception:
                # Statistics are optional extras; skip quietly when unavailable.
                pass

        print(info("=" * 60))
        per_algo = {}
        for idx in indexes:
            algo_name = idx['algo'] if idx['algo'] else 'regular'
            per_algo[algo_name] = per_algo.get(algo_name, 0) + 1
        breakdown = ', '.join(f"{count} {name}" for name, count in sorted(per_algo.items()))
        print(bold(f"Total: {len(indexes)} index tables") + f" ({breakdown})" + "\n")
    except Exception as e:
        print(f"❌ Error: {e}")
[docs]
def do_show_all_indexes(self, arg):
    """
    Show index health report for all tables with secondary indexes.

    This command performs diagnostic checks including:
    - Row count consistency between main table and index tables
    - Vector index building status (IVF/HNSW)
    - Index type distribution
    - Problem detection

    Usage: show_all_indexes [database]
    Example:
        show_all_indexes
        show_all_indexes repro3
    """
    # Remove trailing semicolon
    arg = arg.strip().rstrip(';')
    database = arg if arg else self.current_database
    if not database:
        print("❌ No database specified and no current database set. Use 'use <database>' or provide database name.")
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return

    def quote_ident(name):
        # Backtick-quote a SQL identifier, doubling embedded backticks.
        return "`" + str(name).replace("`", "``") + "`"

    try:
        # Get all tables with indexes
        sql = """
        SELECT DISTINCT mo_tables.relname AS table_name
        FROM mo_catalog.mo_indexes
        JOIN mo_catalog.mo_tables ON mo_indexes.table_id = mo_tables.rel_id
        WHERE mo_indexes.type IN ('MULTIPLE', 'UNIQUE') AND mo_tables.reldatabase = ?
        ORDER BY mo_tables.relname
        """
        result = self.client.execute(sql, (database,))
        tables = [row[0] for row in result.fetchall()]
        if not tables:
            print(f"⚠️ No tables with secondary indexes found in database '{database}'")
            return

        print(f"\n{header('📊 Index Health Report')} for Database '{database}':")
        print("=" * 120)
        healthy_tables = []
        attention_tables = []

        # Check each table
        for table_name in tables:
            try:
                # Get index details
                indexes = self.client.get_table_indexes_detail(table_name, database)
                index_count = len(set(idx['index_name'] for idx in indexes))
                # Determine if table has special indexes (IVF/HNSW/Fulltext)
                has_vector_or_fulltext = any(idx['algo'] in ['ivfflat', 'hnsw', 'fulltext'] for idx in indexes)

                # Check row consistency (only for regular/UNIQUE indexes)
                consistency_status = None
                has_issue = False
                issue_detail = None
                if not has_vector_or_fulltext:
                    # NOTE(review): only the table name is passed here; if the client
                    # resolves it against the connection's current database, results
                    # may not refer to `database` when the two differ — confirm.
                    try:
                        row_count = self.client.verify_table_index_counts(table_name)
                        consistency_status = f"✓ {row_count:,} rows"
                    except ValueError as e:
                        consistency_status = "❌ Mismatch"
                        has_issue = True
                        issue_detail = str(e).split('\n')[0]  # First line of error
                    except Exception:
                        consistency_status = "⚠️ Unknown"
                else:
                    # For vector/fulltext indexes, just show the row count without
                    # verification. Qualify with the database so the count comes
                    # from the database being reported on, not the current one.
                    try:
                        result = self.client.execute(
                            f"SELECT COUNT(*) FROM {quote_ident(database)}.{quote_ident(table_name)}"
                        )
                        row_count = result.fetchone()[0]
                        consistency_status = f"{row_count:,} rows"
                    except Exception:
                        consistency_status = "Unknown"

                # Check IVF/HNSW/Fulltext status - group by index name to avoid duplicates
                special_index_statuses = []
                vector_index_issues = []
                # One entry per index name (e.g. IVF has several physical tables).
                seen_indexes = {}
                for idx in indexes:
                    seen_indexes.setdefault(idx['index_name'], idx)

                # Process each unique index
                for idx in seen_indexes.values():
                    if idx['algo'] == 'ivfflat':
                        # NOTE(review): get_ivf_stats also receives only the table name
                        # (see the note above about the current database).
                        try:
                            stats = self.client.vector_ops.get_ivf_stats(table_name, idx['columns'][0])
                            if stats and 'distribution' in stats:
                                centroid_ids = stats['distribution'].get('centroid_id', [])
                                centroid_counts = stats['distribution'].get('centroid_count', [])
                                centroid_count = len(centroid_ids)
                                total_vectors = sum(centroid_counts) if centroid_counts else 0
                                if centroid_count > 0 and total_vectors > 0:
                                    special_index_statuses.append(
                                        f"IVF: {centroid_count} centroids, {total_vectors} vectors"
                                    )
                                elif centroid_count > 0:
                                    special_index_statuses.append(f"IVF: {centroid_count} centroids")
                                else:
                                    special_index_statuses.append("IVF: building")
                                    has_issue = True
                                    vector_index_issues.append("IVF index not built yet")
                            else:
                                special_index_statuses.append("IVF: no stats available")
                        except Exception as e:
                            error_msg = str(e)
                            # Truncate long error messages for display
                            error_short = error_msg[:27] + "..." if len(error_msg) > 30 else error_msg
                            special_index_statuses.append(f"IVF: error ({error_short})")
                            has_issue = True
                            vector_index_issues.append(f"Failed to get IVF stats: {error_msg}")
                    elif idx['algo'] == 'hnsw':
                        special_index_statuses.append("HNSW index")
                    elif idx['algo'] == 'fulltext':
                        special_index_statuses.append("Fulltext index")

                # Combine status messages
                special_index_status = ", ".join(special_index_statuses) if special_index_statuses else None
                if not issue_detail and vector_index_issues:
                    issue_detail = "; ".join(vector_index_issues)

                # Categorize table
                table_info = {
                    'name': table_name,
                    'index_count': index_count,
                    'consistency': consistency_status,
                    'special_index_status': special_index_status,
                    'has_issue': has_issue,
                    'issue_detail': issue_detail,
                }
                if has_issue:
                    attention_tables.append(table_info)
                else:
                    healthy_tables.append(table_info)
            except Exception as e:
                # If we can't check this table, mark it as needing attention
                attention_tables.append(
                    {
                        'name': table_name,
                        'index_count': '?',
                        'consistency': '❌ Error',
                        'special_index_status': None,
                        'has_issue': True,
                        'issue_detail': str(e)[:50],
                    }
                )

        # Display healthy tables
        if healthy_tables:
            print(f"\n{success('✓ HEALTHY')} ({len(healthy_tables)} tables)")
            print("-" * 120)
            print(f"{'Table Name':<35} | {'Indexes':<8} | {'Row Count':<20} | {'Notes'}")
            print("-" * 120)
            for table in healthy_tables:
                notes = table['special_index_status'] if table['special_index_status'] else '-'
                print(f"{table['name']:<35} | {table['index_count']:<8} | {table['consistency']:<20} | {notes}")

        # Display tables needing attention
        if attention_tables:
            print(f"\n{warning('⚠️ ATTENTION NEEDED')} ({len(attention_tables)} tables)")
            print("-" * 120)
            print(f"{'Table Name':<35} | {'Issue':<40} | {'Details'}")
            print("-" * 120)
            for table in attention_tables:
                special_status = table.get('special_index_status') or ''
                if 'Mismatch' in (table['consistency'] or ''):
                    issue = "Row count mismatch between indexes"
                    details = table['issue_detail'] if table['issue_detail'] else "Check with verify_counts"
                elif 'building' in special_status:
                    issue = "Vector index building incomplete"
                    details = table['issue_detail'] if table['issue_detail'] else "Check with show_ivf_status"
                elif 'error' in special_status.lower():
                    issue = "Vector index error"
                    details = table['issue_detail'] if table['issue_detail'] else "Check index status"
                elif 'Error' in (table['consistency'] or ''):
                    issue = "Unable to verify table"
                    details = table['issue_detail'] if table['issue_detail'] else "Unknown error"
                else:
                    issue = "Unknown issue"
                    # The key is always present but may hold None; show '-' then.
                    details = str(table.get('issue_detail') or '-')
                print(f"{table['name']:<35} | {issue:<40} | {details[:38]}")

        # Summary
        print("\n" + "=" * 120)
        print(f"{bold('Summary:')}")
        print(f" {success('✓')} {len(healthy_tables)} healthy tables")
        if attention_tables:
            print(f" {warning('⚠️ ')} {len(attention_tables)} tables need attention")
        else:
            print(f" {success('✓')} All indexes healthy!")
        print(f" Total: {len(tables)} tables with indexes\n")
        if attention_tables:
            print(f"{info('💡 Tip:')} Use 'verify_counts <table>' or 'show_ivf_status' for detailed diagnostics")
    except Exception as e:
        print(f"❌ Error: {e}")
[docs]
def do_cdc_health(self, arg):
    """
    Check CDC task health, including error states and watermark lag.

    Usage: cdc_health [threshold] [--task=<task_name>] [--threshold=<duration>] [--strict] [--details]
    Example:
        cdc_health
        cdc_health 5
        cdc_health 30s
        cdc_health --task=cdc_sales_sync --threshold=15
        cdc_health --strict
        cdc_health --details

    A threshold, given positionally or with ``--threshold``, overrides the
    default 10 minute tolerance for late watermarks. It uses the same syntax
    as ``--threshold``: a bare number means minutes, and ``s``/``m``/``h``
    suffixes are accepted. ``--strict`` applies a zero tolerance (useful for
    quick smoke checks). A task counts as healthy only when it is not failing,
    not stuck, and has no lagging table watermark.
    """
    usage_hint = (
        "❌ Unknown option. Use 'cdc_health [threshold] [--task=<task>] "
        "[--threshold=<duration>] [--strict] [--details]'"
    )
    threshold_delta = datetime.timedelta(minutes=10)
    task_filter: Optional[str] = None
    strict_mode = False
    show_details = False
    for token in arg.strip().split():
        if token == "--strict":
            strict_mode = True
        elif token == "--details":
            show_details = True
        elif token.startswith("--task="):
            task_filter = token.split("=", 1)[1] or None
        elif token.startswith("--threshold="):
            try:
                threshold_delta = _parse_duration(token.split("=", 1)[1])
            except (ValueError, OverflowError) as exc:
                # OverflowError: timedelta range exceeded (e.g. "inf").
                print(error(str(exc)))
                return
        else:
            # Backwards-compatible positional threshold (bare number = minutes).
            try:
                threshold_delta = _parse_duration(token)
            except (ValueError, OverflowError):
                print(usage_hint)
                return

    if strict_mode:
        threshold_delta = datetime.timedelta(0)
    threshold_display = _format_duration(threshold_delta)

    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return

    try:
        all_tasks = self.client.cdc.list(task_filter)
        failing_tasks = self.client.cdc.list_failing_tasks()
        stuck_tasks = self.client.cdc.list_stuck_tasks()
        late_watermarks = self.client.cdc.list_late_table_watermarks(
            task_name=task_filter,
            default_threshold=threshold_delta,
        )
    except Exception as exc:
        print(f"{Colors.RED}❌ CDC health check failed: {exc}{Colors.RESET}")
        return

    if task_filter:
        failing_tasks = [task for task in failing_tasks if task.task_name == task_filter]
        stuck_tasks = [task for task in stuck_tasks if task.task_name == task_filter]

    total_tasks = len(all_tasks)
    print(f"\n{header('📈 CDC Health Overview')}")
    print(info(f"Threshold for watermark lag: {threshold_display}"))
    print(info(f"Evaluated CDC tasks: {total_tasks}"))
    if task_filter:
        print(info(f"Task filter: {task_filter}"))
    print()

    if failing_tasks:
        print(error(f"Tasks reporting errors ({len(failing_tasks)}):"))
        for task in failing_tasks:
            state = task.state or 'unknown'
            err_msg = task.err_msg or '-'
            print(f" • {task.task_name} (state: {state}, error: {err_msg})")
    else:
        print(success("No CDC tasks currently report errors."))
    print()

    if stuck_tasks:
        print(warning(f"Running tasks with per-table errors ({len(stuck_tasks)}):"))
        for task in stuck_tasks:
            state = task.state or 'unknown'
            print(f" • {task.task_name} (state: {state})")
    else:
        print(success("No running tasks with per-table errors detected."))
    print()

    if late_watermarks:
        print(warning(f"Tables lagging beyond threshold ({len(late_watermarks)}):"))
        for watermark in late_watermarks:
            parts = [watermark.task_name or '-']
            if watermark.database:
                parts.append(watermark.database)
            if watermark.table:
                parts.append(watermark.table)
            location = ".".join(part for part in parts if part)
            stamp = watermark.watermark or 'N/A'
            print(f" • {location} (watermark: {stamp})")
    else:
        print(success("All table watermarks are within the expected threshold."))

    # Failing, stuck and lagging tasks all count as problematic; count healthy
    # tasks among those actually listed so stray names cannot skew the result.
    problematic = {task.task_name for task in failing_tasks}
    problematic.update(task.task_name for task in stuck_tasks)
    problematic.update(watermark.task_name for watermark in late_watermarks if watermark.task_name)
    healthy_count = sum(1 for task in all_tasks if task.task_name not in problematic)
    print()
    print(info(f"Healthy tasks (no reported issues): {healthy_count}"))

    if show_details and all_tasks:
        print(f"\n{header('🛠 CDC Task Definitions')}")
        for task in all_tasks:
            print(bold(f"• {task.task_name}"))
            print(f" {info('State:')} {task.state or 'unknown'}")
            if task.err_msg:
                print(f" {error('Error:')} {task.err_msg}")
            _print_cdc_field("Mapping:", task.table_mapping, indent=" ")
            _print_cdc_field("Source URI:", task.source_uri, indent=" ")
            _print_cdc_field("Sink URI:", task.sink_uri, indent=" ")
            _print_cdc_field("Options:", task.additional_config, indent=" ")
            _print_cdc_field("NoFull:", task.no_full, indent=" ")
            _print_cdc_field("Checkpoint:", task.checkpoint, indent=" ")
            print()
[docs]
def do_cdc_task(self, arg):
    """
    Inspect a specific CDC task, optionally pausing/resuming and reviewing per-table watermarks.
    Usage:
    cdc_task <task_name> [--details] [--no-watermarks] [--watermarks-only] [--table=<name>] \
    [--threshold=<duration>] [--strict] [--pause|--resume|--restart]
    Options:
    --details             Also print the raw SHOW CDC TASK output as JSON.
    --no-watermarks       Skip the per-table watermark listing.
    --watermarks-only     Print only the watermark section (no task summary).
    --table=<name>        Keep watermarks whose table, database or database.table matches (case-insensitive).
    --threshold=<dur>     Lag at or above which a table is reported late (default 10m).
    --strict              Zero tolerance; overrides --threshold.
    --pause|--resume|--restart  Apply the action first, then report the refreshed task.
    Examples:
    cdc_task cdc_orders_sync
    cdc_task cdc_orders_sync --details --table=orders
    cdc_task cdc_orders_sync --pause
    cdc_task cdc_orders_sync --threshold=5 --strict
    """
    # Kept in sync with the docstring (it previously omitted --watermarks-only).
    usage = (
        "❌ Usage: cdc_task <task_name> [--details] [--no-watermarks] [--watermarks-only] "
        "[--table=<name>] [--threshold=<duration>] [--strict] [--pause|--resume|--restart]"
    )
    tokens = arg.strip().split()
    if not tokens:
        print(usage)
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return

    task_name = tokens[0]
    options = tokens[1:]
    show_details = False
    include_watermarks = True
    watermarks_only = False
    strict_mode = False
    threshold_delta = datetime.timedelta(minutes=10)
    table_filter: Optional[str] = None
    action: Optional[str] = None

    i = 0
    while i < len(options):
        token = options[i]
        if token == "--details":
            show_details = True
        elif token == "--no-watermarks":
            include_watermarks = False
        elif token == "--watermarks-only":
            watermarks_only = True
        elif token in ("--pause", "--resume", "--restart"):
            chosen = token[2:]
            if action and action != chosen:
                print("❌ Specify only one of --pause, --resume, or --restart.")
                return
            action = chosen
        elif token == "--strict":
            strict_mode = True
        elif token == "--threshold" or token.startswith("--threshold="):
            if token == "--threshold":
                # Space-separated form: the value is the next token.
                i += 1
                if i >= len(options):
                    print("❌ --threshold requires a value.")
                    return
                value = options[i]
            else:
                value = token.split("=", 1)[1]
            try:
                threshold_delta = _parse_duration(value)
            except ValueError as exc:
                print(error(str(exc)))
                return
        elif token == "--table" or token.startswith("--table="):
            if token == "--table":
                i += 1
                if i >= len(options):
                    print("❌ --table requires a value.")
                    return
                table_filter = options[i]
            else:
                table_filter = token.split("=", 1)[1] or None
        else:
            print(f"❌ Unknown option '{token}'. Use 'help cdc_task' for usage details.")
            return
        i += 1

    if strict_mode:
        # Zero tolerance: any non-negative lag counts as late.
        threshold_delta = datetime.timedelta(0)
    if watermarks_only and not include_watermarks:
        print("❌ --watermarks-only cannot be combined with --no-watermarks.")
        return

    try:
        task = self.client.cdc.get(task_name)
    except ValueError as exc:
        print(error(str(exc)))
        return
    except Exception as exc:
        print(error(f"Failed to load CDC task '{task_name}': {exc}"))
        return

    if action:
        action_label = {"pause": "paused", "resume": "resumed", "restart": "restarted"}.get(action, f"{action}d")
        try:
            getattr(self.client.cdc, action)(task_name)
        except Exception as exc:
            print(error(f"Failed to {action} task '{task_name}': {exc}"))
            return
        print(success(f"CDC task '{task_name}' {action_label} successfully."))
        # The action already succeeded; a failed refresh must not be reported as a failed action.
        try:
            task = self.client.cdc.get(task_name)
        except Exception as exc:
            print(warning(f"Could not reload task '{task_name}' after {action}; showing pre-{action} state: {exc}"))

    threshold_display = _format_duration(threshold_delta)
    threshold_seconds = threshold_delta.total_seconds()

    if not watermarks_only:
        print(f"\n{header('🧭 CDC Task Inspect')}")
        print(info(f"Task: {task.task_name}"))
        print(info(f"State: {task.state or 'unknown'}"))
        print(info(f"Sink type: {task.sink_type or 'unknown'}"))
        if task.err_msg:
            print(error(f"Task error: {task.err_msg}"))
        _print_cdc_field("Checkpoint:", task.checkpoint)
        _print_cdc_field("NoFull:", task.no_full)
        _print_cdc_field("Options:", task.additional_config)
        _print_cdc_field("Table mapping:", task.table_mapping)
        _print_cdc_field("Source URI:", task.source_uri)
        _print_cdc_field("Sink URI:", task.sink_uri)

    def _watermark_datetime(raw):
        """Best-effort conversion of a raw watermark string to an aware datetime, or None."""
        if not raw:
            return None
        parsed = _parse_watermark_timestamp(raw)
        if parsed is None:
            # Fallback: a leading run of digits (before any '-') is read as nanoseconds since the epoch.
            numeric = raw.split('-', 1)[0]
            if not numeric.isdigit():
                return None
            try:
                parsed = datetime.datetime.fromtimestamp(int(numeric) / 1_000_000_000, datetime.timezone.utc)
            except (ValueError, OverflowError, OSError):
                return None
        if parsed.tzinfo is None:
            # NOTE(review): assumes naive values from _parse_watermark_timestamp are UTC — confirm.
            # Without this, subtracting from the aware now_utc below raises TypeError.
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed

    if include_watermarks:
        try:
            watermarks = self.client.cdc.list_watermarks(task_name)
        except Exception as exc:
            print(error(f"Failed to load watermarks: {exc}"))
            watermarks = []
        if table_filter:
            pattern = table_filter.lower()

            def _match_watermark(mark) -> bool:
                # Match "table", "database" or "database.table", case-insensitively.
                candidates = []
                if mark.table:
                    candidates.append(mark.table.lower())
                if mark.database:
                    candidates.append(mark.database.lower())
                if mark.database and mark.table:
                    candidates.append(f"{mark.database}.{mark.table}".lower())
                return pattern in candidates

            watermarks = [mark for mark in watermarks if _match_watermark(mark)]

        print(f"\n{header('🕒 Watermarks')} (threshold {threshold_display})")
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        print(info("Adjust via '--threshold=<duration>' (supports s/m/h) or '--strict' for zero tolerance."))
        print(info(f"Current UTC: {_format_timestamp(now_utc)}"))
        late_count = 0
        if not watermarks:
            if table_filter:
                print(info(f"No watermarks match table filter '{table_filter}'."))
            else:
                print(info("No watermarks found for this task."))
        for mark in watermarks:
            location = '.'.join(part for part in (mark.database, mark.table) if part) or '(unknown)'
            watermark_time = _watermark_datetime(mark.watermark)
            watermark_time_str = f" ({_format_timestamp(watermark_time)})" if watermark_time else ""
            delay_str = ""
            is_late = False
            if watermark_time:
                delay_seconds = (now_utc - watermark_time).total_seconds()
                delay_str = f" Δ={delay_seconds:.3f}s"
                is_late = delay_seconds >= threshold_seconds
            if is_late:
                late_count += 1
            status = warning('⚠️ late') if is_late else success('✓ on schedule')
            err_msg = f" | error: {mark.err_msg}" if mark.err_msg else ''
            print(f" • {location} → {mark.watermark or 'N/A'}{watermark_time_str} ({status}){delay_str}{err_msg}")
        # Only meaningful when watermarks were actually evaluated.
        print(info(f"Tables exceeding threshold: {late_count}"))
        if table_filter:
            print(info(f"Watermark filter: {table_filter}"))
    else:
        print(f"\n{info('Watermark listing skipped (--no-watermarks).')}")

    if show_details:
        try:
            result = self.client.cdc.show_task(task_name)
            if hasattr(result, 'rows'):
                rows = result.rows
                columns = getattr(result, 'columns', None) or []
            else:
                rows = result.fetchall() if result else []
                # DB-API cursors may expose description=None; treat that as "no column names".
                columns = [col[0] for col in (getattr(result, 'description', None) or [])]
            detail_payload = [dict(zip(columns, row)) for row in rows] if rows and columns else rows
            print(f"\n{header('📋 SHOW CDC TASK output')}")
            print(json.dumps(detail_payload, indent=2, ensure_ascii=False, default=str))
        except Exception as exc:
            print(error(f"Failed to fetch SHOW CDC TASK output: {exc}"))
[docs]
def do_cdc_tasks(self, arg):
    """
    List CDC tasks and basic metadata.
    Usage:
    cdc_tasks [--details]
    Example:
    cdc_tasks
    cdc_tasks --details
    """
    words = arg.split()
    # '--details' is the only accepted flag; anything else aborts before touching the client.
    if any(word != "--details" for word in words):
        print("❌ Unknown option. Usage: cdc_tasks [--details]")
        return
    show_details = "--details" in words

    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return
    try:
        tasks = self.client.cdc.list()
    except Exception as exc:
        print(error(f"Failed to list CDC tasks: {exc}"))
        return
    if not tasks:
        print(info("No CDC tasks found."))
        return

    print(f"\n{header('🧾 CDC Tasks Summary')}")
    print(info(f"Total tasks: {len(tasks)}"))
    # (label, task attribute) pairs printed only with --details, in display order.
    detail_fields = (
        ("Source URI:", "source_uri"),
        ("Sink URI:", "sink_uri"),
        ("Options:", "additional_config"),
        ("NoFull:", "no_full"),
        ("Checkpoint:", "checkpoint"),
    )
    for task in tasks:
        print(bold(f"• {task.task_name}"))
        print(f" {info('State:')} {task.state or 'unknown'}")
        print(f" {info('Sink:')} {task.sink_type or 'unknown'}")
        if task.err_msg:
            print(f" {error('Error:')} {task.err_msg}")
        # Only single-line mapping previews fit the summary layout.
        preview = _format_cdc_value(task.table_mapping or "")
        if preview and "\n" not in preview:
            print(f" {info('Mapping:')} {preview}")
        if show_details:
            for label, attribute in detail_fields:
                _print_cdc_field(label, getattr(task, attribute), indent=" ")
        print()
[docs]
def do_cdc_create(self, arg):
    """
    Guided helper to create a CDC task.
    Usage:
    cdc_create [--database-level|--table-level]
    If no level flag is supplied, you will be prompted to choose between
    database-level and table-level replication.
    """
    flag_levels = {"--database-level": "database", "--table-level": "table"}
    level_override: Optional[str] = None
    for word in arg.split():
        picked = flag_levels.get(word)
        if picked is None:
            print("❌ Unknown option. Usage: cdc_create [--database-level|--table-level]")
            return
        # Repeating the same flag is fine; mixing the two is not.
        if level_override not in (None, picked):
            print("❌ Specify only one of --database-level or --table-level.")
            return
        level_override = picked

    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return

    # Defaults are derived from the current connection parameters, when available.
    conn = getattr(self.client, "_connection_params", {}) or {}
    host = conn.get("host", "127.0.0.1")
    raw_port = conn.get("port", 6001)
    user = conn.get("user", "root")
    password = conn.get("password", "")
    account = conn.get("account") or conn.get("tenant")
    try:
        port_number = int(raw_port)
    except (TypeError, ValueError):
        port_number = 6001
    try:
        default_source_uri = build_mysql_uri(host, port_number, user, password, account=account)
    except Exception:
        default_source_uri = ""

    home_database = conn.get("database")
    if home_database:
        default_task_name = f"{home_database}_cdc_{uuid.uuid4().hex[:4]}"
    else:
        default_task_name = f"cdc_task_{uuid.uuid4().hex[:6]}"

    task_name = self._prompt("Task name", default=default_task_name).strip()
    if not task_name:
        print("❌ Task name is required.")
        return

    sink_choice = self._prompt("Sink type (mysql/matrixone)", default=CDCSinkType.MATRIXONE.value).strip().lower()
    try:
        sink_type = CDCSinkType(sink_choice)
    except ValueError:
        print(f"❌ Invalid sink type '{sink_choice}'. Choose from {[member.value for member in CDCSinkType]}.")
        return

    source_uri = self._prompt("Source URI", default=default_source_uri).strip()
    if not source_uri:
        print("❌ Source URI is required.")
        return
    sink_uri = self._prompt("Sink URI", default=source_uri).strip()
    if not sink_uri:
        print("❌ Sink URI is required.")
        return

    level = level_override or self._prompt("Replication level (database/table)", default="database").strip().lower()
    if level not in {"database", "table"}:
        print("❌ Replication level must be 'database' or 'table'.")
        return

    options: Optional[Dict[str, Any]] = None
    print(info("Common CDC options include Frequency=1h, NoFull=true, MaxSqlLength=2097152, SendSqlTimeout=30m."))
    print(
        info(
            "Enter key=value pairs separated by commas (e.g. Frequency=1h,NoFull=true) or press Enter to skip. Type 'help' for more guidance."
        )
    )
    # Re-prompt until the input is empty (skip) or parses cleanly.
    while True:
        raw_options = self._prompt("Additional CDC options", default="").strip()
        if not raw_options:
            break
        if raw_options.lower() in {"help", "?", "examples"}:
            print(info("Examples:"))
            for example in ("Frequency=1h", "NoFull=true", "Frequency=30m,NoFull=true,SendSqlTimeout=45m"):
                print(f" {example}")
            continue
        try:
            options = _parse_options_string(raw_options)
        except ValueError as exc:
            print(error(str(exc)))
        else:
            break

    def _collect_database_context() -> Optional[Dict[str, Any]]:
        """Prompt for source/sink databases; None after reporting a missing source."""
        source_database = self._prompt("Source database", default=conn.get("database") or "").strip()
        if not source_database:
            print("❌ Source database is required.")
            return None
        sink_database = self._prompt("Sink database", default=f"{source_database}_cdc").strip() or source_database
        return {"mode": "database", "source_database": source_database, "sink_database": sink_database}

    def _collect_table_context() -> Optional[Dict[str, Any]]:
        """Prompt for table mappings until a blank line; None when none were given."""
        print(info("Enter table mappings in the form source_db.table[:sink_db.table]."))
        print(info("Press Enter on an empty prompt when you are done."))
        mappings: List[Dict[str, str]] = []
        while True:
            entry = self._prompt("Mapping (blank to finish)").strip()
            if not entry:
                break
            try:
                mappings.append(_parse_table_mapping(entry))
            except ValueError as exc:
                print(error(str(exc)))
        if not mappings:
            print("❌ At least one table mapping is required for table-level replication.")
            return None
        return {
            "mode": "table",
            "table_mappings": mappings,
            "table_tuples": [(m["source_db"], m["source_table"], m["sink_db"], m["sink_table"]) for m in mappings],
        }

    try:
        creation_context = _collect_database_context() if level == "database" else _collect_table_context()
    except Exception as exc:
        print(error(f"Failed to gather CDC task details: {exc}"))
        return
    if creation_context is None:
        return

    preview_payload: Dict[str, Any] = {
        "task_name": task_name,
        "level": level,
        "source_uri": source_uri,
        "sink_type": sink_type.value if isinstance(sink_type, CDCSinkType) else str(sink_type),
        "sink_uri": sink_uri,
    }
    if options is not None:
        preview_payload["options"] = options
    if creation_context["mode"] == "database":
        preview_payload["source_database"] = creation_context["source_database"]
        preview_payload["sink_database"] = creation_context["sink_database"]
    else:
        preview_payload["table_mappings"] = creation_context["table_mappings"]
    print(f"\n{header('📝 CDC Task Configuration Preview')}")
    print(json.dumps(preview_payload, indent=2, ensure_ascii=False, default=str))

    answer = self._prompt("Proceed with CDC task creation? (yes/no)", default="no").strip().lower()
    if answer not in {"y", "yes"}:
        print(info("CDC task creation cancelled."))
        return

    common_kwargs = {"task_name": task_name, "source_uri": source_uri, "sink_type": sink_type, "sink_uri": sink_uri}
    try:
        if creation_context["mode"] == "database":
            task = self.client.cdc.create_database_task(
                **common_kwargs,
                source_database=creation_context["source_database"],
                sink_database=creation_context["sink_database"],
                options=options,
            )
        else:
            task = self.client.cdc.create_table_task(
                **common_kwargs,
                table_mappings=creation_context["table_tuples"],
                options=options,
            )
    except Exception as exc:
        print(error(f"Failed to create CDC task: {exc}"))
        return
    print(success(f"\nCDC task '{task.task_name}' created successfully."))
    print(info(f"Use 'cdc_task {task.task_name}' to inspect or manage the task."))
[docs]
def do_cdc_drop(self, arg):
    """
    Drop a CDC task with double confirmation.
    Usage:
    cdc_drop <task_name> [--force]
    The command will prompt for confirmation twice unless ``--force`` is supplied.
    """
    usage_hint = "cdc_drop <task_name> [--force]"
    parts = arg.split()
    if not parts:
        print(f"❌ Usage: {usage_hint}")
        return
    task_name, *flags = parts
    if any(flag != "--force" for flag in flags):
        print(f"❌ Unknown option. Usage: {usage_hint}")
        return
    # Every remaining flag is '--force' at this point.
    force = bool(flags)

    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return
    try:
        task = self.client.cdc.get(task_name)
    except ValueError as exc:
        print(error(str(exc)))
        return
    except Exception as exc:
        print(error(f"Failed to load CDC task '{task_name}': {exc}"))
        return

    print(f"\n{header('⚠️ CDC Task Drop Preview')}")
    # (JSON key, task attribute) in display order.
    preview_fields = (
        ("task_name", "task_name"),
        ("state", "state"),
        ("sink_type", "sink_type"),
        ("table_mapping", "table_mapping"),
        ("source_uri", "source_uri"),
        ("sink_uri", "sink_uri"),
        ("options", "additional_config"),
        ("no_full", "no_full"),
        ("checkpoint", "checkpoint"),
    )
    summary_payload: Dict[str, Any] = {key: getattr(task, attribute) for key, attribute in preview_fields}
    print(json.dumps(summary_payload, indent=2, ensure_ascii=False, default=str))

    if not force:
        answer = self._prompt(f"Confirm drop of CDC task '{task_name}'? (yes/no)", default="no")
        if answer.strip().lower() not in {"y", "yes"}:
            print(info("CDC task drop cancelled."))
            return
        typed_name = self._prompt(f"Type the task name '{task_name}' to confirm drop", default="")
        if typed_name.strip() != task_name:
            print(info("Confirmation mismatch. CDC task drop cancelled."))
            return

    try:
        self.client.cdc.drop(task_name)
    except Exception as exc:
        print(error(f"Failed to drop CDC task '{task_name}': {exc}"))
        return
    print(success(f"CDC task '{task_name}' dropped successfully."))
[docs]
def do_verify_counts(self, arg):
    """
    Verify row counts between main table and all its index tables.
    Usage: verify_counts <table_name> [database]
    Example:
    verify_counts cms_all_content_chunk_info
    verify_counts cms_all_content_chunk_info repro3

    The session is switched to <database> for the duration of the check and is
    always switched back afterwards, including on early return and on error.
    """
    # Tolerate a trailing semicolon, as typed in SQL shells.
    args = arg.strip().rstrip(';').split()
    if not args:
        print("❌ Usage: verify_counts <table_name> [database]")
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return
    table_name = args[0]
    database = args[1] if len(args) > 1 else self.current_database
    if not database:
        print("❌ No database specified and no current database set.")
        return

    def _quote_ident(name):
        """Backtick-quote a MySQL identifier, doubling any embedded backticks."""
        return "`" + str(name).replace("`", "``") + "`"

    # Session database before this command; restored in ``finally``.
    current_db = None
    switched = False
    try:
        current_db = self.client._connection_params.get('database')
        if current_db != database:
            # Queries below use unqualified table names (build_verify_counts_sql is not
            # given the database), so point the session at the target database.
            self.client.execute(f"USE {_quote_ident(database)}")
            switched = True

        index_tables = self.client.get_secondary_index_tables(table_name, database)
        if not index_tables:
            print(f"⚠️ No secondary indexes found for table '{table_name}'")
            # Still show the main table count.
            result = self.client.execute(f"SELECT COUNT(*) FROM {_quote_ident(table_name)}")
            count = result.fetchone()[0]
            print(f"Main table '{table_name}': {count:,} rows")
            return

        from .index_utils import build_verify_counts_sql

        sql = build_verify_counts_sql(table_name, index_tables)
        row = self.client.execute(sql).fetchone()
        # Column 0 is the main table count; column k+1 is the count of index_tables[k].
        main_count = row[0]
        mismatches = []
        table_info = f"'{database}.{table_name}'"
        print(f"\n{header('📊 Row Count Verification for')} {bold(table_info)}")
        print(info("=" * 80))
        print(bold(f"Main table: {main_count:,} rows"))
        print(info("-" * 80))
        for position, index_table in enumerate(index_tables, start=1):
            index_count = row[position]
            if index_count == main_count:
                print(f"{success('✓')} {index_table}: {bold(f'{index_count:,}')} rows")
            else:
                print(f"{error('✗ MISMATCH')} {index_table}: {bold(f'{index_count:,}')} rows")
                mismatches.append((index_table, index_count))
        print(info("=" * 80))
        if mismatches:
            print(f"❌ FAILED: {len(mismatches)} index table(s) have mismatched counts!")
        else:
            print(f"✅ PASSED: All index tables match ({main_count:,} rows)")
        print()
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        # Previously skipped on the no-index early return and on any exception.
        if switched and current_db:
            try:
                self.client.execute(f"USE {_quote_ident(current_db)}")
            except Exception as exc:
                print(f"⚠️ Could not switch back to database '{current_db}': {exc}")
[docs]
def do_show_ivf_status(self, arg):
    """
    Show IVF index centroids building status.
    Usage:
    show_ivf_status [database] - Show compact summary
    show_ivf_status [database] -v - Show detailed view
    show_ivf_status [database] -t table - Filter by table name
    Example:
    show_ivf_status
    show_ivf_status test -v
    show_ivf_status test -t my_table
    """
    # First bare word is the database; -v and -t <table> may appear anywhere.
    args = arg.strip().rstrip(';').split()
    database = None
    verbose = False
    filter_table = None
    i = 0
    while i < len(args):
        token = args[i]
        if token == '-v':
            verbose = True
        elif token == '-t':
            # A dangling -t used to be silently taken as the database name.
            if i + 1 >= len(args):
                print("❌ -t requires a table name.")
                return
            i += 1
            filter_table = args[i]
        elif not database:
            database = token
        i += 1
    if not database:
        database = self.current_database
    if not database:
        print("❌ No database specified and no current database set.")
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return

    def _compact_row(table_name, index_name, column_name, centroids, vectors, balance, status):
        """Print one fixed-width row of the compact summary table."""
        # str() first so None or numeric cells cannot break the width specs.
        print(
            f"{str(table_name):<30} | {str(index_name):<25} | {str(column_name):<20} | "
            f"{str(centroids):<10} | {str(vectors):<12} | {str(balance):<10} | {str(status):<15}"
        )

    def _report_index(table_name, index_name, column_name, algo_table_type):
        """Print the status of one IVF index; return 'active', 'empty' or 'unavailable'."""
        stats = self.client.vector_ops.get_ivf_stats(table_name, column_name, database=database)
        if not stats or 'index_tables' not in stats:
            if verbose:
                suffix = f" (algo table type: {algo_table_type})" if algo_table_type else ""
                print(f"Status: ⚠️ No stats available{suffix}")
            else:
                _compact_row(table_name, index_name, column_name, '?', '?', '?', '⚠️ no stats')
            return 'unavailable'

        dist = stats.get('distribution')
        if dist is None:
            if verbose:
                print("\nCentroid Distribution: ⚠️ No distribution data available")
            else:
                _compact_row(table_name, index_name, column_name, '?', '?', '?', '⚠️ no data')
            return 'unavailable'

        centroid_counts = list(dist.get('centroid_count') or [])
        if not centroid_counts:
            if verbose:
                print("\nCentroid Distribution: ⚠️ No centroids found (empty index)")
            else:
                _compact_row(table_name, index_name, column_name, '0', '0', 'N/A', '⚠️ empty')
            return 'empty'

        total_centroids = len(centroid_counts)
        total_vectors = sum(centroid_counts)
        min_count = min(centroid_counts)
        max_count = max(centroid_counts)
        avg_count = total_vectors / total_centroids
        # An empty centroid makes the max/min ratio undefined; report N/A rather than a fake 0.00.
        balance = f"{max_count / min_count:.2f}" if min_count > 0 else "N/A"

        if not verbose:
            _compact_row(table_name, index_name, column_name, total_centroids, f"{total_vectors:,}", balance, "✓ active")
            return 'active'

        print("Physical Tables:")
        for table_type, physical_table in stats['index_tables'].items():
            print(f" - {table_type:<15}: {physical_table}")
        print("\nCentroid Distribution:")
        print(f" Total Centroids: {total_centroids}")
        print(f" Total Vectors: {total_vectors:,}")
        print(f" Min/Avg/Max: {min_count} / {avg_count:.1f} / {max_count}")
        print(f" Load Balance: {balance}x" if min_count > 0 else " Load Balance: N/A")

        # Pad missing ids/versions with '?' instead of letting zip() drop every centroid.
        centroid_ids = list(dist.get('centroid_id') or [])
        centroid_versions = list(dist.get('centroid_version') or [])
        ranked = sorted(
            (
                (
                    centroid_ids[pos] if pos < len(centroid_ids) else '?',
                    count,
                    centroid_versions[pos] if pos < len(centroid_versions) else '?',
                )
                for pos, count in enumerate(centroid_counts)
            ),
            key=lambda entry: entry[1],
            reverse=True,
        )
        print("\n Top Centroids (by vector count):")
        for rank, (cid, count, version) in enumerate(ranked[:10], 1):
            print(f" {rank:2}. Centroid {cid}: {count:,} vectors (version {version})")
        return 'active'

    try:
        # Group by table/index/column so the several physical tables of one IVF index collapse to one row.
        sql = """
        SELECT
            mo_tables.relname,
            mo_indexes.name,
            mo_indexes.column_name,
            MAX(mo_indexes.algo_table_type) as algo_table_type
        FROM mo_catalog.mo_indexes
        JOIN mo_catalog.mo_tables ON mo_indexes.table_id = mo_tables.rel_id
        WHERE mo_tables.reldatabase = ?
        AND mo_indexes.algo LIKE '%ivf%'
        GROUP BY mo_tables.relname, mo_indexes.name, mo_indexes.column_name
        ORDER BY mo_tables.relname, mo_indexes.name
        """
        index_info = self.client.execute(sql, (database,)).fetchall()
        if not index_info:
            print(f"⚠️ No IVF indexes found in database '{database}'")
            return
        if filter_table:
            index_info = [row for row in index_info if row[0] == filter_table]
            if not index_info:
                print(f"⚠️ No IVF indexes found for table '{filter_table}'")
                return

        print(f"\n📊 IVF Index Status in '{database}':")
        print("=" * 150)
        if not verbose:
            _compact_row('Table', 'Index', 'Column', 'Centroids', 'Vectors', 'Balance', 'Status')
            print("-" * 150)

        outcomes = {'active': 0, 'empty': 0, 'unavailable': 0}
        for row in index_info:
            table_name, index_name, column_name = row[0], row[1], row[2]
            algo_table_type = row[3] if len(row) > 3 else None
            if verbose:
                # Header first, so every status line below is attributable to its index.
                print(f"\nTable: {table_name} | Index: {index_name} | Column: {column_name}")
                print("-" * 150)
            try:
                outcome = _report_index(table_name, index_name, column_name, algo_table_type)
            except Exception as e:
                outcome = 'unavailable'
                if verbose:
                    print(f"Status: ❌ Error - {str(e)}")
                else:
                    # The status column is narrow; -v shows the full message.
                    _compact_row(table_name, index_name, column_name, '?', '?', '?', '❌ ' + str(e)[:10])
            outcomes[outcome] += 1

        print("=" * 150)
        if outcomes['active'] + outcomes['empty'] == 0:
            print("⚠️ No accessible IVF indexes found")
        else:
            print(
                f"Total: {len(index_info)} IVF indexes ({outcomes['active']} active, "
                f"{outcomes['empty']} empty, {outcomes['unavailable']} unavailable)"
            )
        if not verbose:
            print(f"\nTip: Use 'show_ivf_status {database} -v' for detailed view with top centroids")
            print(f" Use 'show_ivf_status {database} -t <table>' to filter by table")
        print()
    except Exception as e:
        print(f"❌ Error: {e}")
[docs]
def do_show_table_stats(self, arg):
"""
Show table statistics using metadata_scan.
Usage:
show_table_stats <table> [database] - Show table stats summary
show_table_stats <table> [database] -t - Include tombstone stats
show_table_stats <table> [database] -i idx1,idx2 - Include specific index stats
show_table_stats <table> [database] -a - Include all (tombstone + all indexes)
show_table_stats <table> [database] -d - Show detailed object list
Example:
show_table_stats my_table
show_table_stats my_table test -t
show_table_stats my_table test -i idx_vec,idx_name
show_table_stats my_table test -a
show_table_stats my_table test -a -d
"""
args = arg.strip().rstrip(';').split() if arg.strip() else []
if not args:
print("❌ Usage: show_table_stats <table> [database] [-t] [-i index_names] [-a] [-d]")
return
table_name = args[0]
database = None
include_tombstone = False
include_indexes = None
include_all = False
show_detail = False
# Parse remaining arguments
i = 1
while i < len(args):
if args[i] == '-t':
include_tombstone = True
elif args[i] == '-a':
include_all = True
elif args[i] == '-d':
show_detail = True
elif args[i] == '-i' and i + 1 < len(args):
include_indexes = [idx.strip() for idx in args[i + 1].split(',')]
i += 1
elif not database:
database = args[i]
i += 1
if not database:
database = self.current_database
if not database:
print("❌ No database specified and no current database set.")
return
if not self.client:
print("❌ Not connected. Use 'connect' first.")
return
try:
# If -a flag is set, get all secondary indexes for the table
if include_all:
include_tombstone = True
# Get all secondary indexes for the table (include UNIQUE, exclude PRIMARY)
try:
index_list = self.client.get_secondary_index_tables(table_name, database)
if index_list:
# Extract index names from physical table names
# Physical table names are like: __mo_index_secondary_xxxxx
# We need to get the actual index names from mo_indexes
# Filter out PRIMARY KEY and system indexes, but include UNIQUE, MULTIPLE, vector, fulltext indexes
sql = """
SELECT DISTINCT mo_indexes.name
FROM mo_catalog.mo_indexes
JOIN mo_catalog.mo_tables ON mo_indexes.table_id = mo_tables.rel_id
WHERE mo_tables.relname = ?
AND mo_tables.reldatabase = ?
AND mo_indexes.name NOT IN ('PRIMARY', '__mo_rowid_idx')
AND mo_indexes.type != 'PRIMARY KEY'
"""
result = self.client.execute(sql, (table_name, database))
include_indexes = [row[0] for row in result.fetchall()]
if not include_indexes:
include_indexes = None # No secondary indexes
except Exception as e:
print(f"⚠️ Warning: Could not fetch index list: {e}")
include_indexes = None
if show_detail:
# Check if showing all indexes (-a -d together = hierarchical view)
if include_all:
# Show hierarchical view: Table -> Index Name -> Physical Table (with type) -> Object List
print(f"\n📊 Detailed Table Statistics for '{database}.{table_name}':")
print("=" * 150)
# Helper functions
def format_size(size_bytes):
if isinstance(size_bytes, str):
return size_bytes
if size_bytes >= 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.2f} MB"
elif size_bytes >= 1024:
return f"{size_bytes / 1024:.2f} KB"
else:
return f"{size_bytes} B"
def deduplicate_objects(objects):
seen = set()
unique_objects = []
for obj in objects:
# Handle both dict and MetadataRow objects
if hasattr(obj, 'object_name'):
obj_name = getattr(obj, 'object_name', None)
elif isinstance(obj, dict):
obj_name = obj.get('object_name')
else:
obj_name = None
if obj_name and obj_name not in seen:
seen.add(obj_name)
unique_objects.append(obj)
return unique_objects
# Get table_id for main table
try:
table_id_result = self.client.execute(
"SELECT rel_id FROM mo_catalog.mo_tables WHERE relname = ? AND reldatabase = ?",
(table_name, database),
)
table_id_row = table_id_result.fetchone()
table_id = table_id_row[0] if table_id_row else "N/A"
except Exception:
table_id = "N/A"
# 1. Show main table
print(f"\n{bold('Table:')} {Colors.CYAN}{table_name}:{table_id}{Colors.RESET}")
# 1a. Show Data (non-tombstone objects)
try:
main_table_objects = self.client.metadata.scan(database, table_name, is_tombstone=False, columns="*")
if main_table_objects:
unique_objs = deduplicate_objects(list(main_table_objects))
total_rows = sum(getattr(obj, 'rows_cnt', 0) or 0 for obj in unique_objs)
total_null = sum(getattr(obj, 'null_cnt', 0) or 0 for obj in unique_objs)
total_origin = sum(getattr(obj, 'origin_size', 0) or 0 for obj in unique_objs)
total_compress = sum(getattr(obj, 'compress_size', 0) or 0 for obj in unique_objs)
print(f" └─ {bold('Data')}")
print(
f" Objects: {len(unique_objs)} | Rows: {total_rows:,} | "
f"Null: {total_null:,} | Original: {format_size(total_origin)} | "
f"Compressed: {format_size(total_compress)}"
)
# Show object details
if unique_objs:
print(f"\n {bold('Objects:')}")
print(
f" {'Object Name':<50} | {'Rows':<12} | {'Null Cnt':<10} | "
f"{'Original Size':<15} | {'Compressed Size':<15}"
)
print(" " + "-" * 148)
for obj in unique_objs:
obj_name = getattr(obj, 'object_name', 'N/A')
rows = getattr(obj, 'rows_cnt', 0) or 0
nulls = getattr(obj, 'null_cnt', 0) or 0
orig_size = getattr(obj, 'origin_size', 0) or 0
comp_size = getattr(obj, 'compress_size', 0) or 0
print(
f" {obj_name:<50} | {rows:<12,} | {nulls:<10,} | "
f"{format_size(orig_size):<15} | {format_size(comp_size):<15}"
)
except Exception:
pass
# 1b. Show Tombstone if requested
if include_tombstone:
try:
tombstone_objects = self.client.metadata.scan(
database, table_name, is_tombstone=True, columns="*"
)
if tombstone_objects:
unique_tomb_objs = deduplicate_objects(list(tombstone_objects))
if unique_tomb_objs:
total_rows = sum(getattr(obj, 'rows_cnt', 0) or 0 for obj in unique_tomb_objs)
total_null = sum(getattr(obj, 'null_cnt', 0) or 0 for obj in unique_tomb_objs)
total_origin = sum(getattr(obj, 'origin_size', 0) or 0 for obj in unique_tomb_objs)
total_compress = sum(getattr(obj, 'compress_size', 0) or 0 for obj in unique_tomb_objs)
print(f"\n └─ {bold('Tombstone')}")
print(
f" Objects: {len(unique_tomb_objs)} | Rows: {total_rows:,} | "
f"Null: {total_null:,} | Original: {format_size(total_origin)} | "
f"Compressed: {format_size(total_compress)}"
)
# Show tombstone object details
print(f"\n {bold('Objects:')}")
print(
f" {'Object Name':<50} | {'Rows':<12} | {'Null Cnt':<10} | "
f"{'Original Size':<15} | {'Compressed Size':<15}"
)
print(" " + "-" * 148)
for obj in unique_tomb_objs:
obj_name = getattr(obj, 'object_name', 'N/A')
rows = getattr(obj, 'rows_cnt', 0) or 0
nulls = getattr(obj, 'null_cnt', 0) or 0
orig_size = getattr(obj, 'origin_size', 0) or 0
comp_size = getattr(obj, 'compress_size', 0) or 0
print(
f" {obj_name:<50} | {rows:<12,} | {nulls:<10,} | "
f"{format_size(orig_size):<15} | {format_size(comp_size):<15}"
)
except Exception:
pass
# 2. Get all indexes and their physical tables
try:
indexes = self.client.get_table_indexes_detail(table_name, database)
if indexes:
# Group indexes by index name
indexes_by_name = {}
for idx in indexes:
idx_name = idx['index_name']
if idx_name not in indexes_by_name:
indexes_by_name[idx_name] = []
indexes_by_name[idx_name].append(idx)
# Display each index with its physical tables
for idx_name, idx_tables in indexes_by_name.items():
print(f"\n{bold('Index:')} {Colors.CYAN}{idx_name}{Colors.RESET}")
# Check if this is a multi-table index (IVF/HNSW with multiple physical tables)
has_multiple_tables = len(idx_tables) > 1
# Show each physical table for this index
for idx_table in idx_tables:
physical_table = idx_table['physical_table_name']
table_type = idx_table['algo_table_type'] if idx_table['algo_table_type'] else 'index'
# Get physical table's table_id
try:
phys_table_id_result = self.client.execute(
"SELECT rel_id FROM mo_catalog.mo_tables WHERE relname = ? AND reldatabase = ?",
(physical_table, database),
)
phys_table_id_row = phys_table_id_result.fetchone()
phys_table_id = phys_table_id_row[0] if phys_table_id_row else "N/A"
except Exception:
phys_table_id = "N/A"
# Color code by table type
if table_type == 'metadata':
type_display = f"{Colors.YELLOW}{table_type}{Colors.RESET}"
elif table_type == 'centroids':
type_display = f"{Colors.GREEN}{table_type}{Colors.RESET}"
elif table_type == 'entries':
type_display = f"{Colors.CYAN}{table_type}{Colors.RESET}"
else:
type_display = table_type
# Always show the physical table name with table_id
if has_multiple_tables:
# For multi-table indexes, show type in parentheses
print(f" └─ ({type_display}): {physical_table}:{phys_table_id}")
else:
# For single-table indexes, just show the physical table
print(f" └─ {physical_table}:{phys_table_id}")
data_indent = " "
obj_indent = " "
# Get Data (non-tombstone objects)
try:
phys_objects = self.client.metadata.scan(
database, physical_table, is_tombstone=False, columns="*"
)
if phys_objects:
unique_objs = deduplicate_objects(list(phys_objects))
total_rows = sum(getattr(obj, 'rows_cnt', 0) or 0 for obj in unique_objs)
total_null = sum(getattr(obj, 'null_cnt', 0) or 0 for obj in unique_objs)
total_origin = sum(getattr(obj, 'origin_size', 0) or 0 for obj in unique_objs)
total_compress = sum(
getattr(obj, 'compress_size', 0) or 0 for obj in unique_objs
)
print(f"{data_indent}└─ {bold('Data')}")
print(
f"{obj_indent}Objects: {len(unique_objs)} | Rows: {total_rows:,} | "
f"Null: {total_null:,} | Original: {format_size(total_origin)} | "
f"Compressed: {format_size(total_compress)}"
)
# Show object details
if unique_objs:
print(f"\n{obj_indent}{bold('Objects:')}")
print(
f"{obj_indent}{'Object Name':<50} | {'Rows':<12} | "
f"{'Null Cnt':<10} | {'Original Size':<15} | "
f"{'Compressed Size':<15}"
)
print(obj_indent + "-" * 148)
for obj in unique_objs:
obj_name = getattr(obj, 'object_name', 'N/A')
rows = getattr(obj, 'rows_cnt', 0) or 0
nulls = getattr(obj, 'null_cnt', 0) or 0
orig_size = getattr(obj, 'origin_size', 0) or 0
comp_size = getattr(obj, 'compress_size', 0) or 0
print(
f"{obj_indent}{obj_name:<50} | {rows:<12,} | "
f"{nulls:<10,} | {format_size(orig_size):<15} | "
f"{format_size(comp_size):<15}"
)
except Exception:
pass
# Show tombstone objects for this index physical table if requested
if include_tombstone:
try:
tomb_objects = self.client.metadata.scan(
database, physical_table, is_tombstone=True, columns="*"
)
if tomb_objects:
unique_tomb_objs = deduplicate_objects(list(tomb_objects))
if unique_tomb_objs:
total_rows = sum(
getattr(obj, 'rows_cnt', 0) or 0 for obj in unique_tomb_objs
)
total_null = sum(
getattr(obj, 'null_cnt', 0) or 0 for obj in unique_tomb_objs
)
total_origin = sum(
getattr(obj, 'origin_size', 0) or 0 for obj in unique_tomb_objs
)
total_compress = sum(
getattr(obj, 'compress_size', 0) or 0 for obj in unique_tomb_objs
)
print(f"\n{data_indent}└─ {bold('Tombstone')}")
print(
f"{obj_indent}Objects: {len(unique_tomb_objs)} | "
f"Rows: {total_rows:,} | Null: {total_null:,} | "
f"Original: {format_size(total_origin)} | "
f"Compressed: {format_size(total_compress)}"
)
print(f"\n{obj_indent}{bold('Objects:')}")
print(
f"{obj_indent}{'Object Name':<50} | {'Rows':<12} | "
f"{'Null Cnt':<10} | {'Original Size':<15} | "
f"{'Compressed Size':<15}"
)
print(obj_indent + "-" * 148)
for obj in unique_tomb_objs:
obj_name = getattr(obj, 'object_name', 'N/A')
rows = getattr(obj, 'rows_cnt', 0) or 0
nulls = getattr(obj, 'null_cnt', 0) or 0
orig_size = getattr(obj, 'origin_size', 0) or 0
comp_size = getattr(obj, 'compress_size', 0) or 0
print(
f"{obj_indent}{obj_name:<50} | {rows:<12,} | "
f"{nulls:<10,} | {format_size(orig_size):<15} | "
f"{format_size(comp_size):<15}"
)
except Exception:
pass
except Exception:
pass
print("\n" + "=" * 150)
print()
return
# Regular detailed view (not hierarchical)
# Get detailed statistics with object list
stats = self.client.metadata.get_table_detail_stats(
dbname=database,
tablename=table_name,
include_tombstone=include_tombstone,
include_indexes=include_indexes,
)
if not stats:
print(f"⚠️ No statistics available for table '{database}.{table_name}'")
return
print(f"\n📊 Detailed Table Statistics for '{database}.{table_name}':")
print("=" * 150)
# Helper function to format object row
def format_obj_row(obj):
# Try different possible key names
object_name = obj.get('object_name', 'N/A')
create_ts = obj.get('create_ts', 'N/A')
# Safely convert to int or string for formatting
def safe_value(value, default=0):
# If value is already a string (formatted), return as is
if isinstance(value, str):
return value
try:
return int(value) if value is not None else default
except (ValueError, TypeError):
return default
rows_cnt = safe_value(obj.get('rows_cnt', obj.get('row_cnt', 0)))
null_cnt = safe_value(obj.get('null_cnt', 0))
# Size fields might already be formatted strings (e.g., "1.5 MB")
origin_size = obj.get('origin_size', obj.get('original_size', 0))
compress_size = obj.get('compress_size', obj.get('compressed_size', 0))
# Format row based on whether sizes are numbers or strings
if isinstance(rows_cnt, str):
rows_str = f"{rows_cnt:<12}"
else:
rows_str = f"{rows_cnt:<12,}"
if isinstance(null_cnt, str):
null_str = f"{null_cnt:<10}"
else:
null_str = f"{null_cnt:<10,}"
if isinstance(origin_size, str):
orig_str = f"{origin_size:<15}"
else:
orig_str = f"{origin_size:<15,}"
if isinstance(compress_size, str):
comp_str = f"{compress_size:<15}"
else:
comp_str = f"{compress_size:<15,}"
return f"{object_name:<50} | {create_ts:<20} | {rows_str} | {null_str} | {orig_str} | {comp_str}"
# Helper function to deduplicate objects by object_name
def deduplicate_objects(objects):
"""Deduplicate objects by object_name, keeping first occurrence"""
seen = set()
unique_objects = []
for obj in objects:
obj_name = obj.get('object_name', 'N/A')
if obj_name not in seen:
seen.add(obj_name)
unique_objects.append(obj)
return unique_objects
# Show main table details
if table_name in stats:
unique_objs = deduplicate_objects(stats[table_name])
print(f"\n{Colors.BOLD}Table: {table_name}{Colors.RESET} ({len(unique_objs)} objects)")
print("-" * 150)
print(
f"{'Object Name':<50} | {'Create Time':<20} | {'Rows':<12} | "
f"{'Null Cnt':<10} | {'Original Size':<15} | {'Compressed Size':<15}"
)
print("-" * 150)
for obj in unique_objs:
print(format_obj_row(obj))
# Show tombstone details
if 'tombstone' in stats:
unique_objs = deduplicate_objects(stats['tombstone'])
print(f"\n{Colors.BOLD}Tombstone{Colors.RESET} ({len(unique_objs)} objects)")
print("-" * 150)
print(
f"{'Object Name':<50} | {'Create Time':<20} | {'Rows':<12} | "
f"{'Null Cnt':<10} | {'Original Size':<15} | {'Compressed Size':<15}"
)
print("-" * 150)
for obj in unique_objs:
print(format_obj_row(obj))
# Show index details
for key, objects in stats.items():
if key not in [table_name, 'tombstone']:
unique_objs = deduplicate_objects(objects)
index_name = key.replace(f'{table_name}_', '')
print(f"\n{Colors.BOLD}Index: {index_name}{Colors.RESET} ({len(unique_objs)} objects)")
print("-" * 150)
print(
f"{'Object Name':<50} | {'Create Time':<20} | {'Rows':<12} | "
f"{'Null Cnt':<10} | {'Original Size':<15} | {'Compressed Size':<15}"
)
print("-" * 150)
for obj in unique_objs:
print(format_obj_row(obj))
print("=" * 150)
else:
# Get brief statistics (summary)
stats = self.client.metadata.get_table_brief_stats(
dbname=database,
tablename=table_name,
include_tombstone=include_tombstone,
include_indexes=include_indexes,
)
if not stats:
print(f"⚠️ No statistics available for table '{database}.{table_name}'")
return
print(f"\n📊 Table Statistics for '{database}.{table_name}':")
print("=" * 120)
print(
f"{'Component':<30} | {'Objects':<10} | {'Rows':<15} | "
f"{'Null Count':<12} | {'Original Size':<15} | {'Compressed Size':<15}"
)
print("-" * 120)
# Show main table stats
if table_name in stats:
table_stats = stats[table_name]
if isinstance(table_stats, dict): # Type guard for mypy
print(
f"{table_name:<30} | {table_stats['total_objects']:<10} | "
f"{table_stats['row_cnt']:<15,} | {table_stats['null_cnt']:<12,} | "
f"{table_stats['original_size']:<15} | {table_stats['compress_size']:<15}"
)
# Show tombstone stats
if 'tombstone' in stats:
tomb_stats = stats['tombstone']
if isinstance(tomb_stats, dict): # Type guard for mypy
print(
f"{' └─ tombstone':<30} | {tomb_stats['total_objects']:<10} | "
f"{tomb_stats['row_cnt']:<15,} | {tomb_stats['null_cnt']:<12,} | "
f"{tomb_stats['original_size']:<15} | {tomb_stats['compress_size']:<15}"
)
# Show index stats - use get_table_indexes_detail to get all physical tables
if include_indexes:
try:
index_details = self.client.get_table_indexes_detail(table_name, database)
# Group by index name
indexes_by_name = {}
for idx in index_details:
idx_name = idx['index_name']
if idx_name not in indexes_by_name:
indexes_by_name[idx_name] = []
indexes_by_name[idx_name].append(idx)
# Helper function for size formatting
def format_size(size_bytes):
if isinstance(size_bytes, str):
return size_bytes
if size_bytes >= 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.2f} MB"
elif size_bytes >= 1024:
return f"{size_bytes / 1024:.2f} KB"
else:
return f"{size_bytes} B"
# Helper function to deduplicate objects
def deduplicate_objects(objects):
seen_objects = set()
unique_objs = []
for obj in objects:
obj_name = getattr(obj, 'object_name', None)
if obj_name and obj_name not in seen_objects:
seen_objects.add(obj_name)
unique_objs.append(obj)
return unique_objs
# Display each index
for idx_name, idx_tables in indexes_by_name.items():
has_multiple_tables = len(idx_tables) > 1
for idx_table in idx_tables:
physical_table = idx_table['physical_table_name']
table_type = idx_table['algo_table_type'] if idx_table['algo_table_type'] else 'index'
# Get data statistics for this physical table
try:
phys_objects = self.client.metadata.scan(
database, physical_table, is_tombstone=False, columns="*"
)
if phys_objects:
unique_objs = deduplicate_objects(list(phys_objects))
total_rows = sum(getattr(obj, 'rows_cnt', 0) or 0 for obj in unique_objs)
total_null = sum(getattr(obj, 'null_cnt', 0) or 0 for obj in unique_objs)
total_origin = sum(getattr(obj, 'origin_size', 0) or 0 for obj in unique_objs)
total_compress = sum(getattr(obj, 'compress_size', 0) or 0 for obj in unique_objs)
# Format display label
if has_multiple_tables:
label = f" └─ index: {idx_name} ({table_type})"
else:
label = f" └─ index: {idx_name}"
print(
f"{label:<30} | {len(unique_objs):<10} | {total_rows:<15,} | "
f"{total_null:<12,} | {format_size(total_origin):<15} | "
f"{format_size(total_compress):<15}"
)
# Show tombstone if requested
if include_tombstone:
try:
tomb_objects = self.client.metadata.scan(
database, physical_table, is_tombstone=True, columns="*"
)
if tomb_objects:
unique_tomb_objs = deduplicate_objects(list(tomb_objects))
if unique_tomb_objs:
total_rows = sum(
getattr(obj, 'rows_cnt', 0) or 0 for obj in unique_tomb_objs
)
total_null = sum(
getattr(obj, 'null_cnt', 0) or 0 for obj in unique_tomb_objs
)
total_origin = sum(
getattr(obj, 'origin_size', 0) or 0 for obj in unique_tomb_objs
)
total_compress = sum(
getattr(obj, 'compress_size', 0) or 0 for obj in unique_tomb_objs
)
tomb_label = " └─ tombstone"
print(
f"{tomb_label:<30} | {len(unique_tomb_objs):<10} | "
f"{total_rows:<15,} | {total_null:<12,} | "
f"{format_size(total_origin):<15} | "
f"{format_size(total_compress):<15}"
)
except Exception:
pass
except Exception:
pass
except Exception:
pass
print("=" * 120)
print(
"\nTip: Use '-t' for tombstone, '-i idx1,idx2' for indexes, '-a' for all, '-d' for detailed object list"
)
print()
except Exception as e:
print(f"❌ Error: {e}")
[docs]
def do_flush_table(self, arg):
    """Flush table and all its secondary index tables

    Usage:
        flush_table <table> [database] - Flush main table and all its index tables

    Example:
        flush_table my_table
        flush_table my_table test

    Note: Requires sys user privileges

    The main table is flushed first; if that fails nothing else is attempted.
    Index physical tables come from ``client.get_table_indexes_detail``:
    entries without a physical table are skipped and each distinct physical
    table is flushed exactly once. A failure on one index table is reported
    and the remaining index tables are still flushed.
    """
    if not self.client:
        print(f"{error('❌ Error:')} Not connected. Use 'connect' first.")
        return
    parts = arg.strip().split()
    # A trailing ';' is tolerated so SQL-style input works.
    table_name = parts[0].rstrip(';') if parts else ''
    if not table_name:
        print(f"{error('Error:')} Table name is required")
        print(f"{info('Usage:')} flush_table <table> [database]")
        return
    database_name = parts[1].rstrip(';') if len(parts) > 1 else self.current_database
    if not database_name:
        print(f"{error('❌ Error:')} No database specified and no current database set.")
        print(f"{info('Usage:')} flush_table <table> [database]")
        return
    try:
        print(f"{info('🔄 Flushing table:')} {database_name}.{table_name}")
        # Flush main table; index tables are only processed if this succeeds.
        try:
            self.client.moctl.flush_table(database_name, table_name)
            print(f"{success('✓')} Main table flushed successfully")
        except Exception as e:
            print(f"{error('❌')} Failed to flush main table: {e}")
            return
        # Get all index tables (including IVF/HNSW/Fulltext physical tables)
        try:
            indexes = self.client.get_table_indexes_detail(table_name, database_name)
            if not indexes:
                print(f"{info('ℹ️')} No indexes found")
                print(f"{info('📊 Summary:')} Main table: {success('✓')} flushed")
                return
            # Group by index name for display. Entries with no physical table are
            # skipped (there is nothing to flush), and an entry repeating an
            # already-seen physical table is dropped so every table is flushed
            # once and the success count can never exceed the total.
            indexes_by_name = {}
            seen_tables = set()
            for idx in indexes:
                physical_table = idx['physical_table_name']
                if not physical_table or physical_table in seen_tables:
                    continue
                seen_tables.add(physical_table)
                indexes_by_name.setdefault(idx['index_name'], []).append(idx)
            if not seen_tables:
                print(f"{info('ℹ️')} No index physical tables found")
                print(f"{info('📊 Summary:')} Main table: {success('✓')} flushed")
                return
            print(f"{info('📋 Found')} {len(seen_tables)} index physical tables")
            success_count = 0
            for idx_name, idx_tables in indexes_by_name.items():
                print(f"\n{info('📑 Index:')} {idx_name}")
                for idx_table in idx_tables:
                    physical_table = idx_table['physical_table_name']
                    table_type = idx_table['algo_table_type'] or 'index'
                    # Per-table failures are reported but do not stop the others.
                    try:
                        self.client.moctl.flush_table(database_name, physical_table)
                        print(f" {success('✓')} {table_type}: {physical_table}")
                        success_count += 1
                    except Exception as e:
                        print(f" {error('❌')} {table_type}: {physical_table} - {e}")
            print(f"\n{info('📊 Summary:')}")
            print(f" Main table: {success('✓')} flushed")
            print(f" Index physical tables: {success_count}/{len(seen_tables)} flushed successfully")
        except Exception as e:
            print(f"{warning('⚠️')} Failed to get index tables: {e}")
            print(f"{info('📊 Summary:')} Main table: {success('✓')} flushed (index tables not processed)")
    except Exception as e:
        print(f"{error('❌ Error:')} {e}")
[docs]
def do_tables(self, arg):
    """Show all tables in current database or specified database

    Usage:
        tables            - Show all tables in current database
        tables <database> - Show all tables in specified database

    Example:
        tables
        tables test

    A trailing ';' and surrounding whitespace are ignored. The database name
    is sent as a backtick-quoted identifier with embedded backticks doubled,
    so unusual names cannot break the ``SHOW TABLES`` statement.
    """
    if not self.client:
        print(f"{error('❌ Error:')} Not connected. Use 'connect' first.")
        return
    database_name = self.current_database
    if arg.strip():
        # Strip again after dropping ';' so "tables test ;" resolves to "test".
        database_name = arg.strip().rstrip(';').strip()
    if not database_name:
        print(f"{error('❌ Error:')} No database specified and no current database set.")
        print(f"{info('Usage:')} tables [database]")
        return
    try:
        # Inside a backtick-quoted identifier a literal backtick must be doubled.
        quoted_name = database_name.replace('`', '``')
        result = self.client.execute(f"SHOW TABLES FROM `{quoted_name}`")
        rows = result.fetchall()
        if rows:
            print(f"\n{header('📋 Tables in database')} '{database_name}':")
            print("=" * 80)
            for i, row in enumerate(rows, 1):
                print(f"{i:4}. {row[0]}")
            print("=" * 80)
            print(f"{info('Total:')} {len(rows)} tables")
        else:
            print(f"{warning('⚠️')} No tables found in database '{database_name}'")
        print()
    except Exception as e:
        print(f"{error('❌ Error:')} {e}")
[docs]
def do_databases(self, arg):
    """Show all databases

    Usage:
        databases - Show all databases

    Example:
        databases

    Runs ``SHOW DATABASES`` and numbers each entry; the shell's current
    database is highlighted and tagged.
    """
    if not self.client:
        print(f"{error('❌ Error:')} Not connected. Use 'connect' first.")
        return
    try:
        db_rows = self.client.execute("SHOW DATABASES").fetchall()
        if not db_rows:
            print(f"{warning('⚠️')} No databases found")
        else:
            print(f"\n{header('🗄️ Databases:')}")
            print("=" * 80)
            for position, db_row in enumerate(db_rows, start=1):
                name = db_row[0]
                is_current = name == self.current_database
                if is_current:
                    print(f"{position:4}. {success(name)} {info('← current')}")
                else:
                    print(f"{position:4}. {name}")
            print("=" * 80)
            print(f"{info('Total:')} {len(db_rows)} databases")
        print()
    except Exception as exc:
        print(f"{error('❌ Error:')} {exc}")
[docs]
def do_history(self, arg):
    """Show command history

    Usage:
        history     - Show last 20 commands
        history <n> - Show last n commands (n must be a positive integer)
        history -c  - Clear history

    Example:
        history
        history 50
        history -c

    History is read from ``~/.mo_diag_history``. A command typed several
    times is listed once, at the position of its most recent use, so the
    "last n" view shows the n most recently used distinct commands.
    """
    if not PROMPT_TOOLKIT_AVAILABLE or not self.session:
        print(f"{warning('⚠️')} History is only available when prompt_toolkit is installed.")
        print(f"{info('Tip:')} Install with: pip install prompt_toolkit")
        return
    arg = arg.strip()
    # Clear history
    if arg == '-c':
        try:
            history_file = Path.home() / '.mo_diag_history'
            if history_file.exists():
                history_file.unlink()
                print(f"{success('✓')} History cleared")
            else:
                print(f"{info('ℹ️')} No history file found")
            # Recreate the history so the session keeps recording.
            self.session.history = FileHistory(str(history_file))
        except Exception as e:
            print(f"{error('❌ Error:')} Failed to clear history: {e}")
        return
    # Determine how many commands to show; non-numeric and non-positive
    # values are rejected the same way.
    try:
        count = int(arg) if arg else 20
    except ValueError:
        count = 0
    if count <= 0:
        print(f"{error('❌ Error:')} Invalid number: {arg}")
        return
    try:
        history_file = Path.home() / '.mo_diag_history'
        if not history_file.exists():
            print(f"{info('ℹ️')} No history yet")
            return
        with open(history_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        # prompt_toolkit FileHistory format: blank line, "# <timestamp>",
        # then the command on lines prefixed with '+'. Only '+' lines matter.
        entries = []
        for raw_line in lines:
            line = raw_line.strip()
            if line.startswith('+') and line[1:]:
                entries.append(line[1:])
        # Keep the most recent occurrence of each command, in chronological
        # order: dedupe the reversed list (first hit == latest use), then
        # reverse back.
        commands = list(dict.fromkeys(reversed(entries)))[::-1]
        if not commands:
            print(f"{info('ℹ️')} No history yet")
            return
        start_idx = max(0, len(commands) - count)
        print(f"\n{header('📜 Command History')} (last {min(count, len(commands))} commands):")
        print("=" * 80)
        for i, command in enumerate(commands[start_idx:], start=start_idx + 1):
            print(f"{i:4}. {command}")
        print("=" * 80)
        print(f"{info('Total:')} {len(commands)} commands in history\n")
    except Exception as e:
        print(f"{error('❌ Error:')} Failed to read history: {e}")
[docs]
def do_sql(self, arg):
    """
    Execute a SQL query directly.
    Usage: sql <SQL statement>
    Example: sql SELECT COUNT(*) FROM cms_all_content_chunk_info

    Rows are printed pipe-separated with NULL for None values. Column headers
    are shown when the result object exposes them.
    """
    if not arg:
        print("❌ Usage: sql <SQL statement>")
        return
    if not self.client:
        print("❌ Not connected. Use 'connect' first.")
        return
    try:
        result = self.client.execute(arg)
        rows = result.fetchall()
        if not rows:
            print("✓ Query executed successfully (0 rows returned)\n")
            return
        # Header discovery is best-effort: different result wrappers expose
        # column names differently, and failure here must not hide the rows.
        try:
            if hasattr(result, 'keys') and callable(result.keys):
                headers = result.keys()
            elif hasattr(result, 'columns'):
                headers = result.columns
            elif hasattr(result, '_metadata') and hasattr(result._metadata, 'keys'):
                headers = result._metadata.keys
            else:
                # Fallback: use column numbers
                headers = [f"col{i}" for i in range(len(rows[0]))]
            # Build the header line once: headers may be a one-shot iterable,
            # and the separator should be exactly as wide as the header line.
            header_line = " | ".join(str(h) for h in headers)
            print("\n" + header_line)
            print("-" * len(header_line))
        except Exception:
            pass
        for row in rows:
            print(" | ".join(str(v) if v is not None else "NULL" for v in row))
        print(f"\n{len(rows)} row(s) returned\n")
    except Exception as e:
        print(f"❌ Error: {e}")
[docs]
def do_exit(self, arg):
    """Exit the interactive tool.

    Prints a farewell and disconnects the client (if any). Always returns
    True, which under the ``cmd.Cmd`` convention stops the command loop.
    """
    print("\n👋 Goodbye!\n")
    active_client = self.client
    if not active_client:
        return True
    try:
        active_client.disconnect()
    except Exception:
        # Best-effort: the shell is exiting anyway, so a failed disconnect is ignored.
        pass
    return True
[docs]
def do_quit(self, arg):
    """Exit the interactive tool (same as ``exit``)."""
    exit_handler = self.do_exit
    return exit_handler(arg)
[docs]
def do_EOF(self, arg):
    """Handle Ctrl+D to exit."""
    # Emit a newline first so the farewell does not share the prompt line.
    print(end="\n")
    exit_handler = self.do_exit
    return exit_handler(arg)
[docs]
def emptyline(self):
    """Ignore an empty input line (``cmd.Cmd``'s default would re-run the last command)."""
    return None
def _create_connected_client(host, port, user, password, database, log_level):
    """Build a MatrixOne ``Client``, configure the 'matrixone' logger, and connect.

    Args:
        host, port, user, password, database: forwarded to ``Client.connect``.
        log_level: logging level name (e.g. "ERROR"); unknown names fall back
            to ERROR. Levels at ERROR or above also turn SQL logging off.

    Returns:
        The connected client. Errors from ``Client.connect`` propagate.
    """
    import logging

    level = getattr(logging, log_level.upper(), logging.ERROR)
    matrixone_logger = logging.getLogger('matrixone')

    def _apply_level():
        # Set the level on the logger and on every handler attached to it.
        matrixone_logger.setLevel(level)
        for log_handler in matrixone_logger.handlers:
            log_handler.setLevel(level)

    _apply_level()
    client = Client(sql_log_mode='off' if level >= logging.ERROR else 'auto')
    # Re-applied after construction, presumably because Client() may adjust
    # the 'matrixone' logger itself — NOTE(review): confirm against Client.
    _apply_level()
    client.connect(host=host, port=port, user=user, password=password, database=database)
    return client
[docs]
def _build_mo_diag_parser():
    """Build the argparse parser for the ``mo-diag`` command line."""
    import argparse

    parser = argparse.ArgumentParser(
        description='MatrixOne Interactive Diagnostic Tool',
        # Keep the epilog's line layout; the default formatter would reflow
        # the examples into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
# Interactive mode
mo-diag --host localhost --port 6001 --user root --password 111
# Non-interactive mode - execute single command
mo-diag -d test -c "show_ivf_status"
mo-diag -d test -c "show_table_stats my_table -a"
mo-diag -d test -c "sql SELECT COUNT(*) FROM my_table"
''',
    )
    parser.add_argument('--host', default='localhost', help='Database host (default: localhost)')
    parser.add_argument('--port', type=int, default=6001, help='Database port (default: 6001)')
    parser.add_argument('--user', default='root', help='Database user (default: root)')
    parser.add_argument('--password', default='111', help='Database password (default: 111)')
    parser.add_argument('--database', '-d', help='Database name (optional)')
    parser.add_argument(
        '--log-level',
        default='ERROR',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        help='Logging level (default: ERROR)',
    )
    parser.add_argument('--command', '-c', help='Execute a single command and exit (non-interactive mode)')
    subparsers = parser.add_subparsers(dest='subcommand')
    cdc_parser = subparsers.add_parser('cdc', help='Manage CDC tasks')
    cdc_subparsers = cdc_parser.add_subparsers(dest='cdc_command')
    cdc_show_parser = cdc_subparsers.add_parser('show', help='Show CDC tasks or a specific task')
    cdc_show_parser.add_argument('task_name', nargs='?', help='Name of the CDC task to inspect')
    cdc_show_parser.add_argument('--details', action='store_true', help='Show detailed information')
    cdc_show_parser.add_argument('--no-watermarks', action='store_true', help='Skip watermark listing (task view)')
    cdc_show_parser.add_argument('--watermarks-only', action='store_true', help='Show only watermarks (task view)')
    cdc_show_parser.add_argument('--threshold', help='Watermark latency threshold (task view, e.g. 5m, 30s)')
    cdc_show_parser.add_argument('--table', help='Filter watermarks by table name (task view)')
    cdc_show_parser.add_argument('--strict', action='store_true', help='Zero tolerance for watermark delays (task view)')
    cdc_create_parser = cdc_subparsers.add_parser('create', help='Launch guided CDC task creation')
    cdc_create_parser.add_argument('--database-level', action='store_true', help='Force database-level replication')
    cdc_create_parser.add_argument('--table-level', action='store_true', help='Force table-level replication')
    cdc_drop_parser = cdc_subparsers.add_parser('drop', help='Drop a CDC task')
    cdc_drop_parser.add_argument('task_name', help='Name of the CDC task to drop')
    cdc_drop_parser.add_argument('--force', action='store_true', help='Skip confirmation prompts')
    return parser


def _mo_diag_connect_or_exit(args):
    """Connect with the parsed CLI ``args``; on failure print the error and exit with status 1."""
    try:
        return _create_connected_client(
            host=args.host,
            port=args.port,
            user=args.user,
            password=args.password,
            database=args.database,
            log_level=args.log_level,
        )
    except Exception as e:
        print(f"❌ Failed to connect: {e}")
        sys.exit(1)


def _mo_diag_disconnect_quietly(client):
    """Best-effort disconnect on the way out; errors are ignored since the process is ending."""
    if client:
        try:
            client.disconnect()
        except Exception:
            pass


def _run_mo_diag_command(args):
    """Non-interactive mode: run ``args.command`` once in the shell, then disconnect."""
    client = _mo_diag_connect_or_exit(args)
    # The outer try/finally also covers MatrixOneCLI construction, so the
    # connection is closed even if building the shell fails.
    try:
        cli = MatrixOneCLI(client)
        try:
            cli.onecmd(args.command)
        except Exception as e:
            print(f"❌ Command execution failed: {e}")
            sys.exit(1)
    finally:
        _mo_diag_disconnect_quietly(client)


def _mo_diag_cdc_arg_error(args):
    """Return the error message for an invalid ``cdc`` option combination, or None if valid."""
    if args.cdc_command == 'show' and not args.task_name:
        if args.no_watermarks or args.watermarks_only or args.threshold or args.table or args.strict:
            return "❌ Options --no-watermarks/--watermarks-only/--threshold/--table/--strict require a task name."
    if args.cdc_command == 'create' and args.database_level and args.table_level:
        return "❌ Specify only one of --database-level or --table-level."
    return None


def _mo_diag_cdc_show_command(args):
    """Translate ``mo-diag cdc show`` arguments into the equivalent shell command line."""
    if not args.task_name:
        command = "cdc_tasks"
        if args.details:
            command += " --details"
        return command
    segments = [f"cdc_task {args.task_name}"]
    if args.details:
        segments.append("--details")
    if args.no_watermarks:
        segments.append("--no-watermarks")
    if args.watermarks_only:
        segments.append("--watermarks-only")
    if args.threshold:
        segments.append(f"--threshold={args.threshold}")
    if args.table:
        segments.append(f"--table={args.table}")
    if args.strict:
        segments.append("--strict")
    return " ".join(segments)


def _run_mo_diag_cdc(args):
    """Handle ``mo-diag cdc <show|create|drop>``: validate, connect, dispatch, disconnect."""
    if not args.cdc_command:
        print("❌ Usage: mo-diag cdc <show|create|drop> [...]")
        return
    # Validate before connecting so usage mistakes are reported without
    # needing (or opening) a database connection.
    arg_error = _mo_diag_cdc_arg_error(args)
    if arg_error:
        print(arg_error)
        return
    client = _mo_diag_connect_or_exit(args)
    try:
        cli = MatrixOneCLI(client)
        if args.cdc_command == 'show':
            cli.onecmd(_mo_diag_cdc_show_command(args))
        elif args.cdc_command == 'create':
            flags: List[str] = []
            if args.database_level:
                flags.append("--database-level")
            if args.table_level:
                flags.append("--table-level")
            cli.do_cdc_create(" ".join(flags))
        elif args.cdc_command == 'drop':
            segments = [args.task_name]
            if args.force:
                segments.append("--force")
            cli.do_cdc_drop(" ".join(segments))
        else:
            print(f"❌ Unknown CDC command '{args.cdc_command}'.")
    finally:
        _mo_diag_disconnect_quietly(client)


def main_cli():
    """
    Main entry point for the CLI tool when installed via pip.
    This function is called when running 'mo-diag' command.
    Supports both interactive and non-interactive modes:
    - Interactive: mo-diag (enters interactive shell)
    - Non-interactive: mo-diag -c "show_ivf_status test" (executes single command and exits)
    - CDC management: mo-diag cdc <show|create|drop> [...]

    ``-c`` takes precedence over the ``cdc`` subcommand. Connection failures
    (and failed ``-c`` commands) exit with status 1.
    """
    args = _build_mo_diag_parser().parse_args()
    if args.command:
        _run_mo_diag_command(args)
        return
    if args.subcommand == 'cdc':
        _run_mo_diag_cdc(args)
        return
    # Interactive mode
    start_interactive_tool(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
        database=args.database,
        log_level=args.log_level,
    )
if __name__ == "__main__":
    # Running the file directly starts the same CLI as the 'mo-diag' command.
    main_cli()