The SIGA system uses several Machine Learning models to predict demand, value vehicle positions, and optimize decisions. This documentation covers the architecture, training, and usage of each model.
Purpose: predict transport demand by geographic zone and time horizon.
# Model: gradient boosting with time-series features
from xgboost import XGBRegressor
from sklearn.preprocessing import StandardScaler

class DemandPredictionModel:
    def __init__(self):
        self.model = XGBRegressor(
            n_estimators=500,
            max_depth=8,
            learning_rate=0.01,
            objective='reg:squarederror',
            colsample_bytree=0.8,
            subsample=0.8,
            random_state=42
        )
        self.feature_encoder = FeatureEncoder()  # project-internal encoder
        self.scaler = StandardScaler()
DEMAND_FEATURES = {
    # Temporal
    'hour_of_day': 'cyclical_encoding',
    'day_of_week': 'one_hot_encoding',
    'day_of_month': 'numerical',
    'week_of_year': 'cyclical_encoding',
    'is_holiday': 'binary',
    'is_weekend': 'binary',
    # Geographic
    'zone_id': 'target_encoding',
    'zone_type': 'one_hot_encoding',  # industrial, urban, rural
    'distance_to_major_hub': 'numerical',
    'population_density': 'numerical',
    # Historical (sliding windows)
    'demand_last_hour': 'numerical',
    'demand_last_day_same_hour': 'numerical',
    'demand_last_week_same_hour': 'numerical',
    'demand_avg_last_7d': 'numerical',
    'demand_std_last_7d': 'numerical',
    # External
    'weather_condition': 'one_hot_encoding',
    'temperature': 'numerical',
    'fuel_price_index': 'numerical',
    'economic_activity_index': 'numerical',
    # Interactions
    'zone_hour_interaction': 'custom',
    'weekday_zone_interaction': 'custom'
}
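Most encodings above are standard; for the `target_encoding` applied to `zone_id`, the following is a minimal sketch with smoothing toward the global mean (the smoothing constant is an illustrative choice, not the production value):

def target_encode_zone(df, smoothing=10):
    # Smoothed mean-demand encoding per zone: low-volume zones are
    # pulled toward the global mean to reduce overfitting.
    global_mean = df['demand'].mean()
    stats = df.groupby('zone_id')['demand'].agg(['mean', 'count'])
    weight = stats['count'] / (stats['count'] + smoothing)
    encoding = weight * stats['mean'] + (1 - weight) * global_mean
    return df['zone_id'].map(encoding)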
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_percentage_error

def train_demand_model(data: pd.DataFrame) -> tuple:
    # 1. Feature engineering
    features = engineer_demand_features(data)

    # 2. Train/validation split (time-based: the last 30 days are held out)
    train_end = data['timestamp'].max() - pd.Timedelta(days=30)
    train_data = features[features['timestamp'] <= train_end]
    val_data = features[features['timestamp'] > train_end]

    # 3. Cross-validation with TimeSeriesSplit (target and timestamp are
    #    excluded from the feature matrix)
    model = DemandPredictionModel().model
    feature_cols = [c for c in train_data.columns if c not in ('demand', 'timestamp')]
    tscv = TimeSeriesSplit(n_splits=5)
    scores = []
    for train_idx, val_idx in tscv.split(train_data):
        fold_train, fold_val = train_data.iloc[train_idx], train_data.iloc[val_idx]
        X_train, y_train = fold_train[feature_cols], fold_train['demand']
        X_val, y_val = fold_val[feature_cols], fold_val['demand']
        # NOTE: in xgboost >= 2.0, early_stopping_rounds moves to the constructor
        model.fit(X_train, y_train,
                  eval_set=[(X_val, y_val)],
                  early_stopping_rounds=50,
                  verbose=False)
        pred = model.predict(X_val)
        scores.append(mean_absolute_percentage_error(y_val, pred))

    # 4. Final training on the full training window
    model.fit(train_data[feature_cols], train_data['demand'])

    # 5. Evaluation on the holdout set
    val_pred = model.predict(val_data[feature_cols])
    metrics = calculate_metrics(val_data['demand'], val_pred)
    return model, metrics
Example holdout metrics for the current demand model:
{
    "mape": 12.3,        # Mean Absolute Percentage Error (%)
    "rmse": 4.2,         # Root Mean Squared Error
    "mae": 3.1,          # Mean Absolute Error
    "r2": 0.84,          # R-squared
    "coverage_80": 0.82, # % of actuals within the 80% confidence interval
    "bias": -0.02        # Systematic over/under-prediction
}
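`calculate_metrics` (used in `train_demand_model`) is not defined on this page. A minimal sketch producing the fields above; the `coverage_80` computation assumes a symmetric ±20% interval around the point prediction, which is an illustrative placeholder for the real interval model:

import numpy as np
from sklearn.metrics import (mean_absolute_percentage_error, mean_absolute_error,
                             mean_squared_error, r2_score)

def calculate_metrics(y_true, y_pred, interval_width=0.2):
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    # Assumed symmetric interval around the prediction for coverage
    lower, upper = y_pred * (1 - interval_width), y_pred * (1 + interval_width)
    return {
        'mape': 100 * mean_absolute_percentage_error(y_true, y_pred),
        'rmse': float(np.sqrt(mean_squared_error(y_true, y_pred))),
        'mae': mean_absolute_error(y_true, y_pred),
        'r2': r2_score(y_true, y_pred),
        'coverage_80': float(np.mean((y_true >= lower) & (y_true <= upper))),
        'bias': float(np.mean(y_pred - y_true))
    }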
Purpose: estimate the expected future value of having a vehicle at a specific position.
# Model: deep neural network with value and confidence heads
import torch
import torch.nn as nn
import torch.nn.functional as F

class PositionValueModel(nn.Module):
    def __init__(self, input_dim=64, hidden_dim=128):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_dim),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, 64),
            nn.ReLU()
        )
        self.value_head = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)  # output: expected value in €
        )
        self.confidence_head = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()  # output: confidence in [0, 1]
        )

    def forward(self, x):
        # A shared encoding feeds both heads
        encoded = self.encoder(x)
        return self.value_head(encoded), self.confidence_head(encoded)
POSITION_VALUE_FEATURES = {
    # Current position
    'current_lat': 'numerical',
    'current_lon': 'numerical',
    'current_zone': 'embedding',
    # Vehicle state
    'hours_since_last_load': 'numerical',
    'km_since_last_load': 'numerical',
    'driver_hours_remaining': 'numerical',
    # Temporal context
    'hour_of_arrival': 'cyclical',
    'day_of_week_arrival': 'one_hot',
    # Predicted demand in nearby zones
    'demand_current_zone_24h': 'numerical',
    'demand_adjacent_zones_24h': 'numerical',
    'demand_current_zone_48h': 'numerical',
    # Competition (other vehicles)
    'vehicles_in_zone': 'numerical',
    'vehicles_arriving_6h': 'numerical',
    # Zone history
    'avg_wait_time_zone': 'numerical',
    'avg_revenue_per_load_zone': 'numerical',
    'empty_km_to_nearest_hub': 'numerical'
}
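The `embedding` entry for `current_zone` implies a learned lookup table rather than one-hot encoding. A minimal sketch of how such an embedding could be concatenated with the numerical features before the encoder; `num_zones` and the dimensions are illustrative (16 + 48 = 64 happens to match the model's default `input_dim`):

class ZoneFeatureEncoder(nn.Module):
    # Illustrative: embeds the zone id and concatenates it with the
    # already-scaled numerical features.
    def __init__(self, num_zones=500, emb_dim=16):
        super().__init__()
        self.zone_embedding = nn.Embedding(num_zones, emb_dim)

    def forward(self, zone_ids, numeric_features):
        zone_vec = self.zone_embedding(zone_ids)               # (batch, 16)
        return torch.cat([zone_vec, numeric_features], dim=1)  # (batch, 16 + 48)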
class PositionValueTrainer:
    def __init__(self):
        self.model = PositionValueModel()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-4)
        self.replay_buffer = ReplayBuffer(capacity=100000)

    def train_step(self, batch):
        # rewards and dones are expected with shape (batch, 1) to match the value head
        states, actions, rewards, next_states, dones = batch

        # Current value and confidence estimates (one forward pass)
        current_values, confidence_pred = self.model(states)

        # Target values (Bellman equation, discount factor 0.95)
        with torch.no_grad():
            next_values, _ = self.model(next_states)
            target_values = rewards + 0.95 * next_values * (1 - dones)

        # Value loss
        value_loss = F.mse_loss(current_values, target_values)

        # Confidence loss (based on the prediction error)
        prediction_error = torch.abs(current_values - target_values).detach()
        confidence_target = 1 / (1 + prediction_error)
        confidence_loss = F.binary_cross_entropy(confidence_pred, confidence_target)

        total_loss = value_loss + 0.1 * confidence_loss

        # Optimization step
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return {
            'value_loss': value_loss.item(),
            'confidence_loss': confidence_loss.item(),
            'mean_value': current_values.mean().item()
        }
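`ReplayBuffer` is referenced above but not defined in this documentation. A minimal uniform-sampling sketch (production buffers often add prioritized replay, not shown here):

import random
from collections import deque

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)  # oldest transitions are evicted first

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform sampling; assumes transitions are stored as tensors
        # with matching shapes so they can be stacked into batches.
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(torch.stack, zip(*batch))
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)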
Purpose: optimize multi-day routes subject to constraints and multiple objectives.
class RouteOptimizer:
    def __init__(self):
        # Component 1: VROOM heuristic for daily routes
        self.vroom_solver = VROOMSolver()
        # Component 2: RL for strategic decisions
        self.strategic_model = StrategicDecisionModel()
        # Component 3: genetic algorithm for exploration
        self.genetic_optimizer = GeneticOptimizer(
            population_size=100,
            mutation_rate=0.1,
            crossover_rate=0.7
        )
def calculate_route_fitness(route_plan):
    # Fitness components
    revenue = sum(load.revenue for load in route_plan.loads)
    empty_km = calculate_empty_kilometers(route_plan)
    balance_score = calculate_fleet_balance(route_plan.end_positions)

    # Penalties
    overtime_penalty = calculate_overtime_penalty(route_plan)
    deadline_penalty = calculate_deadline_penalty(route_plan)

    # Weighted objective function
    fitness = (
        0.4 * normalize(revenue) +
        0.3 * (1 - normalize(empty_km)) +
        0.2 * balance_score +
        0.1 * (1 - normalize(overtime_penalty + deadline_penalty))
    )
    return fitness
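`normalize` has to map each raw component onto [0, 1] so the weights stay comparable. A minimal min-max sketch; the default bounds are placeholders and would be calibrated per component (revenue and empty kilometers live on very different scales):

def normalize(value, bounds=(0.0, 1.0)):
    # Min-max scaling clipped to [0, 1]; `bounds` are illustrative defaults.
    lo, hi = bounds
    if hi == lo:
        return 0.0
    return min(max((value - lo) / (hi - lo), 0.0), 1.0)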
Purpose: detect anomalous patterns in operations and driver behavior.
from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    def __init__(self):
        # Autoencoder for dimensionality reduction
        self.autoencoder = self._build_autoencoder()
        # Isolation Forest for detection
        self.isolation_forest = IsolationForest(
            n_estimators=100,
            contamination=0.05,
            random_state=42
        )

    def _build_autoencoder(self):
        return nn.Sequential(
            # Encoder
            nn.Linear(50, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 8),  # latent space
            # Decoder
            nn.Linear(8, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 50)
        )
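The class above only constructs the two components. A sketch of how they could be chained at scoring time: the encoder half produces a latent vector, the reconstruction error is appended as an extra feature, and the Isolation Forest scores the result. Slicing the encoder as `autoencoder[:5]` assumes the layer layout shown above, and the forest must have been fitted on features built the same way:

def score_anomalies(detector, X):
    # X: torch tensor of shape (n_samples, 50), already scaled
    with torch.no_grad():
        latent = detector.autoencoder[:5](X)   # encoder half (layers 0-4)
        reconstruction = detector.autoencoder(X)
        recon_error = ((reconstruction - X) ** 2).mean(dim=1, keepdim=True)
        features = torch.cat([latent, recon_error], dim=1).numpy()
    # scikit-learn convention: -1 = anomaly, 1 = normal
    return detector.isolation_forest.predict(features)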
import numpy as np

def create_temporal_features(df):
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['day_of_month'] = df['timestamp'].dt.day
    df['week_of_year'] = df['timestamp'].dt.isocalendar().week

    # Cyclical encoding for periodic features
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['dow_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
    df['dow_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)

    # Calendar features
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    df['is_month_start'] = (df['day_of_month'] <= 5).astype(int)
    df['is_month_end'] = (df['day_of_month'] >= 25).astype(int)
    return df
def create_lag_features(df, target_col, lags=(1, 24, 168)):
    # Lags per zone (1h, 24h, and 168h = one week by default)
    for lag in lags:
        df[f'{target_col}_lag_{lag}h'] = df.groupby('zone')[target_col].shift(lag)

    # Rolling statistics
    for window in [24, 168]:
        df[f'{target_col}_roll_mean_{window}h'] = (
            df.groupby('zone')[target_col]
            .rolling(window=window, min_periods=1)
            .mean()
            .reset_index(level=0, drop=True)
        )
        df[f'{target_col}_roll_std_{window}h'] = (
            df.groupby('zone')[target_col]
            .rolling(window=window, min_periods=1)
            .std()
            .reset_index(level=0, drop=True)
        )
    return df
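The two helpers compose into a single pipeline. For example, on a frame with `timestamp`, `zone`, and `demand` columns:

df = create_temporal_features(df)
df = create_lag_features(df, 'demand')       # adds 1h/24h/168h lags plus rolling stats
df = df.dropna(subset=['demand_lag_168h'])   # the first week has no 168h lag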
import ray
from ray import train
from ray.train.torch import TorchTrainer

# Actor-based trainer (an alternative to the TorchTrainer flow below)
@ray.remote
class DistributedTrainer:
    def __init__(self, model_config):
        self.model = self._build_model(model_config)
        self.data_loader = self._setup_data_loader()

    def train_epoch(self, epoch):
        losses = []
        for batch in self.data_loader:
            loss = self._train_step(batch)
            losses.append(loss)
        return np.mean(losses)

    def _train_step(self, batch):
        # Per-batch training logic goes here
        pass
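The `train_func` passed to `TorchTrainer` below is not defined on this page. A minimal sketch of a per-worker loop, assuming hypothetical `build_model()` and `build_data_loader()` helpers and the Ray 2.x API:

def train_func(config):
    # Runs on every Ray worker; `build_model` and `build_data_loader`
    # are hypothetical project helpers, not part of this page.
    model = train.torch.prepare_model(build_model())              # wraps for DDP
    loader = train.torch.prepare_data_loader(build_data_loader())
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    for epoch in range(config["num_epochs"]):
        epoch_loss = 0.0
        for features, targets in loader:                          # assumed batch layout
            optimizer.zero_grad()
            loss = F.mse_loss(model(features), targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        train.report({"epoch": epoch, "loss": epoch_loss / len(loader)})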
# Distributed training configuration
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"num_epochs": 100},
    scaling_config=train.ScalingConfig(
        num_workers=4,
        use_gpu=True,
        resources_per_worker={"GPU": 1}
    )
)
results = trainer.fit()
from ray import tune
from ray.tune.schedulers import ASHAScheduler

def objective(config):
    model = DemandPredictionModel(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        learning_rate=config["learning_rate"]
    )
    # Train and evaluate the candidate configuration
    score = train_and_evaluate(model)
    return {"score": score}

# Search space
search_space = {
    "n_estimators": tune.randint(100, 1000),
    "max_depth": tune.randint(3, 15),
    "learning_rate": tune.loguniform(0.001, 0.3)
}

# Run the optimization (minimizing the error score)
tuner = tune.Tuner(
    objective,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="score",
        mode="min",
        scheduler=ASHAScheduler(),
        num_samples=100
    )
)
results = tuner.fit()
best_config = results.get_best_result().config
from datetime import datetime

class ModelMonitor:
    def __init__(self, model_name):
        self.model_name = model_name
        self.metrics_buffer = []  # NOTE: unbounded; production code should cap or rotate it
        self.drift_detector = DriftDetector()

    def log_prediction(self, features, prediction, actual=None):
        entry = {
            'timestamp': datetime.now(),
            'features': features,
            'prediction': prediction,
            'actual': actual,
            'model_version': self.get_model_version()
        }
        self.metrics_buffer.append(entry)
        # Check for drift once every 1000 predictions
        if len(self.metrics_buffer) % 1000 == 0:
            self.check_drift()

    def check_drift(self):
        recent_features = [e['features'] for e in self.metrics_buffer[-1000:]]
        drift_score = self.drift_detector.calculate_drift(recent_features)
        if drift_score > 0.15:
            self.trigger_retraining()

    def calculate_online_metrics(self):
        # Metrics over a sliding window of labeled predictions
        recent_entries = [e for e in self.metrics_buffer[-1000:] if e['actual'] is not None]
        predictions = [e['prediction'] for e in recent_entries]
        actuals = [e['actual'] for e in recent_entries]
        return {
            'mape': mean_absolute_percentage_error(actuals, predictions),
            'bias': np.mean(np.array(predictions) - np.array(actuals)),
            'coverage_80': calculate_coverage(predictions, actuals, 0.8)
        }
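`DriftDetector` is not defined on this page. A minimal sketch that compares the recent feature distribution against a reference window captured at training time, using a per-feature Kolmogorov-Smirnov test (one common choice; PSI is another) and returning the fraction of drifting features so the 0.15 threshold above stays meaningful. Unlike the no-argument constructor used above, this sketch takes the reference window explicitly:

from scipy import stats

class DriftDetector:
    def __init__(self, reference_features):
        # reference_features: array of shape (n_samples, n_features)
        self.reference = np.asarray(reference_features)

    def calculate_drift(self, recent_features):
        recent = np.asarray(recent_features)
        n_features = self.reference.shape[1]
        drifting = 0
        for i in range(n_features):
            _, p_value = stats.ks_2samp(self.reference[:, i], recent[:, i])
            if p_value < 0.01:  # distribution shift detected for this feature
                drifting += 1
        return drifting / n_features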
class ModelABTest:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results = {'a': [], 'b': []}

    def predict(self, features):
        # Deterministic assignment based on a feature hash.
        # NOTE: Python's built-in hash() is salted per process; use a stable
        # hash (e.g. hashlib) if assignments must survive restarts.
        assignment = hash(str(features)) % 100 < self.traffic_split * 100
        if assignment:
            prediction = self.model_a.predict(features)
            model_used = 'a'
        else:
            prediction = self.model_b.predict(features)
            model_used = 'b'
        return prediction, model_used

    def analyze_results(self):
        # Statistical analysis of the collected results
        from scipy import stats
        perf_a = [r['performance'] for r in self.results['a']]
        perf_b = [r['performance'] for r in self.results['b']]
        t_stat, p_value = stats.ttest_ind(perf_a, perf_b)
        return {
            'mean_performance_a': np.mean(perf_a),
            'mean_performance_b': np.mean(perf_b),
            'p_value': p_value,
            'significant': p_value < 0.05,
            'winner': 'a' if np.mean(perf_a) > np.mean(perf_b) else 'b'
        }
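`self.results` is never filled inside `predict`; a sketch of the missing feedback hook (a hypothetical addition to `ModelABTest`), assuming callers report realized performance once ground truth is available:

    # Hypothetical addition to ModelABTest
    def record_outcome(self, model_used, performance):
        # `performance` could be, e.g., the negative absolute error of the prediction
        self.results[model_used].append({'performance': performance})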
# Dockerfile for model serving
FROM pytorch/torchserve:latest

# Copy the models
COPY models/ /home/model-server/models/

# Configuration
COPY config.properties /home/model-server/

# Custom handlers
COPY handlers/ /home/model-server/handlers/

EXPOSE 8080 8081

CMD ["torchserve", "--start", \
     "--model-store", "/home/model-server/models", \
     "--models", "demand=demand_v2.mar", "position=position_v1.mar"]
from datetime import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class PredictionRequest(BaseModel):
    zone: str
    timestamp: datetime
    features: dict

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float
    model_version: str
    explanation: dict

@app.post("/predict/demand", response_model=PredictionResponse)
async def predict_demand(request: PredictionRequest):
    try:
        # Prepare the features
        features = prepare_features(request)

        # Get the prediction and its confidence
        prediction = demand_model.predict(features)
        confidence = demand_model.predict_confidence(features)

        # Generate a SHAP explanation
        explanation = generate_shap_explanation(demand_model, features)

        return PredictionResponse(
            prediction=prediction,
            confidence=confidence,
            model_version="demand_v2.3",
            explanation=explanation
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
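A usage sketch for the endpoint; host, port, and payload values are illustrative:

import requests

payload = {
    "zone": "Z-042",                          # hypothetical zone id
    "timestamp": "2024-05-01T08:00:00",
    "features": {"temperature": 21.5, "is_holiday": 0}
}
resp = requests.post("http://localhost:8000/predict/demand", json=payload)
resp.raise_for_status()
body = resp.json()
print(body["prediction"], body["confidence"], body["model_version"])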
import shap

def explain_prediction(model, features, feature_names):
    # Build the explainer (tree-based models only)
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(features)

    # Render a force plot for the first sample
    shap.force_plot(
        explainer.expected_value,
        shap_values[0],
        features[0],
        feature_names=feature_names,
        matplotlib=True
    )

    # Return the top contributing features
    feature_importance = pd.DataFrame({
        'feature': feature_names,
        'importance': np.abs(shap_values[0])
    }).sort_values('importance', ascending=False)
    return feature_importance.head(10).to_dict('records')