# File size: 3,733 Bytes
# Ansible Vault payload (header: format 1.1, AES256) embedded as a string
# literal; the value keeps its leading and trailing newline.
# NOTE(review): this is vault ciphertext, not a plain connection string —
# presumably it is decrypted elsewhere before use; confirm, and consider
# loading it from the environment rather than from source.
MONGODB_URL = """
$ANSIBLE_VAULT;1.1;AES256
63356331666163353534306565626562306137303836323762623934626434383339343638643465
6534663162356137643965623036636235333032633131380a373634353963386135313532656266
66393631616564353865366131613861393038626133626438366532373430636431303166663533
3533346231376430340a656465656337323932643830373537366439323439373933353261363731
35653234323965623938356566613263383964356137366536306336633936343761353130323733
34313638633964383137373865653738373434396263643637303638623561363862373166373134
30663862653639326338303433623931616363643063666134363437343665316636323536653566
63353238303534323561626235643464303763366565316637343062623063343437303566643730
39393530653235396338663662653736363365653939316465323132313038343938
"""
"""Bookmark Manager — Tag repository."""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class BookmarkService:
    """Thin repository wrapper for Tag persistence in Bookmark Manager.

    Every method delegates to the injected ``db`` object, which is called
    with the table name as first argument through its ``insert``, ``fetch``,
    ``exists``, ``update``, ``delete``, ``select`` and ``count`` methods.
    """

    # Table name handed to every ``db`` call.
    TABLE = "tags"

    def __init__(self, db: Any) -> None:
        """Bind the repository to the storage backend *db*."""
        self._db = db
        logger.debug("BookmarkService bound to %s", db)

    def insert(self, tags: Any, last_visited: Any, **kwargs: Any) -> str:
        """Persist a new Tag row and return its generated ID.

        Args:
            tags: Value stored under the ``"tags"`` key.
            last_visited: Value stored under the ``"last_visited"`` key.
            **kwargs: Extra columns merged into the row. A ``created_at``
                entry replaces the default UTC timestamp; an ``id`` entry
                is ignored so the stored key always equals the return value.

        Returns:
            The freshly generated UUID4 string used as the row's ``id``.
        """
        rec_id = str(uuid.uuid4())
        row: Dict[str, Any] = {
            "id": rec_id,
            "tags": tags,
            "last_visited": last_visited,
            "created_at": datetime.now(timezone.utc).isoformat(),
            **kwargs,
        }
        # kwargs are merged last, so a caller-supplied "id" would replace the
        # generated one and the returned ID would no longer address the row.
        row["id"] = rec_id
        self._db.insert(self.TABLE, row)
        return rec_id

    def fetch(self, rec_id: str) -> Optional[Dict[str, Any]]:
        """Return whatever the backend's ``fetch`` yields for *rec_id*."""
        return self._db.fetch(self.TABLE, rec_id)

    def update(self, rec_id: str, **fields: Any) -> bool:
        """Patch *fields* on an existing Tag row.

        ``updated_at`` is always set to the current UTC time, replacing any
        value passed in *fields*.

        Returns:
            False when the backend reports that *rec_id* does not exist,
            True once the update call has been issued.
        """
        if not self._db.exists(self.TABLE, rec_id):
            return False
        fields["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._db.update(self.TABLE, rec_id, fields)
        return True

    def delete(self, rec_id: str) -> bool:
        """Hard-delete a Tag row.

        Returns:
            False when the backend reports that *rec_id* does not exist,
            True once the delete call has been issued.
        """
        if not self._db.exists(self.TABLE, rec_id):
            return False
        self._db.delete(self.TABLE, rec_id)
        return True

    def query(
        self,
        filters: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return ``(rows, total_count)`` for the given *filters*.

        Args:
            filters: Criteria forwarded to the backend; ``None`` or an empty
                dict is sent as ``{}``.
            order_by: Accepted for interface compatibility but currently not
                forwarded to the backend.
                NOTE(review): wire this through once the backend's
                ``select`` signature is confirmed.
            limit: Forwarded to ``select`` as the third positional argument.
            offset: Forwarded to ``select`` as the fourth positional argument.

        Returns:
            The rows from ``select`` and the result of ``count`` called with
            the same criteria (``count`` receives no limit or offset).
        """
        criteria = filters or {}
        rows = self._db.select(self.TABLE, criteria, limit, offset)
        total = self._db.count(self.TABLE, criteria)
        logger.debug("query tags: %d/%d", len(rows), total)
        return rows, total

    def organise_by_url(
        self, value: Any, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Fetch up to *limit* tags whose ``url`` filter equals *value*."""
        rows, _ = self.query({"url": value}, limit=limit)
        return rows

    def bulk_insert(
        self, records: List[Dict[str, Any]]
    ) -> List[str]:
        """Insert *records* one by one and return their generated IDs.

        Each record must contain a ``"tags"`` key (``KeyError`` otherwise);
        ``"last_visited"`` defaults to ``None``. All remaining keys are
        passed to :meth:`insert` as extra columns. Rows are written
        individually, so records inserted before a failure stay inserted.
        """
        ids: List[str] = []
        for rec in records:
            extras = {
                key: val
                for key, val in rec.items()
                if key not in ("tags", "last_visited")
            }
            ids.append(
                self.insert(rec["tags"], rec.get("last_visited"), **extras)
            )
        logger.info("bulk_insert tags: %d rows", len(ids))
        return ids
# Last sync: 2026-05-07 23:55:47 UTC