"""Gitea webhook handler that notifies an OpenClaw agent.

The module reads its configuration from environment variables, remembers
assignees and an hourly trigger counter in a key-value storage reached through
``_db_com.database``, and forwards "assigned" and "@mention comment" events to
the OpenClaw HTTP endpoint.
"""

import json
import os

import requests

from _db_com import database

OPENCLAW_URL = os.getenv("OPENCLAW_URL")
OPENCLAW_TOKEN = os.getenv("OPENCLAW_TOKEN")
OPENCLAW_PROXY_AUTH = os.getenv("OPENCLAW_PROXY_AUTH")
OPENCLAW_MODEL = os.getenv("OPENCLAW_MODEL")
OPENCLAW_THINKING = os.getenv("OPENCLAW_THINKING", "low")
DATABASE_STORAGE = os.getenv("DATABASE_STORAGE_NAME")
AGENT_USERNAME = os.getenv("AGENT_USERNAME")
AGENT_PROMPT_FILE = os.getenv("AGENT_PROMPT_FILE")
AGENT_HOURLY = int(os.getenv("AGENT_HOURLY", "60"))

# Upper bound (seconds) for the OpenClaw HTTP call so a stalled endpoint
# cannot block the handler forever.
OPENCLAW_TIMEOUT_SECONDS = 30

eventsToHandle = ["pull_request", "issues", "issue_comment"]
actionsToHandle = ["assigned", "created"]

# Template Fields:
#   [action_str], [type_str], [repo_name], [sender], [stringified_event]
# NOTE(review): the line breaks inside this template were reconstructed from a
# whitespace-collapsed copy of the file; confirm the layout against the
# deployed version.
DEFAULT_PROMPT_TEMPLATE = """# GITEA STATUS UPDATE
- You were [action_str] to an [type_str] in [repo_name] by [sender].

raw event data:
```json
[stringified_event]
```

Please check the details and take necessary actions. DM your human in the main session and tell them about this change and if you should do anything about it. Inform them that you have to be told the exact details on what to do next. You won't know what you sent them once they respond, which is a restriction of your system.
The Goal is to solve this issue or PR as quickly and with as little back and forth as possible, so try to get all necessary information in the first message."""

# Sentence in a template that build_message swaps for the event-specific
# summary (it differs for comment events).
_SUMMARY_PLACEHOLDER = (
    "You were [action_str] to an [type_str] in [repo_name] by [sender]."
)


def _response(code, payload):
    """Build an SHSF v2 JSON response with the given status code and body."""
    return {
        "_shsf": "v2",
        "_code": code,
        "_res": payload,
        "_headers": {"Content-Type": "application/json"},
    }


def get_assignee_key(event_type, repository_id, number):
    """Return the storage key for an issue/PR's assignees.

    Returns None when ``event_type`` is neither an issue-like event
    ("issues", "issue", "issue_comment") nor "pull_request".
    """
    if event_type in ("issues", "issue", "issue_comment"):
        return f"assignees_issue_{repository_id}_{number}"
    if event_type == "pull_request":
        return f"assignees_pr_{repository_id}_{number}"
    return None


def get_stored_assignees(db, event_type, repository_id, number):
    """Return the stored assignee logins for an issue/PR.

    The value is stored as a comma-separated string. An empty list is returned
    when there is no key, no stored value, or the lookup fails (failures are
    logged, not raised).
    """
    key = get_assignee_key(event_type, repository_id, number)
    if not key:
        return []
    try:
        assignees_str = db.get(DATABASE_STORAGE, key)
        if assignees_str:
            return [a.strip() for a in assignees_str.split(",") if a.strip()]
    except Exception as e:
        print(f"Failed to retrieve assignees from DB: {e}")
    return []


def set_stored_assignees(db, event_type, repository_id, number, assignees_list):
    """Persist the assignee logins for an issue/PR as a comma-separated string.

    An empty list deletes the stored entry instead. Storage errors are logged
    and swallowed.
    """
    key = get_assignee_key(event_type, repository_id, number)
    if not key:
        return
    try:
        assignees_str = ",".join(assignees_list)
        if assignees_str:
            db.set(DATABASE_STORAGE, key, assignees_str)
        else:
            db.delete_item(DATABASE_STORAGE, key)
    except Exception as e:
        print(f"Failed to store assignees in DB: {e}")


