Advanced Usage

This document covers some of the more advanced features of Cybsi SDK.

Licenses

A license is the right to use the software. This right is granted by concluding the appropriate License Agreement, which defines the scope of rights and restrictions on such use. To install a Threat Analyzer obtained under the relevant License Agreement, the user receives a license key on paper and/or as an electronic key.

The example below shows how to upload a license.

#!/usr/bin/env python3
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient


def main():
    """Upload a license archive to the Threat Analyzer.

    The API key and URL are read from the ``CYBSI_API_KEY`` and
    ``CYBSI_API_URL`` environment variables.
    """
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    # license.zip is a zip archive with a nested `license-access-token.key`.
    # The archive is opened in the same `with` statement as the client so
    # the file handle is closed even if the upload fails.
    with CybsiClient(config) as client, open("license.zip", "rb") as f:
        client.licenses.upload(f)


if __name__ == "__main__":
    main()

Timeouts and limits

You can explicitly configure connect/read/write timeouts and the maximum number of connections for CybsiClient through the Config data class.

In the example below you can see how it can be used:

from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient, Limits, Timeouts

if __name__ == "__main__":
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)

    # Set custom timeouts and limits of the HTTP client.
    limits = Limits(max_connections=100, max_keepalive_connections=20)
    timeouts = Timeouts(default=3.0)

    config = Config(api_url, auth, ssl_verify=False, timeouts=timeouts, limits=limits)
    # Use the client as a context manager so its connections are released
    # when the work is done (or a request fails).
    with CybsiClient(config) as client:
        client.artifacts.filter()

Embed object URL

You can configure automatic URL inclusion for all Threat Analyzer objects and references that have a uuid property. The object URL is included if the Config embed_object_url parameter is True.

In the example below you can see a data source type view response with (and without) the URL:

#!/usr/bin/env python3
import uuid
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient

if __name__ == "__main__":
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, embed_object_url=True, ssl_verify=False)

    # Replace with the UUID of an existing data source type.
    ds_uuid = uuid.uuid4()
    # create client with embed object URL
    with CybsiClient(config) as client_with_url:
        type_view = client_with_url.data_source_types.view(ds_uuid)
        print(type_view)

    # {
    #   "uuid": "89cb3de7-66ec-4d17-aa85-f42487921b59",
    #   "url": "http://cybsi.ptsecurity.ru/data-source-types/89cb3de7-66ec-4d17-aa85-f42487921b59",
    #   "shortName": "CIRCL",
    #   "longName": "Computer Incident Response Center Luxembourg",
    #   ...
    # }

    # create client without embed object URL
    # (the same config object is reused with the flag switched off)
    config.embed_object_url = False
    with CybsiClient(config) as client_without_url:
        type_view = client_without_url.data_source_types.view(ds_uuid)
        print(type_view)

    # Same object as above, only the "url" field is absent:
    # {
    #   "uuid": "89cb3de7-66ec-4d17-aa85-f42487921b59",
    #   "shortName": "CIRCL",
    #   "longName": "Computer Incident Response Center Luxembourg",
    #   ...
    # }

Custom entity views

You can configure the entity view in some Cybsi API methods, such as entities() and changes().

Specify the entity_view parameter in the API method. The default basic view includes only entity types and natural keys. You can find built-in views in the views module or use EntityViewsAPI() to retrieve information about them.

In the example below you can see replist entities in the PT Multiscanner view:

#!/usr/bin/env python3
import uuid
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.pagination import chain_pages
from cybsi.utils.views import PTMSEntityView

if __name__ == "__main__":
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    # There is a File entity with MalwareClasses and MalwareFamilies attributes.
    # Also, there is a replist with custom query
    # "ENT { MalwareClasses = Ransomware }".
    # See generic_registration.py and replist_registration.py for an example.

    # Replace with the UUID of an existing replist.
    replist_uuid = uuid.uuid4()

    # The client is used as a context manager, so it is closed even if a
    # request raises.
    with CybsiClient(config) as client:
        # View related entities in the PTMSEntityView.
        related_entities = client.replists.entities(
            replist_uuid, entity_view=PTMSEntityView
        )
        # The first element of the result is handed to chain_pages(),
        # which iterates over the entities page by page.
        for ent in chain_pages(related_entities[0]):
            print(ent)

