| |
| import datetime |
| |
| import re |
| import json |
| import pandas as pd |
| |
| import gradio as gr |
| |
| from gliner import GLiNER |
| |
| from transformers import pipeline |
|
|
| |
# GLiNER checkpoint used for entity extraction; loaded once at import time
# and switched to eval mode before any prediction is made.
model = GLiNER.from_pretrained(
    "chris32/gliner_multi_pii_real_state-v2"
)
model.eval()

# Transformers pipeline built from the checkpoint below and pinned to the CPU.
# generate_answer() consumes its output as a list of label/score dicts.
model_name = (
    "chris32/distilbert-base-spanish-uncased-finetuned-text-intelligence"
)
pipe = pipeline(model=model_name, device="cpu")
|
|
| |
# Calendar year captured once, when the module is imported.
CURRENT_YEAR = datetime.date.today().year
# A remodeling year is accepted only when it lies strictly fewer than this
# many years before CURRENT_YEAR (see extract_remodeling_year_from_string).
YEAR_OF_REMODELING_LIMIT = 100
# Not referenced anywhere in the visible part of this file.
SCORE_LIMIT_SIMILARITY_NAMES = 70
|
|
def clean_text(text):
    """Strip HTML markup from *text* and normalise its whitespace.

    Steps, in order:
      1. every ``<br>`` / ``<br/>`` / ``<br />`` tag becomes ``" # "``;
      2. every remaining ``<...>`` tag is removed;
      3. the ``&nbsp;`` and ``&amp;`` entities are decoded;
      4. runs of whitespace collapse to one space and the ends are trimmed;
      5. each ``..`` becomes ``.`` and each ``,,`` becomes ``,`` (one pass).

    Args:
        text: Raw listing description, possibly containing HTML.

    Returns:
        The cleaned string.
    """
    # Line breaks carry meaning in a listing, so keep a visible marker
    # instead of silently joining the two lines.
    replacement_char = " # "
    text = re.sub(r'<br\s*\/?>', replacement_char, text)

    # Drop every other tag.
    cleaned_text = re.sub(r'<[^>]*>', '', text)

    # Decode the two entities that survive tag stripping. These used to be
    # no-op substitutions (' ' -> ' ' and '&' -> '&'), so the entities were
    # left in the text. '&nbsp;' is decoded before '&amp;' so that an escaped
    # '&amp;nbsp;' is unescaped exactly one level.
    cleaned_text = cleaned_text.replace("&nbsp;", " ")
    cleaned_text = cleaned_text.replace("&amp;", "&")

    # Collapse whitespace runs (this also covers literal non-breaking spaces,
    # which ``\s`` matches for str patterns) and trim the ends.
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    cleaned_text = cleaned_text.strip()

    # Single-pass de-duplication of doubled punctuation.
    cleaned_text = cleaned_text.replace("..", ".").replace(",,", ",")

    return cleaned_text
|
|
def format_gliner_predictions(prediction):
    """Flatten a list of entity predictions into one dict keyed by label.

    Each item of *prediction* is expected to provide the keys ``"label"``,
    ``"text"``, ``"score"``, ``"start"`` and ``"end"``. Only the
    highest-scoring item per label is kept. For a label ``X`` the result
    contains:

      * ``"pred_X"`` - the entity text,
      * ``"prob_X"`` - its score,
      * ``"pos_X"``  - the ``(start, end)`` tuple.

    Returns:
        The flattened dict, or an empty dict when *prediction* is empty.
    """
    if len(prediction) == 0:
        return dict()

    # Sort best-first, then keep the first (best) row of every label.
    ranked = pd.DataFrame(prediction).sort_values("score", ascending=False)
    best = ranked.drop_duplicates(subset="label", keep="first")

    # One (start, end) tuple per surviving row.
    best["position"] = best.apply(
        lambda row: (row["start"], row["end"]), axis=1
    )

    # Emit the three key families in the same order as before:
    # texts first, then scores, then positions.
    predictions_formatted = {}
    for prefix, column in (
        ("pred_", "text"),
        ("prob_", "score"),
        ("pos_", "position"),
    ):
        keyed = best[column].copy()
        keyed.index = [f"{prefix}{label}" for label in best["label"]]
        predictions_formatted.update(keyed.to_dict())

    return predictions_formatted
