# Source code for cybsi.api.enrichment.task_queue
"""Use this section of the API to implement a custom enricher.

The API allows fetching tasks assigned to an enricher.
It also allows publishing task execution results and errors.

See Also:
    See :ref:`implement-custom-external-db-example`
    for a complete example of an enricher capable of IP enrichment.

    See :ref:`implement-custom-analyzer-example`
    for a complete example of an enricher capable of File sample enrichment.
"""
import datetime
import uuid
from typing import Iterable, List, Union, cast
from .. import RefView
from ..data_source import DataSourceCommonView
from ..internal import BaseAPI, BaseAsyncAPI, JsonObjectForm, parse_rfc3339_timestamp
from .enums import EnrichmentErrorCodes, EnrichmentTaskPriorities, EnrichmentTypes
from .tasks import ArtifactAnalysisParamsView, ExternalDBLookupParamsView
# Common URL prefix of the task queue endpoints; every API method in this
# module appends its own suffix (e.g. "/executing-tasks") to it.
_PATH = "/enrichment/task-queue"
class TaskQueueAPI(BaseAPI):
    """Task queue API (synchronous variant).

    Every method issues a single ``POST`` request through the connector
    to an endpoint under ``/enrichment/task-queue``.

    .. versionadded:: 2.7
    """

    def get_assigned_tasks(self, limit: int = 1) -> List["AssignedTaskView"]:
        """Assign a batch of pending enrichment tasks for execution by client.

        .. versionadded:: 2.7

        All returned tasks have status `Executing`.

        Note:
            Calls `POST /enrichment/task-queue/executing-tasks`.
        Args:
            limit: Maximum task batch size.
        Returns:
            A batch of tasks for execution.
        Warning:
            Please wait some time if :meth:`get_assigned_tasks`
            returns empty list before calling it again.
        """
        path = f"{_PATH}/executing-tasks"
        r = self._connector.do_post(path=path, json={"limit": limit})
        # The response body is a JSON array; each element describes one task.
        return [AssignedTaskView(t) for t in r.json()]

    def complete_tasks(self, completed_tasks: Iterable["CompletedTaskForm"]) -> None:
        """Register successful task results.

        .. versionadded:: 2.7

        Note:
            Calls `POST /enrichment/task-queue/completed-tasks`.
        Args:
            completed_tasks: Iterable of filled forms of completed tasks.
        Returns:
            None on successful registration of results.
        Raises:
            :class:`~cybsi.api.error.ForbiddenError`: Enricher cannot report
                result of one of tasks.
            :class:`~cybsi.api.error.SemanticError`: One of forms contains
                logic errors.
        Note:
            ForbiddenError codes:

            * :attr:`~cybsi.api.error.ForbiddenErrorCodes.NotOwner`
              -- Task belongs to other enricher.

            SemanticError codes specific for this method:

            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTaskStatus`
              -- Current task status is not ``Executing``.
            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTaskResult`
              -- Result has a broken link to observation, report or artifact.
        """
        path = f"{_PATH}/completed-tasks"
        # All forms are serialized up front and sent in a single request.
        task_jsons = [form.json() for form in completed_tasks]
        self._connector.do_post(path=path, json={"tasks": task_jsons})

    def fail_tasks(self, failed_tasks: Iterable["FailedTaskForm"]) -> None:
        """Register failed task errors.

        .. versionadded:: 2.7

        Note:
            Calls `POST /enrichment/task-queue/failed-tasks`.
        Args:
            failed_tasks: Iterable of filled forms of failed tasks.
        Returns:
            None on successful registration of errors.
        Raises:
            :class:`~cybsi.api.error.ForbiddenError`: Enricher cannot report
                result of one of tasks.
            :class:`~cybsi.api.error.SemanticError`: One of forms contains
                logic errors.
        Note:
            ForbiddenError codes:

            * :attr:`~cybsi.api.error.ForbiddenErrorCodes.NotOwner`
              -- Task belongs to other enricher.

            SemanticError codes specific for this method:

            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTaskStatus`
              -- Current task status is not ``Executing``.
            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidErrorCode`
              -- Error code is invalid for tasks of such type.
        """
        path = f"{_PATH}/failed-tasks"
        # All forms are serialized up front and sent in a single request.
        task_jsons = [form.json() for form in failed_tasks]
        self._connector.do_post(path=path, json={"tasks": task_jsons})
