pipeline_import
Bases: BasePipelineImport
Source code in airia/client/pipeline_import/async_pipeline_import.py
create_agent_from_pipeline_definition(pipeline_definition, agent_import_source=None, conflict_resolution_strategy='SkipConflictingEntities', credential_mappings=None, default_project_behavior=None, project_id=None, correlation_id=None)
async
Create an agent from a pipeline definition (async).
This method imports a complete pipeline from a definition dictionary, creating all necessary components including data sources, prompts, tools, models, and pipeline steps. The definition structure should match the format returned by export_pipeline_definition(), but use JSON-compatible types (no enums or Pydantic classes).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pipeline_definition
|
dict
|
The pipeline definition to import. This should be a dictionary with the same structure as the response from pipelines_config.export_pipeline_definition(), but using only JSON-compatible types (strings, numbers, dicts, lists). |
required |
agent_import_source
|
str
|
The source of the agent import. Valid values: "PlatformApi", "ChatCommunity", "PlatformCommunity", "PlatformJson", "Marketplace". If not provided, the import source will not be specified. |
None
|
conflict_resolution_strategy
|
str
|
Strategy for handling conflicting entities. Valid values: - "SkipConflictingEntities" (default): Skip entities that already exist - "RecreateExistingEntities": Recreate entities that already exist - "SeededAgent": Use seeded agent strategy |
'SkipConflictingEntities'
|
credential_mappings
|
dict
|
Mapping of exported credential IDs to existing credential GUIDs in the database. Key: Exported credential ID from agent definition. Value: Existing credential GUID in database (must belong to tenant/project). |
None
|
default_project_behavior
|
str
|
The default project behavior if project_id is null. Valid values: "Library", "BrainFreezeThenDefault", "DefaultProject". |
None
|
project_id
|
str
|
The project ID where the pipeline should be imported. If null, the provision will happen in a library project according to default_project_behavior. |
None
|
correlation_id
|
str
|
A unique identifier for request tracing and logging. If not provided, one will be automatically generated. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
CreateAgentFromPipelineDefinitionResponse |
CreateAgentFromPipelineDefinitionResponse
|
A response object containing the import result, including pipeline ID, department ID, pipeline name, deployment ID (if deployed), and lists of created, updated, and skipped entities. If the import failed, error_message and error_details will contain information about what went wrong. |
Raises:
| Type | Description |
|---|---|
AiriaAPIError
|
If the API request fails, including cases where: - The pipeline definition is invalid (400) - A referenced project_id doesn't exist (404) - Credential mappings are invalid (400) - Authentication fails (401) - Access is forbidden (403) - Server errors (5xx) |
Example
from airia import AiriaAsyncClient
import asyncio
async def main():
client = AiriaAsyncClient(api_key="your_api_key")
# First, export a pipeline definition
exported = await client.pipelines_config.export_pipeline_definition(
pipeline_id="source_pipeline_id"
)
# Convert the exported definition to a JSON-compatible dictionary
# (Pydantic models have a .model_dump() method for this)
pipeline_def = exported.model_dump(by_alias=True, exclude_none=True)
# Import the pipeline into a new project
result = await client.pipeline_import.create_agent_from_pipeline_definition(
pipeline_definition=pipeline_def,
project_id="target_project_id",
conflict_resolution_strategy="SkipConflictingEntities"
)
if result.error_message:
print(f"Import failed: {result.error_message}")
if result.error_details:
for detail in result.error_details:
print(f" - {detail}")
else:
print(f"Pipeline imported successfully!")
print(f"Pipeline ID: {result.pipeline_id}")
print(f"Pipeline Name: {result.pipeline_name}")
print(f"Created {len(result.created_entities or [])} entities")
print(f"Updated {len(result.updated_entities or [])} entities")
print(f"Skipped {len(result.skipped_entities or [])} entities")
asyncio.run(main())
Note
- The pipeline_definition must use JSON-compatible types only
- Use model_dump(by_alias=True) on Pydantic models to get the correct format
- Credential mappings must reference existing credentials in the target system
- The import process will create new GUIDs for most entities
Source code in airia/client/pipeline_import/async_pipeline_import.py
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | |