Skip to content

Pipeline Execution

Execute Pipeline

Synchronous Usage

from airia import AiriaClient

# Create the client. The API key is passed explicitly here; it can also be
# supplied through the AIRIA_API_KEY environment variable.
client = AiriaClient(api_key="your_api_key")

# Run the pipeline (non-streaming call)
prompt = "Tell me about quantum computing"
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input=prompt,
)

# Show the result of the execution
print(response.result)

Synchronous Streaming

from airia import AiriaClient

# Create the client. The API key is passed explicitly here; it can also be
# supplied through the AIRIA_API_KEY environment variable.
client = AiriaClient(api_key="your_api_key")

# Execute the pipeline in streaming mode (async_output=True)
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Tell me about quantum computing",
    async_output=True,
)

# Print each item as it arrives on the stream
for chunk in response.stream:
    print(chunk)

Asynchronous Usage

import asyncio
from airia import AiriaAsyncClient

async def main():
    """Execute a pipeline once with the async client and print its result."""
    client = AiriaAsyncClient(api_key="your_api_key")
    pipelines = client.pipeline_execution
    response = await pipelines.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Tell me about quantum computing",
    )
    print(response.result)

asyncio.run(main())

Asynchronous Streaming

import asyncio
from airia import AiriaAsyncClient

async def main():
    """Execute a pipeline in streaming mode and print every streamed item."""
    client = AiriaAsyncClient(api_key="your_api_key")

    # async_output=True switches the call to streaming mode
    pipelines = client.pipeline_execution
    response = await pipelines.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Tell me about quantum computing",
        async_output=True,
    )
    async for chunk in response.stream:
        print(chunk)

asyncio.run(main())

Working with Files and Images

The SDK supports both local files and remote URLs for pipeline execution:

  • Local files: Provide file paths and the SDK automatically uploads them to cloud storage
  • Remote URLs: Provide URLs directly - no upload needed, passed straight to the pipeline

The SDK automatically detects whether you're providing a local file path or a URL.

Synchronous Usage with Images

from airia import AiriaClient

# Initialize client
client = AiriaClient(api_key="your_api_key")

# Execute pipeline with local image (automatically uploaded)
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Analyze this image and tell me what you see",
    images=["path/to/image.jpg"],
)
# Print before `response` is reassigned below; otherwise this result is lost
print(response.result)

# Execute pipeline with remote image URL (used directly)
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Analyze this image and tell me what you see",
    images=["https://example.com/images/sample.jpg"],
)
print(response.result)

Synchronous Usage with Files

from airia import AiriaClient

# Initialize client
client = AiriaClient(api_key="your_api_key")

# Execute pipeline with local file (automatically uploaded)
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Summarize this document",
    files=["path/to/document.pdf"],
)
# Print before `response` is reassigned below; otherwise this result is lost
print(response.result)

# Execute pipeline with remote file URL (used directly)
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Summarize this document",
    files=["https://example.com/documents/report.pdf"],
)
print(response.result)

Asynchronous Usage with Images and Files

import asyncio
from airia import AiriaAsyncClient

async def main():
    """Run three executions (local, remote, mixed inputs) and print each result.

    Each result is printed right after its call, because `response` is
    reassigned by the next execution.
    """
    client = AiriaAsyncClient(api_key="your_api_key")

    # Execute pipeline with local files (automatically uploaded)
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Analyze the image and summarize the document",
        images=["path/to/image.jpg"],
        files=["path/to/document.pdf"],
    )
    print(response.result)

    # Execute pipeline with remote URLs (used directly)
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Analyze the image and summarize the document",
        images=["https://example.com/images/photo.jpg"],
        files=["https://example.com/documents/report.pdf"],
    )
    print(response.result)

    # Mix of local files and remote URLs
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Compare these two images",
        images=[
            "path/to/local-image.jpg",  # Local file - uploaded
            "https://example.com/remote-image.jpg",  # Remote URL - used directly
        ],
    )
    print(response.result)

asyncio.run(main())

Multiple Files and Images

from airia import AiriaClient

client = AiriaClient(api_key="your_api_key")

