| from fastapi import FastAPI, Request, HTTPException |
| from linebot import LineBotApi, WebhookHandler |
| from linebot.exceptions import InvalidSignatureError |
| from linebot.models import MessageEvent, TextMessage, TextSendMessage |
|
|
| import os |
|
|
| |
| LINE_CHANNEL_ACCESS_TOKEN = os.getenv("LINE_CHANNEL_ACCESS_TOKEN") |
| LINE_CHANNEL_SECRET = os.getenv("LINE_CHANNEL_SECRET") |
|
|
| line_bot_api = LineBotApi(LINE_CHANNEL_ACCESS_TOKEN) |
| handler = WebhookHandler(LINE_CHANNEL_SECRET) |
|
|
| app = FastAPI() |
|
|
| |
| @app.post("/callback") |
| async def callback(request: Request): |
| signature = request.headers['X-Line-Signature'] |
| body = await request.body() |
|
|
| try: |
| handler.handle(body.decode('utf-8'), signature) |
| except InvalidSignatureError: |
| raise HTTPException(status_code=400, detail="Invalid signature") |
|
|
| return 'OK' |
|
|
| |
| @handler.add(MessageEvent, message=TextMessage) |
| def handle_text_message(event): |
| |
| reply_text = event.message.text |
| line_bot_api.reply_message( |
| event.reply_token, |
| TextSendMessage(text=reply_text) |
| ) |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|