"""Use this section of API to operate config rules.
"""
import uuid
from typing import Any, Iterable, List, Optional
from .. import RefView, Tag
from ..api import Nullable, _unwrap_nullable
from ..artifact import ArtifactTypes
from ..internal import BaseAPI, JsonObject, JsonObjectForm
from ..observable import EntityTypes
from ..pagination import Cursor, Page
from ..view import _TaggedRefView
from .enums import EnrichmentTriggerTypes, EnrichmentTypes
[docs]
class ConfigRulesAPI(BaseAPI):
    """API section for managing enrichment config rules."""

    _path = "/enrichment/config/rules"

    def view(self, rule_uuid: uuid.UUID) -> "ConfigRuleView":
        """Fetch a single config rule.

        .. versionadded:: 2.7

        Note:
            Calls `GET /enrichment/config/rules/{rule_uuid}`.
        Args:
            rule_uuid: Config rule uuid.
        Returns:
            View of the config rule.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Config rule not found.
        """
        response = self._connector.do_get(path=f"{self._path}/{rule_uuid}")
        return ConfigRuleView(response)

    def register(self, rule: "ConfigRuleForm") -> RefView:
        """Register a new config rule.

        .. versionadded:: 2.7

        Note:
            Calls `POST /enrichment/config/rules`.
        Args:
            rule: Filled config rule form.
        Returns:
            Reference to the registered config rule in API.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Form contains logic errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidRule`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.MisconfiguredDataSource`
        """
        payload = rule.json()
        response = self._connector.do_post(path=self._path, json=payload)
        return RefView(response.json())

    def filter(
        self,
        *,
        data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        trigger_data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        enrichment_types: Optional[Iterable[EnrichmentTypes]] = None,
        artifact_types: Optional[Iterable[ArtifactTypes]] = None,
        entity_types: Optional[Iterable[EntityTypes]] = None,
        trigger_types: Optional[Iterable[EnrichmentTriggerTypes]] = None,
        is_disabled: Optional[bool] = None,
        name: Optional[str] = None,
        cursor: Optional[Cursor] = None,
        limit: Optional[int] = None,
    ) -> Page["ConfigRuleCommonView"]:
        """Get a page of config rules matching the given criteria.

        .. versionchanged:: 2.8
            Added new parameters: `enrichment_types`, `artifact_types`,
            `entity_types`, `trigger_types`, `is_disabled`, `name`.
            Parameters `data_source_uuids`, `trigger_data_source_uuids`
            changed to list.

        Note:
            Calls `GET /enrichment/config/rules`.
        Args:
            data_source_uuids: Filter config rules by list of associated
                data sources.
            trigger_data_source_uuids: Filter config rules by list of data
                sources which is the trigger for rules.
            enrichment_types: Filter config rules by list of enrichment type.
            artifact_types: Filter config rules by list of artifact type.
            entity_types: Filter config rules by list of entity type.
            trigger_types: Filter config rules by list of enrichment
                trigger type.
            is_disabled: Filter config rules by disabled status.
            name: Filter config rules by name.
            cursor: Page cursor.
            limit: Page limit.
        Returns:
            Page of config rules.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Query arguments
                contain errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
        """

        def as_strings(items):
            # Keep "not provided" (None) distinguishable from an empty list.
            return None if items is None else [str(item) for item in items]

        def as_values(items):
            return None if items is None else [item.value for item in items]

        # Candidates are listed in the order the query parameters are emitted;
        # anything the caller left as None is dropped from the query.
        candidates = (
            ("dataSourceUUID", as_strings(data_source_uuids)),
            ("triggerDataSourceUUID", as_strings(trigger_data_source_uuids)),
            ("enrichmentType", as_values(enrichment_types)),
            ("artifactType", as_values(artifact_types)),
            ("entityType", as_values(entity_types)),
            ("triggerType", as_values(trigger_types)),
            ("isDisabled", is_disabled),
            ("name", name),
            ("limit", None if limit is None else str(limit)),
            ("cursor", None if cursor is None else str(cursor)),
        )
        query: JsonObject = {
            key: value for key, value in candidates if value is not None
        }

        first_response = self._connector.do_get(self._path, params=query)
        return Page(self._connector.do_get, first_response, ConfigRuleCommonView)

    def edit(
        self,
        rule_uuid: uuid.UUID,
        tag: Tag,
        *,
        name: Optional[str] = None,
        is_disabled: Optional[bool] = None,
        triggers: Optional[Iterable[EnrichmentTriggerTypes]] = None,
        trigger_data_source_uuids: Nullable[Iterable[uuid.UUID]] = None,
        artifact_types: Nullable[Iterable[ArtifactTypes]] = None,
        entity_types: Nullable[Iterable[EntityTypes]] = None,
        data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        throttling_interval: Nullable[int] = None,
        enrichment: Optional[EnrichmentTypes] = None,
    ) -> None:
        """Change attributes of an existing config rule.

        .. versionadded:: 2.7

        Note:
            Calls `PATCH /enrichment/config/rules/{rule_uuid}`.
        Args:
            rule_uuid: Config rule identifier.
            tag: :attr:`ConfigRuleView.tag` value.
                Use :meth:`view` to retrieve it.
            name: Config rule name.
            triggers: Non-empty list of enrichment trigger types.
            data_source_uuids:
                Non-empty data source uuid list associated with enrichers.
            enrichment: Enrichment type.
                Must be `ArtifactAnalysis` or `ExternalDBLookup`
                type for rule registration.
            is_disabled: Disabled config rule flag.
            trigger_data_source_uuids:
                Entity/Artifact registrar associated datasource list.
            artifact_types: List of artifact types.
                Not-empty for `ArtifactAnalysis` enrichment type.
            entity_types: List of entity types.
                Not-empty for `ExternalDBLookup` enrichment type.
            throttling_interval: Throttling interval in seconds.
                Required for `OnRegistration` trigger type and
                must be minimum 3600 sec.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Query arguments
                contain errors.
            :class:`~cybsi.api.error.InvalidRequestError`:
                Provided arguments have invalid values.
            :class:`~cybsi.api.error.NotFoundError`: Config rule not found.
            :class:`~cybsi.api.error.ResourceModifiedError`:
                Config rule changed since last request.
                Retry using updated tag.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.MisconfiguredDataSource`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidRule`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.ImmutableValue`
        """
        # The PATCH body is assembled by the module-level form builder.
        form = _create_rule_form_data(
            name=name,
            is_disabled=is_disabled,
            enrichment=enrichment,
            triggers=triggers,
            throttling_interval=throttling_interval,
            data_source_uuids=data_source_uuids,
            trigger_data_source_uuids=trigger_data_source_uuids,
            artifact_types=artifact_types,
            entity_types=entity_types,
        )
        self._connector.do_patch(
            path=f"{self._path}/{rule_uuid}", tag=tag, json=form
        )
