import uuid
from contextlib import asynccontextmanager, contextmanager
from datetime import datetime
from email.message import Message
from io import BytesIO
from typing import (
Any,
AsyncContextManager,
AsyncIterator,
ContextManager,
Dict,
Iterable,
Iterator,
List,
Optional,
cast,
)
from .. import RefView
from ..error import CybsiError
from ..internal import BaseAPI, BaseAsyncAPI, JsonObjectView, parse_rfc3339_timestamp
from ..internal.multipart import AsyncStreamWrapper
from ..observable import EntityView, ShareLevels
from ..pagination import AsyncPage, Cursor, Page
from ..view import _TaggedRefView
from .enums import ArtifactContentDownloadCompressionTypes, ArtifactTypes
# Base path of the artifact endpoints; per-artifact paths are built from it.
_ARTIFACTS_PATH = "/enrichment/artifacts"
# Path of the endpoint used to recognize an artifact type from uploaded bytes.
_ARTIFACTS_TYPE_PATH = "/enrichment/artifact-type"
[docs]
class ArtifactsAPI(BaseAPI):
    """Synchronous API for working with artifacts."""

    def view(self, artifact_uuid: uuid.UUID) -> "ArtifactView":
        """Fetch a single artifact.

        Note:
            Calls `GET /enrichment/artifacts/{artifact_uuid}`.
        Args:
            artifact_uuid: UUID of the artifact to fetch.
        Returns:
            View of the requested artifact.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Artifact not found.
        """
        response = self._connector.do_get(f"{_ARTIFACTS_PATH}/{artifact_uuid}")
        # The view is constructed from the response object, not from its JSON body.
        return ArtifactView(response)

    def view_registrations(
        self, artifact_uuid: uuid.UUID
    ) -> List["ArtifactRegistrationView"]:
        """Fetch all registrations of an artifact.

        Note:
            Calls `GET /enrichment/artifacts/{artifact_uuid}/registrations`.
        Args:
            artifact_uuid: UUID of the artifact.
        Returns:
            Registrations of the artifact, one view per item of the response.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Artifact not found.
        """
        response = self._connector.do_get(
            f"{_ARTIFACTS_PATH}/{artifact_uuid}/registrations"
        )
        return list(map(ArtifactRegistrationView, response.json()))

    def recognize_type(self, data: Any) -> "ArtifactTypeRecognizedView":
        """Recognize the type of an artifact from its leading bytes.

        The whole artifact may be sent, but sending 10KB or less
        is recommended to save time and bandwidth.

        Note:
            Calls `PUT /enrichment/artifact-type`.
        Args:
            data: File-like object. If you have bytes, wrap them in BytesIO.
        Returns:
            Recognized artifact type.
        See Also:
            The function is similar to :meth:`upload`.
            The example for :meth:`upload` is applicable here too.
        """
        # The data is sent as the "file" form field under a fixed file name.
        response = self._connector.do_put(
            path=_ARTIFACTS_TYPE_PATH,
            files={"file": ("filename", data)},
        )
        return ArtifactTypeRecognizedView(response.json())

    def upload(
        self,
        filename: str,
        data: Any,
        artifact_type: Optional[ArtifactTypes] = None,
        share_level: ShareLevels = ShareLevels.White,
        timeout: int = 300,
    ) -> RefView:
        """Upload an artifact.

        Note:
            Calls `POST /enrichment/artifacts`.
        Args:
            filename: Name of the artifact.
            data: File-like object. If you have bytes, wrap them in BytesIO.
            artifact_type: Artifact type. The form field is omitted when None.
            share_level: Artifact share level.
            timeout: Connection timeout in sec.
        Returns:
            Reference to artifact in API.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Semantic error. Possible code is
                :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`.
        See Also:
            See :ref:`upload-download-artifact-example`
            for a complete example of this function usage.
        """
        # Field order is kept as: optional type, share level, file.
        fields: Dict[str, Any] = {}
        if artifact_type is not None:
            fields["type"] = artifact_type.value.encode()
        fields.update(
            shareLevel=share_level.value.encode(),
            file=(filename, data),
        )
        response = self._connector.do_post(
            path=_ARTIFACTS_PATH,
            files=fields,
            timeout=timeout,
        )
        return RefView(response.json())

    def get_content(
        self,
        artifact_uuid: uuid.UUID,
        *,
        archive: Optional[ArtifactContentDownloadCompressionTypes] = None,
        archive_password: Optional[str] = None,
    ) -> ContextManager["ArtifactContent"]:
        """Download artifact content.

        The request is issued when the returned context manager is entered;
        the response is closed when the context manager exits.

        Note:
            Calls `GET /enrichment/artifacts/{artifact_uuid}/content`.
        Args:
            artifact_uuid: UUID of the artifact.
            archive: Compress artifact content to archive of chosen type
                before sending.
            archive_password: Set archive password,
                if compression was chosen using ``archive`` argument.
        Returns:
            Context manager yielding the binary file content.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Artifact not found.
        Examples:
            >>> with client.artifacts.get_content(artifact_uuid) as content:
            >>>     with open("/tmp/artifact", "wb") as f:
            >>>         shutil.copyfileobj(content.stream, f, length=1024 * 1024)
        See Also:
            See :ref:`upload-download-artifact-example`
            for a complete example of this function usage.
        """
        content_path = f"{_ARTIFACTS_PATH}/{artifact_uuid}/content"
        query = {}
        if archive:
            query["archive"] = archive.value
        if archive_password:
            query["password"] = archive_password

        @contextmanager
        def _open_content():
            response = self._connector.do_get(
                path=content_path, params=query, stream=True
            )
            try:
                # The file name comes from the Content-Disposition header.
                yield ArtifactContent(_parse_content_filename(response), response)
            finally:
                response.close()

        return _open_content()

    def filter(
        self,
        *,
        artifact_type: Optional[ArtifactTypes] = None,
        data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        file_uuid: Optional[uuid.UUID] = None,
        artifact_hash: Optional[str] = None,
        cursor: Optional[Cursor] = None,
        limit: Optional[int] = None,
    ) -> Page["ArtifactCommonView"]:
        """Find artifacts matching the provided parameters.

        Only parameters with a truthy value are sent to the API.

        Note:
            Calls `GET /enrichment/artifacts`
        Args:
            artifact_type: Artifact type.
            data_source_uuids: Data sources of the artifact.
            file_uuid: Artifact hash must be the same
                as one of the hashes of provided File entity.
            artifact_hash: Artifact hash.
                Hash type (md5, sha1, sha256) is determined using its length.
            cursor: Page cursor.
            limit: Page limit.
        Returns:
            Page containing artifact descriptions.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Query contains logic errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.FileNotFound`
        """
        query: Dict[str, Any] = {}
        if artifact_type:
            query["type"] = artifact_type.value
        if data_source_uuids:
            query["dataSourceUUID"] = list(map(str, data_source_uuids))
        if file_uuid:
            query["fileUUID"] = str(file_uuid)
        if artifact_hash:
            query["hash"] = artifact_hash
        if cursor:
            query["cursor"] = str(cursor)
        if limit:
            query["limit"] = str(limit)

        first_response = self._connector.do_get(_ARTIFACTS_PATH, params=query)
        # The page receives the connector's getter along with the first response.
        return Page(self._connector.do_get, first_response, ArtifactCommonView)
