Load Data Manager
LoadDataManager
Pandas-style data loading manager for synchronous operations.
Provides read_csv(), read_json(), and read_parquet() methods for loading
data files into MatrixOne tables with pandas-compatible parameter naming.
- class matrixone.load_data.LoadDataManager(client, executor=None)[source]
Bases:
BaseLoadDataManagerSynchronous data loading manager for MatrixOne bulk data operations.
This class provides a comprehensive and high-performance interface for loading large volumes of data from files into MatrixOne tables. It supports multiple data formats, compression formats, and advanced loading options for optimal performance and flexibility.
Key Features:
Multiple data formats: CSV, TSV, JSON, JSONLines, Parquet
Flexible delimiters: Customizable field and line terminators
Compression support: gzip, bzip2, tar.gz, tar.bz2, lz4, lzo
Parallel loading: High-performance parallel data loading
Character set conversion: Support for various character encodings
Column mapping: Map file columns to table columns
Data transformation: Apply SET clauses for column transformations
Stage integration: Load from external stages (S3, local filesystem)
Transaction-aware: Full integration with transaction contexts
Header handling: Skip header rows automatically
Executor Pattern:
If executor is None, uses self.client.execute (default client-level executor)
If executor is provided (e.g., session), uses executor.execute (transaction-aware)
All operations can participate in transactions when used via session
Supported Data Formats:
CSV: Comma-Separated Values (most common)
TSV: Tab-Separated Values
JSON: JSON objects (one per file)
JSONLines: JSON Lines format (one JSON object per line)
Parquet: Apache Parquet columnar format
Usage Examples:
from matrixone import Client from matrixone.orm import Column, Integer, String, VectorType client = Client(host='localhost', port=6001, user='root', password='111', database='test') # Define ORM models (recommended approach) class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) age = Column(Integer) class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) user_id = Column(Integer) amount = Column(Float) # ======================================== # Basic CSV Loading with ORM Models # ======================================== # Load CSV using ORM model (recommended) client.load_data.from_csv('/path/to/users.csv', User) # CSV with header row (skip first line) client.load_data.from_csv( '/path/to/users.csv', User, ignore_lines=1 # Skip header ) # Custom delimiter and quote character client.load_data.from_csv( '/path/to/data.txt', Order, delimiter='|', enclosed_by='"', optionally_enclosed=True ) # ======================================== # Advanced Loading Options # ======================================== # Load compressed file client.load_data.from_csv( '/path/to/data.csv.gz', User, compression='gzip' ) # Parallel loading for large files (high performance) client.load_data.from_csv( '/path/to/large_data.csv', User, parallel=True # Enables parallel loading ) # Column mapping and transformation client.load_data.from_csv( '/path/to/data.csv', User, columns=['name', 'email', 'age'], set_clause={'created_at': 'NOW()', 'status': "'active'"} ) # ======================================== # Different File Formats # ======================================== # Load TSV (Tab-Separated Values) client.load_data.from_tsv('/path/to/data.tsv', User) # Load JSON Lines format client.load_data.from_jsonlines('/path/to/data.jsonl', User) # ======================================== # Load from External Stages # ======================================== # Load from S3 stage using ORM model client.load_data.from_stage_csv('production_s3', 'users.csv', User) # Load from local filesystem stage client.load_data.from_stage_csv('local_data', 'orders.csv', Order) # Load from stage with custom options client.load_data.from_stage( 'stage://my_stage/data.csv', User, file_format='csv', delimiter=',' ) # ======================================== # Transactional Data Loading # ======================================== # Using within a transaction (all loads atomic) with client.session() as session: # Load multiple files atomically using ORM models session.load_data.from_csv('/path/to/users.csv', User) session.load_data.from_csv('/path/to/orders.csv', Order) # Insert additional data within same transaction session.execute(insert(User).values(name='Admin', email='admin@example.com')) # All operations commit together
Performance Tips:
Use parallel=True for large files (>100MB) to enable parallel loading
Use compressed files (gzip, bzip2) to reduce I/O and transfer time
Specify columns parameter to skip unnecessary columns
Use appropriate character_set to avoid encoding overhead
For very large datasets, consider loading in batches
Common Use Cases:
Data migration: Import data from legacy systems
ETL pipelines: Load transformed data into MatrixOne
Data warehouse loading: Bulk load fact and dimension tables
Log file ingestion: Import application or server logs
CSV imports: Load data from spreadsheets or exports
See also
StageManager: For managing external stages
Client.insert: For small-scale data insertion
Client.batch_insert: For programmatic batch inserts
- __init__(client, executor=None)[source]
Initialize LoadDataManager.
- Parameters:
client – Client object that provides execute() method
executor – Optional executor (e.g., session) for executing SQL. If None, uses client.execute
- read_csv(filepath_or_buffer: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, inline: bool = False, **kwargs)[source]
Read CSV (Comma-Separated Values) file into MatrixOne table (pandas-style).
This method provides a pandas-compatible interface for loading CSV files, with parameter names and behavior matching pandas.read_csv() where applicable.
- Parameters:
filepath_or_buffer (str) – File path, stage path, or inline data - Local path: ‘/path/to/data.csv’ - Stage path: ‘stage://stage_name/file.csv’ - Inline data: CSV string (requires inline=True)
table (str or Model) – Table name or SQLAlchemy model class
sep (str) – Field separator/delimiter. Default: ‘,’ (pandas: sep)
quotechar (str, optional) – Character for quoting fields (pandas: quotechar)
quoting (bool) – If True, use OPTIONALLY ENCLOSED BY. Default: False
escapechar (str, optional) – Escape character (pandas: escapechar)
skiprows (int) – Number of header lines to skip. Default: 0 (pandas: skiprows)
names (list, optional) – Column names to load into (pandas: names)
encoding (str, optional) – Character encoding (pandas: encoding)
parallel (bool) – Enable parallel loading. Default: False
compression (str or CompressionFormat, optional) – Compression format
set_clause (dict, optional) – Column transformations
inline (bool) – If True, treat filepath_or_buffer as inline data. Default: False
- Returns:
Load results with affected_rows
- Return type:
Examples:
# Basic CSV (pandas-style) client.load_data.read_csv('data.csv', table='users') # CSV with header (pandas-style) client.load_data.read_csv('data.csv', table='users', skiprows=1) # Custom separator (pandas-style) client.load_data.read_csv('data.txt', table='users', sep='|') # With quote character (pandas-style) client.load_data.read_csv('data.csv', table='users', quotechar='"', quoting=True) # Load from stage client.load_data.read_csv('stage://s3_stage/data.csv', table='users') # Inline CSV csv_data = "1,Alice\n2,Bob\n" client.load_data.read_csv(csv_data, table='users', inline=True) # With ORM model client.load_data.read_csv('data.csv', table=User, skiprows=1)
- read_csv_stage(stage_name: str, filename: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]
Read CSV file from external stage into MatrixOne table (pandas-style convenience method).
This is a convenience method that doesn’t require the ‘stage://’ protocol prefix.
- Parameters:
stage_name (str) – Name of the external stage
filename (str) – Filename within the stage
table (str or Model) – Table name or SQLAlchemy model class
sep (str) – Field separator. Default: ‘,’
quotechar (str, optional) – Character for quoting fields
quoting (bool) – Use OPTIONALLY ENCLOSED BY. Default: False
skiprows (int) – Number of header lines to skip. Default: 0
names (list, optional) – Column names
encoding (str, optional) – Character encoding
parallel (bool) – Enable parallel loading
compression (str or CompressionFormat, optional) – Compression format
set_clause (dict, optional) – Column transformations
- Returns:
Load results
- Return type:
Examples:
# Load from S3 stage client.load_data.read_csv_stage('s3_stage', 'data.csv', table='users') # With options client.load_data.read_csv_stage('backup_stage', 'data.tsv', table='users', sep='\t', skiprows=1)
- read_csv_inline(data: str, table: str | type, sep: str = ',', quotechar: str | None = None, **kwargs)[source]
Read CSV data from inline string into MatrixOne table (pandas-style).
- Parameters:
- Returns:
Load results
- Return type:
Examples:
csv_data = "1,Alice\n2,Bob\n" client.load_data.read_csv_inline(csv_data, table='users')
- read_json(filepath_or_buffer: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, inline: bool = False, **kwargs)[source]
Read JSON file into MatrixOne table (pandas-style).
This method provides a pandas-compatible interface for loading JSON/JSONL files, matching pandas.read_json() parameter names.
- Parameters:
filepath_or_buffer (str) – File path, stage path, or inline data - Local path: ‘/path/to/data.jsonl’ - Stage path: ‘stage://stage_name/file.jsonl’ - Inline data: JSON string (requires inline=True)
table (str or Model) – Table name or SQLAlchemy model class
lines (bool) – If True, read file as JSON Lines (one object per line). Default: True (pandas: lines)
orient (str) – JSON structure format (pandas: orient) - ‘records’: List of dicts like [{‘col1’: val1}, {‘col2’: val2}] - ‘values’: List of lists like [[val1, val2], [val3, val4]] Default: ‘records’
compression (str or CompressionFormat, optional) – Compression format
inline (bool) – If True, treat filepath_or_buffer as inline data. Default: False
- Returns:
Load results with affected_rows
- Return type:
Examples:
# JSON Lines with objects (pandas-style) client.load_data.read_json('events.jsonl', table='events', lines=True) # JSON Lines with arrays client.load_data.read_json('data.jsonl', table='users', lines=True, orient='values') # From stage client.load_data.read_json('stage://s3/events.jsonl', table='events') # Compressed JSON Lines client.load_data.read_json('events.jsonl.gz', table='events', compression='gzip') # Inline JSON json_data = '{"id":1,"name":"Alice"}\n{"id":2,"name":"Bob"}\n' client.load_data.read_json(json_data, table='users', inline=True)
- read_parquet(filepath_or_buffer: str, table: str | type, **kwargs)[source]
Read Parquet file into MatrixOne table (pandas-style).
This method provides a pandas-compatible interface for loading Parquet files, matching pandas.read_parquet() behavior.
- Parameters:
- Returns:
Load results with affected_rows
- Return type:
MatrixOne Parquet Support:
- Fully Supported Features:
All compression formats (NONE, SNAPPY, GZIP, LZ4, ZSTD, Brotli)
Parquet 1.0 and 2.0 formats
Column statistics (write_statistics=True/False)
Nullable columns (nullable=True/False, NULL values supported)
Large files, wide tables, empty files
- Supported Data Types:
INT32/INT64/UINT32 -> INT/BIGINT/INT UNSIGNED
FLOAT32/FLOAT64 -> FLOAT/DOUBLE
STRING -> VARCHAR (not TEXT!)
BOOLEAN -> BOOL
DATE32/DATE64 -> DATE
TIME32/TIME64 -> TIME
TIMESTAMP(tz=’UTC’) -> TIMESTAMP (UTC timezone required!)
- Not Supported:
Dictionary encoding (use_dictionary must be False)
INT8/INT16 types (use INT32 or INT64)
large_string type (use pa.string())
TIMESTAMP without timezone (must add tz=’UTC’)
BINARY, DECIMAL types
Complex types (List, Struct, Map)
- Recommended Settings:
compression=’snappy’ # Enable compression use_dictionary=False # Must disable dictionary encoding write_statistics=True # Enable statistics data_page_version=’2.0’ # Use Parquet 2.0
- Common Errors:
“indexed INT64 page is not yet implemented” -> Set use_dictionary=False
“load STRING(required) to TEXT NULL is not yet implemented” -> Use VARCHAR in table definition, not TEXT
“load TIMESTAMP(isAdjustedToUTC=false…” -> Use pa.timestamp(‘ms’, tz=’UTC’)
Example:
import pyarrow as pa import pyarrow.parquet as pq # Define non-nullable schema (REQUIRED) schema = pa.schema([ pa.field('id', pa.int64(), nullable=False), pa.field('name', pa.string(), nullable=False) ]) # Create table table = pa.table({ 'id': pa.array([1, 2, 3], type=pa.int64()), 'name': pa.array(['Alice', 'Bob', 'Charlie'], type=pa.string()) }, schema=schema) # Write with MatrixOne-compatible options (Verified 2025) pq.write_table( table, 'data.parq', compression='snappy', # ✅ All compression supported! use_dictionary=False, # ⚠️ Required! Dict not supported write_statistics=True, # ✅ Supported! (Double-checked) data_page_version='2.0' # ✅ Parquet 2.0 supported! ) # Now load the file client.load_data.from_parquet('data.parq', User)
Examples:
# Basic Parquet (pandas-style) client.load_data.read_parquet('data.parquet', table='users') # From stage client.load_data.read_parquet('stage://s3/data.parquet', table='users') # With ORM model client.load_data.read_parquet('data.parquet', table=User)
- read_json_stage(stage_name: str, filename: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, **kwargs)[source]
Read JSON file from external stage into MatrixOne table (pandas-style convenience method).
- Parameters:
stage_name (str) – Name of the external stage
filename (str) – Filename within the stage
table (str or Model) – Table name or SQLAlchemy model class
lines (bool) – If True, read as JSON Lines. Default: True
orient (str) – JSON structure. Default: ‘records’
compression (str or CompressionFormat, optional) – Compression format
- Returns:
Load results
- Return type:
Examples:
client.load_data.read_json_stage('s3_stage', 'events.jsonl', table='events')
- read_parquet_stage(stage_name: str, filename: str, table: str | type, **kwargs)[source]
Read Parquet file from external stage into MatrixOne table (pandas-style convenience method).
- Parameters:
- Returns:
Load results
- Return type:
Examples:
client.load_data.read_parquet_stage('s3_stage', 'data.parquet', table='users')
- read_json_inline(data: str, table: str | type, orient: str = 'records', **kwargs)[source]
Read JSON data from inline string into MatrixOne table (pandas-style).
- Parameters:
- Returns:
Load results
- Return type:
Examples:
json_data = '{"id":1,"name":"Alice"}\n' client.load_data.read_json_inline(json_data, table='users')
- from_file(file_path: str, table_name_or_model, fields_terminated_by: str = ',', fields_enclosed_by: str | None = None, fields_optionally_enclosed: bool = False, fields_escaped_by: str | None = None, lines_terminated_by: str | None = None, lines_starting_by: str | None = None, ignore_lines: int = 0, character_set: str | None = None, parallel: bool = False, columns: List[str] | None = None, format: str | LoadDataFormat | None = None, jsondata: str | JsonDataStructure | None = None, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]
Load data from a file into a table.
This is the main method for loading data from files. It supports all standard LOAD DATA INFILE options and provides a clean Python interface.
- Parameters:
file_path (str) – Path to the file to load. Can be: - Local file path: ‘/path/to/data.csv’ - Stage file: ‘stage://stage_name/path/to/file’
table_name_or_model – Either a table name (str) or SQLAlchemy model class
fields_terminated_by (str) – Character(s) that separate fields. Default: ‘,’ (comma). Common values: - ‘,’ for CSV files - ‘t’ for TSV files - ‘|’ for pipe-delimited - ‘*’ or any custom delimiter
fields_enclosed_by (str, optional) – Character that encloses field values. Used when fields contain the delimiter character. Common values: ‘”’ or “’”
fields_optionally_enclosed (bool) – If True, use OPTIONALLY ENCLOSED BY. This means only some fields may be enclosed, not all. Default: False
fields_escaped_by (str, optional) – Escape character for special characters. Default: ‘' (backslash) when enclosed_by is specified.
lines_terminated_by (str, optional) – Character(s) that terminate lines. Default: ‘n’. For Windows files: ‘rn’
lines_starting_by (str, optional) – Prefix that identifies lines to load. Lines not starting with this prefix are ignored.
ignore_lines (int) – Number of lines to skip at the beginning of the file. Useful for skipping header rows. Default: 0
character_set (str, optional) – Character set of the input file. Examples: ‘utf8’, ‘utf-8’, ‘utf-16’, ‘gbk’
parallel (bool) – Enable parallel loading for large files. Default: False. Set to True for faster loading of large files.
columns (list, optional) – List of column names to load data into. When specified, only these columns are populated. Example: [‘col1’, ‘col2’, ‘col3’]
format (str or LoadDataFormat, optional) – File format. Supported values: - LoadDataFormat.CSV or ‘csv’ (default) - LoadDataFormat.JSONLINE or ‘jsonline’ - LoadDataFormat.PARQUET or ‘parquet’
jsondata (str or JsonDataStructure, optional) – JSON data structure (for JSONLINE). - JsonDataStructure.OBJECT or ‘object’: JSON objects (one per line) - JsonDataStructure.ARRAY or ‘array’: JSON arrays (one per line) Required when format is JSONLINE
compression (str or CompressionFormat, optional) – Compression format. - CompressionFormat.NONE or ‘none’: No compression (default) - CompressionFormat.GZIP or ‘gzip’: Gzip compression - CompressionFormat.BZIP2 or ‘bzip2’: Bzip2 compression - CompressionFormat.LZ4 or ‘lz4’: LZ4 compression - CompressionFormat.TAR_GZ or ‘tar.gz’: Tar+Gzip - CompressionFormat.TAR_BZ2 or ‘tar.bz2’: Tar+Bzip2
set_clause (dict, optional) – Column transformations. Dictionary of column transformations, e.g.: {‘col1’: ‘NULLIF(col1, “null”)’, ‘col2’: ‘NULLIF(col2, 1)’}
**kwargs – Additional options for future extensions
- Returns:
Object containing load results with affected_rows count
- Return type:
- Raises:
ValueError – If parameters are invalid
QueryError – If file loading fails
Examples:
# Basic CSV loading >>> result = client.load_data.from_file('/path/to/users.csv', 'users') >>> print(f"Loaded {result.affected_rows} rows") # CSV with header row >>> result = client.load_data.from_file( ... '/path/to/data.csv', ... 'products', ... ignore_lines=1 ... ) # Pipe-delimited with quoted fields >>> result = client.load_data.from_file( ... '/path/to/data.txt', ... 'orders', ... fields_terminated_by='|', ... fields_enclosed_by='"', ... fields_escaped_by='\\' ... ) # Tab-separated values (TSV) >>> result = client.load_data.from_file( ... '/path/to/data.tsv', ... 'logs', ... fields_terminated_by='\t' ... ) # Load with character set conversion >>> result = client.load_data.from_file( ... '/path/to/data.csv', ... 'users', ... character_set='utf-8' ... ) # Load specific columns only >>> result = client.load_data.from_file( ... '/path/to/data.csv', ... 'users', ... columns=['name', 'email', 'age'] ... ) # Parallel loading for large files >>> result = client.load_data.from_file( ... '/path/to/large_data.csv', ... 'big_table', ... parallel=True ... ) # Load with lines starting by filter >>> result = client.load_data.from_file( ... '/path/to/data.txt', ... 'filtered_data', ... lines_starting_by='DATA:' ... ) # Load JSONLINE format (object) >>> result = client.load_data.from_file( ... '/path/to/data.jl', ... 'users', ... format=LoadDataFormat.JSONLINE, ... jsondata=JsonDataStructure.OBJECT ... ) # Load JSONLINE format (array) with compression >>> result = client.load_data.from_file( ... '/path/to/data.jl.gz', ... 'users', ... format=LoadDataFormat.JSONLINE, ... jsondata=JsonDataStructure.ARRAY, ... compression=CompressionFormat.GZIP ... ) # Load Parquet format >>> result = client.load_data.from_file( ... '/path/to/data.parq', ... 'users', ... format=LoadDataFormat.PARQUET ... ) # Load with SET clause (NULLIF) >>> result = client.load_data.from_file( ... '/path/to/data.csv', ... 'users', ... set_clause={ ... 'col1': 'NULLIF(col1, "null")', ... 'col2': 'NULLIF(col2, 1)' ... } ... )
- from_local_file(file_path: str, table_name_or_model, **kwargs)[source]
Load data from a local file using LOAD DATA LOCAL INFILE.
This method uses the LOCAL keyword, which allows loading files from the client machine rather than the server machine.
- Parameters:
file_path (str) – Path to the local file
table_name_or_model – Either a table name (str) or SQLAlchemy model class
**kwargs – Same options as from_file()
- Returns:
Object containing load results
- Return type:
Examples:
# Load from client machine >>> result = client.load_data.from_local_file( ... '/local/path/to/data.csv', ... 'users', ... ignore_lines=1 ... )
AsyncLoadDataManager
Pandas-style data loading manager for asynchronous operations.
Provides async versions of all loading methods from LoadDataManager for non-blocking data loading operations.
- class matrixone.load_data.AsyncLoadDataManager(client, executor=None)[source]
Bases:
BaseLoadDataManagerAsynchronous data loading manager for MatrixOne bulk data operations.
Provides the same comprehensive bulk data loading functionality as LoadDataManager but with full async/await support for non-blocking I/O operations. Ideal for high-concurrency applications and async web frameworks requiring data loading.
Key Features:
Non-blocking operations: All load operations use async/await
Async file loading: Load data files without blocking the event loop
Concurrent loading: Load multiple files concurrently
Multiple formats: CSV, TSV, JSON, JSONLines, Parquet (async)
Async stage integration: Load from external stages asynchronously
Transaction-aware: Full integration with async transaction contexts
Executor pattern: Works with both async client and async session
Executor Pattern:
If executor is None, uses self.client.execute (default async client-level executor)
If executor is provided (e.g., async session), uses executor.execute (async transaction-aware)
All operations are non-blocking and use async/await
Enables concurrent data loading operations
Usage Examples:
from matrixone import AsyncClient from matrixone.orm import Column, Integer, String, Float, Base from sqlalchemy import insert import asyncio # Define ORM models (recommended approach) class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(100)) email = Column(String(255)) age = Column(Integer) class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) user_id = Column(Integer) amount = Column(Float) class Product(Base): __tablename__ = 'products' id = Column(Integer, primary_key=True) name = Column(String(200)) price = Column(Float) async def main(): client = AsyncClient() await client.connect(host='localhost', port=6001, user='root', password='111', database='test') # ======================================== # Basic Async CSV Loading with ORM # ======================================== # Load CSV using ORM model (recommended) await client.load_data.from_csv('/path/to/users.csv', User) # CSV with header row await client.load_data.from_csv( '/path/to/users.csv', User, ignore_lines=1 # Skip header ) # Custom delimiter and quote character (async) await client.load_data.from_csv( '/path/to/data.txt', Order, delimiter='|', enclosed_by='"' ) # ======================================== # Advanced Loading Options # ======================================== # Parallel async loading (high performance) await client.load_data.from_csv( '/path/to/large_data.csv', User, parallel=True ) # Load compressed file asynchronously await client.load_data.from_csv( '/path/to/data.csv.gz', User, compression='gzip' ) # Column mapping with transformation await client.load_data.from_csv( '/path/to/data.csv', User, columns=['name', 'email', 'age'], set_clause={'created_at': 'NOW()'} ) # Load JSON Lines format asynchronously await client.load_data.from_jsonlines('/path/to/data.jsonl', User) # ======================================== # Load from External Stages # ======================================== # Load from S3 stage using ORM model await client.load_data.from_stage_csv('production_s3', 'users.csv', User) # Load from local stage await client.load_data.from_stage_csv('local_data', 'orders.csv', Order) # ======================================== # Concurrent Loading (High Performance) # ======================================== # Concurrent loading of multiple files using ORM models await asyncio.gather( client.load_data.from_csv('/path/to/users.csv', User), client.load_data.from_csv('/path/to/orders.csv', Order), client.load_data.from_csv('/path/to/products.csv', Product) ) # ======================================== # Transactional Async Loading # ======================================== # Using within async transaction async with client.session() as session: # Load multiple files atomically using ORM models await session.load_data.from_csv('/path/to/users.csv', User) await session.load_data.from_csv('/path/to/orders.csv', Order) # Insert additional data within same transaction using SQLAlchemy await session.execute(insert(User).values(name='Admin', email='admin@example.com')) # All operations commit together await client.disconnect() asyncio.run(main())
Performance Benefits:
Non-blocking I/O: Don’t block the event loop during file loading
Concurrent loading: Load multiple files simultaneously
Async web frameworks: Perfect for FastAPI, aiohttp, etc.
High concurrency: Handle many load operations concurrently
Use Cases:
Async ETL pipelines: Non-blocking data transformation and loading
Real-time data ingestion: Load data without blocking request handling
Async web services: Load data in response to API calls
Concurrent batch loading: Load multiple files in parallel
High-throughput applications: Maximize I/O throughput
See also
AsyncClient: For async database operations
AsyncStageManager: For async stage management
LoadDataManager: For synchronous data loading
- __init__(client, executor=None)[source]
Initialize AsyncLoadDataManager.
- Parameters:
client – Client object
executor – Optional executor (e.g., async session) for executing SQL. If None, uses client.execute
- async read_csv(filepath_or_buffer: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, inline: bool = False, **kwargs)[source]
Async version of read_csv (pandas-style)
- async read_json(filepath_or_buffer: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, inline: bool = False, **kwargs)[source]
Async version of read_json (pandas-style)
- async read_parquet(filepath_or_buffer: str, table: str | type, **kwargs)[source]
Async version of read_parquet (pandas-style)
- async read_csv_stage(stage_name: str, filename: str, table: str | type, sep: str = ',', quotechar: str | None = None, quoting: bool = False, escapechar: str | None = None, skiprows: int = 0, names: List[str] | None = None, encoding: str | None = None, parallel: bool = False, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]
Async version of read_csv_stage (pandas-style)
- async read_json_stage(stage_name: str, filename: str, table: str | type, lines: bool = True, orient: str = 'records', compression: str | CompressionFormat | None = None, **kwargs)[source]
Async version of read_json_stage (pandas-style)
- async read_parquet_stage(stage_name: str, filename: str, table: str | type, **kwargs)[source]
Async version of read_parquet_stage (pandas-style)
- async read_csv_inline(data: str, table: str | type, sep: str = ',', quotechar: str | None = None, **kwargs)[source]
Async version of read_csv_inline (pandas-style)
- async read_json_inline(data: str, table: str | type, orient: str = 'records', **kwargs)[source]
Async version of read_json_inline (pandas-style)
- async from_file(file_path: str, table_name_or_model, fields_terminated_by: str = ',', fields_enclosed_by: str | None = None, fields_optionally_enclosed: bool = False, fields_escaped_by: str | None = None, lines_terminated_by: str | None = None, lines_starting_by: str | None = None, ignore_lines: int = 0, character_set: str | None = None, parallel: bool = False, columns: List[str] | None = None, format: str | LoadDataFormat | None = None, jsondata: str | JsonDataStructure | None = None, compression: str | CompressionFormat | None = None, set_clause: Dict[str, str] | None = None, **kwargs)[source]
Async version of from_file - builds SQL and executes asynchronously
Enumerations
LoadDataFormat
Supported file formats for data loading.
CompressionFormat
Supported compression formats for data files.
- class matrixone.load_data.CompressionFormat(value)[source]
-
Supported compression formats
- NONE = 'none'
- GZIP = 'gzip'
- GZ = 'gz'
- BZIP2 = 'bzip2'
- BZ2 = 'bz2'
- LZ4 = 'lz4'
- TAR_GZ = 'tar.gz'
- TAR_BZ2 = 'tar.bz2'
- __format__(format_spec)
Returns format using actual value type unless __str__ has been overridden.
JsonDataStructure
JSON data structure types (for compatibility, prefer using orient parameter in read_json()).