Examples#
Artifacts#
Upload and download an artifact#
Artifacts are regular files with additional attributes. An artifact can be analyzed or unpacked by Threat Analyzer. An artifact can also be sent for analysis to an analyzer (for example, a sandbox).
The example shows how to upload and download artifacts.
Also see Asynchronous artifacts uploading.
#!/usr/bin/env python3
import shutil
from io import BytesIO
from os import environ
from cybsi.api import APIKeyAuth, Config, CybsiClient
def main():
    """Upload a small in-memory artifact, then download and print its content.

    The API key and URL are read from the ``CYBSI_API_KEY`` and
    ``CYBSI_API_URL`` environment variables.
    """
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key=api_key),
        ssl_verify=False,
    )

    with CybsiClient(config) as client:
        # Any file-like object can be uploaded; BytesIO keeps the example
        # self-contained.
        upload_stream = BytesIO(b"artifact content")

        type_view = client.artifacts.recognize_type(upload_stream)
        print(type_view.type)
        # FileSample
        print(type_view.format_description)
        # ASCII text, with no line terminators

        # Rewind the stream so the upload starts from the first byte.
        upload_stream.seek(0)
        artifact_ref = client.artifacts.upload(
            "example.txt", upload_stream, artifact_type=type_view.type
        )

        # Fetch the artifact content, copy it into memory and print it as-is.
        # The content can also be downloaded as a password-protected ZIP archive.
        copy_chunk_size = 1024 * 1024
        with client.artifacts.get_content(artifact_ref.uuid) as artifact_content:
            downloaded = BytesIO()
            shutil.copyfileobj(
                artifact_content.stream, downloaded, length=copy_chunk_size
            )
            print("plain artifact content: ", downloaded.getvalue())


if __name__ == "__main__":
    main()
Data sources#
Register custom data source#
A data source describes a piece of software or an identity that makes observations and describes objects, artifacts or reports.
In the example below we register our own data source type, CIRCL, and a data source, MISP.
#!/usr/bin/env python3
from os import environ
from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.data_source import DataSourceForm, DataSourceTypeForm
from cybsi.api.error import ConflictError
if __name__ == "__main__":
    # Register a custom data source type ("CIRCL") and a data source of that
    # type ("MISP"), then read both back.
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]
    auth = APIKeyAuth(api_url=api_url, api_key=api_key)
    config = Config(api_url, auth, ssl_verify=False)

    # The context manager closes the client on every exit path, including the
    # early exit taken below when a registration conflicts.
    with CybsiClient(config) as client:
        try:
            # The data source type is registered first because the data
            # source form needs its UUID.
            datasource_type_form = DataSourceTypeForm(
                short_name="CIRCL",
                long_name="Computer Incident Response Center Luxembourg",
            )
            ds_type_ref = client.data_source_types.register(datasource_type_form)
            ds_type_uuid = ds_type_ref.uuid

            datasource_form = DataSourceForm(
                type_uuid=ds_type_uuid,
                name="MISP",
                long_name="MISP",
            )
            ds_ref = client.data_sources.register(datasource_form)
            ds_uuid = ds_ref.uuid
        except ConflictError:
            # handle Duplicate Error here
            # SystemExit is raised directly: the exit() helper comes from the
            # site module and is intended for the interactive shell.
            raise SystemExit(1) from None

        view = client.data_sources.view(ds_uuid)
        type_view = client.data_source_types.view(ds_type_uuid)
Pagination#
You’ll often need to work with the collections of elements that the API provides.
Cybsi SDK provides two ways to traverse collections.
The first way is page traversal. This approach fits cases where you need a page’s properties, e.g. its cursor. To walk through the elements of a page, just iterate over the page.
#!/usr/bin/env python3
import uuid
from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Page
if __name__ == "__main__":
    # Traverse a reputation list page by page.
    # The URL, API key and reputation list UUID are illustrative values.
    api_url = "http://127.0.0.1/api"
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key="api_key"),
        ssl_verify=False,
    )
    replist_uuid = uuid.uuid4()

    current_page: Page[EntityView]
    with CybsiClient(config) as client:
        # Only the page is needed here; the second element of the returned
        # pair is discarded.
        current_page, _ = client.replists.entities(replist_uuid)
        # The loop ends once next_page() yields a falsy value.
        while current_page:
            # A page can be iterated directly to visit its elements.
            for entity in current_page:
                # Do something with element
                pass
            # Do something with a page
            current_page = current_page.next_page()  # type: ignore
