from dataclasses import dataclass
from typing import Callable, Optional, Union
from .artifact import ArtifactsAPI, ArtifactsAsyncAPI
from .auth import APIKeyAuth, APIKeysAPI
from .client_config import DEFAULT_LIMITS, DEFAULT_TIMEOUTS, Limits, Timeouts
from .custom_list import CustomListsAPI, CustomListsAsyncAPI
from .data_source import (
DataSourcesAPI,
DataSourcesAsyncAPI,
DataSourceTypesAPI,
DataSourceTypesAsyncAPI,
)
from .dictionary import DictionariesAPI, DictionariesAsyncAPI
from .enrichment import EnrichmentAPI, EnrichmentAsyncAPI
from .error import CybsiError
from .internal import HTTPConnector, JsonObjectView
from .internal.connector import AsyncHTTPConnector
from .license import LicensesAPI
from .observable import ObservableAPI, ObservableAsyncAPI
from .observation import ObservationsAPI, ObservationsAsyncAPI
from .replist import ReplistsAPI, ReplistsAsyncAPI
from .report import ReportsAPI, ReportsAsyncAPI
from .search import SearchAPI
from .search.api import SearchAsyncAPI
from .threat_landscape import ThreatLandscapesAPI, ThreatLandscapesAsyncAPI
from .ual import AccessLogsAPI
from .user import UsersAPI
[docs]
@dataclass
class Config:
    """:class:`CybsiClient` config.

    The same config object is accepted by both :class:`CybsiClient` and
    :class:`CybsiAsyncClient`; each field is forwarded to the HTTP connector
    the client creates.

    Args:
        api_url: Base API URL.
        auth: Authentication mechanism the client uses to authenticate requests:
            an :class:`APIKeyAuth` instance or a callable.
            The field has no default, and the clients raise
            :class:`CybsiError` if it is ``None``.
        ssl_verify: Enable SSL certificate verification.
        timeouts: Timeout configuration. Default configuration is 60 sec
            on all operations.
        limits: Configuration for limits to various client behaviors.
            Default configuration is max_connections=100,
            max_keepalive_connections=20.
        embed_object_url: Initialize URL property for all objects having
            uuid property (including :class:`~cybsi.api.view.RefView`).
            Views are compact if it's set to False.
    """

    # Required fields (positional order matters for Config(api_url, auth)).
    api_url: str
    auth: Union[APIKeyAuth, Callable]
    # Optional tuning knobs; defaults come from .client_config.
    ssl_verify: bool = True
    embed_object_url: bool = False
    timeouts: Timeouts = DEFAULT_TIMEOUTS
    limits: Limits = DEFAULT_LIMITS
