import os
from dataclasses import dataclass

import numpy as np
import pandas as pd
import streamlit as st
import vk_api
from dotenv import load_dotenv
from joblib import Parallel, delayed

load_dotenv()


@st.cache_resource
def connect_api():
    """Build a VK API client authenticated with the VK_TOKEN env variable."""
    service_token = os.getenv("VK_TOKEN")
    return vk_api.VkApi(token=service_token).get_api()


vk = connect_api()


@st.cache_resource
def get_cities_db():
    """Load the city lookup table (expects columns: city, lat, lon)."""
    return pd.read_csv("towns.csv")


cities_db = get_cities_db()


@dataclass
class Post:
    """A VK newsfeed post that could be attributed to a geographic location."""

    text: str
    city_of_origin: str
    geolocation: tuple[float, float]  # (lat, lon)
    owner_id: int


def search_posts(query: str, num_of_posts: int, *, search_args=None) -> list[Post]:
    """Search the VK newsfeed for `query` and geolocate posts by owner city.

    Posts whose owner has no resolvable city, or whose city is missing from
    `cities_db`, are skipped (counted in the progress log).

    Args:
        query: Search string passed to vk.newsfeed.search.
        num_of_posts: Upper bound on the number of located posts returned.
        search_args: Extra keyword arguments forwarded to vk.newsfeed.search.

    Returns:
        At most `num_of_posts` Post objects with a known geolocation.
    """
    # NOTE: `None` sentinel avoids the shared-mutable-default-argument bug.
    search_args = {} if search_args is None else search_args
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)  # VK caps `count` at 200
    city_none_stat = 0
    pos_none_stat = 0
    while len(posts) < num_of_posts:
        query_results = vk.newsfeed.search(
            q=query, count=request_count, offset=offset, **search_args
        )
        items = [
            item for item in query_results["items"]
            if "owner_id" in item and "text" in item
        ]
        owner_ids = np.array([item["owner_id"] for item in items])
        cities = get_post_city(owner_ids)
        city_pos = get_city_position(cities)
        # Iterate the items themselves (not a dict keyed by owner_id) so that
        # multiple posts by the same owner are all kept.
        for item in items:
            owner = item["owner_id"]
            if cities.get(owner) is None:
                city_none_stat += 1
                continue
            if city_pos.get(owner) is None:
                pos_none_stat += 1
                continue
            posts.append(Post(item["text"], cities[owner], city_pos[owner], owner))
        offset += request_count
        print(
            f"Processed {offset} posts, added {len(posts)}. "
            f"City not found: {city_none_stat}, position not found: {pos_none_stat}.",
            flush=True,
        )
        # Feed exhausted: a short page means no more results — without this
        # check the loop would spin forever when fewer than num_of_posts
        # locatable posts exist.
        if len(query_results["items"]) < request_count:
            break
    return posts[:num_of_posts]


def search_posts_parallel(query: str, num_of_posts: int, num_of_workers: int = 4,
                          *, search_args=None):
    """Like search_posts, but fans page fetches out over joblib workers.

    Args:
        query: Search string passed to vk.newsfeed.search.
        num_of_posts: Upper bound on the number of located posts returned.
        num_of_workers: Number of parallel joblib jobs per round.
        search_args: Extra keyword arguments forwarded to vk.newsfeed.search.

    Returns:
        At most `num_of_posts` Post objects with a known geolocation.
    """
    search_args = {} if search_args is None else search_args
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts // num_of_workers + 1, 200)
    while len(posts) < num_of_posts:
        batches = Parallel(n_jobs=num_of_workers)(
            delayed(_get_posts)(query, request_count,
                                offset + i * request_count, search_args)
            for i in range(num_of_workers)
        )
        added = 0
        for batch in batches:
            posts.extend(batch)
            added += len(batch)
        offset += request_count * num_of_workers
        print(f"Processed {offset} posts, added {len(posts)}.", flush=True)
        # Best-effort termination guard: a whole round yielding nothing most
        # likely means the feed is exhausted (workers cannot report raw page
        # sizes here); previously this loop could run forever.
        if added == 0:
            break
    return posts[:num_of_posts]