[docs]
class ConfigRuleCommonView(RefView):
    """View over the common attributes of a config rule."""

    @property
    def name(self) -> str:
        """Name of the config rule."""
        rule_name = self._get("name")
        return rule_name

    @property
    def is_disabled(self) -> bool:
        """Whether the config rule is disabled."""
        disabled = self._get("isDisabled")
        return disabled

    @property
    def is_builtin(self) -> bool:
        """Whether the config rule is builtin.

        Builtin rules can be set to disabled state,
        other attributes are immutable.
        """
        builtin = self._get("isBuiltin")
        return builtin

    @property
    def triggers(self) -> List[EnrichmentTriggerTypes]:
        """Enrichment trigger types of the rule."""
        raw_triggers = self._get("triggers")
        return list(map(EnrichmentTriggerTypes, raw_triggers))

    @property
    def enrichment(self) -> EnrichmentTypes:
        """Enrichment type of the rule."""
        raw_enrichment = self._get("enrichment")
        return EnrichmentTypes(raw_enrichment)

    @property
    def artifact_types(self) -> Optional[List[ArtifactTypes]]:
        """Artifact types of the rule.

        Not empty for `ArtifactAnalysis` and `ArchiveUnpack` enrichment types.
        """
        return self._map_list_optional("artifactTypes", ArtifactTypes)

    @property
    def entity_types(self) -> Optional[List[EntityTypes]]:
        """Entity types of the rule.

        Not empty for `DNSLookup`, `WhoisLookup`, `ExternalDBLookup`
        enrichment types.
        """
        return self._map_list_optional("entityTypes", EntityTypes)

    @property
    def data_sources(self) -> Optional[List[RefView]]:
        """Data sources associated with enrichers.

        The rule creates enrichment tasks for enrichers listed here.
        """
        return self._map_list_optional("dataSources", RefView)

    @property
    def trigger_data_sources(self) -> Optional[List[RefView]]:
        """Entity/Artifact registrar associated datasource list.

        The rule is applied only if entity was mentioned or artifact
        was registered by one of data sources from the list.
        """
        return self._map_list_optional("triggerDataSources", RefView)

    @property
    def throttling_interval(self) -> Optional[int]:
        """Throttling interval, in seconds."""
        interval = self._get_optional("throttlingInterval")
        return interval
