Pipeline Execution
Execute Pipeline
Synchronous Usage
from airia import AiriaClient

# Create the client; the API key may also come from the AIRIA_API_KEY environment variable.
client = AiriaClient(api_key="your_api_key")

# Run the pipeline synchronously and print its final result.
result = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Tell me about quantum computing",
)
print(result.result)
Synchronous Streaming
from airia import AiriaClient

# Create the client; the API key may also come from the AIRIA_API_KEY environment variable.
client = AiriaClient(api_key="your_api_key")

# async_output=True switches the call into streaming mode.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Tell me about quantum computing",
    async_output=True,
)

# Print each streamed chunk as it arrives.
for chunk in response.stream:
    print(chunk)
Asynchronous Usage
import asyncio

from airia import AiriaAsyncClient


async def main() -> None:
    # API key can also be supplied via the AIRIA_API_KEY environment variable.
    client = AiriaAsyncClient(api_key="your_api_key")
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Tell me about quantum computing",
    )
    print(response.result)


asyncio.run(main())
Asynchronous Streaming
import asyncio

from airia import AiriaAsyncClient


async def main() -> None:
    client = AiriaAsyncClient(api_key="your_api_key")
    # async_output=True requests a streamed (SSE) response.
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Tell me about quantum computing",
        async_output=True,
    )
    # Consume streamed chunks as they arrive.
    async for chunk in response.stream:
        print(chunk)


asyncio.run(main())
Working with Files and Images
The SDK supports both local files and remote URLs for pipeline execution:
- Local files: Provide file paths and the SDK automatically uploads them to cloud storage
- Remote URLs: Provide URLs directly - no upload needed, passed straight to the pipeline
The SDK automatically detects whether you're providing a local file path or a URL.
Synchronous Usage with Images
from airia import AiriaClient

# Initialize client
client = AiriaClient(api_key="your_api_key")

# Local image paths are detected and uploaded to cloud storage automatically.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Analyze this image and tell me what you see",
    images=["path/to/image.jpg"],
)

# Remote image URLs are passed straight through to the pipeline - no upload.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Analyze this image and tell me what you see",
    images=["https://example.com/images/sample.jpg"],
)
print(response.result)
Synchronous Usage with Files
from airia import AiriaClient

# Initialize client
client = AiriaClient(api_key="your_api_key")

# Local document paths are uploaded automatically before execution.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Summarize this document",
    files=["path/to/document.pdf"],
)

# Remote file URLs are forwarded to the pipeline as-is.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Summarize this document",
    files=["https://example.com/documents/report.pdf"],
)
print(response.result)
Asynchronous Usage with Images and Files
import asyncio

from airia import AiriaAsyncClient


async def main() -> None:
    client = AiriaAsyncClient(api_key="your_api_key")

    # Local paths: the SDK uploads both attachments before execution.
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Analyze the image and summarize the document",
        images=["path/to/image.jpg"],
        files=["path/to/document.pdf"],
    )

    # Remote URLs: forwarded to the pipeline without any upload.
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Analyze the image and summarize the document",
        images=["https://example.com/images/photo.jpg"],
        files=["https://example.com/documents/report.pdf"],
    )

    # Local files and remote URLs can be mixed freely in one call.
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Compare these two images",
        images=[
            "path/to/local-image.jpg",  # Local file - uploaded
            "https://example.com/remote-image.jpg",  # Remote URL - used directly
        ],
    )
    print(response.result)


asyncio.run(main())
Multiple Files and Images
from airia import AiriaClient

client = AiriaClient(api_key="your_api_key")

# Local paths and remote URLs may be combined within the same list.
image_paths = [
    "path/to/local-image1.jpg",  # Local file
    "https://example.com/remote-image2.png",  # Remote URL
    "path/to/local-image3.gif",  # Local file
]
file_paths = [
    "path/to/local-doc.pdf",  # Local file
    "https://example.com/remote-doc.txt",  # Remote URL
    "https://example.com/another-doc.docx",  # Remote URL
]

# Execute the pipeline with every attachment at once:
# local files are uploaded automatically, URLs are used directly.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Process all these images and documents",
    images=image_paths,
    files=file_paths,
)
print(response.result)
Structured Output with Pydantic Models
The SDK supports structured output using Pydantic models, allowing you to get type-safe, validated responses that conform to a specific schema. This is useful when you need predictable, parseable output from the LLM.
How It Works
When you provide an output_schema parameter with a Pydantic model, the SDK:
- Automatically injects a system message with the JSON schema
- Instructs the LLM to return data matching that schema
- Parses and validates the response
- Returns a typed Pydantic model instance
Basic Example
from pydantic import BaseModel, Field

from airia import AiriaClient


