"""
Script for batch processing of JSONL files for text quality classification.

This module is intended for efficient analysis of large datasets.
It scans an input folder for .jsonl files, processes each of them in batches,
and writes the results to a new file in the output folder, preserving the
original data structure and adding the classification results.
"""

import os
import glob
import time
import pickle
import joblib
import pandas as pd
import json
import numpy as np
from tqdm import tqdm
from typing import List

from text_analyzer.analyzer import TextAnalyzer
from text_analyzer import constants


# Load the fitted feature scaler and the trained classifier once at startup,
# and create a single shared TextAnalyzer instance used for all predictions.
with open('models/scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)
classifier = joblib.load("models/model.joblib")
text_analyzer = TextAnalyzer()

# Number of texts analyzed per batch by TextAnalyzer.analyze_batch.
batch_size = 10


class NumpyJSONEncoder(json.JSONEncoder):
    """
    Custom JSON encoder that handles NumPy data types,
    which are not serializable by default.
    """
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
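
# Illustrative use (hypothetical value): json.dumps({"p": np.float32(0.93)}, cls=NumpyJSONEncoder)
# serializes the NumPy scalar as a plain JSON number instead of raising a TypeError.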


def predict_batch(texts: List[str], analyzer: TextAnalyzer, scaler_model, classifier_model) -> List[tuple[str | None, float | None]]:
    """
    Processes an entire list of texts in batches and returns a list of predictions.
    """
    all_features = []

    # Extract features for every text, keeping the column order expected by the model.
    feature_generator = analyzer.analyze_batch(texts, batch_size=batch_size)
    for features_dict in tqdm(feature_generator, total=len(texts), desc="Feature analysis"):
        ordered_features = [features_dict.get(fname, 0.0) for fname in constants.COLUMN_ORDER]
        all_features.append(ordered_features)

    if not all_features:
        return []

    # Scale the feature matrix and score the whole batch in one call.
    features_df = pd.DataFrame(all_features, columns=constants.COLUMN_ORDER)
    features_scaled = scaler_model.transform(features_df)

    pred_probas = classifier_model.predict_proba(features_scaled)

    # For each text, pick the most probable class and report its probability as a percentage.
    # NOTE: this assumes the classifier's classes are ordered LOW, MEDIUM, HIGH;
    # classifier_model.classes_ is the authoritative ordering.
    results = []
    labels = ["LOW", "MEDIUM", "HIGH"]
    for single_pred_proba in pred_probas:
        category_prob = {
            label: prob
            for label, prob in zip(labels, single_pred_proba)
        }

        sorted_category_prob = sorted(category_prob.items(), key=lambda item: item[1], reverse=True)
        most_probable_category, confidence = sorted_category_prob[0]

        results.append((most_probable_category, round(float(confidence) * 100, 2)))

    return results
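
# Example usage (illustrative; assumes the models in models/ are already fitted):
#
#     sample_texts = ["A well-structured, informative paragraph.", "asdf 1234 !!!"]
#     predictions = predict_batch(sample_texts, text_analyzer, scaler, classifier)
#     # -> e.g. [("HIGH", 97.12), ("LOW", 88.45)]   (hypothetical values)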


def process_jsonl_file(input_file: str, output_file: str):
    """Orchestrates batch processing of a single .jsonl file."""
    original_data = []
    texts_to_process = []
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            for line in f:
                json_object = json.loads(line)
                original_data.append(json_object)
                texts_to_process.append(json_object.get('text', ''))
    except Exception as e:
        print(f"Failed to read file {input_file}. Error: {e}")
        return

    print(f"Loaded {len(texts_to_process)} rows. Starting batch processing...")

    results = predict_batch(texts_to_process, text_analyzer, scaler, classifier)

    # Write each original record back out, enriched with the classification results.
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            for i, (category, confidence) in enumerate(results):
                output_object = original_data[i]
                output_object['quality_ai'] = category
                output_object['confidence'] = confidence
                json_line = json.dumps(output_object, ensure_ascii=False, cls=NumpyJSONEncoder)
                f.write(json_line + '\n')
    except Exception as e:
        print(f"Failed to write file {output_file}. Error: {e}")


if __name__ == '__main__':
    print("Initializing the batch processing script...")

    INPUT_FOLDER = 'input_jsonl'
    OUTPUT_FOLDER = 'output'
    os.makedirs(OUTPUT_FOLDER, exist_ok=True)

    jsonl_files = glob.glob(os.path.join(INPUT_FOLDER, '*.jsonl'))

    for file_path in jsonl_files:
        start_time = time.time()
        output_file = os.path.join(OUTPUT_FOLDER, os.path.basename(file_path))

        # Skip files that were already processed in a previous run.
        if os.path.exists(output_file):
            print(f"SKIPPING - file already exists: {output_file}")
            continue

        print(f"\n--- Processing file: {file_path} ---")
        process_jsonl_file(file_path, output_file)
        end_time = time.time()
        print(f"Processing time: {end_time - start_time:.4f} seconds")

    print("\nAll files have been processed!")