first commit

This commit is contained in:
Space
2026-01-17 13:37:57 +01:00
commit 3e34d84a29
49 changed files with 8579 additions and 0 deletions

38
backend/auth.py Normal file
View File

@@ -0,0 +1,38 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt # type: ignore[import]
from passlib.context import CryptContext
from config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""Decode a JWT access token"""
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
return payload
except JWTError:
return None

23
backend/config.py Normal file
View File

@@ -0,0 +1,23 @@
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
mongodb_url: str
database_name: str = "grademaxxing"
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 43200 # 30 days
vite_api_url: str = "http://localhost:8000/api"
cors_origins: str = "http://localhost:5173,http://localhost:8000,http://127.0.0.1:8000"
@property
def cors_origins_list(self) -> List[str]:
return [origin.strip() for origin in self.cors_origins.split(",")]
class Config:
env_file = ".env"
case_sensitive = False
settings = Settings()

26
backend/database.py Normal file
View File

@@ -0,0 +1,26 @@
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from config import settings
client: AsyncIOMotorClient = None
database: AsyncIOMotorDatabase = None
async def connect_to_mongo():
"""Connect to MongoDB"""
global client, database
client = AsyncIOMotorClient(settings.mongodb_url)
database = client[settings.database_name]
print(f"Connected to MongoDB at {settings.mongodb_url}")
async def close_mongo_connection():
"""Close MongoDB connection"""
global client
if client:
client.close()
print("Closed MongoDB connection")
def get_database() -> AsyncIOMotorDatabase:
"""Get database instance"""
return database

168
backend/main.py Normal file
View File

@@ -0,0 +1,168 @@
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
from contextlib import asynccontextmanager
import os
from pathlib import Path
from io import StringIO
import csv
from typing import Optional
from config import settings
from database import connect_to_mongo, close_mongo_connection, get_database
from routes import auth_routes, subject_routes, grade_routes, period_routes, teacher_grade_routes
from routes.auth_routes import get_current_user
from models import UserInDB
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events"""
# Startup
await connect_to_mongo()
yield
# Shutdown
await close_mongo_connection()
app = FastAPI(
title="Grademaxxing API",
description="Grade management system API",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# API routes
app.include_router(auth_routes.router, prefix="/api")
app.include_router(subject_routes.router, prefix="/api")
app.include_router(grade_routes.router, prefix="/api")
app.include_router(period_routes.router, prefix="/api")
app.include_router(teacher_grade_routes.router, prefix="/api")
@app.get("/api/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "version": "1.0.0"}
@app.get("/api/export/grades")
async def export_grades_csv(
period_id: Optional[str] = None,
current_user: UserInDB = Depends(get_current_user)
):
"""Export all grades as CSV, optionally filtered by period"""
db = get_database()
# Get all subjects for the user
subjects_cursor = db.subjects.find({"user_id": current_user.id})
subjects = await subjects_cursor.to_list(length=None)
subject_dict = {str(subject["_id"]): subject for subject in subjects}
# Build grade query
grade_query = {"user_id": current_user.id}
# If period_id is provided, filter grades by date range
if period_id:
period = await db.periods.find_one({"_id": period_id, "user_id": current_user.id})
if period:
grade_query["date"] = {
"$gte": period["start_date"],
"$lte": period["end_date"]
}
# Get all grades
grades_cursor = db.grades.find(grade_query).sort("date", -1)
grades = await grades_cursor.to_list(length=None)
# Create CSV in memory
output = StringIO()
writer = csv.writer(output)
# Write header
writer.writerow([
"Subject",
"Category",
"Grade Name",
"Grade",
"Max Grade",
"Percentage",
"Weight in Category",
"Date",
"Notes"
])
# Write grade rows
for grade in grades:
subject = subject_dict.get(grade["subject_id"])
subject_name = subject["name"] if subject else "Unknown Subject"
# Calculate percentage
percentage = (grade["grade"] / grade["max_grade"] * 100) if grade["max_grade"] > 0 else 0
writer.writerow([
subject_name,
grade.get("category_name", ""),
grade.get("name", ""),
grade["grade"],
grade["max_grade"],
f"{percentage:.2f}%",
grade.get("weight_in_category", 1.0),
grade["date"].strftime("%Y-%m-%d") if "date" in grade else "",
grade.get("notes", "")
])
# Prepare response
output.seek(0)
filename = f"grades_export_{current_user.username}.csv"
if period_id:
period = await db.periods.find_one({"_id": period_id, "user_id": current_user.id})
if period:
filename = f"grades_{period['name'].replace(' ', '_')}.csv"
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
# Serve static files (React SPA)
static_dir = Path(__file__).parent.parent / "build" / "client"
if static_dir.exists():
app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets")
@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
"""Serve React SPA for all non-API routes"""
# Check if the file exists
file_path = static_dir / full_path
if file_path.is_file():
return FileResponse(file_path)
# For all other routes, serve index.html (SPA routing)
index_path = static_dir / "index.html"
if index_path.exists():
return FileResponse(index_path)
raise HTTPException(status_code=404, detail="Not found")
else:
print(f"Warning: Static directory not found at {static_dir}")
print("Run the frontend build first to serve the UI")
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

265
backend/models.py Normal file
View File

@@ -0,0 +1,265 @@
from pydantic import BaseModel, EmailStr, Field, field_validator
from typing import Optional, List, Dict
from datetime import datetime
from bson import ObjectId
class PyObjectId(ObjectId):
"""Custom ObjectId type for Pydantic"""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError("Invalid ObjectId")
return ObjectId(v)
@classmethod
def __get_pydantic_json_schema__(cls, field_schema):
field_schema.update(type="string")
# User Models
class UserBase(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserInDB(UserBase):
id: str = Field(default_factory=lambda: str(ObjectId()), alias="_id")
hashed_password: str
created_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
class UserResponse(UserBase):
id: str = Field(..., alias="_id")
created_at: datetime
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
user: UserResponse
# Grading Category Model (flexible, not German-specific)
class GradingCategory(BaseModel):
name: str = Field(..., min_length=1, max_length=100) # e.g., "Written", "Oral", "Homework"
weight: float = Field(..., ge=0, le=100) # Percentage weight (0-100)
color: Optional[str] = Field(default="#3b82f6") # Hex color for UI
# Subject Model
class SubjectBase(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
grading_categories: List[GradingCategory] = Field(..., min_items=1)
color: Optional[str] = Field(default="#3b82f6")
teacher: Optional[str] = None
grade_system: Optional[str] = Field(default="percentage") # "percentage", "german", "us-letter"
target_grade: Optional[float] = None
target_grade_max: Optional[float] = None
@field_validator("grading_categories")
@classmethod
def validate_weights(cls, categories: List[GradingCategory]) -> List[GradingCategory]:
total_weight = sum(cat.weight for cat in categories)
if abs(total_weight - 100.0) > 0.01: # Allow small floating point errors
raise ValueError(f"Grading category weights must sum to 100%, got {total_weight}%")
return categories
class SubjectCreate(SubjectBase):
pass
class SubjectUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
grading_categories: Optional[List[GradingCategory]] = None
color: Optional[str] = None
teacher: Optional[str] = None
grade_system: Optional[str] = None
target_grade: Optional[float] = None
target_grade_max: Optional[float] = None
@field_validator("grading_categories")
@classmethod
def validate_weights(cls, categories: Optional[List[GradingCategory]]) -> Optional[List[GradingCategory]]:
if categories is None:
return None
total_weight = sum(cat.weight for cat in categories)
if abs(total_weight - 100.0) > 0.01:
raise ValueError(f"Grading category weights must sum to 100%, got {total_weight}%")
return categories
class SubjectInDB(SubjectBase):
id: str = Field(default_factory=lambda: str(ObjectId()), alias="_id")
user_id: str
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
class SubjectResponse(SubjectBase):
id: str = Field(..., alias="_id")
created_at: datetime
updated_at: datetime
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
# Test/Grade Model
class GradeBase(BaseModel):
subject_id: str
category_name: str # Must match one of the subject's grading categories
grade: float = Field(..., ge=0) # Grade value (e.g., 1.0 to 6.0 for German, 0-100 for percentage)
max_grade: float = Field(..., ge=0) # Maximum possible grade (e.g., 6.0 or 100)
weight_in_category: float = Field(default=1.0, ge=0) # Weight within the category (for averaging multiple tests)
name: Optional[str] = Field(None, max_length=200) # Test name/description
date: datetime = Field(default_factory=datetime.utcnow)
notes: Optional[str] = None
class GradeCreate(GradeBase):
pass
class GradeUpdate(BaseModel):
category_name: Optional[str] = None
grade: Optional[float] = Field(None, ge=0)
max_grade: Optional[float] = Field(None, ge=0)
weight_in_category: Optional[float] = Field(None, ge=0)
name: Optional[str] = Field(None, max_length=200)
date: Optional[datetime] = None
notes: Optional[str] = None
class GradeInDB(GradeBase):
id: str = Field(default_factory=lambda: str(ObjectId()), alias="_id")
user_id: str
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
class GradeResponse(GradeBase):
id: str = Field(..., alias="_id")
created_at: datetime
updated_at: datetime
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
# Report Period Model
class ReportPeriodBase(BaseModel):
name: str = Field(..., min_length=1, max_length=100) # e.g., "First Half-Year 2024/2025"
start_date: datetime
end_date: datetime
@field_validator("end_date")
@classmethod
def validate_dates(cls, end_date: datetime, info) -> datetime:
start_date = info.data.get("start_date")
if start_date and end_date <= start_date:
raise ValueError("end_date must be after start_date")
return end_date
class ReportPeriodCreate(ReportPeriodBase):
pass
class ReportPeriodUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
class ReportPeriodInDB(ReportPeriodBase):
id: str = Field(default_factory=lambda: str(ObjectId()), alias="_id")
user_id: str
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
class ReportPeriodResponse(ReportPeriodBase):
id: str = Field(..., alias="_id")
created_at: datetime
updated_at: datetime
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
# Teacher Grade Override Model (manual grades from teachers)
class TeacherGradeBase(BaseModel):
subject_id: str
period_id: str
grade: float = Field(..., ge=0) # The grade value (e.g., 1.0-6.0 for German, 0-100 for percentage)
max_grade: float = Field(..., ge=0) # Maximum possible grade (e.g., 6.0 or 100)
notes: Optional[str] = Field(None, max_length=500)
class TeacherGradeCreate(TeacherGradeBase):
pass
class TeacherGradeUpdate(BaseModel):
grade: Optional[float] = Field(None, ge=0)
max_grade: Optional[float] = Field(None, ge=0)
notes: Optional[str] = Field(None, max_length=500)
class TeacherGradeInDB(TeacherGradeBase):
id: str = Field(default_factory=lambda: str(ObjectId()), alias="_id")
user_id: str
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}
class TeacherGradeResponse(TeacherGradeBase):
id: str = Field(..., alias="_id")
created_at: datetime
updated_at: datetime
class Config:
populate_by_name = True
json_encoders = {ObjectId: str}

10
backend/requirements.txt Normal file
View File

@@ -0,0 +1,10 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
motor==3.6.0
pymongo>=4.9,<4.10
pydantic==2.10.5
pydantic-settings==2.7.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.20
email-validator==2.2.0

View File

@@ -0,0 +1,7 @@
# Backend routes
from . import auth_routes
from . import subject_routes
from . import grade_routes
from . import period_routes
__all__ = ["auth_routes", "subject_routes", "grade_routes", "period_routes"]

View File

@@ -0,0 +1,149 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from datetime import timedelta
from typing import Optional
from models import UserCreate, UserLogin, UserResponse, UserInDB, Token
from auth import get_password_hash, verify_password, create_access_token, decode_access_token
from database import get_database
from config import settings
from bson import ObjectId
router = APIRouter(prefix="/auth", tags=["authentication"])
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> UserInDB:
"""Get current authenticated user"""
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
db = get_database()
user_dict = await db.users.find_one({"_id": ObjectId(user_id)})
if user_dict is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
user_dict["_id"] = str(user_dict["_id"])
return UserInDB(**user_dict)
@router.post("/register", response_model=Token, status_code=status.HTTP_201_CREATED)
async def register(user: UserCreate):
"""Register a new user"""
db = get_database()
# Check if user already exists
existing_user = await db.users.find_one({"email": user.email})
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
existing_username = await db.users.find_one({"username": user.username})
if existing_username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken"
)
# Create new user
user_in_db = UserInDB(
email=user.email,
username=user.username,
hashed_password=get_password_hash(user.password)
)
user_dict = user_in_db.model_dump(by_alias=True)
user_dict["_id"] = ObjectId(user_dict["_id"])
result = await db.users.insert_one(user_dict)
user_dict["_id"] = str(result.inserted_id)
# Create access token
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": str(result.inserted_id)},
expires_delta=access_token_expires
)
user_response = UserResponse(
_id=str(result.inserted_id),
email=user.email,
username=user.username,
created_at=user_in_db.created_at
)
return Token(access_token=access_token, user=user_response)
@router.post("/login", response_model=Token)
async def login(user_login: UserLogin):
"""Login a user"""
db = get_database()
# Find user by email
user_dict = await db.users.find_one({"email": user_login.email})
if not user_dict:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
user_dict["_id"] = str(user_dict["_id"])
user = UserInDB(**user_dict)
# Verify password
if not verify_password(user_login.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": user.id},
expires_delta=access_token_expires
)
user_response = UserResponse(
_id=user.id,
email=user.email,
username=user.username,
created_at=user.created_at
)
return Token(access_token=access_token, user=user_response)
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: UserInDB = Depends(get_current_user)):
"""Get current user information"""
return UserResponse(
_id=current_user.id,
email=current_user.email,
username=current_user.username,
created_at=current_user.created_at
)