def delete_stored_assignees(db, event_type, repository_id, number):
    """Remove the stored assignees for an issue/PR; errors are logged only."""
    key = get_assignee_key(event_type, repository_id, number)
    if not key:
        return
    try:
        db.delete_item(DATABASE_STORAGE, key)
    except Exception as e:
        print(f"Failed to delete assignees from DB: {e}")


def get_rate_limit_key():
    """Return the storage key holding this agent's trigger counter."""
    return f"agent_{AGENT_USERNAME}_cooldown"


def check_and_increment_rate_limit(db):
    """Check if rate limit is reached, increment counter if not.

    Returns True if allowed, False if limit reached. Any storage or parsing
    error is logged and treated as "allowed".
    """
    key = get_rate_limit_key()
    try:
        stored = db.get(DATABASE_STORAGE, key)
        current = 0 if stored is None else int(stored)

        if current >= AGENT_HOURLY:
            print(
                f"Rate limit reached: {current}/{AGENT_HOURLY} hourly triggers used."
            )
            return False

        db.set(DATABASE_STORAGE, key, str(current + 1))
        print(
            f"Rate limit check passed: {current + 1}/{AGENT_HOURLY} hourly triggers used."
        )
        return True
    except Exception as e:
        print(f"Failed to check/increment rate limit: {e}")
        # On error, allow the request to proceed
        return True


def reset_rate_limit(db):
    """Reset the hourly rate limit counter to "0"; errors are logged only."""
    key = get_rate_limit_key()
    try:
        db.set(DATABASE_STORAGE, key, "0")
        print(f"Rate limit reset for agent {AGENT_USERNAME}.")
    except Exception as e:
        print(f"Failed to reset rate limit: {e}")


def fill_template(template, event_object, action_str, type_str):
    """Substitute the ``[field]`` placeholders in ``template``.

    Supported placeholders: ``[action_str]``, ``[type_str]``, ``[repo_name]``,
    ``[sender]`` and ``[stringified_event]`` (the event as indented JSON).
    ``event_object`` must contain the "repository" and "sender" keys.
    """
    fields = {
        "action_str": action_str,
        "type_str": type_str,
        "repo_name": event_object["repository"],
        "sender": event_object["sender"],
        "stringified_event": json.dumps(event_object, indent=2),
    }
    for key, value in fields.items():
        # str() guards against non-string values (e.g. a null repository name),
        # which would otherwise make str.replace raise TypeError.
        template = template.replace(f"[{key}]", str(value))
    return template


def _load_prompt_template():
    """Return the custom prompt from AGENT_PROMPT_FILE, or the default one."""
    if not AGENT_PROMPT_FILE:
        print("No custom prompt file specified. Using default message.")
        return DEFAULT_PROMPT_TEMPLATE

    prompt_path = "/app/" + AGENT_PROMPT_FILE
    if not os.path.isfile(prompt_path):
        print(
            f"Custom prompt file not found at {prompt_path}. Using default message."
        )
        return DEFAULT_PROMPT_TEMPLATE

    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read()


def build_message(event_object, action_str, type_str):
    """Render the notification text sent to the agent.

    The summary sentence of the template is replaced by an event-specific
    summary (comment events get their own wording), then the remaining
    placeholders are filled by ``fill_template``.
    """
    if type_str == "comment":
        action_summary = (
            f"There is a new comment on a {event_object['target_type']} "
            f"you are assigned to in {event_object['repository']} "
            f"by {event_object['sender']}."
        )
    else:
        action_summary = (
            f"You were {action_str} to an {type_str} "
            f"in {event_object['repository']} by {event_object['sender']}."
        )

    # The summary must be swapped in BEFORE placeholder substitution: once
    # fill_template has run, the "[action_str]"/"[type_str]" markers are gone
    # and the sentence can no longer be located.
    template = _load_prompt_template().replace(_SUMMARY_PLACEHOLDER, action_summary)
    return fill_template(template, event_object, action_str, type_str)


