File size: 7,331 Bytes
58b0cb8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import httpx
from fastapi import HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth import create_access_token
from app.config import settings
from app.db_models import User


async def check_github_repo_admin(access_token: str, username: str) -> bool:
    """Return whether *username* has elevated access to the configured repo.

    The repository is identified by ``settings.GITHUB_REPO_OWNER`` and
    ``settings.GITHUB_REPO_NAME``. Two GitHub API lookups are made with the
    caller's token:

    1. The collaborator permission endpoint; the user qualifies when the
       reported role is one of ``admin``, ``maintain`` or ``triage``.
    2. The repository itself; the user qualifies when their login equals
       the repository owner's login.

    Args:
        access_token: GitHub OAuth access token of the user being checked.
        username: GitHub login of the user being checked.

    Returns:
        True if either lookup grants elevated access. False otherwise,
        including when the repository is not configured or GitHub answers
        with a non-200 status.
    """
    owner = settings.GITHUB_REPO_OWNER
    repo = settings.GITHUB_REPO_NAME
    if not owner or not repo:
        return False

    # NOTE(review): this set mirrors the roles the original check accepted.
    # `write` is not in it although GitHub ranks it above `triage` - confirm
    # that this is the intended policy.
    elevated_roles = {"admin", "maintain", "triage"}

    repo_url = f"https://api.github.com/repos/{owner}/{repo}"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/vnd.github+json",
    }

    async with httpx.AsyncClient() as client:
        # The bare `/collaborators/{username}` endpoint replies 204 with no
        # body, so it can never yield a permission. The `/permission`
        # sub-resource replies 200 with `permission` and `role_name`.
        permission_response = await client.get(
            f"{repo_url}/collaborators/{username}/permission",
            headers=headers,
        )
        if permission_response.status_code == 200:
            payload = permission_response.json()
            # `permission` only carries the legacy values admin/write/read/none
            # (maintain -> write, triage -> read); `role_name` is the real role.
            role = payload.get("role_name") or payload.get("permission", "")
            if role in elevated_roles:
                return True

        # Fall back to an ownership check on the repository itself.
        repo_response = await client.get(repo_url, headers=headers)
        if repo_response.status_code == 200:
            repo_owner = repo_response.json().get("owner") or {}
            if repo_owner.get("login") == username:
                return True

    return False