# Basic view example:
# {
#     "type": "File",
#     "uuid": "31d8248f-a004-455a-95c2-5c96cba7fae6",
#     "keys": [
#         {
#             "type": "SHA1Hash",
#             "value": "9ee972c591dc03b24f7bc23c27361dcb719371f2"
#         }
#     ]
# }
#
# PTMS view example:
# {
#     "entity": {
#         "uuid": "31d8248f-a004-455a-95c2-5c96cba7fae6",
#         "type": "File",
#         "naturalKeys": [
#             {
#                 "type": "SHA1Hash",
#                 "value": "9ee972c591dc03b24f7bc23c27361dcb719371f2"
#             }
#         ]
#     },
#     "malwareClasses": [
#         "Ransomware",
#         "Encryptor"
#     ],
#     "malwareFamily": "PlugX",
#     "relatedMalwareFamily": null
# }
#
# Cybsi-Cybsi view example:
# {
#     "entity": {
#         "uuid": "31d8248f-a004-455a-95c2-5c96cba7fae6",
#         "type": "File",
#         "naturalKeys": [
#             {
#                 "type": "SHA1Hash",
#                 "value": "9ee972c591dc03b24f7bc23c27361dcb719371f2"
#             }
#         ]
#     },
#     "attributeValues": [
#         {
#             "name": "MalwareFamilies",
#             "values": [
#               {
#                   "value": "PlugX",
#                   "confidence": 1
#               }
#             ]
#         }
#     ]
# }

Asynchronous artifact uploading

You can upload artifacts asynchronously using multipart/form-data streams. To do so, pass an asyncio/aiohttp StreamReader or an AsyncIterator as the data argument to upload().

The example below uploads a local file and a remote file:

#!/usr/bin/env python3
import asyncio
from os import environ
from typing import AsyncIterator

import aiofiles
import aiohttp

from cybsi.api import APIKeyAuth, Config, CybsiAsyncClient
from cybsi.api.artifact.enums import ArtifactTypes


async def upload_local_file(client: CybsiAsyncClient):
    """Create a 1 MiB local file and upload it as a FileSample artifact.

    The file content is streamed to the API through an async bytes
    iterator, read in 1 KiB chunks.
    """
    block = 1024
    total_size = block * 1024
    name = "test.txt"
    create_file(name, total_size)

    ref = await client.artifacts.upload(
        filename=name,
        data=async_file_iter(name, block),
        data_size=total_size,
        artifact_type=ArtifactTypes.FileSample,
    )
    print(ref)


def create_file(file_name: str, file_size: int) -> None:
    """Create (or overwrite) *file_name* with exactly *file_size* zero bytes.

    A ``file_size`` of 0 produces an empty file.

    Raises:
        ValueError: if *file_size* is negative.
    """
    if file_size < 0:
        raise ValueError("file_size must be non-negative")
    with open(file_name, "wb") as f:
        if file_size > 0:
            # Seek to the last byte and write one zero byte; the skipped
            # range reads back as zeros, so the file is file_size bytes long.
            # (Skipped for 0, where seeking to -1 would fail.)
            f.seek(file_size - 1)
            f.write(b"\0")


async def async_file_iter(file_name: str, chunk_size: int) -> AsyncIterator[bytes]:
    """Asynchronously yield the raw bytes of *file_name* in chunks.

    The file is opened in binary mode so the uploaded bytes match the file
    on disk exactly. Text mode would decode the content with the locale
    encoding and re-encode it as UTF-8, which fails on arbitrary binary
    data. It would also translate newlines and count *chunk_size* in
    characters rather than bytes. The stream could then disagree with the
    ``data_size`` passed to ``upload()``.

    Args:
        file_name: Path of the file to read.
        chunk_size: Maximum number of bytes per yielded chunk.
    """
    async with aiofiles.open(file_name, mode="rb") as f:
        while True:
            chunk = await f.read(chunk_size)
            if not chunk:
                return
            yield chunk