| |
def clean_prediction(row, feature_name, threshols_dict, clean_functions_dict):
    """Return the cleaned prediction for *feature_name*, or None.

    NOTE: a second definition with the same name and behaviour appears
    further down this file; that later one is the binding in effect.

    Args:
        row: Mapping holding ``pred_<feature_name>`` and
            ``prob_<feature_name>`` entries.
        feature_name: Label whose prediction is being cleaned.
        threshols_dict: Per-label minimum score.
        clean_functions_dict: Per-label cleaning callable.

    Returns:
        The cleaning function's result when the score is strictly above the
        label's threshold, otherwise None.
    """
    raw_value = row[f"pred_{feature_name}"]
    confidence = row[f"prob_{feature_name}"]

    # Scores at or below the threshold are rejected without cleaning.
    if not (confidence > threshols_dict[feature_name]):
        return None

    return clean_functions_dict[feature_name](raw_value)
| |
# Substrings that make extract_surface_from_string() reject a prediction
# (matched against the lower-cased entity text).
surfaces_words_to_omit = [
    "ha",
    "hect",
    "lts",
    "litros",
    "mil",
]
# Not referenced anywhere in the visible part of this file.
tower_name_key_words_to_keep = [
    "torr",
    "towe",
]
|
|
def has_number(string):
    """Return True when *string* contains at least one digit character."""
    return re.search(r'\d', string) is not None
|
|
def contains_multiplication(string):
    """Return True when *string* looks like ``<number> ... x <number>``.

    A match is a number (digits and commas, optional decimal part), then
    optional words, then ``x`` or ``X``, then a second number - for example
    ``"10 x 20"``, ``"10m X 20m"`` or ``"3x4"``.

    Args:
        string: Text to inspect.

    Returns:
        bool: whether such a pattern occurs anywhere in *string*.
    """
    # The "optional words" part is written as ``(?:\w[\w\s]*)?``: any run of
    # word characters and whitespace that starts with a word character. It
    # accepts exactly the same text as the nested form ``(?:\w+\s*)*``, but
    # the nested form can split a run of letters in exponentially many ways
    # and hangs when a number is followed by long words and no ``x``.
    pattern = (
        r'\b([\d,]+(?:\.\d+)?)\s*(?:\w[\w\s]*)?'
        r'[xX]\s*([\d,]+(?:\.\d+)?)\s*(?:\w[\w\s]*)?\b'
    )
    return re.search(pattern, string) is not None
|
|
def extract_first_number_from_string(text):
    """Locate the first number in *text*.

    Args:
        text: Candidate string; any non-string input yields no result.

    Returns:
        ``(number, start, end)`` where *number* is the matched text converted
        through ``float`` and truncated to ``int``, and *start* / *end* are
        the match offsets in *text*. ``(None, None, None)`` when *text* is
        not a string or holds no number.
    """
    nothing_found = (None, None, None)
    if not isinstance(text, str):
        return nothing_found

    found = re.search(r'\b\d*\.?\d+\b|\d*\.?\d+', text)
    if found is None:
        return nothing_found

    start_pos, end_pos = found.span()
    # float() first so that "12.7" parses; int() then truncates it to 12.
    return int(float(found.group())), start_pos, end_pos
| |
def get_character(string, index):
    """Return ``string[index]``, or None when *index* is at or past the end."""
    return string[index] if index < len(string) else None
| |
def find_valid_comma_separated_number(string):
    """Parse a leading ``d,ddd``-style number from *string*.

    The string must start with one to three digits, a comma and exactly
    three digits, followed either by the end of the string or by a character
    that is neither a digit nor a comma.

    Returns:
        The number as an int with the comma removed, or None when the start
        of *string* does not have that shape.
    """
    grouped = re.match(r'^(\d{1,3},\d{3})(?:[^0-9,]|$)', string)
    if grouped is None:
        return None

    digits_only = grouped.group(1).replace(",", "")
    return int(digits_only)
