"""Generic observation is the main form of observation.
In essence, it's a container of arbitrary facts Cybsi domain model supports.
See Also:
See :ref:`register-generic-observation-example`
for complete examples of generic observation API usage.
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Union, cast
from .. import RefView
from ..dictionary import DictItemAttributeValue
from ..internal import (
BaseAPI,
BaseAsyncAPI,
JsonObjectForm,
JsonObjectView,
rfc3339_timestamp,
)
from ..observable import (
AttributeNames,
AttributeValueView,
EntityForm,
EntityView,
RelationshipKinds,
RelationshipView,
ShareLevels,
)
from ..observable.aggregate_section import _convert_attribute_value_type
from ..pagination import AsyncPage, Cursor, Page
from .view import ObservationHeaderView
# Base REST path of the generic observations collection; every request
# issued by the API classes in this module is built from it.
_PATH = "/enrichment/observations/generics"
[docs]
class GenericObservationsAPI(BaseAPI):
    """Synchronous API for generic observations."""

    def register(self, observation: "GenericObservationForm") -> RefView:
        """Submit a generic observation for registration.

        Registration succeeds unless the form contains logic errors.
        Validation errors of entity key values are ignored.

        Note:
            Calls `POST /enrichment/observations/generics`.
        Args:
            observation: Filled generic observation form.
        Returns:
            Reference to the newly registered observation.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Form contains logic errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DuplicatedEntityAttribute`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidAttribute`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidAttributeValue`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidKeySet`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidRelationship`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidTime`
        """
        payload = observation.json()
        response = self._connector.do_post(path=_PATH, json=payload)
        return RefView(response.json())

    def filter(
        self,
        *,
        data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        reporter_uuids: Optional[Iterable[uuid.UUID]] = None,
        cursor: Optional[Cursor] = None,
        limit: Optional[int] = None,
    ) -> Page["GenericObservationView"]:
        """Fetch one page of the filtered generic observation list.

        Items of the page are sorted in descending order of observation time.

        Note:
            Calls `GET /enrichment/observations/generics`
        Args:
            data_source_uuids: List of data source identifiers.
                Filter observations by original data source identifiers.
            reporter_uuids: List of reporter identifiers.
                Filter observations by reporter data source identifiers.
            cursor: Page cursor.
            limit: Page limit.
        Returns:
            Page of generic observations list and next page cursor.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: query arguments contain errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
        """
        query: Dict[str, Any] = {}

        # UUID filters are sent whenever they were passed at all (even empty).
        uuid_filters = (
            ("dataSourceUUID", data_source_uuids),
            ("reporterUUID", reporter_uuids),
        )
        for key, identifiers in uuid_filters:
            if identifiers is not None:
                query[key] = list(map(str, identifiers))

        # Paging arguments are sent only when truthy, so an empty cursor
        # and a zero limit are both left out of the query.
        for key, value in (("cursor", cursor), ("limit", limit)):
            if value:
                query[key] = str(value)

        first_response = self._connector.do_get(_PATH, params=query)
        return Page(self._connector.do_get, first_response, GenericObservationView)

    def view(self, observation_uuid: uuid.UUID) -> "GenericObservationView":
        """Retrieve a single generic observation.

        Note:
            Calls `GET /enrichment/observations/generics/{observation_uuid}`.
        Args:
            observation_uuid: Observation uuid.
        Returns:
            View of the observation.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Generic observation not found.
        """
        response = self._connector.do_get(f"{_PATH}/{observation_uuid}")
        return GenericObservationView(response.json())
[docs]
class GenericObservationsAsyncAPI(BaseAsyncAPI):
    """Asynchronous API for generic observations."""

    async def register(self, observation: "GenericObservationForm") -> RefView:
        """Submit a generic observation for registration.

        Async analog of :meth:`GenericObservationsAPI.register()`.
        """
        payload = observation.json()
        response = await self._connector.do_post(path=_PATH, json=payload)
        return RefView(response.json())

    async def filter(
        self,
        *,
        data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        reporter_uuids: Optional[Iterable[uuid.UUID]] = None,
        cursor: Optional[Cursor] = None,
        limit: Optional[int] = None,
    ) -> AsyncPage["GenericObservationView"]:
        """Fetch one page of the filtered generic observation list.

        Async analog of :meth:`GenericObservationsAPI.filter()`.
        """
        query: Dict[str, Any] = {}

        # UUID filters are sent whenever they were passed at all (even empty).
        uuid_filters = (
            ("dataSourceUUID", data_source_uuids),
            ("reporterUUID", reporter_uuids),
        )
        for key, identifiers in uuid_filters:
            if identifiers is not None:
                query[key] = list(map(str, identifiers))

        # Paging arguments are sent only when truthy, so an empty cursor
        # and a zero limit are both left out of the query.
        for key, value in (("cursor", cursor), ("limit", limit)):
            if value:
                query[key] = str(value)

        first_response = await self._connector.do_get(_PATH, params=query)
        return AsyncPage(
            self._connector.do_get, first_response, GenericObservationView
        )

    async def view(self, observation_uuid: uuid.UUID) -> "GenericObservationView":
        """Retrieve a single generic observation.

        Async analog of :meth:`GenericObservationsAPI.view()`.
        """
        response = await self._connector.do_get(f"{_PATH}/{observation_uuid}")
        return GenericObservationView(response.json())
# Type alias: union of the Python types accepted as an attribute value.
# NOTE(review): the consumer is not visible in this part of the file --
# presumably the generic observation form; confirm before relying on it.
AttributeValueForm = Union[int, str, bool, uuid.UUID, DictItemAttributeValue, Enum]
[docs]
class GenericObservationView(ObservationHeaderView):
    """View of a generic observation,
    as returned by :meth:`GenericObservationsAPI.view`."""

    @property
    def content(self) -> "GenericObservationContentView":
        """Observation content."""
        raw_content = self._get("content")
        return GenericObservationContentView(raw_content)
[docs]
class GenericObservationContentView(JsonObjectView):
    """Content of a generic observation."""

    @property
    def entity_relationships(self) -> List["RelationshipView"]:
        """Relationships between entities."""
        return list(map(RelationshipView, self._get("entityRelationships")))

    @property
    def entity_attribute_values(self) -> List["AttributeValueFactView"]:
        """Attribute values of entities."""
        return list(map(AttributeValueFactView, self._get("entityAttributeValues")))
[docs]
class AttributeValueFactView(JsonObjectView):
    """View of a single attribute value fact."""

    @property
    def entity(self) -> EntityView:
        """Entity the fact is about.

        Warning:
            Ref uuid may be zero uuid,
            if entity keys were invalid during registration.
        """
        raw_entity = self._get("entity")
        return EntityView(raw_entity)

    @property
    def attribute_name(self) -> AttributeNames:
        """Name of the attribute."""
        raw_name = self._get("attributeName")
        return AttributeNames(raw_name)

    @property
    def value(self) -> AttributeValueView:
        """Attribute value.

        The returned type depends on attribute name and entity type.

        Usage:
            >>> from typing import cast
            >>> from cybsi.api.observable import AttributeNames
            >>> from cybsi.api.observation import GenericObservationContentView
            >>> from cybsi.api.dictionary import DictionaryItemCommonView
            >>>
            >>> view = GenericObservationContentView()
            >>> for v in view.entity_attribute_values:
            ...     if v.attribute_name == AttributeNames.MalwareFamilies:
            ...         value = cast(DictionaryItemCommonView, v.value)
            ...         print(value)
        """
        name = self.attribute_name
        raw_value = self._get("value")
        return _convert_attribute_value_type(name, raw_value)

    @property
    def confidence(self) -> float:
        """Confidence of the fact."""
        raw_confidence = cast(float, self._get("confidence"))
        return float(raw_confidence)