specimba committed on
Commit
2a24138
·
verified ·
1 Parent(s): 2c02618

Copy nexus_os_v2/ollama_telemetry.py from dataset for module imports

Browse files
Files changed (1) hide show
  1. nexus_os_v2/ollama_telemetry.py +458 -0
nexus_os_v2/ollama_telemetry.py ADDED
@@ -0,0 +1,458 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Ollama Telemetry Extractor for NEXUS OS v2
3
+ Extracts per-token thermodynamic order parameters from Ollama generation.
4
+
5
+ Since Ollama does not expose raw logits, we use a dual-tier approach:
6
+ Tier 1 (fast): Token-level surface metrics from stream timing + text patterns
7
+ Tier 2 (deep): Embedding-space trajectory divergence as entropy proxy
8
+
9
+ The embedding divergence correlates with generation coherence:
10
+ - Smooth trajectory -> low entropy -> coherent (condensate phase)
11
+ - Sharp jumps -> high entropy -> bifurcation risk (near T_c)
12
+
13
+ Uses Ollama's /api/embeddings endpoint for vector representations.
14
+ """
15
+ import json
16
+ import math
17
+ import time
18
+ import urllib.request
19
+ import urllib.error
20
+ from typing import List, Dict, Optional, Any, Tuple
21
+ from dataclasses import dataclass, field
22
+
23
+ # Try numpy, fallback to pure Python
24
+ import sys
25
+ HAS_NUMPY = True
26
+ try:
27
+ import numpy as np
28
+ except ImportError:
29
+ HAS_NUMPY = False
30
+
31
+
32
@dataclass
class TokenTelemetry:
    """Tier-1 surface measurements recorded for one streamed token."""

    position: int
    token_text: str
    # Milliseconds since the start of the generation.
    timestamp_ms: float
    # Inter-token latency: milliseconds since the previous token.
    time_since_prev_ms: float
    char_length: int
    is_punctuation: bool
    is_whitespace: bool
    # Number of times this token text appeared among the recent tokens.
    repetition_count: int
    # Ollama embedding vector; stays None when none was fetched.
    embedding: Optional[List[float]] = None
44
+
45
+
46
@dataclass
class EmbeddingTrajectory:
    """Embedding sequence of a generation together with its step metrics."""

    tokens: List["TokenTelemetry"]
    # L2 distance between consecutive embeddings.
    divergences: List[float]
    cosine_similarities: List[float]
    # Angle change in embedding space.
    trajectory_curvature: List[float]
    # Running sum of divergences.
    cumulative_drift: List[float]

    def entropy_proxy(self, position: int) -> float:
        """Return the entropy proxy for the divergence at ``position``.

        The proxy is the divergence at ``position`` divided by the running
        mean divergence (cumulative drift over ``position + 1``), capped at
        1.0.  It is 0.0 when ``position`` is below 1 or past the end of
        ``divergences``, and when the cumulative drift is below 1e-6.
        """
        inside = 1 <= position < len(self.divergences)
        if not inside:
            return 0.0

        step = self.divergences[position]
        if position < len(self.cumulative_drift):
            drift = self.cumulative_drift[position]
        else:
            # No cumulative value stored for this index: use the step itself.
            drift = step

        if drift < 1e-6:
            return 0.0

        running_mean = drift / max(1, position + 1)
        return min(1.0, step / running_mean)
64
+
65
+
66
class OllamaTelemetryExtractor:
    """
    Extract per-token telemetry from Ollama generation streams.

    Dual-tier architecture:
        Tier 1: Fast surface metrics from stream (timing, repetition)
        Tier 2: Deep embedding-space analysis via /api/embeddings
    """

    # Characters that make a (stripped) token count as punctuation.
    _PUNCTUATION = frozenset(".,;:!?")
    # Cap on cached embeddings.  Cache keys are whole text prefixes, so an
    # unbounded cache would grow with every generation.
    _EMBEDDING_CACHE_MAX = 256

    def __init__(
        self,
        ollama_host: str = "http://localhost:11434",
        embedding_model: str = "functiongemma:latest",
        telemetry_interval: int = 5,
        history_window: int = 20,
    ):
        """Configure the extractor; no network traffic happens here.

        Args:
            ollama_host: Base URL of the Ollama server; a trailing "/" is dropped.
            embedding_model: Model name sent to /api/embeddings.
            telemetry_interval: An embedding is requested every this many
                tokens; values <= 0 disable embedding requests.
            history_window: Number of most recent tokens scanned when
                counting repetitions.
        """
        self.ollama_host = ollama_host.rstrip("/")
        self.embedding_model = embedding_model
        self.telemetry_interval = telemetry_interval
        self.history_window = history_window
        self._token_buffer: List[TokenTelemetry] = []
        self._text_buffer: str = ""
        self._start_time: Optional[float] = None
        self._embedding_cache: Dict[str, List[float]] = {}

    @classmethod
    def _is_punctuation(cls, token_text: str) -> bool:
        """Return True when the stripped token is non-empty and all punctuation."""
        stripped = token_text.strip()
        # The emptiness check matters: "" is a substring of every string, so
        # a plain ``stripped in ".,;:!?"`` flags whitespace tokens too.
        return bool(stripped) and all(ch in cls._PUNCTUATION for ch in stripped)

    @staticmethod
    def _turning_angle(dot: float, sq_norm_a: float, sq_norm_b: float) -> float:
        """Return the angle (radians) between two vectors of any dimension.

        Args:
            dot: Dot product of the two vectors.
            sq_norm_a: Squared Euclidean norm of the first vector.
            sq_norm_b: Squared Euclidean norm of the second vector.

        The sine term comes from |a|^2 |b|^2 - (a.b)^2, which equals the
        squared cross-product norm in 3-D but is defined in every dimension.
        Two zero vectors give 0.0.
        """
        sin_term = math.sqrt(max(0.0, sq_norm_a * sq_norm_b - dot * dot))
        return math.atan2(sin_term, dot)

    def _get_embedding(self, text: str) -> Optional[List[float]]:
        """Return the Ollama embedding for ``text``, or None when unavailable.

        Successful results are cached by ``text``; the cache holds at most
        ``_EMBEDDING_CACHE_MAX`` entries and evicts the oldest first.
        """
        cached = self._embedding_cache.get(text)
        if cached is not None:
            return cached

        payload = json.dumps({
            "model": self.embedding_model,
            "prompt": text,
        }).encode("utf-8")

        req = urllib.request.Request(
            f"{self.ollama_host}/api/embeddings",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )

        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = json.loads(resp.read().decode("utf-8"))
            embedding = data.get("embedding")
        except Exception:
            # Deliberately broad: embeddings are optional telemetry and a
            # failed lookup must never abort the generation being observed.
            return None

        if not embedding:
            return None

        while len(self._embedding_cache) >= self._EMBEDDING_CACHE_MAX:
            # dicts keep insertion order, so the first key is the oldest.
            self._embedding_cache.pop(next(iter(self._embedding_cache)))
        self._embedding_cache[text] = embedding
        return embedding

    def _compute_repetition(self, token_text: str) -> int:
        """Count occurrences of ``token_text`` in the last ``history_window`` tokens."""
        if self.history_window <= 0:
            # A slice of [-0:] would select the whole buffer, not nothing.
            return 0
        recent = self._token_buffer[-self.history_window:]
        return sum(1 for seen in recent if seen.token_text == token_text)

    def on_token(self, token_text: str, position: int) -> "TokenTelemetry":
        """Record one streamed token and return its telemetry.

        Appends the token to the internal text and token buffers.  For every
        ``telemetry_interval``-th position (excluding position 0) an
        embedding of the text accumulated so far is requested.

        Args:
            token_text: Text of the streamed chunk.
            position: Index of the token within the generation.
        """
        # perf_counter is monotonic, so latencies cannot go negative when the
        # wall clock is adjusted mid-generation.
        now = time.perf_counter()
        if self._start_time is None:
            self._start_time = now

        elapsed_ms = (now - self._start_time) * 1000
        if self._token_buffer:
            time_since_prev = elapsed_ms - self._token_buffer[-1].timestamp_ms
        else:
            time_since_prev = 0.0

        self._text_buffer += token_text

        embedding = None
        interval = self.telemetry_interval
        if interval > 0 and position > 0 and position % interval == 0:
            embedding = self._get_embedding(self._text_buffer)

        telemetry = TokenTelemetry(
            position=position,
            token_text=token_text,
            timestamp_ms=elapsed_ms,
            time_since_prev_ms=time_since_prev,
            char_length=len(token_text),
            is_punctuation=self._is_punctuation(token_text),
            is_whitespace=token_text.strip() == "",
            repetition_count=self._compute_repetition(token_text),
            embedding=embedding,
        )

        self._token_buffer.append(telemetry)
        return telemetry

    def build_embedding_trajectory(self) -> "EmbeddingTrajectory":
        """Compute embedding-space trajectory metrics.

        Only buffered tokens that carry an embedding take part.  With fewer
        than two embeddings every metric list is empty.  Otherwise the result
        holds, per consecutive embedding pair, the L2 divergence and cosine
        similarity; per consecutive pair of steps, the turning angle; and
        the running sum of the divergences.
        """
        embeddings = [t.embedding for t in self._token_buffer if t.embedding is not None]

        if len(embeddings) < 2:
            return EmbeddingTrajectory(
                tokens=self._token_buffer,
                divergences=[],
                cosine_similarities=[],
                trajectory_curvature=[],
                cumulative_drift=[],
            )

        divergences: List[float] = []
        cos_sims: List[float] = []
        curvature: List[float] = []

        if HAS_NUMPY:
            emb_array = np.array(embeddings)
            steps = [emb_array[i] - emb_array[i - 1] for i in range(1, len(emb_array))]
            for i, step in enumerate(steps, start=1):
                divergences.append(float(np.linalg.norm(step)))
                dot = np.dot(emb_array[i], emb_array[i - 1])
                norm = np.linalg.norm(emb_array[i]) * np.linalg.norm(emb_array[i - 1])
                cos_sims.append(float(dot / norm) if norm > 0 else 0.0)
            # np.cross only accepts 2-D/3-D vectors, so the angle is derived
            # from dot products, which work for any embedding size.
            for prev_step, step in zip(steps, steps[1:]):
                curvature.append(self._turning_angle(
                    float(np.dot(prev_step, step)),
                    float(np.dot(prev_step, prev_step)),
                    float(np.dot(step, step)),
                ))
        else:
            # Pure Python fallback
            steps = [
                [a - b for a, b in zip(cur, prev)]
                for prev, cur in zip(embeddings, embeddings[1:])
            ]
            norms = [math.sqrt(sum(a * a for a in emb)) for emb in embeddings]
            for i, step in enumerate(steps, start=1):
                divergences.append(math.sqrt(sum(d * d for d in step)))
                dot = sum(a * b for a, b in zip(embeddings[i], embeddings[i - 1]))
                denom = norms[i] * norms[i - 1]
                cos_sims.append(dot / denom if denom > 0 else 0.0)
            for prev_step, step in zip(steps, steps[1:]):
                curvature.append(self._turning_angle(
                    sum(a * b for a, b in zip(prev_step, step)),
                    sum(a * a for a in prev_step),
                    sum(b * b for b in step),
                ))

        cumulative: List[float] = []
        total = 0.0
        for div in divergences:
            total += div
            cumulative.append(total)

        return EmbeddingTrajectory(
            tokens=self._token_buffer,
            divergences=divergences,
            cosine_similarities=cos_sims,
            trajectory_curvature=curvature,
            cumulative_drift=cumulative,
        )

    def compute_surface_entropy(self, token: "TokenTelemetry") -> float:
        """
        Fast heuristic entropy from surface metrics.
        Maps timing + repetition patterns to entropy proxy.

        Latency above 200 ms adds 0.2 and above 500 ms a further 0.3; three
        or more recent repetitions subtract 0.3; punctuation arriving after
        more than 100 ms adds 0.1.  The result is clamped to [0, 1].
        """
        entropy = 0.0

        if token.time_since_prev_ms > 200:
            entropy += 0.2
        if token.time_since_prev_ms > 500:
            entropy += 0.3

        if token.repetition_count >= 3:
            entropy -= 0.3

        if token.is_punctuation and token.time_since_prev_ms > 100:
            entropy += 0.1

        return max(0.0, min(1.0, entropy))

    def to_per_token_debug(self, trajectory, twave, model_id: str, tier: str):
        """Convert telemetry to PerTokenDebug schema.

        Args:
            trajectory: Object exposing ``divergences`` and ``entropy_proxy``.
            twave: Object exposing ``H_max``, ``T_c`` and the ``compute_*``
                methods called below.
            model_id: Stored on every record.
            tier: Stored on every record.

        Returns:
            One PerTokenDebug per buffered token, in buffer order.
        """
        from .per_token_debug import PerTokenDebug

        debugs = []

        for i, token in enumerate(self._token_buffer):
            surface_H = self.compute_surface_entropy(token)

            # NOTE(review): ``i`` is a token index, whereas a trajectory from
            # build_embedding_trajectory has one divergence per consecutive
            # pair of embedded tokens - confirm this alignment is intended.
            embedding_H = 0.0
            if 0 < i <= len(trajectory.divergences):
                embedding_H = trajectory.entropy_proxy(i)

            # Blend both tiers only when the embedding tier produced a signal.
            if embedding_H > 0:
                H = 0.3 * surface_H + 0.7 * embedding_H
            else:
                H = surface_H
            H = max(0.0, min(1.0, H))

            coherence = twave.compute_coherence(H * twave.H_max)
            T_eff = twave.T_c * coherence

            CG = 0.0
            mu_ret = twave.compute_chemical_potential(CG)

            psi = twave.compute_order_parameter(coherence, mu_ret)
            f_density = twave.compute_free_energy_density(psi, coherence, mu_ret)

            prev_psi = debugs[-1].twave_psi if debugs else 0.0
            k_local = abs(psi - prev_psi) if prev_psi else 0.0
            E_exc = twave.compute_bogoliubov_energy(psi, k_local, mu_ret)

            debug = PerTokenDebug(
                position=i,
                token_id=0,
                token_str=token.token_text,
                entropy=H * twave.H_max,
                entropy_normalized=H,
                twave_T_eff=T_eff,
                twave_coherence=coherence,
                twave_psi=psi,
                twave_f_density=f_density,
                twave_mu_ret=mu_ret,
                twave_E_exc=E_exc,
                twave_k_local=k_local,
                generation_time_ms=token.time_since_prev_ms,
                model_id=model_id,
                tier=tier,
            )
            debugs.append(debug)

        for i, debug in enumerate(debugs):
            if i >= 2:
                # Specific heat uses this record and the two preceding ones.
                ent_history = [d.entropy for d in debugs[i - 2:i + 1]]
                debug.twave_C_V = twave.compute_specific_heat(ent_history)
            debug.jarzynski_W_i = 0.0

        return debugs

    def reset(self):
        """Clear the per-generation buffers and timer; the embedding cache is kept."""
        self._token_buffer = []
        self._text_buffer = ""
        self._start_time = None
290
+
291
+
292
class OllamaStreamingClient:
    """
    Production Ollama client with streaming + telemetry extraction.
    """

    def __init__(
        self,
        ollama_host: str = "http://localhost:11434",
        telemetry_extractor: Optional["OllamaTelemetryExtractor"] = None,
    ):
        """Store the host and telemetry extractor; no request is made here.

        Args:
            ollama_host: Base URL of the Ollama server; a trailing "/" is dropped.
            telemetry_extractor: Extractor fed with every streamed token.  A
                default OllamaTelemetryExtractor for ``ollama_host`` is
                created when omitted.
        """
        self.ollama_host = ollama_host.rstrip("/")
        self.telemetry = telemetry_extractor or OllamaTelemetryExtractor(ollama_host)

    def _chat_request(
        self,
        model_tag: str,
        prompt: str,
        system: Optional[str],
        temperature: float,
        max_tokens: int,
        top_p: float,
        stream: bool,
    ) -> urllib.request.Request:
        """Build the POST request for /api/chat shared by both generate paths."""
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        payload = json.dumps({
            "model": model_tag,
            "messages": messages,
            "stream": stream,
            "options": {
                "temperature": temperature,
                "num_predict": max_tokens,
                "top_p": top_p,
            },
        }).encode("utf-8")

        return urllib.request.Request(
            f"{self.ollama_host}/api/chat",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )

    def generate(
        self,
        model_tag: str,
        prompt: str,
        system: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        top_p: float = 0.95,
        stream_callback=None,
    ) -> Tuple[str, List["TokenTelemetry"], "EmbeddingTrajectory"]:
        """
        Generate text via Ollama with full telemetry extraction.

        The extractor is reset first.  Each non-empty streamed chunk is
        passed to ``telemetry.on_token`` and, when given, to
        ``stream_callback(token_text, telemetry)``.  Lines that are not
        valid JSON are skipped; reading stops at the first chunk with
        ``done`` set.

        Returns: (full_text, token_telemetry_list, embedding_trajectory)

        Raises:
            RuntimeError: When urllib reports a URLError; the original error
                is attached as ``__cause__``.
        """
        self.telemetry.reset()

        req = self._chat_request(
            model_tag, prompt, system, temperature, max_tokens, top_p, stream=True
        )

        # Collected and joined once at the end instead of repeated ``+=``.
        pieces: List[str] = []
        position = 0

        try:
            with urllib.request.urlopen(req, timeout=300) as resp:
                for line in resp:
                    if not line:
                        continue
                    try:
                        data = json.loads(line.decode("utf-8"))
                    except json.JSONDecodeError:
                        continue

                    if data.get("done", False):
                        break

                    token_text = data.get("message", {}).get("content", "")
                    if not token_text:
                        continue

                    pieces.append(token_text)

                    telemetry = self.telemetry.on_token(token_text, position)
                    position += 1

                    if stream_callback:
                        stream_callback(token_text, telemetry)

        except urllib.error.URLError as e:
            raise RuntimeError(f"Ollama connection failed: {e}") from e

        trajectory = self.telemetry.build_embedding_trajectory()

        return "".join(pieces), self.telemetry._token_buffer, trajectory

    def generate_non_streaming(
        self,
        model_tag: str,
        prompt: str,
        system: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        top_p: float = 0.95,
    ) -> str:
        """Simple non-streaming generation (faster, no telemetry).

        Returns:
            The ``message.content`` field of the response, or "" if absent.

        Raises:
            RuntimeError: When urllib reports a URLError; the original error
                is attached as ``__cause__``.
        """
        req = self._chat_request(
            model_tag, prompt, system, temperature, max_tokens, top_p, stream=False
        )

        try:
            with urllib.request.urlopen(req, timeout=300) as resp:
                data = json.loads(resp.read().decode("utf-8"))
            return data.get("message", {}).get("content", "")
        except urllib.error.URLError as e:
            raise RuntimeError(f"Ollama connection failed: {e}") from e
419
+
420
+
421
def estimate_entropy_from_response(response: str, chunk_size: int = 10) -> float:
    """
    Post-hoc entropy estimation from completed response.
    Uses lexical diversity as a proxy for generation entropy.

    Three terms are combined: 0.5 x the unique-word ratio, 0.3 x (1 - the
    repetition penalty of the most frequent word) and 0.2 x the normalised
    variance of sentence lengths (sentences are split on ".").
    ``chunk_size`` is accepted but not used by this implementation.

    Returns: Estimated normalized entropy [0, 1]
    """
    from collections import Counter

    words = response.split() if response else []
    if not words:
        return 0.0

    total_words = len(words)
    # One case-insensitive tally serves both the unique count and max repeat.
    tally = Counter(word.lower() for word in words)

    lexical_diversity = len(tally) / total_words
    most_repeated = max(tally.values())
    repetition_penalty = min(1.0, most_repeated / max(1, total_words * 0.1))

    sentence_lengths = [
        len(part.split()) for part in response.split(".") if part.strip()
    ]
    length_variance = 0.0
    if len(sentence_lengths) > 1:
        count = len(sentence_lengths)
        mean_len = sum(sentence_lengths) / count
        variance = sum((length - mean_len) ** 2 for length in sentence_lengths) / count
        length_variance = variance / max(1, mean_len ** 2)

    diversity_term = 0.5 * lexical_diversity
    repetition_term = 0.3 * (1.0 - repetition_penalty)
    variance_term = 0.2 * min(1.0, length_variance)
    entropy = diversity_term + repetition_term + variance_term

    return min(1.0, max(0.0, entropy))