diff --git "a/htmlcov/z_4b3d0d3b603de221_json_py.html" "b/htmlcov/z_4b3d0d3b603de221_json_py.html" new file mode 100644--- /dev/null +++ "b/htmlcov/z_4b3d0d3b603de221_json_py.html" @@ -0,0 +1,392 @@ + + + + + Coverage for tinytroupe/utils/json.py: 11% + + + + + +
+
+

+ Coverage for tinytroupe / utils / json.py: + 11% +

+ +

+ 167 statements   + + + +

+

+ « prev     + ^ index     + » next +       + coverage.py v7.13.4, + created at 2026-02-28 17:48 +0000 +

+ +
+
+
+

1import json 

+

2import copy 

+

3from pydantic import BaseModel 

+

4 

+

5from tinytroupe.utils import logger 

+

6 

+

7class JsonSerializableRegistry: 

+

8 """ 

+

9 A mixin class that provides JSON serialization, deserialization, and subclass registration. 

+

10 """ 

+

11 

+

12 class_mapping = {} 

+

13 

+

14 def to_json(self, include: list = None, suppress: list = None, file_path: str = None, 

+

15 serialization_type_field_name = "json_serializable_class_name") -> dict: 

+

16 """ 

+

17 Returns a JSON representation of the object. 

+

18  

+

19 Args: 

+

20 include (list, optional): Attributes to include in the serialization. Will override the default behavior. 

+

21 suppress (list, optional): Attributes to suppress from the serialization. Will override the default behavior. 

+

22 file_path (str, optional): Path to a file where the JSON will be written. 

+

23 """ 

+

24 # Gather all serializable attributes from the class hierarchy 

+

25 serializable_attrs = set() 

+

26 suppress_attrs = set() 

+

27 custom_serializers = {} 

+

28 for cls in self.__class__.__mro__: # Traverse the class hierarchy 

+

29 if hasattr(cls, 'serializable_attributes') and isinstance(cls.serializable_attributes, list): 

+

30 serializable_attrs.update(cls.serializable_attributes) 

+

31 if hasattr(cls, 'suppress_attributes_from_serialization') and isinstance(cls.suppress_attributes_from_serialization, list): 

+

32 suppress_attrs.update(cls.suppress_attributes_from_serialization) 

+

33 if hasattr(cls, 'custom_serializers') and isinstance(cls.custom_serializers, dict): 

+

34 custom_serializers.update(cls.custom_serializers) 

+

35 

+

36 # Override attributes with method parameters if provided 

+

37 if include: 

+

38 serializable_attrs = set(include) 

+

39 if suppress: 

+

40 suppress_attrs.update(suppress) 

+

41 

+

42 def aux_serialize_item(item): 

+

43 if isinstance(item, JsonSerializableRegistry): 

+

44 return item.to_json(serialization_type_field_name=serialization_type_field_name) 

+

45 elif isinstance(item, BaseModel): 

+

46 # If it's a Pydantic model, convert it to a dict first 

+

47 logger.debug(f"Serializing Pydantic model: {item}") 

+

48 return item.model_dump(mode="json", exclude_unset=True) 

+

49 else: 

+

50 return copy.deepcopy(item) 

+

51 

+

52 result = {serialization_type_field_name: self.__class__.__name__} 

+

53 for attr in serializable_attrs if serializable_attrs else self.__dict__: 

+

54 if attr not in suppress_attrs: 

+

55 value = getattr(self, attr, None) 

+

56 

+

57 attr_renamed = self._programmatic_name_to_json_name(attr) 

+

58 

+

59 # Check if there's a custom serializer for this attribute 

+

60 if attr in custom_serializers: 

+

61 result[attr_renamed] = custom_serializers[attr](value) 

+

62 elif isinstance(value, list): 

+

63 result[attr_renamed] = [aux_serialize_item(item) for item in value] 

+

64 elif isinstance(value, dict): 

+

65 result[attr_renamed] = {k: aux_serialize_item(v) for k, v in value.items()} 

+

66 else: # isinstance(value, JsonSerializableRegistry) or isinstance(value, BaseModel) or other types 

+

