Starlark library usage examples
This document provides sample usage for common runZero Starlark libraries used in custom integration scripts.
requests
load('requests', 'Session', 'Cookie')
load('json', json_decode='decode')
def requests_example():
session = Session()
session.headers.set('Accept', 'application/json')
session.headers.set('User-Agent', 'Mozilla/5.0')
session.headers.set('User-Agent', None) # remove header
url = 'https://hacker-news.firebaseio.com/v0/topstories.json'
session.cookies.set(url, {"test_cookie": "cookie_value"})
response = session.get(url)
if response and response.status_code == 200:
data = json_decode(response.body)
print("Top story IDs:", data[:5])
else:
print("Failed to fetch stories")
http
load('http', http_post='post', http_get='get', 'url_encode')
def fetch_example():
url = "https://hacker-news.firebaseio.com/v0/topstories.json"
headers = {"Accept": "application/json"}
response = http_get(url, headers=headers)
if response and response.status_code == 200:
print("Top stories retrieved successfully.")
else:
print("Request failed with status:", response.status_code)
net
load('net', 'ip_address')
def parse_ip_list():
ips = ["192.168.1.1", "2607:f8b0:4005:805::200e"]
for ip in ips:
addr = ip_address(ip)
print("IP:", addr, "Version:", addr.version)
json
load('json', json_encode='encode', json_decode='decode')
def test_json_handling():
data = {"name": "runZero", "features": ["scan", "API", "integrations"]}
encoded = json_encode(data)
print("Encoded JSON:", encoded)
decoded = json_decode(encoded)
print("Decoded:", decoded["name"])
time
load('time', 'parse_time')
def parse_example_time():
time_str = "2025-05-01T15:00:00Z"
parsed = parse_time(time_str)
print("Parsed time:", parsed)
uuid
load('uuid', 'new_uuid')
def create_unique_id():
uid = new_uuid()
print("Generated UUID:", uid)
gzip
load('gzip', gzip_decompress='decompress', gzip_compress='compress')
def gzip_example():
original = "Hello, runZero!".encode("utf-8")
# Compress the data
compressed = gzip_compress(original)
print("Compressed length:", len(compressed))
# Decompress it back
decompressed = gzip_decompress(compressed)
print("Decompressed value:", decompressed.decode("utf-8"))
base64
load('base64', base64_encode='encode', base64_decode='decode')
def b64_example():
username = "xxx"
password = "yyy"
enc = base64_encode(username + ":" + password)
dec = base64_decode(enc)
return (enc, dec)
crypto
load('crypto', 'sha256', 'sha512', 'sha1', 'md5')
def main(*args, **kwargs):
access_key = kwargs['access_key']
access_secret = kwargs['access_secret']
input = "test"
sha256_hash = sha256(input)
if str(sha256_hash) != '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08':
print("sha256_hash [fail]: {}".format(sha256_hash))
else:
print("sha256_hash [pass]: {}".format(sha256_hash))
sha512_hash = sha512(input)
if str(sha512_hash) != 'ee26b0dd4af7e749aa1a8ee3c10ae9923f618980772e473f8819a5d4940e0db27ac185f8a0e1d5f84f88bc887fd67b143732c304cc5fa9ad8e6f57f50028a8ff':
print("sha512_hash [fail]: {}".format(sha512_hash))
else:
print("sha512_hash [pass]: {}".format(sha512_hash))
sha1_hash = sha1(input)
if str(sha1_hash) != 'a94a8fe5ccb19ba61c4c0873d391e987982fbbd3':
print("sha1_hash [fail]: {}".format(sha1_hash))
else:
print("sha1_hash [pass]: {}".format(sha1_hash))
md5_hash = md5(input)
if str(md5_hash) != '098f6bcd4621d373cade4e832627b4f6':
print("md5_hash [fail]: {}".format(md5_hash))
else:
print("md5_hash [pass]: {}".format(md5_hash))
return True
flatten (json)
load('flatten_json', 'flatten')
def main(*args, **kwargs):
nested_json = {"foo": "bar", "a": {"b": "c"}}
flattened_json = flatten(nested_json)
return True
Updated