class PersonInfo(BaseModel):
    """Schema for extracting person information."""

    name: str = Field(..., description="The person's full name")
    age: int = Field(..., description="The person's age in years")
    occupation: str = Field(..., description="The person's occupation")
    email: str = Field(..., description="The person's email address")


client = AiriaClient(api_key="your_api_key")

# Passing a Pydantic model class as output_schema makes the SDK parse
# and validate the LLM response into an instance of that model.
response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Extract info: John Doe is a 35-year-old software engineer. Email: john@example.com",
    output_schema=PersonInfo,  # Pass the Pydantic model class
)

# response.result is now a PersonInfo instance!
print(f"Name: {response.result.name}")
print(f"Age: {response.result.age}")
print(f"Occupation: {response.result.occupation}")
print(f"Email: {response.result.email}")
Complex Schema Example
You can use complex Pydantic models with nested structures and lists:
from typing import List

from pydantic import BaseModel, Field


class Task(BaseModel):
    """A single task within a project plan."""

    title: str = Field(..., description="Task title")
    priority: str = Field(..., description="Priority level: low, medium, high")
    completed: bool = Field(False, description="Whether the task is completed")


class ProjectPlan(BaseModel):
    """Schema for a project plan."""

    project_name: str = Field(..., description="Name of the project")
    duration_weeks: int = Field(..., description="Project duration in weeks")
    tasks: List[Task] = Field(..., description="List of project tasks")
    total_budget: float = Field(..., description="Total project budget in USD")


response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Create a project plan for building a mobile app",
    output_schema=ProjectPlan,
)

# Access structured data
print(f"Project: {response.result.project_name}")
print(f"Duration: {response.result.duration_weeks} weeks")
print(f"Budget: ${response.result.total_budget:,.2f}")
for task in response.result.tasks:
    print(f"- {task.title} [{task.priority}]")
Async Usage with Structured Output
import asyncio

from pydantic import BaseModel

from airia import AiriaAsyncClient


class WeatherInfo(BaseModel):
    # Plain annotations are enough; Field(...) is only needed for descriptions.
    temperature: float
    conditions: str
    humidity: int


async def main() -> None:
    client = AiriaAsyncClient(api_key="your_api_key")
    response = await client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="What's the weather in San Francisco?",
        output_schema=WeatherInfo,
    )
    print(f"Temperature: {response.result.temperature}°C")
    print(f"Conditions: {response.result.conditions}")
    print(f"Humidity: {response.result.humidity}%")


asyncio.run(main())
Error Handling
The SDK automatically validates the LLM response against your schema. If the response doesn't match, a ValidationError or ValueError is raised:
from pydantic import ValidationError

try:
    response = client.pipeline_execution.execute_pipeline(
        pipeline_id="your_pipeline_id",
        user_input="Extract person info from this text",
        output_schema=PersonInfo,
    )
    print(response.result)
except ValidationError as e:
    # Raised when the response parses as JSON but does not satisfy the schema.
    print(f"Response didn't match schema: {e}")
except ValueError as e:
    # Raised when the response is not valid JSON at all.
    print(f"Invalid JSON in response: {e}")
Streaming Event Parsing
When using streaming mode (async_output=True), the API returns Server-Sent Events (SSE) that contain different types of messages throughout the pipeline execution. You can parse and filter these events to extract specific information.
Note: Structured output with output_schema is not currently supported in streaming mode (async_output=True). It only works with non-streaming responses.
Available Message Types
The streaming response includes various message types defined in airia.types.sse. Here are the key ones:
- AgentModelStreamFragmentMessage — contains actual LLM output chunks
- AgentModelStreamStartMessage — indicates LLM streaming has started
- AgentModelStreamEndMessage — indicates LLM streaming has ended
- AgentStepStartMessage — indicates a pipeline step has started
- AgentStepEndMessage — indicates a pipeline step has ended
- AgentOutputMessage — contains step output
Click to expand the full list of message types
# Full set of SSE message types exported by airia.types.sse:
[
AgentPingMessage,
AgentStartMessage,
AgentEndMessage,
AgentStepStartMessage,
AgentStepHaltMessage,
AgentStepEndMessage,
AgentOutputMessage,
AgentAgentCardMessage,
AgentDatasearchMessage,
AgentInvocationMessage,
AgentModelMessage,
AgentPythonCodeMessage,
AgentToolActionMessage,
AgentModelStreamStartMessage,
AgentModelStreamEndMessage,
AgentModelStreamErrorMessage,
AgentModelStreamUsageMessage,
AgentModelStreamFragmentMessage,
AgentAgentCardStreamStartMessage,
AgentAgentCardStreamErrorMessage,
AgentAgentCardStreamFragmentMessage,
AgentAgentCardStreamEndMessage,
AgentToolRequestMessage,
AgentToolResponseMessage,
]
Filtering LLM Output
To extract only the actual LLM output text from the stream:
from airia import AiriaClient
from airia.types.sse import AgentModelStreamFragmentMessage