67 result[attr_renamed] = aux_serialize_item(value) 

+

68 

+

69 if file_path: 

+

70 # Create directories if they do not exist 

+

71 import os 

+

72 os.makedirs(os.path.dirname(file_path), exist_ok=True) 

+

73 with open(file_path, 'w', encoding='utf-8', errors='replace') as f: 

+

74 json.dump(result, f, indent=4) 

+

75 

+

76 return result 

+

77 

+

78 @classmethod 

+

79 def from_json(cls, json_dict_or_path, suppress: list = None, 

+

80 serialization_type_field_name = "json_serializable_class_name", 

+

81 post_init_params: dict = None): 

+

82 """ 

+

83 Loads a JSON representation of the object and creates an instance of the class. 

+

84  

+

85 Args: 

+

86 json_dict_or_path (dict or str): The JSON dictionary representing the object or a file path to load the JSON from. 

+

87 suppress (list, optional): Attributes to suppress from being loaded. 

+

88  

+

89 Returns: 

+

90 An instance of the class populated with the data from json_dict_or_path. 

+

91 """ 

+

92 if isinstance(json_dict_or_path, str): 

+

93 with open(json_dict_or_path, 'r', encoding='utf-8', errors='replace') as f: 

+

94 json_dict = json.load(f) 

+

95 else: 

+

96 json_dict = json_dict_or_path 

+

97 

+

98 subclass_name = json_dict.get(serialization_type_field_name) 

+

99 target_class = cls.class_mapping.get(subclass_name, cls) 

+

100 instance = target_class.__new__(target_class) # Create an instance without calling __init__ 

+

101 

+

102 # Gather all serializable attributes from the class hierarchy 

+

103 serializable_attrs = set() 

+

104 custom_deserializers = {} 

+

105 suppress_attrs = set(suppress) if suppress else set() 

+

106 for target_mro in target_class.__mro__: 

+

107 if hasattr(target_mro, 'serializable_attributes') and isinstance(target_mro.serializable_attributes, list): 

+

108 serializable_attrs.update(target_mro.serializable_attributes) 

+

109 if hasattr(target_mro, 'custom_deserializers') and isinstance(target_mro.custom_deserializers, dict): 

+

110 custom_deserializers.update(target_mro.custom_deserializers) 

+

111 if hasattr(target_mro, 'suppress_attributes_from_serialization') and isinstance(target_mro.suppress_attributes_from_serialization, list): 

+

112 suppress_attrs.update(target_mro.suppress_attributes_from_serialization) 

+

113 

+

114 # Assign values only for serializable attributes if specified, otherwise assign everything 

+

115 for key in serializable_attrs if serializable_attrs else json_dict: 

+

116 key_in_json = cls._programmatic_name_to_json_name(key) 

+

117 if key_in_json in json_dict and key not in suppress_attrs: 

+

118 value = json_dict[key_in_json] 

+

119 if key in custom_deserializers: 

+

120 # Use custom initializer if provided 

+

121 setattr(instance, key, custom_deserializers[key](value)) 

+

122 elif isinstance(value, dict) and serialization_type_field_name in value: 

+

123 # Assume it's another JsonSerializableRegistry object 

+

124 setattr(instance, key, JsonSerializableRegistry.from_json(value, serialization_type_field_name=serialization_type_field_name)) 

+

125 elif isinstance(value, list): 

+

126 # Handle collections, recursively deserialize if items are JsonSerializableRegistry objects 

+

127 deserialized_collection = [] 

+

128 for item in value: 

+

129 if isinstance(item, dict) and serialization_type_field_name in item: 

+

130 deserialized_collection.append(JsonSerializableRegistry.from_json(item, serialization_type_field_name=serialization_type_field_name)) 

+

131 else: 

+

132 deserialized_collection.append(copy.deepcopy(item)) 

+

133 setattr(instance, key, deserialized_collection) 

+

134 else: 

+

135 setattr(instance, key, copy.deepcopy(value)) 

+

136 

+

137 # Call post-deserialization initialization if available 

+

138 if hasattr(instance, '_post_deserialization_init') and callable(instance._post_deserialization_init): 

