Quickstart#

Eager to get started? This page gives a good introduction in how to get started with Cybsi SDK.

First, make sure that:

Let’s get started with some simple examples.

Register a generic observation#

Generic observations are universal containers of facts about entities.

In the example below we make an observation about domain “test.com”. The content of the observation says that “test.com” is IoC and malicious with confidence of 0.9. The code is written in asynchronous style, but you can use synchronous style if necessary.

#!/usr/bin/env python3
import asyncio
from datetime import datetime, timezone
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiAsyncClient
from cybsi.api.dictionary import DictItemAttributeValue
from cybsi.api.observable import (
    AttributeNames,
    EntityForm,
    EntityKeyTypes,
    EntityTypes,
    RelationshipKinds,
    ShareLevels,
)
from cybsi.api.observation import GenericObservationForm


def create_generic_observation(domain, ip):
    domain = EntityForm(
        EntityTypes.DomainName,
        [(EntityKeyTypes.String, domain)],
    )
    ip_address = EntityForm(
        EntityTypes.IPAddress,
        [(EntityKeyTypes.String, ip)],
    )

    observation = (
        GenericObservationForm(
            share_level=ShareLevels.Green, seen_at=datetime.now(timezone.utc)
        )
        .add_attribute_fact(
            entity=domain,
            attribute_name=AttributeNames.IsIoC,
            value=True,
            confidence=0.9,
        )
        .add_attribute_fact(
            entity=domain,
            attribute_name=AttributeNames.RelatedMalwareFamilies,
            value=DictItemAttributeValue(key="Aware"),
            confidence=0.8,
        )
        .add_entity_relationship(
            source=domain,
            kind=RelationshipKinds.ResolvesTo,
            target=ip_address,
            confidence=0.5,
        )
    )
    return observation


async def main():
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    domain_ip_pairs = [
        ("aa.com", "7.7.7.7"),
        ("bb.com", "8.8.8.8"),
        ("cc.com", "9.9.9.9"),
    ]
    generics = [
        create_generic_observation(domain, ip) for domain, ip in domain_ip_pairs
    ]

    # There is also a sync generic registration. Use CybsiClient class:
    #     with CybsiClient(config) as client:
    #         generic_observation = create_generic_observation()
    #         ref = client.observations.generics.register(generic_observation)

    async with CybsiAsyncClient(config) as client:
        registrations = [client.observations.generics.register(g) for g in generics]

        results = await asyncio.gather(*registrations)
        uuids = ", ".join(str(u.uuid) for u in results)

        print(f"Registered observations: {uuids}")


if __name__ == "__main__":
    asyncio.run(main())

Get attribute and relationship forecasts#

Forecasting is getting a metric to assess the confidence of the provided indicators and keeping it up to date.

In the example below we get IsMalicious attribute forecast of ip-address “8.8.8.8”. Also we get link forecasts of ip-address “8.8.8.8” and forecast of relationship that has form ip-address(“8.8.8.8”) resolves domain-name(“test.com”)

#!/usr/bin/env python3
import uuid
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import AttributeNames, RelationshipKinds

if __name__ == "__main__":
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)
    client = CybsiClient(config)

    # There are IP-Address and DomainName entity with IoC attribute and relationship
    # See generic_registration.py script for example.
    domain_uuid = uuid.uuid4()
    ip_address_uuid = uuid.uuid4()

    # Get IsIoC attribute value forecast of ip-address entity.
    attribute_forecast = client.observable.entities.forecast_attribute_values(
        ip_address_uuid, AttributeNames.IsIoC
    )
    print(attribute_forecast)
    # {
    #   "values": [
    #     {
    #       "value": true,
    #       "confidence": 0.8998864,
    #       "valuableFacts": [
    #         {
    #           "dataSource": {
    #             "url": "http://.../3ab411dc-17ab-4169-8ea6-c08271fca49e",
    #             "uuid": "3ab411dc-17ab-4169-8ea6-c08271fca49e"
    #           },
    #           "shareLevel": "Green",
    #           "seenAt": "2021-12-24T17:50:08+03:00",
    #           "confidence": 0.9,
    #           "value": true,
    #           "finalConfidence": 0.8998864
    #         }
    #       ]
    #     }
    #   ],
    #   "hasConflicts": false
    # }

    # Get link forecasts list of ip-address entity.
    link_forecast = client.observable.entities.forecast_links(ip_address_uuid)
    print(link_forecast.data()[0])
    # {
    #   "link": {
    #     "direction": "Reverse",
    #     "relationKind": "ResolvesTo",
    #     "relatedEntity": {
    #       "type": "DomainName",
    #       "url": "http://.../66fd82a1-c35c-424e-986c-133054bd7797",
    #       "uuid": "66fd82a1-c35c-424e-986c-133054bd7797",
    #       "keys": [
    #         {
    #           "type": "String",
    #           "value": "test.com"
    #         }
    #       ]
    #     }
    #   },
    #   "confidence": 0.4999951
    # }

    # Get forecast of relationship (domain name entity resolves ip-address entity).
    relationship_forecast = client.observable.relationships.forecast(
        source_entity_uuid=domain_uuid,
        target_entity_uuid=ip_address_uuid,
        kind=RelationshipKinds.ResolvesTo,
    )
    print(relationship_forecast)
    # {
    #   "relationship": {
    #     "sourceEntity": {
    #       "type": "DomainName",
    #       "url": "http://.../66fd82a1-c35c-424e-986c-133054bd7797",
    #       "uuid": "66fd82a1-c35c-424e-986c-133054bd7797",
    #       "keys": [
    #         {
    #           "type": "String",
    #           "value": "test.com"
    #         }
    #       ]
    #     },
    #     "relationKind": "ResolvesTo",
    #     "targetEntity": {
    #       "type": "IPAddress",
    #       "url": "http://.../40a13d3f-96d2-4c85-acc5-5657f2ecb69d",
    #       "uuid": "40a13d3f-96d2-4c85-acc5-5657f2ecb69d",
    #       "keys": [
    #         {
    #           "type": "String",
    #           "value": "8.8.8.8"
    #         }
    #       ]
    #     }
    #   },
    #   "confidence": 0.4999951,
    #   "valuableFacts": null
    # }

    print(client.observable.entities.forecast_links_statistic(domain_uuid)[0])
    # {
    #     "linkType": {
    #         "url": "http://.../observable/entities/66fd82a1-c35c-424e-986c-133054bd7797/links?kind=ResolvesTo&relatedEntityType=IPAddress&direction=Forward", # noqa: E501
    #         "linkDirection": "Forward",
    #         "relationKind": "ResolvesTo",
    #         "relatedEntitiesType": "IPAddress"
    #     },
    #     "links": {
    #         "total": 1,
    #         "distributionByConfidence": [
    #             {
    #                 "confidenceRange": [0, 0.1],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.1, 0.2],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.2, 0.3],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.3, 0.4],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.4, 0.5],
    #                 "count": 1
    #             },
    #             {
    #                 "confidenceRange": [0.5, 0.6],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.6, 0.7],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.7, 0.8],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.8, 0.9],
    #                 "count": 0
    #             },
    #             {
    #                 "confidenceRange": [0.9, 1],
    #                 "count": 0
    #             }
    #         ]
    #     }
    # }
    client.close()

