File size: 7,331 Bytes
58b0cb8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | import httpx
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db_models import User
from app.auth import create_access_token
from app.config import settings
async def check_github_repo_admin(access_token: str, username: str) -> bool:
"""
Check if user is admin/owner/collaborator of the configured GitHub repo.
Returns True if user has admin privileges on the repo.
"""
if not settings.GITHUB_REPO_OWNER or not settings.GITHUB_REPO_NAME:
return False
async with httpx.AsyncClient() as client:
# Check if user is a collaborator
collab_response = await client.get(
f"https://api.github.com/repos/{settings.GITHUB_REPO_OWNER}/{settings.GITHUB_REPO_NAME}/collaborators/{username}",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github+json",
},
)
if collab_response.status_code == 200:
collab_data = collab_response.json()
permission = collab_data.get("permission", "")
# Admin permissions: admin, maintain, triage
if permission in ["admin", "maintain", "triage"]:
return True
# Check if user is the repo owner (for organization repos)
repo_response = await client.get(
f"https://api.github.com/repos/{settings.GITHUB_REPO_OWNER}/{settings.GITHUB_REPO_NAME}",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github+json",
},
)
if repo_response.status_code == 200:
repo_data = repo_response.json()
# Check if user is the owner
if repo_data.get("owner", {}).get("login") == username:
return True
return False
class OAuthHandler:
@staticmethod
async def github_callback(code: str, session: AsyncSession) -> tuple[User, str]:
async with httpx.AsyncClient() as client:
token_response = await client.post(
"https://github.com/login/oauth/access_token",
headers={"Accept": "application/json"},
data={
"client_id": settings.GITHUB_CLIENT_ID,
"client_secret": settings.GITHUB_CLIENT_SECRET,
"code": code,
},
)
if token_response.status_code != 200:
raise HTTPException(status_code=400, detail="GitHub OAuth failed")
token_data = token_response.json()
access_token = token_data.get("access_token")
user_response = await client.get(
"https://api.github.com/user",
headers={"Authorization": f"Bearer {access_token}"},
)
if user_response.status_code != 200:
raise HTTPException(
status_code=400, detail="Failed to fetch GitHub user"
)
github_user = user_response.json()
github_username = github_user["login"]
github_id = str(github_user["id"])
# Check if user is admin of the configured repo
is_repo_admin = await check_github_repo_admin(access_token, github_username)
result = await session.execute(
select(User).where(
User.oauth_provider == "github",
User.oauth_id == github_id,
)
)
user = result.scalar_one_or_none()
# Determine role: admin if repo collaborator, otherwise user
role = "admin" if is_repo_admin else "user"
if not user:
user = User(
oauth_provider="github",
oauth_id=github_id,
email=github_user.get("email") or f"{github_username}@github.local",
username=github_username,
avatar_url=github_user.get("avatar_url"),
role=role,
)
session.add(user)
else:
user.last_login = None
user.username = github_username
user.avatar_url = github_user.get("avatar_url")
user.role = role # Update role based on repo access
await session.commit()
await session.refresh(user)
jwt_token = create_access_token(
data={"sub": str(user.id), "email": user.email, "role": user.role}
)
return user, jwt_token
@staticmethod
async def google_callback(
code: str, redirect_uri: str, session: AsyncSession
) -> tuple[User, str]:
async with httpx.AsyncClient() as client:
token_response = await client.post(
"https://oauth2.googleapis.com/token",
data={
"client_id": settings.GOOGLE_CLIENT_ID,
"client_secret": settings.GOOGLE_CLIENT_SECRET,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
},
)
if token_response.status_code != 200:
raise HTTPException(status_code=400, detail="Google OAuth failed")
token_data = token_response.json()
access_token = token_data.get("access_token")
user_response = await client.get(
"https://www.googleapis.com/oauth2/v2/userinfo",
headers={"Authorization": f"Bearer {access_token}"},
)
if user_response.status_code != 200:
raise HTTPException(
status_code=400, detail="Failed to fetch Google user"
)
google_user = user_response.json()
result = await session.execute(
select(User).where(
User.oauth_provider == "google", User.oauth_id == google_user["id"]
)
)
user = result.scalar_one_or_none()
if not user:
user = User(
oauth_provider="google",
oauth_id=google_user["id"],
email=google_user["email"],
username=google_user.get(
"name", google_user["email"].split("@")[0]
),
avatar_url=google_user.get("picture"),
role="user",
)
session.add(user)
else:
user.last_login = None
user.username = google_user.get("name", user.username)
user.avatar_url = google_user.get("picture")
await session.commit()
await session.refresh(user)
jwt_token = create_access_token(
data={"sub": str(user.id), "email": user.email, "role": user.role}
)
return user, jwt_token
oauth_handler = OAuthHandler()
|