|
|
def extract_surface_from_string(string: str) -> "int | None":
    """Extract a surface value (an integer) from an entity string.

    The string is rejected (None) when it:
      * is not a ``str``;
      * contains no digit;
      * looks like a multiplication such as ``"10 x 20"``;
      * contains any of ``surfaces_words_to_omit`` (case-insensitive).

    Otherwise the first number in the string is used. When that number is
    immediately followed by a comma, the text is re-read as a ``d,ddd``
    grouped number, and None is returned if the grouping is not valid.

    Args:
        string: Entity text predicted for a surface label.

    Returns:
        The surface as an int, or None when no valid value is found.
    """
    if not isinstance(string, str):
        return None

    if not has_number(string):
        return None

    if contains_multiplication(string):
        return None

    # NOTE(review): this is a plain substring test, so "ha" also matches
    # inside longer words such as "habitable" - confirm this is intended.
    lowered = string.lower()
    if any(word in lowered for word in surfaces_words_to_omit):
        return None

    number, start_pos, end_pos = extract_first_number_from_string(string)
    if not isinstance(number, int):
        return None

    if get_character(string, end_pos) == ",":
        # Slice through to the END of the string. The grouping check must see
        # the character that follows the three-digit group (or the true end
        # of the string). Slicing with ``[start_pos:-1]`` dropped the last
        # character, which rejected "1,500" and accepted "1,5000" as 1500.
        return find_valid_comma_separated_number(string[start_pos:])

    return number
| |
def clean_prediction(row, feature_name, threshols_dict, clean_functions_dict):
    """Clean one label's prediction if its score clears the threshold.

    NOTE: this re-defines a function of the same name and behaviour declared
    earlier in this file; being the later definition, this is the one in
    effect once the module has been imported.

    Args:
        row: Mapping with ``pred_<feature_name>`` (the predicted text) and
            ``prob_<feature_name>`` (its score).
        feature_name: Label to process.
        threshols_dict: Minimum score per label; the score must be strictly
            greater.
        clean_functions_dict: Cleaning callable per label.

    Returns:
        The cleaned value, or None when the score does not exceed the
        threshold.
    """
    candidate = row[f"pred_{feature_name}"]
    score = row[f"prob_{feature_name}"]

    if score > threshols_dict[feature_name]:
        cleaner = clean_functions_dict[feature_name]
        return cleaner(candidate)

    return None
|
|
def extract_remodeling_year_from_string(string):
    """Derive a remodeling year from an entity string.

    Two forms are recognised, tried in this order:
      1. a standalone four-digit number, taken as the year itself;
      2. ``"<n> <unit>"`` where the unit starts with one of the year words in
         the pattern below, taken as ``CURRENT_YEAR - n``.

    The year is returned only when it is not in the future and lies strictly
    fewer than ``YEAR_OF_REMODELING_LIMIT`` years before ``CURRENT_YEAR``.

    Returns:
        The year as an int, or None for non-string input, no match, or an
        out-of-range year.
    """
    if not isinstance(string, str):
        return None

    explicit_year = re.search(r'\b\d{4}\b', string)
    if explicit_year:
        year_predicted = int(explicit_year.group())
    else:
        # No literal year: fall back to an "N years ago" style mention.
        elapsed = re.search(
            r'(\d+) (year|years|anio|año|an|añ)',
            string.lower(),
            re.IGNORECASE,
        )
        if not elapsed:
            return None
        year_predicted = CURRENT_YEAR - int(elapsed.group(1))

    # years_ago >= 0 rules out future years; the upper bound enforces the
    # remodeling window.
    years_ago = CURRENT_YEAR - year_predicted
    if 0 <= years_ago < YEAR_OF_REMODELING_LIMIT:
        return year_predicted
    return None