[docs]
class CybsiClient:
    """The main entry point for all actions with Cybsi REST API.

    The client is low-level and is structured around Cybsi REST API routes.
    Every API section is exposed as a property returning a handle
    bound to the HTTP connector of this client.

    The client also follows Cybsi REST API input-output formats,
    providing little to no abstraction from JSONs API uses.
    It's relatively easy to construct an invalid request,
    so use client's functions wisely.

    Hint:
        Use :class:`~cybsi.api.CybsiClient` properties
        to construct needed API handles. Don't construct sub-APIs manually.

        Do this:
            >>> from cybsi.api import CybsiClient
            >>> client = CybsiClient(config)
            >>> client.observations.generics.view(observation_uuid)

        Not this:
            >>> from cybsi.api.observation import GenericObservationsAPI
            >>> GenericObservationsAPI(connector).view(observation_uuid)

    Args:
        config: Client config.

    Raises:
        CybsiError: If ``config.auth`` is ``None``.

    Usage:
        >>> from cybsi.api import APIKeyAuth, Config, CybsiClient
        >>> api_url = "http://localhost:80/api"
        >>> api_key = "8Nqjk6V4Q_et_Rf5EPu4SeWy4nKbVPKPzKJESYdRd7E"
        >>> auth = APIKeyAuth(api_url, api_key)
        >>> config = Config(api_url, auth)
        >>> client = CybsiClient(config)
        >>>
        >>> client.observations
        <cybsi_sdk.client.observation.ObservationsAPI object at 0x7f57a293c190>
        >>> client.observations.generics.filter()
        >>> client.close()  # "with" syntax is also supported for CybsiClient
    """

    def __init__(self, config: Config):
        # A client without an authentication mechanism is rejected up front.
        auth = config.auth
        if auth is None:
            raise CybsiError("No authorization mechanism configured for client")

        # Single connector shared by every API handle created below.
        self._connector = HTTPConnector(
            base_url=config.api_url,
            auth=auth,
            ssl_verify=config.ssl_verify,
            embed_object_url=config.embed_object_url,
            timeouts=config.timeouts,
            limits=config.limits,
        )

    # --- context manager protocol: delegated to the connector ---

    def __enter__(self) -> "CybsiClient":
        self._connector.__enter__()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None) -> None:
        self._connector.__exit__(exc_type, exc_value, traceback)

    def close(self) -> None:
        """Close the client, releasing its connections."""
        self._connector.close()

    # --- API section handles; a new handle object is built on each access ---

    @property
    def artifacts(self) -> ArtifactsAPI:
        """Handle of the Artifacts API."""
        return ArtifactsAPI(self._connector)

    @property
    def data_sources(self) -> DataSourcesAPI:
        """Handle of the Data sources API."""
        return DataSourcesAPI(self._connector)

    @property
    def data_source_types(self) -> DataSourceTypesAPI:
        """Handle of the Data source types API."""
        return DataSourceTypesAPI(self._connector)

    @property
    def enrichment(self) -> EnrichmentAPI:
        """Handle of the Enrichment API."""
        return EnrichmentAPI(self._connector)

    @property
    def observable(self) -> ObservableAPI:
        """Handle of the Observable API."""
        return ObservableAPI(self._connector)

    @property
    def observations(self) -> ObservationsAPI:
        """Handle of the Observations API."""
        return ObservationsAPI(self._connector)

    @property
    def replists(self) -> ReplistsAPI:
        """Handle of the Reputation lists API."""
        return ReplistsAPI(self._connector)

    @property
    def reports(self) -> ReportsAPI:
        """Handle of the Reports API."""
        return ReportsAPI(self._connector)

    @property
    def search(self) -> SearchAPI:
        """Handle of the Search API."""
        return SearchAPI(self._connector)

    @property
    def users(self) -> UsersAPI:
        """Handle of the Users API."""
        return UsersAPI(self._connector)

    @property
    def api_keys(self) -> APIKeysAPI:
        """Handle of the API-Keys API."""
        return APIKeysAPI(self._connector)

    @property
    def dictionaries(self) -> DictionariesAPI:
        """Handle of the Dictionaries API."""
        return DictionariesAPI(self._connector)

    @property
    def licenses(self) -> LicensesAPI:
        """Handle of the Licenses API."""
        return LicensesAPI(self._connector)

    @property
    def access_logs(self) -> AccessLogsAPI:
        """Handle of the User access log API."""
        return AccessLogsAPI(self._connector)

    @property
    def custom_lists(self) -> CustomListsAPI:
        """Handle of the Custom list API."""
        return CustomListsAPI(self._connector)

    @property
    def threat_landscapes(self) -> ThreatLandscapesAPI:
        """Handle of the Threat landscape API."""
        return ThreatLandscapesAPI(self._connector)

    def version(self) -> "VersionView":
        """Get API and server version information.

        Note:
            Calls `GET /version`.
        Returns:
            Version view.
        """
        response = self._connector.do_get("/version")
        return VersionView(response.json())
