Files
assign-me-openclaw/main.py
Space-Banane cd79af6599
All checks were successful
Lint and Syntax Check / build (push) Successful in 11s
Dump json object
2026-04-04 15:21:10 +02:00

386 lines
14 KiB
Python

import json
import requests
import os
from _db_com import database
OPENCLAW_URL = os.getenv("OPENCLAW_URL")
OPENCLAW_TOKEN = os.getenv("OPENCLAW_TOKEN")
OPENCLAW_PROXY_AUTH = os.getenv("OPENCLAW_PROXY_AUTH")
OPENCLAW_MODEL = os.getenv("OPENCLAW_MODEL")
DATABASE_STORAGE = os.getenv("DATABASE_STORAGE_NAME")
AGENT_USERNAME = os.getenv("AGENT_USERNAME")
AGENT_PROMPT_FILE = os.getenv("AGENT_PROMPT_FILE")
eventsToHandle = ["pull_request", "issues", "issue_comment"]
actionsToHandle = ["assigned", "created"]
# Template Fields:
DEFAULT_PROMPT_TEMPLATE = """# GITEA STATUS UPDATE
- You were [action_str] to an [type_str] in [repo_name] by [sender].
raw event data:
```json
[stringified_event]
```
Please check the details and take necessary actions.
DM your human in the main session and tell them about this change and if you should do anything about it.
Inform them that you have to be told the exact details on what to do next.
You won't know what you sent them once they respond, which is a restriction of your system.
The Goal is to solve this issue or PR as quickly and with as little back and forth as possible, so try to get all necessary information in the first message."""
def get_assignee_key(event_type, repository_id, number):
if event_type == "issues" or event_type == "issue" or event_type == "issue_comment":
return f"assignees_issue_{repository_id}_{number}"
elif event_type == "pull_request":
return f"assignees_pr_{repository_id}_{number}"
return None
def get_stored_assignees(db, event_type, repository_id, number):
key = get_assignee_key(event_type, repository_id, number)
if not key:
return []
try:
assignees_str = db.get(DATABASE_STORAGE, key)
if assignees_str:
return [a.strip() for a in assignees_str.split(",") if a.strip()]
except Exception as e:
print(f"Failed to retrieve assignees from DB: {e}")
return []
def set_stored_assignees(db, event_type, repository_id, number, assignees_list):
key = get_assignee_key(event_type, repository_id, number)
if not key:
return
try:
assignees_str = ",".join(assignees_list)
if assignees_str:
db.set(DATABASE_STORAGE, key, assignees_str)
else:
db.delete_item(DATABASE_STORAGE, key)
except Exception as e:
print(f"Failed to store assignees in DB: {e}")
def delete_stored_assignees(db, event_type, repository_id, number):
key = get_assignee_key(event_type, repository_id, number)
if not key:
return
try:
db.delete_item(DATABASE_STORAGE, key)
except Exception as e:
print(f"Failed to delete assignees from DB: {e}")
def fill_template(template, event_object, action_str, type_str):
fields = {
"action_str": action_str,
"type_str": type_str,
"repo_name": event_object["repository"],
"sender": event_object["sender"],
"stringified_event": json.dumps(event_object, indent=2),
}
for key, value in fields.items():
template = template.replace(f"[{key}]", value)
return template
def build_message(event_object, action_str, type_str):
if type_str == "comment":
action_summary = f"There is a new comment on a {event_object['target_type']} you are assigned to in {event_object['repository']} by {event_object['sender']}."
else:
action_summary = f"You were {action_str} to an {type_str} in {event_object['repository']} by {event_object['sender']}."
if AGENT_PROMPT_FILE:
AGENT_PROMPT_FILE_PATH = "/app/" + AGENT_PROMPT_FILE
# Check if the file exists
if not os.path.isfile(AGENT_PROMPT_FILE_PATH):
print(
f"Custom prompt file not found at {AGENT_PROMPT_FILE_PATH}. Using default message."
)
message = fill_template(DEFAULT_PROMPT_TEMPLATE, event_object, action_str, type_str)
message = message.replace("You were [action_str] to an [type_str]", action_summary)
return message
with open(AGENT_PROMPT_FILE_PATH, "r") as f:
custom_prompt = f.read()
message = fill_template(custom_prompt, event_object, action_str, type_str)
message = message.replace("You were [action_str] to an [type_str]", action_summary)
return message
else:
print("No custom prompt file specified. Using default message.")
message = fill_template(DEFAULT_PROMPT_TEMPLATE, event_object, action_str, type_str)
message = message.replace("You were [action_str] to an [type_str]", action_summary)
return message
def sendToAgent(event_object):
headers = {"x-openclaw-token": OPENCLAW_TOKEN, "Content-Type": "application/json"}
print(f"Preparing to send notification to Agent for {json.dumps(event_object)}")
if event_object.get("type") == "issue_comment":
action_str = "created"
type_str = "comment"
mention = f"@{AGENT_USERNAME}".lower()
comment_body = event_object.get("comment_body", "").lower()
if mention not in comment_body:
print(
f"Agent {AGENT_USERNAME} was not mentioned in comment body. Skipping notification."
)
return
else:
action_str = "assigned"
type_str = "issue" if event_object["type"] == "issue" else "pull request"
assignees = event_object.get("assignees", [event_object.get("assignee", "Unknown")])
if AGENT_USERNAME not in assignees:
print(
f"Agent {AGENT_USERNAME} is not among the assignees for this event. Skipping notification."
)
return
message = build_message(event_object, action_str, type_str)
try:
if OPENCLAW_PROXY_AUTH:
auth = tuple(OPENCLAW_PROXY_AUTH.split(":"))
else:
auth = None
response = requests.post(
OPENCLAW_URL,
headers=headers,
json={
"message": message,
"thinking": "low",
"timeoutSeconds": 45,
"model": OPENCLAW_MODEL,
},
auth=auth,
)
if response.status_code == 200:
print(
f"Successfully notified OpenClaw with ({len(message)} chars) about {type_str} {action_str}."
)
else:
print(
f"Failed to notify OpenClaw. Status code: {response.status_code}, Response: {response.text}"
)
except Exception as e:
print(f"Error notifying OpenClaw: {e}")
# SHSF Handler - Serverless
def main(args):
required_data = [
"OPENCLAW_URL",
"OPENCLAW_TOKEN",
"DATABASE_STORAGE_NAME",
"AGENT_USERNAME",
"OPENCLAW_MODEL",
]
for var in required_data:
if not os.getenv(var):
return {
"_shsf": "v2",
"_code": 500,
"_res": {"error": f"Missing required environment variable: {var}"},
"_headers": {"Content-Type": "application/json"},
}
headers = args.get("headers", {})
route = args.get("route")
event_type = headers.get("X-Gitea-Event") or headers.get("x-gitea-event")
if not route or not event_type:
return {
"_shsf": "v2",
"_code": 400,
"_res": {"error": "Missing route or X-Gitea-Event header"},
"_headers": {"Content-Type": "application/json"},
}
if event_type not in eventsToHandle:
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": f"Ignored event type: {event_type}"},
"_headers": {"Content-Type": "application/json"},
}
try:
db = database()
except:
print("Failed to connect to database")
return {
"_shsf": "v2",
"_code": 500,
"_res": {"error": "Database connection failed"},
"_headers": {"Content-Type": "application/json"},
}
data = args.get("body")
data = json.loads(data)
if not data:
return {
"_shsf": "v2",
"_code": 400,
"_res": {"error": "No JSON payload"},
"_headers": {"Content-Type": "application/json"},
}
if route == "webhook":
# Prepare DB
try:
db.create_storage(DATABASE_STORAGE, purpose="For fast key-value data")
except Exception as e:
print(f"Failed to create storage: {e}")
# Probably already exists, continue anyway
action = data.get("action")
print(f"Action: {action}, Event Type: {event_type}")
if not action:
return {
"_shsf": "v2",
"_code": 400,
"_res": {"error": "Missing action in payload"},
"_headers": {"Content-Type": "application/json"},
}
if action not in actionsToHandle:
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": f"Ignored action: {action}"},
"_headers": {"Content-Type": "application/json"},
}
if action == "assigned" and event_type in ["issues", "pull_request"]:
assignee_data = data.get("assignee")
assignees_data = data.get("assignees")
# If "assignee" is missing in the root, look for it in the issue/pull_request object
if not assignee_data:
assignee_data = (
data.get("issue") or data.get("pull_request") or {}
).get("assignee")
if not assignees_data:
assignees_data = (
data.get("issue") or data.get("pull_request") or {}
).get("assignees")
# Extract logins from the list of assignees, or single assignee
assignees_list = []
if assignees_data:
assignees_list = [
a.get("login") for a in assignees_data if a.get("login")
]
elif assignee_data:
assignees_list = (
[assignee_data.get("login")] if assignee_data.get("login") else []
)
repository = data.get("repository", {}).get("full_name", "Unknown")
repo_id = data.get("repository", {}).get("id")
sender = data.get("sender", {}).get("login", "Unknown")
event_object = None
# Use helpers to get stored assignees if needed
issue_number = (data.get("issue") or data.get("pull_request") or {}).get(
"number"
)
# Get assignees from DB (previous events) if the current payload has none
if not assignees_list:
assignees_list = get_stored_assignees(
db, event_type, repo_id, issue_number
)
if not assignees_list:
assignees_list = ["Unknown"]
if event_type == "issues":
issue_data = data.get("issue", {})
event_object = {
"type": "issue",
"action": action,
"repository": repository,
"number": issue_data.get("number"),
"title": issue_data.get("title"),
"assignees": assignees_list,
"sender": sender,
"url": issue_data.get("html_url"),
"state": issue_data.get("state"),
}
elif event_type == "pull_request":
pr_data = data.get("pull_request", {})
event_object = {
"type": "pull_request",
"action": action,
"repository": repository,
"number": pr_data.get("number"),
"title": pr_data.get("title"),
"assignees": assignees_list,
"sender": sender,
"url": pr_data.get("html_url"),
"state": pr_data.get("state"),
"merged": pr_data.get("merged", False),
}
if event_object:
# Store assignees for later comment events.
if action == "assigned":
set_stored_assignees(
db, event_type, repo_id, event_object["number"], assignees_list
)
# Send to OpenClaw
if action == "assigned":
sendToAgent(event_object)
else:
print(f"Action {action} is not configured to send to agent")
elif action == "created" and event_type == "issue_comment":
comment_data = data.get("comment", {})
is_pull = data.get("is_pull", False)
target_data = data.get("issue") or data.get("pull_request") or {}
repository = data.get("repository", {}).get("full_name", "Unknown")
repo_id = data.get("repository", {}).get("id")
sender = data.get("sender", {}).get("login", "Unknown")
issue_number = target_data.get("number")
event_object = {
"type": "issue_comment",
"target_type": "pull_request" if is_pull else "issue",
"action": action,
"repository": repository,
"number": issue_number,
"title": target_data.get("title", "Unknown"),
"sender": sender,
"comment_body": comment_data.get("body", ""),
"url": comment_data.get("html_url"),
}
sendToAgent(event_object)
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": "received"},
"_headers": {"Content-Type": "application/json"},
}
else:
return {
"_shsf": "v2",
"_code": 404,
"_res": {"error": "Not Found"},
"_headers": {"Content-Type": "application/json"},
}