La optimización multi-día es el proceso más sofisticado de SIGA, que integra todos los componentes anteriores para generar estrategias óptimas de posicionamiento de flota considerando un horizonte temporal extendido (48-72 horas).
"No optimizamos el viaje de hoy, optimizamos la cadena de oportunidades de los próximos 3 días"
class MultiDayOptimizationProblem:
"""Formulación matemática del problema de optimización multi-día"""
def __init__(self, horizon_hours: int = 72):
self.horizon = horizon_hours
self.time_slots = range(0, horizon_hours, 4) # Slots de 4 horas
def formulate_objective(self):
"""
Maximizar: Σ(t∈T) Σ(v∈V) Σ(l∈L) revenue[l,t] * x[v,l,t]
- Σ(t∈T) Σ(v∈V) Σ(z∈Z) cost[v,z,t] * y[v,z,t]
+ Σ(v∈V) position_value[v,T] * z[v,T]
Donde:
- x[v,l,t]: binaria, vehículo v toma carga l en tiempo t
- y[v,z,t]: binaria, vehículo v se mueve vacío a zona z en t
- z[v,T]: valor de posición final del vehículo v
"""
return {
'maximize': 'total_profit',
'components': [
'immediate_revenue',
'repositioning_costs',
'terminal_position_value'
]
}
def define_constraints(self):
"""Define restricciones del problema"""
constraints = {
# Un vehículo solo puede estar en un lugar a la vez
'single_location': 'Σ(z∈Z) location[v,z,t] = 1 ∀v,t',
# Continuidad de movimiento
'movement_continuity': 'location[v,z,t+1] depends on location[v,z,t]',
# Capacidad del vehículo
'capacity': 'load_weight[l] ≤ vehicle_capacity[v]',
# Ventanas de tiempo
'time_windows': 'pickup_time[l] ≤ t ≤ delivery_time[l]',
# Horas de conducción
'driving_hours': 'Σ driving_time[v,t] ≤ max_hours',
# Una carga solo puede ser asignada una vez
'unique_assignment': 'Σ(v∈V) x[v,l,t] ≤ 1 ∀l'
}
return constraints
class MultiDayOptimizer:
"""Implementación del optimizador multi-día"""
def __init__(self):
self.solver = OptimizationSolver()
self.scenario_evaluator = ScenarioEvaluator()
self.position_valuator = PositionValuator()
def optimize(self, initial_state: FleetState,
predictions: DemandPredictions,
horizon_hours: int = 72) -> OptimizationResult:
"""Ejecuta optimización completa"""
# Generar árbol de decisiones
decision_tree = self._build_decision_tree(
initial_state, predictions, horizon_hours
)
# Podar ramas no prometedoras
pruned_tree = self._prune_tree(decision_tree)
# Evaluar escenarios restantes
scenarios = self._evaluate_scenarios(pruned_tree)
# Seleccionar estrategia óptima
optimal_strategy = self._select_optimal_strategy(scenarios)
# Generar plan de acción
action_plan = self._generate_action_plan(optimal_strategy)
return OptimizationResult(
strategy=optimal_strategy,
action_plan=action_plan,
expected_value=optimal_strategy.total_value,
confidence=self._calculate_confidence(scenarios)
)
class DecisionTreeBuilder:
"""Construye árbol de decisiones para explorar opciones"""
def build_tree(self, state: FleetState,
horizon: int) -> DecisionTree:
"""Construye árbol de decisiones multi-nivel"""
root = DecisionNode(state=state, time=0)
# Queue para construcción BFS
queue = [(root, state, 0)]
while queue:
node, current_state, time = queue.pop(0)
if time >= horizon:
continue
# Generar posibles acciones
actions = self._generate_possible_actions(
current_state, time
)
for action in actions:
# Simular resultado de la acción
new_state = self._simulate_action(
current_state, action, time
)
# Calcular valor inmediato
immediate_value = self._calculate_immediate_value(
action, current_state
)
# Crear nodo hijo
child = DecisionNode(
state=new_state,
time=time + action.duration,
action=action,
immediate_value=immediate_value
)
node.add_child(child)
# Añadir a queue si no es terminal
if time + action.duration < horizon:
queue.append((child, new_state, time + action.duration))
return DecisionTree(root=root)
def _generate_possible_actions(self, state: FleetState,
time: int) -> List[Action]:
"""Genera acciones posibles desde un estado"""
actions = []
for vehicle in state.vehicles:
# Cargas disponibles
available_loads = self._get_available_loads(
vehicle.position, time
)
for load in available_loads:
actions.append(AssignLoadAction(vehicle, load))
# Reposicionamientos estratégicos
strategic_positions = self._get_strategic_positions(
vehicle.position, time
)
for position in strategic_positions:
actions.append(RepositionAction(vehicle, position))
# Opción de esperar
if self._is_waiting_valuable(vehicle.position, time):
actions.append(WaitAction(vehicle, duration=4))
return actions
class ScenarioEvaluator:
"""Evalúa y compara diferentes escenarios futuros"""
def evaluate_scenario(self, scenario_path: List[DecisionNode]) -> float:
"""Evalúa el valor total de un camino en el árbol"""
total_value = 0
discount_factor = 0.95 # Descuento temporal
for i, node in enumerate(scenario_path):
# Valor inmediato descontado
discounted_value = node.immediate_value * (discount_factor ** i)
total_value += discounted_value
# Valor terminal (posición final)
terminal_state = scenario_path[-1].state
terminal_value = self._evaluate_terminal_positions(terminal_state)
# Aplicar descuento máximo al valor terminal
total_value += terminal_value * (discount_factor ** len(scenario_path))
# Penalizaciones
penalties = self._calculate_penalties(scenario_path)
total_value -= penalties
return total_value
def _evaluate_terminal_positions(self, final_state: FleetState) -> float:
"""Evalúa el valor de las posiciones finales de la flota"""
total_terminal_value = 0
for vehicle in final_state.vehicles:
# Valor de la posición
position_value = self.position_valuator.get_value(
vehicle.position,
final_state.time
)
# Ajuste por estado del vehículo
vehicle_factor = self._get_vehicle_state_factor(vehicle)
total_terminal_value += position_value * vehicle_factor
return total_terminal_value
def _calculate_penalties(self, path: List[DecisionNode]) -> float:
"""Calcula penalizaciones del escenario"""
penalties = 0
# Penalización por kilómetros vacíos
empty_km = sum(node.action.empty_km for node in path
if hasattr(node.action, 'empty_km'))
penalties += empty_km * EMPTY_KM_COST
# Penalización por tiempo muerto
idle_time = sum(node.action.duration for node in path
if isinstance(node.action, WaitAction))
penalties += idle_time * IDLE_HOUR_COST
# Penalización por desbalance de flota
imbalance = self._calculate_fleet_imbalance(path[-1].state)
penalties += imbalance * IMBALANCE_PENALTY
return penalties
class GeographicCoverageStrategy:
"""Optimiza la cobertura geográfica de la flota"""
def optimize_coverage(self, fleet_state: FleetState,
demand_forecast: DemandForecast) -> CoverageStrategy:
"""Genera estrategia de cobertura óptima"""
# Identificar zonas de alta demanda futura
high_demand_zones = self._identify_high_demand_zones(
demand_forecast, threshold_percentile=75
)
# Calcular cobertura actual
current_coverage = self._calculate_current_coverage(
fleet_state, high_demand_zones
)
# Identificar gaps de cobertura
coverage_gaps = self._identify_coverage_gaps(
high_demand_zones, current_coverage
)
# Generar movimientos de rebalanceo
rebalancing_moves = []
for gap in coverage_gaps:
# Encontrar vehículo más cercano disponible
best_vehicle = self._find_best_vehicle_for_gap(
gap, fleet_state
)
if best_vehicle:
move = RebalancingMove(
vehicle=best_vehicle,
target_zone=gap.zone,
expected_value=gap.expected_demand_value,
urgency=gap.urgency_score
)
rebalancing_moves.append(move)
return CoverageStrategy(
moves=rebalancing_moves,
expected_coverage_improvement=self._calculate_improvement(
current_coverage, rebalancing_moves
)
)
class ValueChainStrategy:
"""Optimiza cadenas de cargas consecutivas"""
def find_optimal_chains(self, initial_position: Position,
available_loads: List[Load],
horizon: int) -> List[LoadChain]:
"""Encuentra cadenas óptimas de cargas"""
chains = []
# Usar programación dinámica
dp = {} # (position, time) -> best_value
parent = {} # Para reconstruir la cadena
# Caso base
dp[(initial_position, 0)] = 0
# Llenar tabla DP
for time in range(horizon):
for position in self._get_reachable_positions(time):
if (position, time) not in dp:
continue
current_value = dp[(position, time)]
# Explorar cargas desde esta posición
for load in self._get_loads_from_position(
position, time, available_loads
):
# Tiempo de llegada al destino
arrival_time = time + load.total_time
if arrival_time > horizon:
continue
# Valor de tomar esta carga
load_value = (
load.revenue -
load.operational_cost -
self._calculate_opportunity_cost(load)
)
new_value = current_value + load_value
# Actualizar si es mejor
key = (load.destination, arrival_time)
if key not in dp or new_value > dp[key]:
dp[key] = new_value
parent[key] = (position, time, load)
# Reconstruir mejores cadenas
return self._reconstruct_chains(dp, parent)
class PeakAnticipationStrategy:
"""Anticipa y prepara la flota para picos de demanda"""
def prepare_for_peaks(self, fleet_state: FleetState,
demand_peaks: List[DemandPeak]) -> PeakStrategy:
"""Genera estrategia para manejar picos futuros"""
strategies = []
for peak in demand_peaks:
# Calcular vehículos necesarios
vehicles_needed = self._calculate_vehicles_needed(peak)
# Identificar vehículos que pueden llegar a tiempo
available_vehicles = self._find_available_vehicles(
fleet_state, peak
)
if len(available_vehicles) < vehicles_needed:
# Estrategia de emergencia
strategy = self._create_emergency_strategy(
peak, available_vehicles
)
else:
# Estrategia normal
strategy = self._create_normal_strategy(
peak, available_vehicles[:vehicles_needed]
)
strategies.append(strategy)
return PeakStrategy(
individual_strategies=strategies,
coordination_plan=self._coordinate_strategies(strategies)
)
class OptimizationPipeline:
"""Pipeline completo de optimización multi-día"""
def __init__(self):
self.stages = [
DataPreparationStage(),
PredictionStage(),
ScenarioGenerationStage(),
EvaluationStage(),
SelectionStage(),
ValidationStage()
]
async def run_optimization(self, context: OptimizationContext) -> Result:
"""Ejecuta pipeline completo de optimización"""
result = OptimizationResult()
for stage in self.stages:
try:
# Ejecutar etapa
stage_result = await stage.execute(context)
# Actualizar contexto
context.update(stage_result)
# Logging y monitoreo
self._log_stage_completion(stage, stage_result)
# Validar resultado parcial
if not self._validate_stage_result(stage_result):
return self._handle_stage_failure(stage, stage_result)
except Exception as e:
return self._handle_stage_error(stage, e)
# Compilar resultado final
result.compile_from_context(context)
return result
class RealTimeOptimizer:
"""Optimizador que se adapta a cambios en tiempo real"""
def __init__(self):
self.current_plan = None
self.execution_monitor = ExecutionMonitor()
self.replanning_threshold = 0.2 # 20% desviación
async def continuous_optimization(self):
"""Loop continuo de optimización"""
while True:
# Monitorear ejecución actual
deviation = self.execution_monitor.calculate_deviation(
self.current_plan
)
if deviation > self.replanning_threshold:
# Re-optimizar
new_context = self._create_updated_context()
new_plan = await self.optimizer.optimize(new_context)
# Evaluar si cambiar de plan
if self._should_switch_plan(self.current_plan, new_plan):
await self._transition_to_new_plan(new_plan)
# Actualizar predicciones incrementalmente
await self._update_predictions()
# Esperar próximo ciclo
await asyncio.sleep(300) # 5 minutos
def create_strategy_dashboard(optimization_result: OptimizationResult) -> Dashboard:
"""Crea visualización interactiva de la estrategia óptima"""
dashboard = Dashboard(title="Estrategia Multi-día SIGA")
# Timeline de acciones
timeline = create_action_timeline(
optimization_result.action_plan,
show_alternatives=True
)
dashboard.add_component(timeline, area='top')
# Mapa de movimientos futuros
movement_map = create_movement_map(
optimization_result.fleet_movements,
time_slider=True
)
dashboard.add_component(movement_map, area='center')
# Métricas de valor
value_metrics = create_value_metrics(
current_value=optimization_result.current_value,
projected_value=optimization_result.projected_value,
confidence_interval=optimization_result.confidence_interval
)
dashboard.add_component(value_metrics, area='right')
# Análisis de riesgos
risk_analysis = create_risk_analysis(
optimization_result.risk_factors
)
dashboard.add_component(risk_analysis, area='bottom')
return dashboard
┌─────────────────────────────────────────────────────────────┐
│ COMPARACIÓN DE ESCENARIOS - 72 HORAS │
├─────────────────┬────────────┬───────────┬─────────────────┤
│ ESTRATEGIA │ VALOR TOTAL│ KM VACÍOS │ RIESGO (0-10) │
├─────────────────┼────────────┼───────────┼─────────────────┤
│ 🥇 Anticipación │ €45,230 │ 18.5% │ 3.2 │
│ 🥈 Conservadora │ €41,100 │ 22.1% │ 2.1 │
│ 🥉 Agresiva │ €43,900 │ 24.8% │ 6.8 │
│ ❌ Sin SIGA │ €32,400 │ 31.2% │ 4.5 │
└─────────────────┴────────────┴───────────┴─────────────────┘
Recomendación: Estrategia de Anticipación
- Mayor valor esperado con riesgo controlado
- Reducción significativa de km vacíos
- Mejor posicionamiento para demanda futura
| Métrica | Definición | Target | Actual |
|---|---|---|---|
| Valor optimizado | Incremento vs baseline | >25% | 28.7% |
| Horizonte efectivo | Horas futuras consideradas | 72h | 72h |
| Tiempo cálculo | Tiempo para solución | <30s | 22s |
| Calidad solución | Gap vs óptimo teórico | <5% | 3.8% |
| Adaptabilidad | Éxito en replanning | >90% | 92.3% |
class MultiClientOptimizer:
"""Optimiza considerando múltiples clientes con prioridades"""
def optimize_with_priorities(self, clients: List[Client],
fleet: Fleet) -> MultiClientStrategy:
"""Balancea necesidades de múltiples clientes"""
# Asignar prioridades dinámicas
client_priorities = self._calculate_dynamic_priorities(clients)
# Reservar capacidad para clientes VIP
vip_reservations = self._reserve_vip_capacity(
clients, fleet, client_priorities
)
# Optimizar capacidad restante
general_optimization = self._optimize_remaining_capacity(
fleet, vip_reservations
)
return MultiClientStrategy(
vip_assignments=vip_reservations,
general_assignments=general_optimization,
service_levels=self._calculate_service_levels(clients)
)
class StochasticOptimizer:
"""Maneja incertidumbre en predicciones y ejecución"""
def optimize_robust(self, scenarios: List[Scenario],
probabilities: List[float]) -> RobustStrategy:
"""Genera estrategia robusta ante múltiples escenarios"""
# Optimizar para cada escenario
scenario_strategies = []
for scenario in scenarios:
strategy = self.deterministic_optimizer.optimize(scenario)
scenario_strategies.append(strategy)
# Encontrar estrategia robusta
robust_strategy = self._find_minimax_regret_strategy(
scenario_strategies, probabilities
)
# Calcular métricas de robustez
robustness_metrics = self._evaluate_robustness(
robust_strategy, scenarios
)
return RobustStrategy(
base_strategy=robust_strategy,
contingency_plans=self._generate_contingencies(scenarios),
robustness_score=robustness_metrics.overall_score
)