Examples#

Artifacts#

Upload and download an artifact#

Artifacts are regular files with additional attributes. An artifact can be analyzed or unpacked by Threat Analyzer. An artifact can also be sent to an analyzer (for example, a sandbox) for analysis.

The example shows how to upload and download artifacts.

Also see Asynchronous artifacts uploading.

#!/usr/bin/env python3
import shutil
from io import BytesIO
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient


def main():
    """Upload a small in-memory artifact, then download it and print it.

    The API key and the API URL are read from the ``CYBSI_API_KEY`` and
    ``CYBSI_API_URL`` environment variables.
    """
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key=api_key),
        ssl_verify=False,
    )

    with CybsiClient(config) as client:
        # Any file-like object can be uploaded; BytesIO keeps the example
        # self-contained.
        payload = BytesIO(b"artifact content")

        recognized = client.artifacts.recognize_type(payload)
        print(recognized.type)
        # FileSample
        print(recognized.format_description)
        # ASCII text, with no line terminators

        # The stream was already handed to recognize_type above,
        # so rewind it before uploading.
        payload.seek(0)
        artifact_ref = client.artifacts.upload(
            "example.txt", payload, artifact_type=recognized.type
        )

        # Download the artifact content into memory and print it as is.
        # The content can also be downloaded as a password-protected
        # ZIP archive (not shown in this example).
        chunk_size = 1024 * 1024
        with client.artifacts.get_content(artifact_ref.uuid) as downloaded:
            sink = BytesIO()
            shutil.copyfileobj(downloaded.stream, sink, length=chunk_size)
        print("plain artifact content: ", sink.getvalue())


if __name__ == "__main__":
    main()

Data sources#

Register custom data source#

A data source describes a piece of software or an identity that makes observations or describes objects, artifacts or reports.

In the example below we register our own data source type, CIRCL, and a data source, MISP.

#!/usr/bin/env python3
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.data_source import DataSourceForm, DataSourceTypeForm
from cybsi.api.error import ConflictError

if __name__ == "__main__":
    # Credentials and endpoint come from the environment.
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    # Use the client as a context manager so that it is closed on every
    # exit path, including the early exit on ConflictError below.
    with CybsiClient(config) as client:
        try:
            # Register the data source type first: the data source refers
            # to it by UUID.
            datasource_type_form = DataSourceTypeForm(
                short_name="CIRCL",
                long_name="Computer Incident Response Center Luxembourg",
            )
            ds_type_ref = client.data_source_types.register(datasource_type_form)
            ds_type_uuid = ds_type_ref.uuid

            # Register the data source itself under the new type.
            datasource_form = DataSourceForm(
                type_uuid=ds_type_uuid,
                name="MISP",
                long_name="MISP",
            )
            ds_ref = client.data_sources.register(datasource_form)
            ds_uuid = ds_ref.uuid
        except ConflictError:
            # handle Duplicate Error here
            # SystemExit is raised directly: the exit() helper is injected
            # by the site module and is meant for interactive use.
            raise SystemExit(1)

        # Read back what has just been registered.
        view = client.data_sources.view(ds_uuid)
        type_view = client.data_source_types.view(ds_type_uuid)

Pagination#

You’ll often need to work with the collections of elements that the API provides.

Cybsi SDK provides two ways to traverse collections.

The first way is page traversal. This approach fits cases where you need access to a page’s properties, e.g. its cursor. To walk through the elements of a page, just iterate over the page.

#!/usr/bin/env python3
import uuid

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Page

if __name__ == "__main__":
    # Page-by-page traversal of the entities of a reputation list.
    api_url = "http://127.0.0.1/api"
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key="api_key"),
        ssl_verify=False,
    )

    # Placeholder identifier of the reputation list to read.
    replist_uuid = uuid.uuid4()
    current_page: Page[EntityView]

    with CybsiClient(config) as client:
        # entities() returns a pair; only the first page is needed here.
        current_page, _ = client.replists.entities(replist_uuid)
        while current_page:
            # A page can be iterated directly to visit its elements.
            for entity in current_page:
                # Handle a single element here
                pass
            # Handle the page as a whole here (e.g. use its properties),
            # then move on to the following page.
            current_page = current_page.next_page()  # type: ignore

The second way is element traversal. This approach allows you to iterate through a collection without working with pages. To work with a collection as with an iterator, use chain_pages.

#!/usr/bin/env python3
import uuid

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Page, chain_pages

if __name__ == "__main__":
    # Element-by-element traversal of the entities of a reputation list.
    api_url = "http://127.0.0.1/api"
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key="api_key"),
        ssl_verify=False,
    )

    # Placeholder identifier of the reputation list to read.
    replist_uuid = uuid.uuid4()
    first_page: Page[EntityView]

    with CybsiClient(config) as client:
        # entities() returns a pair; only the first page is needed here.
        first_page, _ = client.replists.entities(replist_uuid)
        # chain_pages lets us loop over items without handling pages.
        for entity in chain_pages(first_page):
            # Handle a single item here
            pass

Reputation list changes#

A reputation list is a list of observed entities united by some characteristic through a stored query, for example, malicious entities or indicator hosts. A reputation list is dynamic and can change whenever a new fact appears in the system.

In the example below we get reputation list changes starting from a specified cursor.

Also see Register a reputation list.

#!/usr/bin/env python3
import time
import uuid
from os import environ
from typing import cast

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Cursor, Page
from cybsi.api.replist import EntitySetChangeView

if __name__ == "__main__":
    # Poll a reputation list for changes, forever.
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key=api_key),
        ssl_verify=False,
    )

    with CybsiClient(config) as client:
        # Pause between polls once all pending changes have been read.
        poll_interval_sec = 10
        # Placeholder identifier of the reputation list to watch.
        replist_uuid = uuid.uuid4()

        changes_cursor: Cursor
        first_entities_page: Page[EntityView]
        # Requesting the entity list also yields the cursor from which
        # reputation list changes can be requested.
        # The change cursor starts with the latest change of entity in list.
        # WARNING: It is mandatory that you first obtain all pages
        # of reputation list entities so that you can then monitor the changes.
        first_entities_page, changes_cursor = client.replists.entities(replist_uuid)

        page_of_changes: Page[EntitySetChangeView[EntityView]]
        while True:
            page_of_changes = client.replists.changes(
                replist_uuid, cursor=changes_cursor
            )
            while page_of_changes:
                # A page can be iterated directly to visit its elements.
                for change in page_of_changes:
                    # Handle a single reputation list change here
                    pass
                # Handle the page as a whole here.
                # Remember the most recent cursor so that the next poll
                # resumes from it.
                if page_of_changes.cursor is not None:
                    changes_cursor = cast(Cursor, page_of_changes.cursor)
                page_of_changes = page_of_changes.next_page()  # type: ignore

            # No more changes for now: wait, then ask again with the last cursor.
            time.sleep(poll_interval_sec)

Search entities#

It is possible to search for observed entities united by some characteristic using a CybsiLang query.

In the example below we get a list of indicators.

#!/usr/bin/env python3
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Page, chain_pages

if __name__ == "__main__":
    # Search for entities with a CybsiLang query and print every match.
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key=api_key),
        ssl_verify=False,
    )
    with CybsiClient(config) as client:
        indicators_query = "ENT { IsIoC }"
        # Starting a search yields a cursor; result pages are then
        # requested with that cursor.
        cursor = client.search.entities.start_search(indicators_query)
        first_page: Page[EntityView] = client.search.entities.next_search_page(
            cursor=cursor,
            limit=20,
        )
        # chain_pages lets us loop over items without handling pages.
        for entity in chain_pages(first_page):
            print(entity)