"""Use this section of API operate enrichment tasks.
The API allows to fetch and filter enrichment tasks.
It also allows registering enrichment task to start enrichment forcibly.
"""
import datetime
import uuid
from typing import Any, Dict, Iterable, List, Optional, Union, cast
from .. import RefView
from ..artifact import ArtifactTypes
from ..data_source import DataSourceCommonView
from ..internal import (
BaseAPI,
BaseAsyncAPI,
JsonObjectForm,
JsonObjectView,
parse_rfc3339_timestamp,
)
from ..observable import EntityView, ShareLevels
from ..observation import ObservationCommonView
from ..pagination import AsyncPage, Cursor, Page
from .enums import (
EnrichmentErrorCodes,
EnrichmentTaskPriorities,
EnrichmentTaskStatuses,
EnrichmentTypes,
)
_PATH = "/enrichment/tasks"
[docs]
class TasksAPI(BaseAPI):
"""Enrichment task API."""
[docs]
def register(self, form: "TaskForm") -> RefView:
"""Register enrichment task to start enrichment forcibly.
Note:
Calls `POST /enrichment/tasks`.
Args:
form: Filled enrichment task form.
Returns:
Reference to enrichment task in API.
Raises:
:class:`~cybsi.api.error.InvalidRequestError`: Form values are invalid.
:class:`~cybsi.api.error.SemanticError`: Form contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.EntityNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.ArtifactNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.EnrichmentNotAllowed`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidKeySet`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`
"""
r = self._connector.do_post(path=_PATH, json=form.json())
return RefView(r.json())
[docs]
def view(self, task_uuid: uuid.UUID) -> "TaskView":
"""Get the enrichment task view.
Note:
Calls `GET /enrichment/tasks/{task_uuid}`.
Args:
task_uuid: enrichment task uuid.
Returns:
View of the enrichment task.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Enrichment task not found.
"""
path = f"{_PATH}/{task_uuid}"
r = self._connector.do_get(path=path)
return TaskView(r.json())
[docs]
def filter(
self,
*,
artifact_uuid: Optional[uuid.UUID] = None,
entity_uuid: Optional[uuid.UUID] = None,
data_sources: Optional[Iterable[uuid.UUID]] = None,
statuses: Optional[Iterable[EnrichmentTaskStatuses]] = None,
cursor: Optional[Cursor] = None,
limit: Optional[int] = None,
) -> Page["TaskView"]:
"""Get enrichment task filtration list.
Returns Pending, Completed and Failed enrichment tasks.
Note:
Calls `GET /enrichment/tasks`
Args:
cursor: Page cursor.
limit: Page limit.
artifact_uuid: Artifact identifier.
entity_uuid: Entity identifier.
data_sources: Datasource list.
statuses: List of task statuses.
Returns:
Page with enrichment tasks and
cursor allowing to get next batch of enrichment tasks.
Raises:
:class:`~cybsi.api.error.InvalidRequestError`:
Artifact uuid and entity uuid parameters are missing.
At least one of these parameters must be specified.
:class:`~cybsi.api.error.SemanticError`: Request contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.EntityNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.ArtifactNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
"""
params: Dict[str, Any] = {}
if cursor:
params["cursor"] = str(cursor)
if limit:
params["limit"] = str(limit)
if artifact_uuid:
params["artifactUUID"] = str(artifact_uuid)
if entity_uuid:
params["entityUUID"] = str(entity_uuid)
if data_sources is not None:
params["dataSource"] = [str(ds) for ds in data_sources]
if statuses is not None:
params["status"] = [s.value for s in statuses]
resp = self._connector.do_get(_PATH, params=params)
page = Page(self._connector.do_get, resp, TaskView)
return page
[docs]
class TasksAsyncAPI(BaseAsyncAPI):
"""Enrichment task API."""
[docs]
async def register(self, form: "TaskForm") -> RefView:
"""Register enrichment task to start enrichment forcibly.
Note:
Calls `POST /enrichment/tasks`.
Args:
form: Filled enrichment task form.
Returns:
Reference to enrichment task in API.
Raises:
:class:`~cybsi.api.error.InvalidRequestError`: Form values are invalid.
:class:`~cybsi.api.error.SemanticError`: Form contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.EntityNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.ArtifactNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.EnrichmentNotAllowed`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidKeySet`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`
"""
r = await self._connector.do_post(path=_PATH, json=form.json())
return RefView(r.json())
[docs]
async def view(self, task_uuid: uuid.UUID) -> "TaskView":
"""Get the enrichment task view.
Note:
Calls `GET /enrichment/tasks/{task_uuid}`.
Args:
task_uuid: enrichment task uuid.
Returns:
View of the enrichment task.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Enrichment task not found.
"""
path = f"{_PATH}/{task_uuid}"
r = await self._connector.do_get(path=path)
return TaskView(r.json())
[docs]
async def filter(
self,
*,
artifact_uuid: Optional[uuid.UUID] = None,
entity_uuid: Optional[uuid.UUID] = None,
data_sources: Optional[Iterable[uuid.UUID]] = None,
statuses: Optional[Iterable[EnrichmentTaskStatuses]] = None,
cursor: Optional[Cursor] = None,
limit: Optional[int] = None,
) -> AsyncPage["TaskView"]:
"""Get enrichment task filtration list.
Returns Pending, Completed and Failed enrichment tasks.
Note:
Calls `GET /enrichment/tasks`
Args:
cursor: Page cursor.
limit: Page limit.
artifact_uuid: Artifact identifier.
entity_uuid: Entity identifier.
data_sources: Datasource list.
statuses: List of task statuses.
Returns:
Page with enrichment tasks and
cursor allowing to get next batch of enrichment tasks.
Raises:
:class:`~cybsi.api.error.InvalidRequestError`:
Artifact uuid and entity uuid parameters are missing.
At least one of these parameters must be specified.
:class:`~cybsi.api.error.SemanticError`: Request contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.EntityNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.ArtifactNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
"""
params: Dict[str, Any] = {}
if cursor:
params["cursor"] = str(cursor)
if limit:
params["limit"] = str(limit)
if artifact_uuid:
params["artifactUUID"] = str(artifact_uuid)
if entity_uuid:
params["entityUUID"] = str(entity_uuid)
if data_sources is not None:
params["dataSource"] = [str(ds) for ds in data_sources]
if statuses is not None:
params["status"] = [s.value for s in statuses]
resp = await self._connector.do_get(_PATH, params=params)
page = AsyncPage(self._connector.do_get, resp, TaskView)
return page
DNSLookupParamsForm = ExternalDBLookupParamsForm
""" DNS lookup task parameters form.
(Alias of :attr:`ExternalDBLookupParamsForm`).
Args:
entity_uuid: Entity identifier.
Note:
For DNSLookup enrichment type can be only
IPAddress, DomainName entity types.
"""
WhoisLookupParamsForm = ExternalDBLookupParamsForm
""" Whois lookup task parameters form.
(Alias of :attr:`ExternalDBLookupParamsForm`).
Args:
entity_uuid: Entity identifier.
Note:
For WhoisLookup enrichment type can be only
IPAddress, DomainName entity types.
"""
EnrichmentTaskParamsForm = Union[
ArchiveUnpackParamsForm,
ArtifactAnalysisParamsForm,
ArtifactDownloadParamsForm,
ExternalDBLookupParamsForm,
DNSLookupParamsForm,
WhoisLookupParamsForm,
]
[docs]
class ArtifactAnalysisParamsView(JsonObjectView):
"""Task parameters view of :attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ArtifactAnalysis`.""" # noqa: E501
@property
def artifact(self) -> RefView:
"""Artifact, enrichment target.
Note:
Use :class:`~cybsi.api.artifact.api.ArtifactsAPI`
to retrieve complete artifact information and
its binary content.
"""
return RefView(self._get("artifact"))
@property
def image_id(self) -> Optional[str]:
"""Analyzer-specific image id.
.. deprecated:: 2.13 Always empty. Will be removed in future releases.
"""
return self._get_optional("imageID")
@property
def passwords(self) -> Optional[List[str]]:
"""Password list for unpacking an artifact and/or its attachments."""
return self._get_optional("passwords")
[docs]
class ArchiveUnpackParamsView(JsonObjectView):
"""Task parameters view of :attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ArchiveUnpack`.""" # noqa: E501
@property
def artifact(self) -> RefView:
"""Artifact, enrichment target.
Note:
Use :class:`~cybsi.api.artifact.api.ArtifactsAPI`
to retrieve complete artifact information and
its binary content.
"""
return RefView(self._get("artifact"))
@property
def password(self) -> Optional[str]:
"""Archive password."""
return self._get_optional("password")
@property
def register_nested_archive_as_file_sample(self) -> Optional[bool]:
"""Flag that determines whether to register attached archives
as FileSample artifacts during unpacking."""
return self._get_optional("registerNestedArchiveAsFileSample")
[docs]
class ArtifactDownloadParamsView(JsonObjectView):
"""Task parameters view of :attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ArtifactDownload`.""" # noqa: E501
@property
def url(self) -> str:
"""URL of the resource to be loaded."""
return self._get("url")
@property
def share_level(self) -> ShareLevels:
"""Share level."""
return self._get("shareLevel")
@property
def type(self) -> Optional[ArtifactTypes]:
"""Artifact type."""
return self._map_optional("type", ArtifactTypes)
[docs]
class ExternalDBLookupParamsView(JsonObjectView):
"""Task parameters view of :attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ExternalDBLookup`.""" # noqa: E501
@property
def entity(self) -> EntityView:
"""Entity, enrichment target."""
return EntityView(self._get("entity"))
DNSLookupParamsView = ExternalDBLookupParamsView
"""Task parameters view of :attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.DNSLookup`.
(Alias of :attr:`ExternalDBLookupParamsView`).
""" # noqa: E501
WhoisLookupParamsView = ExternalDBLookupParamsView
"""Task parameters view of :attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.WhoisLookup`.
(Alias of :attr:`ExternalDBLookupParamsView`).
""" # noqa: E501
EnrichmentTaskParamsView = Union[
ArtifactAnalysisParamsView,
ArtifactDownloadParamsView,
ArchiveUnpackParamsView,
ExternalDBLookupParamsView,
DNSLookupParamsView,
WhoisLookupParamsView,
]
[docs]
class ArtifactTaskResultView(JsonObjectView):
"""Artifact result task view for
:attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ArtifactDownload`
"""
@property
def artifact(self) -> RefView:
return RefView(self._get("artifact"))
[docs]
class ReportTaskResultView(JsonObjectView):
"""Report result task view for
:attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ArtifactAnalysis`
"""
@property
def report(self) -> RefView:
return RefView(self._get("report"))
[docs]
class ObservationTaskResultView(JsonObjectView):
"""Observation result task view for
:attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ExternalDBLookup`,
:attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.DNSLookup`,
:attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.WhoisLookup`.
:attr:`~cybsi.api.enrichment.enums.EnrichmentTypes.ArchiveUnpack`.
"""
@property
def observation(self) -> ObservationCommonView:
return ObservationCommonView(self._get("observation"))
EnrichmentTaskResultView = Union[
ArtifactTaskResultView, ReportTaskResultView, ObservationTaskResultView
]
[docs]
class EnrichmentTaskErrorView(JsonObjectView):
"""Enrichment task error view."""
@property
def code(self) -> EnrichmentErrorCodes:
"""Error code."""
return EnrichmentErrorCodes(self._get("code"))
@property
def message(self) -> str:
"""Error massage."""
return self._get("message")
[docs]
class TaskView(RefView):
"""Enrichment task view."""
_param_types = {
EnrichmentTypes.ArtifactDownload: ArtifactDownloadParamsView,
EnrichmentTypes.ArchiveUnpack: ArchiveUnpackParamsView,
EnrichmentTypes.ArtifactAnalysis: ArtifactAnalysisParamsView,
EnrichmentTypes.ExternalDBLookup: ExternalDBLookupParamsView,
EnrichmentTypes.WhoisLookup: ExternalDBLookupParamsView,
EnrichmentTypes.DNSLookup: ExternalDBLookupParamsView,
}
_result_types = {
EnrichmentTypes.ArtifactDownload: ArtifactTaskResultView,
EnrichmentTypes.ArchiveUnpack: ObservationTaskResultView,
EnrichmentTypes.ExternalDBLookup: ObservationTaskResultView,
EnrichmentTypes.WhoisLookup: ObservationTaskResultView,
EnrichmentTypes.DNSLookup: ObservationTaskResultView,
EnrichmentTypes.ArtifactAnalysis: ReportTaskResultView,
}
@property
def priority(self) -> EnrichmentTaskPriorities:
"""Priority."""
return EnrichmentTaskPriorities(self._get("priority"))
@property
def created_at(self) -> datetime.datetime:
"""Date and time of task creation."""
return parse_rfc3339_timestamp(self._get("createdAt"))
@property
def updated_at(self) -> datetime.datetime:
"""Date and time of last task update."""
return parse_rfc3339_timestamp(self._get("updatedAt"))
@property
def data_source(self) -> Optional[DataSourceCommonView]:
"""Data source associated with enricher.
Not empty for ArtifactAnalysis, ExternalDBLookup enrichment type.
"""
return self._map_optional("dataSource", DataSourceCommonView)
@property
def type(self) -> EnrichmentTypes:
"""Enrichment type."""
return EnrichmentTypes(self._get("type"))
@property
def params(self) -> EnrichmentTaskParamsView:
"""Parameters of task. Determine exact type of parameters
using property :attr:`type`.
If enricher was a function, this would be function parameters.
Usage:
>>> from typing import cast
>>> from cybsi.api.enrichment import (
>>> TaskView, EnrichmentTypes,
>>> ExternalDBLookupParamsView
>>> )
>>> task_view = TaskView()
>>> if task_view.type == EnrichmentTypes.ExternalDBLookup:
>>> lookup = cast(ExternalDBLookupParamsView, task_view.params)
>>> print(lookup.entity)
"""
params = self._param_types[self.type](self._get("params"))
return cast(EnrichmentTaskParamsView, params)
@property
def status(self) -> EnrichmentTaskStatuses:
"""Enrichment task status."""
return EnrichmentTaskStatuses(self._get("status"))
@property
def error(self) -> Optional[EnrichmentTaskErrorView]:
"""Enrichment task error.
Can be filled only if the enrichment task status is Failed or Aborted.
"""
return self._map_optional("error", EnrichmentTaskErrorView)
@property
def result(self) -> EnrichmentTaskResultView:
"""Result of task. Determine exact type of result
using property :attr:`type`.
If enricher was a function, this would be function parameters.
Usage:
>>> from typing import cast
>>> from cybsi.api.enrichment import (
>>> TaskView, EnrichmentTypes,
>>> ExternalDBLookupParamsView
>>> )
>>> task_view = TaskView()
>>> if task_view.type == EnrichmentTypes.ExternalDBLookup:
>>> observation = cast(ObservationTaskResultView, task_view.result)
>>> print(observation)
"""
result = self._result_types[self.type](self._get("result"))
return cast(EnrichmentTaskResultView, result)