[docs]
class ArtifactsAsyncAPI(BaseAsyncAPI):
    """Asynchronous artifact API."""

    async def view(self, artifact_uuid: uuid.UUID) -> "ArtifactView":
        """Get an artifact view.

        Note:
            Calls `GET /enrichment/artifacts/{artifact_uuid}`.
        Args:
            artifact_uuid: Artifact uuid.
        Returns:
            View of the artifact.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Artifact not found.
        """
        path = f"{_ARTIFACTS_PATH}/{artifact_uuid}"
        r = await self._connector.do_get(path)
        # The view is constructed from the response object, not from its JSON body.
        return ArtifactView(r)

    async def view_registrations(
        self, artifact_uuid: uuid.UUID
    ) -> List["ArtifactRegistrationView"]:
        """Get artifact registrations.

        Note:
            Calls `GET /enrichment/artifacts/{artifact_uuid}/registrations`.
        Args:
            artifact_uuid: Artifact uuid.
        Returns:
            List of artifact registrations.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Artifact not found.
        """
        path = f"{_ARTIFACTS_PATH}/{artifact_uuid}/registrations"
        r = await self._connector.do_get(path)
        return [ArtifactRegistrationView(v) for v in r.json()]

    async def recognize_type(self, data: Any) -> "ArtifactTypeRecognizedView":
        """Recognize artifact type by its first bytes.

        The whole artifact may be sent, but it is recommended
        to send 10KB or less to save time and bandwidth.

        Note:
            Calls `PUT /enrichment/artifact-type`.
        Args:
            data: File-like object. If you have bytes, wrap them in BytesIO.
        Returns:
            Recognized artifact type.
        See Also:
            The function is similar to :meth:`upload`.
            The example for :meth:`upload` is applicable here too.
        """
        # The data is sent as the "file" form field under a fixed file name.
        form = {"file": ("filename", data)}
        r = await self._connector.do_put(path=_ARTIFACTS_TYPE_PATH, files=form)
        return ArtifactTypeRecognizedView(r.json())

    async def upload(
        self,
        *,
        filename: str,
        data: Any,
        data_size: int = 0,
        artifact_type: Optional[ArtifactTypes] = None,
        share_level: ShareLevels = ShareLevels.White,
        timeout: int = 300,
    ) -> RefView:
        """Upload an artifact.

        Note:
            Calls `POST /enrichment/artifacts`.
        Args:
            filename: Name of the artifact.
            data: File-like or async stream object.
                If you have bytes, wrap them in BytesIO.
            data_size: Total file size.
                Required for StreamReader or AsyncIterator data types.
            artifact_type: Artifact type. The form field is omitted when None.
            share_level: Artifact share level.
            timeout: Connection timeout in sec.
        Returns:
            Reference to artifact in API.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Semantic error. Possible code is
                :attr:`~cybsi.api.error.SemanticErrorCodes.InvalidShareLevel`.
        See Also:
            See :ref:`upload-download-artifact-example`
            for a complete example of this function usage.
            See :ref:`advanced` for asynchronous multipart upload example.
        Warning:
            Upload is synchronous if data_size is not provided.
        """
        form: Dict[str, Any] = {}
        if artifact_type is not None:
            form["type"] = artifact_type.value.encode()
        form["shareLevel"] = share_level.value.encode()
        if data_size > 0:
            # With a known size the data is wrapped for asynchronous streaming.
            form["file"] = (filename, AsyncStreamWrapper(data, data_size))
        else:
            form["file"] = (filename, data)
        r = await self._connector.do_post(
            path=_ARTIFACTS_PATH,
            files=form,
            timeout=timeout,
        )
        return RefView(r.json())

    def get_content(
        self,
        *,
        artifact_uuid: uuid.UUID,
        archive: Optional[ArtifactContentDownloadCompressionTypes] = None,
        archive_password: Optional[str] = None,
    ) -> AsyncContextManager["ArtifactAsyncContent"]:
        """Get artifact content.

        This method is a plain function (do not ``await`` it): it returns
        an asynchronous context manager. The request is issued when the context
        manager is entered and the response is closed when it exits.
        All arguments are keyword-only.

        Note:
            Calls `GET /enrichment/artifacts/{artifact_uuid}/content`.
        Args:
            artifact_uuid: Artifact uuid.
            archive: Compress artifact content to archive of chosen type
                before sending.
            archive_password: Set archive password,
                if compression was chosen using ``archive`` argument.
        Returns:
            Asynchronous context manager of binary file content.
        Raises:
            :class:`~cybsi.api.error.NotFoundError`: Artifact not found.
        Examples:
            >>> async with aiofiles.open("/tmp/artifact", "wb") as f:
            >>>     async with client.artifacts.get_content(
            >>>         artifact_uuid=artifact_uuid
            >>>     ) as content:
            >>>         async for chunk in content.iter_chunks(size=8192):
            >>>             await f.write(chunk)
        See Also:
            See :ref:`upload-download-artifact-example`
            for a complete example of this function usage.
        """
        path = f"{_ARTIFACTS_PATH}/{artifact_uuid}/content"
        params: Dict[str, Any] = {}
        if archive:
            params["archive"] = archive.value
        if archive_password:
            params["password"] = archive_password

        @asynccontextmanager
        async def load_content():
            r = await self._connector.do_get(path=path, params=params, stream=True)
            try:
                # The file name comes from the Content-Disposition header.
                filename = _parse_content_filename(r)
                yield ArtifactAsyncContent(filename, r)
            finally:
                await r.aclose()

        return load_content()

    async def filter(
        self,
        *,
        artifact_type: Optional[ArtifactTypes] = None,
        data_source_uuids: Optional[Iterable[uuid.UUID]] = None,
        file_uuid: Optional[uuid.UUID] = None,
        artifact_hash: Optional[str] = None,
        cursor: Optional[Cursor] = None,
        limit: Optional[int] = None,
    ) -> AsyncPage["ArtifactCommonView"]:
        """Filter artifacts using provided parameters.

        Only parameters with a truthy value are sent to the API.

        Note:
            Calls `GET /enrichment/artifacts`
        Args:
            artifact_type: Artifact type.
            data_source_uuids: Data sources of the artifact.
            file_uuid: Artifact hash must be the same
                as one of the hashes of provided File entity.
            artifact_hash: Artifact hash.
                Hash type (md5, sha1, sha256) is determined using its length.
            cursor: Page cursor.
            limit: Page limit.
        Returns:
            Page containing artifact descriptions.
        Raises:
            :class:`~cybsi.api.error.SemanticError`: Query contains logic errors.
        Note:
            Semantic error codes specific for this method:
              * :attr:`~cybsi.api.error.SemanticErrorCodes.DataSourceNotFound`
              * :attr:`~cybsi.api.error.SemanticErrorCodes.FileNotFound`
        """
        params: Dict[str, Any] = {}
        if artifact_type:
            params["type"] = artifact_type.value
        if data_source_uuids:
            params["dataSourceUUID"] = [str(u) for u in data_source_uuids]
        if file_uuid:
            params["fileUUID"] = str(file_uuid)
        if artifact_hash:
            params["hash"] = artifact_hash
        if cursor:
            params["cursor"] = str(cursor)
        if limit:
            params["limit"] = str(limit)

        resp = await self._connector.do_get(_ARTIFACTS_PATH, params=params)
        # The page receives the connector's getter along with the first response.
        return AsyncPage(self._connector.do_get, resp, ArtifactCommonView)
