Files
assign-me-openclaw/main.py
Space-Banane 694c674581
All checks were successful
Lint and Syntax Check / build (push) Successful in 7s
Add initial implementation of serverless function for Gitea webhook handling
2026-04-02 15:45:11 +02:00

298 lines
11 KiB
Python

import json
import requests
import os
from _db_com import database
# Runtime configuration, read once from the environment at import time.
# Each value is None when the corresponding variable is unset.

# Endpoint that agent notifications are POSTed to.
OPENCLAW_URL = os.environ.get("OPENCLAW_URL")
# Sent as the "x-openclaw-token" request header.
OPENCLAW_TOKEN = os.environ.get("OPENCLAW_TOKEN")
# Optional "user:password" pair used as HTTP auth for the notification request.
OPENCLAW_PROXY_AUTH = os.environ.get("OPENCLAW_PROXY_AUTH")
# Name of the storage passed to every database call in this module.
DATABASE_STORAGE = os.environ.get("DATABASE_STORAGE_NAME")
# Login of the agent account; notifications are sent only when it is an assignee.
AGENT_USERNAME = os.environ.get("AGENT_USERNAME")

# Values of the X-Gitea-Event header that the handler processes.
eventsToHandle = [
    "pull_request",
    "issues",
]
# Payload "action" values that the handler processes.
actionsToHandle = [
    "assigned",
    "unassigned",
]
def get_assignee_key(event_type, repository_id, number):
    """Build the storage key under which an item's assignees are kept.

    Args:
        event_type: "issues" or "issue" for issues, "pull_request" for pull
            requests.
        repository_id: Identifier of the repository the item belongs to.
        number: Issue or pull request number inside that repository.

    Returns:
        The key string, or None when the event type is not supported or when
        either identifier is missing.
    """
    # A missing identifier would otherwise be formatted as the literal text
    # "None", making unrelated incomplete payloads share a single key.
    if repository_id is None or number is None:
        return None
    if event_type in ("issues", "issue"):
        return f"assignees_issue_{repository_id}_{number}"
    if event_type == "pull_request":
        return f"assignees_pr_{repository_id}_{number}"
    return None
def get_stored_assignees(db, event_type, repository_id, number):
    """Read the assignee logins previously saved for an issue or pull request.

    The stored value is a comma-separated string; surrounding whitespace is
    trimmed and empty entries are dropped. An empty list is returned when no
    key can be built, nothing is stored, or the lookup raises (the error is
    printed, not propagated).
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if not storage_key:
        return []

    try:
        raw_value = db.get(DATABASE_STORAGE, storage_key)
        if raw_value:
            trimmed = (piece.strip() for piece in raw_value.split(","))
            return [login for login in trimmed if login]
    except Exception as err:
        print(f"Failed to retrieve assignees from DB: {err}")
    return []
def set_stored_assignees(db, event_type, repository_id, number, assignees_list):
    """Save the assignee logins for an issue or pull request.

    The logins are joined with commas and written under the item's key. When
    the joined string is empty the stored entry is deleted instead. Nothing
    happens if no key can be built; database errors are printed and swallowed.
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            joined = ",".join(assignees_list)
            if not joined:
                # Nothing left to remember: drop the entry rather than store "".
                db.delete_item(DATABASE_STORAGE, storage_key)
            else:
                db.set(DATABASE_STORAGE, storage_key, joined)
        except Exception as err:
            print(f"Failed to store assignees in DB: {err}")