class TaskQueueAsyncAPI(BaseAsyncAPI):
    """Task queue API (asynchronous variant).

    Every method awaits a single ``POST`` request made through the
    connector to an endpoint under ``/enrichment/task-queue``.

    .. versionadded:: 2.7
    """

    async def get_assigned_tasks(self, limit: int = 1) -> List["AssignedTaskView"]:
        """Assign a batch of pending enrichment tasks for execution by client.

        .. versionadded:: 2.7

        All returned tasks have status `Executing`.

        Note:
            Calls `POST /enrichment/task-queue/executing-tasks`.
        Args:
            limit: Maximum task batch size.
        Returns:
            A batch of tasks for execution.
        Warning:
            Please wait some time if :meth:`get_assigned_tasks`
            returns empty list before calling it again.
        """
        path = f"{_PATH}/executing-tasks"
        r = await self._connector.do_post(path=path, json={"limit": limit})
        # The response body is a JSON array; each element describes one task.
        return [AssignedTaskView(t) for t in r.json()]

    async def complete_tasks(
        self, completed_tasks: Iterable["CompletedTaskForm"]
    ) -> None:
        """Register successful task results.

        .. versionadded:: 2.7

        Note:
            Calls `POST /enrichment/task-queue/completed-tasks`.
        Args:
            completed_tasks: Iterable of filled forms of completed tasks.
        Returns:
            None on successful registration of results.
        Raises:
            :class:`~cybsi.api.error.ForbiddenError`: Enricher cannot report
                result of one of tasks.
            :class:`~cybsi.api.error.SemanticError`: One of forms contains
                logic errors.
        Note:
            ForbiddenError codes:

            * :attr:`~cybsi.api.error.ForbiddenErrorCodes.NotOwner`
              -- Task belongs to other enricher.

            SemanticError codes specific for this method:

            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTaskStatus`
              -- Current task status is not ``Executing``.
            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTaskResult`
              -- Result has a broken link to observation, report or artifact.
        """
        path = f"{_PATH}/completed-tasks"
        # All forms are serialized up front and sent in a single request.
        task_jsons = [form.json() for form in completed_tasks]
        await self._connector.do_post(path=path, json={"tasks": task_jsons})

    async def fail_tasks(self, failed_tasks: Iterable["FailedTaskForm"]) -> None:
        """Register failed task errors.

        .. versionadded:: 2.7

        Note:
            Calls `POST /enrichment/task-queue/failed-tasks`.
        Args:
            failed_tasks: Iterable of filled forms of failed tasks.
        Returns:
            None on successful registration of errors.
        Raises:
            :class:`~cybsi.api.error.ForbiddenError`: Enricher cannot report
                result of one of tasks.
            :class:`~cybsi.api.error.SemanticError`: One of forms contains
                logic errors.
        Note:
            ForbiddenError codes:

            * :attr:`~cybsi.api.error.ForbiddenErrorCodes.NotOwner`
              -- Task belongs to other enricher.

            SemanticError codes specific for this method:

            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTaskStatus`
              -- Current task status is not ``Executing``.
            * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidErrorCode`
              -- Error code is invalid for tasks of such type.
        """
        path = f"{_PATH}/failed-tasks"
        # All forms are serialized up front and sent in a single request.
        task_jsons = [form.json() for form in failed_tasks]
        await self._connector.do_post(path=path, json={"tasks": task_jsons})
# Return type of AssignedTaskView.params: one of the two parameter views
# that AssignedTaskView._param_types maps an enrichment type to.
EnrichmentTaskQueueParamsView = Union[
ArtifactAnalysisParamsView, ExternalDBLookupParamsView
]
class AssignedTaskView(RefView):
    """Task assigned to enricher for execution."""

    # Dispatch table used by :attr:`params`: maps the task's enrichment type
    # to the view class that wraps the raw "params" value.
    _param_types = {
        EnrichmentTypes.ArtifactAnalysis: ArtifactAnalysisParamsView,
        EnrichmentTypes.ExternalDBLookup: ExternalDBLookupParamsView,
    }

    @property
    def priority(self) -> EnrichmentTaskPriorities:
        """Priority."""
        return EnrichmentTaskPriorities(self._get("priority"))

    @property
    def created_at(self) -> datetime.datetime:
        """Date and time of task creation."""
        return parse_rfc3339_timestamp(self._get("createdAt"))

    @property
    def updated_at(self) -> datetime.datetime:
        """Date and time of last task update."""
        return parse_rfc3339_timestamp(self._get("updatedAt"))

    @property
    def data_source(self) -> DataSourceCommonView:
        """Data source associated with enricher."""
        return DataSourceCommonView(self._get("dataSource"))

    @property
    def type(self) -> "EnrichmentTypes":
        """Enrichment type.

        Note:
            Possible values are subset of :class:`.enums.EnrichmentTypes` values.
            Only :attr:`.enums.EnrichmentTypes.ArtifactAnalysis` and
            :attr:`.enums.EnrichmentTypes.ExternalDBLookup` are possible here.
        """  # noqa: E501
        return EnrichmentTypes(self._get("type"))

    @property
    def params(self) -> EnrichmentTaskQueueParamsView:
        """Parameters of task. Determine exact type of parameters
        using property :attr:`type`.

        If enricher was a function, this would be function parameters.

        Usage:
            >>> from typing import cast
            >>> from cybsi.api.enrichment import (
            >>>     AssignedTaskView, EnrichmentTypes,
            >>>     ExternalDBLookupParamsView
            >>> )
            >>> task_view = AssignedTaskView()
            >>> if task_view.type == EnrichmentTypes.ExternalDBLookup:
            >>>     lookup = cast(ExternalDBLookupParamsView, task_view.params)
            >>>     print(lookup.entity)
        """
        # Pick the view class matching the task type, then wrap the raw params.
        # A type missing from _param_types raises KeyError here.
        params = self._param_types[self.type](self._get("params"))
        return cast(EnrichmentTaskQueueParamsView, params)
# Union of the three task result form types.
# NOTE(review): TaskResultArtifactForm, TaskResultObservationForm and
# TaskResultReportForm are neither imported nor defined in the visible part
# of this file — presumably declared elsewhere in the module; confirm they
# are bound before this alias is evaluated.
TaskResultForm = Union[
TaskResultArtifactForm, TaskResultObservationForm, TaskResultReportForm
]