| from fastapi import FastAPI, BackgroundTasks |
| from contextlib import asynccontextmanager |
| from pymongo import MongoClient |
| import pandas as pd |
| import numpy as np |
| from sklearn.metrics.pairwise import cosine_similarity |
| import joblib |
| import asyncio |
| import logging |
| import os |
| from datetime import datetime |
|
|
| |
| logging.basicConfig(level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| db_name = 'property-listing' |
| collection_name = 'activities' |
| user_recommendation_collection_name = 'user_recommendation_collection' |
| connection_string = os.getenv('CONNECTION_STRING') |
|
|
| client = MongoClient(connection_string) |
| db = client[db_name] |
| collection = db[collection_name] |
| user_recommendation_collection = db[user_recommendation_collection_name] |
|
|
| |
| svd = joblib.load('svd_model.joblib') |
| user_item_matrix_columns = joblib.load('all_columns.joblib') |
| item_factors = svd.components_.T |
|
|
| |
| ALL_COLUMNS = ['nxt_img_listing', 'read_more_listing', 'nxt_img_detail', 'read_more_detail', 'time_spent'] |
|
|
| |
| latest_session_id = None |
| latest_recommendations = None |
|
|
async def check_for_new_session():
    """Background poller: watch the activities collection and refresh recommendations.

    Every 5 seconds, looks at the newest activity document. When the session
    id changes or the document count grows, regenerates recommendations for
    that session and caches them in the module-level
    ``latest_session_id`` / ``latest_recommendations`` globals.

    Runs forever until the task is cancelled (see ``lifespan``).
    """
    global latest_session_id, latest_recommendations
    last_document_count = 0
    while True:
        try:
            # PyMongo is blocking; run DB calls in a worker thread so the
            # event loop (and the HTTP endpoints) are not stalled.
            latest_doc = await asyncio.to_thread(
                collection.find_one, sort=[('timestamp', -1)]
            )
            current_document_count = await asyncio.to_thread(
                collection.count_documents, {}
            )

            if latest_doc:
                # New session OR more documents in an existing session both
                # count as "new activity".
                if latest_doc['sessionId'] != latest_session_id or current_document_count > last_document_count:
                    latest_session_id = latest_doc['sessionId']
                    logger.info(f"New activity detected for session: {latest_session_id}")
                    # The recommendation pipeline (Mongo reads + pandas +
                    # sklearn) is synchronous; off-load it as well.
                    latest_recommendations = await asyncio.to_thread(
                        generate_recommendations_for_session, latest_session_id
                    )
                    if latest_recommendations:
                        logger.info(f"Generated recommendations for session {latest_session_id}: {latest_recommendations}")
                    else:
                        logger.warning(f"No recommendations generated for session {latest_session_id}")
                    last_document_count = current_document_count
                else:
                    logger.info("No new activity detected")
            else:
                logger.warning("No documents found in the collection")

            await asyncio.sleep(5)
        except Exception as e:
            # Keep the poller alive on transient failures (e.g. DB hiccups).
            logger.error(f"Error in check_for_new_session: {e}")
            await asyncio.sleep(5)
|
|
def get_session_data(session_id):
    """Fetch every activity document for *session_id* as a DataFrame.

    Returns None (after logging) when the session has no documents, when
    the required 'id'/'action' columns are absent, or on any query error.
    """
    try:
        documents = list(collection.find({'sessionId': session_id}))
        if not documents:
            logger.warning(f"No data found for session {session_id}")
            return None

        raw_df = pd.DataFrame(documents)
        logger.debug(f"Columns in raw_df: {raw_df.columns.tolist()}")

        # Downstream aggregation needs both columns; bail out early if the
        # documents don't carry them.
        missing_columns = [col for col in ['id', 'action'] if col not in raw_df.columns]
        if missing_columns:
            logger.error(f"Missing required columns: {missing_columns}")
            return None

        return raw_df
    except Exception as e:
        logger.error(f"Error in get_session_data: {str(e)}")
        return None
|
|
def create_pivot_table(raw_df):
    """Pivot raw activity rows into one feature row per property id.

    Counts occurrences of each action (``presence_<action>``) and, when a
    'duration' column exists, sums it per action
    (``total_duration_<action>``). Guarantees that every action listed in
    ALL_COLUMNS has its column present (filled with 0) so later scoring can
    rely on the schema. Returns None on failure.
    """
    try:
        has_duration = 'duration' in raw_df.columns

        # Per (property, action) aggregation; duration totals only when the
        # column is available.
        agg_spec = {'presence': ('action', 'size')}
        if has_duration:
            agg_spec['total_duration'] = ('duration', 'sum')
        aggregated = raw_df.groupby(['id', 'action']).agg(**agg_spec).reset_index()

        value_cols = ['presence', 'total_duration'] if has_duration else ['presence']
        table = aggregated.pivot_table(
            index=['id'],
            columns='action',
            values=value_cols,
            fill_value=0
        )

        # Flatten the (metric, action) MultiIndex into 'metric_action' names.
        table.columns = ['_'.join(pair).strip() for pair in table.columns.values]

        # Ensure the full expected schema exists even for actions the
        # session never performed.
        for action in ALL_COLUMNS:
            if action == 'time_spent':
                if has_duration and 'total_duration_time_spent' not in table.columns:
                    table['total_duration_time_spent'] = 0
            elif f'presence_{action}' not in table.columns:
                table[f'presence_{action}'] = 0

        return table
    except Exception as e:
        logger.error(f"Error in create_pivot_table: {str(e)}")
        return None
|
|
def create_user_vector(pivot_df):
    """Turn the per-property feature table into a model-aligned user vector.

    Scores each property via calculate_interaction_score, then projects the
    scores onto the item-id index the SVD model was trained on (unknown
    properties are dropped; missing ones stay 0). Returns None on failure.
    """
    try:
        pivot_df['interaction_score'] = pivot_df.apply(calculate_interaction_score, axis=1)

        # Zero-initialized vector over the training-time item columns.
        user_vector = pd.Series(0.0, index=user_item_matrix_columns, dtype=float)
        for prop_id in pivot_df.index:
            if prop_id in user_vector.index:
                user_vector[prop_id] = pivot_df.at[prop_id, 'interaction_score']

        return user_vector
    except Exception as e:
        logger.error(f"Error in create_user_vector: {str(e)}")
        return None
|
|
def generate_recommendations(user_vector):
    """Return the 10 item ids most similar to the user's latent profile.

    Projects the user vector into the SVD latent space, ranks all items by
    cosine similarity to that projection, and returns the top-10 ids in
    descending similarity order. Returns None on failure.
    """
    try:
        latent_profile = svd.transform(user_vector.values.reshape(1, -1))

        scores = cosine_similarity(latent_profile, item_factors)[0]
        # argsort ascending, reversed => indices in descending similarity.
        top_indices = scores.argsort()[::-1][:10]

        return user_item_matrix_columns[top_indices].tolist()
    except Exception as e:
        logger.error(f"Error in generate_recommendations: {str(e)}")
        return None
|
|
def generate_recommendations_for_session(session_id):
    """Run the full pipeline for one session and persist the result.

    Pipeline: fetch session activity -> pivot features -> user vector ->
    SVD recommendations. The recommendations are upserted into
    ``user_recommendation_collection`` (insert on first sight, update only
    when they changed). Returns the recommendation list, or None if any
    stage failed.
    """
    try:
        raw_df = get_session_data(session_id)
        if raw_df is None:
            return None

        pivot_df = create_pivot_table(raw_df)
        if pivot_df is None:
            return None

        user_vector = create_user_vector(pivot_df)
        if user_vector is None:
            return None

        recommendations = generate_recommendations(user_vector)
        if recommendations is None:
            # Bug fix: previously a failed generation still wrote
            # {"recommendations": None} into the collection.
            return None

        recommendation_data = {
            "sessionId": session_id,
            "recommendations": recommendations,
            "timestamp": datetime.now()
        }

        existing_recommendations = user_recommendation_collection.find_one({"sessionId": session_id})
        if existing_recommendations is None:
            user_recommendation_collection.insert_one(recommendation_data)
            logger.info(f"Saved recommendations for session {session_id}: {recommendations}")
        elif existing_recommendations["recommendations"] != recommendations:
            # Only touch the document (and its timestamp) when the list
            # actually changed.
            user_recommendation_collection.update_one(
                {"sessionId": session_id},
                {"$set": recommendation_data}
            )
            logger.info(f"Updated recommendations for session {session_id}: {recommendations}")
        else:
            logger.info(f"Recommendations for session {session_id} remain unchanged")

        return recommendations
    except Exception as e:
        logger.error(f"Error in generate_recommendations_for_session: {str(e)}")
        return None
|
|
def calculate_interaction_score(row):
    """Score one property row from its interaction features.

    Weighted count of each action type plus time spent scaled by 1/10,
    with a flat -10 penalty when the property was viewed for less than
    15 (units of the raw 'duration' field). Returns 0 on any error.
    """
    try:
        # Deeper engagement actions carry larger weights.
        action_weights = {
            'presence_nxt_img_listing': 1,
            'presence_read_more_listing': 2,
            'presence_nxt_img_detail': 3,
            'presence_read_more_detail': 4,
        }
        score = sum(row.get(feature, 0) * weight
                    for feature, weight in action_weights.items())
        score += row.get('total_duration_time_spent', 0) / 10

        # Penalize very short visits.
        if 'total_duration_time_spent' in row and row['total_duration_time_spent'] < 15:
            score -= 10

        return score
    except Exception as e:
        logger.error(f"Error in calculate_interaction_score: {e}")
        return 0
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the session-polling task for the lifetime of the application.

    Startup: spawn check_for_new_session as a background task.
    Shutdown: cancel it and wait for the cancellation to complete.
    """
    poller = asyncio.create_task(check_for_new_session())
    yield
    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        logger.info("Background task cancelled")
|
|
| |
| app = FastAPI(lifespan=lifespan) |
|
|
@app.get("/")
async def root():
    """Landing/health-check endpoint; returns a static welcome message."""
    return {"message": "Welcome to the Rec API"}
|
|
@app.get("/recommendations")
async def get_recommendations():
    """
    API endpoint to get the latest recommendations.
    Returns:
        list: An array of recommended property IDs, or an empty array if no recommendations are available.
    """
    # Guard clause: nothing cached yet (or last generation failed/empty).
    if not latest_recommendations:
        logger.info("No recommendations available")
        return []
    logger.info(f"Returning recommendations: {latest_recommendations}")
    return latest_recommendations
|
|