All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 5s
299 lines
11 KiB
Python
299 lines
11 KiB
Python
import json
|
|
import requests
|
|
import os
|
|
from _db_com import database
|
|
|
|
# Runtime configuration, read once from the environment at import time.
# Each value is None when the corresponding variable is not set.
OPENCLAW_URL = os.environ.get("OPENCLAW_URL")  # endpoint the notification is POSTed to
OPENCLAW_TOKEN = os.environ.get("OPENCLAW_TOKEN")  # sent as the x-openclaw-token header
OPENCLAW_PROXY_AUTH = os.environ.get("OPENCLAW_PROXY_AUTH")  # optional "user:password" pair
OPENCLAW_MODEL = os.environ.get("OPENCLAW_MODEL")  # forwarded as the "model" field
DATABASE_STORAGE = os.environ.get("DATABASE_STORAGE_NAME")  # storage name passed to the DB client
AGENT_USERNAME = os.environ.get("AGENT_USERNAME")  # login that must be among the assignees

# X-Gitea-Event values that are processed; anything else is acknowledged and ignored.
eventsToHandle = ["pull_request", "issues"]
# Payload "action" values that are processed; anything else is acknowledged and ignored.
actionsToHandle = ["assigned", "unassigned"]
|
|
|
|
|
|
def get_assignee_key(event_type, repository_id, number):
    """Build the storage key under which an item's assignees are kept.

    Args:
        event_type: "issues" or "issue" for issues, "pull_request" for
            pull requests.
        repository_id: Identifier of the repository the item belongs to.
        number: Issue or pull request number.

    Returns:
        The key string, or None when the event type is not supported or
        when either identifier is missing.
    """
    # Without both identifiers every such event would map to the same
    # "..._None_None" key and overwrite unrelated items' assignees.
    if repository_id is None or number is None:
        return None
    if event_type in ("issues", "issue"):
        return f"assignees_issue_{repository_id}_{number}"
    if event_type == "pull_request":
        return f"assignees_pr_{repository_id}_{number}"
    return None
|
|
|
|
|
|
def get_stored_assignees(db, event_type, repository_id, number):
    """Load the assignee logins previously stored for an issue or pull request.

    The stored value is a comma-separated string; it is split and each
    entry is stripped of surrounding whitespace, dropping empty entries.

    Returns:
        A list of logins. The list is empty when no key can be built, when
        nothing is stored, or when the lookup raises (the error is printed).
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            raw_value = db.get(DATABASE_STORAGE, storage_key)
            if raw_value:
                logins = []
                for piece in raw_value.split(","):
                    cleaned = piece.strip()
                    if cleaned:
                        logins.append(cleaned)
                return logins
        except Exception as e:
            # Best effort: a failed lookup is treated as "nothing stored".
            print(f"Failed to retrieve assignees from DB: {e}")
    return []
|
|
|
|
|
|
def set_stored_assignees(db, event_type, repository_id, number, assignees_list):
    """Persist the assignee logins of an issue or pull request.

    The logins are stored as one comma-separated string. When joining them
    yields an empty string the stored entry is deleted instead. Nothing is
    done when no key can be built; database errors are printed, not raised.
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if not storage_key:
        return
    try:
        joined = ",".join(assignees_list)
        if not joined:
            # Nothing left to remember: remove the entry rather than store "".
            db.delete_item(DATABASE_STORAGE, storage_key)
        else:
            db.set(DATABASE_STORAGE, storage_key, joined)
    except Exception as e:
        print(f"Failed to store assignees in DB: {e}")
|
|
|
|
|
|
def delete_stored_assignees(db, event_type, repository_id, number):
    """Remove the stored assignees of an issue or pull request.

    Nothing is done when no key can be built; database errors are printed,
    not raised.
    """
    storage_key = get_assignee_key(event_type, repository_id, number)
    if storage_key:
        try:
            db.delete_item(DATABASE_STORAGE, storage_key)
        except Exception as e:
            print(f"Failed to delete assignees from DB: {e}")