async def upload_remote_file(client: CybsiAsyncClient):
    """Download a PDF over HTTP and upload it as a FileSample artifact.

    The aiohttp response body (``resp.content``, a StreamReader) is passed
    directly as ``data`` to ``upload()``.

    Raises:
        RuntimeError: if the download does not answer with HTTP 200 or the
            response has no Content-Length header.
    """
    async with aiohttp.request(
        "GET", "https://nakamotoinstitute.org/static/docs/taoup.pdf"
    ) as resp:
        # Explicit check instead of `assert`: assertions are stripped when
        # Python runs with -O.
        if resp.status != 200:
            raise RuntimeError(f"unexpected HTTP status: {resp.status}")

        # upload() is given the content size below; a chunked response
        # has no Content-Length to take it from.
        content_length = resp.headers.get("Content-Length")
        if content_length is None:
            raise RuntimeError("response has no Content-Length header")

        stream_reader_ref = await client.artifacts.upload(
            filename="taoup.pdf",
            data=resp.content,
            data_size=int(content_length),
            artifact_type=ArtifactTypes.FileSample,
        )
        print(stream_reader_ref)


async def main():
    """Upload a local and a remote file concurrently through one async client.

    Credentials come from the CYBSI_API_KEY and CYBSI_API_URL environment
    variables.
    """
    key = environ["CYBSI_API_KEY"]
    url = environ["CYBSI_API_URL"]
    config = Config(url, APIKeyAuth(api_url=url, api_key=key), ssl_verify=False)

    async with CybsiAsyncClient(config) as client:
        # Schedule both uploads up front so they run concurrently,
        # then wait for them in order.
        pending = [
            asyncio.create_task(upload_local_file(client)),
            asyncio.create_task(upload_remote_file(client)),
        ]
        for task in pending:
            await task


if __name__ == "__main__":
    asyncio.run(main())

Enrichment

Implement an external database

External databases are useful for entity enrichment. An example of an external database is the global DNS system.

In the example below we pass the IP address received from an enrichment task to an imaginary system. The system can magically tell whether the IP is an IoC or not. We form an observation from the results and register the observation in the Threat Analyzer API.

The example can be used as a general boilerplate for connectors to external databases.

#!/usr/bin/env python3
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from os import environ
from typing import List, cast

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.enrichment import (
    AssignedTaskView,
    CompletedTaskForm,
    EnrichmentErrorCodes,
    EnrichmentTypes,
    ExternalDBLookupParamsView,
    FailedTaskForm,
    TaskResultObservationForm,
)
from cybsi.api.observable import (
    AttributeNames,
    EntityForm,
    EntityKeyTypes,
    EntityTypes,
    ShareLevels,
)
from cybsi.api.observation import GenericObservationForm


def main():
    """Poll the enrichment task queue forever and handle tasks in batches.

    Up to ``max_task_batch_size`` tasks are fetched per iteration. When the
    queue returns no tasks, sleeps ``wait_on_empty_tasks_sec`` seconds.
    """
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    wait_on_empty_tasks_sec = 10
    max_task_batch_size = 8

    # The client is a context manager: it is closed when the loop is left
    # by an exception (e.g. KeyboardInterrupt on Ctrl+C).
    with CybsiClient(config) as client:
        task_queue = client.enrichment.task_queue

        # Main loop, take a batch of tasks, and handle them.
        while True:
            tasks = task_queue.get_assigned_tasks(max_task_batch_size)
            if tasks:
                handle_task_batch(client, tasks)
            else:
                time.sleep(wait_on_empty_tasks_sec)


