broadfield-dev committed on
Commit
ff3a113
·
verified ·
1 Parent(s): 2fcce3e

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +90 -210
processor.py CHANGED
@@ -19,10 +19,6 @@ class DatasetCommandCenter:
19
  # ==========================================
20
 
21
  def get_dataset_metadata(self, dataset_id):
22
- """
23
- Fetches available Configs (subsets), Splits, and License info
24
- without downloading the actual data rows.
25
- """
26
  configs = ['default']
27
  splits = ['train', 'test', 'validation']
28
  license_name = "unknown"
@@ -31,30 +27,22 @@ class DatasetCommandCenter:
31
  # 1. Fetch Configs
32
  try:
33
  found_configs = get_dataset_config_names(dataset_id, token=self.token)
34
- if found_configs:
35
- configs = found_configs
36
- except Exception:
37
- pass # Keep default
38
 
39
- # 2. Fetch Metadata (Splits & License)
40
  try:
41
  selected = configs[0]
42
- # This API call can fail on some datasets, so we wrap it safely
43
  infos = get_dataset_infos(dataset_id, token=self.token)
44
-
45
  info = None
46
- if selected in infos:
47
- info = infos[selected]
48
- elif 'default' in infos:
49
- info = infos['default']
50
- elif infos:
51
- info = list(infos.values())[0]
52
 
53
  if info:
54
  splits = list(info.splits.keys())
55
  license_name = info.license or "unknown"
56
- except Exception:
57
- pass # Keep defaults if metadata fails
58
 
