import os, time, requests, uuid, pathlib, json
from dotenv import load_dotenv
# Populate os.environ from a .env file (python-dotenv) before reading config.
load_dotenv()

# API base URL; trailing slashes are stripped so paths can be joined with "/".
API = os.getenv("MOSAIC_API_BASE", "https://api.usemosaic.ai/api").rstrip("/")

KEY = os.getenv("MOSAIC_API_KEY")
if not KEY:
    # Fail fast: otherwise every request would carry "Bearer None".
    raise SystemExit("MOSAIC_API_KEY is not set (export it or add it to .env)")

# Local video to upload; defaults to example.mp4 in the working directory.
VIDEO = pathlib.Path(os.getenv("MOSAIC_TEST_VIDEO", "example.mp4"))
if not VIDEO.is_file():
    raise SystemExit(f"Video file not found: {VIDEO}")

# Headers sent with every JSON API call.
HEADERS = {"Authorization": f"Bearer {KEY}", "Content-Type": "application/json"}
# ---- 1. upload ----
# Three steps: request an upload URL, PUT the file to it, then finalize the
# upload to obtain the file UUID used when starting an agent run.
upload_url_response = requests.post(
    f"{API}/video/get-upload-url",
    headers=HEADERS,
    json={
        "filename": VIDEO.name,
        "file_size": VIDEO.stat().st_size,
        "content_type": "video/mp4",
    },
    timeout=30,  # requests has no default timeout; avoid hanging forever
)
# Surface HTTP errors here instead of as a confusing KeyError further down.
upload_url_response.raise_for_status()
meta = upload_url_response.json()

# Hand requests the open file object so the video is streamed from disk
# rather than loaded into memory in one piece.
with VIDEO.open("rb") as video_file:
    put_response = requests.put(
        meta["upload_url"],
        data=video_file,
        headers={"Content-Type": "video/mp4"},
        timeout=(10, 600),  # (connect, read) — uploads may take a while
    )
# A failed upload must stop the script before finalize is attempted.
put_response.raise_for_status()

finalize_response = requests.post(
    f"{API}/video/finalize-upload/{meta['video_id']}",
    headers=HEADERS,
    json={},
    timeout=30,
)
finalize_response.raise_for_status()
file_uuid = finalize_response.json()["file_uuid"]
# ---- 2. run agent (two flavours) ----
# Either let the service act on a free-text prompt, or target a specific
# agent identified by the MOSAIC_AGENT_ID environment variable.
USE_PROMPT = True  # flip to False to target a specific agent
if USE_PROMPT:
    payload = {
        "file_id": file_uuid,
        "auto": True,
        "prompt": "Turn this clip into a 30-second TikTok with captions",
    }
else:
    agent_id = os.getenv("MOSAIC_AGENT_ID")
    if not agent_id:
        # Without this check the request would be sent with agent_id=None.
        raise SystemExit("MOSAIC_AGENT_ID must be set when USE_PROMPT is False")
    payload = {
        "agent_id": agent_id,
        "file_id": file_uuid,
        "auto": True,
    }

run_response = requests.post(
    f"{API}/run-agent",
    headers=HEADERS,
    json=payload,
    timeout=30,  # requests has no default timeout; avoid hanging forever
)
# Report HTTP errors directly rather than as a KeyError on the JSON body.
run_response.raise_for_status()
run_id = run_response.json()["agent_run_id"]
# ---- 3. poll until done ----
# Check the run every 5 seconds until it reports a terminal status
# ("success" or "failed"), printing each intermediate status.
while True:
    poll_response = requests.get(
        f"{API}/get-agent-run-simple/{run_id}",
        headers=HEADERS,
        timeout=30,  # a stalled connection must not block the loop forever
    )
    poll_response.raise_for_status()
    status = poll_response.json()["status"]
    if status in ("success", "failed"):
        break
    print("status", status)
    time.sleep(5)

# Explicit check instead of `assert`, which is stripped when running with -O.
if status != "success":
    raise SystemExit("Agent run failed")
# ---- 4. download result ----
# Fetch the run's outputs and save the first one to output.mp4.
outputs_response = requests.get(
    f"{API}/get-agent-run-outputs/{run_id}",
    headers=HEADERS,
    timeout=30,
)
outputs_response.raise_for_status()
outs = outputs_response.json()["outputs"]
if not outs:
    # Clear message instead of an IndexError on outs[0].
    raise SystemExit("Agent run returned no outputs")
url = outs[0]["download_url"]

# Stream the body to disk in chunks; the status is checked before the file is
# opened so an error response is never saved as output.mp4.
with requests.get(url, stream=True, timeout=(10, 300)) as download:
    download.raise_for_status()
    with open("output.mp4", "wb") as out_file:
        for chunk in download.iter_content(chunk_size=1024 * 1024):
            out_file.write(chunk)
print("🎉 Saved output.mp4")