# Documents: one local path followed by two remote URLs
file_paths = [
    "path/to/local-doc.pdf",  # Local file
    "https://example.com/remote-doc.txt",  # Remote URL
    "https://example.com/another-doc.docx",  # Remote URL
]

# Images: local paths and a remote URL can be mixed in the same list
image_paths = [
    "path/to/local-image1.jpg",  # Local file
    "https://example.com/remote-image2.png",  # Remote URL
    "path/to/local-image3.gif",  # Local file
]

# Single execution that receives every image and document.
# Local files are uploaded automatically; URLs are used directly.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Process all these images and documents",
    images=image_paths,
    files=file_paths,
)

print(response.result)

Structured Output with Pydantic Models

The SDK supports structured output using Pydantic models, allowing you to get type-safe, validated responses that conform to a specific schema. This is useful when you need predictable, parseable output from the LLM.

How It Works

When you provide an output_schema parameter with a Pydantic model, the SDK:

  1. Automatically injects a system message with the JSON schema
  2. Instructs the LLM to return data matching that schema
  3. Parses and validates the response
  4. Returns a typed Pydantic model instance

Basic Example

from pydantic import BaseModel, Field
from airia import AiriaClient

# Define your output schema.
# Every field uses `...` as its Field default, i.e. all four are required.
# NOTE(review): the class docstring and Field descriptions presumably end up in
# the JSON schema the SDK sends to the LLM - confirm before rewording them.
class PersonInfo(BaseModel):
    """Schema for extracting person information."""
    name: str = Field(..., description="The person's full name")
    age: int = Field(..., description="The person's age in years")
    occupation: str = Field(..., description="The person's occupation")
    email: str = Field(..., description="The person's email address")

client = AiriaClient(api_key="your_api_key")

# Free-form text the pipeline should turn into a PersonInfo
extraction_prompt = "Extract info: John Doe is a 35-year-old software engineer. Email: john@example.com"

# Execute pipeline with structured output
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input=extraction_prompt,
    output_schema=PersonInfo,  # the model class itself, not an instance
)

# response.result is a PersonInfo instance, so its fields are plain attributes
print(f"Name: {response.result.name}")
print(f"Age: {response.result.age}")
print(f"Occupation: {response.result.occupation}")
print(f"Email: {response.result.email}")

Complex Schema Example

You can use complex Pydantic models with nested structures and lists:

from pydantic import BaseModel, Field
from typing import List

# One entry of ProjectPlan.tasks.
# `title` and `priority` are required; `completed` defaults to False.
# The allowed priority values are only stated in the description text -
# the `str` annotation itself does not restrict them.
class Task(BaseModel):
    title: str = Field(..., description="Task title")
    priority: str = Field(..., description="Priority level: low, medium, high")
    completed: bool = Field(False, description="Whether the task is completed")

# Top-level schema passed as output_schema in the call below.
# All four fields are required; `tasks` nests a list of Task models.
class ProjectPlan(BaseModel):
    """Schema for a project plan."""
    project_name: str = Field(..., description="Name of the project")
    duration_weeks: int = Field(..., description="Project duration in weeks")
    tasks: List[Task] = Field(..., description="List of project tasks")
    total_budget: float = Field(..., description="Total project budget in USD")

# `client` is not created in this snippet; it is the AiriaClient instance
# from the Basic Example.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Create a project plan for building a mobile app",
    output_schema=ProjectPlan,
)

# Top-level fields of the returned ProjectPlan
print(f"Project: {response.result.project_name}")
print(f"Duration: {response.result.duration_weeks} weeks")
print(f"Budget: ${response.result.total_budget:,.2f}")

# Nested Task models, one line per task
for task in response.result.tasks:
    print(f"- {task.title} [{task.priority}]")

Async Usage with Structured Output

import asyncio
from pydantic import BaseModel
from airia import AiriaAsyncClient

# Output schema for the weather example; all three fields are required
# (no defaults are declared).
# NOTE(review): main() prints temperature with a "°C" suffix and humidity with
# "%", but nothing in this schema tells the LLM which units to use - confirm.
class WeatherInfo(BaseModel):
    temperature: float
    conditions: str
    humidity: int