def _get_posts(query: str, request_count: int, offset: int, search_args) -> list[Post]:
    """Fetch one page of search results and geolocate them (worker helper)."""
    posts: list[Post] = []
    query_results = vk.newsfeed.search(
        q=query, count=request_count, offset=offset, **search_args
    )
    items = [
        item for item in query_results["items"]
        if "owner_id" in item and "text" in item
    ]
    owner_ids = np.array([item["owner_id"] for item in items])
    cities = get_post_city(owner_ids)
    city_pos = get_city_position(cities)
    # Keep every post, including several by the same owner.
    for item in items:
        owner = item["owner_id"]
        pos = city_pos.get(owner)
        if pos is None:
            continue
        posts.append(Post(item["text"], cities[owner], pos, owner))
    assert all(post.geolocation is not None for post in posts)
    return posts


def get_post_city(owner_id: np.ndarray) -> dict[int, str | None]:
    """Resolve each owner id to its profile city title via the VK API.

    VK convention: negative owner ids are groups, positive ids are users.

    Args:
        owner_id: Array of VK owner ids (may contain duplicates).

    Returns:
        Mapping of owner id -> city title, or None when the profile has
        no city set.
    """
    group_ids = -owner_id[owner_id < 0]
    user_ids = owner_id[owner_id > 0]
    # Sanity check: VK owner ids are never zero.
    assert len(group_ids) + len(user_ids) == len(owner_id)
    if len(group_ids) > 0:
        groups = vk.groups.getById(group_ids=list(group_ids),
                                   fields=['city', 'country'])
        groups_dict = {-group["id"]: group.get("city", None) for group in groups}
    else:
        groups_dict = {}
    if len(user_ids) > 0:
        users = vk.users.get(user_ids=list(user_ids),
                             fields=['city', 'country'])
        users_dict = {user["id"]: user.get("city", None) for user in users}
    else:
        users_dict = {}
    users_dict.update(groups_dict)
    # VK returns the city as an object; keep only its human-readable title.
    return {owner: city["title"] if city is not None else None
            for owner, city in users_dict.items()}


def get_city_position(cities: dict[int, str | None]) -> dict[int, tuple[float, float] | None]:
    """Look up (lat, lon) for each owner's city in the local cities table.

    Args:
        cities: Mapping of owner id -> city title (or None).

    Returns:
        Mapping of owner id -> (lat, lon), or None when the city is unknown
        or absent from `cities_db`.
    """
    res: dict[int, tuple[float, float] | None] = {}
    for owner, city in cities.items():
        if city is None:
            res[owner] = None
            continue
        selected = cities_db[cities_db["city"] == city]
        if len(selected) == 0:
            res[owner] = None
            continue
        res[owner] = (selected["lat"].iloc[0], selected["lon"].iloc[0])
    assert len(cities) == len(res)
    return res


def search_posts_by_pos(query: str, num_of_posts: int, city_name: str,
                        lat: float, lon: float) -> list[Post]:
    """Search the VK newsfeed for posts tagged near a fixed coordinate.

    Args:
        query: Search string passed to vk.newsfeed.search.
        num_of_posts: Upper bound on the number of posts returned.
        city_name: Label stored as each Post's city_of_origin.
        lat: Latitude of the search center.
        lon: Longitude of the search center.

    Returns:
        At most `num_of_posts` Post objects, all pinned to (lat, lon).
    """
    posts: list[Post] = []
    offset = 0
    request_count = min(num_of_posts, 200)
    while len(posts) < num_of_posts:
        # BUGFIX: the VK API parameter is `longitude` — the previous
        # misspelling `longtitude` meant the coordinate was never applied.
        query_results = vk.newsfeed.search(q=query, count=request_count,
                                           offset=offset,
                                           latitude=lat, longitude=lon)
        items = [item for item in query_results["items"] if "text" in item]
        for item in items:
            posts.append(Post(item["text"], city_name, (lat, lon),
                              item.get("owner_id", 0)))
        offset += request_count
        print(f"For city {city_name} processed {offset} posts, "
              f"added {len(posts)}.", flush=True)
        # Stop once the feed runs dry instead of looping forever.
        if len(query_results["items"]) < request_count:
            break
    return posts[:num_of_posts]