pipeline_execution

Bases: BasePipelineExecution

Source code in airia/client/pipeline_execution/async_pipeline_execution.py
def __init__(self, request_handler: AsyncRequestHandler):
    super().__init__(request_handler)

execute_pipeline(pipeline_id, user_input, debug=False, user_id=None, conversation_id=None, async_output=False, include_tools_response=False, images=None, files=None, data_source_folders=None, data_source_files=None, in_memory_messages=None, current_date_time=None, save_history=True, additional_info=None, prompt_variables=None, voice_enabled=False, output_schema=None, correlation_id=None) async

execute_pipeline(
    pipeline_id: str,
    user_input: str,
    debug: bool = False,
    user_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
    async_output: Literal[False] = False,
    include_tools_response: bool = False,
    images: Optional[List[str]] = None,
    files: Optional[List[str]] = None,
    data_source_folders: Optional[Dict[str, Any]] = None,
    data_source_files: Optional[Dict[str, Any]] = None,
    in_memory_messages: Optional[
        List[Dict[str, Any]]
    ] = None,
    current_date_time: Optional[str] = None,
    save_history: bool = True,
    additional_info: Optional[List[Any]] = None,
    prompt_variables: Optional[Dict[str, Any]] = None,
    voice_enabled: bool = False,
    output_schema: Optional[Type[BaseModel]] = None,
    correlation_id: Optional[str] = None,
) -> PipelineExecutionResponse
execute_pipeline(
    pipeline_id: str,
    user_input: str,
    debug: bool = False,
    user_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
    async_output: Literal[True] = True,
    include_tools_response: bool = False,
    images: Optional[List[str]] = None,
    files: Optional[List[str]] = None,
    data_source_folders: Optional[Dict[str, Any]] = None,
    data_source_files: Optional[Dict[str, Any]] = None,
    in_memory_messages: Optional[
        List[Dict[str, Any]]
    ] = None,
    current_date_time: Optional[str] = None,
    save_history: bool = True,
    additional_info: Optional[List[Any]] = None,
    prompt_variables: Optional[Dict[str, Any]] = None,
    voice_enabled: bool = False,
    output_schema: Optional[Type[BaseModel]] = None,
    correlation_id: Optional[str] = None,
) -> PipelineExecutionAsyncStreamedResponse

Execute a pipeline with the provided input asynchronously.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pipeline_id` | `str` | The ID of the pipeline to execute. | *required* |
| `user_input` | `str` | Input text to process. | *required* |
| `debug` | `bool` | Whether debug mode execution is enabled. | `False` |
| `user_id` | `Optional[str]` | Optional ID of the user making the request (GUID). | `None` |
| `conversation_id` | `Optional[str]` | Optional conversation ID (GUID). | `None` |
| `async_output` | `bool` | Whether to stream the response. | `False` |
| `include_tools_response` | `bool` | Whether to return the initial LLM tool result. | `False` |
| `images` | `Optional[List[str]]` | Optional list of image file paths or URLs. | `None` |
| `files` | `Optional[List[str]]` | Optional list of file paths or URLs. | `None` |
| `data_source_folders` | `Optional[Dict[str, Any]]` | Optional data source folders information. | `None` |
| `data_source_files` | `Optional[Dict[str, Any]]` | Optional data source files information. | `None` |
| `in_memory_messages` | `Optional[List[Dict[str, Any]]]` | Optional list of in-memory messages, each with a role and message (example below). | `None` |
| `current_date_time` | `Optional[str]` | Optional current date and time in ISO format. | `None` |
| `save_history` | `bool` | Whether to save the `userInput` and output to conversation history. | `True` |
| `additional_info` | `Optional[List[Any]]` | Optional additional information. | `None` |
| `prompt_variables` | `Optional[Dict[str, Any]]` | Optional variables to be used in the prompt. | `None` |
| `voice_enabled` | `bool` | Whether the request came through the airia-voice-proxy. | `False` |
| `output_schema` | `Optional[Type[BaseModel]]` | Optional Pydantic model class for structured output. | `None` |
| `correlation_id` | `Optional[str]` | Optional correlation ID for request tracing; if not provided, one is generated automatically. | `None` |
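
The table above notes that each in-memory message carries a role and a message. A minimal sketch of the shapes these dict-valued parameters take; the key names follow the descriptions above, but the accepted role values and exact schemas are assumptions, not confirmed by this page:

```python
# Sketch only: "role"/"message" keys follow the parameter description
# above; the accepted role values are an assumption -- check the SDK's
# request models for the authoritative schema.
response = await client.pipeline_execution.execute_pipeline(
    pipeline_id="pipeline_123",
    user_input="Summarize our discussion so far",
    in_memory_messages=[
        {"role": "user", "message": "We compared two vendors."},
        {"role": "assistant", "message": "Vendor A was cheaper."},
    ],
    prompt_variables={"tone": "concise"},
)
```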

Returns:

| Type | Description |
| --- | --- |
| `Union[PipelineExecutionResponse, PipelineExecutionAsyncStreamedResponse]` | Response containing the result of the execution. |

Raises:

| Type | Description |
| --- | --- |
| `AiriaAPIError` | If the API request fails, with details about the error. |
| `aiohttp.ClientError` | For other request-related errors. |
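
A hedged sketch of catching the errors listed above. The import path for `AiriaAPIError` is an assumption (the exceptions module is not shown on this page); `aiohttp.ClientError` comes from the source docstring below:

```python
import aiohttp

# Assumed import path for AiriaAPIError; adjust to wherever the SDK
# actually exposes it.
from airia import AiriaAPIError

try:
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="pipeline_123",
        user_input="Tell me about quantum computing",
    )
except AiriaAPIError as exc:
    print(f"API request failed: {exc}")
except aiohttp.ClientError as exc:
    print(f"Transport error: {exc}")
```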

Examples:

Basic usage:

```python
client = AiriaAsyncClient(api_key="your_api_key")
response = await client.pipeline_execution.execute_pipeline(
    pipeline_id="pipeline_123",
    user_input="Tell me about quantum computing"
)
print(response.result)
```

With structured output:

```python
from pydantic import BaseModel

class PersonInfo(BaseModel):
    name: str
    age: int

response = await client.pipeline_execution.execute_pipeline(
    pipeline_id="pipeline_123",
    user_input="Extract person info",
    output_schema=PersonInfo
)
```
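
Streaming (per the second overload above, `async_output=True` returns a `PipelineExecutionAsyncStreamedResponse`). A sketch under the assumption that the response's `stream` attribute, set from the request handler in the source below, is an async iterator of chunks; verify against the actual class:

```python
# Assumption: .stream is an async iterator of response chunks (the
# source below stores the raw request-handler stream as `stream`).
streamed = await client.pipeline_execution.execute_pipeline(
    pipeline_id="pipeline_123",
    user_input="Tell me about quantum computing",
    async_output=True,
)
async for chunk in streamed.stream:
    print(chunk, end="", flush=True)
```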

Source code in airia/client/pipeline_execution/async_pipeline_execution.py
async def execute_pipeline(
    self,
    pipeline_id: str,
    user_input: str,
    debug: bool = False,
    user_id: Optional[str] = None,
    conversation_id: Optional[str] = None,
    async_output: bool = False,
    include_tools_response: bool = False,
    images: Optional[List[str]] = None,
    files: Optional[List[str]] = None,
    data_source_folders: Optional[Dict[str, Any]] = None,
    data_source_files: Optional[Dict[str, Any]] = None,
    in_memory_messages: Optional[List[Dict[str, Any]]] = None,
    current_date_time: Optional[str] = None,
    save_history: bool = True,
    additional_info: Optional[List[Any]] = None,
    prompt_variables: Optional[Dict[str, Any]] = None,
    voice_enabled: bool = False,
    output_schema: Optional[Type[BaseModel]] = None,
    correlation_id: Optional[str] = None,
) -> Union[
    PipelineExecutionResponse,
    PipelineExecutionAsyncStreamedResponse,
]:
    """
    Execute a pipeline with the provided input asynchronously.

    Args:
        pipeline_id: The ID of the pipeline to execute.
        user_input: Input text to process.
        debug: Whether debug mode execution is enabled. Default is False.
        user_id: Optional ID of the user making the request (guid).
        conversation_id: Optional conversation ID (guid).
        async_output: Whether to stream the response. Default is False.
        include_tools_response: Whether to return the initial LLM tool result. Default is False.
        images: Optional list of image file paths or URLs.
        files: Optional list of file paths or URLs.
        data_source_folders: Optional data source folders information.
        data_source_files: Optional data source files information.
        in_memory_messages: Optional list of in-memory messages, each with a role and message.
        current_date_time: Optional current date and time in ISO format.
        save_history: Whether to save the userInput and output to conversation history. Default is True.
        additional_info: Optional additional information.
        prompt_variables: Optional variables to be used in the prompt.
        voice_enabled: Whether the request came through the airia-voice-proxy. Default is False.
        output_schema: Optional Pydantic model class for structured output.
        correlation_id: Optional correlation ID for request tracing. If not provided,
                    one will be generated automatically.

    Returns:
        Response containing the result of the execution.

    Raises:
        AiriaAPIError: If the API request fails with details about the error.
        aiohttp.ClientError: For other request-related errors.

    Examples:
        Basic usage:
        ```python
        client = AiriaAsyncClient(api_key="your_api_key")
        response = await client.pipeline_execution.execute_pipeline(
            pipeline_id="pipeline_123",
            user_input="Tell me about quantum computing"
        )
        print(response.result)
        ```

        With structured output:
        ```python
        from pydantic import BaseModel

        class PersonInfo(BaseModel):
            name: str
            age: int

        response = await client.pipeline_execution.execute_pipeline(
            pipeline_id="pipeline_123",
            user_input="Extract person info",
            output_schema=PersonInfo
        )
        ```
    """
    # Validate user_input parameter
    if not user_input:
        raise ValueError("user_input cannot be empty")

    # Handle file and image uploads (local files are uploaded, URLs are passed through)
    image_urls = None
    file_urls = None

    if images or files:
        file_urls, image_urls = await self._upload_files(files or [], images or [])

    # Handle structured output by injecting schema as system message
    modified_in_memory_messages = in_memory_messages
    if output_schema is not None:
        # Create a copy of in_memory_messages if it exists, otherwise create new list
        modified_in_memory_messages = list(in_memory_messages) if in_memory_messages else []
        # Insert schema instruction as first system message
        schema_message = create_schema_system_message(output_schema)
        modified_in_memory_messages.insert(0, schema_message)

    request_data = self._pre_execute_pipeline(
        pipeline_id=pipeline_id,
        user_input=user_input,
        debug=debug,
        user_id=user_id,
        conversation_id=conversation_id,
        async_output=async_output,
        include_tools_response=include_tools_response,
        images=image_urls,
        files=file_urls,
        data_source_folders=data_source_folders,
        data_source_files=data_source_files,
        in_memory_messages=modified_in_memory_messages,
        current_date_time=current_date_time,
        save_history=save_history,
        additional_info=additional_info,
        prompt_variables=prompt_variables,
        voice_enabled=voice_enabled,
        output_configuration=None,  # Not using output_configuration anymore
        correlation_id=correlation_id,
        api_version=ApiVersion.V2.value,
    )
    resp = (
        self._request_handler.make_request_stream("POST", request_data)
        if async_output
        else await self._request_handler.make_request("POST", request_data)
    )

    if not async_output:
        response = PipelineExecutionResponse(**resp)
        # Parse response to Pydantic model if output_schema was provided
        if output_schema is not None and response.result:
            response.result = parse_response_to_model(response.result, output_schema)
        return response

    return PipelineExecutionAsyncStreamedResponse(stream=resp)

execute_temporary_assistant(model_parameters, user_input, assistant_name='', prompt_parameters={'prompt': ''}, async_output=False, include_tools_response=False, save_history=True, voice_enabled=False, debug=False, additional_info=None, conversation_id=None, current_date_time=None, data_source_files=None, data_source_folders=None, data_store_parameters=None, external_user_id=None, files=None, images=None, in_memory_messages=None, output_configuration=None, output_schema=None, prompt_variables=None, user_id=None, user_input_id=None, variables=None, correlation_id=None) async

execute_temporary_assistant(
    model_parameters: Dict[str, Any],
    user_input: str,
    assistant_name: str = "",
    prompt_parameters: Dict[str, Any] = {"prompt": ""},
    async_output: Literal[False] = False,
    include_tools_response: bool = False,
    save_history: bool = True,
    voice_enabled: bool = False,
    debug: bool = False,
    additional_info: Optional[List[Any]] = None,
    conversation_id: Optional[str] = None,
    current_date_time: Optional[str] = None,
    data_source_files: Optional[
        Dict[str, List[str]]
    ] = None,
    data_source_folders: Optional[
        Dict[str, List[str]]
    ] = None,
    data_store_parameters: Optional[Dict[str, Any]] = None,
    external_user_id: Optional[str] = None,
    files: Optional[List[str]] = None,
    images: Optional[List[str]] = None,
    in_memory_messages: Optional[
        List[Dict[str, Any]]
    ] = None,
    output_configuration: Optional[Dict[str, Any]] = None,
    output_schema: Optional[Type[BaseModel]] = None,
    prompt_variables: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    user_input_id: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None,
) -> TemporaryAssistantResponse
execute_temporary_assistant(
    model_parameters: Dict[str, Any],
    user_input: str,
    assistant_name: str = "",
    prompt_parameters: Dict[str, Any] = {"prompt": ""},
    async_output: Literal[True] = True,
    include_tools_response: bool = False,
    save_history: bool = True,
    voice_enabled: bool = False,
    debug: bool = False,
    additional_info: Optional[List[Any]] = None,
    conversation_id: Optional[str] = None,
    current_date_time: Optional[str] = None,
    data_source_files: Optional[
        Dict[str, List[str]]
    ] = None,
    data_source_folders: Optional[
        Dict[str, List[str]]
    ] = None,
    data_store_parameters: Optional[Dict[str, Any]] = None,
    external_user_id: Optional[str] = None,
    files: Optional[List[str]] = None,
    images: Optional[List[str]] = None,
    in_memory_messages: Optional[
        List[Dict[str, Any]]
    ] = None,
    output_configuration: Optional[Dict[str, Any]] = None,
    output_schema: Optional[Type[BaseModel]] = None,
    prompt_variables: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    user_input_id: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None,
) -> TemporaryAssistantAsyncStreamedResponse

Execute a temporary assistant with the provided parameters asynchronously.

This method creates and executes a temporary AI assistant with custom configuration, allowing for flexible assistant behavior without creating a persistent pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_parameters` | `Dict[str, Any]` | Model parameters (required). Must include `libraryModelId`, `projectModelId`, `modelIdentifierType`, and `modelIsAvailableinProject`. | *required* |
| `user_input` | `str` | User input text (required). | *required* |
| `assistant_name` | `str` | Name of the temporary assistant. | `''` |
| `prompt_parameters` | `Dict[str, Any]` | Parameters for prompt configuration. | `{'prompt': ''}` |
| `async_output` | `bool` | Whether to stream the response. | `False` |
| `include_tools_response` | `bool` | Whether to return the initial LLM tool result. | `False` |
| `save_history` | `bool` | Whether to save input and output to conversation history. | `True` |
| `voice_enabled` | `bool` | Whether voice output is enabled. | `False` |
| `debug` | `bool` | Whether debug mode execution is enabled. | `False` |
| `additional_info` | `Optional[List[Any]]` | Optional additional information array. | `None` |
| `conversation_id` | `Optional[str]` | Optional conversation identifier. | `None` |
| `current_date_time` | `Optional[str]` | Optional current date and time in ISO format. | `None` |
| `data_source_files` | `Optional[Dict[str, List[str]]]` | Optional dictionary mapping data source GUIDs to file GUID arrays (example below). | `None` |
| `data_source_folders` | `Optional[Dict[str, List[str]]]` | Optional dictionary mapping data source GUIDs to folder GUID arrays. | `None` |
| `data_store_parameters` | `Optional[Dict[str, Any]]` | Optional DataStore parameters. | `None` |
| `external_user_id` | `Optional[str]` | Optional external user identifier. | `None` |
| `files` | `Optional[List[str]]` | Optional list of file identifiers. | `None` |
| `images` | `Optional[List[str]]` | Optional list of image identifiers. | `None` |
| `in_memory_messages` | `Optional[List[Dict[str, Any]]]` | Optional list of in-memory messages. | `None` |
| `output_configuration` | `Optional[Dict[str, Any]]` | Optional output configuration (raw dict format). | `None` |
| `output_schema` | `Optional[Type[BaseModel]]` | Optional Pydantic model class for structured output. If provided, takes precedence over `output_configuration`. | `None` |
| `prompt_variables` | `Optional[Dict[str, Any]]` | Optional prompt variables dictionary. | `None` |
| `user_id` | `Optional[str]` | Optional user identifier. | `None` |
| `user_input_id` | `Optional[str]` | Optional unique identifier for user input. | `None` |
| `variables` | `Optional[Dict[str, Any]]` | Optional variables dictionary. | `None` |
| `correlation_id` | `Optional[str]` | Optional correlation ID for request tracing; if not provided, one is generated automatically. | `None` |
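
A sketch of the data source mappings described above (data source GUID keys mapping to file or folder GUID arrays); all GUIDs here are placeholders:

```python
# Placeholder GUIDs; the mapping shape follows the parameter
# descriptions above (data source GUID -> list of file/folder GUIDs).
response = await client.pipeline_execution.execute_temporary_assistant(
    model_parameters={
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    },
    user_input="Summarize the attached reports",
    data_source_files={"<data-source-guid>": ["<file-guid>"]},
)
```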

Returns:

| Type | Description |
| --- | --- |
| `Union[TemporaryAssistantResponse, TemporaryAssistantAsyncStreamedResponse]` | Response containing the result of the temporary assistant execution; the concrete type is selected by the result type discriminator. |

Raises:

| Type | Description |
| --- | --- |
| `AiriaAPIError` | If the API request fails, with details about the error. |
| `aiohttp.ClientError` | For other request-related errors. |
| `ValueError` | If required parameters are missing or invalid. |
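
Unlike the API errors, `ValueError` is raised client-side before any request is made (see the validation block at the top of the source below):

```python
# Raised locally by the method's input validation: an empty
# model_parameters dict fails before any HTTP request is sent.
try:
    await client.pipeline_execution.execute_temporary_assistant(
        model_parameters={},
        user_input="hello",
    )
except ValueError as exc:
    print(exc)  # "model_parameters cannot be empty"
```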

Examples:

Basic usage:

```python
client = AiriaAsyncClient(api_key="your_api_key")
response = await client.pipeline_execution.execute_temporary_assistant(
    model_parameters={
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    },
    user_input="say double bubble bath ten times fast",
)
print(response.result)
```

With structured output:

```python
from pydantic import BaseModel

class WeatherInfo(BaseModel):
    temperature: float
    conditions: str

response = await client.pipeline_execution.execute_temporary_assistant(
    model_parameters={...},
    user_input="What's the weather?",
    output_schema=WeatherInfo
)
```
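
Streaming (per the second overload above, `async_output=True` returns a `TemporaryAssistantAsyncStreamedResponse`). As with `execute_pipeline`, this sketch assumes the response's `stream` attribute is an async iterator of chunks; verify against the actual class:

```python
# Assumption: .stream is an async iterator of response chunks.
streamed = await client.pipeline_execution.execute_temporary_assistant(
    model_parameters={
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    },
    user_input="say double bubble bath ten times fast",
    async_output=True,
)
async for chunk in streamed.stream:
    print(chunk, end="", flush=True)
```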

Source code in airia/client/pipeline_execution/async_pipeline_execution.py
async def execute_temporary_assistant(
    self,
    model_parameters: Dict[str, Any],
    user_input: str,
    assistant_name: str = "",
    prompt_parameters: Dict[str, Any] = {"prompt": ""},
    async_output: bool = False,
    include_tools_response: bool = False,
    save_history: bool = True,
    voice_enabled: bool = False,
    debug: bool = False,
    additional_info: Optional[List[Any]] = None,
    conversation_id: Optional[str] = None,
    current_date_time: Optional[str] = None,
    data_source_files: Optional[Dict[str, List[str]]] = None,
    data_source_folders: Optional[Dict[str, List[str]]] = None,
    data_store_parameters: Optional[Dict[str, Any]] = None,
    external_user_id: Optional[str] = None,
    files: Optional[List[str]] = None,
    images: Optional[List[str]] = None,
    in_memory_messages: Optional[List[Dict[str, Any]]] = None,
    output_configuration: Optional[Dict[str, Any]] = None,
    output_schema: Optional[Type[BaseModel]] = None,
    prompt_variables: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    user_input_id: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None,
) -> Union[
    TemporaryAssistantResponse,
    TemporaryAssistantAsyncStreamedResponse,
]:
    """
    Execute a temporary assistant with the provided parameters asynchronously.

    This method creates and executes a temporary AI assistant with custom configuration,
    allowing for flexible assistant behavior without creating a persistent pipeline.

    Args:
        model_parameters: Model parameters (required). Must include libraryModelId,
                        projectModelId, modelIdentifierType, and modelIsAvailableinProject
        user_input: User input text (required)
        assistant_name: Name of the temporary assistant. Default is ""
        prompt_parameters: Parameters for prompt configuration. Default is {"prompt": ""}
        async_output: Whether to stream the response. Default is False
        include_tools_response: Whether to return initial LLM tool result. Default is False
        save_history: Whether to save input and output to conversation history. Default is True
        voice_enabled: Whether voice output is enabled. Default is False
        debug: Whether debug mode execution is enabled. Default is False
        additional_info: Optional additional information array
        conversation_id: Optional conversation identifier
        current_date_time: Optional current date and time in ISO format
        data_source_files: Optional dictionary mapping data source GUIDs to file GUID arrays
        data_source_folders: Optional dictionary mapping data source GUIDs to folder GUID arrays
        data_store_parameters: Optional DataStore parameters
        external_user_id: Optional external user identifier
        files: Optional list of file identifiers
        images: Optional list of image identifiers
        in_memory_messages: Optional list of in-memory messages
        output_configuration: Optional output configuration (raw dict format)
        output_schema: Optional Pydantic model class for structured output.
                     If provided, takes precedence over output_configuration.
        prompt_variables: Optional prompt variables dictionary
        user_id: Optional user identifier
        user_input_id: Optional unique identifier for user input
        variables: Optional variables dictionary
        correlation_id: Optional correlation ID for request tracing. If not provided,
                      one will be generated automatically.

    Returns:
        Response containing the result of the temporary assistant execution.
        Returns different response types based on the result type discriminator.

    Raises:
        AiriaAPIError: If the API request fails with details about the error.
        aiohttp.ClientError: For other request-related errors.
        ValueError: If required parameters are missing or invalid.

    Examples:
        Basic usage:
        ```python
        client = AiriaAsyncClient(api_key="your_api_key")
        response = await client.pipeline_execution.execute_temporary_assistant(
            model_parameters={
                "libraryModelId": "library-model-id",
                "projectModelId": None,
                "modelIdentifierType": "Library",
                "modelIsAvailableinProject": True,
            },
            user_input="say double bubble bath ten times fast",
        )
        print(response.result)
        ```

        With structured output:
        ```python
        from pydantic import BaseModel

        class WeatherInfo(BaseModel):
            temperature: float
            conditions: str

        response = await client.pipeline_execution.execute_temporary_assistant(
            model_parameters={...},
            user_input="What's the weather?",
            output_schema=WeatherInfo
        )
        ```
    """
    # Validate required parameters
    if not user_input:
        raise ValueError("user_input cannot be empty")

    if not model_parameters:
        raise ValueError("model_parameters cannot be empty")

    # Handle file and image uploads (local files are uploaded, URLs are passed through)
    image_urls = None
    file_urls = None

    if images or files:
        file_urls, image_urls = await self._upload_files(files or [], images or [])

    # Handle structured output by injecting schema as system message
    modified_in_memory_messages = in_memory_messages
    if output_schema is not None:
        # Create a copy of in_memory_messages if it exists, otherwise create new list
        modified_in_memory_messages = list(in_memory_messages) if in_memory_messages else []
        # Insert schema instruction as first system message
        schema_message = create_schema_system_message(output_schema)
        modified_in_memory_messages.insert(0, schema_message)
        # Don't use output_configuration when using output_schema
        output_configuration = None

    request_data = self._pre_execute_temporary_assistant(
        model_parameters=model_parameters,
        user_input=user_input,
        assistant_name=assistant_name,
        prompt_parameters=prompt_parameters,
        async_output=async_output,
        include_tools_response=include_tools_response,
        save_history=save_history,
        voice_enabled=voice_enabled,
        debug=debug,
        additional_info=additional_info,
        conversation_id=conversation_id,
        current_date_time=current_date_time,
        data_source_files=data_source_files,
        data_source_folders=data_source_folders,
        data_store_parameters=data_store_parameters,
        external_user_id=external_user_id,
        files=file_urls,
        images=image_urls,
        in_memory_messages=modified_in_memory_messages,
        output_configuration=output_configuration,
        prompt_variables=prompt_variables,
        user_id=user_id,
        user_input_id=user_input_id,
        variables=variables,
        correlation_id=correlation_id,
    )

    resp = (
        self._request_handler.make_request_stream("POST", request_data)
        if async_output
        else await self._request_handler.make_request("POST", request_data)
    )

    if async_output:
        return TemporaryAssistantAsyncStreamedResponse(stream=resp)

    response = TemporaryAssistantResponse(**resp)
    # Parse response to Pydantic model if output_schema was provided
    if output_schema is not None and response.result:
        response.result = parse_response_to_model(str(response.result), output_schema)
    return response