[docs]
class ArtifactTypeRecognizedView(JsonObjectView):
    """View of an artifact type recognized by Cybsi."""

    @property
    def type(self) -> ArtifactTypes:
        """Recognized artifact type (the ``type`` field)."""
        raw_type = self._get("type")
        return ArtifactTypes(raw_type)

    @property
    def format_description(self) -> str:
        """Format description (magic), the ``formatDescription`` field.

        Example:
            ``PE32 executable (GUI) Intel 80386, for MS Windows, Nullsoft Installer self-extracting archive``
        """  # noqa: E501
        description = self._get("formatDescription")
        return description
[docs]
class ArtifactContent:
    """Binary content of an artifact.

    The content may be packed in an archive
    if that was requested in :meth:`ArtifactsAPI.get_content`.
    """

    def __init__(self, filename: str, raw: Any):
        # `raw` is the underlying response; it must provide read() and iter_bytes().
        self._raw = raw
        self._filename = filename

    @property
    def filename(self) -> str:
        """File name of the artifact."""
        return self._filename

    @property
    def stream(self) -> Any:
        """File-like object with the binary artifact content.

        The whole payload is read into memory and wrapped in a new BytesIO
        on every access. Treat the result as a read()-only stream.
        """
        # TODO: Implement actual streaming using iter_bytes()
        # https://www.python-httpx.org/compatibility/#streaming-responses
        payload = self._raw.read()
        return BytesIO(payload)

    def iter_chunks(self, size=4096) -> Iterator[bytes]:
        """Iterate over the content in chunks.

        Args:
            size: Maximum chunk size, passed to the response's ``iter_bytes``.
        Returns:
            Iterator over data chunks.
        """
        chunks = self._raw.iter_bytes(size)
        return chunks

    def readall(self) -> bytes:
        """Read and return the whole content."""
        payload = self._raw.read()
        return payload