class OAuthHandler:
    """OAuth login callbacks for GitHub and Google.

    Each callback exchanges an authorization code for a provider access
    token, fetches the provider's user profile, creates or updates the
    matching ``User`` row, and returns that user together with a JWT built
    by ``create_access_token``.
    """

    @staticmethod
    def _extract_access_token(token_response: httpx.Response, provider: str) -> str:
        """Return the access token from a token-endpoint response.

        Raises:
            HTTPException: 400 when the response status is not 200 or the
                body carries no ``access_token``.
        """
        if token_response.status_code != 200:
            raise HTTPException(status_code=400, detail=f"{provider} OAuth failed")

        access_token = token_response.json().get("access_token")
        if not access_token:
            # A provider can answer 200 with an error payload instead of a
            # token; fail here rather than sending "Bearer None" onwards.
            raise HTTPException(status_code=400, detail=f"{provider} OAuth failed")
        return access_token

    @staticmethod
    async def _find_user(
        session: AsyncSession, provider: str, oauth_id: str
    ) -> "User | None":
        """Return the user stored for (*provider*, *oauth_id*), or None."""
        result = await session.execute(
            select(User).where(
                User.oauth_provider == provider,
                User.oauth_id == oauth_id,
            )
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def _finalize_login(user: User, session: AsyncSession) -> tuple[User, str]:
        """Commit pending changes, reload *user*, and issue its JWT."""
        await session.commit()
        await session.refresh(user)

        jwt_token = create_access_token(
            data={"sub": str(user.id), "email": user.email, "role": user.role}
        )
        return user, jwt_token

    @staticmethod
    async def github_callback(code: str, session: AsyncSession) -> tuple[User, str]:
        """Complete a GitHub OAuth login.

        The user's role is recomputed on every login: ``"admin"`` when
        ``check_github_repo_admin`` returns True, ``"user"`` otherwise.

        Args:
            code: Authorization code GitHub passed to the redirect endpoint.
            session: Database session used to look up and persist the user.

        Returns:
            The created or updated ``User`` and a JWT for that user.

        Raises:
            HTTPException: 400 when the token exchange fails or the GitHub
                profile cannot be fetched.
        """
        async with httpx.AsyncClient() as client:
            token_response = await client.post(
                "https://github.com/login/oauth/access_token",
                headers={"Accept": "application/json"},
                data={
                    "client_id": settings.GITHUB_CLIENT_ID,
                    "client_secret": settings.GITHUB_CLIENT_SECRET,
                    "code": code,
                },
            )
            access_token = OAuthHandler._extract_access_token(
                token_response, "GitHub"
            )

            user_response = await client.get(
                "https://api.github.com/user",
                headers={"Authorization": f"Bearer {access_token}"},
            )

        if user_response.status_code != 200:
            raise HTTPException(
                status_code=400, detail="Failed to fetch GitHub user"
            )

        github_user = user_response.json()
        github_username = github_user["login"]
        github_id = str(github_user["id"])

        # Role follows repository access: admin if the repo check passes.
        is_repo_admin = await check_github_repo_admin(access_token, github_username)
        role = "admin" if is_repo_admin else "user"

        user = await OAuthHandler._find_user(session, "github", github_id)
        if user is None:
            user = User(
                oauth_provider="github",
                oauth_id=github_id,
                # Placeholder address when the GitHub profile exposes no email.
                email=github_user.get("email") or f"{github_username}@github.local",
                username=github_username,
                avatar_url=github_user.get("avatar_url"),
                role=role,
            )
            session.add(user)
        else:
            # Stamp the login with the database clock. The row is refreshed
            # before it is returned, so callers see the stored value.
            user.last_login = func.now()
            user.username = github_username
            user.avatar_url = github_user.get("avatar_url")
            user.role = role  # Update role based on repo access

        return await OAuthHandler._finalize_login(user, session)

    @staticmethod
    async def google_callback(
        code: str, redirect_uri: str, session: AsyncSession
    ) -> tuple[User, str]:
        """Complete a Google OAuth login.

        New users are created with role ``"user"``; the role of an existing
        user is left unchanged.

        Args:
            code: Authorization code Google passed to the redirect endpoint.
            redirect_uri: Redirect URI sent with the token exchange request.
            session: Database session used to look up and persist the user.

        Returns:
            The created or updated ``User`` and a JWT for that user.

        Raises:
            HTTPException: 400 when the token exchange fails, the Google
                profile cannot be fetched, or the profile lacks ``id`` or
                ``email``.
        """
        async with httpx.AsyncClient() as client:
            token_response = await client.post(
                "https://oauth2.googleapis.com/token",
                data={
                    "client_id": settings.GOOGLE_CLIENT_ID,
                    "client_secret": settings.GOOGLE_CLIENT_SECRET,
                    "code": code,
                    "grant_type": "authorization_code",
                    "redirect_uri": redirect_uri,
                },
            )
            access_token = OAuthHandler._extract_access_token(
                token_response, "Google"
            )

            user_response = await client.get(
                "https://www.googleapis.com/oauth2/v2/userinfo",
                headers={"Authorization": f"Bearer {access_token}"},
            )

        if user_response.status_code != 200:
            raise HTTPException(
                status_code=400, detail="Failed to fetch Google user"
            )

        google_user = user_response.json()
        google_id = google_user.get("id")
        email = google_user.get("email")
        if not google_id or not email:
            # Without both fields the account cannot be matched or created;
            # report it as a failed profile fetch instead of a KeyError.
            raise HTTPException(
                status_code=400, detail="Failed to fetch Google user"
            )

        user = await OAuthHandler._find_user(session, "google", google_id)
        if user is None:
            user = User(
                oauth_provider="google",
                oauth_id=google_id,
                email=email,
                # Fall back to the local part of the email when no name is set.
                username=google_user.get("name") or email.split("@")[0],
                avatar_url=google_user.get("picture"),
                role="user",
            )
            session.add(user)
        else:
            # Stamp the login with the database clock. The row is refreshed
            # before it is returned, so callers see the stored value.
            user.last_login = func.now()
            user.username = google_user.get("name") or user.username
            user.avatar_url = google_user.get("picture")

        return await OAuthHandler._finalize_login(user, session)


# Module-level OAuthHandler instance. Both callbacks are static methods, so
# this object holds no state and only provides instance-style access to them.
oauth_handler = OAuthHandler()