async def main():
    """Execute the pipeline with WeatherInfo as output schema and print its fields."""
    client = AiriaAsyncClient(api_key="your_api_key")

    pipelines = client.pipeline_execution
    response = await pipelines.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="What's the weather in San Francisco?",
        output_schema=WeatherInfo,
    )

    # response.result is accessed as a WeatherInfo instance
    print(f"Temperature: {response.result.temperature}°C")
    print(f"Conditions: {response.result.conditions}")
    print(f"Humidity: {response.result.humidity}%")

asyncio.run(main())

Error Handling

The SDK automatically validates the LLM response against your schema. If the response is not valid JSON, a ValueError is raised; if it is valid JSON but doesn't match the schema, a pydantic ValidationError is raised:

from pydantic import ValidationError

try:
    # Keep only the call that can raise inside the try block
    response = client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Extract person info from this text",
        output_schema=PersonInfo,
    )
except ValidationError as e:
    # Must come before ValueError: pydantic's ValidationError is a
    # ValueError subclass, so the generic handler would otherwise catch it.
    print(f"Response didn't match schema: {e}")
except ValueError as e:
    print(f"Invalid JSON in response: {e}")
else:
    # Success path: runs only when no exception was raised
    print(response.result)

Streaming Event Parsing

When using streaming mode (async_output=True), the API returns Server-Sent Events (SSE) that contain different types of messages throughout the pipeline execution. You can parse and filter these events to extract specific information.

Note: Structured output with output_schema is not currently supported in streaming mode (async_output=True). It only works with non-streaming responses.

Available Message Types

The streaming response includes various message types defined in airia.types.sse. Here are the key ones:

  • AgentModelStreamFragmentMessage - Contains actual LLM output chunks
  • AgentModelStreamStartMessage - Indicates LLM streaming has started
  • AgentModelStreamEndMessage - Indicates LLM streaming has ended
  • AgentStepStartMessage - Indicates a pipeline step has started
  • AgentStepEndMessage - Indicates a pipeline step has ended
  • AgentOutputMessage - Contains step output
Click to expand the full list of message types
[
    AgentPingMessage,
    AgentStartMessage,
    AgentEndMessage,
    AgentStepStartMessage,
    AgentStepHaltMessage,
    AgentStepEndMessage,
    AgentOutputMessage,
    AgentAgentCardMessage,
    AgentDatasearchMessage,
    AgentInvocationMessage,
    AgentModelMessage,
    AgentPythonCodeMessage,
    AgentToolActionMessage,
    AgentModelStreamStartMessage,
    AgentModelStreamEndMessage,
    AgentModelStreamErrorMessage,
    AgentModelStreamUsageMessage,
    AgentModelStreamFragmentMessage,
    AgentAgentCardStreamStartMessage,
    AgentAgentCardStreamErrorMessage,
    AgentAgentCardStreamFragmentMessage,
    AgentAgentCardStreamEndMessage,
    AgentToolRequestMessage,
    AgentToolResponseMessage,
]

Filtering LLM Output

To extract only the actual LLM output text from the stream:

from airia import AiriaClient
from airia.types.sse import AgentModelStreamFragmentMessage

client = AiriaClient(api_key="your_api_key")
# Or with bearer token: client = AiriaClient.with_bearer_token(bearer_token="your_bearer_token")

response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Tell me about quantum computing",
    async_output=True,
)

# Print only the text carried by LLM output fragments
for event in response.stream:
    if not isinstance(event, AgentModelStreamFragmentMessage):
        continue  # not an LLM output fragment
    if event.index == -1:
        # NOTE(review): index -1 looks like a sentinel fragment without
        # printable content - confirm against the SSE type definitions
        continue
    # end="" joins fragments into one text; flush shows them as they arrive
    print(event.content, end="", flush=True)

Temporary Assistant

The Temporary Assistant endpoint allows you to create and execute AI assistants with custom configurations without needing a persistent pipeline. This is useful for ad-hoc AI interactions with specific behavior requirements.

Synchronous Usage

from airia import AiriaClient

# Initialize client
client = AiriaClient(api_key="your_api_key")

# Model selection for the temporary assistant
model_parameters = {
    "libraryModelId": "library-model-id",
    "projectModelId": None,
    "modelIdentifierType": "Library",
    "modelIsAvailableinProject": True,
}