def handle_task_batch(client: CybsiClient, batch: List[AssignedTaskView]) -> None:
    """Enrich every task in *batch*, then report all outcomes at once.

    A task whose enrichment raises is recorded as a failure instead of
    aborting the rest of the batch.
    """
    succeeded = []
    failed = []
    for assigned in batch:
        try:
            outcome = enrich_ip(assigned)
        except Exception as err:
            failed.append(IPEnrichmentTaskFailure(task_id=assigned.uuid, ex=err))
        else:
            succeeded.append(outcome)

    register_results(client, succeeded, failed)


def enrich_ip(task_view: AssignedTaskView) -> "IPEnrichmentTaskResult":
    """Check the IP address of an ExternalDBLookup task against the external DB.

    Raises:
        Exception: if the task is not an ExternalDBLookup task or its entity
            is not an IPAddress (only possible with mis-configured Cybsi
            enrichment rules).
    """
    # Only ExternalDBLookup tasks are accepted by this enricher.
    if task_view.type != EnrichmentTypes.ExternalDBLookup:
        raise Exception("unexpected enrichment type")

    # The enrichment type check above makes this cast safe.
    entity = cast(ExternalDBLookupParamsView, task_view.params).entity

    # Only IP address entities are accepted.
    if entity.type != EntityTypes.IPAddress:
        raise Exception("unexpected entity type")

    # An IP entity has a single key, so the first one is the address.
    address = entity.keys[0].value

    # Ask the (imaginary) external IoC database about the address.
    verdict = query_external_database(address)

    return IPEnrichmentTaskResult(ip=address, task_id=task_view.uuid, is_ioc=verdict)


def query_external_database(ip_address: str) -> bool:
    """Return whether *ip_address* is a known IoC.

    Stub: a real enricher would query the external system here (for example
    over the network). This one reports every address as an IoC.
    """
    is_known_ioc = True
    return is_known_ioc


def register_results(
    client: CybsiClient,
    completed: List["IPEnrichmentTaskResult"],
    failed: List["IPEnrichmentTaskFailure"],
) -> None:
    """Report the outcome of a task batch to the enrichment task queue.

    Failed tasks are marked as failed with ``FatalError`` and the exception
    text as the message. Completed tasks share one generic observation that
    holds an ``IsIoC`` fact for every IP reported as an IoC. Each completed
    task is completed with that observation as its result.

    Each step is skipped when its list is empty. No empty request is sent,
    and no observation is registered for a batch in which every task failed.
    """
    task_queue = client.enrichment.task_queue

    # Register definite task failures.
    if failed:
        failed_forms = [
            FailedTaskForm(
                task_uuid=f.task_id,
                error_code=EnrichmentErrorCodes.FatalError,
                message=str(f.ex),
            )
            for f in failed
        ]
        task_queue.fail_tasks(failed_forms)

    # No task succeeded, so there is nothing to attach an observation to.
    if not completed:
        return

    # Register a common observation for all completed tasks in the batch.
    # We could create a separate observation for each entity,
    # but that would be wasteful.
    # If no IP turns out to be an IoC, the observation has no facts; it is
    # still registered because every completed task needs a result.
    observation = GenericObservationForm(
        share_level=ShareLevels.Green, seen_at=datetime.now(timezone.utc)
    )
    for c in completed:
        # Facts with IsIoC=false are not very useful, don't register them.
        if c.is_ioc:
            entity = EntityForm(EntityTypes.IPAddress)
            entity.add_key(EntityKeyTypes.String, c.ip)
            observation.add_attribute_fact(
                entity=entity,
                attribute_name=AttributeNames.IsIoC,
                value=True,
                confidence=0.9,
            )

    observation_ref = client.observations.generics.register(observation)
    result = TaskResultObservationForm(observation_ref.uuid)

    # Register successfully completed tasks.
    completed_forms = [
        CompletedTaskForm(task_uuid=c.task_id, result=result) for c in completed
    ]
    task_queue.complete_tasks(completed_forms)


@dataclass
class IPEnrichmentTaskResult:
    """Result of one successfully enriched IP address task."""

    # IP address that was looked up.
    ip: str
    # UUID of the enrichment task this result answers.
    task_id: uuid.UUID
    # Whether the external database reported the IP as an IoC.
    is_ioc: bool