[docs]
class ArtifactAsyncContent:
    """Binary content of an artifact, exposed through an asynchronous API.

    The content may be packed in an archive
    if that was requested in :meth:`ArtifactsAsyncAPI.get_content`.
    """

    def __init__(self, filename: str, raw: Any):
        # `raw` is the underlying response; it must provide aiter_bytes().
        self._raw = raw
        self._filename = filename

    @property
    def filename(self) -> str:
        """File name of the artifact."""
        return self._filename

    def iter_chunks(self, size=4096) -> AsyncIterator[bytes]:
        """Iterate asynchronously over the content in chunks.

        Args:
            size: Maximum chunk size, passed to the response's ``aiter_bytes``.
        Returns:
            Asynchronous iterator over data chunks.
        """
        chunks = self._raw.aiter_bytes(size)
        return chunks

    async def readall(self) -> bytes:
        """Read the whole content by joining all chunks of :meth:`iter_chunks`."""
        return b"".join([part async for part in self.iter_chunks()])
def _parse_content_filename(response) -> str:
"""Parse filename parameter from content-disposition header."""
try:
msg = Message()
msg["content-disposition"] = response.headers["content-disposition"]
except KeyError:
raise CybsiError("Content-disposition header not found") from None
filename = msg.get_filename()
if filename is None:
raise CybsiError("filename not found in Content-disposition header") from None
return filename
[docs]
class ArtifactCommonView(RefView):
    """Common view of an artifact."""

    @property
    def types(self) -> List[ArtifactTypes]:
        """Types of the artifact (the ``types`` field)."""
        return list(map(ArtifactTypes, self._get("types")))

    @property
    def data_sources(self) -> List[RefView]:
        """References to data sources which registered the artifact."""
        return list(map(RefView, self._get("dataSources")))

    @property
    def share_levels(self) -> Optional[List[ShareLevels]]:
        """Share levels of the artifact, ordered from lowest to highest."""
        levels = self._map_list_optional("shareLevels", ShareLevels)
        return levels

    @property
    def content(self) -> "ArtifactContentView":
        """Meta information about the artifact content."""
        content_data = self._get("content")
        return ArtifactContentView(content_data)
