El sistema de predicción de demanda es el componente de inteligencia artificial que anticipa las necesidades de transporte futuras, permitiendo a SIGA posicionar proactivamente los activos antes de que surja la demanda.
class ARIMADemandPredictor:
"""Predictor basado en series temporales ARIMA"""
def __init__(self, zone_id: str):
self.zone_id = zone_id
self.model = None
self.order = (5, 1, 2) # (p, d, q) determinado por grid search
self.seasonal_order = (1, 1, 1, 24) # Patrón diario
def fit(self, historical_data: pd.DataFrame):
"""Entrena el modelo con datos históricos"""
# Preparar serie temporal
ts = historical_data.set_index('timestamp')['demand']
# Detectar y remover outliers
ts_clean = self._remove_outliers(ts)
# Entrenar modelo ARIMA
self.model = ARIMA(
ts_clean,
order=self.order,
seasonal_order=self.seasonal_order,
enforce_stationarity=False,
enforce_invertibility=False
)
self.model_fit = self.model.fit(disp=False)
# Métricas de ajuste
self.metrics = {
'aic': self.model_fit.aic,
'bic': self.model_fit.bic,
'mape': self._calculate_mape(ts_clean, self.model_fit.fittedvalues)
}
def predict(self, horizon_hours: int) -> DemandForecast:
"""Predice demanda futura"""
# Predicción
forecast = self.model_fit.forecast(steps=horizon_hours)
# Intervalos de confianza
forecast_df = pd.DataFrame({
'prediction': forecast,
'lower_bound': forecast - 1.96 * forecast.std(),
'upper_bound': forecast + 1.96 * forecast.std()
})
return DemandForecast(
zone_id=self.zone_id,
predictions=forecast_df,
confidence_level=0.95,
model_type='ARIMA'
)
class LSTMDemandPredictor:
"""Red neuronal LSTM para capturar patrones complejos"""
def __init__(self, zone_id: str):
self.zone_id = zone_id
self.sequence_length = 168 # Una semana de datos horarios
self.model = self._build_model()
def _build_model(self):
"""Construye arquitectura LSTM"""
model = Sequential([
LSTM(128, return_sequences=True,
input_shape=(self.sequence_length, 15)), # 15 features
Dropout(0.2),
LSTM(64, return_sequences=True),
Dropout(0.2),
LSTM(32),
Dropout(0.2),
Dense(24) # Predice próximas 24 horas
])
model.compile(
optimizer=Adam(learning_rate=0.001),
loss='huber', # Robusto a outliers
metrics=['mae', 'mape']
)
return model
def prepare_features(self, data: pd.DataFrame) -> np.ndarray:
"""Prepara features para el modelo"""
features = []
# Demanda histórica
features.append(data['demand'].values)
# Codificación temporal
features.append(np.sin(2 * np.pi * data['hour'] / 24))
features.append(np.cos(2 * np.pi * data['hour'] / 24))
features.append(np.sin(2 * np.pi * data['day_of_week'] / 7))
features.append(np.cos(2 * np.pi * data['day_of_week'] / 7))
# Indicadores económicos
features.append(data['fuel_price'].values)
features.append(data['economic_index'].values)
# Eventos especiales
features.append(data['is_holiday'].values)
features.append(data['special_event'].values)
# Clima
features.append(data['temperature'].values)
features.append(data['precipitation'].values)
# Competencia
features.append(data['competitor_vehicles'].values)
# Tendencias
features.append(data['trend_7d'].values)
features.append(data['trend_30d'].values)
# Volatilidad
features.append(data['demand_std_7d'].values)
return np.stack(features, axis=-1)
class EnsembleDemandPredictor:
"""Combina múltiples modelos para mayor precisión"""
def __init__(self, zone_id: str):
self.zone_id = zone_id
self.models = {
'arima': ARIMADemandPredictor(zone_id),
'lstm': LSTMDemandPredictor(zone_id),
'xgboost': XGBoostDemandPredictor(zone_id),
'prophet': ProphetDemandPredictor(zone_id)
}
self.weights = None
def train_all_models(self, data: pd.DataFrame):
"""Entrena todos los modelos base"""
# Dividir datos para validación
train_data = data[:-168] # Última semana para validación
val_data = data[-168:]
predictions = {}
for name, model in self.models.items():
# Entrenar
model.fit(train_data)
# Validar
pred = model.predict(168)
predictions[name] = pred
# Optimizar pesos
self.weights = self._optimize_weights(predictions, val_data)
def predict(self, horizon_hours: int) -> DemandForecast:
"""Predicción ensemble ponderada"""
predictions = {}
# Obtener predicciones de cada modelo
for name, model in self.models.items():
predictions[name] = model.predict(horizon_hours)
# Combinar con pesos óptimos
ensemble_pred = np.zeros(horizon_hours)
for name, pred in predictions.items():
ensemble_pred += self.weights[name] * pred.values
# Calcular incertidumbre
uncertainty = self._calculate_uncertainty(predictions)
return DemandForecast(
zone_id=self.zone_id,
predictions=ensemble_pred,
uncertainty=uncertainty,
model_contributions=self.weights
)
class TemporalPatternAnalyzer:
"""Identifica y cuantifica patrones temporales en la demanda"""
def analyze_patterns(self, data: pd.DataFrame) -> PatternReport:
"""Análisis completo de patrones"""
patterns = PatternReport()
# Patrón diario
patterns.daily = self._analyze_daily_pattern(data)
# Patrón semanal
patterns.weekly = self._analyze_weekly_pattern(data)
# Patrón mensual
patterns.monthly = self._analyze_monthly_pattern(data)
# Estacionalidad
patterns.seasonal = self._analyze_seasonality(data)
# Tendencias
patterns.trends = self._analyze_trends(data)
return patterns
def _analyze_daily_pattern(self, data: pd.DataFrame) -> DailyPattern:
"""Identifica patrón de 24 horas"""
hourly_avg = data.groupby('hour')['demand'].agg(['mean', 'std'])
# Identificar horas pico
peak_hours = hourly_avg.nlargest(3, 'mean').index.tolist()
# Identificar horas valle
valley_hours = hourly_avg.nsmallest(3, 'mean').index.tolist()
# Calcular variabilidad
cv = hourly_avg['std'] / hourly_avg['mean']
return DailyPattern(
peak_hours=peak_hours,
valley_hours=valley_hours,
hourly_profile=hourly_avg['mean'].to_dict(),
variability=cv.mean(),
pattern_strength=self._calculate_pattern_strength(hourly_avg)
)
class AnomalyDetector:
"""Detecta y clasifica anomalías en demanda"""
def __init__(self):
self.isolation_forest = IsolationForest(
contamination=0.05,
random_state=42
)
def detect_anomalies(self, data: pd.DataFrame) -> AnomalyReport:
"""Detecta puntos anómalos en la demanda"""
# Features para detección
features = self._prepare_features(data)
# Entrenar detector
anomaly_labels = self.isolation_forest.fit_predict(features)
# Analizar anomalías
anomalies = data[anomaly_labels == -1].copy()
# Clasificar tipos
anomalies['type'] = anomalies.apply(
self._classify_anomaly, axis=1
)
# Análisis de impacto
impact_analysis = self._analyze_anomaly_impact(anomalies, data)
return AnomalyReport(
anomalies=anomalies,
types_distribution=anomalies['type'].value_counts(),
impact_analysis=impact_analysis,
recommendations=self._generate_recommendations(anomalies)
)
def _classify_anomaly(self, row: pd.Series) -> str:
"""Clasifica el tipo de anomalía"""
if row['is_holiday']:
return 'holiday_effect'
elif row['special_event']:
return 'event_driven'
elif row['demand'] > row['demand_mean'] + 3 * row['demand_std']:
return 'demand_spike'
elif row['demand'] < row['demand_mean'] - 3 * row['demand_std']:
return 'demand_drop'
else:
return 'complex_pattern'
class WeatherIntegration:
"""Integra predicciones meteorológicas en el modelo"""
def __init__(self):
self.weather_api = WeatherAPIClient()
self.impact_models = {}
def get_weather_features(self, zone_id: str,
horizon_hours: int) -> pd.DataFrame:
"""Obtiene features meteorológicas para predicción"""
# Obtener predicción meteorológica
weather_forecast = self.weather_api.get_forecast(
zone_id=zone_id,
hours=horizon_hours
)
# Transformar a features
features = pd.DataFrame({
'temperature': weather_forecast['temperature'],
'precipitation': weather_forecast['precipitation'],
'wind_speed': weather_forecast['wind_speed'],
'visibility': weather_forecast['visibility'],
'weather_severity': self._calculate_severity(weather_forecast)
})
return features
def calculate_weather_impact(self, weather_features: pd.DataFrame,
base_demand: float) -> float:
"""Calcula el impacto del clima en la demanda"""
impact_multiplier = 1.0
# Lluvia reduce demanda
if weather_features['precipitation'] > 10:
impact_multiplier *= 0.85
# Nieve reduce más
if weather_features['temperature'] < 0 and \
weather_features['precipitation'] > 5:
impact_multiplier *= 0.70
# Condiciones extremas
if weather_features['weather_severity'] > 7:
impact_multiplier *= 0.60
return base_demand * impact_multiplier
class EventCalendarIntegration:
"""Gestiona el impacto de eventos en la demanda"""
def __init__(self):
self.event_database = EventDatabase()
self.holiday_calendar = HolidayCalendar()
def get_event_features(self, zone_id: str,
date_range: DateRange) -> pd.DataFrame:
"""Obtiene features de eventos para el período"""
events = []
# Festivos nacionales y regionales
holidays = self.holiday_calendar.get_holidays(
zone_id, date_range
)
# Eventos deportivos
sports_events = self.event_database.get_sports_events(
zone_id, date_range
)
# Ferias y congresos
business_events = self.event_database.get_business_events(
zone_id, date_range
)
# Crear features
features = pd.DataFrame({
'is_holiday': [1 if date in holidays else 0
for date in date_range],
'is_bridge_day': self._detect_bridge_days(holidays),
'major_event': self._encode_major_events(
sports_events + business_events
),
'event_magnitude': self._estimate_event_magnitude(events),
'days_to_event': self._calculate_days_to_event(events)
})
return features
class DemandSpikeDetector:
"""Detecta y alerta sobre picos inminentes de demanda"""
def __init__(self):
self.threshold_multiplier = 1.5 # 50% sobre media
self.alert_horizon = 24 # Alertar con 24h de anticipación
def scan_for_spikes(self, predictions: Dict[str, DemandForecast]) -> List[Alert]:
"""Escanea predicciones buscando picos"""
alerts = []
for zone_id, forecast in predictions.items():
# Obtener estadísticas base
zone_stats = self._get_zone_statistics(zone_id)
# Buscar picos en predicción
spike_threshold = zone_stats['mean'] * self.threshold_multiplier
spike_hours = forecast.predictions[
forecast.predictions > spike_threshold
].index
if len(spike_hours) > 0:
alert = Alert(
type='demand_spike',
zone_id=zone_id,
severity=self._calculate_severity(
forecast.predictions[spike_hours].max(),
zone_stats['mean']
),
message=f"Pico de demanda esperado en {zone_id}",
recommended_action=self._generate_recommendation(
zone_id, spike_hours[0]
),
confidence=forecast.confidence,
time_to_event=spike_hours[0]
)
alerts.append(alert)
return alerts
class ProactiveRecommendationEngine:
"""Genera recomendaciones basadas en predicciones"""
def generate_fleet_recommendations(self,
current_state: FleetState,
predictions: Dict[str, DemandForecast],
horizon_hours: int = 48) -> List[Recommendation]:
"""Genera recomendaciones de reposicionamiento"""
recommendations = []
# Analizar desbalances futuros
future_imbalances = self._predict_imbalances(
current_state, predictions, horizon_hours
)
for imbalance in future_imbalances:
if imbalance.severity > 0.7: # Severo
rec = self._create_rebalancing_recommendation(
imbalance, current_state
)
recommendations.append(rec)
# Oportunidades de alta demanda
opportunities = self._identify_opportunities(predictions)
for opp in opportunities:
if opp.expected_value > 1000: # Umbral de valor
rec = self._create_opportunity_recommendation(
opp, current_state
)
recommendations.append(rec)
# Priorizar y filtrar
return self._prioritize_recommendations(recommendations)
def create_prediction_dashboard(zones: List[str]) -> Dashboard:
"""Crea dashboard interactivo de predicciones"""
dashboard = Dashboard(title="Predicción de Demanda SIGA")
# Panel 1: Mapa de calor predictivo
heatmap = create_predictive_heatmap(zones)
dashboard.add_panel(heatmap, position='top-left', size='large')
# Panel 2: Series temporales por zona
for zone in zones[:5]: # Top 5 zonas
ts_chart = create_time_series_chart(
zone_id=zone,
show_confidence_bands=True,
horizon_hours=72
)
dashboard.add_panel(ts_chart, position='right', size='medium')
# Panel 3: Alertas y recomendaciones
alerts_panel = create_alerts_panel()
dashboard.add_panel(alerts_panel, position='bottom', size='full-width')
# Panel 4: Métricas de precisión
accuracy_panel = create_accuracy_metrics_panel()
dashboard.add_panel(accuracy_panel, position='bottom-right', size='small')
return dashboard
| Métrica | Fórmula | Target | Actual |
|---|---|---|---|
| MAPE | Mean Absolute Percentage Error | <15% | 12.3% |
| RMSE | Root Mean Square Error | <50 | 42.7 |
| Hit Rate | % predicciones dentro del IC 95% | >90% | 91.8% |
| Lag predicción | Tiempo desde evento hasta alerta | >24h | 28.5h |
| Valor capturado | % oportunidades aprovechadas | >80% | 84.2% |
class PredictionErrorAnalyzer:
"""Analiza y aprende de errores de predicción"""
def analyze_errors(self, predictions: pd.DataFrame,
actuals: pd.DataFrame) -> ErrorReport:
"""Análisis exhaustivo de errores"""
# Calcular errores
errors = predictions - actuals
# Análisis por tipo
error_analysis = {
'systematic_bias': self._detect_systematic_bias(errors),
'temporal_patterns': self._analyze_temporal_errors(errors),
'zone_specific': self._analyze_zone_errors(errors),
'magnitude_dependent': self._analyze_magnitude_errors(errors, actuals)
}
# Recomendaciones de mejora
improvements = self._generate_improvement_recommendations(
error_analysis
)
return ErrorReport(
summary_stats=errors.describe(),
detailed_analysis=error_analysis,
improvement_recommendations=improvements,
retrain_priority=self._calculate_retrain_priority(error_analysis)
)
⬅️ Volver a Procesos Core | ➡️ Siguiente: Optimización Multi-día