| import gradio as gr |
| import requests |
| import pandas as pd |
| from sklearn.linear_model import LogisticRegression |
| from fastai.vision.all import * |
| import os |
| from datetime import datetime, timedelta |
|
|
| |
| learn = load_learner('fog_classifier.pkl') |
| labels = learn.dls.vocab |
|
|
| API_KEY = os.environ.get("OPENWEATHER_API_KEY") |
| BASE_URL = 'https://api.openweathermap.org/data/2.5/' |
|
|
| def predict_image(img): |
| """Predict fog conditions from image and return confidence scores""" |
| img = PILImage.create(img) |
| img = img.resize((512, 512)) |
| pred, pred_idx, probs = learn.predict(img) |
| return {labels[i]: float(probs[i]) for i in range(len(labels))} |
|
|
| def calculate_fog_risk_score(weather_data): |
| """Calculate a fog risk score (0-1) based on weather conditions""" |
| |
| weights = { |
| 'humidity': 0.3, |
| 'dew_point_temp_diff': 0.3, |
| 'visibility': 0.2, |
| 'wind_speed': 0.1, |
| 'pressure_change': 0.1 |
| } |
| |
| |
| dew_point = weather_data['temperature'] - ((100 - weather_data['humidity']) / 5.0) |
| dew_point_temp_diff = abs(weather_data['temperature'] - dew_point) |
| |
| |
| humidity_score = min(weather_data['humidity'] / 100, 1) |
| dew_point_score = 1 - min(dew_point_temp_diff / 5, 1) |
| visibility_score = 1 - min(weather_data['visibility'] / 10, 1) |
| wind_score = 1 - min(weather_data['wind_speed'] / 10, 1) |
| pressure_score = min(abs(weather_data['pressure'] - 1013.25) / 50, 1) |
| |
| |
| fog_risk = ( |
| weights['humidity'] * humidity_score + |
| weights['dew_point_temp_diff'] * dew_point_score + |
| weights['visibility'] * visibility_score + |
| weights['wind_speed'] * wind_score + |
| weights['pressure_change'] * pressure_score |
| ) |
| |
| return fog_risk |
|
|
| def get_weather_data(location): |
| """Get current weather data with enhanced error handling""" |
| try: |
| current_weather_url = f'{BASE_URL}weather?q={location}&appid={API_KEY}&units=metric' |
| response = requests.get(current_weather_url) |
| response.raise_for_status() |
| data = response.json() |
| |
| return { |
| 'temperature': data['main'].get('temp', 0), |
| 'feels_like': data['main'].get('feels_like', 0), |
| 'description': data['weather'][0].get('description', ''), |
| 'wind_speed': data['wind'].get('speed', 0), |
| 'pressure': data['main'].get('pressure', 0), |
| 'humidity': data['main'].get('humidity', 0), |
| 'visibility': data.get('visibility', 10000) / 1000, |
| 'timestamp': datetime.fromtimestamp(data['dt']) |
| } |
| except requests.exceptions.RequestException as e: |
| raise Exception(f"Failed to fetch weather data: {str(e)}") |
|
|
| def get_forecast_data(location): |
| """Get 5-day forecast with enhanced error handling""" |
| try: |
| forecast_url = f'{BASE_URL}forecast?q={location}&appid={API_KEY}&units=metric' |
| response = requests.get(forecast_url) |
| response.raise_for_status() |
| data = response.json() |
| |
| forecasts = [] |
| for item in data['list']: |
| forecasts.append({ |
| 'temperature': item['main'].get('temp', 0), |
| 'humidity': item['main'].get('humidity', 0), |
| 'description': item['weather'][0].get('description', ''), |
| 'wind_speed': item['wind'].get('speed', 0), |
| 'pressure': item['main'].get('pressure', 0), |
| 'visibility': item.get('visibility', 10000) / 1000, |
| 'timestamp': datetime.fromtimestamp(item['dt']) |
| }) |
| return forecasts |
| except requests.exceptions.RequestException as e: |
| raise Exception(f"Failed to fetch forecast data: {str(e)}") |
|
|
| def format_duration(duration): |
| """Format timedelta into days and hours string""" |
| total_hours = duration.total_seconds() / 3600 |
| days = int(total_hours // 24) |
| hours = int(total_hours % 24) |
| |
| if days > 0: |
| return f"{days} days and {hours} hours" |
| return f"{hours} hours" |
|
|
| def determine_transmission_power(image_prediction, weather_data, forecast_data=None): |
| """ |
| Determine transmission power based on current conditions and forecast |
| Returns: (power_level, duration, explanation) |
| """ |
| |
| image_fog_confidence = max( |
| image_prediction.get('Dense_Fog', 0), |
| image_prediction.get('Moderate_Fog', 0) * 0.6 |
| ) |
| |
| |
| current_fog_risk = calculate_fog_risk_score(weather_data) |
| |
| |
| |
| combined_fog_risk = (image_fog_confidence * 0.6) + (current_fog_risk * 0.4) |
| |
| |
| explanation = [] |
| |
| |
| if combined_fog_risk > 0.7: |
| power_level = "High" |
| explanation.append(f"High fog risk detected (Risk score: {combined_fog_risk:.2f})") |
| elif combined_fog_risk > 0.4: |
| power_level = "Medium" |
| explanation.append(f"Moderate fog risk detected (Risk score: {combined_fog_risk:.2f})") |
| else: |
| power_level = "Normal" |
| explanation.append(f"Low fog risk detected (Risk score: {combined_fog_risk:.2f})") |
| |
| |
| duration = timedelta(hours=1) |
| if forecast_data: |
| future_risks = [] |
| for forecast in forecast_data[:40]: |
| risk = calculate_fog_risk_score(forecast) |
| future_risks.append(risk) |
| |
| |
| high_risk_periods = [risk > 0.6 for risk in future_risks] |
| if any(high_risk_periods): |
| |
| last_high_risk_idx = len(high_risk_periods) - 1 - high_risk_periods[::-1].index(True) |
| duration = forecast_data[last_high_risk_idx]['timestamp'] - weather_data['timestamp'] |
| |
| explanation.append(f"High fog risk predicted to continue for {format_duration(duration)}") |
| |
| |
| if sum(high_risk_periods) / len(high_risk_periods) > 0.5: |
| power_level = "High" |
| explanation.append("Power level set to High due to sustained fog risk in forecast") |
| |
| return power_level, duration, explanation |
|
|
| def integrated_prediction(image, location): |
| """Main function to process image and weather data""" |
| try: |
| |
| image_prediction = predict_image(image) |
| |
| |
| current_weather = get_weather_data(location) |
| |
| |
| forecast_data = get_forecast_data(location) |
| |
| |
| power_level, duration, explanation = determine_transmission_power( |
| image_prediction, |
| current_weather, |
| forecast_data |
| ) |
| |
| |
| result = [ |
| f"Current Conditions ({current_weather['timestamp'].strftime('%Y-%m-%d %H:%M')})", |
| f"Temperature: {current_weather['temperature']:.1f}°C", |
| f"Humidity: {current_weather['humidity']}%", |
| f"Visibility: {current_weather['visibility']:.1f} km", |
| f"Wind Speed: {current_weather['wind_speed']} m/s", |
| "", |
| "Analysis Results:", |
| *explanation, |
| "", |
| f"Recommended Power Level: {power_level}", |
| f"Duration: {format_duration(duration)}", |
| "", |
| "5-Day Forecast Summary:" |
| ] |
| |
| |
| current_date = current_weather['timestamp'].date() |
| for day in range(5): |
| forecast_date = current_date + timedelta(days=day) |
| day_forecasts = [f for f in forecast_data if f['timestamp'].date() == forecast_date] |
| |
| if day_forecasts: |
| avg_risk = sum(calculate_fog_risk_score(f) for f in day_forecasts) / len(day_forecasts) |
| result.append(f"{forecast_date.strftime('%Y-%m-%d')}: " |
| f"Fog Risk: {'High' if avg_risk > 0.6 else 'Moderate' if avg_risk > 0.3 else 'Low'} " |
| f"({avg_risk:.2f})") |
| |
| return "\n".join(result) |
| |
| except Exception as e: |
| return f"Error: {str(e)}" |
|
|
| |
| with gr.Blocks() as demo: |
| gr.Markdown("# Enhanced Fog Prediction and Transmission Power System") |
| |
| with gr.Row(): |
| image_input = gr.Image(label="Upload Current Conditions Image") |
| location_input = gr.Textbox(label="Enter Location") |
| |
| predict_button = gr.Button("Analyze and Determine Transmission Power") |
| output = gr.Textbox(label="Analysis Results", lines=15) |
| |
| predict_button.click(integrated_prediction, inputs=[image_input, location_input], outputs=output) |
|
|
| demo.launch() |