def sendToAgent(event_object, db):
    """Notify the OpenClaw agent about an event, if it concerns the agent.

    Comment events are forwarded only when the comment mentions
    ``@AGENT_USERNAME`` (case-insensitive); other events only when
    AGENT_USERNAME is among the assignees. The rate-limit counter is
    incremented before the HTTP call, so failed deliveries still count.
    Request errors are logged, never raised.
    """
    headers = {"x-openclaw-token": OPENCLAW_TOKEN, "Content-Type": "application/json"}

    print(f"Preparing to send notification to Agent for {json.dumps(event_object)}")

    if event_object.get("type") == "issue_comment":
        action_str = "created"
        type_str = "comment"
        mention = f"@{AGENT_USERNAME}".lower()
        # "or ''" covers a comment body that is present but null.
        comment_body = (event_object.get("comment_body") or "").lower()
        if mention not in comment_body:
            print(
                f"Agent {AGENT_USERNAME} was not mentioned in comment body. Skipping notification."
            )
            return
    else:
        action_str = "assigned"
        type_str = "issue" if event_object["type"] == "issue" else "pull request"
        assignees = event_object.get(
            "assignees", [event_object.get("assignee", "Unknown")]
        )
        if AGENT_USERNAME not in assignees:
            print(
                f"Agent {AGENT_USERNAME} is not among the assignees for this event. Skipping notification."
            )
            return

    # Check rate limit before sending
    if not check_and_increment_rate_limit(db):
        print("Rate limit reached. Skipping notification to agent.")
        return

    message = build_message(event_object, action_str, type_str)

    try:
        if OPENCLAW_PROXY_AUTH:
            # Split on the first ":" only so passwords containing ":" survive;
            # requests needs exactly a (user, password) 2-tuple for basic auth.
            user, _, password = OPENCLAW_PROXY_AUTH.partition(":")
            auth = (user, password)
        else:
            auth = None

        response = requests.post(
            OPENCLAW_URL,
            headers=headers,
            json={
                "message": message,
                "thinking": OPENCLAW_THINKING,
                "model": OPENCLAW_MODEL,
                "deliver": False,
            },
            auth=auth,
            timeout=OPENCLAW_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            print(
                f"Successfully notified OpenClaw with ({len(message)} chars) about {type_str} {action_str}."
            )
        else:
            print(
                f"Failed to notify OpenClaw. Status code: {response.status_code}, Response: {response.text}"
            )
    except Exception as e:
        print(f"Error notifying OpenClaw: {e}")


def _extract_assignee_logins(data):
    """Return assignee logins from a webhook payload (may be empty).

    Looks at the root "assignees"/"assignee" fields first and falls back to the
    same fields inside the "issue" / "pull_request" object.
    """
    target = data.get("issue") or data.get("pull_request") or {}
    assignee_data = data.get("assignee") or target.get("assignee")
    assignees_data = data.get("assignees") or target.get("assignees")

    if assignees_data:
        return [a.get("login") for a in assignees_data if a.get("login")]
    if assignee_data and assignee_data.get("login"):
        return [assignee_data.get("login")]
    return []


def _handle_assigned(data, event_type, action, db):
    """Store the assignees of an issue/PR "assigned" event and notify the agent."""
    target = data.get("issue") or data.get("pull_request") or {}
    repo_data = data.get("repository") or {}
    repository = repo_data.get("full_name", "Unknown")
    repo_id = repo_data.get("id")
    sender = (data.get("sender") or {}).get("login", "Unknown")

    # Get assignees from DB (previous events) if the current payload has none
    assignees_list = _extract_assignee_logins(data)
    if not assignees_list:
        assignees_list = get_stored_assignees(
            db, event_type, repo_id, target.get("number")
        )
    if not assignees_list:
        assignees_list = ["Unknown"]

    if event_type == "issues":
        issue_data = data.get("issue") or {}
        event_object = {
            "type": "issue",
            "action": action,
            "repository": repository,
            "number": issue_data.get("number"),
            "title": issue_data.get("title"),
            "assignees": assignees_list,
            "sender": sender,
            "url": issue_data.get("html_url"),
            "state": issue_data.get("state"),
        }
    else:
        pr_data = data.get("pull_request") or {}
        event_object = {
            "type": "pull_request",
            "action": action,
            "repository": repository,
            "number": pr_data.get("number"),
            "title": pr_data.get("title"),
            "assignees": assignees_list,
            "sender": sender,
            "url": pr_data.get("html_url"),
            "state": pr_data.get("state"),
            "merged": pr_data.get("merged", False),
        }

    # Store assignees for later comment events.
    set_stored_assignees(
        db, event_type, repo_id, event_object["number"], assignees_list
    )
    # Send to OpenClaw
    sendToAgent(event_object, db)


def _handle_comment(data, action, db):
    """Build the event for a newly created comment and pass it to the agent."""
    comment_data = data.get("comment") or {}
    is_pull = data.get("is_pull", False)
    target_data = data.get("issue") or data.get("pull_request") or {}

    event_object = {
        "type": "issue_comment",
        "target_type": "pull_request" if is_pull else "issue",
        "action": action,
        "repository": (data.get("repository") or {}).get("full_name", "Unknown"),
        "number": target_data.get("number"),
        "title": target_data.get("title", "Unknown"),
        "sender": (data.get("sender") or {}).get("login", "Unknown"),
        "comment_body": comment_data.get("body", ""),
        "url": comment_data.get("html_url"),
    }
    sendToAgent(event_object, db)


# SHSF Handler - Serverless
def main(args):
    """Entry point: dispatch an incoming request by ``args["route"]``.

    ``args`` is expected to carry "headers", "route" and "body" (a JSON string,
    or an already-decoded dict). Routes:

    * ``webhook``     - process a Gitea event and possibly notify the agent.
    * ``clear_limit`` - reset the hourly trigger counter.

    Always returns an SHSF v2 response dict; malformed input yields a 4xx
    response rather than an exception.
    """
    required_data = [
        "OPENCLAW_URL",
        "OPENCLAW_TOKEN",
        "DATABASE_STORAGE_NAME",
        "AGENT_USERNAME",
        "OPENCLAW_MODEL",
    ]
    for var in required_data:
        if not os.getenv(var):
            return _response(
                500, {"error": f"Missing required environment variable: {var}"}
            )

    headers = args.get("headers") or {}
    route = args.get("route")
    event_type = headers.get("X-Gitea-Event") or headers.get("x-gitea-event")

    # NOTE(review): these checks run for every route, so "clear_limit" is only
    # reachable when the request also carries a handled X-Gitea-Event header.
    if not route or not event_type:
        return _response(400, {"error": "Missing route or X-Gitea-Event header"})

    if event_type not in eventsToHandle:
        return _response(200, {"status": f"Ignored event type: {event_type}"})

    try:
        db = database()
    except Exception as e:
        print(f"Failed to connect to database: {e}")
        return _response(500, {"error": "Database connection failed"})

    raw_body = args.get("body")
    try:
        # A missing/empty body decodes to None and is reported below.
        data = raw_body if isinstance(raw_body, dict) else json.loads(raw_body or "null")
    except (TypeError, ValueError) as e:
        print(f"Failed to parse request body: {e}")
        return _response(400, {"error": "Invalid JSON payload"})

    if not data or not isinstance(data, dict):
        return _response(400, {"error": "No JSON payload"})

    if route == "webhook":
        # Prepare DB
        try:
            db.create_storage(DATABASE_STORAGE, purpose="For fast key-value data")
        except Exception as e:
            print(f"Failed to create storage: {e}")
            # Probably already exists, continue anyway

        action = data.get("action")
        print(f"Action: {action}, Event Type: {event_type}")

        if not action:
            return _response(400, {"error": "Missing action in payload"})

        if action not in actionsToHandle:
            return _response(200, {"status": f"Ignored action: {action}"})

        if action == "assigned" and event_type in ["issues", "pull_request"]:
            _handle_assigned(data, event_type, action, db)
        elif action == "created" and event_type == "issue_comment":
            _handle_comment(data, action, db)

        return _response(200, {"status": "received"})

    if route == "clear_limit":
        # Reset the hourly rate limit counter. reset_rate_limit logs and
        # swallows its own storage errors; this handler is a safety net.
        try:
            reset_rate_limit(db)
            return _response(200, {"status": "Rate limit reset"})
        except Exception as e:
            print(f"Failed to reset rate limit: {e}")
            return _response(500, {"error": "Failed to reset rate limit"})

    return _response(404, {"error": "Not Found"})