[docs]
class ArtifactView(_TaggedRefView):
    """Detailed view of an artifact."""

    @property
    def types(self) -> List[ArtifactTypes]:
        """Types of the artifact (the ``types`` field)."""
        return list(map(ArtifactTypes, self._get("types")))

    @property
    def data_sources(self) -> List[RefView]:
        """References to data sources which registered the artifact."""
        return list(map(RefView, self._get("dataSources")))

    @property
    def share_levels(self) -> Optional[List[ShareLevels]]:
        """Share levels of the artifact, ordered from lowest to highest."""
        levels = self._map_list_optional("shareLevels", ShareLevels)
        return levels

    @property
    def content(self) -> "ArtifactContentView":
        """Meta information about the artifact content."""
        content_data = self._get("content")
        return ArtifactContentView(content_data)

    @property
    def file_names(self) -> List[str]:
        """File names of the artifact. Ordered by time of registration."""
        names = self._get("fileNames")
        return cast(List[str], names)

    @property
    def entities(self) -> List[EntityView]:
        """File entities associated with this artifact."""
        return list(map(EntityView, self._get("entities")))
[docs]
class ArtifactContentView(JsonObjectView):
    """Meta information about artifact content."""

    @property
    def url(self) -> str:
        """Absolute URL of the artifact content."""
        value = self._get("url")
        return cast(str, value)

    @property
    def size(self) -> int:
        """File size, converted to int."""
        raw_size = self._get("size")
        return int(raw_size)

    @property
    def md5_hash(self) -> str:
        """MD5 hash of the file content."""
        value = self._get("md5Hash")
        return cast(str, value)

    @property
    def sha1_hash(self) -> str:
        """SHA1 hash of the file content."""
        value = self._get("sha1Hash")
        return cast(str, value)

    @property
    def sha256_hash(self) -> str:
        """SHA256 hash of the file content."""
        value = self._get("sha256Hash")
        return cast(str, value)

    @property
    def format_description(self) -> Optional[str]:
        """File format description magically extracted from signature.

        Read as an optional field (``formatDescription``).
        """
        description = self._get_optional("formatDescription")
        return description
[docs]
class ArtifactRegistrationView(JsonObjectView):
    """View of a single artifact registration."""

    @property
    def data_source(self) -> RefView:
        """Reference to the data source which registered the artifact."""
        source = self._get("dataSource")
        return RefView(source)

    @property
    def type(self) -> ArtifactTypes:
        """Artifact type (the ``type`` field)."""
        raw_type = self._get("type")
        return ArtifactTypes(raw_type)

    @property
    def file_name(self) -> str:
        """Artifact file name."""
        name = self._get("fileName")
        return name

    @property
    def share_level(self) -> ShareLevels:
        """Artifact share level."""
        level = self._get("shareLevel")
        return ShareLevels(level)

    @property
    def registered_at(self) -> datetime:
        """The ``registeredAt`` field parsed as an RFC 3339 timestamp."""
        timestamp = self._get("registeredAt")
        return parse_rfc3339_timestamp(timestamp)