|
|
|
|
|
|
def sendToAgent(event_object):
    """Notify the OpenClaw agent about an assignment change that involves it.

    A message is only sent when AGENT_USERNAME is among the event's
    assignees. The outcome is printed; request failures are reported and
    swallowed so that a failed notification never fails the webhook.

    Args:
        event_object: Dict describing the event. "action", "type",
            "repository" and "sender" are read directly; "assignees"
            (or a single "assignee") is optional.

    Raises:
        KeyError: If "action" or "type" is missing from event_object, or if
            "repository" or "sender" is missing once a message is built.
    """
    is_assignment = event_object["action"] == "assigned"
    action_str = "assigned" if is_assignment else "unassigned"
    preposition = "to" if is_assignment else "from"

    is_issue = event_object["type"] == "issue"
    type_str = "issue" if is_issue else "pull request"
    article = "an" if is_issue else "a"

    assignees = event_object.get("assignees", [event_object.get("assignee", "Unknown")])
    if AGENT_USERNAME not in assignees:
        return

    message = "\n".join(
        [
            "# GITEA STATUS UPDATE",
            f"- You were {action_str} {preposition} {article} {type_str} in "
            f"{event_object['repository']} by {event_object['sender']}.",
            "raw event data:",
            # The fence markers need their own lines to render as a code block.
            "```json",
            json.dumps(event_object, indent=2),
            "```",
            "Please check the details and take necessary actions.",
            "",
            "DM your human in the main session and tell them about this change "
            "and if you should do anything about it.",
            "Inform them that you have to be told the exact details on what to "
            "do next, as you won't know what you sent them once they respond, "
            "which is a restriction of your system.",
        ]
    )

    auth = None
    if OPENCLAW_PROXY_AUTH:
        # Split on the first colon only, so passwords containing ":" still
        # produce the (user, password) pair that requests expects.
        username, _, password = OPENCLAW_PROXY_AUTH.partition(":")
        auth = (username, password)

    agent_timeout_seconds = 45
    headers = {"x-openclaw-token": OPENCLAW_TOKEN, "Content-Type": "application/json"}
    payload = {
        "message": message,
        "thinking": "low",
        "timeoutSeconds": agent_timeout_seconds,
        "model": OPENCLAW_MODEL,
    }

    try:
        response = requests.post(
            OPENCLAW_URL,
            headers=headers,
            json=payload,
            auth=auth,
            # Wait slightly longer than the timeout requested in the payload,
            # but never block the handler indefinitely.
            timeout=agent_timeout_seconds + 15,
        )
        if response.status_code == 200:
            print(
                f"Successfully notified OpenClaw with ({len(message)} chars) about {type_str} {action_str}."
            )
        else:
            print(
                f"Failed to notify OpenClaw. Status code: {response.status_code}, Response: {response.text}"
            )
    except Exception as e:
        # Deliberately broad: notification is best effort.
        print(f"Error notifying OpenClaw: {e}")
|
|
|
|
|
|
def _shsf_response(code, payload):
    """Wrap *payload* in the SHSF v2 response envelope with a JSON content type."""
    return {
        "_shsf": "v2",
        "_code": code,
        "_res": payload,
        "_headers": {"Content-Type": "application/json"},
    }


def _extract_assignee_logins(data, subject):
    """Collect assignee logins from a webhook payload.

    The root-level "assignees"/"assignee" fields are preferred; the same
    fields on *subject* (the issue or pull request object) are the fallback.

    Returns:
        A (logins, has_assignees_field) tuple. The flag is True when a
        non-empty "assignees" list was present, even if none of its entries
        carried a login.
    """
    assignee_data = data.get("assignee") or subject.get("assignee")
    assignees_data = data.get("assignees") or subject.get("assignees")

    if assignees_data:
        logins = [a.get("login") for a in assignees_data if a.get("login")]
    elif assignee_data and assignee_data.get("login"):
        logins = [assignee_data.get("login")]
    else:
        logins = []
    return logins, bool(assignees_data)


