Skip to content

Pipeline Import

The Airia Python SDK provides functionality to import pipelines from their exported definitions. This allows you to create new pipelines by importing complete pipeline configurations including data sources, prompts, tools, models, and all associated components.

Create Agent from Pipeline Definition

The create_agent_from_pipeline_definition method imports a complete pipeline from a definition dictionary and creates all necessary components in your Airia environment.

Basic Usage

from airia import AiriaClient

# Initialize client
client = AiriaClient()

# First, export a pipeline definition from an existing pipeline
exported = client.pipelines_config.export_pipeline_definition(
    pipeline_id="source-pipeline-id"
)

# Convert the exported definition to a JSON-compatible dictionary
# (Pydantic models have a .model_dump() method for this)
pipeline_def = exported.model_dump(by_alias=True, exclude_none=True)

# Import the pipeline
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def
)

# Check the import result
if result.error_message:
    print(f"Import failed: {result.error_message}")
    if result.error_details:
        for detail in result.error_details:
            print(f"  - {detail}")
else:
    print(f"Pipeline imported successfully!")
    print(f"Pipeline ID: {result.pipeline_id}")
    print(f"Pipeline Name: {result.pipeline_name}")
    print(f"Created {len(result.created_entities or [])} entities")

Import into a Specific Project

You can import a pipeline into a specific project by providing the project_id:

# Import into a specific project
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    project_id="target-project-id"
)

print(f"Imported pipeline {result.pipeline_name} into project")

Conflict Resolution Strategies

When importing a pipeline, you can specify how to handle existing entities that conflict with the import:

# Skip conflicting entities (default)
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    conflict_resolution_strategy="SkipConflictingEntities"
)

# Recreate existing entities
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    conflict_resolution_strategy="RecreateExistingEntities"
)

# Use seeded agent strategy
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    conflict_resolution_strategy="SeededAgent"
)

Available conflict resolution strategies:

  • SkipConflictingEntities (default): Skip entities that already exist in the target environment
  • RecreateExistingEntities: Recreate entities even if they already exist
  • SeededAgent: Use the seeded agent import strategy

Credential Mappings

When importing pipelines that use credentials, you can map the exported credential IDs to existing credentials in your target environment:

# Map exported credential IDs to existing credential GUIDs
credential_mappings = {
    "exported_credential_id_1": "00000000-0000-0000-0000-000000000001",
    "exported_credential_id_2": "00000000-0000-0000-0000-000000000002",
}

result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    credential_mappings=credential_mappings
)

print(f"Imported with credential mappings: {result.pipeline_id}")

Agent Import Source

You can specify the source of the agent import:

result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    agent_import_source="PlatformApi"  # or "ChatCommunity", "PlatformCommunity", "PlatformJson", "Marketplace"
)

Default Project Behavior

When project_id is not specified, you can control the default project behavior:

# Import into library
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    default_project_behavior="Library"
)

# Use BrainFreeze then default
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    default_project_behavior="BrainFreezeThenDefault"
)

# Use default project
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    default_project_behavior="DefaultProject"
)

Complete Example with All Options

from airia import AiriaClient

# Initialize client
client = AiriaClient()

# Export a pipeline definition
source_pipeline_id = "source-pipeline-id"
exported = client.pipelines_config.export_pipeline_definition(
    pipeline_id=source_pipeline_id
)

# Prepare pipeline definition
pipeline_def = exported.model_dump(by_alias=True, exclude_none=True)

# Prepare credential mappings
credential_mappings = {
    "old_credential_id": "new-credential-guid",
}

# Import with all options
result = client.pipeline_import.create_agent_from_pipeline_definition(
    pipeline_definition=pipeline_def,
    agent_import_source="PlatformJson",
    conflict_resolution_strategy="SkipConflictingEntities",
    credential_mappings=credential_mappings,
    default_project_behavior="Library",
    project_id="target-project-id"
)

# Process the result
if result.error_message:
    print(f"Import failed: {result.error_message}")
    if result.error_details:
        print("\nError details:")
        for detail in result.error_details:
            print(f"  - {detail}")
else:
    print(f"Import successful!")
    print(f"\nPipeline Information:")
    print(f"  ID: {result.pipeline_id}")
    print(f"  Name: {result.pipeline_name}")
    print(f"  Department: {result.department_id}")

    if result.deployment_id:
        print(f"  Deployment: {result.deployment_id}")

    # Display created entities
    if result.created_entities:
        print(f"\nCreated {len(result.created_entities)} entities:")
        for entity in result.created_entities:
            print(f"  - {entity.entity_type}: {entity.entity_name} ({entity.entity_id})")

    # Display updated entities
    if result.updated_entities:
        print(f"\nUpdated {len(result.updated_entities)} entities:")
        for entity in result.updated_entities:
            reason = f" - {entity.reason}" if entity.reason else ""
            print(f"  - {entity.entity_type}: {entity.entity_name}{reason}")

    # Display skipped entities
    if result.skipped_entities:
        print(f"\nSkipped {len(result.skipped_entities)} entities:")
        for entity in result.skipped_entities:
            reason = f" - {entity.reason}" if entity.reason else ""
            print(f"  - {entity.entity_type}: {entity.entity_name}{reason}")

Async Usage

The pipeline import functionality is also available in async mode:

import asyncio
from airia import AiriaAsyncClient

async def import_pipeline():
    # Initialize async client
    client = AiriaAsyncClient()

    # Export a pipeline definition
    exported = await client.pipelines_config.export_pipeline_definition(
        pipeline_id="source-pipeline-id"
    )

    # Prepare pipeline definition
    pipeline_def = exported.model_dump(by_alias=True, exclude_none=True)

    # Import the pipeline
    result = await client.pipeline_import.create_agent_from_pipeline_definition(
        pipeline_definition=pipeline_def,
        project_id="target-project-id"
    )

    if result.error_message:
        print(f"Import failed: {result.error_message}")
    else:
        print(f"Successfully imported pipeline: {result.pipeline_name}")
        print(f"Pipeline ID: {result.pipeline_id}")

    await client.close()

# Run the async function
asyncio.run(import_pipeline())