[docs]
class ConfigRuleView(_TaggedRefView, ConfigRuleCommonView):
    """Config rule response view.

    Inherits from both :class:`_TaggedRefView` and
    :class:`ConfigRuleCommonView` and declares no members of its own.
    """
def _create_rule_form_data(
    name: Optional[str],
    is_disabled: Optional[bool] = None,
    triggers: Optional[Iterable[EnrichmentTriggerTypes]] = None,
    trigger_data_source_uuids: Nullable[Iterable[uuid.UUID]] = None,
    artifact_types: Nullable[Iterable[ArtifactTypes]] = None,
    entity_types: Nullable[Iterable[EntityTypes]] = None,
    data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
    throttling_interval: Nullable[int] = None,
    enrichment: Optional[EnrichmentTypes] = None,
) -> JsonObject:
    """Build the JSON body of a config rule form.

    Only arguments that were actually provided (i.e. are not ``None``)
    produce a key in the result, so an all-``None`` call yields ``{}``.

    ``Nullable`` arguments are passed through ``_unwrap_nullable`` before
    being stored. The unwrapped value may itself be ``None``, in which case
    the key is still written with a ``None`` value.
    NOTE(review): presumably this is how an explicit "null" marker is turned
    into a JSON ``null`` that clears the attribute; confirm against
    ``_unwrap_nullable`` in ``..api``.

    Args:
        name: Config rule name.
        is_disabled: Disabled config rule flag.
        triggers: Enrichment trigger types; stored as their ``.value``.
        trigger_data_source_uuids: Trigger data source uuids (nullable);
            stored as strings.
        artifact_types: Artifact types (nullable); stored as ``.value``.
        entity_types: Entity types (nullable); stored as ``.value``.
        data_source_uuids: Data source uuids; stored as strings.
        throttling_interval: Throttling interval (nullable).
        enrichment: Enrichment type; stored as its ``.value``.
    Returns:
        Dictionary with the keys ``name``, ``isDisabled``, ``triggers``,
        ``dataSourceUUIDs``, ``enrichment``, ``throttlingInterval``,
        ``entityTypes``, ``triggerDataSourceUUIDs``, ``artifactTypes``
        for the provided arguments only.
    """
    data: JsonObject = dict()

    # Plain optional attributes: omitted entirely when not provided.
    if name is not None:
        data["name"] = name
    if is_disabled is not None:
        data["isDisabled"] = is_disabled
    if triggers is not None:
        data["triggers"] = [typ.value for typ in triggers]
    if data_source_uuids is not None:
        data["dataSourceUUIDs"] = [str(u) for u in data_source_uuids]
    if enrichment is not None:
        data["enrichment"] = enrichment.value

    # Nullable attributes: unwrap first, convert only a real value, and
    # store the result even when it unwrapped to None.
    if throttling_interval is not None:
        data["throttlingInterval"] = _unwrap_nullable(throttling_interval)

    if entity_types is not None:
        casted_ent_types: Any = _unwrap_nullable(entity_types)
        if casted_ent_types is not None:
            casted_ent_types = [typ.value for typ in casted_ent_types]
        data["entityTypes"] = casted_ent_types

    # Fixed: the guard used to be `is None`, which sent
    # "triggerDataSourceUUIDs": None on every call that omitted the argument
    # and ignored the value whenever the caller did supply one.
    if trigger_data_source_uuids is not None:
        casted_trigger_uuids: Any = _unwrap_nullable(trigger_data_source_uuids)
        if casted_trigger_uuids is not None:
            casted_trigger_uuids = [str(u) for u in casted_trigger_uuids]
        data["triggerDataSourceUUIDs"] = casted_trigger_uuids

    if artifact_types is not None:
        casted_artifact_types: Any = _unwrap_nullable(artifact_types)
        if casted_artifact_types is not None:
            casted_artifact_types = [typ.value for typ in casted_artifact_types]
        data["artifactTypes"] = casted_artifact_types

    return data