Pub/Sub Manager

This section documents the pub/sub management classes that provide real-time data distribution capabilities.

PubSubManager

class matrixone.pubsub.PubSubManager(client, executor=None)[source]

Bases: BasePubSubManager

Synchronous Publish-Subscribe management for MatrixOne.

Provides comprehensive publish-subscribe functionality for real-time data distribution and event-driven architectures. Enables data sharing between accounts through publications and subscriptions, supporting both database and table-level granularity.

Key Features:

  • Database and table-level publications: Publish entire databases or specific tables

  • Cross-account data sharing: Share data between different MatrixOne accounts

  • Subscription management: Create and manage data subscriptions

  • Real-time data distribution: Enable event-driven data flow

  • Transaction-aware: Full integration with transaction contexts

  • Flexible granularity: Choose database or table level for publications

Executor Pattern:

  • If executor is None, uses self.client.execute (default client-level executor)

  • If executor is provided (e.g., session), uses executor.execute (transaction-aware)

  • All operations can participate in transactions when used via session

Usage Examples:

from matrixone import Client

# Publisher account creates a publication
publisher = Client(host='localhost', port=6001, user='publisher#root',
                  password='111', database='data')

# Create database-level publication
pub = publisher.pubsub.create_database_publication(
    name='analytics_data',
    database='analytics',
    account='subscriber_account'  # Allow this account to subscribe
)
print(f"Publication created: {pub.name}")

# Create table-level publication
pub_table = publisher.pubsub.create_table_publication(
    name='orders_data',
    database='production',
    table='orders',
    account='subscriber_account'
)

# List all publications
publications = publisher.pubsub.list_publications()
for p in publications:
    print(f"{p.name}: {p.database}.{p.tables}")

# Alter publication (change account permissions)
publisher.pubsub.alter_publication(
    name='analytics_data',
    account='new_subscriber_account'
)

# Subscriber account creates a subscription
subscriber = Client(host='localhost', port=6001, user='subscriber#root',
                   password='222', database='sub_db')

# Create subscription to published data
sub = subscriber.pubsub.create_subscription(
    subscription_name='analytics_copy',  # Local database name
    publication_name='analytics_data',
    publisher_account='publisher_account'
)

# List all subscriptions
subscriptions = subscriber.pubsub.list_subscriptions()
for s in subscriptions:
    print(f"Subscribed to: {s.pub_account}.{s.pub_database}")

# Drop subscription when no longer needed
subscriber.pubsub.drop_subscription('analytics_copy')

# Publisher drops publication
publisher.pubsub.drop_publication('analytics_data')

# Using within a transaction
with publisher.session() as session:
    # Create multiple publications atomically
    session.pubsub.create_database_publication('pub1', 'db1', 'acc1')
    session.pubsub.create_table_publication('pub2', 'db2', 'table1', 'acc2')

Implementation Notes:

  • SHOW PUBLICATIONS does not support WHERE clause, so filtering is done client-side

  • Publications enable cross-account data sharing in MatrixOne

  • Subscriptions appear as regular databases to the subscriber

  • Changes to published data are reflected in subscriptions

See also

  • SnapshotManager: For point-in-time data sharing via snapshots

  • CloneManager: For one-time data cloning

  • AccountManager: For account and permission management

create_database_publication(name: str, database: str, account: str) Publication[source]

Create database-level publication

create_table_publication(name: str, database: str, table: str, account: str) Publication[source]

Create table-level publication

get_publication(name: str) Publication[source]

Get publication by name

list_publications(account: str | None = None, database: str | None = None) List[Publication][source]

List publications with optional filters

alter_publication(name: str, account: str | None = None, database: str | None = None, table: str | None = None) Publication[source]

Alter publication

drop_publication(name: str) bool[source]

Drop publication

show_create_publication(name: str) str[source]

Show CREATE PUBLICATION statement

create_subscription(subscription_name: str, publication_name: str, publisher_account: str) Subscription[source]

Create subscription

get_subscription(name: str) Subscription[source]

Get subscription by name

list_subscriptions(pub_account: str | None = None, pub_database: str | None = None) List[Subscription][source]

List subscriptions with optional filters

Publication

class matrixone.pubsub.Publication(name: str, database: str, tables: str, sub_account: str, subscribed_accounts: str, created_time: datetime | None = None, update_time: datetime | None = None, comments: str | None = None)[source]

Bases: object

Publication object

__init__(name: str, database: str, tables: str, sub_account: str, subscribed_accounts: str, created_time: datetime | None = None, update_time: datetime | None = None, comments: str | None = None)[source]

Initialize Publication object

Subscription

class matrixone.pubsub.Subscription(pub_name: str, pub_account: str, pub_database: str, pub_tables: str, sub_name: str, pub_comment: str | None = None, pub_time: datetime | None = None, sub_time: datetime | None = None, status: int = 0)[source]

Bases: object

Subscription object

__init__(pub_name: str, pub_account: str, pub_database: str, pub_tables: str, sub_name: str, pub_comment: str | None = None, pub_time: datetime | None = None, sub_time: datetime | None = None, status: int = 0)[source]

Initialize Subscription object