+

139 post_init_params = post_init_params if post_init_params else {} 

+

140 instance._post_deserialization_init(**post_init_params) 

+

141 

+

142 return instance 

+

143 

+

144 def __init_subclass__(cls, **kwargs): 

+

145 super().__init_subclass__(**kwargs) 

+

146 # Register the subclass using its name as the key 

+

147 JsonSerializableRegistry.class_mapping[cls.__name__] = cls 

+

148 

+

149 # Automatically extend serializable attributes and custom initializers from parent classes  

+

150 if hasattr(cls, 'serializable_attributes') and isinstance(cls.serializable_attributes, list): 

+

151 for base in cls.__bases__: 

+

152 if hasattr(base, 'serializable_attributes') and isinstance(base.serializable_attributes, list): 

+

153 cls.serializable_attributes = list(set(base.serializable_attributes + cls.serializable_attributes)) 

+

154 

+

155 if hasattr(cls, 'suppress_attributes_from_serialization') and isinstance(cls.suppress_attributes_from_serialization, list): 

+

156 for base in cls.__bases__: 

+

157 if hasattr(base, 'suppress_attributes_from_serialization') and isinstance(base.suppress_attributes_from_serialization, list): 

+

158 cls.suppress_attributes_from_serialization = list(set(base.suppress_attributes_from_serialization + cls.suppress_attributes_from_serialization)) 

+

159 

+

160 if hasattr(cls, 'custom_deserializers') and isinstance(cls.custom_deserializers, dict): 

+

161 for base in cls.__bases__: 

+

162 if hasattr(base, 'custom_deserializers') and isinstance(base.custom_deserializers, dict): 

+

163 base_initializers = base.custom_deserializers.copy() 

+

164 base_initializers.update(cls.custom_deserializers) 

+

165 cls.custom_deserializers = base_initializers 

+

166 

+

167 if hasattr(cls, 'custom_serializers') and isinstance(cls.custom_serializers, dict): 

+

168 for base in cls.__bases__: 

+

169 if hasattr(base, 'custom_serializers') and isinstance(base.custom_serializers, dict): 

+

170 base_serializers = base.custom_serializers.copy() 

+

171 base_serializers.update(cls.custom_serializers) 

+

172 cls.custom_serializers = base_serializers 

+

173 

+

174 def _post_deserialization_init(self, **kwargs): 

+

175 # if there's a _post_init method, call it after deserialization 

+

176 if hasattr(self, '_post_init'): 

+

177 self._post_init(**kwargs) 

+

178 

+

179 @classmethod 

+

180 def _programmatic_name_to_json_name(cls, name): 

+

181 """ 

+

182 Converts a programmatic name to a JSON name by converting it to snake case. 

+

183 """ 

+

184 if hasattr(cls, 'serializable_attributes_renaming') and isinstance(cls.serializable_attributes_renaming, dict): 

+

185 return cls.serializable_attributes_renaming.get(name, name) 

+

186 return name 

+

187 

+

188 @classmethod 

+

189 def _json_name_to_programmatic_name(cls, name): 

+

190 """ 

+

191 Converts a JSON name to a programmatic name. 

+

192 """ 

+

193 if hasattr(cls, 'serializable_attributes_renaming') and isinstance(cls.serializable_attributes_renaming, dict): 

+

194 reverse_rename = {} 

+

195 for k, v in cls.serializable_attributes_renaming.items(): 

+

196 if v in reverse_rename: 

+

197 raise ValueError(f"Duplicate value '{v}' found in serializable_attributes_renaming.") 

+

198 reverse_rename[v] = k 

+

199 return reverse_rename.get(name, name) 

+

200 return name 

+

201 

+

202def post_init(cls): 

+

203 """ 

+

204 Decorator to enforce a post-initialization method call in a class, if it has one. 

+

205 The method must be named `_post_init`. 

+

206 """ 

+

207 original_init = cls.__init__ 

+

208 

+

209 def new_init(self, *args, **kwargs): 

+

210 original_init(self, *args, **kwargs) 

+

211 if hasattr(cls, '_post_init'): 

+

