# Source code for matrixone.export

# Copyright 2021 - 2022 Matrix Origin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Data Export for MatrixOne

This module provides pandas-style functionality to export query results to files
or stages using MatrixOne's SELECT ... INTO OUTFILE commands.
"""

import warnings
from typing import Any, Optional, Union


def _query_to_sql(query: Union[str, Any]) -> str:
    """
    Convert various query types to SQL string.

    Surrounding whitespace and trailing ``;`` terminators are removed from the
    result so that an ``INTO OUTFILE`` clause can be appended directly.

    Args:
        query: Can be:
            - String: Returned without surrounding whitespace or trailing ``;``
            - SQLAlchemy select() statement: Compiled to SQL with bound
              parameters rendered inline (``literal_binds``)
            - MatrixOneQuery object (any object with ``_build_sql()``): its
              ``?`` placeholders are filled, left to right, with SQL literals
              for the returned parameters

    Returns:
        SQL string

    Raises:
        ValueError: If a SQLAlchemy statement fails to compile.
        TypeError: If ``query`` is none of the supported types.

    Examples::

        # String query
        sql = _query_to_sql("SELECT * FROM users")

        # SQLAlchemy select()
        from sqlalchemy import select
        stmt = select(User).where(User.age > 25)
        sql = _query_to_sql(stmt)

        # MatrixOneQuery
        query = client.query(User).filter(User.age > 25)
        sql = _query_to_sql(query)
    """
    # If it's already a string, return it
    if isinstance(query, str):
        return _strip_statement_terminator(query)

    # If it has _build_sql method (MatrixOneQuery)
    if hasattr(query, '_build_sql'):
        sql, params = query._build_sql()
        if params:
            sql = _inline_params(sql, params)
        return _strip_statement_terminator(sql)

    # If it's a SQLAlchemy statement (has compile method)
    if hasattr(query, 'compile'):
        try:
            compiled = query.compile(compile_kwargs={"literal_binds": True})
            compiled_sql = str(compiled)
        except Exception as e:
            raise ValueError(f"Failed to compile SQLAlchemy statement: {e}") from e
        return _strip_statement_terminator(compiled_sql)

    # Unsupported type
    raise TypeError(
        f"Unsupported query type: {type(query)}. " "Expected str, SQLAlchemy select(), or MatrixOneQuery object."
    )


def _strip_statement_terminator(sql: str) -> str:
    """Return *sql* without surrounding whitespace or trailing ``;`` terminators."""
    # Strip whitespace and semicolons together: rstrip(';') alone stops at
    # trailing whitespace, leaving "SELECT 1; " with its terminator intact.
    return sql.rstrip(' \t\r\n\f\v;').strip()


def _format_sql_literal(value: Any) -> str:
    """
    Render a Python value as a SQL literal for inlining into a statement.

    ``None`` becomes ``NULL``; strings, dates, datetimes and times are
    single-quoted with backslashes and single quotes escaped; any other value
    is rendered with ``str()``.
    """
    import datetime

    if value is None:
        return "NULL"
    if isinstance(value, (str, datetime.date, datetime.time)):
        # Escape backslashes first (MySQL-compatible servers treat them as
        # escape characters inside string literals by default), then double
        # embedded single quotes so the value cannot close the literal early.
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"
    return str(value)


def _inline_params(sql: str, params: Any) -> str:
    """
    Substitute *params*, in order, for the ``?`` placeholders of *sql*.

    Only placeholders present in the original *sql* are considered, so a ``?``
    inside an already-substituted value is never replaced. Placeholders with no
    matching parameter are left as ``?``; surplus parameters are ignored.
    """
    segments = sql.split('?')
    missing = object()
    values = iter(params)
    pieces = [segments[0]]
    for segment in segments[1:]:
        value = next(values, missing)
        pieces.append('?' if value is missing else _format_sql_literal(value))
        pieces.append(segment)
    return ''.join(pieces)


def _validate_csv_params(sep: str, quotechar: Optional[str], lineterminator: str) -> None:
    """
    Validate CSV export parameters.

    Args:
        sep: Field separator; must be non-empty. A multi-character separator is
            allowed but triggers a ``UserWarning``.
        quotechar: Quote character, or ``None``/empty for no quoting. When
            given it must differ from ``sep``.
        lineterminator: Line terminator; must be non-empty.

    Raises:
        ValueError: If parameters are invalid or conflicting
    """
    if not sep:
        raise ValueError("sep cannot be empty")

    if len(sep) > 1:
        warnings.warn(
            f"sep is '{sep}' (length {len(sep)}). MatrixOne may only support single-character separators.", UserWarning
        )

    if quotechar and sep == quotechar:
        raise ValueError(f"sep and quotechar cannot be the same: '{sep}'")

    # Checked last so inputs that already failed above keep their original error.
    # An empty terminator would otherwise be emitted as "lines terminated by ''".
    if not lineterminator:
        raise ValueError("lineterminator cannot be empty")


def _build_csv_export_sql(
    query: Union[str, Any],
    output_path: str,
    sep: str = ',',
    quotechar: Optional[str] = None,
    lineterminator: str = '\n',
    header: bool = False,
) -> str:
    """
    Build CSV export SQL statement.

    Produces ``<query> INTO OUTFILE '<path>' <options>``. Because MatrixOne
    cannot combine a field separator with an enclosing character, at most one
    ``fields ...`` option is emitted:

    - ``quotechar`` given and ``sep`` left at its ``','`` default:
      ``fields enclosed by '<quotechar>'``
    - otherwise: ``fields terminated by '<sep>'``; if a ``quotechar`` had to be
      dropped in favour of a custom ``sep``, a ``UserWarning`` is issued

    The output path and option values are embedded as single-quoted SQL string
    literals with embedded single quotes doubled; backslashes are passed through
    unchanged, so server-side escape sequences keep their meaning.

    Args:
        query: SELECT query to execute
        output_path: Output file path or stage path
        sep: Field separator (default: ',')
        quotechar: Quote character (default: None)
        lineterminator: Line terminator (default: '\\n'); only emitted when it
            differs from the default
        header: Whether to include column headers (default: False); currently
            only triggers a ``UserWarning`` because MatrixOne lacks support

    Returns:
        Complete export SQL statement

    Raises:
        ValueError: If the CSV parameters are invalid or the query cannot be compiled
        TypeError: If the query type is unsupported
    """
    # Validate parameters
    _validate_csv_params(sep, quotechar, lineterminator)

    # Convert query to SQL string
    query_sql = _query_to_sql(query)

    def quote(value: str) -> str:
        # Double embedded single quotes so values such as "'" or paths like
        # "/tmp/o'brien.csv" cannot terminate the SQL string literal early.
        return "'" + value.replace("'", "''") + "'"

    # Build the export SQL
    sql_parts = [query_sql, "INTO OUTFILE", quote(output_path)]

    # MatrixOne limitation: can't use both sep and quotechar simultaneously.
    # An explicitly customised sep always wins; quotechar is honoured only when
    # sep is left at its ',' default (the user asked for quoting, not a new sep).
    if quotechar and sep == ',':
        sql_parts.append(f"fields enclosed by {quote(quotechar)}")
    else:
        if quotechar:
            warnings.warn(
                f"quotechar={quotechar!r} is ignored because a custom sep={sep!r} was given; "
                "MatrixOne's INTO OUTFILE does not support both options together.",
                UserWarning,
            )
        sql_parts.append(f"fields terminated by {quote(sep)}")

    # Add line terminator if not default
    if lineterminator != '\n':
        sql_parts.append(f"lines terminated by {quote(lineterminator)}")

    # Note: HEADER option not yet supported in MatrixOne
    if header:
        warnings.warn(
            "header=True is not yet supported by MatrixOne's INTO OUTFILE. " "Headers will not be included in the output.",
            UserWarning,
        )

    return ' '.join(sql_parts)


def _build_jsonl_export_sql(
    query: Union[str, Any],
    output_path: str,
) -> str:
    """
    Build JSONL export SQL statement.

    Produces ``<query> INTO OUTFILE '<path>'`` with embedded single quotes in
    the path doubled so the path cannot terminate the SQL string literal.

    NOTE(review): no FORMAT/JSON option is appended, so this is a plain
    ``INTO OUTFILE`` and the file layout is whatever the server emits by
    default for that form — confirm it really is JSON Lines.

    Args:
        query: SELECT query to execute
        output_path: Output file path or stage path

    Returns:
        Complete export SQL statement

    Raises:
        ValueError: If a SQLAlchemy query cannot be compiled
        TypeError: If the query type is unsupported
    """
    # Convert query to SQL string
    query_sql = _query_to_sql(query)

    # Escape the path for use inside a single-quoted SQL literal
    path_literal = output_path.replace("'", "''")
    return f"{query_sql} INTO OUTFILE '{path_literal}'"


class ExportManager:
    """
    Manager class for data export operations (pandas-style interface).

    This class provides pandas-like methods to export query results to local
    files or external stages. Every method builds a ``SELECT ... INTO OUTFILE``
    statement and runs it through ``client.execute()``.

    Examples::

        # CSV export (pandas-style)
        client.export.to_csv('data.csv', query, sep='|')

        # JSONL export
        client.export.to_jsonl('data.jsonl', query)

        # Export to S3 stage
        client.export.to_csv('stage://s3_stage/data.csv', query)
    """

    def __init__(self, client: Any):
        """
        Initialize ExportManager with a client instance.

        Args:
            client: MatrixOne client instance (Client or Session); export
                statements are run through its ``execute()`` method
        """
        self.client = client

    @staticmethod
    def _stage_path(stage_name: str, filename: str) -> str:
        """Return the ``stage://<stage_name>/<filename>`` path used for stage exports."""
        return f"stage://{stage_name}/{filename}"

    def to_csv(
        self,
        path: str,
        query: Union[str, Any],
        *,
        sep: str = ',',
        quotechar: Optional[str] = None,
        lineterminator: str = '\n',
        header: bool = False,
        **kwargs,
    ) -> Any:
        """
        Export query results to CSV file (pandas-style).

        This method exports query results to a CSV file on the MatrixOne
        server's filesystem or to an external stage.

        Args:
            path: Output file path. Can be:
                - Local path: '/tmp/data.csv'
                - Stage path: 'stage://stage_name/data.csv'
            query: SELECT query to execute. Can be:
                - Raw SQL string: "SELECT * FROM users"
                - SQLAlchemy select(): select(User).where(User.age > 25)
                - MatrixOneQuery: client.query(User).filter(User.age > 25)
            sep: Field separator/delimiter (default: ',')
            quotechar: Character to quote fields containing special characters
            lineterminator: Line terminator (default: '\\n')
            header: Include column headers (not yet supported by MatrixOne;
                currently only produces a warning)
            **kwargs: Reserved for future options; currently accepted and ignored

        Returns:
            Whatever ``self.client.execute()`` returns for the export statement

        Raises:
            ValueError: If parameters are invalid
            TypeError: If query type is unsupported

        Examples::

            # Basic CSV export with defaults
            client.export.to_csv('/tmp/users.csv', "SELECT * FROM users")

            # Custom separator (TSV)
            client.export.to_csv('/tmp/users.tsv', query, sep='\\t')

            # With SQLAlchemy
            from sqlalchemy import select
            stmt = select(User).where(User.age > 25)
            client.export.to_csv('/tmp/adults.csv', stmt, sep='|')

            # Export to stage
            client.export.to_csv('stage://s3_stage/backup.csv', query)

            # With MatrixOne query builder
            query = client.query(Order).filter(Order.status == 'completed')
            client.export.to_csv('/tmp/orders.csv', query, sep=',')

            # Using session
            with client.session() as session:
                query = session.query(User).filter(User.active == True)
                session.export.to_csv('/tmp/active_users.csv', query)

        Note:
            - For local paths, the MatrixOne server must have write permissions
            - For stage paths, use format: 'stage://stage_name/filename.csv'
            - MatrixOne doesn't support using sep and quotechar simultaneously
            - header=True is not yet supported by MatrixOne
        """
        # Build SQL
        export_sql = _build_csv_export_sql(
            query=query,
            output_path=path,
            sep=sep,
            quotechar=quotechar,
            lineterminator=lineterminator,
            header=header,
        )
        # Execute export
        return self.client.execute(export_sql)

    def to_jsonl(self, path: str, query: Union[str, Any], **kwargs) -> Any:
        """
        Export query results to JSONL file (JSON Lines format).

        Each line in the output file is intended to be a JSON object
        representing one row.

        Args:
            path: Output file path. Can be:
                - Local path: '/tmp/data.jsonl'
                - Stage path: 'stage://stage_name/data.jsonl'
            query: SELECT query to execute (str, SQLAlchemy select(), or MatrixOneQuery)
            **kwargs: Reserved for future options; currently accepted and ignored

        Returns:
            Whatever ``self.client.execute()`` returns for the export statement

        Examples::

            # Basic JSONL export
            client.export.to_jsonl('/tmp/users.jsonl', "SELECT * FROM users")

            # With SQLAlchemy
            from sqlalchemy import select
            stmt = select(User).where(User.verified == True)
            client.export.to_jsonl('/tmp/verified.jsonl', stmt)

            # Export to stage
            client.export.to_jsonl('stage://s3_stage/data.jsonl', query)

            # Complex query with joins
            query = '''
                SELECT u.name, o.total
                FROM users u
                JOIN orders o ON u.id = o.user_id
            '''
            client.export.to_jsonl('/tmp/user_orders.jsonl', query)

        Note:
            - JSONL format: one JSON object per line, no array wrapper
            - Suitable for streaming and big data processing
        """
        # Build SQL
        export_sql = _build_jsonl_export_sql(
            query=query,
            output_path=path,
        )
        # Execute export
        return self.client.execute(export_sql)

    def to_csv_stage(
        self,
        stage_name: str,
        filename: str,
        query: Union[str, Any],
        *,
        sep: str = ',',
        quotechar: Optional[str] = None,
        lineterminator: str = '\n',
        header: bool = False,
        **kwargs,
    ) -> Any:
        """
        Export query results to CSV file in an external stage.

        This is a convenience method for stage exports that doesn't require
        the 'stage://' protocol prefix; it delegates to ``to_csv()``.

        Args:
            stage_name: Name of the external stage
            filename: Name of the file to create in the stage
            query: SELECT query to execute (str, SQLAlchemy select(), or MatrixOneQuery)
            sep: Field separator/delimiter (default: ',')
            quotechar: Character to quote fields containing special characters
            lineterminator: Line terminator (default: '\\n')
            header: Include column headers (not yet supported by MatrixOne)
            **kwargs: Forwarded to ``to_csv()`` (reserved for future options)

        Returns:
            Whatever ``to_csv()`` returns

        Examples::

            # Export to S3 stage
            client.export.to_csv_stage('s3_stage', 'data.csv', "SELECT * FROM users")

            # With custom separator
            client.export.to_csv_stage('backup_stage', 'data.tsv', query, sep='\\t')

            # With SQLAlchemy
            from sqlalchemy import select
            stmt = select(User).where(User.active == True)
            client.export.to_csv_stage('active_stage', 'active_users.csv', stmt)

        Note:
            - The stage must exist before calling this method
            - Use client.stage.create_s3() or client.stage.create_local() to create stages
            - MatrixOne doesn't support using sep and quotechar simultaneously
        """
        return self.to_csv(
            path=self._stage_path(stage_name, filename),
            query=query,
            sep=sep,
            quotechar=quotechar,
            lineterminator=lineterminator,
            header=header,
            **kwargs,
        )

    def to_jsonl_stage(
        self,
        stage_name: str,
        filename: str,
        query: Union[str, Any],
        **kwargs,
    ) -> Any:
        """
        Export query results to JSONL file in an external stage.

        This is a convenience method for stage exports that doesn't require
        the 'stage://' protocol prefix; it delegates to ``to_jsonl()``.

        Args:
            stage_name: Name of the external stage
            filename: Name of the file to create in the stage
            query: SELECT query to execute (str, SQLAlchemy select(), or MatrixOneQuery)
            **kwargs: Forwarded to ``to_jsonl()`` (reserved for future options)

        Returns:
            Whatever ``to_jsonl()`` returns

        Examples::

            # Export to S3 stage
            client.export.to_jsonl_stage('s3_stage', 'data.jsonl', "SELECT * FROM users")

            # With SQLAlchemy
            from sqlalchemy import select
            stmt = select(User).where(User.verified == True)
            client.export.to_jsonl_stage('verified_stage', 'verified.jsonl', stmt)

        Note:
            - The stage must exist before calling this method
            - Use client.stage.create_s3() or client.stage.create_local() to create stages
        """
        return self.to_jsonl(path=self._stage_path(stage_name, filename), query=query, **kwargs)
