Files
assign-me-openclaw/main.py
Luna 6532bd250d
All checks were successful
Lint and Syntax Check / build (pull_request) Successful in 6s
feat: add hourly rate limits for agent triggers
- Add AGENT_HOURLY env var (default: 60 triggers per hour)
- Track hourly usage in SHSF database with key 'agent_{AGENT_USERNAME}_cooldown'
- Check and increment counter before sending notifications
- Add 'clear_limit' route to reset counter hourly via external trigger

Resolves #4
2026-04-04 17:10:23 +02:00

460 lines
16 KiB
Python

import json
import requests
import os
from _db_com import database
OPENCLAW_URL = os.getenv("OPENCLAW_URL")
OPENCLAW_TOKEN = os.getenv("OPENCLAW_TOKEN")
OPENCLAW_PROXY_AUTH = os.getenv("OPENCLAW_PROXY_AUTH")
OPENCLAW_MODEL = os.getenv("OPENCLAW_MODEL")
OPENCLAW_THINKING = os.getenv("OPENCLAW_THINKING", "low")
DATABASE_STORAGE = os.getenv("DATABASE_STORAGE_NAME")
AGENT_USERNAME = os.getenv("AGENT_USERNAME")
AGENT_PROMPT_FILE = os.getenv("AGENT_PROMPT_FILE")
AGENT_HOURLY = int(os.getenv("AGENT_HOURLY", "60"))
eventsToHandle = ["pull_request", "issues", "issue_comment"]
actionsToHandle = ["assigned", "created"]
# Template Fields:
DEFAULT_PROMPT_TEMPLATE = """# GITEA STATUS UPDATE
- You were [action_str] to an [type_str] in [repo_name] by [sender].
raw event data:
```json
[stringified_event]
```
Please check the details and take necessary actions.
DM your human in the main session and tell them about this change and if you should do anything about it.
Inform them that you have to be told the exact details on what to do next.
You won't know what you sent them once they respond, which is a restriction of your system.
The Goal is to solve this issue or PR as quickly and with as little back and forth as possible, so try to get all necessary information in the first message."""
def get_assignee_key(event_type, repository_id, number):
if event_type == "issues" or event_type == "issue" or event_type == "issue_comment":
return f"assignees_issue_{repository_id}_{number}"
elif event_type == "pull_request":
return f"assignees_pr_{repository_id}_{number}"
return None
def get_stored_assignees(db, event_type, repository_id, number):
key = get_assignee_key(event_type, repository_id, number)
if not key:
return []
try:
assignees_str = db.get(DATABASE_STORAGE, key)
if assignees_str:
return [a.strip() for a in assignees_str.split(",") if a.strip()]
except Exception as e:
print(f"Failed to retrieve assignees from DB: {e}")
return []
def set_stored_assignees(db, event_type, repository_id, number, assignees_list):
key = get_assignee_key(event_type, repository_id, number)
if not key:
return
try:
assignees_str = ",".join(assignees_list)
if assignees_str:
db.set(DATABASE_STORAGE, key, assignees_str)
else:
db.delete_item(DATABASE_STORAGE, key)
except Exception as e:
print(f"Failed to store assignees in DB: {e}")
def delete_stored_assignees(db, event_type, repository_id, number):
key = get_assignee_key(event_type, repository_id, number)
if not key:
return
try:
db.delete_item(DATABASE_STORAGE, key)
except Exception as e:
print(f"Failed to delete assignees from DB: {e}")
def get_rate_limit_key():
return f"agent_{AGENT_USERNAME}_cooldown"
def check_and_increment_rate_limit(db):
"""Check if rate limit is reached, increment counter if not. Returns True if allowed, False if limit reached."""
key = get_rate_limit_key()
try:
current = db.get(DATABASE_STORAGE, key)
if current is None:
current = 0
else:
current = int(current)
if current >= AGENT_HOURLY:
print(f"Rate limit reached: {current}/{AGENT_HOURLY} hourly triggers used.")
return False
db.set(DATABASE_STORAGE, key, str(current + 1))
print(f"Rate limit check passed: {current + 1}/{AGENT_HOURLY} hourly triggers used.")
return True
except Exception as e:
print(f"Failed to check/increment rate limit: {e}")
# On error, allow the request to proceed
return True
def reset_rate_limit(db):
"""Reset the hourly rate limit counter."""
key = get_rate_limit_key()
try:
db.set(DATABASE_STORAGE, key, "0")
print(f"Rate limit reset for agent {AGENT_USERNAME}.")
except Exception as e:
print(f"Failed to reset rate limit: {e}")
def fill_template(template, event_object, action_str, type_str):
fields = {
"action_str": action_str,
"type_str": type_str,
"repo_name": event_object["repository"],
"sender": event_object["sender"],
"stringified_event": json.dumps(event_object, indent=2),
}
for key, value in fields.items():
template = template.replace(f"[{key}]", value)
return template
def build_message(event_object, action_str, type_str):
if type_str == "comment":
action_summary = f"There is a new comment on a {event_object['target_type']} you are assigned to in {event_object['repository']} by {event_object['sender']}."
else:
action_summary = f"You were {action_str} to an {type_str} in {event_object['repository']} by {event_object['sender']}."
if AGENT_PROMPT_FILE:
AGENT_PROMPT_FILE_PATH = "/app/" + AGENT_PROMPT_FILE
# Check if the file exists
if not os.path.isfile(AGENT_PROMPT_FILE_PATH):
print(
f"Custom prompt file not found at {AGENT_PROMPT_FILE_PATH}. Using default message."
)
message = fill_template(
DEFAULT_PROMPT_TEMPLATE, event_object, action_str, type_str
)
message = message.replace(
"You were [action_str] to an [type_str]", action_summary
)
return message
with open(AGENT_PROMPT_FILE_PATH, "r") as f:
custom_prompt = f.read()
message = fill_template(custom_prompt, event_object, action_str, type_str)
message = message.replace(
"You were [action_str] to an [type_str]", action_summary
)
return message
else:
print("No custom prompt file specified. Using default message.")
message = fill_template(
DEFAULT_PROMPT_TEMPLATE, event_object, action_str, type_str
)
message = message.replace(
"You were [action_str] to an [type_str]", action_summary
)
return message
def sendToAgent(event_object, db):
headers = {"x-openclaw-token": OPENCLAW_TOKEN, "Content-Type": "application/json"}
print(f"Preparing to send notification to Agent for {json.dumps(event_object)}")
if event_object.get("type") == "issue_comment":
action_str = "created"
type_str = "comment"
mention = f"@{AGENT_USERNAME}".lower()
comment_body = event_object.get("comment_body", "").lower()
if mention not in comment_body:
print(
f"Agent {AGENT_USERNAME} was not mentioned in comment body. Skipping notification."
)
return
else:
action_str = "assigned"
type_str = "issue" if event_object["type"] == "issue" else "pull request"
assignees = event_object.get(
"assignees", [event_object.get("assignee", "Unknown")]
)
if AGENT_USERNAME not in assignees:
print(
f"Agent {AGENT_USERNAME} is not among the assignees for this event. Skipping notification."
)
return
# Check rate limit before sending
if not check_and_increment_rate_limit(db):
print("Rate limit reached. Skipping notification to agent.")
return
message = build_message(event_object, action_str, type_str)
try:
if OPENCLAW_PROXY_AUTH:
auth = tuple(OPENCLAW_PROXY_AUTH.split(":"))
else:
auth = None
response = requests.post(
OPENCLAW_URL,
headers=headers,
json={
"message": message,
"thinking": OPENCLAW_THINKING,
"model": OPENCLAW_MODEL,
"deliver": False,
},
auth=auth,
)
if response.status_code == 200:
print(
f"Successfully notified OpenClaw with ({len(message)} chars) about {type_str} {action_str}."
)
else:
print(
f"Failed to notify OpenClaw. Status code: {response.status_code}, Response: {response.text}"
)
except Exception as e:
print(f"Error notifying OpenClaw: {e}")
# SHSF Handler - Serverless
def main(args):
required_data = [
"OPENCLAW_URL",
"OPENCLAW_TOKEN",
"DATABASE_STORAGE_NAME",
"AGENT_USERNAME",
"OPENCLAW_MODEL",
]
for var in required_data:
if not os.getenv(var):
return {
"_shsf": "v2",
"_code": 500,
"_res": {"error": f"Missing required environment variable: {var}"},
"_headers": {"Content-Type": "application/json"},
}
headers = args.get("headers", {})
route = args.get("route")
event_type = headers.get("X-Gitea-Event") or headers.get("x-gitea-event")
if not route or not event_type:
return {
"_shsf": "v2",
"_code": 400,
"_res": {"error": "Missing route or X-Gitea-Event header"},
"_headers": {"Content-Type": "application/json"},
}
if event_type not in eventsToHandle:
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": f"Ignored event type: {event_type}"},
"_headers": {"Content-Type": "application/json"},
}
try:
db = database()
except:
print("Failed to connect to database")
return {
"_shsf": "v2",
"_code": 500,
"_res": {"error": "Database connection failed"},
"_headers": {"Content-Type": "application/json"},
}
data = args.get("body")
data = json.loads(data)
if not data:
return {
"_shsf": "v2",
"_code": 400,
"_res": {"error": "No JSON payload"},
"_headers": {"Content-Type": "application/json"},
}
if route == "webhook":
# Prepare DB
try:
db.create_storage(DATABASE_STORAGE, purpose="For fast key-value data")
except Exception as e:
print(f"Failed to create storage: {e}")
# Probably already exists, continue anyway
action = data.get("action")
print(f"Action: {action}, Event Type: {event_type}")
if not action:
return {
"_shsf": "v2",
"_code": 400,
"_res": {"error": "Missing action in payload"},
"_headers": {"Content-Type": "application/json"},
}
if action not in actionsToHandle:
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": f"Ignored action: {action}"},
"_headers": {"Content-Type": "application/json"},
}
if action == "assigned" and event_type in ["issues", "pull_request"]:
assignee_data = data.get("assignee")
assignees_data = data.get("assignees")
# If "assignee" is missing in the root, look for it in the issue/pull_request object
if not assignee_data:
assignee_data = (
data.get("issue") or data.get("pull_request") or {}
).get("assignee")
if not assignees_data:
assignees_data = (
data.get("issue") or data.get("pull_request") or {}
).get("assignees")
# Extract logins from the list of assignees, or single assignee
assignees_list = []
if assignees_data:
assignees_list = [
a.get("login") for a in assignees_data if a.get("login")
]
elif assignee_data:
assignees_list = (
[assignee_data.get("login")] if assignee_data.get("login") else []
)
repository = data.get("repository", {}).get("full_name", "Unknown")
repo_id = data.get("repository", {}).get("id")
sender = data.get("sender", {}).get("login", "Unknown")
event_object = None
# Use helpers to get stored assignees if needed
issue_number = (data.get("issue") or data.get("pull_request") or {}).get(
"number"
)
# Get assignees from DB (previous events) if the current payload has none
if not assignees_list:
assignees_list = get_stored_assignees(
db, event_type, repo_id, issue_number
)
if not assignees_list:
assignees_list = ["Unknown"]
if event_type == "issues":
issue_data = data.get("issue", {})
event_object = {
"type": "issue",
"action": action,
"repository": repository,
"number": issue_data.get("number"),
"title": issue_data.get("title"),
"assignees": assignees_list,
"sender": sender,
"url": issue_data.get("html_url"),
"state": issue_data.get("state"),
}
elif event_type == "pull_request":
pr_data = data.get("pull_request", {})
event_object = {
"type": "pull_request",
"action": action,
"repository": repository,
"number": pr_data.get("number"),
"title": pr_data.get("title"),
"assignees": assignees_list,
"sender": sender,
"url": pr_data.get("html_url"),
"state": pr_data.get("state"),
"merged": pr_data.get("merged", False),
}
if event_object:
# Store assignees for later comment events.
if action == "assigned":
set_stored_assignees(
db, event_type, repo_id, event_object["number"], assignees_list
)
# Send to OpenClaw
if action == "assigned":
sendToAgent(event_object, db)
else:
print(f"Action {action} is not configured to send to agent")
elif action == "created" and event_type == "issue_comment":
comment_data = data.get("comment", {})
is_pull = data.get("is_pull", False)
target_data = data.get("issue") or data.get("pull_request") or {}
repository = data.get("repository", {}).get("full_name", "Unknown")
repo_id = data.get("repository", {}).get("id")
sender = data.get("sender", {}).get("login", "Unknown")
issue_number = target_data.get("number")
event_object = {
"type": "issue_comment",
"target_type": "pull_request" if is_pull else "issue",
"action": action,
"repository": repository,
"number": issue_number,
"title": target_data.get("title", "Unknown"),
"sender": sender,
"comment_body": comment_data.get("body", ""),
"url": comment_data.get("html_url"),
}
sendToAgent(event_object, db)
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": "received"},
"_headers": {"Content-Type": "application/json"},
}
elif route == "clear_limit":
# Reset the hourly rate limit counter
try:
reset_rate_limit(db)
return {
"_shsf": "v2",
"_code": 200,
"_res": {"status": "Rate limit reset"},
"_headers": {"Content-Type": "application/json"},
}
except Exception as e:
print(f"Failed to reset rate limit: {e}")
return {
"_shsf": "v2",
"_code": 500,
"_res": {"error": "Failed to reset rate limit"},
"_headers": {"Content-Type": "application/json"},
}
else:
return {
"_shsf": "v2",
"_code": 404,
"_res": {"error": "Not Found"},
"_headers": {"Content-Type": "application/json"},
}