59
  return {
60
  "status": "success",
@@ -66,176 +54,119 @@ class DatasetCommandCenter:
66
  return {"status": "error", "message": str(e)}
67
 
68
  def get_splits_for_config(self, dataset_id, config_name):
69
- """
70
- Updates the Split dropdown when the user changes the Config.
71
- """
72
  try:
73
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
74
- if config_name in infos:
75
- splits = list(infos[config_name].splits.keys())
76
- elif len(infos) > 0:
77
- splits = list(infos.values())[0].splits.keys()
78
- else:
79
- splits = ['train', 'test']
80
  return {"status": "success", "splits": splits}
81
  except:
82
  return {"status": "success", "splits": ['train', 'test', 'validation']}
83
 
84
  def _flatten_object(self, obj, parent_key='', sep='.'):
85
- """
86
- Recursively finds all keys in nested dicts or JSON strings
87
- to populate the 'Simple Path' dropdown in the UI.
88
- """
89
  items = {}
90
-
91
- # Transparently parse JSON strings
92
  if isinstance(obj, str):
93
  s = obj.strip()
94
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
95
- try:
96
- obj = json.loads(s)
97
- except:
98
- pass # Keep as string if parse fails
99
 
100
  if isinstance(obj, dict):
101
  for k, v in obj.items():
102
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
103
  items.update(self._flatten_object(v, new_key, sep=sep))
104
  elif isinstance(obj, list):
105
- # We mark lists but do not recurse infinitely
106
- new_key = f"{parent_key}" if parent_key else "list_content"
107
- items[new_key] = "List"
108
  else:
109
- # Leaf node
110
  items[parent_key] = type(obj).__name__
111
-
112
  return items
113
 
114
  def inspect_dataset(self, dataset_id, config, split):
115
- """
116
- Scans the first 10 rows to build a Schema Tree for the UI.
117
- """
118
  try:
119
  conf = config if config != 'default' else None
120
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
121
 
122
  sample_rows = []
123
  available_paths = set()
124
- schema_map = {} # Used for List Mode detection
125
 
126
  for i, row in enumerate(ds_stream):
127
  if i >= 10: break
128
 
129
- # 1. Clean row for UI Preview (convert objects to strings)
130
- clean_row = {}
131
- for k, v in row.items():
132
- if not isinstance(v, (str, int, float, bool, list, dict, type(None))):
133
- clean_row[k] = str(v)
134
- else:
135
- clean_row[k] = v
136
  sample_rows.append(clean_row)
137
 
138
- # 2. Deep Flattening for "Simple Path" dropdowns
139
  flattened = self._flatten_object(row)
140
  available_paths.update(flattened.keys())
141
 
142
- # 3. Top Level Analysis for "List Mode" detection
143
  for k, v in row.items():
144
- if k not in schema_map:
145
- schema_map[k] = {"type": "Object"}
146
-
147
  val = v
148
  if isinstance(val, str):
149
  try: val = json.loads(val)
150
  except: pass
151
-
152
- if isinstance(val, list):
153
- schema_map[k]["type"] = "List"
154
 
155
- # Reconstruct Schema Tree for UI grouping
156
  sorted_paths = sorted(list(available_paths))
157
  schema_tree = {}
158
  for path in sorted_paths:
159
  root = path.split('.')[0]
160
- if root not in schema_tree:
161
- schema_tree[root] = []
162
  schema_tree[root].append(path)
163
 
164
  return {
165
  "status": "success",
166
  "samples": sample_rows,
167
- "schema_tree": schema_tree, # Used by Simple Path Dropdown
168
- "schema": schema_map, # Used by List Mode Dropdown
169
  "dataset_id": dataset_id
170
  }
171
  except Exception as e:
172
  return {"status": "error", "message": str(e)}
173
 
174
  # ==========================================
175
- # 2. CORE EXTRACTION LOGIC
176
  # ==========================================
177
 
178
  def _get_value_by_path(self, obj, path):
179
- """
180
- Navigates dot notation (meta.user.id), automatically parsing
181
- JSON strings if encountered along the path.
182
- """
183
  if not path: return obj
184
  keys = path.split('.')
185
  current = obj
186
 
187
  for key in keys:
188
- # Auto-parse JSON string if encountered
189
  if isinstance(current, str):
190
  s = current.strip()
191
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
192
- try:
193
- current = json.loads(s)
194
- except:
195
- pass
196
 
197
  if isinstance(current, dict) and key in current:
198
  current = current[key]
199
  else:
200
- return None # Path broken
201
  return current
202
 
203
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
204
- """
205
- Logic for: FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
206
- """
207
  data = row.get(source_col)
208
-
209
- # Parse if string
210
  if isinstance(data, str):
211
- try:
212
- data = json.loads(data)
213
- except:
214
- return None
215
-
216
- if not isinstance(data, list):
217
- return None
218
 
219
  matched_item = None
220
  for item in data:
221
- # String comparison for safety
222
  if str(item.get(filter_key, '')) == str(filter_val):
223
  matched_item = item
224
  break
225
 
226
  if matched_item:
227
  return self._get_value_by_path(matched_item, target_path)
228
-
229
  return None
230
 
231
  def _apply_projection(self, row, recipe):
232
- """
233
- Builds the new row based on the recipe.
234
- Raises ValueError if user Python code fails (Fail Fast).
235
- """
236
  new_row = {}
237
-
238
- # Setup Eval Context (Variables available in Python Mode)
239
  eval_context = row.copy()
240
  eval_context['row'] = row
241
  eval_context['json'] = json
@@ -248,169 +179,118 @@ class DatasetCommandCenter:
248
  try:
249
  if t_type == 'simple':
250
  new_row[target_col] = self._get_value_by_path(row, col_def['source'])
251
-
252
  elif t_type == 'list_search':
253
  new_row[target_col] = self._extract_from_list_logic(
254
- row,
255
- col_def['source'],
256
- col_def['filter_key'],
257
- col_def['filter_val'],
258
- col_def['target_key']
259
  )
260
-
261
  elif t_type == 'python':
262
- # Execute user code
263
- expression = col_def['expression']
264
- val = eval(expression, {}, eval_context)
265
  new_row[target_col] = val
266
-
267
  except Exception as e:
268
- # Fail Fast: Stop the generator immediately if a column fails
269
  raise ValueError(f"Column '{target_col}' failed: {str(e)}")
270
 
271
  return new_row
272
 
 
 
 
 
 
 
 
 
 
 
 
273
  # ==========================================
274
- # 3. DOCUMENTATION (MODEL CARD)
275
  # ==========================================
276
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
  def _generate_card(self, source_id, target_id, recipe, license_name):
278
- """
279
- Creates a README.md for the new dataset.
280
- """
281
  card_data = DatasetCardData(
282
  language="en",
283
  license=license_name,
284
- tags=["dataset-command-center", "etl", "generated-dataset"],
285
  base_model=source_id,
286
  )
287
-
288
  content = f"""
289
  # {target_id.split('/')[-1]}
290
-
291
  This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
292
- It was generated using the **Hugging Face Dataset Command Center**.
293
-
294
- ## Transformation Recipe
295
-
296
- The following operations were applied to the source data:
297
-
298
- | Target Column | Operation Type | Logic |
299
- |---------------|----------------|-------|
300
  """
301
  for col in recipe['columns']:
302
- c_type = col.get('type', 'simple')
303
- c_name = col['name']
304
-
305
- logic = "-"
306
- if c_type == 'simple':
307
- logic = f"Mapped from `{col.get('source')}`"
308
- elif c_type == 'list_search':
309
- logic = f"Extracted `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
310
- elif c_type == 'python':
311
- logic = f"Python: `{col.get('expression')}`"
312
-
313
- content += f"| **{c_name}** | {c_type} | {logic} |\n"
314
-
315
- if recipe.get('filter_rule'):
316
- content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"
317
-
318
- content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."
319
-
320
- card = DatasetCard.from_template(card_data, content=content)
321
- return card
322
-
323
- # ==========================================
324
- # 4. EXECUTION
325
- # ==========================================
326
 
327
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
328
- logger.info(f"Job started: {source_id} -> {target_id}")
329
  conf = config if config != 'default' else None
330
 
331
  def gen():
332
  ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
333
  count = 0
334
  for i, row in enumerate(ds_stream):
335
- if max_rows and count >= int(max_rows):
336
- break
337
 
338
- # 1. Filter
339
  if recipe.get('filter_rule'):
340
  try:
341
  ctx = row.copy()
342
  ctx['row'] = row
343
  ctx['json'] = json
344
  ctx['re'] = re
345
- if not eval(recipe['filter_rule'], {}, ctx):
346
- continue
347
  except Exception as e:
348
- raise ValueError(f"Filter crashed on row {i}: {e}")
349
 
350
- # 2. Projection
351
  try:
352
  yield self._apply_projection(row, recipe)
353
  count += 1
354
- except ValueError as ve:
355
- # Pass the specific column error up
356
- raise ve
357
- except Exception as e:
358
- raise ValueError(f"Unexpected crash on row {i}: {e}")
359
 
360
  try:
361
- # 1. Process & Push Data
362
  new_dataset = datasets.Dataset.from_generator(gen)
363
  new_dataset.push_to_hub(target_id, token=self.token)
364
-
365
- # 2. Generate & Push Card
366
  try:
367
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
368
  card.push_to_hub(target_id, token=self.token)
369
- except Exception as e:
370
- logger.error(f"Failed to push Dataset Card: {e}")
371
-
372
  return {"status": "success", "rows_processed": len(new_dataset)}
373
-
374
  except Exception as e:
375
  logger.error(f"Job Failed: {e}")
376
- return {"status": "failed", "error": str(e)}
377
-
378
- # ==========================================
379
- # 5. PREVIEW
380
- # ==========================================
381
-
382
- def preview_transform(self, dataset_id, config, split, recipe):
383
- conf = config if config != 'default' else None
384
-
385
- try:
386
- ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
387
- processed = []
388
-
389
- for i, row in enumerate(ds_stream):
390
- if len(processed) >= 5: break
391
-
392
- # Check Filter
393
- passed = True
394
- if recipe.get('filter_rule'):
395
- try:
396
- ctx = row.copy()
397
- ctx['row'] = row
398
- ctx['json'] = json
399
- ctx['re'] = re
400
- if not eval(recipe['filter_rule'], {}, ctx):
401
- passed = False
402
- except:
403
- passed = False # Skip invalid rows in preview
404
-
405
- if passed:
406
- try:
407
- new_row = self._apply_projection(row, recipe)
408
- processed.append(new_row)
409
- except Exception as e:
410
- # In preview, we want to see the error, not crash
411
- processed.append({"_preview_error": f"Error: {str(e)}"})
412
-
413
- return processed
414
- except Exception as e:
415
- # Return global error if loading fails
416
- raise e
 
19
  # ==========================================
20
 
21
  def get_dataset_metadata(self, dataset_id):
 
 
 
 
22
  configs = ['default']
23
  splits = ['train', 'test', 'validation']
24
  license_name = "unknown"
 
27
  # 1. Fetch Configs
28
  try:
29
  found_configs = get_dataset_config_names(dataset_id, token=self.token)
30
+ if found_configs: configs = found_configs
31
+ except: pass
 
 
32
 
33
+ # 2. Fetch Metadata
34
  try:
35
  selected = configs[0]
 
36
  infos = get_dataset_infos(dataset_id, token=self.token)
 
37
  info = None
38
+ if selected in infos: info = infos[selected]
39
+ elif 'default' in infos: info = infos['default']
40
+ elif infos: info = list(infos.values())[0]
 
 
 
41
 
42
  if info:
43
  splits = list(info.splits.keys())
44
  license_name = info.license or "unknown"
45
+ except: pass
 
46
 
47
  return {
48
  "status": "success",
 
54
  return {"status": "error", "message": str(e)}
55
 
56
  def get_splits_for_config(self, dataset_id, config_name):
 
 
 
57
  try:
58
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
59
+ splits = list(infos[config_name].splits.keys())
 
 
 
 
 
60
  return {"status": "success", "splits": splits}
61
  except:
62
  return {"status": "success", "splits": ['train', 'test', 'validation']}
63
 
64
  def _flatten_object(self, obj, parent_key='', sep='.'):
65
+ """Recursively finds keys for the UI dropdowns."""
 
 
 
66
  items = {}
 
 
67
  if isinstance(obj, str):
68
  s = obj.strip()
69
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
70
+ try: obj = json.loads(s)
71
+ except: pass
 
 
72
 
73
  if isinstance(obj, dict):
74
  for k, v in obj.items():
75
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
76
  items.update(self._flatten_object(v, new_key, sep=sep))
77
  elif isinstance(obj, list):
78
+ items[parent_key or "list"] = "List"
 
 
79
  else:
 
80
  items[parent_key] = type(obj).__name__
 
81
  return items
82
 
83
  def inspect_dataset(self, dataset_id, config, split):
 
 
 
84
  try:
85
  conf = config if config != 'default' else None
86
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
87
 
88
  sample_rows = []
89
  available_paths = set()
90
+ schema_map = {}
91
 
92
  for i, row in enumerate(ds_stream):
93
  if i >= 10: break
94
 
95
+ # Clean row for UI (No objects)
96
+ clean_row = self._sanitize_for_json(row)
 
 
 
 
 
97
  sample_rows.append(clean_row)
98
 
99
+ # Schema Discovery
100
  flattened = self._flatten_object(row)
101
  available_paths.update(flattened.keys())
102
 
103
+ # List Mode Detection
104
  for k, v in row.items():
105
+ if k not in schema_map: schema_map[k] = {"type": "Object"}
 
 
106
  val = v
107
  if isinstance(val, str):
108
  try: val = json.loads(val)
109
  except: pass
110
+ if isinstance(val, list): schema_map[k]["type"] = "List"
 
 
111
 
 
112
  sorted_paths = sorted(list(available_paths))
113
  schema_tree = {}
114
  for path in sorted_paths:
115
  root = path.split('.')[0]
116
+ if root not in schema_tree: schema_tree[root] = []
 
117
  schema_tree[root].append(path)
118
 
119
  return {
120
  "status": "success",
121
  "samples": sample_rows,
122
+ "schema_tree": schema_tree,
123
+ "schema": schema_map,
124
  "dataset_id": dataset_id
125
  }
126
  except Exception as e:
127
  return {"status": "error", "message": str(e)}
128
 
129
  # ==========================================
130
+ # 2. CORE LOGIC
131
  # ==========================================
132
 
133
  def _get_value_by_path(self, obj, path):
 
 
 
 
134
  if not path: return obj
135
  keys = path.split('.')
136
  current = obj
137
 
138
  for key in keys:
 
139
  if isinstance(current, str):
140
  s = current.strip()
141
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
142
+ try: current = json.loads(s)
143
+ except: pass
 
 
144
 
145
  if isinstance(current, dict) and key in current:
146
  current = current[key]
147
  else:
148
+ return None
149
  return current
150
 
151
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
 
 
 
152
  data = row.get(source_col)
 
 
153
  if isinstance(data, str):
154
+ try: data = json.loads(data)
155
+ except: return None
156
+ if not isinstance(data, list): return None
 
 
 
 
157
 
158
  matched_item = None
159
  for item in data:
 
160
  if str(item.get(filter_key, '')) == str(filter_val):
161
  matched_item = item
162
  break
163
 
164
  if matched_item:
165
  return self._get_value_by_path(matched_item, target_path)
 
166
  return None
167
 
168
  def _apply_projection(self, row, recipe):
 
 
 
 
169
  new_row = {}
 
 
170
  eval_context = row.copy()
171
  eval_context['row'] = row
172
  eval_context['json'] = json
 
179
  try:
180
  if t_type == 'simple':
181
  new_row[target_col] = self._get_value_by_path(row, col_def['source'])
 
182
  elif t_type == 'list_search':
183
  new_row[target_col] = self._extract_from_list_logic(
184
+ row, col_def['source'], col_def['filter_key'], col_def['filter_val'], col_def['target_key']
 
 
 
 
185
  )
 
186
  elif t_type == 'python':
187
+ val = eval(col_def['expression'], {}, eval_context)
 
 
188
  new_row[target_col] = val
 
189
  except Exception as e:
 
190
  raise ValueError(f"Column '{target_col}' failed: {str(e)}")
191
 
192
  return new_row
193
 
194
+ def _sanitize_for_json(self, obj):
195
+ """Helper to ensure objects are JSON serializable (fixes Preview crash)."""
196
+ if isinstance(obj, dict):
197
+ return {k: self._sanitize_for_json(v) for k, v in obj.items()}
198
+ elif isinstance(obj, list):
199
+ return [self._sanitize_for_json(v) for v in obj]
200
+ elif isinstance(obj, (str, int, float, bool, type(None))):
201
+ return obj
202
+ else:
203
+ return str(obj) # Convert Timestamps, Images, etc to string
204
+
205
  # ==========================================
206
+ # 3. PREVIEW & EXECUTE
207
  # ==========================================
208
 
209
+ def preview_transform(self, dataset_id, config, split, recipe):
210
+ conf = config if config != 'default' else None
211
+
212
+ try:
213
+ ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
214
+ processed = []
215
+
216
+ for i, row in enumerate(ds_stream):
217
+ if len(processed) >= 5: break
218
+
219
+ # Filter
220
+ passed = True
221
+ if recipe.get('filter_rule'):
222
+ try:
223
+ ctx = row.copy()
224
+ ctx['row'] = row
225
+ ctx['json'] = json
226
+ ctx['re'] = re
227
+ if not eval(recipe['filter_rule'], {}, ctx): passed = False
228
+ except: passed = False # Skip crashing rows in preview
229
+
230
+ if passed:
231
+ try:
232
+ projected = self._apply_projection(row, recipe)
233
+ # SANITIZE OUTPUT so Flask doesn't crash on Timestamps/Images
234
+ clean_projected = self._sanitize_for_json(projected)
235
+ processed.append(clean_projected)
236
+ except Exception as e:
237
+ processed.append({"_preview_error": f"Error: {str(e)}"})
238
+
239
+ return processed
240
+ except Exception as e:
241
+ raise e
242
+
243
  def _generate_card(self, source_id, target_id, recipe, license_name):
 
 
 
244
  card_data = DatasetCardData(
245
  language="en",
246
  license=license_name,
247
+ tags=["dataset-command-center", "etl"],
248
  base_model=source_id,
249
  )
 
250
  content = f"""
251
  # {target_id.split('/')[-1]}
 
252
  This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
253
+ ## Recipe
 
 
 
 
 
 
 
254
  """
255
  for col in recipe['columns']:
256
+ content += f"- **{col['name']}**: {col.get('type')} ({col.get('source') or 'expr'})\n"
257
+ content += f"\n**License:** {license_name}"
258
+ return DatasetCard.from_template(card_data, content=content)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259
 
260
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
261
+ logger.info(f"Job: {source_id} -> {target_id}")
262
  conf = config if config != 'default' else None
263
 
264
  def gen():
265
  ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
266
  count = 0
267
  for i, row in enumerate(ds_stream):
268
+ if max_rows and count >= int(max_rows): break
 
269
 
 
270
  if recipe.get('filter_rule'):
271
  try:
272
  ctx = row.copy()
273
  ctx['row'] = row
274
  ctx['json'] = json
275
  ctx['re'] = re
276
+ if not eval(recipe['filter_rule'], {}, ctx): continue
 
277
  except Exception as e:
278
+ raise ValueError(f"Filter error row {i}: {e}")
279
 
 
280
  try:
281
  yield self._apply_projection(row, recipe)
282
  count += 1
283
+ except ValueError as ve: raise ve
284
+ except Exception as e: raise ValueError(f"Error row {i}: {e}")
 
 
 
285
 
286
  try:
 
287
  new_dataset = datasets.Dataset.from_generator(gen)
288
  new_dataset.push_to_hub(target_id, token=self.token)
 
 
289
  try:
290
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
291
  card.push_to_hub(target_id, token=self.token)
292
+ except: pass
 
 
293
  return {"status": "success", "rows_processed": len(new_dataset)}
 
294
  except Exception as e:
295
  logger.error(f"Job Failed: {e}")
296
+ return {"status": "failed", "error": str(e)}