Starlark library usage examples

Community Platform

This document provides sample usage for common runZero Starlark libraries used in custom integration scripts.

requests

load('requests', 'Session', 'Cookie')
load('json', json_decode='decode')

def requests_example():
    session = Session()
    session.headers.set('Accept', 'application/json')
    session.headers.set('User-Agent', 'Mozilla/5.0')
    session.headers.set('User-Agent', None)  # remove header

    url = 'https://hacker-news.firebaseio.com/v0/topstories.json'
    session.cookies.set(url, {"test_cookie": "cookie_value"})

    response = session.get(url)
    if response and response.status_code == 200:
        data = json_decode(response.body)
        print("Top story IDs:", data[:5])
    else:
        print("Failed to fetch stories")

http

load('http', http_post='post', http_get='get', 'url_encode')

def fetch_example():
    url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    headers = {"Accept": "application/json"}
    
    response = http_get(url, headers=headers)
    
    if response and response.status_code == 200:
        print("Top stories retrieved successfully.")
    else:
        print("Request failed with status:", response.status_code)

net

load('net', 'ip_address')

def parse_ip_list():
    ips = ["192.168.1.1", "2607:f8b0:4005:805::200e"]
    for ip in ips:
        addr = ip_address(ip)
        print("IP:", addr, "Version:", addr.version)

json

load('json', json_encode='encode', json_decode='decode')

def test_json_handling():
    data = {"name": "runZero", "features": ["scan", "API", "integrations"]}
    
    encoded = json_encode(data)
    print("Encoded JSON:", encoded)
    
    decoded = json_decode(encoded)
    print("Decoded:", decoded["name"])

time

load('time', 'parse_time')

def parse_example_time():
    time_str = "2025-05-01T15:00:00Z"
    parsed = parse_time(time_str)
    print("Parsed time:", parsed)

uuid

load('uuid', 'new_uuid')

def create_unique_id():
    uid = new_uuid()
    print("Generated UUID:", uid)

gzip

load('gzip', gzip_decompress='decompress', gzip_compress='compress')

def gzip_example():
    original = "Hello, runZero!".encode("utf-8")

    # Compress the data
    compressed = gzip_compress(original)
    print("Compressed length:", len(compressed))

    # Decompress it back
    decompressed = gzip_decompress(compressed)
    print("Decompressed value:", decompressed.decode("utf-8"))

base64

load('base64', base64_encode='encode', base64_decode='decode')

def b64_example():
    username = "xxx"
    password = "yyy"
    enc = base64_encode(username + ":" + password)
    dec = base64_decode(enc)
    return (enc, dec)

crypto

load('crypto', 'sha256', 'sha512', 'sha1', 'md5')

def main(*args, **kwargs):
    access_key = kwargs['access_key']
    access_secret = kwargs['access_secret']

    input = "test"

    sha256_hash = sha256(input)
    if str(sha256_hash) != '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08':
        print("sha256_hash [fail]: {}".format(sha256_hash))
    else:
        print("sha256_hash [pass]: {}".format(sha256_hash))

    sha512_hash = sha512(input)
    if str(sha512_hash) != 'ee26b0dd4af7e749aa1a8ee3c10ae9923f618980772e473f8819a5d4940e0db27ac185f8a0e1d5f84f88bc887fd67b143732c304cc5fa9ad8e6f57f50028a8ff':
        print("sha512_hash [fail]: {}".format(sha512_hash))
    else:
        print("sha512_hash [pass]: {}".format(sha512_hash))

    sha1_hash = sha1(input)
    if str(sha1_hash) != 'a94a8fe5ccb19ba61c4c0873d391e987982fbbd3':
        print("sha1_hash [fail]: {}".format(sha1_hash))
    else:
        print("sha1_hash [pass]: {}".format(sha1_hash))

    md5_hash = md5(input)
    if str(md5_hash) != '098f6bcd4621d373cade4e832627b4f6':
        print("md5_hash [fail]: {}".format(md5_hash))
    else:
        print("md5_hash [pass]: {}".format(md5_hash))

    return True

flatten (json)

load('flatten_json', 'flatten')

def main(*args, **kwargs):
    nested_json = {"foo": "bar", "a": {"b": "c"}}
    flattened_json = flatten(nested_json)
    return True
Updated