Register a report#

A report is a container that contains a list of observations and artifacts and general meta-information about them.

In the example below we register reports with other observations and artifacts. The code is written in asynchronous style, but you can use synchronous style if necessary.

#!/usr/bin/env python3
import asyncio
import uuid
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiAsyncClient
from cybsi.api.observable import ShareLevels
from cybsi.api.report import ReportForm


def create_report(observation_uuid, artifact_uuid):
    report = (
        ReportForm(
            ShareLevels.Green,
            description="Report with observation and artifact",
            title="Report",
        )
        .add_observation(observation_uuid)
        .add_artifact(artifact_uuid)
    )
    return report


async def main():
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    # Tuples with previously registered artifacts and observation
    # See generic_registration.py and artifact_multipart_upload.py for example.
    observation_artifact_pair = [
        (uuid.uuid4(), uuid.uuid4()),
        (uuid.uuid4(), uuid.uuid4()),
        (uuid.uuid4(), uuid.uuid4()),
    ]
    reports = [create_report(obs, art) for obs, art in observation_artifact_pair]

    # There is also a sync reports registration. Use CybsiClient class:
    #     with CybsiClient(config) as client:
    #         report = create_report(obs_uuid, artifact_uuid)
    #         ref = client.reports.register(report)

    async with CybsiAsyncClient(config) as client:
        registrations = [client.reports.register(r) for r in reports]

        results = await asyncio.gather(*registrations)
        uuids = ", ".join(str(u.uuid) for u in results)

        print(f"Registered reports: {uuids}")


if __name__ == "__main__":
    asyncio.run(main())

Register a reputation list#

Reputation list is a list of observed entities, united by some characteristic through a stored query, for example: malicious entities, indicator hosts, etc. Reputation list is dynamic and can change with the appearance of each new fact in the system. It is the main mechanism for publishing aggregated information in the system.

In the example below we register stored query ENT { IsIoC } to search for observable entities that are indicators. Based on it, we create a reputation list.

#!/usr/bin/env python3
from os import environ

from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.error import SemanticError, SemanticErrorCodes
from cybsi.api.observable.enums import ShareLevels
from cybsi.api.replist import ReplistForm
from cybsi.api.search.enums import QueryCompatibility
from cybsi.api.search.stored_queries import CybsiLangErrorView, StoredQueryForm

if __name__ == "__main__":
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]

    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)
    client = CybsiClient(config)

    query_text = "ENT { IsIoC }"
    validate_view = client.search.stored_queries.validate(
        query_text, QueryCompatibility.Replist
    )
    if len(validate_view.errors) > 0:
        # erroneous query
        exit(1)  # handle errors here
    if len(validate_view.warnings) > 0:
        # semantic warnings
        # for example: always empty result
        pass  # handle warnings here

    try:  # store query
        query_form = StoredQueryForm("malicious entities", query_text)
        query_ref = client.search.stored_queries.register(query_form)
    except SemanticError as exp:
        if exp.code == SemanticErrorCodes.InvalidQueryText:
            e = CybsiLangErrorView.from_semantic_error(exp)
            # handle CybsiLangError here
        exit(1)

    # get stored query view
    query_view = client.search.stored_queries.view(query_ref.uuid)
    # register new disabled reputation list with given query
    replist = ReplistForm(
        is_enabled=False,
        query_uuid=query_view.uuid,
        share_level=ShareLevels.White,
    )
    replist_ref = client.replists.register(replist)
    # get replist view and set it enable
    replist_view = client.replists.view(replist_ref.uuid)
    client.replists.edit(
        replist_view.uuid,
        replist_view.tag,
        is_enabled=True,
        share_level=ShareLevels.Green,
    )

    client.close()