View File

@@ -0,0 +1,202 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import List, Optional
from models import GradeCreate, GradeUpdate, GradeResponse, GradeInDB, UserInDB
from database import get_database
from routes.auth_routes import get_current_user
from bson import ObjectId
from datetime import datetime
router = APIRouter(prefix="/grades", tags=["grades"])
@router.post("", response_model=GradeResponse, status_code=status.HTTP_201_CREATED)
async def create_grade(
grade: GradeCreate,
current_user: UserInDB = Depends(get_current_user)
):
"""Create a new grade/test entry"""
db = get_database()
# Verify subject exists and belongs to user
if not ObjectId.is_valid(grade.subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
subject = await db.subjects.find_one({
"_id": ObjectId(grade.subject_id),
"user_id": current_user.id
})
if not subject:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subject not found"
)
# Verify category exists in subject
category_names = [cat["name"] for cat in subject.get("grading_categories", [])]
if grade.category_name not in category_names:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Category '{grade.category_name}' not found in subject. Available: {', '.join(category_names)}"
)
grade_in_db = GradeInDB(
**grade.model_dump(),
user_id=current_user.id
)
grade_dict = grade_in_db.model_dump(by_alias=True)
grade_dict["_id"] = ObjectId(grade_dict["_id"])
result = await db.grades.insert_one(grade_dict)
grade_dict["_id"] = str(result.inserted_id)
return GradeResponse(**grade_dict)
@router.get("", response_model=List[GradeResponse])
async def get_grades(
subject_id: Optional[str] = Query(None),
current_user: UserInDB = Depends(get_current_user)
):
"""Get all grades for the current user, optionally filtered by subject"""
db = get_database()
query = {"user_id": current_user.id}
if subject_id:
if not ObjectId.is_valid(subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
query["subject_id"] = subject_id
cursor = db.grades.find(query).sort("date", -1) # Most recent first
grades = await cursor.to_list(length=None)
for grade in grades:
grade["_id"] = str(grade["_id"])
return [GradeResponse(**grade) for grade in grades]
@router.get("/{grade_id}", response_model=GradeResponse)
async def get_grade(
grade_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Get a specific grade"""
db = get_database()
if not ObjectId.is_valid(grade_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid grade ID"
)
grade = await db.grades.find_one({
"_id": ObjectId(grade_id),
"user_id": current_user.id
})
if not grade:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Grade not found"
)
grade["_id"] = str(grade["_id"])
return GradeResponse(**grade)
@router.put("/{grade_id}", response_model=GradeResponse)
async def update_grade(
grade_id: str,
grade_update: GradeUpdate,
current_user: UserInDB = Depends(get_current_user)
):
"""Update a grade"""
db = get_database()
if not ObjectId.is_valid(grade_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid grade ID"
)
# Check if grade exists and belongs to user
existing_grade = await db.grades.find_one({
"_id": ObjectId(grade_id),
"user_id": current_user.id
})
if not existing_grade:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Grade not found"
)
# If updating category, verify it exists in subject
if grade_update.category_name:
subject = await db.subjects.find_one({
"_id": ObjectId(existing_grade["subject_id"]),
"user_id": current_user.id
})
if subject:
category_names = [cat["name"] for cat in subject.get("grading_categories", [])]
if grade_update.category_name not in category_names:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Category '{grade_update.category_name}' not found in subject"
)
# Update only provided fields
update_data = grade_update.model_dump(exclude_unset=True)
if update_data:
update_data["updated_at"] = datetime.utcnow()
await db.grades.update_one(
{"_id": ObjectId(grade_id)},
{"$set": update_data}
)
# Fetch updated grade
updated_grade = await db.grades.find_one({"_id": ObjectId(grade_id)})
updated_grade["_id"] = str(updated_grade["_id"])
return GradeResponse(**updated_grade)
@router.delete("/{grade_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_grade(
grade_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Delete a grade"""
db = get_database()
if not ObjectId.is_valid(grade_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid grade ID"
)
# Check if grade exists and belongs to user
grade = await db.grades.find_one({
"_id": ObjectId(grade_id),
"user_id": current_user.id
})
if not grade:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Grade not found"
)
# Delete grade
await db.grades.delete_one({"_id": ObjectId(grade_id)})
return None

View File

@@ -0,0 +1,149 @@
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
from models import ReportPeriodCreate, ReportPeriodUpdate, ReportPeriodResponse, ReportPeriodInDB, UserInDB
from database import get_database
from routes.auth_routes import get_current_user
from bson import ObjectId
from datetime import datetime
router = APIRouter(prefix="/periods", tags=["report-periods"])
@router.post("", response_model=ReportPeriodResponse, status_code=status.HTTP_201_CREATED)
async def create_period(
period: ReportPeriodCreate,
current_user: UserInDB = Depends(get_current_user)
):
"""Create a new report period"""
db = get_database()
period_in_db = ReportPeriodInDB(
**period.model_dump(),
user_id=current_user.id
)
period_dict = period_in_db.model_dump(by_alias=True)
period_dict["_id"] = ObjectId(period_dict["_id"])
result = await db.periods.insert_one(period_dict)
period_dict["_id"] = str(result.inserted_id)
return ReportPeriodResponse(**period_dict)
@router.get("", response_model=List[ReportPeriodResponse])
async def get_periods(current_user: UserInDB = Depends(get_current_user)):
"""Get all report periods for the current user"""
db = get_database()
cursor = db.periods.find({"user_id": current_user.id}).sort("start_date", -1)
periods = await cursor.to_list(length=None)
for period in periods:
period["_id"] = str(period["_id"])
return [ReportPeriodResponse(**period) for period in periods]
@router.get("/{period_id}", response_model=ReportPeriodResponse)
async def get_period(
period_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Get a specific report period"""
db = get_database()
if not ObjectId.is_valid(period_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid period ID"
)
period = await db.periods.find_one({
"_id": ObjectId(period_id),
"user_id": current_user.id
})
if not period:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Report period not found"
)
period["_id"] = str(period["_id"])
return ReportPeriodResponse(**period)
@router.put("/{period_id}", response_model=ReportPeriodResponse)
async def update_period(
period_id: str,
period_update: ReportPeriodUpdate,
current_user: UserInDB = Depends(get_current_user)
):
"""Update a report period"""
db = get_database()
if not ObjectId.is_valid(period_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid period ID"
)
# Check if period exists and belongs to user
existing_period = await db.periods.find_one({
"_id": ObjectId(period_id),
"user_id": current_user.id
})
if not existing_period:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Report period not found"
)
# Update only provided fields
update_data = period_update.model_dump(exclude_unset=True)
if update_data:
update_data["updated_at"] = datetime.utcnow()
await db.periods.update_one(
{"_id": ObjectId(period_id)},
{"$set": update_data}
)
# Fetch updated period
updated_period = await db.periods.find_one({"_id": ObjectId(period_id)})
updated_period["_id"] = str(updated_period["_id"])
return ReportPeriodResponse(**updated_period)
@router.delete("/{period_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_period(
period_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Delete a report period"""
db = get_database()
if not ObjectId.is_valid(period_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid period ID"
)
# Check if period exists and belongs to user
period = await db.periods.find_one({
"_id": ObjectId(period_id),
"user_id": current_user.id
})
if not period:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Report period not found"
)
# Delete period
await db.periods.delete_one({"_id": ObjectId(period_id)})
return None

View File

@@ -0,0 +1,152 @@
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
from models import SubjectCreate, SubjectUpdate, SubjectResponse, SubjectInDB, UserInDB
from database import get_database
from routes.auth_routes import get_current_user
from bson import ObjectId
from datetime import datetime
router = APIRouter(prefix="/subjects", tags=["subjects"])
@router.post("", response_model=SubjectResponse, status_code=status.HTTP_201_CREATED)
async def create_subject(
subject: SubjectCreate,
current_user: UserInDB = Depends(get_current_user)
):
"""Create a new subject"""
db = get_database()
subject_in_db = SubjectInDB(
**subject.model_dump(),
user_id=current_user.id
)
subject_dict = subject_in_db.model_dump(by_alias=True)
subject_dict["_id"] = ObjectId(subject_dict["_id"])
result = await db.subjects.insert_one(subject_dict)
subject_dict["_id"] = str(result.inserted_id)
return SubjectResponse(**subject_dict)
@router.get("", response_model=List[SubjectResponse])
async def get_subjects(current_user: UserInDB = Depends(get_current_user)):
"""Get all subjects for the current user"""
db = get_database()
cursor = db.subjects.find({"user_id": current_user.id})
subjects = await cursor.to_list(length=None)
for subject in subjects:
subject["_id"] = str(subject["_id"])
return [SubjectResponse(**subject) for subject in subjects]
@router.get("/{subject_id}", response_model=SubjectResponse)
async def get_subject(
subject_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Get a specific subject"""
db = get_database()
if not ObjectId.is_valid(subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
subject = await db.subjects.find_one({
"_id": ObjectId(subject_id),
"user_id": current_user.id
})
if not subject:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subject not found"
)
subject["_id"] = str(subject["_id"])
return SubjectResponse(**subject)
@router.put("/{subject_id}", response_model=SubjectResponse)
async def update_subject(
subject_id: str,
subject_update: SubjectUpdate,
current_user: UserInDB = Depends(get_current_user)
):
"""Update a subject"""
db = get_database()
if not ObjectId.is_valid(subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
# Check if subject exists and belongs to user
existing_subject = await db.subjects.find_one({
"_id": ObjectId(subject_id),
"user_id": current_user.id
})
if not existing_subject:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subject not found"
)
# Update only provided fields
update_data = subject_update.model_dump(exclude_unset=True)
if update_data:
update_data["updated_at"] = datetime.utcnow()
await db.subjects.update_one(
{"_id": ObjectId(subject_id)},
{"$set": update_data}
)
# Fetch updated subject
updated_subject = await db.subjects.find_one({"_id": ObjectId(subject_id)})
updated_subject["_id"] = str(updated_subject["_id"])
return SubjectResponse(**updated_subject)
@router.delete("/{subject_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_subject(
subject_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Delete a subject and all associated grades"""
db = get_database()
if not ObjectId.is_valid(subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
# Check if subject exists and belongs to user
subject = await db.subjects.find_one({
"_id": ObjectId(subject_id),
"user_id": current_user.id
})
if not subject:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subject not found"
)
# Delete subject
await db.subjects.delete_one({"_id": ObjectId(subject_id)})
# Delete all grades for this subject
await db.grades.delete_many({"subject_id": subject_id, "user_id": current_user.id})
return None

View File

@@ -0,0 +1,186 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import List, Optional
from models import TeacherGradeCreate, TeacherGradeUpdate, TeacherGradeResponse, TeacherGradeInDB, UserInDB
from database import get_database
from routes.auth_routes import get_current_user
from bson import ObjectId
from datetime import datetime
router = APIRouter(prefix="/teacher-grades", tags=["teacher-grades"])
@router.post("", response_model=TeacherGradeResponse, status_code=status.HTTP_201_CREATED)
async def create_teacher_grade(
teacher_grade: TeacherGradeCreate,
current_user: UserInDB = Depends(get_current_user)
):
"""Create or update a teacher-provided grade override for a subject in a period"""
db = get_database()
# Verify subject exists and belongs to user
if not ObjectId.is_valid(teacher_grade.subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
subject = await db.subjects.find_one({
"_id": ObjectId(teacher_grade.subject_id),
"user_id": current_user.id
})
if not subject:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subject not found"
)
# Verify period exists and belongs to user
if not ObjectId.is_valid(teacher_grade.period_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid period ID"
)
period = await db.periods.find_one({
"_id": ObjectId(teacher_grade.period_id),
"user_id": current_user.id
})
if not period:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Period not found"
)
# Check if teacher grade already exists for this subject and period
existing = await db.teacher_grades.find_one({
"subject_id": teacher_grade.subject_id,
"period_id": teacher_grade.period_id,
"user_id": current_user.id
})
if existing:
# Update existing teacher grade
update_data = teacher_grade.model_dump()
update_data["updated_at"] = datetime.utcnow()
await db.teacher_grades.update_one(
{"_id": existing["_id"]},
{"$set": update_data}
)
updated = await db.teacher_grades.find_one({"_id": existing["_id"]})
updated["_id"] = str(updated["_id"])
return TeacherGradeResponse(**updated)
else:
# Create new teacher grade
teacher_grade_in_db = TeacherGradeInDB(
**teacher_grade.model_dump(),
user_id=current_user.id
)
grade_dict = teacher_grade_in_db.model_dump(by_alias=True)
grade_dict["_id"] = ObjectId(grade_dict["_id"])
result = await db.teacher_grades.insert_one(grade_dict)
grade_dict["_id"] = str(result.inserted_id)
return TeacherGradeResponse(**grade_dict)
@router.get("", response_model=List[TeacherGradeResponse])
async def get_teacher_grades(
subject_id: Optional[str] = Query(None),
period_id: Optional[str] = Query(None),
current_user: UserInDB = Depends(get_current_user)
):
"""Get all teacher grades for the current user, optionally filtered by subject or period"""
db = get_database()
query = {"user_id": current_user.id}
if subject_id:
if not ObjectId.is_valid(subject_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid subject ID"
)
query["subject_id"] = subject_id
if period_id:
if not ObjectId.is_valid(period_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid period ID"
)
query["period_id"] = period_id
cursor = db.teacher_grades.find(query)
teacher_grades = await cursor.to_list(length=None)
for grade in teacher_grades:
grade["_id"] = str(grade["_id"])
return [TeacherGradeResponse(**grade) for grade in teacher_grades]
@router.get("/{teacher_grade_id}", response_model=TeacherGradeResponse)
async def get_teacher_grade(
teacher_grade_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Get a specific teacher grade"""
db = get_database()
if not ObjectId.is_valid(teacher_grade_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid teacher grade ID"
)
teacher_grade = await db.teacher_grades.find_one({
"_id": ObjectId(teacher_grade_id),
"user_id": current_user.id
})
if not teacher_grade:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Teacher grade not found"
)
teacher_grade["_id"] = str(teacher_grade["_id"])
return TeacherGradeResponse(**teacher_grade)
@router.delete("/{teacher_grade_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_teacher_grade(
teacher_grade_id: str,
current_user: UserInDB = Depends(get_current_user)
):
"""Delete a teacher grade override"""
db = get_database()
if not ObjectId.is_valid(teacher_grade_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid teacher grade ID"
)
# Check if teacher grade exists and belongs to user
teacher_grade = await db.teacher_grades.find_one({
"_id": ObjectId(teacher_grade_id),
"user_id": current_user.id
})
if not teacher_grade:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Teacher grade not found"
)
# Delete teacher grade
await db.teacher_grades.delete_one({"_id": ObjectId(teacher_grade_id)})
return None