client = AiriaClient(api_key="your_api_key")
# Or with bearer token: client = AiriaClient.with_bearer_token(bearer_token="your_bearer_token")

response = client.pipeline_execution.execute_pipeline(
    pipeline_id="your_pipeline_id",
    user_input="Tell me about quantum computing",
    async_output=True,
)

# Keep only the fragment events that carry actual LLM output text.
for event in response.stream:
    if isinstance(event, AgentModelStreamFragmentMessage) and event.index != -1:
        print(event.content, end="", flush=True)
Temporary Assistant
The Temporary Assistant endpoint allows you to create and execute AI assistants with custom configurations without needing a persistent pipeline. This is useful for ad-hoc AI interactions with specific behavior requirements.
Synchronous Usage
from airia import AiriaClient

# Initialize client
client = AiriaClient(api_key="your_api_key")

# Run a one-off assistant without creating a persistent pipeline.
response = client.pipeline_execution.execute_temporary_assistant(
    model_parameters={
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    },
    user_input="Analyze the quarterly sales data and provide insights",
    assistant_name="Data Analyst",
    prompt_parameters={"prompt": "You are a helpful data analyst assistant."},
)
print(response.result)
Synchronous Streaming
from airia import AiriaClient

client = AiriaClient(api_key="your_api_key")

# Temporary assistants also support streaming via async_output=True.
response = client.pipeline_execution.execute_temporary_assistant(
    model_parameters={
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    },
    user_input="Write a function to calculate fibonacci numbers",
    assistant_name="Code Assistant",
    prompt_parameters={
        "prompt": "You are a coding assistant specializing in Python.",
    },
    async_output=True,
)

# Print chunks as they stream in.
for chunk in response.stream:
    print(chunk)
Asynchronous Usage
import asyncio

from airia import AiriaAsyncClient


async def main() -> None:
    client = AiriaAsyncClient(api_key="your_api_key")
    response = await client.pipeline_execution.execute_temporary_assistant(
        model_parameters={
            "libraryModelId": "library-model-id",
            "projectModelId": None,
            "modelIdentifierType": "Library",
            "modelIsAvailableinProject": True,
        },
        user_input="Explain the latest developments in quantum computing",
        assistant_name="Research Assistant",
        prompt_parameters={
            "prompt": "You are a research assistant with expertise in scientific literature.",
        },
    )
    print(response.result)


asyncio.run(main())
Asynchronous Streaming
import asyncio

from airia import AiriaAsyncClient


async def main() -> None:
    client = AiriaAsyncClient(api_key="your_api_key")
    # Streaming temporary assistant with additional execution options.
    response = await client.pipeline_execution.execute_temporary_assistant(
        model_parameters={
            "libraryModelId": "library-model-id",
            "projectModelId": None,
            "modelIdentifierType": "Library",
            "modelIsAvailableinProject": True,
        },
        user_input="Write a short story about time travel",
        assistant_name="Creative Writer",
        prompt_parameters={
            "prompt": "You are a creative writing assistant.",
        },
        async_output=True,
        save_history=True,
        include_tools_response=False,
        voice_enabled=False,
    )
    async for chunk in response.stream:
        print(chunk)


asyncio.run(main())
Structured Outputs
Structured output also works with temporary assistants:
# FIX: the original example used List[str] without importing List from typing,
# which raises NameError when run. Import it alongside the pydantic names.
from typing import List

from pydantic import BaseModel, Field


class SentimentAnalysis(BaseModel):
    """Schema for sentiment analysis results."""

    sentiment: str = Field(..., description="Sentiment: positive, negative, or neutral")
    confidence: float = Field(..., description="Confidence score between 0 and 1")
    key_phrases: List[str] = Field(..., description="Key phrases from the text")


# output_schema works with temporary assistants exactly as with pipelines:
# the result is parsed and validated into a SentimentAnalysis instance.
response = client.pipeline_execution.execute_temporary_assistant(
    model_parameters={
        "libraryModelId": "library-model-id",
        "projectModelId": None,
        "modelIdentifierType": "Library",
        "modelIsAvailableinProject": True,
    },
    user_input="Analyze: This product exceeded my expectations! Great quality.",
    output_schema=SentimentAnalysis,
)
print(f"Sentiment: {response.result.sentiment}")
print(f"Confidence: {response.result.confidence:.2%}")
print(f"Key phrases: {', '.join(response.result.key_phrases)}")