def _process_assignment_event(db, event_type, action, data):
    """Record an assigned/unassigned event and forward it to the agent."""
    subject = data.get("issue") or data.get("pull_request") or {}
    payload_logins, has_assignees_field = _extract_assignee_logins(data, subject)

    # "or {}" also covers payloads where these objects are explicitly null.
    repo_info = data.get("repository") or {}
    repository = repo_info.get("full_name", "Unknown")
    repo_id = repo_info.get("id")
    sender = (data.get("sender") or {}).get("login", "Unknown")
    issue_number = subject.get("number")

    # Fall back to assignees remembered from previous events when the
    # current payload names none.
    known_assignees = payload_logins or get_stored_assignees(
        db, event_type, repo_id, issue_number
    )
    # The placeholder is for display and notification only; it is never
    # written to the database.
    display_assignees = known_assignees or ["Unknown"]

    item_key = {"issues": "issue", "pull_request": "pull_request"}.get(event_type)
    if item_key is None:
        return
    item = data.get(item_key) or {}

    event_object = {
        "type": item_key,
        "action": action,
        "repository": repository,
        "number": item.get("number"),
        "title": item.get("title"),
        "assignees": display_assignees,
        "sender": sender,
        "url": item.get("html_url"),
        "state": item.get("state"),
    }
    if item_key == "pull_request":
        event_object["merged"] = item.get("merged", False)

    if action == "assigned":
        if known_assignees:
            set_stored_assignees(
                db, event_type, repo_id, event_object["number"], known_assignees
            )
    elif action == "unassigned":
        if has_assignees_field:
            # Someone is still assigned: remember the remaining assignees.
            set_stored_assignees(
                db, event_type, repo_id, event_object["number"], known_assignees
            )
        else:
            delete_stored_assignees(db, event_type, repo_id, event_object["number"])

    print(f"--- {event_object['type'].upper()} {action.capitalize()} ---")
    print(f"Title: {event_object['title']}")
    print(f"Assignees: {', '.join(event_object['assignees'])}")
    print(f"Repo: {event_object['repository']}")
    print(f"By: {event_object['sender']}")

    # Send to OpenClaw
    sendToAgent(event_object)


# SHSF Handler - Serverless
def main(args):
    """Handle a Gitea webhook delivery and answer with an SHSF v2 response.

    Args:
        args: Request dict. "headers" (must carry X-Gitea-Event), "route"
            and "body" (the JSON payload, as text or already decoded) are
            read from it.

    Returns:
        A dict with "_shsf", "_code", "_res" and "_headers" keys:
        500 when a required environment variable is missing or the database
        connection fails, 400 for a missing route/header/action or an
        unusable payload, 404 for an unknown route, and 200 when the event
        was processed or deliberately ignored.
    """
    required_data = [
        "OPENCLAW_URL",
        "OPENCLAW_TOKEN",
        "DATABASE_STORAGE_NAME",
        "AGENT_USERNAME",
        "OPENCLAW_MODEL",
    ]
    for var in required_data:
        if not os.getenv(var):
            return _shsf_response(
                500, {"error": f"Missing required environment variable: {var}"}
            )

    headers = args.get("headers") or {}
    route = args.get("route")
    event_type = headers.get("X-Gitea-Event") or headers.get("x-gitea-event")

    if not route or not event_type:
        return _shsf_response(400, {"error": "Missing route or X-Gitea-Event header"})

    if event_type not in eventsToHandle:
        return _shsf_response(200, {"status": f"Ignored event type: {event_type}"})

    try:
        db = database()
    except Exception as e:
        print(f"Failed to connect to database: {e}")
        return _shsf_response(500, {"error": "Database connection failed"})

    raw_body = args.get("body")
    if not raw_body:
        return _shsf_response(400, {"error": "No JSON payload"})
    if isinstance(raw_body, (str, bytes, bytearray)):
        try:
            data = json.loads(raw_body)
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print(f"Failed to parse JSON payload: {e}")
            return _shsf_response(400, {"error": "Invalid JSON payload"})
    else:
        # Body was already decoded before it reached the handler.
        data = raw_body

    if not data:
        return _shsf_response(400, {"error": "No JSON payload"})
    if not isinstance(data, dict):
        return _shsf_response(400, {"error": "JSON payload must be an object"})

    if route != "webhook":
        return _shsf_response(404, {"error": "Not Found"})

    # Prepare DB
    try:
        db.create_storage(DATABASE_STORAGE, purpose="For fast key-value data")
    except Exception as e:
        print(f"Failed to create storage: {e}")
        # Probably already exists, continue anyway

    action = data.get("action")
    print(f"Action: {action}, Event Type: {event_type}")

    if not action:
        return _shsf_response(400, {"error": "Missing action in payload"})

    if action not in actionsToHandle:
        return _shsf_response(200, {"status": f"Ignored action: {action}"})

    if action in ("assigned", "unassigned"):
        _process_assignment_event(db, event_type, action, data)

    return _shsf_response(200, {"status": "received"})
|