Source code for cybsi.api.observation.archive
from typing import Any, Dict, Iterable, Optional
from uuid import UUID
from ..internal import BaseAPI, JsonObjectView
from ..pagination import Cursor, Page
from .view import ObservationHeaderView
[docs]
class ArchiveObservationsAPI(BaseAPI):
    """API for listing and retrieving archive observations."""

    _path = "/enrichment/observations/archives"

    def filter(
        self,
        *,
        entity_uuid: Optional[UUID] = None,
        artifact_uuid: Optional[UUID] = None,
        data_source_uuids: Optional[Iterable[UUID]] = None,
        reporter_uuids: Optional[Iterable[UUID]] = None,
        cursor: Optional[Cursor] = None,
        limit: Optional[int] = None,
    ) -> Page["ArchiveObservationView"]:
        """Fetch one page of archive observations matching the filters.

        Page's items are sorted in descending order of observation time.

        Note:
            Calls `GET /enrichment/observations/archives`

        Args:
            entity_uuid: Entity identifier.
                Filter archive by specified file entity.
            artifact_uuid: Artifact identifier.
                Filter archive by specified artifact.
            data_source_uuids: List of data source identifiers.
                Filter observation by original data source identifiers.
            reporter_uuids: List of reporter identifiers.
                Filter observation by reporter data source identifiers.
            cursor: Page cursor. Omitted from the query when falsy.
            limit: Page limit. Omitted from the query when falsy.
        Returns:
            Page of observation list and next page cursor.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: query arguments contain errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.EntityNotFound`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.ArtifactNotFound`
        """
        query: Dict[str, Any] = {}

        # Multi-valued filters: each UUID is sent in its string form.
        multi_valued = (
            ("dataSourceUUID", data_source_uuids),
            ("reporterUUID", reporter_uuids),
        )
        for key, uuids in multi_valued:
            if uuids is not None:
                query[key] = list(map(str, uuids))

        # Single-valued filters are included whenever they were supplied.
        single_valued = (
            ("entityUUID", entity_uuid),
            ("artifactUUID", artifact_uuid),
        )
        for key, value in single_valued:
            if value is not None:
                query[key] = str(value)

        # Pagination controls rely on truthiness, so an empty cursor or a
        # zero limit is left out of the request entirely.
        for key, value in (("cursor", cursor), ("limit", limit)):
            if value:
                query[key] = str(value)

        response = self._connector.do_get(self._path, params=query)
        return Page(self._connector.do_get, response, ArchiveObservationView)

    def view(self, observation_uuid: UUID) -> "ArchiveObservationView":
        """Retrieve a single archive observation.

        Note:
            Calls `GET /enrichment/observations/archives/{observation_uuid}`.

        Args:
            observation_uuid: Observation uuid.
        Returns:
            View of the observation.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: observation not found.
        """
        response = self._connector.do_get(f"{self._path}/{observation_uuid}")
        return ArchiveObservationView(response.json())
[docs]
class ArchiveObservationView(ObservationHeaderView):
    """View of an archive observation.

    Returned by :meth:`ArchiveObservationsAPI.view` and used as the item
    type of pages produced by :meth:`ArchiveObservationsAPI.filter`.
    """

    @property
    def content(self) -> "ArchiveObservationContentView":
        """Observation content, wrapped in its dedicated view."""
        raw_content = self._get("content")
        return ArchiveObservationContentView(raw_content)
[docs]
class ArchiveObservationContentView(JsonObjectView):
    """Content of an archive observation.

    Instances are produced by :attr:`ArchiveObservationView.content`.

    TODO:
        Implement properties.
    """