def delete_stored_assignees(db, event_type, repository_id, number):
    """Remove the saved assignees of an issue or pull request.

    Does nothing when no key can be built for the item. Database errors are
    printed and swallowed.
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            db.delete_item(DATABASE_STORAGE, storage_key)
        except Exception as err:
            print(f"Failed to delete assignees from DB: {err}")
def sendToAgent(event_object):
    """Notify the OpenClaw agent about an assignment change that concerns it.

    Nothing is sent unless AGENT_USERNAME is among the event's assignees. The
    notification is best-effort: any failure is printed and never raised.

    Args:
        event_object: Dict describing the event. Reads "action", "type",
            "repository" and "sender", plus "assignees" (falling back to a
            single "assignee"). The whole dict is embedded in the message as
            JSON.
    """
    assignees = event_object.get(
        "assignees", [event_object.get("assignee", "Unknown")]
    )
    if AGENT_USERNAME not in assignees:
        return

    was_assigned = event_object["action"] == "assigned"
    action_str = "assigned" if was_assigned else "unassigned"
    preposition = "to" if was_assigned else "from"
    is_issue = event_object["type"] == "issue"
    type_str = "issue" if is_issue else "pull request"
    article = "an" if is_issue else "a"

    # Built line by line so the code fence gets its own lines; a fence that
    # shares a line with its content is not a valid Markdown code block.
    message = "\n".join(
        [
            "# GITEA STATUS UPDATE",
            f"- You were {action_str} {preposition} {article} {type_str} "
            f"in {event_object['repository']} by {event_object['sender']}.",
            "raw event data:",
            "```json",
            json.dumps(event_object, indent=2),
            "```",
            "Please check the details and take necessary actions.",
            "DM your human in the main session and tell them about this change and if you should do anything about it.",
            "Inform them that you have to be told the exact details on what to do next, as you won't know what you sent them once they respond, which is a restriction of your system.",
        ]
    )

    headers = {"x-openclaw-token": OPENCLAW_TOKEN, "Content-Type": "application/json"}
    agent_timeout_seconds = 45
    payload = {
        "message": message,
        "thinking": "low",
        "timeoutSeconds": agent_timeout_seconds,
        "model": "openrouter/google/gemini-3-flash-preview",
    }

    try:
        auth = None
        if OPENCLAW_PROXY_AUTH:
            # Split on the first colon only, so a password that itself
            # contains ":" still yields the (user, password) pair.
            user, _, password = OPENCLAW_PROXY_AUTH.partition(":")
            auth = (user, password)

        response = requests.post(
            OPENCLAW_URL,
            headers=headers,
            json=payload,
            auth=auth,
            # Without a timeout the call can block forever. Allow a little
            # more than the run time requested from the agent.
            timeout=(10, agent_timeout_seconds + 15),
        )
        if response.status_code == 200:
            print(
                f"Successfully notified OpenClaw with ({len(message)} chars) about {type_str} {action_str}."
            )
        else:
            print(
                f"Failed to notify OpenClaw. Status code: {response.status_code}, Response: {response.text}"
            )
    except Exception as e:
        print(f"Error notifying OpenClaw: {e}")
# SHSF Handler - Serverless
def _shsf_response(code, payload):
    """Wrap *payload* in the SHSF v2 response envelope with status *code*."""
    return {
        "_shsf": "v2",
        "_code": code,
        "_res": payload,
        "_headers": {"Content-Type": "application/json"},
    }


def _find_header(headers, name):
    """Return the first non-empty value of header *name*, ignoring case."""
    wanted = name.lower()
    for key, value in headers.items():
        if isinstance(key, str) and key.lower() == wanted and value:
            return value
    return None


def _extract_logins(assignees_data, assignee_data):
    """Collect logins from an assignee list, else from a single assignee."""
    if assignees_data:
        return [
            entry.get("login")
            for entry in assignees_data
            if isinstance(entry, dict) and entry.get("login")
        ]
    if assignee_data:
        login = assignee_data.get("login")
        return [login] if login else []
    return []


def main(args):
    """Handle one incoming Gitea webhook request.

    The request is validated first (environment, route, event header, body,
    action); the database is only contacted once the event is known to be an
    assignment change that must be processed.

    Args:
        args: Request dict. Reads "headers" (mapping holding the
            X-Gitea-Event header), "route" and "body" (JSON text, or an
            already-decoded dict).

    Returns:
        An SHSF v2 response dict:
        500 when a required environment variable is missing or the database
        connection fails; 400 when the route, event header, body or action is
        missing or the body is not valid JSON; 404 for routes other than
        "webhook"; 200 for ignored and for processed events.
    """
    required_data = [
        "OPENCLAW_URL",
        "OPENCLAW_TOKEN",
        "DATABASE_STORAGE_NAME",
        "AGENT_USERNAME",
    ]
    for var in required_data:
        if not os.getenv(var):
            return _shsf_response(
                500, {"error": f"Missing required environment variable: {var}"}
            )

    # "headers" may be present but null; treat that like an empty mapping.
    headers = args.get("headers") or {}
    route = args.get("route")
    event_type = _find_header(headers, "X-Gitea-Event")
    if not route or not event_type:
        return _shsf_response(400, {"error": "Missing route or X-Gitea-Event header"})

    if event_type not in eventsToHandle:
        return _shsf_response(200, {"status": f"Ignored event type: {event_type}"})

    # Accept raw JSON text as well as a body that is already a dict. Bad JSON
    # and undecodable bytes both raise ValueError subclasses.
    raw_body = args.get("body")
    try:
        if isinstance(raw_body, (str, bytes, bytearray)):
            data = json.loads(raw_body)
        else:
            data = raw_body
    except ValueError:
        return _shsf_response(400, {"error": "Invalid JSON payload"})
    if not data or not isinstance(data, dict):
        return _shsf_response(400, {"error": "No JSON payload"})

    if route != "webhook":
        return _shsf_response(404, {"error": "Not Found"})

    action = data.get("action")
    print(f"Action: {action}, Event Type: {event_type}")
    if not action:
        return _shsf_response(400, {"error": "Missing action in payload"})
    if action not in actionsToHandle:
        return _shsf_response(200, {"status": f"Ignored action: {action}"})

    try:
        db = database()
    except Exception as e:
        print(f"Failed to connect to database: {e}")
        return _shsf_response(500, {"error": "Database connection failed"})

    # Prepare DB
    try:
        db.create_storage(DATABASE_STORAGE, purpose="For fast key-value data")
    except Exception as e:
        # Probably already exists, continue anyway
        print(f"Failed to create storage: {e}")

    # Assignee fields are taken from the payload root when present there,
    # otherwise from the nested issue / pull_request object.
    subject = data.get("issue") or data.get("pull_request") or {}
    assignee_data = data.get("assignee") or subject.get("assignee")
    assignees_data = data.get("assignees") or subject.get("assignees")
    payload_logins = _extract_logins(assignees_data, assignee_data)

    repo_data = data.get("repository") or {}
    repository = repo_data.get("full_name", "Unknown")
    repo_id = repo_data.get("id")
    sender = (data.get("sender") or {}).get("login", "Unknown")
    issue_number = subject.get("number")

    # Fall back to the assignees remembered from previous events when the
    # current payload names none.
    known_assignees = payload_logins or get_stored_assignees(
        db, event_type, repo_id, issue_number
    )
    # "Unknown" is a display placeholder only; it is never written to the DB.
    assignees_list = known_assignees or ["Unknown"]

    is_issue = event_type == "issues"
    item_data = data.get("issue" if is_issue else "pull_request") or {}
    event_object = {
        "type": "issue" if is_issue else "pull_request",
        "action": action,
        "repository": repository,
        "number": item_data.get("number"),
        "title": item_data.get("title"),
        "assignees": assignees_list,
        "sender": sender,
        "url": item_data.get("html_url"),
        "state": item_data.get("state"),
    }
    if not is_issue:
        event_object["merged"] = item_data.get("merged", False)

    number = event_object["number"]
    if action == "assigned":
        if known_assignees:
            set_stored_assignees(db, event_type, repo_id, number, known_assignees)
    elif action == "unassigned":
        # If the payload still lists assignees, someone remains assigned and
        # the record is refreshed; otherwise the record is removed.
        if assignees_data and known_assignees:
            set_stored_assignees(db, event_type, repo_id, number, known_assignees)
        else:
            delete_stored_assignees(db, event_type, repo_id, number)

    print(f"--- {event_object['type'].upper()} {action.capitalize()} ---")
    print(f"Title: {event_object['title']}")
    print(f"Assignees: {', '.join(event_object['assignees'])}")
    print(f"Repo: {event_object['repository']}")
    print(f"By: {event_object['sender']}")

    # Send to OpenClaw
    sendToAgent(event_object)

    return _shsf_response(200, {"status": "received"})