import os
from dataclasses import dataclass
from typing import Any, final

from lightrag.base import (
    BaseKVStorage,
)
from lightrag.utils import (
    load_json,
    logger,
    write_json,
)
from lightrag.exceptions import StorageNotInitializedError

from .shared_storage import (
    get_namespace_data,
    get_storage_lock,
    get_data_init_lock,
    get_update_flag,
    set_all_update_flags,
    clear_all_update_flags,
    try_initialize_namespace,
)

@final
@dataclass
class JsonKVStorage(BaseKVStorage):
    """JSON-file backed KV storage.

    Data lives in a namespace-scoped in-memory dict obtained from
    shared_storage and is persisted to kv_store_<namespace>.json
    when index_done_callback is called.
    """

    def __post_init__(self):
        working_dir = self.global_config["working_dir"]

        if self.workspace:
            # Include workspace in the file path for data isolation
            workspace_dir = os.path.join(working_dir, self.workspace)
            self.final_namespace = f"{self.workspace}_{self.namespace}"
        else:
            # Default behavior when workspace is empty
            workspace_dir = working_dir
            self.final_namespace = self.namespace
            self.workspace = "_"

        os.makedirs(workspace_dir, exist_ok=True)
        self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json")
        self._data = None
        self._storage_lock = None
        self.storage_updated = None

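    # Resulting layout (illustrative): with workspace "proj1" and namespace
    # "text_chunks", data is stored at <working_dir>/proj1/kv_store_text_chunks.json
    # under final_namespace "proj1_text_chunks"; with an empty workspace the file
    # sits directly in <working_dir> and self.workspace falls back to "_".
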
    async def initialize(self):
        """Initialize storage data"""
        self._storage_lock = get_storage_lock()
        self.storage_updated = await get_update_flag(self.final_namespace)
        async with get_data_init_lock():
            # The need_init check must happen before get_namespace_data
            need_init = await try_initialize_namespace(self.final_namespace)
            self._data = await get_namespace_data(self.final_namespace)
            if need_init:
                loaded_data = load_json(self._file_name) or {}
                async with self._storage_lock:
                    # Migrate legacy cache structure if needed
                    if self.namespace.endswith("_cache"):
                        loaded_data = await self._migrate_legacy_cache_structure(
                            loaded_data
                        )
                    self._data.update(loaded_data)
                    data_count = len(loaded_data)
                    logger.info(
                        f"[{self.workspace}] Process {os.getpid()} KV load {self.namespace} with {data_count} records"
                    )

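    # A minimal lifecycle sketch (illustrative; constructor fields are assumed
    # from the attributes this class reads: namespace, workspace, global_config):
    #
    #   storage = JsonKVStorage(
    #       namespace="text_chunks",
    #       workspace="",
    #       global_config={"working_dir": "./rag_storage"},
    #   )
    #   await storage.initialize()                 # required before any access
    #   await storage.upsert({"chunk-1": {"content": "hello"}})
    #   await storage.index_done_callback()        # flush pending changes to disk
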
    async def index_done_callback(self) -> None:
        async with self._storage_lock:
            if self.storage_updated.value:
                data_dict = (
                    dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                )
                # Calculate data count - all data is now flattened
                data_count = len(data_dict)
                logger.debug(
                    f"[{self.workspace}] Process {os.getpid()} KV writing {data_count} records to {self.namespace}"
                )
                write_json(data_dict, self._file_name)
                await clear_all_update_flags(self.final_namespace)

    async def get_all(self) -> dict[str, Any]:
        """Get all data from storage

        Returns:
            Dictionary containing all stored data
        """
        async with self._storage_lock:
            result = {}
            for key, value in self._data.items():
                if value:
                    # Create a copy to avoid modifying the original data
                    data = dict(value)
                    # Ensure time fields are present, provide default values for old data
                    data.setdefault("create_time", 0)
                    data.setdefault("update_time", 0)
                    result[key] = data
                else:
                    result[key] = value
            return result

    async def get_by_id(self, id: str) -> dict[str, Any] | None:
        async with self._storage_lock:
            result = self._data.get(id)
            if result:
                # Create a copy to avoid modifying the original data
                result = dict(result)
                # Ensure time fields are present, provide default values for old data
                result.setdefault("create_time", 0)
                result.setdefault("update_time", 0)
                # Ensure _id field contains the clean ID
                result["_id"] = id
            return result

    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any] | None]:
        async with self._storage_lock:
            results = []
            for id in ids:
                data = self._data.get(id, None)
                if data:
                    # Create a copy to avoid modifying the original data
                    result = dict(data)
                    # Ensure time fields are present, provide default values for old data
                    result.setdefault("create_time", 0)
                    result.setdefault("update_time", 0)
                    # Ensure _id field contains the clean ID
                    result["_id"] = id
                    results.append(result)
                else:
                    results.append(None)
            return results

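    # Example (illustrative): if "k1" is stored and "k2" is not,
    #   await storage.get_by_ids(["k1", "k2"])
    # returns [{..., "create_time": ..., "update_time": ..., "_id": "k1"}, None],
    # preserving the order of the requested ids.
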
    async def filter_keys(self, keys: set[str]) -> set[str]:
        """Return the subset of keys that do not exist in storage yet"""
        async with self._storage_lock:
            return set(keys) - set(self._data.keys())

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        """
        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Update flags are set to notify other processes that data persistence is needed
        """
        if not data:
            return

        import time

        current_time = int(time.time())  # Get current Unix timestamp

        logger.debug(
            f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}"
        )
        if self._storage_lock is None:
            raise StorageNotInitializedError("JsonKVStorage")
        async with self._storage_lock:
            # Add timestamps to data based on whether key exists
            for k, v in data.items():
                # For text_chunks namespace, ensure llm_cache_list field exists
                if self.namespace.endswith("text_chunks"):
                    if "llm_cache_list" not in v:
                        v["llm_cache_list"] = []

                # Add timestamps based on whether key exists
                if k in self._data:  # Key exists, only update update_time
                    v["update_time"] = current_time
                else:  # New key, set both create_time and update_time
                    v["create_time"] = current_time
                    v["update_time"] = current_time

                v["_id"] = k
            self._data.update(data)
            await set_all_update_flags(self.final_namespace)

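    # Timestamp semantics (illustrative): re-upserting an existing key refreshes
    # only update_time; a new key gets both fields:
    #   await storage.upsert({"k1": {"content": "v2"}})   # existing key
    #   # -> {"content": "v2", "update_time": <now>, "_id": "k1"}
    #   await storage.upsert({"k9": {"content": "new"}})  # new key
    #   # -> {"content": "new", "create_time": <now>, "update_time": <now>, "_id": "k9"}
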
    async def delete(self, ids: list[str]) -> None:
        """Delete specific records from storage by their IDs

        Important notes for in-memory storage:
        1. Changes will be persisted to disk during the next index_done_callback
        2. Update flags are set to notify other processes that data persistence is needed

        Args:
            ids (list[str]): List of document IDs to be deleted from storage

        Returns:
            None
        """
        async with self._storage_lock:
            any_deleted = False
            for doc_id in ids:
                result = self._data.pop(doc_id, None)
                if result is not None:
                    any_deleted = True

            if any_deleted:
                await set_all_update_flags(self.final_namespace)

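    # Note: ids that are not present are silently ignored, and the cross-process
    # update flags are only set when at least one record was actually removed.
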
    async def drop(self) -> dict[str, str]:
        """Drop all data from storage and clean up resources

        This action persists the (now empty) data to disk immediately.

        This method will:
        1. Clear all data from memory
        2. Update flags to notify other processes
        3. Trigger index_done_callback to save the empty state

        Returns:
            dict[str, str]: Operation status and message
            - On success: {"status": "success", "message": "data dropped"}
            - On failure: {"status": "error", "message": "<error details>"}
        """
        try:
            async with self._storage_lock:
                self._data.clear()
                await set_all_update_flags(self.final_namespace)

            # Called outside the lock: index_done_callback re-acquires the
            # storage lock itself when persisting
            await self.index_done_callback()
            logger.info(
                f"[{self.workspace}] Process {os.getpid()} drop {self.namespace}"
            )
            return {"status": "success", "message": "data dropped"}
        except Exception as e:
            logger.error(f"[{self.workspace}] Error dropping {self.namespace}: {e}")
            return {"status": "error", "message": str(e)}

    async def _migrate_legacy_cache_structure(self, data: dict) -> dict:
        """Migrate legacy nested cache structure to flattened structure

        Args:
            data: Original data dictionary that may contain legacy structure

        Returns:
            Migrated data dictionary with flattened cache keys
        """
        from lightrag.utils import generate_cache_key

        # Early return if data is empty
        if not data:
            return data

        # Check first entry to see if it's already in new format
        first_key = next(iter(data.keys()))
        if ":" in first_key and len(first_key.split(":")) == 3:
            # Already in flattened format, return as-is
            return data

        migrated_data = {}
        migration_count = 0

        for key, value in data.items():
            # Check if this is a legacy nested cache structure
            if isinstance(value, dict) and all(
                isinstance(v, dict) and "return" in v for v in value.values()
            ):
                # This looks like a legacy cache mode with nested structure
                mode = key
                for cache_hash, cache_entry in value.items():
                    cache_type = cache_entry.get("cache_type", "extract")
                    flattened_key = generate_cache_key(mode, cache_type, cache_hash)
                    migrated_data[flattened_key] = cache_entry
                    migration_count += 1
            else:
                # Keep non-cache data or already flattened cache data as-is
                migrated_data[key] = value

        if migration_count > 0:
            logger.info(
                f"[{self.workspace}] Migrated {migration_count} legacy cache entries to flattened structure"
            )
            # Persist migrated data immediately
            write_json(migrated_data, self._file_name)

        return migrated_data

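    # Shape example (assuming generate_cache_key produces "mode:cache_type:hash",
    # consistent with the 3-part key check above):
    #   legacy:    {"default": {"abc123": {"return": "...", "cache_type": "extract"}}}
    #   flattened: {"default:extract:abc123": {"return": "...", "cache_type": "extract"}}
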
    async def finalize(self):
        """Finalize storage resources
        Persist cache data to disk before exiting
        """
        if self.namespace.endswith("_cache"):
            await self.index_done_callback()

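# Hedged end-to-end sketch (names outside this file are assumptions):
#
#   import asyncio
#
#   async def main():
#       storage = JsonKVStorage(
#           namespace="llm_response_cache",
#           workspace="demo",
#           global_config={"working_dir": "./rag_storage"},
#       )
#       await storage.initialize()
#       await storage.upsert({"k1": {"return": "cached answer"}})
#       print(await storage.get_by_id("k1"))
#       await storage.finalize()  # namespace ends with "_cache", so this persists
#
#   asyncio.run(main())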