212 cls._post_init(self) 

+

213 

+

214 cls.__init__ = new_init 

+

215 return cls 

+

216 

+

217def merge_dicts(current, additions, overwrite=False, error_on_conflict=True, remove_duplicates=True): 

+

218 """ 

+

219 Merges two dictionaries and returns a new dictionary. Works as follows: 

+

220 - If a key exists in the additions dictionary but not in the current dictionary, it is added. 

+

221 - If a key maps to None in the current dictionary, it is replaced by the value in the additions dictionary. 

+

222 - If a key exists in both dictionaries and the values are dictionaries, the function is called recursively. 

+

223 - If a key exists in both dictionaries and the values are lists, the lists are concatenated and duplicates are removed 

+

224 (if remove_duplicates is True). 

+

225 - If the values are of different types, an exception is raised. 

+

226 - If the values are of the same type but not both lists/dictionaries, the value from the additions dictionary overwrites the value in the current dictionary based on the overwrite parameter. 

+

227  

+

228 Parameters: 

+

229 - current (dict): The original dictionary. 

+

230 - additions (dict): The dictionary with values to add. 

+

231 - overwrite (bool): Whether to overwrite values if they are of the same type but not both lists/dictionaries. 

+

232 - error_on_conflict (bool): Whether to raise an error if there is a conflict and overwrite is False. 

+

233 - remove_duplicates (bool): Whether to remove duplicates from lists when merging. 

+

234  

+

235 Returns: 

+

236 - dict: A new dictionary with merged values. 

+

237 """ 

+

238 merged = current.copy() # Create a copy of the current dictionary to avoid altering it 

+

239 

+

240 for key in additions: 

+

241 if key in merged: 

+

242 # If the current value is None, directly assign the new value 

+

243 if merged[key] is None: 

+

244 merged[key] = additions[key] 

+

245 # If both values are dictionaries, merge them recursively 

+

246 elif isinstance(merged[key], dict) and isinstance(additions[key], dict): 

+

247 merged[key] = merge_dicts(merged[key], additions[key], overwrite, error_on_conflict) 

+

248 # If both values are lists, concatenate them and remove duplicates 

+

249 elif isinstance(merged[key], list) and isinstance(additions[key], list): 

+

250 merged[key].extend(additions[key]) 

+

251 # Remove duplicates while preserving order 

+

252 if remove_duplicates: 

+

253 merged[key] = remove_duplicate_items(merged[key]) 

+

254 # If the values are of different types, raise an exception 

+

255 elif type(merged[key]) != type(additions[key]): 

+

256 raise TypeError(f"Cannot merge different types: {type(merged[key])} and {type(additions[key])} for key '{key}'") 

+

257 # If the values are of the same type but not both lists/dictionaries, decide based on the overwrite parameter 

+

258 else: 

+

259 if overwrite: 

+

260 merged[key] = additions[key] 

+

261 elif merged[key] != additions[key]: 

+

262 if error_on_conflict: 

+

263 raise ValueError(f"Conflict at key '{key}': overwrite is set to False and values are different.") 

+

264 else: 

+

265 continue # Ignore the conflict and continue 

+

266 else: 

+

267 # If the key is not present in merged, add it from additions 

+

268 merged[key] = additions[key] 

+

269 

+

270 return merged 

+

271 

+

272def remove_duplicate_items(lst): 

+

273 """ 

+

274 Removes duplicates from a list while preserving order. 

+

275 Handles unhashable elements by using a list comprehension. 

+

276 

+

277 Parameters: 

+

278 - lst (list): The list to remove duplicates from. 

+

279 

+

280 Returns: 

+

281 - list: A new list with duplicates removed. 

+

282 """ 

+

283 seen = [] 

+

284 result = [] 

+

285 for item in lst: 

+

286 if isinstance(item, dict): 

+

287 # Convert dict to a frozenset of its items to make it hashable 

+

288 item_key = frozenset(item.items()) 

+

289 else: 

+

290 item_key = item 

+

291 

+

292 if item_key not in seen: 

+

293 seen.append(item_key) 

+

294 result.append(item) 

+

295 return result 

+
+ + +