[docs]
class CybsiAsyncClient:
    """The asynchronous analog of :class:`CybsiClient`.

    The asynchronous client intentionally exposes fewer features than
    the synchronous one. Features are not simply copy-pasted;
    they are provided only when they're actually useful
    in asynchronous applications.

    Args:
        config: Client config.

    Raises:
        CybsiError: If ``config.auth`` is ``None``.
    """

    def __init__(self, config: Config):
        # A client without an authentication mechanism is rejected up front.
        auth = config.auth
        if auth is None:
            raise CybsiError("No authorization mechanism configured for client")

        # Single connector shared by every API handle created below.
        self._connector = AsyncHTTPConnector(
            base_url=config.api_url,
            auth=auth,
            ssl_verify=config.ssl_verify,
            embed_object_url=config.embed_object_url,
            timeouts=config.timeouts,
            limits=config.limits,
        )

    # --- async context manager protocol: delegated to the connector ---

    async def __aenter__(self) -> "CybsiAsyncClient":
        await self._connector.__aenter__()
        return self

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None) -> None:
        await self._connector.__aexit__(exc_type, exc_value, traceback)

    async def aclose(self) -> None:
        """Close the client, releasing its connections."""
        await self._connector.aclose()

    # --- API section handles; a new handle object is built on each access ---

    @property
    def artifacts(self) -> ArtifactsAsyncAPI:
        """Handle of the Artifacts API."""
        return ArtifactsAsyncAPI(self._connector)

    @property
    def enrichment(self) -> EnrichmentAsyncAPI:
        """Handle of the Enrichment API."""
        return EnrichmentAsyncAPI(self._connector)

    @property
    def observations(self) -> ObservationsAsyncAPI:
        """Handle of the Observations API."""
        return ObservationsAsyncAPI(self._connector)

    @property
    def reports(self) -> ReportsAsyncAPI:
        """Handle of the Reports API."""
        return ReportsAsyncAPI(self._connector)

    @property
    def data_sources(self) -> DataSourcesAsyncAPI:
        """Handle of the Data sources API."""
        return DataSourcesAsyncAPI(self._connector)

    @property
    def data_source_types(self) -> DataSourceTypesAsyncAPI:
        """Handle of the Data source types API."""
        return DataSourceTypesAsyncAPI(self._connector)

    @property
    def replists(self) -> ReplistsAsyncAPI:
        """Handle of the Reputation lists API."""
        return ReplistsAsyncAPI(self._connector)

    @property
    def observable(self) -> ObservableAsyncAPI:
        """Handle of the Observable API."""
        return ObservableAsyncAPI(self._connector)

    @property
    def search(self) -> SearchAsyncAPI:
        """Handle of the Search API."""
        return SearchAsyncAPI(self._connector)

    @property
    def dictionaries(self) -> DictionariesAsyncAPI:
        """Handle of the Dictionaries API."""
        return DictionariesAsyncAPI(self._connector)

    @property
    def threat_landscapes(self) -> ThreatLandscapesAsyncAPI:
        """Handle of the Threat landscape API."""
        return ThreatLandscapesAsyncAPI(self._connector)

    @property
    def custom_lists(self) -> CustomListsAsyncAPI:
        """Handle of the Custom list API."""
        return CustomListsAsyncAPI(self._connector)

    async def version(self) -> "VersionView":
        """Get API and server version information.

        Note:
            Calls `GET /version`.
        Returns:
            Version view.
        """
        response = await self._connector.do_get("/version")
        return VersionView(response.json())
[docs]
class VersionView(JsonObjectView):
    """View of the version information returned by `GET /version`."""

    @property
    def api_version(self) -> "Version":
        """Version of the API specification (``apiVersion`` key)."""
        raw_version = self._get("apiVersion")
        return Version(raw_version)

    @property
    def server_version(self) -> "Version":
        """Version of the server (``serverVersion`` key)."""
        raw_version = self._get("serverVersion")
        return Version(raw_version)
[docs]
class Version:
    """Parsed version string of the form ``MAJOR.MINOR.PATCH[-PRERELEASE][+BUILD]``.

    Args:
        version: Version string to parse.

    Raises:
        ValueError: If the core part does not consist of exactly three
            dot-separated integers.
    """

    def __init__(self, version: str):
        self._version = version

        # Everything after the first "+" is build metadata; it is cut off
        # first so that a "-" inside it is not mistaken for a prerelease mark.
        remainder, _, self._build = version.partition("+")
        # Everything after the first "-" of what is left is the prerelease tag.
        core, _, self._prerelease = remainder.partition("-")

        numbers = tuple(map(int, core.split(".", 2)))
        self._major, self._minor, self._patch = numbers

    def __str__(self):
        # The original, unparsed string is reproduced verbatim.
        return self._version

    @property
    def major(self) -> int:
        """Major component of the version."""
        return self._major

    @property
    def minor(self) -> int:
        """Minor component of the version."""
        return self._minor

    @property
    def patch(self) -> int:
        """Patch component of the version."""
        return self._patch

    @property
    def prerelease(self) -> Optional[str]:
        """Prerelease component of the version, ``None`` when absent or empty."""
        return None if self._prerelease == "" else self._prerelease

    @property
    def build(self) -> Optional[str]:
        """Build component of the version, ``None`` when absent or empty."""
        return None if self._build == "" else self._build