class AsyncExportManager:
    """
    Async manager class for data export operations (pandas-style interface).

    Provides async versions of all export methods from ExportManager; each one
    awaits ``client.execute()`` with the generated ``INTO OUTFILE`` statement.

    Examples::

        # Async CSV export
        await client.export.to_csv('data.csv', query, sep='|')

        # Async JSONL export
        await client.export.to_jsonl('data.jsonl', query)
    """

    def __init__(self, client: Any):
        """
        Initialize AsyncExportManager with an async client instance.

        Args:
            client: MatrixOne async client instance (AsyncClient or AsyncSession);
                its ``execute()`` is awaited for every export
        """
        self.client = client

    @staticmethod
    def _stage_path(stage_name: str, filename: str) -> str:
        """Return the ``stage://<stage_name>/<filename>`` path used for stage exports."""
        return f"stage://{stage_name}/{filename}"

    async def to_csv(
        self,
        path: str,
        query: Union[str, Any],
        *,
        sep: str = ',',
        quotechar: Optional[str] = None,
        lineterminator: str = '\n',
        header: bool = False,
        **kwargs,
    ) -> Any:
        """
        Async export query results to CSV file.

        See ExportManager.to_csv() for detailed documentation. ``**kwargs`` is
        reserved for future options and currently ignored.

        Examples::

            # Async CSV export
            await client.export.to_csv('/tmp/users.csv', "SELECT * FROM users")

            # With custom options
            await client.export.to_csv(
                '/tmp/data.tsv',
                query,
                sep='\\t',
                lineterminator='\\r\\n'
            )

            # Using async session
            async with client.session() as session:
                query = session.query(User).filter(User.active == True)
                await session.export.to_csv('/tmp/active.csv', query)
        """
        # Build SQL
        export_sql = _build_csv_export_sql(
            query=query,
            output_path=path,
            sep=sep,
            quotechar=quotechar,
            lineterminator=lineterminator,
            header=header,
        )
        # Execute export asynchronously
        return await self.client.execute(export_sql)

    async def to_jsonl(self, path: str, query: Union[str, Any], **kwargs) -> Any:
        """
        Async export query results to JSONL file.

        See ExportManager.to_jsonl() for detailed documentation. ``**kwargs``
        is reserved for future options and currently ignored.

        Examples::

            # Async JSONL export
            await client.export.to_jsonl('/tmp/data.jsonl', query)

            # Export to stage
            await client.export.to_jsonl('stage://s3/backup.jsonl', query)
        """
        # Build SQL
        export_sql = _build_jsonl_export_sql(
            query=query,
            output_path=path,
        )
        # Execute export asynchronously
        return await self.client.execute(export_sql)

    async def to_csv_stage(
        self,
        stage_name: str,
        filename: str,
        query: Union[str, Any],
        *,
        sep: str = ',',
        quotechar: Optional[str] = None,
        lineterminator: str = '\n',
        header: bool = False,
        **kwargs,
    ) -> Any:
        """
        Async export query results to CSV file in an external stage.

        This is a convenience method for stage exports that doesn't require
        the 'stage://' protocol prefix; it delegates to ``to_csv()``.

        Args:
            stage_name: Name of the external stage
            filename: Name of the file to create in the stage
            query: SELECT query to execute (str, SQLAlchemy select(), or MatrixOneQuery)
            sep: Field separator/delimiter (default: ',')
            quotechar: Character to quote fields containing special characters
            lineterminator: Line terminator (default: '\\n')
            header: Include column headers (not yet supported by MatrixOne)
            **kwargs: Forwarded to ``to_csv()`` (reserved for future options)

        Returns:
            Whatever ``to_csv()`` returns

        Examples::

            # Async export to S3 stage
            await client.export.to_csv_stage('s3_stage', 'data.csv', "SELECT * FROM users")

            # With custom separator
            await client.export.to_csv_stage('backup_stage', 'data.tsv', query, sep='\\t')

            # With SQLAlchemy
            from sqlalchemy import select
            stmt = select(User).where(User.active == True)
            await client.export.to_csv_stage('active_stage', 'active_users.csv', stmt)
        """
        return await self.to_csv(
            path=self._stage_path(stage_name, filename),
            query=query,
            sep=sep,
            quotechar=quotechar,
            lineterminator=lineterminator,
            header=header,
            **kwargs,
        )

    async def to_jsonl_stage(
        self,
        stage_name: str,
        filename: str,
        query: Union[str, Any],
        **kwargs,
    ) -> Any:
        """
        Async export query results to JSONL file in an external stage.

        This is a convenience method for stage exports that doesn't require
        the 'stage://' protocol prefix; it delegates to ``to_jsonl()``.

        Args:
            stage_name: Name of the external stage
            filename: Name of the file to create in the stage
            query: SELECT query to execute (str, SQLAlchemy select(), or MatrixOneQuery)
            **kwargs: Forwarded to ``to_jsonl()`` (reserved for future options)

        Returns:
            Whatever ``to_jsonl()`` returns

        Examples::

            # Async export to S3 stage
            await client.export.to_jsonl_stage('s3_stage', 'data.jsonl', "SELECT * FROM users")

            # With SQLAlchemy
            from sqlalchemy import select
            stmt = select(User).where(User.verified == True)
            await client.export.to_jsonl_stage('verified_stage', 'verified.jsonl', stmt)
        """
        return await self.to_jsonl(path=self._stage_path(stage_name, filename), query=query, **kwargs)