|
|
def extract_valid_string_left_dotted(string, text, pos):
    """Re-check a predicted string whose left neighbour in *text* is ``<digit>.``.

    Args:
        string: Predicted entity text.
        text: Full text the entity was found in.
        pos: ``(start, end)`` offsets of *string* inside *text*.

    Returns:
        * None when *string* is not a ``str`` or starts fewer than five
          characters into *text*;
        * *string* unchanged when it does not start with a digit, or when it
          is not directly preceded by a digit followed by a dot;
        * otherwise the regex match over the five characters before *string*
          plus *string* itself (leading non-digits, one to three digits, a
          dot, then *string*), or None when that window does not match.
    """
    if not isinstance(string, str):
        return None

    left_pos, right_pos = pos

    # Fewer than five characters of left context: the prediction is dropped.
    if left_pos < 5:
        return None

    # Only strings that begin with a digit can be the tail of a dotted number.
    if not string[0].isdigit():
        return string

    # The character just before must be a dot ...
    if text[left_pos - 1] != ".":
        return string

    # ... and the one before that a digit.
    if not text[left_pos - 2].isdigit():
        return string

    # Five characters of context plus the entity itself. The window must not
    # start with a digit, dot or comma.
    window = text[left_pos - 5: right_pos]
    dotted = re.search(
        r'^(?![\d.,])\D*\d{1,3}\.' + re.escape(string),
        window,
    )
    return dotted.group(0) if dotted else None
| |
| |
# Cleaning callable per label, looked up by clean_prediction().
# Surface labels are parsed into integers, the remodeling year into a year,
# and name labels are passed through unchanged.
clean_functions_dict = {
    **dict.fromkeys(
        (
            "SUPERFICIE_TERRAZA",
            "SUPERFICIE_JARDIN",
            "SUPERFICIE_TERRENO",
            "SUPERFICIE_HABITABLE",
            "SUPERFICIE_BALCON",
        ),
        extract_surface_from_string,
    ),
    "AÑO_REMODELACIÓN": extract_remodeling_year_from_string,
    **{
        name_label: (lambda x: x)
        for name_label in (
            "NOMBRE_COMPLETO_ARQUITECTO",
            "NOMBRE_CLUB_GOLF",
            "NOMBRE_TORRE",
            "NOMBRE_CONDOMINIO",
            "NOMBRE_DESARROLLO",
        )
    },
}
|
|
# Minimum score per label. clean_prediction() keeps a prediction only when
# its score is strictly greater than the value listed here.
# The name was previously bound twice in a row; the first binding (0.9 for
# every label) was overwritten before any use, so only the effective values
# are kept. Where a value is written as a difference, the subtraction is
# kept as-is so the resulting float is unchanged.
threshols_dict = {
    "SUPERFICIE_BALCON": 0.7697697697697697,
    "SUPERFICIE_TERRAZA": 0.953953953953954,
    "SUPERFICIE_JARDIN": 0.9519519519519519,
    "SUPERFICIE_TERRENO": 0.980980980980981 - 0.05,
    "SUPERFICIE_HABITABLE": 0.978978978978979 - 0.02,
    "AÑO_REMODELACIÓN": 0.996996996996997 - 0.01,
    "NOMBRE_COMPLETO_ARQUITECTO": 0.8878878878878879,
    "NOMBRE_CLUB_GOLF": 0.8708708708708709,
    "NOMBRE_TORRE": 0.8458458458458459 - 0.04,
    "NOMBRE_CONDOMINIO": 0.965965965965966,
    "NOMBRE_DESARROLLO": 0.9229229229229229,
}
|
|
# Maps the pipeline's label names to the value reported as NIVELES_CASA;
# LABEL_0 maps to None, so it never yields a value.
label_names_dict = {
    "LABEL_0": None,
    **{f"LABEL_{level}": level for level in (1, 2, 3)},
}
# The pipeline's top score must be strictly above this for
# clean_prediction_bert() to report a value.
BERT_SCORE_LIMIT = 0.980819808198082
|
|
def extract_max_label_score(probabilities):
    """Return ``(label, score)`` of the highest-scoring entry.

    Args:
        probabilities: Non-empty iterable of dicts with ``"label"`` and
            ``"score"`` keys.

    Returns:
        The label and score of the entry with the largest score; on ties the
        earliest entry wins.
    """
    best = max(probabilities, key=lambda entry: entry['score'])
    return best['label'], best['score']
|
|
def clean_prediction_bert(label, score):
    """Translate a pipeline label into its value if the score is high enough.

    Returns:
        ``label_names_dict[label]`` (None for an unknown label) when *score*
        is strictly above ``BERT_SCORE_LIMIT``, otherwise None.
    """
    if not (score > BERT_SCORE_LIMIT):
        return None
    return label_names_dict.get(label)
