Quick Startο
This guide will help you get started with the MatrixOne Python SDK quickly. For detailed configuration options, see Configuration Management Guide.
Danger
π¨ CRITICAL: Column Naming Convention
Always use lowercase with underscores (snake_case) for column names!
MatrixOne does not support SQL standard double-quoted identifiers, causing SELECT queries to fail when using camelCase column names with SQLAlchemy ORM.
# β DON'T: CamelCase - Will fail!
class User(Base):
userName = Column(String(50)) # SELECT will fail
userId = Column(Integer)
# β
DO: snake_case - Works perfectly
class User(Base):
user_name = Column(String(50)) # All operations work
user_id = Column(Integer)
Problem: CamelCase generates SELECT "userName" β (not supported by MatrixOne)
Solution: snake_case generates SELECT user_name β
(works perfectly)
Basic Usageο
Note
Connection API Requirements
The connect() method requires keyword arguments (not positional):
database- Required, no default valuehost- Default:'localhost'port- Default:6001user- Default:'root'password- Default:'111'
Minimal connection (uses all defaults):
client.connect(database='test')
By default, all features (IVF, HNSW, fulltext) are automatically enabled via on_connect=[ConnectionAction.ENABLE_ALL].
from matrixone import Client
from sqlalchemy import Column, Integer, String, DateTime
from matrixone.orm import declarative_base
# Create and connect to MatrixOne
client = Client()
client.connect(
host="127.0.0.1",
port=6001,
user="root",
password="111",
database="test"
)
# Define table model
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(200))
created_at = Column(DateTime)
# Create table using model
client.create_table(User)
# Insert data using insert API
client.insert("users", {
"id": 1,
"name": "John Doe",
"email": "john@example.com",
"created_at": "2024-01-01 10:00:00"
})
# Simple query using execute API
result = client.execute("SELECT * FROM users WHERE id = ?", (1,))
print(result.fetchall())
# Complex query using query builder
result = client.query(User).select("*").filter(User.id == 1).execute()
print(result.fetchall())
# Update data using query API
client.query(User).update({"name": "Jane Doe"}).filter(User.id == 1).execute()
# Delete data using query API
client.query(User).filter(User.id == 1).delete()
# Drop table using drop_table API
client.drop_table(User)
client.disconnect()
Transaction Management (Recommended)ο
Use client.session() for atomic transactions. All operations within a session succeed or fail together with automatic commit/rollback.
from matrixone import Client
from sqlalchemy import select, insert, update, delete
from sqlalchemy import Column, Integer, String
from matrixone.orm import declarative_base
client = Client()
client.connect(database='test')
# Define model
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
age = Column(Integer)
status = Column(String(20))
# Create table
client.create_table(User)
# Basic transaction with automatic commit/rollback
with client.session() as session:
# All operations are atomic
session.execute(insert(User).values(name='Alice', email='alice@example.com', age=30))
session.execute(update(User).where(User.age < 18).values(status='minor'))
# Query within transaction
stmt = select(User).where(User.age > 25)
result = session.execute(stmt)
users = result.scalars().all()
# Commits automatically on success, rolls back on error
# Error handling with automatic rollback
try:
with client.session() as session:
session.execute(insert(User).values(name='Bob', age=25))
# This will fail and trigger automatic rollback
session.execute(insert(InvalidTable).values(data='test'))
except Exception as e:
print(f"Transaction failed and rolled back: {e}")
client.disconnect()
Key Benefits:
β Atomic operations - all succeed or fail together
β Automatic rollback on errors
β Access to all managers (snapshots, clones, load_data, etc.)
β Full SQLAlchemy ORM support
Wrapping Existing SQLAlchemy Sessionsο
If you have existing SQLAlchemy code, you can wrap your sessions to add MatrixOne features without refactoring:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from matrixone import Client
from matrixone.session import Session as MatrixOneSession
# Your existing SQLAlchemy setup
engine = create_engine('mysql+pymysql://root:111@localhost:6001/test')
SessionFactory = sessionmaker(bind=engine)
sqlalchemy_session = SessionFactory()
# Create MatrixOne client
mo_client = Client()
mo_client.connect(host='localhost', port=6001, user='root', password='111', database='test')
# Wrap your existing session with MatrixOne features
mo_session = MatrixOneSession(
client=mo_client,
wrap_session=sqlalchemy_session
)
try:
# Your existing SQLAlchemy operations still work
result = mo_session.execute("SELECT * FROM users")
# Now you can also use MatrixOne-specific features
mo_session.stage.create_s3('backup_stage', bucket='my-backups', path='')
mo_session.snapshots.create('daily_backup', level='database')
mo_session.load_data.read_csv('/data/users.csv', table='users')
mo_session.commit()
finally:
mo_session.close()
Perfect For:
π Gradual migration from pure SQLAlchemy to MatrixOne
π’ Adding MatrixOne features to existing enterprise applications
π¦ Legacy code modernization without complete refactoring
π§ Testing MatrixOne features alongside existing code
Async Version:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from matrixone import AsyncClient
from matrixone.session import AsyncSession as MatrixOneAsyncSession
# Existing async SQLAlchemy setup
async_engine = create_async_engine('mysql+aiomysql://root:111@localhost:6001/test')
AsyncSessionFactory = async_sessionmaker(bind=async_engine)
sqlalchemy_async_session = AsyncSessionFactory()
# Create async MatrixOne client
mo_async_client = AsyncClient()
await mo_async_client.connect(host='localhost', port=6001, user='root', password='111', database='test')
# Wrap existing async session
mo_async_session = MatrixOneAsyncSession(
client=mo_async_client,
wrap_session=sqlalchemy_async_session
)
try:
# Standard async SQLAlchemy operations
result = await mo_async_session.execute("SELECT * FROM users")
# MatrixOne async features now available
await mo_async_session.stage.create_local('export_stage', '/exports/')
await mo_async_session.snapshots.create('async_backup', level='database')
await mo_async_session.commit()
finally:
await mo_async_session.close()
SQLAlchemy ORM Style (Recommended)ο
The SDK provides seamless SQLAlchemy integration with ORM-style operations:
from matrixone import Client
from matrixone.orm import Base, Column, Integer, String
from sqlalchemy import select, insert, update, delete, and_, or_
# Define ORM models
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(255))
age = Column(Integer)
client = Client()
client.connect(database='test')
client.create_table(User)
# ORM-style INSERT
stmt = insert(User).values(name='John', email='john@example.com', age=30)
client.execute(stmt)
# ORM-style SELECT with WHERE
stmt = select(User).where(User.age > 25)
result = client.execute(stmt)
for user in result.scalars():
print(f"User: {user.name}, Age: {user.age}")
# ORM-style SELECT with complex WHERE
stmt = select(User).where(
and_(
User.age > 18,
or_(User.status == 'active', User.status == 'pending')
)
)
result = client.execute(stmt)
# ORM-style UPDATE
stmt = update(User).where(User.id == 1).values(email='newemail@example.com')
result = client.execute(stmt)
print(f"Updated {result.affected_rows} rows")
# ORM-style DELETE
stmt = delete(User).where(User.age < 18)
result = client.execute(stmt)
client.disconnect()
Recommended Practices:
β Use SQLAlchemy statements (
select,insert,update,delete)β Use
session()for multi-statement transactionsβ Use
client.execute()for single-statement operationsβ Prefer SQLAlchemy statements over raw SQL strings
Async Usage with Sessions and ORMο
Full async/await support with async sessions for non-blocking operations:
import asyncio
from matrixone import AsyncClient
from matrixone.orm import Base, Column, Integer, String, DECIMAL
from sqlalchemy import select, insert, update
async def async_quickstart():
client = AsyncClient()
await client.connect(database='test')
# Define table model
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(200))
price = Column(DECIMAL(10, 2))
category = Column(String(50))
# Create table
await client.create_table(Product)
# ORM-style async INSERT
stmt = insert(Product).values(
id=1, name='Laptop', price=999.99, category='Electronics'
)
await client.execute(stmt)
# ORM-style async SELECT
stmt = select(Product).where(Product.category == 'Electronics')
result = await client.execute(stmt)
for product in result.scalars():
print(f"Product: {product.name}, Price: ${product.price}")
# Async transaction with session
async with client.session() as session:
# All operations are atomic
await session.execute(
insert(Product).values(id=2, name='Phone', price=699.99, category='Electronics')
)
await session.execute(
update(Product).where(Product.id == 1).values(price=899.99)
)
# Concurrent queries with asyncio.gather
product_result, count_result = await asyncio.gather(
session.execute(select(Product).where(Product.category == 'Electronics')),
session.execute(select(func.count(Product.id)))
)
# Commits automatically
# Clean up
await client.drop_table(Product)
await client.disconnect()
asyncio.run(async_quickstart())
Async Advantages:
β Non-blocking operations - donβt block the event loop
β Concurrent execution with
asyncio.gather()β Perfect for web frameworks (FastAPI, aiohttp)
β Same transaction guarantees as sync version
Vector Operations with Table Modelsο
from matrixone import Client
from matrixone.config import get_connection_params
from sqlalchemy import Column, Integer, String, Text
from matrixone.orm import declarative_base
from matrixone.sqlalchemy_ext import Vectorf32
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Define vector table model
Base = declarative_base()
class Document(Base):
__tablename__ = 'documents'
id = Column(Integer, primary_key=True)
title = Column(String(200))
content = Column(Text)
embedding = Column(Vectorf32(384)) # 384-dimensional vector
# Create table using model
client.create_table(Document)
# β οΈ Insert initial data BEFORE creating IVF index (recommended)
client.insert("documents", {
"id": 1,
"title": "Introduction to AI",
"content": "Artificial Intelligence is a field of computer science...",
"embedding": [0.1] * 384 # Example 384-dimensional vector
})
# Enable IVF indexing
client.vector_ops.enable_ivf()
# Create IVF index after initial data (better clustering)
client.vector_ops.create_ivf(Document, name="idx_embedding", column="embedding", lists=100)
# IVF supports dynamic updates (can continue inserting)
client.insert("documents", {"id": 2, ...}) # β
Works fine
# Vector similarity search using simple interface (first argument is positional)
query_vector = [0.1] * 384
results = client.vector_ops.similarity_search(
Document,
vector_column="embedding",
query_vector=query_vector,
limit=5,
distance_type="l2"
)
print("Similarity search results:", results)
# Complex vector query using query builder
result = client.query("documents").select("*").where(
"l2_distance(embedding, ?) < ?",
(query_vector, 0.5)
).order_by("l2_distance(embedding, ?)", query_vector).limit(10).execute()
for row in result.fetchall():
print(f"Document: {row[1]}, Distance: {row[3]}")
# β IMPORTANT: Monitor IVF index health for production systems
stats = client.vector_ops.get_ivf_stats(Document, "embedding")
counts = stats['distribution']['centroid_count']
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
print(f"Index health - Centroids: {len(counts)}, Balance ratio: {balance_ratio:.2f}")
if balance_ratio > 2.5:
print("β οΈ Warning: Index needs rebuilding for optimal performance")
# Drop vector index
client.vector_ops.drop(Document, "idx_embedding")
# Clean up
client.drop_table(Document)
client.disconnect()
HNSW Vector Indexingο
from matrixone import Client
from matrixone.config import get_connection_params
from sqlalchemy import Column, BigInteger, String
from matrixone.orm import declarative_base
from matrixone.sqlalchemy_ext import Vectorf32
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Define vector table model
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
# IMPORTANT: HNSW index requires BigInteger primary key
id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String(200))
features = Column(Vectorf32(128)) # 128-dimensional feature vector
# Create table using model
client.create_table(Product)
# Insert vector data using client API
client.insert(Product, {
"name": "Smartphone",
"features": [0.2] * 128 # Example 128-dimensional vector
})
# Enable HNSW indexing
client.vector_ops.enable_hnsw()
# Create HNSW index (first argument is positional)
client.vector_ops.create_hnsw(Product, name="idx_features", column="features", m=16, ef_construction=200)
# Vector similarity search (first argument is positional)
query_vector = [0.2] * 128
results = client.vector_ops.similarity_search(
Product,
vector_column="features",
query_vector=query_vector,
limit=5,
distance_type="cosine"
)
print("HNSW similarity search results:", results)
# Drop vector index
client.vector_ops.drop(Product, "idx_features")
# Clean up
client.drop_table(Product)
client.disconnect()
ORM with Modern Patternsο
from sqlalchemy import Column, Integer, String, DECIMAL, DateTime
from matrixone.orm import declarative_base
from matrixone import Client
from matrixone.config import get_connection_params
# Define ORM models
Base = declarative_base()
class Account(Base):
__tablename__ = 'accounts'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
balance = Column(DECIMAL(10, 2), nullable=False)
created_at = Column(DateTime, nullable=False)
# Get connection and create client
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create table using ORM model
client.create_table(Account)
# Insert data using client API
accounts_data = [
{"name": "Alice", "balance": 1000.00, "created_at": "2024-01-01 10:00:00"},
{"name": "Bob", "balance": 500.00, "created_at": "2024-01-01 10:00:00"}
]
client.batch_insert(Account, accounts_data)
# Query using client API
accounts = client.query(Account).filter(Account.balance > 600).all()
for account in accounts:
print(f"{account.name}: ${account.balance}")
# Update using ORM
session.query(Account).filter(Account.name == "Alice").update({"balance": 1200.00})
session.commit()
# Clean up using ORM
client.drop_table(Account)
session.close()
client.disconnect()
Vector Search with Modern APIο
from matrixone import Client
from matrixone.config import get_connection_params
from matrixone.sqlalchemy_ext import create_vector_column
import numpy as np
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create vector table using create_table API
client.create_table("documents", {
"id": "int",
"title": "varchar(200)",
"content": "text",
"embedding": "vecf32(384)" # 384-dimensional f32 vector
}, primary_key="id")
# Create vector index using vector_ops API
client.vector_ops.enable_ivf()
client.vector_ops.create_ivf(
table_name_or_model="documents",
name="idx_embedding",
column="embedding",
lists=50,
op_type="vector_l2_ops"
)
# Insert documents with embeddings using insert API
documents = [
{
"id": 1,
"title": "AI Research",
"content": "Artificial intelligence research paper",
"embedding": np.random.rand(384).astype(np.float32).tolist()
},
{
"id": 2,
"title": "ML Guide",
"content": "Machine learning tutorial",
"embedding": np.random.rand(384).astype(np.float32).tolist()
}
]
for doc in documents:
client.insert("documents", doc)
# Vector similarity search using vector_query API
query_vector = np.random.rand(384).astype(np.float32).tolist()
results = client.vector_ops.similarity_search(
table_name_or_model="documents",
vector_column="embedding",
query_vector=query_vector,
limit=5,
distance_type="l2"
)
print("Vector Search Results:")
for result in results.rows:
print(f"Document: {result[1]} (Distance: {result[-1]:.4f})")
# β CRITICAL: Check IVF index health - Essential for production monitoring
stats = client.vector_ops.get_ivf_stats("documents", "embedding")
# Analyze index balance
distribution = stats['distribution']
counts = distribution['centroid_count']
total_centroids = len(counts)
total_vectors = sum(counts)
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
print(f"\nIVF Index Health:")
print(f" - Total centroids: {total_centroids}")
print(f" - Total vectors: {total_vectors}")
print(f" - Balance ratio: {balance_ratio:.2f} {'β' if balance_ratio <= 2.5 else 'β οΈ'}")
if balance_ratio > 2.5:
print(f" - β οΈ Index imbalanced - consider rebuilding")
print(f" - See vector_guide for detailed monitoring procedures")
# Clean up using drop_table API
client.drop_table("documents")
client.disconnect()
Async Vector Operationsο
import asyncio
from matrixone import AsyncClient
from matrixone.config import get_connection_params
import numpy as np
async def async_vector_example():
# Get connection parameters
host, port, user, password, database = get_connection_params()
client = AsyncClient()
await client.connect(host=host, port=port, user=user, password=password, database=database)
# Create vector table using async create_table API
await client.create_table("products", {
"id": "int",
"name": "varchar(200)",
"description": "text",
"features": "vecf64(512)" # 512-dimensional f64 vector
}, primary_key="id")
# Create vector index using async vector_ops API
await client.vector_ops.enable_ivf()
await client.vector_ops.create_ivf(
table_name_or_model="products",
name="idx_features",
column="features",
lists=100,
op_type="vector_cosine_ops"
)
# Insert products with feature vectors using async insert API
products = [
{
"id": 1,
"name": "Smartphone",
"description": "Latest smartphone with AI features",
"features": np.random.rand(512).astype(np.float64).tolist()
},
{
"id": 2,
"name": "Laptop",
"description": "High-performance laptop for professionals",
"features": np.random.rand(512).astype(np.float64).tolist()
}
]
for product in products:
await client.insert("products", product)
# Vector similarity search using async vector_query API
query_vector = np.random.rand(512).astype(np.float64).tolist()
results = await client.vector_ops.similarity_search(
table_name_or_model="products",
vector_column="features",
query_vector=query_vector,
limit=3,
distance_type="cosine"
)
print("Async Vector Search Results:")
for result in results.rows:
print(f"Product: {result[1]} (Similarity: {1 - result[-1]:.4f})")
# β Monitor IVF index health asynchronously
stats = await client.vector_ops.get_ivf_stats("products", "features")
counts = stats['distribution']['centroid_count']
balance_ratio = max(counts) / min(counts) if min(counts) > 0 else float('inf')
print(f"\nAsync IVF Index Health:")
print(f" - Centroids: {len(counts)}, Balance: {balance_ratio:.2f}")
# Clean up using async drop_table API
await client.drop_table("products")
await client.disconnect()
asyncio.run(async_vector_example())
Transaction Managementο
from matrixone import Client
from matrixone.config import get_connection_params
def transaction_example():
host, port, user, password, database = get_connection_params()
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create table using create_table API
client.create_table("orders", {
"id": "int",
"customer_id": "int",
"amount": "decimal(10,2)",
"status": "varchar(20)"
}, primary_key="id")
# Use transaction for atomic operations
with client.transaction() as tx:
# Insert order
tx.insert("orders", {
"id": 1,
"customer_id": 100,
"amount": 99.99,
"status": "pending"
})
# Update order status
tx.query("orders").update({"status": "confirmed"}).where("id = ?", 1).execute()
# If any operation fails, the entire transaction is rolled back
# Verify the transaction
result = client.query("orders").select("*").where("id = ?", 1).execute()
print("Order after transaction:", result.fetchall())
# Clean up
client.drop_table("orders")
client.disconnect()
transaction_example()
Error Handling with Modern APIο
from matrixone import Client
from matrixone.exceptions import ConnectionError, QueryError
from matrixone.config import get_connection_params
def robust_example():
client = None
try:
host, port, user, password, database = get_connection_params()
# Create client with error handling
client = Client()
client.connect(host=host, port=port, user=user, password=password, database=database)
# Create table with error handling
try:
client.create_table("test_table", {
"id": "int",
"name": "varchar(100)"
}, primary_key="id")
print("β Table created successfully")
except QueryError as e:
print(f"β Table creation failed: {e}")
# Insert data with error handling
try:
client.insert("test_table", {"id": 1, "name": "Test"})
print("β Data inserted successfully")
except QueryError as e:
print(f"β Data insertion failed: {e}")
# Query data with error handling
try:
result = client.query("test_table").select("*").execute()
print(f"β Query successful: {result.fetchall()}")
except QueryError as e:
print(f"β Query failed: {e}")
except ConnectionError as e:
print(f"β Connection failed: {e}")
except Exception as e:
print(f"β Unexpected error: {e}")
finally:
# Always clean up
if client:
try:
client.drop_table("test_table")
client.disconnect()
print("β Cleanup completed")
except Exception as e:
print(f"β οΈ Cleanup warning: {e}")
robust_example()
Configuration Best Practicesο
from matrixone import Client
from matrixone.config import get_connection_params, print_config
def configuration_example():
# Use environment variables for configuration
# Set these in your environment:
# export MATRIXONE_HOST=127.0.0.1
# export MATRIXONE_PORT=6001
# export MATRIXONE_USER=root
# export MATRIXONE_PASSWORD=111
# export MATRIXONE_DATABASE=test
# Print current configuration
print_config()
# Get connection parameters from environment
host, port, user, password, database = get_connection_params()
# Create client with optimized settings
client = Client(
connection_timeout=30, # Connection timeout in seconds
query_timeout=300, # Query timeout in seconds
auto_commit=True, # Enable auto-commit for better performance
charset='utf8mb4', # Support for international characters
sql_log_mode='simple', # Simple SQL logging for production
slow_query_threshold=1.0 # Alert on queries > 1s
)
client.connect(host=host, port=port, user=user, password=password, database=database)
# Check backend capabilities
version = client.get_backend_version()
print(f"β Connected to MatrixOne {version}")
if client.is_feature_available('vector_search'):
print("β Vector search is available")
if client.is_feature_available('fulltext_search'):
print("β Fulltext search is available")
client.disconnect()
configuration_example()
Next Stepsο
Read the API Reference for detailed API documentation
Check out the Vector Search Guide for comprehensive vector operations
Explore Fulltext Search Guide for text search capabilities
Learn about ORM Usage Guide for ORM patterns
Check out the Examples for comprehensive usage examples
Learn about Contributing to contribute to the project
Run
make examplesto test all examples with your MatrixOne setupUse
make testto run the test suite and verify your setup