The second way is element traversal. This approach allows you to iterate through collections without working with pages. To work with a collection as an iterator, use chain_pages.
#!/usr/bin/env python3
import uuid
from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Page, chain_pages
if __name__ == "__main__":
    # Traverse a reputation list element by element with chain_pages.
    # The URL, API key and reputation list UUID are illustrative values.
    api_url = "http://127.0.0.1/api"
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key="api_key"),
        ssl_verify=False,
    )
    replist_uuid = uuid.uuid4()

    first_page: Page[EntityView]
    with CybsiClient(config) as client:
        # Only the page is needed here; the second element of the returned
        # pair is discarded.
        first_page, _ = client.replists.entities(replist_uuid)
        # chain_pages lets the loop visit items without handling pages itself.
        for element in chain_pages(first_page):
            # Do something with an item
            pass
Reputation list changes#
A reputation list is a list of observed entities united by some characteristic through a stored query, for example: malicious entities, indicator hosts, etc. A reputation list is dynamic and can change with the appearance of each new fact in the system.
In the example below we get reputation list changes using a specified cursor.
Also see Register a reputation list.
#!/usr/bin/env python3
import time
import uuid
from os import environ
from typing import cast
from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Cursor, Page
from cybsi.api.replist import EntitySetChangeView
if __name__ == "__main__":
    # Poll a reputation list for changes forever, resuming from the most
    # recent cursor after every pass.
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key=api_key),
        ssl_verify=False,
    )

    with CybsiClient(config) as client:
        poll_interval_sec = 10
        replist_uuid = uuid.uuid4()

        changes_cursor: Cursor
        initial_page: Page[EntityView]
        # Requesting the entity list also yields the cursor from which
        # reputation list changes can be read.
        # The change cursor starts with the latest change of entity in list.
        # WARNING: It is mandatory that you first obtain all pages
        # of reputation list entities so that you can then monitor the changes.
        initial_page, changes_cursor = client.replists.entities(replist_uuid)

        page_of_changes: Page[EntitySetChangeView[EntityView]]
        while True:
            page_of_changes = client.replists.changes(
                replist_uuid, cursor=changes_cursor
            )
            while page_of_changes:
                # A page can be iterated directly to visit its changes.
                for change in page_of_changes:
                    # Do something with reputation list changes
                    pass
                # Do something with a page
                # Remember the newest cursor so the next request resumes
                # from it; pages without a cursor leave it untouched.
                if page_of_changes.cursor is not None:
                    changes_cursor = cast(Cursor, page_of_changes.cursor)
                page_of_changes = page_of_changes.next_page()  # type: ignore
            # Changes are over, wait and request again with last cursor
            time.sleep(poll_interval_sec)
Search entities#
It is possible to search observed entities united by some characteristic through a CybsiLang query.
In the example below we get a list of indicators.
#!/usr/bin/env python3
from os import environ
from cybsi.api import APIKeyAuth, Config, CybsiClient
from cybsi.api.observable import EntityView
from cybsi.api.pagination import Page, chain_pages
if __name__ == "__main__":
    # Run a CybsiLang entity search and print every entity found.
    api_key = environ["CYBSI_API_KEY"]
    api_url = environ["CYBSI_API_URL"]
    config = Config(
        api_url,
        APIKeyAuth(api_url=api_url, api_key=api_key),
        ssl_verify=False,
    )

    with CybsiClient(config) as client:
        entity_search = client.search.entities

        ioc_query = "ENT { IsIoC }"
        # Starting a search yields a cursor, which is then used to request
        # the first page of results.
        cursor = entity_search.start_search(ioc_query)
        first_page: Page[EntityView] = entity_search.next_search_page(
            cursor=cursor,
            limit=20,
        )
        # chain_pages lets the loop visit entities without handling pages itself.
        for entity in chain_pages(first_page):
            print(entity)