All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 33s
363 lines
13 KiB
Python
363 lines
13 KiB
Python
import json
|
|
import requests
|
|
import os
|
|
from _db_com import database
|
|
|
|
# OpenClaw endpoint settings, read once from the environment at import time.
OPENCLAW_URL = os.environ.get("OPENCLAW_URL")
OPENCLAW_TOKEN = os.environ.get("OPENCLAW_TOKEN")
# Optional "user:password" pair; when set it is passed to requests as basic auth.
OPENCLAW_PROXY_AUTH = os.environ.get("OPENCLAW_PROXY_AUTH")
OPENCLAW_MODEL = os.environ.get("OPENCLAW_MODEL")

# Name of the key-value storage that holds the remembered assignee lists.
DATABASE_STORAGE = os.environ.get("DATABASE_STORAGE_NAME")

# Login compared against an event's assignees to decide whether to notify.
AGENT_USERNAME = os.environ.get("AGENT_USERNAME")
# Optional custom prompt template file name, resolved relative to /app/.
AGENT_PROMPT_FILE = os.environ.get("AGENT_PROMPT_FILE")

# Values of the X-Gitea-Event header that the handler processes.
eventsToHandle = ["pull_request", "issues"]
# Payload "action" values that the handler processes.
actionsToHandle = ["assigned", "unassigned"]
|
|
|
|
# Template Fields (placeholders replaced by fill_template; the same placeholders
# can be used in a custom prompt file):
#   [action_str]        - "assigned" or "unassigned"
#   [type_str]          - "issue" or "pull request"
#   [repo_name]         - event_object["repository"]
#   [sender]            - event_object["sender"]
#   [stringified_event] - the whole event object dumped as indented JSON
DEFAULT_PROMPT_TEMPLATE = """# GITEA STATUS UPDATE
- You were [action_str] to an [type_str] in [repo_name] by [sender].
raw event data:
```json
[stringified_event]
```
Please check the details and take necessary actions.

DM your human in the main session and tell them about this change and if you should do anything about it.
Inform them that you have to be told the exact details on what to do next.
You won't know what you sent them once they respond, which is a restriction of your system.

The Goal is to solve this issue or PR as quickly and with as little back and forth as possible, so try to get all necessary information in the first message."""
|
|
|
|
|
|
def get_assignee_key(event_type, repository_id, number):
    """Build the storage key under which an item's assignee list is kept.

    Args:
        event_type: "issues" or "issue" for issues, "pull_request" for PRs.
        repository_id: Repository identifier, interpolated into the key.
        number: Issue or pull request number, interpolated into the key.

    Returns:
        The key string, or None when the event type is not recognised.
    """
    if event_type in ("issues", "issue"):
        prefix = "assignees_issue"
    elif event_type == "pull_request":
        prefix = "assignees_pr"
    else:
        return None
    return f"{prefix}_{repository_id}_{number}"
|
|
|
|
|
|
def get_stored_assignees(db, event_type, repository_id, number):
    """Load the remembered assignee logins for an issue or pull request.

    The value is stored as a comma-separated string; it is split, each entry
    is stripped of surrounding whitespace, and empty entries are dropped.

    Returns:
        A list of logins. Empty when the event type has no key, nothing is
        stored, or the lookup raised (the error is printed, not propagated).
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            raw_value = db.get(DATABASE_STORAGE, storage_key)
            if raw_value:
                logins = []
                for piece in raw_value.split(","):
                    if piece.strip():
                        logins.append(piece.strip())
                return logins
        except Exception as e:
            print(f"Failed to retrieve assignees from DB: {e}")
    return []
|
|
|
|
|
|
def set_stored_assignees(db, event_type, repository_id, number, assignees_list):
    """Persist the assignee logins for an issue or pull request.

    The logins are joined with commas and written under the item's key. When
    the joined string is empty the stored entry is deleted instead. Nothing
    happens for an unrecognised event type, and storage errors are printed
    rather than raised.
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            joined = ",".join(assignees_list)
            if not joined:
                # Nothing left to remember: remove the entry instead of storing "".
                db.delete_item(DATABASE_STORAGE, storage_key)
            else:
                db.set(DATABASE_STORAGE, storage_key, joined)
        except Exception as e:
            print(f"Failed to store assignees in DB: {e}")
