# coding: utf-8 """ Agent Communication Protocol Specification of the API protocol for communication with an agent. # noqa: E501 The version of the OpenAPI document: v0.2 Generated by OpenAPI Generator (https://openapi-generator.tech) Do not edit the class manually. """ import re # noqa: F401 from typing import Any, Awaitable, List, Optional, Union, overload from pydantic import Field, StrictBytes, StrictStr, validate_arguments from typing_extensions import Annotated from agbenchmark.agent_protocol_client.api_client import ApiClient from agbenchmark.agent_protocol_client.api_response import ApiResponse from agbenchmark.agent_protocol_client.exceptions import ( # noqa: F401 ApiTypeError, ApiValueError, ) from agbenchmark.agent_protocol_client.models.artifact import Artifact from agbenchmark.agent_protocol_client.models.step import Step from agbenchmark.agent_protocol_client.models.step_request_body import StepRequestBody from agbenchmark.agent_protocol_client.models.task import Task from agbenchmark.agent_protocol_client.models.task_request_body import TaskRequestBody class AgentApi(object): """NOTE: This class is auto generated by OpenAPI Generator Ref: https://openapi-generator.tech Do not edit the class manually. """ def __init__(self, api_client=None): if api_client is None: api_client = ApiClient.get_default() self.api_client = api_client @overload async def create_agent_task( self, task_request_body: Optional[TaskRequestBody] = None, **kwargs ) -> Task: # noqa: E501 ... @overload def create_agent_task( self, task_request_body: Optional[TaskRequestBody] = None, async_req: Optional[bool] = True, **kwargs, ) -> Task: # noqa: E501 ... @validate_arguments def create_agent_task( self, task_request_body: Optional[TaskRequestBody] = None, async_req: Optional[bool] = None, **kwargs, ) -> Union[Task, Awaitable[Task]]: # noqa: E501 """Creates a task for the agent. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.create_agent_task(task_request_body, async_req=True) >>> result = thread.get() :param task_request_body: :type task_request_body: TaskRequestBody :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: Task """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the create_agent_task_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.create_agent_task_with_http_info( task_request_body, **kwargs ) # noqa: E501 @validate_arguments def create_agent_task_with_http_info( self, task_request_body: Optional[TaskRequestBody] = None, **kwargs ) -> ApiResponse: # noqa: E501 """Creates a task for the agent. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.create_agent_task_with_http_info(task_request_body, async_req=True) >>> result = thread.get() :param task_request_body: :type task_request_body: TaskRequestBody :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(Task, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_request_body"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method create_agent_task" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params["task_request_body"] is not None: _body_params = _params["task_request_body"] # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get( "_content_type", self.api_client.select_header_content_type(["application/json"]), ) if _content_types_list: _header_params["Content-Type"] = _content_types_list # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "Task", } return self.api_client.call_api( "/agent/tasks", "POST", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def download_agent_task_artifact( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], artifact_id: Annotated[StrictStr, Field(..., description="ID of the artifact")], **kwargs, ) -> bytearray: # noqa: E501 ... @overload def download_agent_task_artifact( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], artifact_id: Annotated[StrictStr, Field(..., description="ID of the artifact")], async_req: Optional[bool] = True, **kwargs, ) -> bytearray: # noqa: E501 ... @validate_arguments def download_agent_task_artifact( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], artifact_id: Annotated[StrictStr, Field(..., description="ID of the artifact")], async_req: Optional[bool] = None, **kwargs, ) -> Union[bytearray, Awaitable[bytearray]]: # noqa: E501 """Download a specified artifact. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.download_agent_task_artifact(task_id, artifact_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param artifact_id: ID of the artifact (required) :type artifact_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: bytearray """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the download_agent_task_artifact_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.download_agent_task_artifact_with_http_info( task_id, artifact_id, **kwargs ) # noqa: E501 @validate_arguments def download_agent_task_artifact_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], artifact_id: Annotated[StrictStr, Field(..., description="ID of the artifact")], **kwargs, ) -> ApiResponse: # noqa: E501 """Download a specified artifact. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.download_agent_task_artifact_with_http_info(task_id, artifact_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param artifact_id: ID of the artifact (required) :type artifact_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(bytearray, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id", "artifact_id"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method download_agent_task_artifact" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] if _params["artifact_id"]: _path_params["artifact_id"] = _params["artifact_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/octet-stream"] ) # noqa: E501 # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "bytearray", } return self.api_client.call_api( "/agent/tasks/{task_id}/artifacts/{artifact_id}", "GET", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def execute_agent_task_step( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_request_body: Optional[StepRequestBody] = None, **kwargs, ) -> Step: # noqa: E501 ... @overload def execute_agent_task_step( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_request_body: Optional[StepRequestBody] = None, async_req: Optional[bool] = True, **kwargs, ) -> Step: # noqa: E501 ... @validate_arguments def execute_agent_task_step( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_request_body: Optional[StepRequestBody] = None, async_req: Optional[bool] = None, **kwargs, ) -> Union[Step, Awaitable[Step]]: # noqa: E501 """Execute a step in the specified agent task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.execute_agent_task_step(task_id, step_request_body, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param step_request_body: :type step_request_body: StepRequestBody :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: Step """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the execute_agent_task_step_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.execute_agent_task_step_with_http_info( task_id, step_request_body, **kwargs ) # noqa: E501 @validate_arguments def execute_agent_task_step_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_request_body: Optional[StepRequestBody] = None, **kwargs, ) -> ApiResponse: # noqa: E501 """Execute a step in the specified agent task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.execute_agent_task_step_with_http_info(task_id, step_request_body, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param step_request_body: :type step_request_body: StepRequestBody :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(Step, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id", "step_request_body"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method execute_agent_task_step" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params["step_request_body"] is not None: _body_params = _params["step_request_body"] # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get( "_content_type", self.api_client.select_header_content_type(["application/json"]), ) if _content_types_list: _header_params["Content-Type"] = _content_types_list # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "Step", } return self.api_client.call_api( "/agent/tasks/{task_id}/steps", "POST", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def get_agent_task( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], **kwargs, ) -> Task: # noqa: E501 ... @overload def get_agent_task( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], async_req: Optional[bool] = True, **kwargs, ) -> Task: # noqa: E501 ... @validate_arguments def get_agent_task( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], async_req: Optional[bool] = None, **kwargs, ) -> Union[Task, Awaitable[Task]]: # noqa: E501 """Get details about a specified agent task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_agent_task(task_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: Task """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the get_agent_task_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.get_agent_task_with_http_info(task_id, **kwargs) # noqa: E501 @validate_arguments def get_agent_task_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], **kwargs, ) -> ApiResponse: # noqa: E501 """Get details about a specified agent task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_agent_task_with_http_info(task_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(Task, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method get_agent_task" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "Task", } return self.api_client.call_api( "/agent/tasks/{task_id}", "GET", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def get_agent_task_step( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_id: Annotated[StrictStr, Field(..., description="ID of the step")], **kwargs, ) -> Step: # noqa: E501 ... @overload def get_agent_task_step( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_id: Annotated[StrictStr, Field(..., description="ID of the step")], async_req: Optional[bool] = True, **kwargs, ) -> Step: # noqa: E501 ... @validate_arguments def get_agent_task_step( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_id: Annotated[StrictStr, Field(..., description="ID of the step")], async_req: Optional[bool] = None, **kwargs, ) -> Union[Step, Awaitable[Step]]: # noqa: E501 """Get details about a specified task step. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_agent_task_step(task_id, step_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param step_id: ID of the step (required) :type step_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: Step """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the get_agent_task_step_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.get_agent_task_step_with_http_info( task_id, step_id, **kwargs ) # noqa: E501 @validate_arguments def get_agent_task_step_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], step_id: Annotated[StrictStr, Field(..., description="ID of the step")], **kwargs, ) -> ApiResponse: # noqa: E501 """Get details about a specified task step. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_agent_task_step_with_http_info(task_id, step_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param step_id: ID of the step (required) :type step_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(Step, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id", "step_id"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method get_agent_task_step" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] if _params["step_id"]: _path_params["step_id"] = _params["step_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "Step", } return self.api_client.call_api( "/agent/tasks/{task_id}/steps/{step_id}", "GET", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def list_agent_task_artifacts( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], **kwargs, ) -> Any: # noqa: E501 ... @overload def list_agent_task_artifacts( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], async_req: Optional[bool] = True, **kwargs, ) -> Any: # noqa: E501 ... @validate_arguments def list_agent_task_artifacts( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], async_req: Optional[bool] = None, **kwargs, ) -> Union[Any, Awaitable[Any]]: # noqa: E501 """List all artifacts that have been created for the given task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_agent_task_artifacts(task_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: List[Artifact] """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the list_agent_task_artifacts_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.list_agent_task_artifacts_with_http_info( task_id, **kwargs ) # noqa: E501 @validate_arguments def list_agent_task_artifacts_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], **kwargs, ) -> ApiResponse: # noqa: E501 """List all artifacts that have been created for the given task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_agent_task_artifacts_with_http_info(task_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(List[Artifact], status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method list_agent_task_artifacts" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "Artifacts", } return self.api_client.call_api( "/agent/tasks/{task_id}/artifacts", "GET", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def list_agent_task_steps( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], **kwargs, ) -> List[str]: # noqa: E501 ... @overload def list_agent_task_steps( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], async_req: Optional[bool] = True, **kwargs, ) -> List[str]: # noqa: E501 ... @validate_arguments def list_agent_task_steps( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], async_req: Optional[bool] = None, **kwargs, ) -> Union[List[str], Awaitable[List[str]]]: # noqa: E501 """List all steps for the specified task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_agent_task_steps(task_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: List[str] """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the list_agent_task_steps_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.list_agent_task_steps_with_http_info( task_id, **kwargs ) # noqa: E501 @validate_arguments def list_agent_task_steps_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], **kwargs, ) -> ApiResponse: # noqa: E501 """List all steps for the specified task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_agent_task_steps_with_http_info(task_id, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(List[str], status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method list_agent_task_steps" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "List[str]", } return self.api_client.call_api( "/agent/tasks/{task_id}/steps", "GET", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def list_agent_tasks_ids(self, **kwargs) -> List[str]: # noqa: E501 ... @overload def list_agent_tasks_ids( self, async_req: Optional[bool] = True, **kwargs ) -> List[str]: # noqa: E501 ... @validate_arguments def list_agent_tasks_ids( self, async_req: Optional[bool] = None, **kwargs ) -> Union[List[str], Awaitable[List[str]]]: # noqa: E501 """List all tasks that have been created for the agent. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_agent_tasks_ids(async_req=True) >>> result = thread.get() :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: List[str] """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the list_agent_tasks_ids_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.list_agent_tasks_ids_with_http_info(**kwargs) # noqa: E501 @validate_arguments def list_agent_tasks_ids_with_http_info( self, **kwargs ) -> ApiResponse: # noqa: E501 """List all tasks that have been created for the agent. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_agent_tasks_ids_with_http_info(async_req=True) >>> result = thread.get() :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(List[str], status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method list_agent_tasks_ids" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "List[str]", } return self.api_client.call_api( "/agent/tasks", "GET", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), ) @overload async def upload_agent_task_artifacts( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], file: Annotated[ Union[StrictBytes, StrictStr], Field(..., description="File to upload.") ], relative_path: Annotated[ Optional[StrictStr], Field( description="Relative path of the artifact in the agent's workspace." ), ] = None, **kwargs, ) -> Artifact: # noqa: E501 ... @overload def upload_agent_task_artifacts( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], file: Annotated[ Union[StrictBytes, StrictStr], Field(..., description="File to upload.") ], relative_path: Annotated[ Optional[StrictStr], Field( description="Relative path of the artifact in the agent's workspace." ), ] = None, async_req: Optional[bool] = True, **kwargs, ) -> Artifact: # noqa: E501 ... @validate_arguments def upload_agent_task_artifacts( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], file: Annotated[ Union[StrictBytes, StrictStr], Field(..., description="File to upload.") ], relative_path: Annotated[ Optional[StrictStr], Field( description="Relative path of the artifact in the agent's workspace." ), ] = None, async_req: Optional[bool] = None, **kwargs, ) -> Union[Artifact, Awaitable[Artifact]]: # noqa: E501 """Upload an artifact for the specified task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.upload_agent_task_artifacts(task_id, file, relative_path, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param file: File to upload. (required) :type file: bytearray :param relative_path: Relative path of the artifact in the agent's workspace. :type relative_path: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: Artifact """ kwargs["_return_http_data_only"] = True if "_preload_content" in kwargs: raise ValueError( "Error! Please call the upload_agent_task_artifacts_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" ) if async_req is not None: kwargs["async_req"] = async_req return self.upload_agent_task_artifacts_with_http_info( task_id, file, relative_path, **kwargs ) # noqa: E501 @validate_arguments def upload_agent_task_artifacts_with_http_info( self, task_id: Annotated[StrictStr, Field(..., description="ID of the task")], file: Annotated[ Union[StrictBytes, StrictStr], Field(..., description="File to upload.") ], relative_path: Annotated[ Optional[StrictStr], Field( description="Relative path of the artifact in the agent's workspace." ), ] = None, **kwargs, ) -> ApiResponse: # noqa: E501 """Upload an artifact for the specified task. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.upload_agent_task_artifacts_with_http_info(task_id, file, relative_path, async_req=True) >>> result = thread.get() :param task_id: ID of the task (required) :type task_id: str :param file: File to upload. (required) :type file: bytearray :param relative_path: Relative path of the artifact in the agent's workspace. :type relative_path: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(Artifact, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = ["task_id", "file", "relative_path"] _all_params.extend( [ "async_req", "_return_http_data_only", "_preload_content", "_request_timeout", "_request_auth", "_content_type", "_headers", ] ) # validate the arguments for _key, _val in _params["kwargs"].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method upload_agent_task_artifacts" % _key ) _params[_key] = _val del _params["kwargs"] _collection_formats = {} # process the path parameters _path_params = {} if _params["task_id"]: _path_params["task_id"] = _params["task_id"] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get("_headers", {})) # process the form parameters _form_params = [] _files = {} if _params["file"]: _files["file"] = _params["file"] if _params["relative_path"]: _form_params.append(("relative_path", _params["relative_path"])) # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get( "_content_type", self.api_client.select_header_content_type(["multipart/form-data"]), ) if _content_types_list: _header_params["Content-Type"] = _content_types_list # authentication setting _auth_settings = [] # noqa: E501 _response_types_map = { "200": "Artifact", } return self.api_client.call_api( "/agent/tasks/{task_id}/artifacts", "POST", _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get("async_req"), _return_http_data_only=_params.get("_return_http_data_only"), # noqa: E501 _preload_content=_params.get("_preload_content", True), _request_timeout=_params.get("_request_timeout"), collection_formats=_collection_formats, _request_auth=_params.get("_request_auth"), )