@dataclass
class IPEnrichmentTaskFailure:
    """An IP enrichment task that failed, with the exception that caused it."""

    # UUID of the failed enrichment task.
    task_id: uuid.UUID
    # Exception raised during enrichment; its text becomes the failure message.
    ex: Exception


if __name__ == "__main__":
    # Run the enricher's task-polling loop when executed as a script.
    main()

Implement an analyzer

Analyzers perform artifact analysis. Typical analyzers are network traffic analyzers and sandboxes.

In the example below we pass an artifact and its content (i.e. bytes) to an imaginary third-party analyzer. The analyzer can magically tell whether the file associated with our artifact is malicious or not. We form a report from the results and register the report in the Threat Analyzer API.

The example can be used as a general boilerplate for connectors to analyzers.

#!/usr/bin/env python3
import shutil
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from io import BytesIO
from os import environ
from typing import cast

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.enrichment import (
    ArtifactAnalysisParamsView,
    AssignedTaskView,
    CompletedTaskForm,
    EnrichmentErrorCodes,
    EnrichmentTypes,
    FailedTaskForm,
    TaskResultReportForm,
)
from cybsi.api.observable import (
    AttributeNames,
    EntityForm,
    EntityKeyTypes,
    EntityTypes,
    ShareLevels,
    ThreatCategory,
)
from cybsi.api.observation import GenericObservationForm
from cybsi.api.report import ReportForm


def main():
    """Poll the enrichment task queue forever, handling one task at a time.

    When no task is assigned, sleeps ``wait_on_empty_tasks_sec`` seconds.
    """
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    wait_on_empty_tasks_sec = 10

    # The client is a context manager: it is closed when the loop is left
    # by an exception (e.g. KeyboardInterrupt on Ctrl+C).
    with CybsiClient(config) as client:
        # Main loop, take a task and handle it.
        while True:
            tasks = client.enrichment.task_queue.get_assigned_tasks(limit=1)
            if tasks:
                handle_task(client, tasks[0])
            else:
                time.sleep(wait_on_empty_tasks_sec)


def handle_task(client: CybsiClient, task: AssignedTaskView) -> None:
    """Analyze the artifact of *task* and report the outcome to the API.

    Any exception raised while analyzing or registering the result is
    reported as a task failure, so one bad task does not stop the main loop.
    """
    try:
        target_uuid = get_task_artifact_uuid(task)
        verdict = analyze_artifact(client, task.uuid, target_uuid)
        register_result(client, verdict)
    except Exception as err:
        register_failure(
            client, FileSampleAnalysisTaskFailure(task_id=task.uuid, ex=err)
        )


def get_task_artifact_uuid(task_view: AssignedTaskView) -> uuid.UUID:
    """Return the UUID of the artifact an ArtifactAnalysis task refers to.

    Raises:
        Exception: if the task is not an ArtifactAnalysis task. This
            shouldn't happen unless Cybsi enrichment rules are mis-configured.
    """
    if task_view.type == EnrichmentTypes.ArtifactAnalysis:
        # The enrichment type check makes this cast safe.
        analysis_params = cast(ArtifactAnalysisParamsView, task_view.params)
        return analysis_params.artifact.uuid
    raise Exception("unexpected enrichment type")


def analyze_artifact(
    client: CybsiClient,
    task_id: uuid.UUID,
    artifact_uuid: uuid.UUID,
) -> "FileSampleAnalysisTaskResult":
    """Analyze the artifact *artifact_uuid* on behalf of task *task_id*.

    The artifact view and content are fetched, but no real analysis takes
    place. A canned "malicious" verdict with a random 8-character external
    ID is returned.
    """
    artifact_view = client.artifacts.view(artifact_uuid)
    with client.artifacts.get_content(artifact_uuid) as content:
        # A real connector would hand content.stream to the external
        # analyzer here, while the stream is still open. This example
        # loads the whole content into memory instead.
        buffer = copy_artifact_content_to_mem(content.stream)  # noqa: F841

    # Canned result standing in for the external analyzer's verdict.
    return FileSampleAnalysisTaskResult(
        task_id=task_id,
        file_md5_hash=artifact_view.content.md5_hash,
        is_malicious=True,
        external_id=uuid.uuid4().hex[:8],
    )