# Execute a temporary assistant with basic configuration
response = client.pipeline_execution.execute_temporary_assistant(
    model_parameters=model_parameters,
    user_input="Analyze the quarterly sales data and provide insights",
    assistant_name="Data Analyst",
    prompt_parameters={"prompt": "You are a helpful data analyst assistant."},
)

print(response.result)

Synchronous Streaming

from airia import AiriaClient

client = AiriaClient(api_key="your_api_key")

# Model selection and prompt for the temporary assistant
model_parameters = {
    "libraryModelId": "library-model-id",
    "projectModelId": None,
    "modelIdentifierType": "Library",
    "modelIsAvailableinProject": True,
}
prompt_parameters = {
    "prompt": "You are a coding assistant specializing in Python.",
}

# Execute temporary assistant with streaming (async_output=True)
response = client.pipeline_execution.execute_temporary_assistant(
    model_parameters=model_parameters,
    user_input="Write a function to calculate fibonacci numbers",
    assistant_name="Code Assistant",
    prompt_parameters=prompt_parameters,
    async_output=True,
)

# Print each item as it arrives on the stream
for chunk in response.stream:
    print(chunk)

Asynchronous Usage

import asyncio
from airia import AiriaAsyncClient

async def main():
    """Execute a temporary assistant with the async client and print its result."""
    client = AiriaAsyncClient(api_key="your_api_key")

    # Model selection and prompt for the temporary assistant
    model_parameters = {
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    }
    prompt_parameters = {
        "prompt": "You are a research assistant with expertise in scientific literature.",
    }

    response = await client.pipeline_execution.execute_temporary_assistant(
        model_parameters=model_parameters,
        user_input="Explain the latest developments in quantum computing",
        assistant_name="Research Assistant",
        prompt_parameters=prompt_parameters,
    )

    print(response.result)

asyncio.run(main())

Asynchronous Streaming

import asyncio
from airia import AiriaAsyncClient

async def main():
    """Execute a temporary assistant in streaming mode and print each streamed item."""
    client = AiriaAsyncClient(api_key="your_api_key")

    # Model selection and prompt for the temporary assistant
    model_parameters = {
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    }
    prompt_parameters = {
        "prompt": "You are a creative writing assistant.",
    }

    # async_output=True switches the call to streaming mode; the remaining
    # flags are passed exactly as in the original example
    response = await client.pipeline_execution.execute_temporary_assistant(
        model_parameters=model_parameters,
        user_input="Write a short story about time travel",
        assistant_name="Creative Writer",
        prompt_parameters=prompt_parameters,
        async_output=True,
        save_history=True,
        include_tools_response=False,
        voice_enabled=False,
    )

    async for chunk in response.stream:
        print(chunk)

asyncio.run(main())

Structured Outputs

Structured output also works with temporary assistants:

from typing import List

from pydantic import BaseModel, Field

# `List` is imported above because the key_phrases annotation uses it;
# without that import this snippet fails with NameError when run.
# All three fields are required (`...` as the Field default).
class SentimentAnalysis(BaseModel):
    """Schema for sentiment analysis results."""
    sentiment: str = Field(..., description="Sentiment: positive, negative, or neutral")
    confidence: float = Field(..., description="Confidence score between 0 and 1")
    key_phrases: List[str] = Field(..., description="Key phrases from the text")

# `client` is not created in this snippet; it is an AiriaClient instance as
# created in the synchronous examples of this section.
model_parameters = {
    "libraryModelId": "library-model-id",
    "projectModelId": None,
    "modelIdentifierType": "Library",
    "modelIsAvailableinProject": True,
}

response = client.pipeline_execution.execute_temporary_assistant(
    model_parameters=model_parameters,
    user_input="Analyze: This product exceeded my expectations! Great quality.",
    output_schema=SentimentAnalysis,
)

# response.result is accessed as a SentimentAnalysis instance
print(f"Sentiment: {response.result.sentiment}")
print(f"Confidence: {response.result.confidence:.2%}")
print(f"Key phrases: {', '.join(response.result.key_phrases)}")