# OSINT/src/osint_env/data/generator.py
# Author avatar: Siddeshwar1625
# Commit message: fixed tasks
# Commit: 4de4725
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import random
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from osint_env.domain.models import (
CanonicalGraph,
Edge,
EnvironmentConfig,
Node,
NodeType,
SeedEdgeSpec,
SeedQuestionSpec,
TaskInstance,
)
if TYPE_CHECKING:
from osint_env.llm.interface import LLMClient
@dataclass(slots=True)
class PlatformViews:
microblog_posts: list[dict]
forum_threads: list[dict]
profiles: list[dict]
alias_lookup: dict[str, str]
class DatasetGenerator:
def __init__(self, config: EnvironmentConfig, llm: LLMClient | None = None):
self.config = config
self.rng = random.Random(config.seed)
self.llm = llm
@staticmethod
def _edge_key(edge: Edge) -> tuple[str, str, str]:
return (edge.src, edge.rel, edge.dst)
@staticmethod
def _infer_node_type(node_id: str) -> NodeType:
    """Guess a node type from the id prefix (the text before the first ``_``).

    The prefix is matched case-insensitively; unrecognised prefixes fall back
    to ``NodeType.USER``.
    """
    prefix, _, _ = str(node_id).partition("_")
    by_prefix = {
        "user": NodeType.USER,
        "alias": NodeType.ALIAS,
        "org": NodeType.ORG,
        "loc": NodeType.LOCATION,
        "location": NodeType.LOCATION,
        "post": NodeType.POST,
        "thr": NodeType.THREAD,
        "thread": NodeType.THREAD,
        "event": NodeType.EVENT,
    }
    return by_prefix.get(prefix.lower(), NodeType.USER)