def copy_artifact_content_to_mem(
    artifact_content, chunk_size: int = 1024 * 1024
) -> BytesIO:
    """Copy a readable binary stream into an in-memory buffer.

    Args:
        artifact_content: File-like object opened for binary reading.
        chunk_size: Number of bytes copied per read (1 MiB by default).

    Returns:
        A ``BytesIO`` holding the whole content, rewound to the start so it
        can be read or passed on immediately.
    """
    buffer = BytesIO()
    shutil.copyfileobj(artifact_content, buffer, length=chunk_size)
    # copyfileobj leaves the position at the end of the buffer; rewind so
    # that buffer.read() returns the copied bytes instead of b"".
    buffer.seek(0)
    return buffer


def register_failure(client, failure: "FileSampleAnalysisTaskFailure") -> None:
    """Mark the task in *failure* as failed with a FatalError.

    The text of the exception that caused the failure becomes the message.
    """
    failed_form = FailedTaskForm(
        task_uuid=failure.task_id,
        error_code=EnrichmentErrorCodes.FatalError,
        message=str(failure.ex),
    )
    queue = client.enrichment.task_queue
    queue.fail_tasks([failed_form])


def register_result(
    client: CybsiClient, completed: "FileSampleAnalysisTaskResult"
) -> None:
    """Complete the task in *completed*, using a freshly registered report.

    ArtifactAnalysis tasks require a report as their result, so the report
    is registered first and then referenced in the completion form.
    """
    report_id = register_report(client, completed)
    completion = CompletedTaskForm(
        task_uuid=completed.task_id, result=TaskResultReportForm(report_id)
    )
    client.enrichment.task_queue.complete_tasks([completion])


def register_report(
    client: CybsiClient, result: "FileSampleAnalysisTaskResult"
) -> uuid.UUID:
    """Register an observation and a report describing *result*.

    For a malicious sample, the observation gets a ThreatCategory=Malware
    fact about the file (identified by its MD5 hash). Otherwise no fact is
    added. The report is linked to the observation.

    Returns:
        UUID of the registered report.
    """
    observation = GenericObservationForm(
        share_level=ShareLevels.Green, seen_at=datetime.now(timezone.utc)
    )
    # The analyzed file, identified by its MD5 hash.
    sample = EntityForm(EntityTypes.File)
    sample.add_key(EntityKeyTypes.MD5, result.file_md5_hash)

    if result.is_malicious:
        description = "File sample is malicious"
        observation.add_attribute_fact(
            entity=sample,
            attribute_name=AttributeNames.ThreatCategory,
            value=ThreatCategory.Malware,
            confidence=0.9,
        )
    else:
        description = "File sample is not malicious"

    observation_ref = client.observations.generics.register(observation)

    report = ReportForm(
        ShareLevels.Green,
        description=description,
        title="File analysis, md5:" + result.file_md5_hash,
        external_id=result.external_id,
    ).add_observation(observation_ref.uuid)

    return client.reports.register(report).uuid


@dataclass
class FileSampleAnalysisTaskResult:
    """Verdict produced for one ArtifactAnalysis task."""

    # UUID of the enrichment task being answered.
    task_id: uuid.UUID
    # MD5 hash of the analyzed file's content.
    file_md5_hash: str
    # Whether the analyzer considers the file malicious.
    is_malicious: bool
    # ID of the analysis in the external system; used as the report's external_id.
    external_id: str


@dataclass
class FileSampleAnalysisTaskFailure:
    """A task that could not be handled, with the exception that caused it."""

    # UUID of the failed enrichment task.
    task_id: uuid.UUID
    # Exception raised while handling the task; its text becomes the failure message.
    ex: Exception


if __name__ == "__main__":
    # Run the analyzer's task-polling loop when executed as a script.
    main()