Load Data Manager

LoadDataManager

Pandas-style data loading manager for synchronous operations.

Provides read_csv(), read_json(), and read_parquet() methods for loading data files into MatrixOne tables with pandas-compatible parameter naming.

class matrixone.load_data.LoadDataManager(client, executor=None)[source]

Bases: BaseLoadDataManager

Synchronous data loading manager for MatrixOne bulk data operations.

This class provides a comprehensive and high-performance interface for loading large volumes of data from files into MatrixOne tables. It supports multiple data formats, compression formats, and advanced loading options for optimal performance and flexibility.

Key Features:

  • Multiple data formats: CSV, TSV, JSON, JSONLines, Parquet

  • Flexible delimiters: Customizable field and line terminators

  • Compression support: gzip, bzip2, tar.gz, tar.bz2, lz4, lzo

  • Parallel loading: High-performance parallel data loading

  • Character set conversion: Support for various character encodings

  • Column mapping: Map file columns to table columns

  • Data transformation: Apply SET clauses for column transformations

  • Stage integration: Load from external stages (S3, local filesystem)

  • Transaction-aware: Full integration with transaction contexts

  • Header handling: Skip header rows automatically

Executor Pattern:

  • If executor is None, uses self.client.execute (default client-level executor)

  • If executor is provided (e.g., session), uses executor.execute (transaction-aware)

  • All operations can participate in transactions when used via session
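The fallback described above can be sketched with stand-in objects. The class names `FakeClient`, `FakeSession`, and `ManagerSketch` are hypothetical and exist only for this illustration; the real manager's internals are not shown in this reference.

```python
class FakeClient:
    """Stand-in for a client: records the SQL it is asked to run."""

    def __init__(self):
        self.calls = []

    def execute(self, sql):
        self.calls.append(sql)
        return "client"


class FakeSession(FakeClient):
    """Stand-in for a session, i.e. a transaction-aware executor."""

    def execute(self, sql):
        self.calls.append(sql)
        return "session"


class ManagerSketch:
    """Hypothetical manager that mirrors the documented executor rule."""

    def __init__(self, client, executor=None):
        self.client = client
        self.executor = executor

    def _get_executor(self):
        # No executor given: fall back to the client-level execute().
        return self.executor if self.executor is not None else self.client

    def run(self, sql):
        return self._get_executor().execute(sql)
```

With this shape, the same loading code runs either directly on the client or inside whatever transaction the supplied executor represents.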

Supported Data Formats:

  • CSV: Comma-Separated Values (most common)

  • TSV: Tab-Separated Values

  • JSON: JSON objects (one per file)

  • JSONLines: JSON Lines format (one JSON object per line)

  • Parquet: Apache Parquet columnar format

Usage Examples:

from matrixone import Client
from matrixone.orm import Column, Integer, String, Float, Base
from sqlalchemy import insert

client = Client(host='localhost', port=6001, user='root', password='111', database='test')

# Define ORM models (recommended approach)
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)

# ========================================
# Basic CSV Loading with ORM Models
# ========================================

# Load CSV using ORM model (recommended)
client.load_data.from_csv('/path/to/users.csv', User)

# CSV with header row (skip first line)
client.load_data.from_csv(
    '/path/to/users.csv',
    User,
    ignore_lines=1  # Skip header
)

# Custom delimiter and quote character
client.load_data.from_csv(
    '/path/to/data.txt',
    Order,
    delimiter='|',
    enclosed_by='"',
    optionally_enclosed=True
)

# ========================================
# Advanced Loading Options
# ========================================

# Load compressed file
client.load_data.from_csv(
    '/path/to/data.csv.gz',
    User,
    compression='gzip'
)

# Parallel loading for large files (high performance)
client.load_data.from_csv(
    '/path/to/large_data.csv',
    User,
    parallel=True  # Enables parallel loading
)

# Column mapping and transformation
client.load_data.from_csv(
    '/path/to/data.csv',
    User,
    columns=['name', 'email', 'age'],
    set_clause={'created_at': 'NOW()', 'status': "'active'"}
)

# ========================================
# Different File Formats
# ========================================

# Load TSV (Tab-Separated Values)
client.load_data.from_tsv('/path/to/data.tsv', User)

# Load JSON Lines format
client.load_data.from_jsonlines('/path/to/data.jsonl', User)

# ========================================
# Load from External Stages
# ========================================

# Load from S3 stage using ORM model
client.load_data.from_stage_csv('production_s3', 'users.csv', User)

# Load from local filesystem stage
client.load_data.from_stage_csv('local_data', 'orders.csv', Order)

# Load from stage with custom options
client.load_data.from_stage(
    'stage://my_stage/data.csv',
    User,
    file_format='csv',
    delimiter=','
)

# ========================================
# Transactional Data Loading
# ========================================

# Using within a transaction (all loads atomic)
with client.session() as session:
    # Load multiple files atomically using ORM models
    session.load_data.from_csv('/path/to/users.csv', User)
    session.load_data.from_csv('/path/to/orders.csv', Order)

    # Insert additional data within same transaction
    session.execute(insert(User).values(name='Admin', email='admin@example.com'))
    # All operations commit together

Performance Tips:

  • Use parallel=True for large files (>100MB) to enable parallel loading

  • Use compressed files (gzip, bzip2) to reduce I/O and transfer time

  • Specify columns parameter to skip unnecessary columns

  • Use appropriate character_set to avoid encoding overhead

  • For very large datasets, consider loading in batches
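The batching tip can be sketched without any MatrixOne call: split the rows into fixed-size groups first, then write and load each group separately. The helper name `iter_batches` is hypothetical, and a suitable batch size depends on the deployment.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of at most batch_size items from any iterable of rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Five rows in batches of two: the last batch is shorter.
batches = list(iter_batches(range(5), 2))
```

Each batch could then be written to its own file, or passed as inline data, and loaded with one call per batch.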

Common Use Cases:

  • Data migration: Import data from legacy systems

  • ETL pipelines: Load transformed data into MatrixOne

  • Data warehouse loading: Bulk load fact and dimension tables

  • Log file ingestion: Import application or server logs

  • CSV imports: Load data from spreadsheets or exports

See also

  • StageManager: For managing external stages

  • Client.insert: For small-scale data insertion

  • Client.batch_insert: For programmatic batch inserts

__init__(client, executor=None)[source]

Initialize LoadDataManager.

Parameters:
  • client – Client object that provides execute() method

  • executor – Optional executor (e.g., session) for executing SQL. If None, uses client.execute

read_csv(filepath_or_buffer: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, inline: bool = False, **kwargs)[source]

Read CSV (Comma-Separated Values) file into MatrixOne table (pandas-style).

This method provides a pandas-compatible interface for loading CSV files, with parameter names and behavior matching pandas.read_csv() where applicable.

Parameters:
  • filepath_or_buffer (str) – File path, stage path, or inline data: a local path (‘/path/to/data.csv’), a stage path (‘stage://stage_name/file.csv’), or an inline CSV string (requires inline=True)

  • table (str or Model) – Table name or SQLAlchemy model class

  • sep (str) – Field separator/delimiter. Default: ‘,’ (pandas: sep)

  • quotechar (str, optional) – Character for quoting fields (pandas: quotechar)

  • quoting (bool) – If True, use OPTIONALLY ENCLOSED BY. Default: False

  • escapechar (str, optional) – Escape character (pandas: escapechar)

  • skiprows (int) – Number of header lines to skip. Default: 0 (pandas: skiprows)

  • names (list, optional) – Column names to load into (pandas: names)

  • encoding (str, optional) – Character encoding (pandas: encoding)

  • parallel (bool) – Enable parallel loading. Default: False

  • compression (str or CompressionFormat, optional) – Compression format

  • set_clause (dict, optional) – Column transformations

  • inline (bool) – If True, treat filepath_or_buffer as inline data. Default: False
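Judging from the parameter descriptions on this page, each pandas-style name appears to correspond to one from_file() option. The mapping below is inferred from those descriptions and is not confirmed by the source; `PANDAS_TO_LOAD_DATA` and `to_from_file_kwargs` are hypothetical names.

```python
# Inferred correspondence between read_csv() and from_file() parameter names.
PANDAS_TO_LOAD_DATA = {
    "sep": "fields_terminated_by",
    "quotechar": "fields_enclosed_by",
    "quoting": "fields_optionally_enclosed",
    "escapechar": "fields_escaped_by",
    "skiprows": "ignore_lines",
    "names": "columns",
    "encoding": "character_set",
}


def to_from_file_kwargs(**pandas_kwargs):
    """Rename pandas-style keyword arguments; unknown names pass through unchanged."""
    return {PANDAS_TO_LOAD_DATA.get(key, key): value
            for key, value in pandas_kwargs.items()}


options = to_from_file_kwargs(sep="|", skiprows=1, names=["id", "name"], parallel=True)
```

Names such as parallel, compression, and set_clause are spelled the same in both signatures, so they pass through untouched.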

Returns:

Load results with affected_rows

Return type:

ResultSet

Examples:

# Basic CSV (pandas-style)
client.load_data.read_csv('data.csv', table='users')

# CSV with header (pandas-style)
client.load_data.read_csv('data.csv', table='users', skiprows=1)

# Custom separator (pandas-style)
client.load_data.read_csv('data.txt', table='users', sep='|')

# With quote character (pandas-style)
client.load_data.read_csv('data.csv', table='users',
    quotechar='"', quoting=True)

# Load from stage
client.load_data.read_csv('stage://s3_stage/data.csv', table='users')

# Inline CSV
csv_data = "1,Alice\n2,Bob\n"
client.load_data.read_csv(csv_data, table='users', inline=True)

# With ORM model
client.load_data.read_csv('data.csv', table=User, skiprows=1)

read_csv_stage(stage_name: str, filename: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]

Read CSV file from external stage into MatrixOne table (pandas-style convenience method).

This is a convenience method that doesn’t require the ‘stage://’ protocol prefix.
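A minimal sketch of what the convenience amounts to, assuming the stage name and filename are simply joined into the ‘stage://’ form accepted by read_csv(). The helper `stage_path` is hypothetical; the library's own path handling is not shown in this reference.

```python
def stage_path(stage_name, filename):
    """Build a 'stage://<stage>/<file>' path from its two parts."""
    # Drop a leading slash so the result never contains a doubled separator.
    return f"stage://{stage_name}/{filename.lstrip('/')}"


path = stage_path("s3_stage", "data.csv")
```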

Parameters:
  • stage_name (str) – Name of the external stage

  • filename (str) – Filename within the stage

  • table (str or Model) – Table name or SQLAlchemy model class

  • sep (str) – Field separator. Default: ‘,’

  • quotechar (str, optional) – Character for quoting fields

  • quoting (bool) – Use OPTIONALLY ENCLOSED BY. Default: False

  • escapechar (str, optional) – Escape character

  • skiprows (int) – Number of header lines to skip. Default: 0

  • names (list, optional) – Column names

  • encoding (str, optional) – Character encoding

  • parallel (bool) – Enable parallel loading

  • compression (str or CompressionFormat, optional) – Compression format

  • set_clause (dict, optional) – Column transformations

Returns:

Load results

Return type:

ResultSet

Examples:

# Load from S3 stage
client.load_data.read_csv_stage('s3_stage', 'data.csv', table='users')

# With options
client.load_data.read_csv_stage('backup_stage', 'data.tsv',
    table='users', sep='\t', skiprows=1)

read_csv_inline(data: str, table: str | type, sep: str = ',', quotechar: str | None = None, **kwargs)[source]

Read CSV data from inline string into MatrixOne table (pandas-style).
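When the rows already live in Python objects, the inline string can be produced with the standard csv module instead of manual concatenation, so that fields containing the separator are quoted correctly. The helper `rows_to_csv` is a hypothetical name for this sketch.

```python
import csv
import io


def rows_to_csv(rows, sep=","):
    """Serialize an iterable of row tuples to a CSV string with '\\n' line endings."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=sep, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


csv_data = rows_to_csv([(1, "Alice"), (2, "Bob")])
```

The resulting string has the same shape as the csv_data value used in the example for this method.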

Parameters:
  • data (str) – CSV data string

  • table (str or Model) – Table name or SQLAlchemy model class

  • sep (str) – Field separator. Default: ‘,’

  • quotechar (str, optional) – Character for quoting fields

Returns:

Load results

Return type:

ResultSet

Examples:

csv_data = "1,Alice\n2,Bob\n"
client.load_data.read_csv_inline(csv_data, table='users')

read_json(filepath_or_buffer: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, inline: bool = False, **kwargs)[source]

Read JSON file into MatrixOne table (pandas-style).

This method provides a pandas-compatible interface for loading JSON/JSONL files, matching pandas.read_json() parameter names.

Parameters:
  • filepath_or_buffer (str) – File path, stage path, or inline data: a local path (‘/path/to/data.jsonl’), a stage path (‘stage://stage_name/file.jsonl’), or an inline JSON string (requires inline=True)

  • table (str or Model) – Table name or SQLAlchemy model class

  • lines (bool) – If True, read file as JSON Lines (one object per line). Default: True (pandas: lines)

  • orient (str) – JSON structure format (pandas: orient). Default: ‘records’. Accepted values: ‘records’, a list of dicts like [{‘col1’: val1}, {‘col2’: val2}]; ‘values’, a list of lists like [[val1, val2], [val3, val4]]

  • compression (str or CompressionFormat, optional) – Compression format

  • inline (bool) – If True, treat filepath_or_buffer as inline data. Default: False
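The difference between the two orient values is easiest to see on the same rows. The sketch below writes JSON Lines text in both shapes using only the standard json module; `to_json_lines` is a hypothetical helper, not part of the library.

```python
import json


def to_json_lines(rows, columns, orient="records"):
    """Serialize rows as JSON Lines: one object ('records') or one array ('values') per line."""
    lines = []
    for row in rows:
        if orient == "records":
            payload = dict(zip(columns, row))
        elif orient == "values":
            payload = list(row)
        else:
            raise ValueError(f"unsupported orient: {orient!r}")
        lines.append(json.dumps(payload, separators=(",", ":")) + "\n")
    return "".join(lines)


rows = [(1, "Alice"), (2, "Bob")]
as_records = to_json_lines(rows, ["id", "name"], orient="records")
as_values = to_json_lines(rows, ["id", "name"], orient="values")
```

With ‘values’ the column names are not in the file, so the position of each element within a line has to carry that information instead.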

Returns:

Load results with affected_rows

Return type:

ResultSet

Examples:

# JSON Lines with objects (pandas-style)
client.load_data.read_json('events.jsonl', table='events', lines=True)

# JSON Lines with arrays
client.load_data.read_json('data.jsonl', table='users',
    lines=True, orient='values')

# From stage
client.load_data.read_json('stage://s3/events.jsonl', table='events')

# Compressed JSON Lines
client.load_data.read_json('events.jsonl.gz', table='events',
    compression='gzip')

# Inline JSON
json_data = '{"id":1,"name":"Alice"}\n{"id":2,"name":"Bob"}\n'
client.load_data.read_json(json_data, table='users', inline=True)

read_parquet(filepath_or_buffer: str, table: str | type, **kwargs)[source]

Read Parquet file into MatrixOne table (pandas-style).

This method provides a pandas-compatible interface for loading Parquet files, matching pandas.read_parquet() behavior.

Parameters:
  • filepath_or_buffer (str) – File path or stage path: a local path (‘/path/to/data.parquet’) or a stage path (‘stage://stage_name/file.parquet’)

  • table (str or Model) – Table name or SQLAlchemy model class

Returns:

Load results with affected_rows

Return type:

ResultSet

MatrixOne Parquet Support:

Fully Supported Features:
  • All compression formats (NONE, SNAPPY, GZIP, LZ4, ZSTD, Brotli)

  • Parquet 1.0 and 2.0 formats

  • Column statistics (write_statistics=True/False)

  • Nullable columns (nullable=True/False, NULL values supported)

  • Large files, wide tables, empty files

Supported Data Types:
  • INT32/INT64/UINT32 -> INT/BIGINT/INT UNSIGNED

  • FLOAT32/FLOAT64 -> FLOAT/DOUBLE

  • STRING -> VARCHAR (not TEXT!)

  • BOOLEAN -> BOOL

  • DATE32/DATE64 -> DATE

  • TIME32/TIME64 -> TIME

  • TIMESTAMP(tz='UTC') -> TIMESTAMP (UTC timezone required!)

Not Supported:
  • Dictionary encoding (use_dictionary must be False)

  • INT8/INT16 types (use INT32 or INT64)

  • large_string type (use pa.string())

  • TIMESTAMP without timezone (must add tz='UTC')

  • BINARY, DECIMAL types

  • Complex types (List, Struct, Map)
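The type pairs listed above can be turned into a small pre-flight check that flags columns with no documented mapping before a Parquet file is written. This is a sketch: the type names are plain strings copied from the lists, and `check_schema` is a hypothetical helper, not a library function.

```python
# Pairs copied from the "Supported Data Types" list; key spelling is illustrative.
PARQUET_TO_MATRIXONE = {
    "INT32": "INT",
    "INT64": "BIGINT",
    "UINT32": "INT UNSIGNED",
    "FLOAT32": "FLOAT",
    "FLOAT64": "DOUBLE",
    "STRING": "VARCHAR",
    "BOOLEAN": "BOOL",
    "DATE32": "DATE",
    "DATE64": "DATE",
    "TIME32": "TIME",
    "TIME64": "TIME",
    "TIMESTAMP(tz='UTC')": "TIMESTAMP",
}


def check_schema(fields):
    """Split a {column: parquet_type} dict into mapped types and unsupported columns."""
    mapped, unsupported = {}, []
    for name, parquet_type in fields.items():
        if parquet_type in PARQUET_TO_MATRIXONE:
            mapped[name] = PARQUET_TO_MATRIXONE[parquet_type]
        else:
            unsupported.append(name)
    return mapped, unsupported


mapped, unsupported = check_schema({"id": "INT64", "name": "STRING", "flags": "INT8"})
```

Here the INT8 column is reported as unsupported, matching the advice to use INT32 or INT64 instead.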

Recommended Settings:

compression='snappy'       # Enable compression
use_dictionary=False       # Must disable dictionary encoding
write_statistics=True      # Enable statistics
data_page_version='2.0'    # Use Parquet 2.0

Common Errors:
  • “indexed INT64 page is not yet implemented” -> Set use_dictionary=False

  • “load STRING(required) to TEXT NULL is not yet implemented” -> Use VARCHAR in table definition, not TEXT

  • “load TIMESTAMP(isAdjustedToUTC=false…” -> Use pa.timestamp('ms', tz='UTC')

Example:

import pyarrow as pa
import pyarrow.parquet as pq

# Define non-nullable schema (REQUIRED)
schema = pa.schema([
    pa.field('id', pa.int64(), nullable=False),
    pa.field('name', pa.string(), nullable=False)
])

# Create table
table = pa.table({
    'id': pa.array([1, 2, 3], type=pa.int64()),
    'name': pa.array(['Alice', 'Bob', 'Charlie'], type=pa.string())
}, schema=schema)

# Write with MatrixOne-compatible options (Verified 2025)
pq.write_table(
    table, 'data.parq',
    compression='snappy',        # ✅ All compression supported!
    use_dictionary=False,        # ⚠️ Required! Dict not supported
    write_statistics=True,       # ✅ Supported! (Double-checked)
    data_page_version='2.0'      # ✅ Parquet 2.0 supported!
)

# Now load the file
client.load_data.read_parquet('data.parq', table=User)

Examples:

# Basic Parquet (pandas-style)
client.load_data.read_parquet('data.parquet', table='users')

# From stage
client.load_data.read_parquet('stage://s3/data.parquet', table='users')

# With ORM model
client.load_data.read_parquet('data.parquet', table=User)

read_json_stage(stage_name: str, filename: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, **kwargs)[source]

Read JSON file from external stage into MatrixOne table (pandas-style convenience method).

Parameters:
  • stage_name (str) – Name of the external stage

  • filename (str) – Filename within the stage

  • table (str or Model) – Table name or SQLAlchemy model class

  • lines (bool) – If True, read as JSON Lines. Default: True

  • orient (str) – JSON structure. Default: ‘records’

  • compression (str or CompressionFormat, optional) – Compression format

Returns:

Load results

Return type:

ResultSet

Examples:

client.load_data.read_json_stage('s3_stage', 'events.jsonl', table='events')

read_parquet_stage(stage_name: str, filename: str, table: str | type, **kwargs)[source]

Read Parquet file from external stage into MatrixOne table (pandas-style convenience method).

Parameters:
  • stage_name (str) – Name of the external stage

  • filename (str) – Filename within the stage

  • table (str or Model) – Table name or SQLAlchemy model class

Returns:

Load results

Return type:

ResultSet

Examples:

client.load_data.read_parquet_stage('s3_stage', 'data.parquet', table='users')

read_json_inline(data: str, table: str | type, orient: str = 'records', **kwargs)[source]

Read JSON data from inline string into MatrixOne table (pandas-style).

Parameters:
  • data (str) – JSON data string

  • table (str or Model) – Table name or SQLAlchemy model class

  • orient (str) – JSON structure. Default: ‘records’

Returns:

Load results

Return type:

ResultSet

Examples:

json_data = '{"id":1,"name":"Alice"}\n'
client.load_data.read_json_inline(json_data, table='users')

from_file(file_path: str, table_name_or_model, fields_terminated_by: str = ',', fields_enclosed_by: str | None = None, fields_optionally_enclosed: bool = False, fields_escaped_by: str | None = None, lines_terminated_by: str | None = None, lines_starting_by: str | None = None, ignore_lines: int = 0, character_set: str | None = None, parallel: bool = False, columns: List[str] | None = None, format: str | LoadDataFormat | None = None, jsondata: str | JsonDataStructure | None = None, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]

Load data from a file into a table.

This is the main method for loading data from files. It supports all standard LOAD DATA INFILE options and provides a clean Python interface.

Parameters:
  • file_path (str) – Path to the file to load: either a local file path (‘/path/to/data.csv’) or a stage file (‘stage://stage_name/path/to/file’)

  • table_name_or_model – Either a table name (str) or SQLAlchemy model class

  • fields_terminated_by (str) – Character(s) that separate fields. Default: ‘,’ (comma). Common values: ‘,’ for CSV files, ‘\t’ for TSV files, ‘|’ for pipe-delimited files, or any custom delimiter such as ‘*’

  • fields_enclosed_by (str, optional) – Character that encloses field values. Used when fields contain the delimiter character. Common values: " (double quote) or ' (single quote)

  • fields_optionally_enclosed (bool) – If True, use OPTIONALLY ENCLOSED BY. This means only some fields may be enclosed, not all. Default: False

  • fields_escaped_by (str, optional) – Escape character for special characters. Default: ‘\’ (backslash) when fields_enclosed_by is specified.

  • lines_terminated_by (str, optional) – Character(s) that terminate lines. Default: ‘\n’. For Windows files: ‘\r\n’

  • lines_starting_by (str, optional) – Prefix that identifies lines to load. Lines not starting with this prefix are ignored.

  • ignore_lines (int) – Number of lines to skip at the beginning of the file. Useful for skipping header rows. Default: 0

  • character_set (str, optional) – Character set of the input file. Examples: ‘utf8’, ‘utf-8’, ‘utf-16’, ‘gbk’

  • parallel (bool) – Enable parallel loading for large files. Default: False. Set to True for faster loading of large files.

  • columns (list, optional) – List of column names to load data into. When specified, only these columns are populated. Example: [‘col1’, ‘col2’, ‘col3’]

  • format (str or LoadDataFormat, optional) – File format. Supported values: LoadDataFormat.CSV or ‘csv’ (default); LoadDataFormat.JSONLINE or ‘jsonline’; LoadDataFormat.PARQUET or ‘parquet’

  • jsondata (str or JsonDataStructure, optional) – JSON data structure; required when format is JSONLINE. Values: JsonDataStructure.OBJECT or ‘object’ (one JSON object per line); JsonDataStructure.ARRAY or ‘array’ (one JSON array per line)

  • compression (str or CompressionFormat, optional) – Compression format. Values: CompressionFormat.NONE or ‘none’ (no compression, default); CompressionFormat.GZIP or ‘gzip’; CompressionFormat.BZIP2 or ‘bzip2’; CompressionFormat.LZ4 or ‘lz4’; CompressionFormat.TAR_GZ or ‘tar.gz’ (Tar+Gzip); CompressionFormat.TAR_BZ2 or ‘tar.bz2’ (Tar+Bzip2)

  • set_clause (dict, optional) – Column transformations. Dictionary of column transformations, e.g.: {‘col1’: ‘NULLIF(col1, “null”)’, ‘col2’: ‘NULLIF(col2, 1)’}

  • **kwargs – Additional options for future extensions
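To make the options above concrete, the sketch below assembles a MySQL-style LOAD DATA statement from a few of them. It is an illustration only: the SQL that the library actually emits is not shown in this reference, the helpers `sql_literal` and `build_load_data_sql` are hypothetical, and the table and column names are inserted without identifier escaping.

```python
def sql_literal(value):
    """Quote a string as a SQL literal, escaping backslash, quote, tab and newlines."""
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    escaped = escaped.replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r")
    return f"'{escaped}'"


def build_load_data_sql(file_path, table, fields_terminated_by=",",
                        fields_enclosed_by=None, fields_optionally_enclosed=False,
                        lines_terminated_by=None, ignore_lines=0, columns=None,
                        local=False):
    """Assemble the clauses in the order used by MySQL-style LOAD DATA syntax."""
    parts = ["LOAD DATA LOCAL INFILE" if local else "LOAD DATA INFILE",
             sql_literal(file_path), "INTO TABLE", table]
    fields = f"FIELDS TERMINATED BY {sql_literal(fields_terminated_by)}"
    if fields_enclosed_by is not None:
        prefix = "OPTIONALLY " if fields_optionally_enclosed else ""
        fields += f" {prefix}ENCLOSED BY {sql_literal(fields_enclosed_by)}"
    parts.append(fields)
    if lines_terminated_by is not None:
        parts.append(f"LINES TERMINATED BY {sql_literal(lines_terminated_by)}")
    if ignore_lines:
        parts.append(f"IGNORE {int(ignore_lines)} LINES")
    if columns:
        parts.append("(" + ", ".join(columns) + ")")
    return " ".join(parts)


sql = build_load_data_sql("/path/to/data.csv", "products", ignore_lines=1)
```

The `local` flag in this sketch corresponds to the LOCAL keyword described under from_local_file().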

Returns:

Object containing load results with affected_rows count

Return type:

ResultSet

Raises:
  • ValueError – If parameters are invalid

  • QueryError – If file loading fails

Examples:

# Basic CSV loading
>>> result = client.load_data.from_file('/path/to/users.csv', 'users')
>>> print(f"Loaded {result.affected_rows} rows")

# CSV with header row
>>> result = client.load_data.from_file(
...     '/path/to/data.csv',
...     'products',
...     ignore_lines=1
... )

# Pipe-delimited with quoted fields
>>> result = client.load_data.from_file(
...     '/path/to/data.txt',
...     'orders',
...     fields_terminated_by='|',
...     fields_enclosed_by='"',
...     fields_escaped_by='\\'
... )

# Tab-separated values (TSV)
>>> result = client.load_data.from_file(
...     '/path/to/data.tsv',
...     'logs',
...     fields_terminated_by='\t'
... )

# Load with character set conversion
>>> result = client.load_data.from_file(
...     '/path/to/data.csv',
...     'users',
...     character_set='utf-8'
... )

# Load specific columns only
>>> result = client.load_data.from_file(
...     '/path/to/data.csv',
...     'users',
...     columns=['name', 'email', 'age']
... )

# Parallel loading for large files
>>> result = client.load_data.from_file(
...     '/path/to/large_data.csv',
...     'big_table',
...     parallel=True
... )

# Load with lines starting by filter
>>> result = client.load_data.from_file(
...     '/path/to/data.txt',
...     'filtered_data',
...     lines_starting_by='DATA:'
... )

# Load JSONLINE format (object)
>>> result = client.load_data.from_file(
...     '/path/to/data.jl',
...     'users',
...     format=LoadDataFormat.JSONLINE,
...     jsondata=JsonDataStructure.OBJECT
... )

# Load JSONLINE format (array) with compression
>>> result = client.load_data.from_file(
...     '/path/to/data.jl.gz',
...     'users',
...     format=LoadDataFormat.JSONLINE,
...     jsondata=JsonDataStructure.ARRAY,
...     compression=CompressionFormat.GZIP
... )

# Load Parquet format
>>> result = client.load_data.from_file(
...     '/path/to/data.parq',
...     'users',
...     format=LoadDataFormat.PARQUET
... )

# Load with SET clause (NULLIF)
>>> result = client.load_data.from_file(
...     '/path/to/data.csv',
...     'users',
...     set_clause={
...         'col1': 'NULLIF(col1, "null")',
...         'col2': 'NULLIF(col2, 1)'
...     }
... )

from_local_file(file_path: str, table_name_or_model, **kwargs)[source]

Load data from a local file using LOAD DATA LOCAL INFILE.

This method uses the LOCAL keyword, which allows loading files from the client machine rather than the server machine.

Parameters:
  • file_path (str) – Path to the local file

  • table_name_or_model – Either a table name (str) or SQLAlchemy model class

  • **kwargs – Same options as from_file()

Returns:

Object containing load results

Return type:

ResultSet

Examples:

# Load from client machine
>>> result = client.load_data.from_local_file(
...     '/local/path/to/data.csv',
...     'users',
...     ignore_lines=1
... )

AsyncLoadDataManager

Pandas-style data loading manager for asynchronous operations.

Provides async versions of all loading methods from LoadDataManager for non-blocking data loading operations.

class matrixone.load_data.AsyncLoadDataManager(client, executor=None)[source]

Bases: BaseLoadDataManager

Asynchronous data loading manager for MatrixOne bulk data operations.

Provides the same comprehensive bulk data loading functionality as LoadDataManager but with full async/await support for non-blocking I/O operations. Ideal for high-concurrency applications and async web frameworks requiring data loading.

Key Features:

  • Non-blocking operations: All load operations use async/await

  • Async file loading: Load data files without blocking the event loop

  • Concurrent loading: Load multiple files concurrently

  • Multiple formats: CSV, TSV, JSON, JSONLines, Parquet (async)

  • Async stage integration: Load from external stages asynchronously

  • Transaction-aware: Full integration with async transaction contexts

  • Executor pattern: Works with both async client and async session

Executor Pattern:

  • If executor is None, uses self.client.execute (default async client-level executor)

  • If executor is provided (e.g., async session), uses executor.execute (async transaction-aware)

  • All operations are non-blocking and use async/await

  • Enables concurrent data loading operations

Usage Examples:

from matrixone import AsyncClient
from matrixone.orm import Column, Integer, String, Float, Base
from sqlalchemy import insert
import asyncio

# Define ORM models (recommended approach)
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    age = Column(Integer)

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    amount = Column(Float)

class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(200))
    price = Column(Float)

async def main():
    client = AsyncClient()
    await client.connect(host='localhost', port=6001, user='root', password='111', database='test')

    # ========================================
    # Basic Async CSV Loading with ORM
    # ========================================

    # Load CSV using ORM model (recommended)
    await client.load_data.from_csv('/path/to/users.csv', User)

    # CSV with header row
    await client.load_data.from_csv(
        '/path/to/users.csv',
        User,
        ignore_lines=1  # Skip header
    )

    # Custom delimiter and quote character (async)
    await client.load_data.from_csv(
        '/path/to/data.txt',
        Order,
        delimiter='|',
        enclosed_by='"'
    )

    # ========================================
    # Advanced Loading Options
    # ========================================

    # Parallel async loading (high performance)
    await client.load_data.from_csv(
        '/path/to/large_data.csv',
        User,
        parallel=True
    )

    # Load compressed file asynchronously
    await client.load_data.from_csv(
        '/path/to/data.csv.gz',
        User,
        compression='gzip'
    )

    # Column mapping with transformation
    await client.load_data.from_csv(
        '/path/to/data.csv',
        User,
        columns=['name', 'email', 'age'],
        set_clause={'created_at': 'NOW()'}
    )

    # Load JSON Lines format asynchronously
    await client.load_data.from_jsonlines('/path/to/data.jsonl', User)

    # ========================================
    # Load from External Stages
    # ========================================

    # Load from S3 stage using ORM model
    await client.load_data.from_stage_csv('production_s3', 'users.csv', User)

    # Load from local stage
    await client.load_data.from_stage_csv('local_data', 'orders.csv', Order)

    # ========================================
    # Concurrent Loading (High Performance)
    # ========================================

    # Concurrent loading of multiple files using ORM models
    await asyncio.gather(
        client.load_data.from_csv('/path/to/users.csv', User),
        client.load_data.from_csv('/path/to/orders.csv', Order),
        client.load_data.from_csv('/path/to/products.csv', Product)
    )

    # ========================================
    # Transactional Async Loading
    # ========================================

    # Using within async transaction
    async with client.session() as session:
        # Load multiple files atomically using ORM models
        await session.load_data.from_csv('/path/to/users.csv', User)
        await session.load_data.from_csv('/path/to/orders.csv', Order)

        # Insert additional data within same transaction using SQLAlchemy
        await session.execute(insert(User).values(name='Admin', email='admin@example.com'))
        # All operations commit together

    await client.disconnect()

asyncio.run(main())

Performance Benefits:

  • Non-blocking I/O: Don’t block the event loop during file loading

  • Concurrent loading: Load multiple files simultaneously

  • Async web frameworks: Perfect for FastAPI, aiohttp, etc.

  • High concurrency: Handle many load operations concurrently
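Unbounded concurrency can overload a server, so a common pattern is to cap the number of loads in flight. The sketch below uses a stand-in coroutine, `fake_load`, in place of a real loading call; both it and `load_all` are hypothetical, and a sensible limit depends on the deployment.

```python
import asyncio


async def fake_load(path, delay=0.01):
    """Stand-in for an awaitable load call; returns the path it 'loaded'."""
    await asyncio.sleep(delay)
    return path


async def load_all(paths, max_concurrent=2):
    """Run fake_load for every path with at most max_concurrent running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(path):
        async with semaphore:
            return await fake_load(path)

    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(bounded(path) for path in paths))


loaded = asyncio.run(load_all(["users.csv", "orders.csv", "products.csv"]))
```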

Use Cases:

  • Async ETL pipelines: Non-blocking data transformation and loading

  • Real-time data ingestion: Load data without blocking request handling

  • Async web services: Load data in response to API calls

  • Concurrent batch loading: Load multiple files in parallel

  • High-throughput applications: Maximize I/O throughput

See also

  • AsyncClient: For async database operations

  • AsyncStageManager: For async stage management

  • LoadDataManager: For synchronous data loading

__init__(client, executor=None)[source]

Initialize AsyncLoadDataManager.

Parameters:
  • client – Client object

  • executor – Optional executor (e.g., async session) for executing SQL. If None, uses client.execute

async read_csv(filepath_or_buffer: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, inline: bool = False, **kwargs)[source]

Async version of read_csv (pandas-style)

async read_json(filepath_or_buffer: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, inline: bool = False, **kwargs)[source]

Async version of read_json (pandas-style)

async read_parquet(filepath_or_buffer: str, table: str | type, **kwargs)[source]

Async version of read_parquet (pandas-style)

async read_csv_stage(stage_name: str, filename: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]

Async version of read_csv_stage (pandas-style)

async read_json_stage(stage_name: str, filename: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, **kwargs)[source]

Async version of read_json_stage (pandas-style)

async read_parquet_stage(stage_name: str, filename: str, table: str | type, **kwargs)[source]

Async version of read_parquet_stage (pandas-style)

async read_csv_inline(data: str, table: str | type, sep: str = ',', quotechar: str | None = None, **kwargs)[source]

Async version of read_csv_inline (pandas-style)

async read_json_inline(data: str, table: str | type, orient: str = 'records', **kwargs)[source]

Async version of read_json_inline (pandas-style)

async from_file(file_path: str, table_name_or_model, fields_terminated_by: str = ',', fields_enclosed_by: str | None = None, fields_optionally_enclosed: bool = False, fields_escaped_by: str | None = None, lines_terminated_by: str | None = None, lines_starting_by: str | None = None, ignore_lines: int = 0, character_set: str | None = None, parallel: bool = False, columns: List[str] | None = None, format: str | LoadDataFormat | None = None, jsondata: str | JsonDataStructure | None = None, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]

Async version of from_file - builds SQL and executes asynchronously

async from_local_file(file_path: str, table_name_or_model, **kwargs)[source]

Async version of from_local_file

Enumerations

LoadDataFormat

Supported file formats for data loading.

class matrixone.load_data.LoadDataFormat(value)[source]

Bases: str, Enum

Supported file formats for LOAD DATA

CSV = 'csv'
JSONLINE = 'jsonline'
PARQUET = 'parquet'

__format__(format_spec)

Returns format using actual value type unless __str__ has been overridden.
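Because the enumeration derives from both str and Enum, its members compare equal to their plain string values, which is why the loading methods can accept either form. The sketch below re-creates the enum locally under the name `FormatSketch` so that it runs without the library installed.

```python
from enum import Enum


class FormatSketch(str, Enum):
    """Local stand-in with the same members as LoadDataFormat."""
    CSV = "csv"
    JSONLINE = "jsonline"
    PARQUET = "parquet"


def normalize_format(value):
    """Accept a member or its string value; raise ValueError for anything else."""
    return FormatSketch(value)


from_string = normalize_format("jsonline")
from_member = normalize_format(FormatSketch.JSONLINE)
```

Passing an unknown string such as 'xml' to the enum constructor raises ValueError, which gives a cheap validation step.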

CompressionFormat

Supported compression formats for data files.

class matrixone.load_data.CompressionFormat(value)[source]

Bases: str, Enum

Supported compression formats

NONE = 'none'
GZIP = 'gzip'
GZ = 'gz'
BZIP2 = 'bzip2'
BZ2 = 'bz2'
LZ4 = 'lz4'
TAR_GZ = 'tar.gz'
TAR_BZ2 = 'tar.bz2'

__format__(format_spec)

Returns format using actual value type unless __str__ has been overridden.
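This reference does not say whether the compression format is inferred from the file name, so callers may prefer to pass it explicitly. A minimal sketch of choosing a value from the suffix follows; `guess_compression` is a hypothetical helper, and the returned strings are taken from the enum values above.

```python
# Longer suffixes come first so that '.tar.gz' is not mistaken for '.gz'.
SUFFIX_TO_COMPRESSION = [
    (".tar.gz", "tar.gz"),
    (".tar.bz2", "tar.bz2"),
    (".gz", "gzip"),
    (".bz2", "bzip2"),
    (".lz4", "lz4"),
]


def guess_compression(filename):
    """Return a compression value for the filename, or 'none' if no suffix matches."""
    lowered = filename.lower()
    for suffix, name in SUFFIX_TO_COMPRESSION:
        if lowered.endswith(suffix):
            return name
    return "none"


compression = guess_compression("events.jsonl.gz")
```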

JsonDataStructure

JSON data structure types (for compatibility, prefer using orient parameter in read_json()).

class matrixone.load_data.JsonDataStructure(value)[source]

Bases: str, Enum

JSONLINE data structure types

OBJECT = 'object'
ARRAY = 'array'

__format__(format_spec)

Returns format using actual value type unless __str__ has been overridden.