| |
| |
# Keyword arguments forwarded to every ``pipe(...)`` call in generate_answer().
pipe_config = dict(
    batch_size=8,
    truncation=True,
    max_length=250,
    add_special_tokens=True,
    return_all_scores=True,
    padding=True,
)
|
|
def generate_answer(text):
    """Run both models on *text* and return a report string.

    Pipeline:
      1. clean the text with ``clean_text``;
      2. ask the GLiNER model for the labels listed below (threshold 0.4) and
         flatten the result with ``format_gliner_predictions``;
      3. re-check surface predictions with ``extract_valid_string_left_dotted``;
      4. threshold and clean every predicted label with ``clean_prediction``,
         keeping only str / int results;
      5. run the transformers pipeline and record its top label and score as
         ``NIVELES_CASA`` / ``prob_NIVELES_CASA``; the cleaned value is kept
         only when it is an int.

    Args:
        text: Free-text property description.

    Returns:
        A string with the cleaned result followed by the raw result, both
        rendered as indented JSON.
    """
    labels = [
        'SUPERFICIE_JARDIN',
        'NOMBRE_CLUB_GOLF',
        'SUPERFICIE_TERRENO',
        'SUPERFICIE_HABITABLE',
        'SUPERFICIE_TERRAZA',
        'NOMBRE_COMPLETO_ARQUITECTO',
        'SUPERFICIE_BALCON',
        'NOMBRE_DESARROLLO',
        'NOMBRE_TORRE',
        'NOMBRE_CONDOMINIO',
        'AÑO_REMODELACIÓN'
    ]

    # Steps 1-2: normalise the text, then extract and flatten the entities.
    text = clean_text(text)
    entities = model.predict_entities(text, labels, threshold=0.4)
    entities_formatted = format_gliner_predictions(entities)

    # Step 3: surfaces may be the tail of a dotted number ("1.200 m2").
    feature_surfaces = (
        'SUPERFICIE_BALCON',
        'SUPERFICIE_TERRAZA',
        'SUPERFICIE_JARDIN',
        'SUPERFICIE_TERRENO',
        'SUPERFICIE_HABITABLE',
    )
    for feature_name in feature_surfaces:
        pred_key = f"pred_{feature_name}"
        if entities_formatted.get(pred_key) is not None:
            entities_formatted[pred_key] = extract_valid_string_left_dotted(
                entities_formatted[pred_key],
                text,
                entities_formatted[f"pos_{feature_name}"],
            )

    # Step 4: recover the bare label names from the prefixed keys, then
    # threshold and clean each label's prediction.
    entities_names = list({
        key.replace("pred_", "").replace("prob_", "").replace("pos_", "")
        for key in entities_formatted
    })
    entities_cleaned = {}
    for feature_name in entities_names:
        cleaned_value = clean_prediction(
            entities_formatted, feature_name, threshols_dict, clean_functions_dict
        )
        if isinstance(cleaned_value, (str, int)):
            entities_cleaned[feature_name] = cleaned_value

    # Step 5: the second model; only its best label is used.
    predictions = pipe([text], **pipe_config)
    label, score = extract_max_label_score(predictions[0])
    entities_formatted["NIVELES_CASA"] = label
    entities_formatted["prob_NIVELES_CASA"] = score
    prediction_cleaned = clean_prediction_bert(label, score)
    if isinstance(prediction_cleaned, int):
        entities_cleaned["NIVELES_CASA"] = prediction_cleaned

    result_json = json.dumps(entities_cleaned, indent=4, ensure_ascii=False)
    raw_json = json.dumps(entities_formatted, indent=4, ensure_ascii=False)
    return "Clean Result:" + result_json + "\n \n" + "Raw Result:" + raw_json
|
|
| |
| |
|
|
# Gradio UI: one text box in, one text box out, backed by generate_answer().
iface = gr.Interface(
    title="Text Intelligence for Real State",
    description="Input text describing the property.",
    fn=generate_answer,
    inputs="text",
    outputs="text",
)

# Launched unconditionally, i.e. whenever this module is executed or imported.
iface.launch()
|
|