import uuid
from datetime import datetime
from typing import Generic, Iterable, List, Optional, Tuple, Type, cast
from .. import RefView
from ..api import Tag
from ..internal import (
BaseAPI,
BaseAsyncAPI,
JsonObject,
JsonObjectForm,
JsonObjectView,
parse_rfc3339_timestamp,
)
from ..observable import EntityTypes, EntityView, EntityViewT, ShareLevels
from ..pagination import AsyncPage, Cursor, Page
from ..search import StoredQueryCommonView
from ..view import _TaggedRefView
from .enums import EntitySetOperations, ReplistStatus
X_CHANGE_CURSOR = "X-Change-Cursor"
_REPLIST_BASE_PATH = "/replists"
_REPLIST_CHANGES_PATH_TPL = _REPLIST_BASE_PATH + "/{}/changes"
_REPLIST_ENTITIES_PATH_TPL = _REPLIST_BASE_PATH + "/{}/entities"
[docs]
class ReplistsAPI(BaseAPI):
"""Reputation list API."""
[docs]
def register(self, replist: "ReplistForm") -> RefView:
"""Register reputation list.
Note:
Calls `POST /replists`.
Args:
replist: Filled replist form.
Returns:
Reference to the registered replist.
Raises:
:class:`~cybsi.api.error.ConflictError`:
Replist with same identifying data already exists.
:class:`~cybsi.api.error.SemanticError`: Form contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.StoredQueryNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidStoredQuery`
"""
resp = self._connector.do_post(path=_REPLIST_BASE_PATH, json=replist.json())
return RefView(resp.json())
[docs]
def view(self, replist_uuid: uuid.UUID) -> "ReplistView":
"""Get reputation list full view.
Note:
Calls `GET /replists/{replist_uuid}`.
Args:
replist_uuid: Replist uuid.
Returns:
Full view of the replist with ETag string value.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
"""
path = f"{_REPLIST_BASE_PATH}/{replist_uuid}"
resp = self._connector.do_get(path)
return ReplistView(resp)
[docs]
def edit(
self,
replist_uuid: uuid.UUID,
tag: Tag,
*,
is_enabled: Optional[bool] = None,
query_uuid: Optional[uuid.UUID] = None,
share_level: Optional[ShareLevels] = None,
) -> None:
"""Edit the reputation list.
Note:
Calls `PATCH /replists/{replist_uuid}`.
Args:
replist_uuid: Replist uuid.
tag: :attr:`ReplistView.tag` value. Use :meth:`view` to retrieve it.
query_uuid: Search query UUID attached to replist.
share_level: Replist share level.
is_enabled: Replist status toggle.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
:class:`~cybsi.api.error.ConflictError`:
Replist with same identifying data already exists.
:class:`~cybsi.api.error.ResourceModifiedError`:
Replist changed since last request. Update tag and retry.
:class:`~cybsi.api.error.SemanticError`: Form contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.StoredQueryNotFound`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`
* :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidStoredQuery`
"""
form = {}
if query_uuid is not None:
form["queryUUID"] = str(query_uuid)
if share_level is not None:
form["shareLevel"] = share_level.value
if is_enabled is not None:
form["isEnabled"] = is_enabled # type: ignore
path = f"{_REPLIST_BASE_PATH}/{replist_uuid}"
self._connector.do_patch(path=path, tag=tag, json=form)
[docs]
def filter(
self,
*,
query_uuids: Optional[Iterable[uuid.UUID]] = None,
cursor: Optional[Cursor] = None,
limit: Optional[int] = None,
) -> Page["ReplistCommonView"]:
"""Get replist filtration list.
Note:
Calls `GET /replists`
Args:
query_uuids: Stored query identifier.
Filter replists by specified stored query UUIDs.
cursor: Page cursor.
limit: Page limit.
Return:
Page with entities and cursor allowing to get next batch of changes.
Raises:
:class:`~cybsi.api.error.SemanticError`: Request contains logic errors.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.StoredQueryNotFound`
"""
params: dict = {}
if query_uuids is not None:
params["queryUUID"] = [str(u) for u in query_uuids]
if cursor is not None:
params["cursor"] = str(cursor)
if limit is not None:
params["limit"] = str(limit)
resp = self._connector.do_get(path=_REPLIST_BASE_PATH, params=params)
page = Page(self._connector.do_get, resp, ReplistCommonView)
return page
[docs]
def entities(
self,
replist_uuid: uuid.UUID,
*,
# see https://github.com/python/mypy/issues/3737
entity_view: Type[EntityViewT] = EntityView, # type: ignore
cursor: Optional[Cursor] = None,
limit: Optional[int] = None,
) -> Tuple[Page[EntityViewT], Cursor]:
"""Get replist entities.
Note:
Calls `GET /replist/{replist_uuid}/entities`
Args:
replist_uuid: Replist uuid.
entity_view: Entity view to use.
Default is :class:`~cybsi.api.observable.EntityView` -
- only natural keys.
You can specify one of builtin views in :mod:`~cybsi.utils.views`.
cursor: Page cursor.
limit: Page limit.
Return:
Page with entity views and cursor.
The cursor can be used to call :meth:`changes`.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
:class:`~cybsi.api.error.SemanticError`: Semantic request error.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.EntityViewNotFound`
.. versionchanged:: 2.9
Add entity view support. See :mod:`~cybsi.utils.views` for details.
"""
params = {}
if entity_view is not EntityView:
params["viewUUID"] = str(entity_view._view_uuid())
if cursor:
params["cursor"] = str(cursor)
if limit:
params["limit"] = str(limit)
path = _REPLIST_ENTITIES_PATH_TPL.format(replist_uuid)
resp = self._connector.do_get(path, params=params)
page = Page(self._connector.do_get, resp, entity_view)
return page, cast(Cursor, resp.headers.get(X_CHANGE_CURSOR, ""))
[docs]
def changes(
self,
replist_uuid: uuid.UUID,
*,
cursor: Cursor,
# see https://github.com/python/mypy/issues/3737
entity_view: Type[EntityViewT] = EntityView, # type: ignore
limit: Optional[int] = None,
) -> Page["EntitySetChangeView[EntityViewT]"]:
"""Get replist changes
Note:
Calls `GET /replist/{replist_uuid}/changes`
Args:
replist_uuid: Replist uuid.
entity_view: Entity view to use.
Default is :class:`~cybsi.api.observable.EntityView` -
- only natural keys.
You can specify one of builtin views in :mod:`~cybsi.utils.views`.
cursor: Page cursor.
On the first request you should pass the cursor value
obtained when requesting replist entities :meth:`entities`.
Subsequent calls should use cursor property of the page
returned by :meth:`changes`.
limit: Page limit.
Return:
Page with changes.
Warning:
Cursor behaviour differs from other API methods.
Do not save returned page cursor if it is :data:`None`.
:data:`None` means that all changes **for this moment** are received.
More changes can arrive later. Pass your previous non-none ``cursor``
value in loop, until non-none cursor is returned.
Please wait some time if method returns a page with :data:`None` cursor.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
:class:`~cybsi.api.error.SemanticError`: Semantic request error.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.CursorOutOfRange`
* :attr:`~cybsi.api.error.SemanticErrorCodes.EntityViewNotFound`
.. versionchanged:: 2.9
Add entity view support. See :mod:`~cybsi.utils.views` for details.
"""
params = {"cursor": str(cursor)}
if entity_view is not EntityView:
params["viewUUID"] = str(entity_view._view_uuid())
if limit:
params["limit"] = str(limit)
path = _REPLIST_CHANGES_PATH_TPL.format(replist_uuid)
resp = self._connector.do_get(path, params=params)
def entity_view_converter(entity_data):
return EntitySetChangeView(entity_view, entity_data)
page = Page(self._connector.do_get, resp, entity_view_converter)
return page
[docs]
def statistic(self, replist_uuid: uuid.UUID) -> "ReplistStatisticView":
"""Get replist statistic.
Note:
Calls `GET /replists/{replist_uuid}/statistic`.
Args:
replist_uuid: Replist uuid.
Returns:
Replist statistic view.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
"""
path = f"{_REPLIST_BASE_PATH}/{replist_uuid}/statistic"
resp = self._connector.do_get(path)
return ReplistStatisticView(resp.json())
[docs]
class ReplistsAsyncAPI(BaseAsyncAPI):
"""Replists asynchronous API."""
[docs]
async def entities(
self,
replist_uuid: uuid.UUID,
*,
# see https://github.com/python/mypy/issues/3737
entity_view: Type[EntityViewT] = EntityView, # type: ignore
cursor: Optional[Cursor] = None,
limit: Optional[int] = None,
) -> Tuple[AsyncPage[EntityViewT], Cursor]:
"""Get replist entities.
Note:
Calls `GET /replist/{replist_uuid}/entities`
Args:
replist_uuid: Replist uuid.
entity_view: Entity view to use.
Default is :class:`~cybsi.api.observable.EntityView` -
- only natural keys.
You can specify one of builtin views in :mod:`~cybsi.utils.views`.
cursor: Page cursor.
limit: Page limit.
Return:
Page with entity views and cursor.
The cursor can be used to call :meth:`changes`.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
.. versionchanged:: 2.9
Add entity view support. See :mod:`~cybsi.utils.views` for details.
"""
params = {}
if entity_view is not EntityView:
params["viewUUID"] = str(entity_view._view_uuid())
if cursor:
params["cursor"] = str(cursor)
if limit:
params["limit"] = str(limit)
path = _REPLIST_ENTITIES_PATH_TPL.format(replist_uuid)
resp = await self._connector.do_get(path, params=params)
page = AsyncPage(self._connector.do_get, resp, entity_view)
return page, cast(Cursor, resp.headers.get(X_CHANGE_CURSOR, ""))
[docs]
async def changes(
self,
replist_uuid: uuid.UUID,
*,
cursor: Cursor,
# see https://github.com/python/mypy/issues/3737
entity_view: Type[EntityViewT] = EntityView, # type: ignore
limit: Optional[int] = None,
) -> AsyncPage["EntitySetChangeView[EntityViewT]"]:
"""Get replist changes
Note:
Calls `GET /replist/{replist_uuid}/changes`
Args:
replist_uuid: Replist uuid.
entity_view: Entity view to use.
Default is :class:`~cybsi.api.observable.EntityView` -
- only natural keys.
You can specify one of builtin views in :mod:`~cybsi.utils.views`.
cursor: Page cursor.
On the first request you should pass the cursor value
obtained when requesting replist entities :meth:`entities`.
Subsequent calls should use cursor property of the page
returned by :meth:`changes`.
limit: Page limit.
Return:
Page with changes.
Warning:
Cursor behaviour differs from other API methods.
Do not save returned page cursor if it is :data:`None`.
:data:`None` means that all changes **for this moment** are received.
More changes can arrive later. Pass your previous non-none ``cursor``
value in loop, until non-none cursor is returned.
Please wait some time if method returns a page with :data:`None` cursor.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
:class:`~cybsi.api.error.SemanticError`: Semantic request error.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.api.error.SemanticErrorCodes.CursorOutOfRange`
.. versionchanged:: 2.9
Add entity view support. See :mod:`~cybsi.utils.views` for details.
"""
params = {"cursor": str(cursor)}
if entity_view is not EntityView:
params["viewUUID"] = str(entity_view._view_uuid())
if limit:
params["limit"] = str(limit)
path = _REPLIST_CHANGES_PATH_TPL.format(replist_uuid)
resp = await self._connector.do_get(path, params=params)
def entity_view_converter(entity_data):
return EntitySetChangeView(entity_view, entity_data)
page = AsyncPage(self._connector.do_get, resp, entity_view_converter)
return page
[docs]
async def statistic(self, replist_uuid: uuid.UUID) -> "ReplistStatisticView":
"""Get replist statistic.
Note:
Calls `GET /replists/{replist_uuid}/statistic`.
Args:
replist_uuid: Replist uuid.
Returns:
Replist statistic view.
Raises:
:class:`~cybsi.api.error.NotFoundError`: Replist not found.
"""
path = f"{_REPLIST_BASE_PATH}/{replist_uuid}/statistic"
resp = await self._connector.do_get(path)
return ReplistStatisticView(resp.json())
[docs]
class ReplistCommonView(RefView):
"""Reputation list short view."""
@property
def query(self) -> "StoredQueryCommonView":
"""Search query attached to replist (without raw query text)."""
return StoredQueryCommonView(self._get("query"))
@property
def author(self) -> "RefView":
"""Replist author."""
return RefView(self._get("author"))
@property
def share_level(self) -> ShareLevels:
"""Replist share level."""
return ShareLevels(self._get("shareLevel"))
@property
def is_enabled(self) -> bool:
"""Replist enabled status."""
return self._get("isEnabled")
[docs]
class ReplistView(_TaggedRefView, ReplistCommonView):
"""Reputation list full view."""
@property
def updated_at(self) -> Optional[datetime]:
"""Replist last updated time.
:data:`None` if it was never updated.
"""
return self._map_optional("createdAt", parse_rfc3339_timestamp)
@property
def status(self) -> ReplistStatus:
"""Replist current status."""
return ReplistStatus(self._get("status"))
[docs]
class EntitySetChangeView(JsonObjectView, Generic[EntityViewT]):
"""Replist change."""
def __init__(
self, entity_view: Type[EntityViewT], data: Optional[JsonObject] = None
):
super(EntitySetChangeView, self).__init__(data)
self._entity_view = entity_view
@property
def operation(self) -> EntitySetOperations:
"""Get change operation."""
return EntitySetOperations(self._get("operation"))
@property
def entity(self) -> EntityViewT:
"""Get entity view."""
en = self._get("entity")
return self._entity_view(en)
[docs]
class ReplistStatisticView(JsonObjectView):
"""Replist statistic view."""
@property
def entity_count(self) -> int:
"""Total number of entities in the replist."""
return self._get("entityCount")
@property
def entity_type_distribution(self) -> List["EntityTypeDistributionView"]:
"""Distribution of entities number by their types."""
return [
EntityTypeDistributionView(x) for x in self._get("entityTypeDistribution")
]
[docs]
class EntityTypeDistributionView(JsonObjectView):
"""Entity type distribution."""
@property
def entity_type(self) -> EntityTypes:
"""Entity type."""
return EntityTypes(self._get("entityType"))
@property
def count(self) -> int:
"""Number of entities."""
return self._get("count")