|
|
|
|
|
|
def delete_stored_assignees(db, event_type, repository_id, number):
    """Remove the remembered assignee list of an issue or pull request.

    Does nothing for an unrecognised event type; storage errors are printed
    rather than raised.
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            db.delete_item(DATABASE_STORAGE, storage_key)
        except Exception as e:
            print(f"Failed to delete assignees from DB: {e}")
|
|
|
|
|
|
def fill_template(template, event_object, action_str, type_str):
    """Substitute the "[field]" placeholders of a prompt template.

    Supported placeholders are [action_str], [type_str], [repo_name],
    [sender] and [stringified_event]. Any other bracketed text is left as is.

    Args:
        template: Template text containing zero or more placeholders.
        event_object: Event dict; must contain "repository" and "sender" and
            be JSON-serialisable.
        action_str: Text for [action_str].
        type_str: Text for [type_str].

    Returns:
        The template with every known placeholder replaced.

    Raises:
        KeyError: If event_object lacks "repository" or "sender".
    """
    import re

    def _as_text(value):
        # Webhook fields may be null or non-string; never hand those to the
        # substitution, which only accepts text.
        return "Unknown" if value is None else str(value)

    fields = {
        "action_str": _as_text(action_str),
        "type_str": _as_text(type_str),
        "repo_name": _as_text(event_object["repository"]),
        "sender": _as_text(event_object["sender"]),
        "stringified_event": json.dumps(event_object, indent=2),
    }

    # Replace all placeholders in a single pass so that text coming from the
    # event (e.g. a repository literally named "[sender]") is inserted
    # verbatim and never re-scanned for further placeholders.
    names = "|".join(re.escape(name) for name in fields)
    placeholder = re.compile(r"\[(" + names + r")\]")
    return placeholder.sub(lambda match: fields[match.group(1)], template)
|
|
|
|
|
|
def build_message(event_object, action_str, type_str):
    """Render the notification prompt for an event.

    Uses the custom template at "/app/" + AGENT_PROMPT_FILE when that
    variable is set and the file can be read; otherwise falls back to
    DEFAULT_PROMPT_TEMPLATE. The chosen template is filled by fill_template.

    Args:
        event_object: Event dict passed through to fill_template.
        action_str: Text for the [action_str] placeholder.
        type_str: Text for the [type_str] placeholder.

    Returns:
        The rendered message string.
    """
    template = DEFAULT_PROMPT_TEMPLATE

    if not AGENT_PROMPT_FILE:
        print("No custom prompt file specified. Using default message.")
    else:
        prompt_path = "/app/" + AGENT_PROMPT_FILE
        if not os.path.isfile(prompt_path):
            print(
                f"Custom prompt file not found at {prompt_path}. Using default message."
            )
        else:
            try:
                # Explicit encoding: the platform default is not guaranteed
                # to be UTF-8 inside the runtime container.
                with open(prompt_path, "r", encoding="utf-8") as f:
                    template = f.read()
            except (OSError, UnicodeDecodeError) as e:
                # An unreadable prompt file must not abort the notification.
                print(
                    f"Failed to read custom prompt file at {prompt_path}: {e}. Using default message."
                )

    return fill_template(template, event_object, action_str, type_str)
|
|
|
|
|
|
def sendToAgent(event_object, timeout=60):
    """Post an assignment notification to the OpenClaw endpoint.

    The notification is only sent when AGENT_USERNAME is among the event's
    assignees. Request failures are printed and never raised.

    Args:
        event_object: Event dict with at least "action", "type",
            "repository" and "sender"; "assignees" (or a single "assignee")
            lists the logins involved.
        timeout: Seconds to wait for the HTTP request before giving up.
            The default is above the 45 s "timeoutSeconds" sent in the
            payload (presumably the agent-side limit; confirm against the
            OpenClaw API).
    """
    assignees = event_object.get("assignees", [event_object.get("assignee", "Unknown")])
    if AGENT_USERNAME not in assignees:
        print(
            f"Agent {AGENT_USERNAME} is not among the assignees for this event. Skipping notification."
        )
        return

    action_str = "assigned" if event_object["action"] == "assigned" else "unassigned"
    type_str = "issue" if event_object["type"] == "issue" else "pull request"
    message = build_message(event_object, action_str, type_str)

    headers = {"x-openclaw-token": OPENCLAW_TOKEN, "Content-Type": "application/json"}

    auth = None
    if OPENCLAW_PROXY_AUTH:
        # Split on the first ":" only: requests needs exactly (user, password)
        # and the password itself may contain colons.
        user, _, password = OPENCLAW_PROXY_AUTH.partition(":")
        auth = (user, password)

    try:
        response = requests.post(
            OPENCLAW_URL,
            headers=headers,
            json={
                "message": message,
                "thinking": "low",
                "timeoutSeconds": 45,
                "model": OPENCLAW_MODEL,
            },
            auth=auth,
            # Without a timeout requests waits forever on a stalled endpoint.
            timeout=timeout,
        )
        if response.status_code == 200:
            print(
                f"Successfully notified OpenClaw with ({len(message)} chars) about {type_str} {action_str}."
            )
        else:
            print(
                f"Failed to notify OpenClaw. Status code: {response.status_code}, Response: {response.text}"
            )
    except Exception as e:
        # Best effort: a failed notification must not fail the webhook.
        print(f"Error notifying OpenClaw: {e}")
|
|
|
|
|
|
# SHSF Handler - Serverless
def _shsf_response(code, payload):
    """Wrap a JSON payload in the SHSF v2 response envelope."""
    return {
        "_shsf": "v2",
        "_code": code,
        "_res": payload,
        "_headers": {"Content-Type": "application/json"},
    }


def _gitea_event_type(headers):
    """Return the X-Gitea-Event header value, tolerating any header-name casing."""
    exact = headers.get("X-Gitea-Event") or headers.get("x-gitea-event")
    if exact:
        return exact
    for name, value in headers.items():
        if isinstance(name, str) and name.lower() == "x-gitea-event":
            return value
    return None


def _parse_json_body(raw_body):
    """Decode the request body; return None if empty, raise ValueError if unusable."""
    if isinstance(raw_body, dict):
        # Already decoded by the caller.
        return raw_body
    if not raw_body:
        return None
    try:
        parsed = json.loads(raw_body)
    except TypeError as e:
        raise ValueError(str(e)) from e
    if parsed is not None and not isinstance(parsed, dict):
        raise ValueError("payload must be a JSON object")
    return parsed


def _extract_assignee_logins(data, subject):
    """Return (logins found in the payload, whether it carried an assignees list)."""
    # Fall back to the issue/pull_request object when the root lacks the field.
    assignee_data = data.get("assignee") or subject.get("assignee")
    assignees_data = data.get("assignees") or subject.get("assignees")

    if assignees_data:
        logins = [
            a.get("login")
            for a in assignees_data
            if isinstance(a, dict) and a.get("login")
        ]
    elif isinstance(assignee_data, dict) and assignee_data.get("login"):
        logins = [assignee_data.get("login")]
    else:
        logins = []
    return logins, bool(assignees_data)


def _build_event_object(event_type, action, data, repository, sender, assignees):
    """Build the normalised event dict for an issue or pull request event, else None."""
    if event_type == "issues":
        kind, subject = "issue", data.get("issue") or {}
    elif event_type == "pull_request":
        kind, subject = "pull_request", data.get("pull_request") or {}
    else:
        return None

    event_object = {
        "type": kind,
        "action": action,
        "repository": repository,
        "number": subject.get("number"),
        "title": subject.get("title"),
        "assignees": assignees,
        "sender": sender,
        "url": subject.get("html_url"),
        "state": subject.get("state"),
    }
    if kind == "pull_request":
        event_object["merged"] = subject.get("merged", False)
    return event_object


def main(args):
    """Handle a Gitea webhook call delivered by the SHSF runtime.

    Args:
        args: Request dict with "headers", "route" and "body" (JSON text, or
            an already decoded dict).

    Returns:
        An SHSF v2 response dict:
        500 when a required environment variable is missing or the database
        connection fails; 400 for a missing route/event header, an unusable
        or empty body, or a missing action; 404 for a route other than
        "webhook"; 200 when the event is ignored or processed.
    """
    required_data = [
        "OPENCLAW_URL",
        "OPENCLAW_TOKEN",
        "DATABASE_STORAGE_NAME",
        "AGENT_USERNAME",
        "OPENCLAW_MODEL",
    ]
    for var in required_data:
        if not os.getenv(var):
            return _shsf_response(
                500, {"error": f"Missing required environment variable: {var}"}
            )

    headers = args.get("headers") or {}
    route = args.get("route")
    event_type = _gitea_event_type(headers)

    if not route or not event_type:
        return _shsf_response(400, {"error": "Missing route or X-Gitea-Event header"})

    if event_type not in eventsToHandle:
        return _shsf_response(200, {"status": f"Ignored event type: {event_type}"})

    try:
        db = database()
    except Exception as e:
        print(f"Failed to connect to database: {e}")
        return _shsf_response(500, {"error": "Database connection failed"})

    try:
        data = _parse_json_body(args.get("body"))
    except ValueError as e:
        return _shsf_response(400, {"error": f"Invalid JSON payload: {e}"})
    if not data:
        return _shsf_response(400, {"error": "No JSON payload"})

    if route != "webhook":
        return _shsf_response(404, {"error": "Not Found"})

    # Prepare DB
    try:
        db.create_storage(DATABASE_STORAGE, purpose="For fast key-value data")
    except Exception as e:
        # Probably already exists, continue anyway
        print(f"Failed to create storage: {e}")

    action = data.get("action")
    print(f"Action: {action}, Event Type: {event_type}")

    if not action:
        return _shsf_response(400, {"error": "Missing action in payload"})

    if action not in actionsToHandle:
        return _shsf_response(200, {"status": f"Ignored action: {action}"})

    if action not in ("assigned", "unassigned"):
        # Accepted by actionsToHandle but there is nothing to do for it here.
        return _shsf_response(200, {"status": "received"})

    subject = data.get("issue") or data.get("pull_request") or {}
    payload_logins, payload_has_assignees = _extract_assignee_logins(data, subject)

    repo = data.get("repository") or {}
    repository = repo.get("full_name") or "Unknown"
    repo_id = repo.get("id")
    sender = (data.get("sender") or {}).get("login") or "Unknown"

    # Get assignees from DB (previous events) if the current payload has none
    known_assignees = payload_logins or get_stored_assignees(
        db, event_type, repo_id, subject.get("number")
    )

    # "Unknown" is a display placeholder only; it is never written to the DB.
    event_object = _build_event_object(
        event_type, action, data, repository, sender, known_assignees or ["Unknown"]
    )

    if event_object:
        number = event_object["number"]
        if action == "assigned":
            if known_assignees:
                set_stored_assignees(db, event_type, repo_id, number, known_assignees)
        elif payload_has_assignees:
            # The payload still lists assignees, so someone remains assigned.
            set_stored_assignees(db, event_type, repo_id, number, known_assignees)
        else:
            delete_stored_assignees(db, event_type, repo_id, number)

        print(f"--- {event_object['type'].upper()} {action.capitalize()} ---")
        print(f"Title: {event_object['title']}")
        print(f"Assignees: {', '.join(event_object['assignees'])}")
        print(f"Repo: {event_object['repository']}")
        print(f"By: {event_object['sender']}")

        # Send to OpenClaw: only assignments are forwarded, as the log line
        # below states; unassignments are skipped.
        if action == "assigned":
            sendToAgent(event_object)
        else:
            print("Not sending event to agent, unassinging is too noisy for now, will only send assigned events")

    return _shsf_response(200, {"status": "received"})
|