def _ensure_node(self, graph: CanonicalGraph, node_id: str) -> None:
if node_id in graph.nodes:
return
node_type = self._infer_node_type(node_id)
attrs: dict[str, Any] = {}
if node_type == NodeType.USER:
attrs = {"name": node_id, "org": "Unknown", "location": "Unknown"}
if node_type == NodeType.ALIAS:
attrs = {"handle": f"@{node_id}"}
graph.nodes[node_id] = Node(node_id=node_id, node_type=node_type, attrs=attrs)
def _add_edge_if_missing(self, graph: CanonicalGraph, edge: Edge) -> None:
key = self._edge_key(edge)
if any(self._edge_key(existing) == key for existing in graph.edges):
return
self._ensure_node(graph, edge.src)
self._ensure_node(graph, edge.dst)
graph.edges.append(edge)
@staticmethod
def _extract_json_blob(text: str) -> Any:
text = str(text).strip()
if not text:
return None
for start, end in (("{", "}"), ("[", "]")):
left = text.find(start)
right = text.rfind(end)
if left >= 0 and right > left:
snippet = text[left : right + 1]
try:
return json.loads(snippet)
except json.JSONDecodeError:
continue
return None
def _apply_seed_nodes(self, graph: CanonicalGraph) -> None:
for node_spec in self.config.seeding.seeded_nodes:
node_type = (
node_spec.node_type
if isinstance(node_spec.node_type, NodeType)
else self._infer_node_type(node_spec.node_id)
)
existing = graph.nodes.get(node_spec.node_id)
attrs = dict(existing.attrs) if existing else {}
attrs.update(node_spec.attrs)
graph.nodes[node_spec.node_id] = Node(node_spec.node_id, node_type, attrs)
def _apply_seed_edges(self, graph: CanonicalGraph) -> None:
for edge_spec in self.config.seeding.seeded_edges:
self._add_edge_if_missing(
graph,
Edge(
src=edge_spec.src,
rel=edge_spec.rel,
dst=edge_spec.dst,
confidence=float(edge_spec.confidence),
),
)
@staticmethod
def _normalize_edge_candidates(value: Any) -> list[SeedEdgeSpec]:
items: list[SeedEdgeSpec] = []
if not isinstance(value, list):
return items
for row in value:
if not isinstance(row, dict):
continue
src = str(row.get("src", "")).strip()
rel = str(row.get("rel", "")).strip()
dst = str(row.get("dst", "")).strip()
if not src or not rel or not dst:
continue
try:
confidence = float(row.get("confidence", 1.0))
except (TypeError, ValueError):
confidence = 1.0
items.append(SeedEdgeSpec(src=src, rel=rel, dst=dst, confidence=confidence))
return items
@staticmethod
def _split_budget(total: int, parts: int) -> list[int]:
if total <= 0:
return []
slots = max(1, parts)
base = total // slots
remainder = total % slots
chunks = [base + (1 if i < remainder else 0) for i in range(slots)]
return [chunk for chunk in chunks if chunk > 0]
@staticmethod
def _shared_context_blob(graph: CanonicalGraph, node_limit: int = 100, edge_limit: int = 80) -> str:
payload = {
"known_nodes": sorted(graph.nodes.keys())[:node_limit],
"known_edges": [
{"src": edge.src, "rel": edge.rel, "dst": edge.dst}
for edge in graph.edges[: min(edge_limit, len(graph.edges))]
],
}
return json.dumps(payload)
def _llm_generate_json_with_retry(self, prompt: str) -> Any:
if self.llm is None:
return None
attempts = max(1, int(self.config.seeding.llm_generation_retries))
for _ in range(attempts):
try:
response = self.llm.generate([{"role": "system", "content": prompt}], tools=[])
except Exception:
continue
parsed = self._extract_json_blob(response.content)
if parsed is not None:
return parsed
return None
def _run_generation_workers(self, prompts: list[str]) -> list[Any]:
if not prompts:
return []
max_workers = max(1, min(self.config.seeding.llm_generation_workers, len(prompts)))
if not self.config.seeding.llm_generation_parallel or max_workers == 1:
output: list[Any] = []
for prompt in prompts:
parsed = self._llm_generate_json_with_retry(prompt)
if parsed is not None:
output.append(parsed)
return output
output = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self._llm_generate_json_with_retry, prompt) for prompt in prompts]
for future in as_completed(futures):
try:
parsed = future.result()
except Exception:
parsed = None
if parsed is not None:
output.append(parsed)
return output
def _template_fallback_allowed(self) -> bool:
if self.llm is None:
return True
return bool(self.config.seeding.allow_template_fallback_on_llm_failure)
def _template_generated_edges(self, graph: CanonicalGraph, budget: int) -> list[Edge]:
if budget <= 0:
return []
users = [n.node_id for n in graph.nodes.values() if n.node_type == NodeType.USER]
aliases = [n.node_id for n in graph.nodes.values() if n.node_type == NodeType.ALIAS]
if len(users) < 2:
return []
generated: list[Edge] = []
rels = ["connected_to", "mentions", "co_occurs_with"]
for _ in range(budget * 3):
if len(generated) >= budget:
break
roll = self.rng.random()
if aliases and roll < 0.2:
src = self.rng.choice(aliases)
dst = self.rng.choice(users)
rel = "alias_of"
elif roll < 0.75:
src, dst = self.rng.sample(users, 2)
rel = self.rng.choice(rels)
else:
src = self.rng.choice(users)
dst = self.rng.choice([u for u in users if u != src])
rel = "connected_to"
generated.append(Edge(src=src, rel=rel, dst=dst, confidence=0.7))
return generated[:budget]
def _llm_expand_graph(self, graph: CanonicalGraph, budget: int) -> list[Edge]:
if budget <= 0:
return []
if self.llm is None:
return self._template_generated_edges(graph, budget)
shared_context = self._shared_context_blob(graph)
workers = max(1, min(self.config.seeding.llm_generation_workers, budget))
chunks = self._split_budget(budget, workers)
focus_tracks = ["entity_linking", "network_expansion", "org_location", "event_trace"]
prompts: list[str] = []
for idx, chunk_budget in enumerate(chunks):
focus = focus_tracks[idx % len(focus_tracks)]
prompts.append(
(
"SEED_GRAPH_EXPANSION_AGENT\n"
"SHARED_CONTEXT\n"
f"{shared_context}\n"
f"worker_id: {idx}\n"
f"focus: {focus}\n"
f"budget: {chunk_budget}\n"
"Generate plausible graph edges for OSINT retrieval.\n"
"Return STRICT JSON object: {\"edges\": [{\"src\": str, \"rel\": str, \"dst\": str, \"confidence\": float}]}.\n"
"Prefer known nodes from SHARED_CONTEXT and avoid duplicates."
)
)
generated: list[Edge] = []
seen: set[tuple[str, str, str]] = set()
for payload in self._run_generation_workers(prompts):
raw_edges: Any = None
if isinstance(payload, dict):
raw_edges = payload.get("edges")
elif isinstance(payload, list):
raw_edges = payload
for edge_spec in self._normalize_edge_candidates(raw_edges):
key = (edge_spec.src, edge_spec.rel, edge_spec.dst)
if key in seen:
continue
seen.add(key)
generated.append(Edge(edge_spec.src, edge_spec.rel, edge_spec.dst, float(edge_spec.confidence)))
if len(generated) >= budget:
break
if len(generated) >= budget:
break
if len(generated) < budget:
residual = budget - len(generated)
residual_prompt = (
"SEED_GRAPH_EXPANSION_AGENT\n"
"SHARED_CONTEXT\n"
f"{shared_context}\n"
f"budget: {residual}\n"
"Generate any remaining high-utility edges.\n"
"Return STRICT JSON object: {\"edges\": [{\"src\": str, \"rel\": str, \"dst\": str, \"confidence\": float}]}."
)
payload = self._llm_generate_json_with_retry(residual_prompt)
raw_edges: Any = payload.get("edges") if isinstance(payload, dict) else payload
for edge_spec in self._normalize_edge_candidates(raw_edges):
key = (edge_spec.src, edge_spec.rel, edge_spec.dst)
if key in seen:
continue
seen.add(key)
generated.append(Edge(edge_spec.src, edge_spec.rel, edge_spec.dst, float(edge_spec.confidence)))
if len(generated) >= budget:
break
if len(generated) < budget and self._template_fallback_allowed():
for edge in self._template_generated_edges(graph, budget - len(generated)):
key = (edge.src, edge.rel, edge.dst)
if key in seen:
continue
seen.add(key)
generated.append(edge)
if len(generated) >= budget:
break
return generated[:budget]
@staticmethod
def _extract_entity_tokens(question: str) -> list[str]:
return re.findall(r"\b(?:alias|user|org|loc|post|thr|thread|event)_[a-zA-Z0-9_]+\b", question)
@staticmethod
def _normalize_difficulty(value: str, index: int) -> str:
token = str(value or "").strip().lower()
if token in {"easy", "e"}:
return "easy"
if token in {"mid", "medium", "m"}:
return "medium"
if token in {"high", "hard", "h"}:
return "hard"
if index < 10:
return "easy"
if index < 20:
return "medium"
return "hard"
@staticmethod
def _task_type_for_difficulty(base_task_type: str, difficulty: str) -> str:
token = str(base_task_type or "").strip().lower()
if token and token != "fixed_trace":
return token
if difficulty == "easy":
return "easy_trace"
if difficulty == "medium":
return "medium_trace"
return "hard_trace"
@staticmethod
def _grader_for_difficulty(difficulty: str) -> dict[str, Any]:
return {
"type": "difficulty_exact_match",
"answer_type": "node_id",
"case_sensitive": True,
"reward_profile": difficulty,
"logic": {
"easy": "single_agent_simplified",
"medium": "reduced_components",
"hard": "full_reward",
}.get(difficulty, "full_reward"),
}
def _task_metadata(self, index: int, base_task_type: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
out = dict(metadata or {})
difficulty = self._normalize_difficulty(out.get("difficulty", ""), index)
out["difficulty"] = difficulty
out.setdefault("grader", self._grader_for_difficulty(difficulty))
out.setdefault("scenario", self._task_type_for_difficulty(base_task_type, difficulty))
return out
def _infer_answer_from_question(self, question: str, graph: CanonicalGraph) -> str:
entities = self._extract_entity_tokens(question)
question_l = question.lower()
alias_tokens = [token for token in entities if token.startswith("alias_")]
if alias_tokens:
alias = alias_tokens[0]
for edge in graph.edges:
if edge.rel == "alias_of" and edge.src == alias:
return edge.dst
if "connected" in question_l:
user_tokens = [token for token in entities if token.startswith("user_")]
if user_tokens:
source = user_tokens[0]
for edge in graph.edges:
if edge.rel == "connected_to" and edge.src == source:
return edge.dst
if "works at" in question_l:
for edge in graph.edges:
if edge.rel != "works_at":
continue
org = graph.nodes.get(edge.dst)
org_name = str((org.attrs or {}).get("name", "")).lower() if org else ""
if org_name and org_name in question_l:
return edge.src
return entities[0] if entities else "unknown"
def _infer_support_edges(self, question: str, answer: str, graph: CanonicalGraph) -> list[Edge]:
if answer:
for edge in graph.edges:
if edge.dst == answer or edge.src == answer:
if edge.src in question or edge.dst in question or edge.rel in question.lower():
return [edge]
entities = self._extract_entity_tokens(question)
for edge in graph.edges:
if edge.src in entities or edge.dst in entities:
return [edge]
return []
def _seeded_tasks(self, graph: CanonicalGraph) -> list[TaskInstance]:
tasks: list[TaskInstance] = []
for idx, question_spec in enumerate(self.config.seeding.seeded_questions):
answer = question_spec.answer or self._infer_answer_from_question(question_spec.question, graph)
metadata = self._task_metadata(idx, question_spec.task_type, dict(question_spec.metadata))
difficulty = str(metadata.get("difficulty", "hard"))
if question_spec.supporting_edges:
support = [
Edge(src=e.src, rel=e.rel, dst=e.dst, confidence=float(e.confidence))
for e in question_spec.supporting_edges
]
else:
support = self._infer_support_edges(question_spec.question, answer, graph)
tasks.append(
TaskInstance(
task_id=f"seed_task_{idx}",
task_type=self._task_type_for_difficulty(question_spec.task_type, difficulty),
question=question_spec.question,
answer=answer,
supporting_edges=support,
metadata=metadata,
)
)
return tasks
def _template_tasks(self, graph: CanonicalGraph, count: int, start_idx: int = 0) -> list[TaskInstance]:
alias_edges = [e for e in graph.edges if e.rel == "alias_of"]
conn_edges = [e for e in graph.edges if e.rel == "connected_to"]
work_edges = [e for e in graph.edges if e.rel == "works_at"]
tasks: list[TaskInstance] = []
for i in range(count):
mode = self.rng.choice(["identity_resolution", "network_discovery", "event_tracing"])
if mode == "identity_resolution" and alias_edges:
edge = self.rng.choice(alias_edges)
q = f"Which canonical user owns alias {edge.src}?"
a = edge.dst
support = [edge]
elif mode == "network_discovery" and conn_edges:
edge = self.rng.choice(conn_edges)
q = f"Who is connected to {edge.src}?"
a = edge.dst
support = [edge]
else:
edge = self.rng.choice(work_edges)
org_node = graph.nodes.get(edge.dst)
org_name = (org_node.attrs or {}).get("name", edge.dst) if org_node else edge.dst
q = f"Which user works at {org_name}?"
a = edge.src
support = [edge]
tasks.append(
TaskInstance(
task_id=f"task_{start_idx + i}",
task_type=mode,
question=q,
answer=a,
supporting_edges=support,
metadata=self._task_metadata(start_idx + i, mode),
)
)
return tasks
def _llm_generated_tasks(self, graph: CanonicalGraph, count: int, start_idx: int) -> list[TaskInstance]:
if count <= 0:
return []
if self.llm is None:
return self._template_tasks(graph, count=count, start_idx=start_idx)
candidate_edges = [
{"src": edge.src, "rel": edge.rel, "dst": edge.dst}
for edge in graph.edges
if edge.rel in {"alias_of", "connected_to", "works_at"}
][:60]
shared_context = json.dumps(
{
"known_nodes": sorted(graph.nodes.keys())[:100],
"edge_sample": candidate_edges,
}
)
workers = max(1, min(self.config.seeding.llm_generation_workers, count))
chunks = self._split_budget(count, workers)
focus_tracks = ["identity_resolution", "network_discovery", "event_tracing", "deanonymization"]
prompts: list[str] = []
for idx, chunk_budget in enumerate(chunks):
focus = focus_tracks[idx % len(focus_tracks)]
prompts.append(
(
"SEED_TASK_EXPANSION_AGENT\n"
"SHARED_CONTEXT\n"
f"{shared_context}\n"
f"worker_id: {idx}\n"
f"focus: {focus}\n"
f"task_budget: {chunk_budget}\n"
"Generate OSINT QA tasks with answers and support edges.\n"
"Return STRICT JSON object: {\"tasks\": [{\"task_type\": str, \"question\": str, \"answer\": str, \"supporting_edges\": [{\"src\": str, \"rel\": str, \"dst\": str, \"confidence\": float}]}]}."
)
)
llm_tasks: list[TaskInstance] = []
seen_questions: set[str] = set()
for payload in self._run_generation_workers(prompts):
raw_tasks: Any = None
if isinstance(payload, dict):
raw_tasks = payload.get("tasks")
elif isinstance(payload, list):
raw_tasks = payload
if not isinstance(raw_tasks, list):
continue
for row in raw_tasks:
if not isinstance(row, dict):
continue
question = str(row.get("question", "")).strip()
if not question:
continue
key = question.lower()
if key in seen_questions:
continue
seen_questions.add(key)
answer = str(row.get("answer", "")).strip() or self._infer_answer_from_question(question, graph)
task_type = str(row.get("task_type", "llm_generated")).strip() or "llm_generated"
support_specs = self._normalize_edge_candidates(row.get("supporting_edges"))
if support_specs:
support = [Edge(e.src, e.rel, e.dst, e.confidence) for e in support_specs]
else:
support = self._infer_support_edges(question, answer, graph)
llm_tasks.append(
TaskInstance(
task_id=f"task_{start_idx + len(llm_tasks)}",
task_type=task_type,
question=question,
answer=answer,
supporting_edges=support,
metadata=self._task_metadata(
start_idx + len(llm_tasks),
task_type,
{"generated_by": "llm", "shared_context": True},
),
)
)
if len(llm_tasks) >= count:
break
if len(llm_tasks) >= count:
break
if len(llm_tasks) < count:
residual = count - len(llm_tasks)
residual_prompt = (
"SEED_TASK_EXPANSION_AGENT\n"
"SHARED_CONTEXT\n"
f"{shared_context}\n"
f"task_budget: {residual}\n"
"Generate additional tasks not already present in SHARED_CONTEXT.\n"
"Return STRICT JSON object: {\"tasks\": [{\"task_type\": str, \"question\": str, \"answer\": str, \"supporting_edges\": [{\"src\": str, \"rel\": str, \"dst\": str, \"confidence\": float}]}]}."
)
payload = self._llm_generate_json_with_retry(residual_prompt)
raw_tasks: Any = payload.get("tasks") if isinstance(payload, dict) else payload
if isinstance(raw_tasks, list):
for row in raw_tasks:
if not isinstance(row, dict):
continue
question = str(row.get("question", "")).strip()
if not question:
continue
key = question.lower()
if key in seen_questions:
continue
seen_questions.add(key)
answer = str(row.get("answer", "")).strip() or self._infer_answer_from_question(question, graph)
task_type = str(row.get("task_type", "llm_generated")).strip() or "llm_generated"
support_specs = self._normalize_edge_candidates(row.get("supporting_edges"))
if support_specs:
support = [Edge(e.src, e.rel, e.dst, e.confidence) for e in support_specs]
else:
support = self._infer_support_edges(question, answer, graph)
llm_tasks.append(
TaskInstance(
task_id=f"task_{start_idx + len(llm_tasks)}",
task_type=task_type,
question=question,
answer=answer,
supporting_edges=support,
metadata=self._task_metadata(
start_idx + len(llm_tasks),
task_type,
{"generated_by": "llm", "shared_context": True},
),
)
)
if len(llm_tasks) >= count:
break
if len(llm_tasks) < count and self._template_fallback_allowed():
llm_tasks.extend(
self._template_tasks(
graph,
count=count - len(llm_tasks),
start_idx=start_idx + len(llm_tasks),
)
)
return llm_tasks[:count]
def build_canonical_graph(self) -> CanonicalGraph:
    """Build the ground-truth graph of users, organisations, locations and aliases.

    The graph is built in these steps:

    1. Create ``config.n_users`` users, each with a random organisation and
       location plus the matching ``works_at`` and ``located_in`` edges. With
       probability ``config.alias_density`` a user also gets an alias node
       and an ``alias_of`` edge.
    2. Add random ``connected_to`` edges between distinct users. This step is
       skipped when fewer than two users exist, because a pair cannot be
       sampled.
    3. Apply the seeded nodes and edges from the configuration.
    4. Optionally add LLM- or template-generated edges.
    """
    graph = CanonicalGraph()
    orgs = ["Apex Dynamics", "Helios Labs", "Northbridge"]
    locations = ["Bengaluru", "Pune", "Hyderabad", "Delhi"]
    for i in range(self.config.n_users):
        uid = f"user_{i}"
        org = self.rng.choice(orgs)
        loc = self.rng.choice(locations)
        graph.nodes[uid] = Node(uid, NodeType.USER, {"name": f"Person {i}", "org": org, "location": loc})
        org_id = f"org_{org.lower().replace(' ', '_')}"
        loc_id = f"loc_{loc.lower()}"
        graph.nodes.setdefault(org_id, Node(org_id, NodeType.ORG, {"name": org}))
        graph.nodes.setdefault(loc_id, Node(loc_id, NodeType.LOCATION, {"name": loc}))
        graph.edges.append(Edge(uid, "works_at", org_id))
        graph.edges.append(Edge(uid, "located_in", loc_id))
        if self.rng.random() < self.config.alias_density:
            alias = f"alias_{i}_{self.rng.randint(100,999)}"
            graph.nodes[alias] = Node(alias, NodeType.ALIAS, {"handle": f"@{alias}"})
            graph.edges.append(Edge(alias, "alias_of", uid))

    users = [n for n in graph.nodes.values() if n.node_type == NodeType.USER]
    # random.sample raises ValueError when asked for 2 items from fewer than 2.
    if len(users) >= 2:
        for _ in range(max(1, self.config.n_users // 2)):
            a, b = self.rng.sample(users, 2)
            graph.edges.append(Edge(a.node_id, "connected_to", b.node_id, confidence=0.8))

    self._apply_seed_nodes(graph)
    self._apply_seed_edges(graph)
    if self.config.seeding.llm_generate_remaining_graph:
        llm_edges = self._llm_expand_graph(graph, self.config.seeding.llm_generated_edge_budget)
        for edge in llm_edges:
            self._add_edge_if_missing(graph, edge)
    return graph
def build_platform_views(self, graph: CanonicalGraph) -> PlatformViews:
    """Render the canonical graph as microblog, forum and profile records.

    * Microblog: one generated post per user, sometimes posted under a random
      alias and sometimes flagged as a rumor, plus one post per
      ``authored_post`` edge, which replaces a generated post with the same id.
    * Forum: a batch of generated threads, plus one thread per
      ``authored_thread`` edge, which replaces a generated thread with the
      same id.
    * Profiles: one per user, plus ``noise_<i>`` red-herring profiles.

    User attributes that are missing (``name``, ``org``, ``location``) fall
    back to the node id or ``"Unknown"`` instead of raising. A graph without
    users yields no generated threads.
    """
    users = [n for n in graph.nodes.values() if n.node_type == NodeType.USER]
    aliases = [n for n in graph.nodes.values() if n.node_type == NodeType.ALIAS]
    alias_owner = {e.src: e.dst for e in graph.edges if e.rel == "alias_of"}
    user_aliases: dict[str, list[str]] = {}
    for alias_id, user_id in alias_owner.items():
        user_aliases.setdefault(user_id, []).append(alias_id)
    node_names = {
        node_id: str((node.attrs or {}).get("name") or (node.attrs or {}).get("handle") or node_id)
        for node_id, node in graph.nodes.items()
    }

    # --- Microblog: generated posts, one per user -------------------------
    microblog_posts: list[dict] = []
    for i, user in enumerate(users):
        attrs = user.attrs or {}
        org = attrs.get("org", "Unknown")
        location = attrs.get("location", "Unknown")
        poster = user.node_id
        if aliases and self.rng.random() < 0.45:
            poster = self.rng.choice(aliases).node_id
        text = f"Update {i} from {org} #{str(location).lower()}"
        if self.rng.random() < self.config.noise_level:
            text = f"Rumor: {text} maybe fake"
        if self.config.n_users > 0:
            mention = f"user_{self.rng.randint(0, self.config.n_users - 1)}"
        else:
            # Every user was seeded, so there is no user_<k> range to draw from.
            mention = self.rng.choice(users).node_id
        microblog_posts.append(
            {
                "post_id": f"post_{i}",
                "user_id": poster,
                "canonical_user": alias_owner.get(poster, user.node_id),
                "text": text,
                "references": [],
                "reference_names": [],
                "mentions": [mention],
                "timestamp": 1000 + i,
            }
        )

    # --- Microblog: posts described by graph edges -------------------------
    authored_posts: dict[str, str] = {}
    post_references: dict[str, list[str]] = {}
    for edge in graph.edges:
        if edge.rel == "authored_post":
            authored_posts[edge.dst] = edge.src
        elif edge.rel == "references" and edge.src.startswith("post_"):
            post_references.setdefault(edge.src, []).append(edge.dst)
    for post_id, author_id in authored_posts.items():
        refs = post_references.get(post_id, [])
        ref_names = [node_names.get(ref, ref) for ref in refs]
        author_label = node_names.get(author_id, author_id)
        text_parts = [f"{post_id} update from {author_label}"]
        if ref_names:
            text_parts.append("references " + ", ".join(ref_names))
        if refs:
            text_parts.append("ids " + ", ".join(refs))
        post_payload = {
            "post_id": post_id,
            "user_id": author_id,
            "canonical_user": alias_owner.get(author_id, author_id),
            "text": ". ".join(text_parts),
            "references": refs,
            "reference_names": ref_names,
            "mentions": [],
            "timestamp": 5000 + len(microblog_posts),
        }
        existing_idx = next((idx for idx, row in enumerate(microblog_posts) if row["post_id"] == post_id), None)
        if existing_idx is None:
            microblog_posts.append(post_payload)
        else:
            microblog_posts[existing_idx] = post_payload

    # --- Forum: generated threads (an author is required) ------------------
    forum_threads: list[dict] = []
    thread_count = max(8, self.config.n_users // 3) if users else 0
    for i in range(thread_count):
        author = self.rng.choice(users).node_id
        forum_threads.append(
            {
                "thread_id": f"thr_{i}",
                "topic": self.rng.choice(["security", "startup", "ai", "infra"]),
                "author_id": author,
                "comments": [
                    {"user_id": self.rng.choice(users).node_id, "text": "Following this."},
                    {"user_id": self.rng.choice(users).node_id, "text": "Interesting link."},
                ],
                "references": [],
                "discusses": [],
            }
        )

    # --- Forum: threads described by graph edges ---------------------------
    authored_threads: dict[str, str] = {}
    thread_refs: dict[str, list[str]] = {}
    thread_discusses: dict[str, list[str]] = {}
    for edge in graph.edges:
        if edge.rel == "authored_thread":
            authored_threads[edge.dst] = edge.src
        elif edge.rel == "references" and edge.src.startswith(("thr_", "thread_")):
            thread_refs.setdefault(edge.src, []).append(edge.dst)
        elif edge.rel == "discusses" and edge.src.startswith(("thr_", "thread_")):
            thread_discusses.setdefault(edge.src, []).append(edge.dst)
    for thread_id, author_id in authored_threads.items():
        node = graph.nodes.get(thread_id)
        refs = thread_refs.get(thread_id, [])
        discussed = thread_discusses.get(thread_id, [])
        comments = []
        for ref in refs:
            comments.append({"user_id": author_id, "text": f"Reference: {node_names.get(ref, ref)} ({ref})"})
        for item in discussed:
            comments.append({"user_id": author_id, "text": f"Discusses: {node_names.get(item, item)} ({item})"})
        thread_payload = {
            "thread_id": thread_id,
            "topic": str((node.attrs or {}).get("topic", "seeded")) if node else "seeded",
            "author_id": author_id,
            "title": node_names.get(thread_id, thread_id),
            "comments": comments,
            "references": refs,
            "discusses": discussed,
        }
        existing_idx = next((idx for idx, row in enumerate(forum_threads) if row["thread_id"] == thread_id), None)
        if existing_idx is None:
            forum_threads.append(thread_payload)
        else:
            forum_threads[existing_idx] = thread_payload

    # --- Profiles ------------------------------------------------------------
    # Index the edges once instead of rescanning the edge list per user.
    connections: dict[str, list[str]] = {}
    first_org: dict[str, str] = {}
    first_location: dict[str, str] = {}
    for edge in graph.edges:
        if edge.rel == "connected_to":
            connections.setdefault(edge.src, []).append(edge.dst)
        elif edge.rel == "works_at":
            first_org.setdefault(edge.src, edge.dst)
        elif edge.rel == "located_in":
            first_location.setdefault(edge.src, edge.dst)

    profiles: list[dict] = []
    for user in users:
        attrs = user.attrs or {}
        org = attrs.get("org", "Unknown")
        profiles.append(
            {
                "user_id": user.node_id,
                "name": attrs.get("name", user.node_id),
                "org": org,
                "org_id": first_org.get(user.node_id, ""),
                "location": attrs.get("location", "Unknown"),
                "location_id": first_location.get(user.node_id, ""),
                "alias_ids": sorted(user_aliases.get(user.node_id, [])),
                "connections": connections.get(user.node_id, [])[:5],
                "work_history": [org],
            }
        )
    for i in range(int(len(users) * self.config.red_herring_rate)):
        profiles.append(
            {
                "user_id": f"noise_{i}",
                "name": f"P{self.rng.randint(100,999)}",
                "org": self.rng.choice(["Stealth Co", "Unknown Ventures"]),
                "org_id": "",
                "location": self.rng.choice(["Remote", "Unknown"]),
                "location_id": "",
                "alias_ids": [],
                "connections": [],
                "work_history": [],
            }
        )
    return PlatformViews(microblog_posts, forum_threads, profiles, alias_lookup=alias_owner)
def generate_tasks(self, graph: CanonicalGraph, views: PlatformViews, count: int = 12) -> list[TaskInstance]:
    """Assemble the task list: seeded tasks, then LLM tasks, then templates.

    The target size is the largest of 1, *count* and the number of seeded
    tasks. LLM generation is capped by ``llm_generated_task_budget`` and by
    the remaining shortfall. Template tasks fill whatever is still missing
    when fallback is allowed, and are used unconditionally if nothing has
    been produced at all. *views* is accepted but not consulted.
    """
    seeding = self.config.seeding
    tasks = self._seeded_tasks(graph)
    target_count = max(1, count, len(tasks))

    shortfall = target_count - len(tasks)
    llm_budget = min(max(0, seeding.llm_generated_task_budget), max(0, shortfall))
    if seeding.llm_generate_remaining_tasks and llm_budget > 0:
        tasks.extend(self._llm_generated_tasks(graph, count=llm_budget, start_idx=len(tasks)))

    if len(tasks) < target_count and self._template_fallback_allowed():
        missing = target_count - len(tasks)
        tasks.extend(self._template_tasks(graph, count=missing, start_idx=len(tasks)))

    if not tasks:
        # Last resort so the caller never receives an empty task list.
        tasks.extend(self._template_tasks(graph, count=target_count, start_idx=0))
    return tasks[:target_count]