El sistema SIGA está diseñado con una arquitectura de microservicios en capas que permite escalabilidad, mantenibilidad y evolución independiente de componentes.
// Estructura de componentes principales
src/
├── components/
│ ├── Dashboard/
│ │ ├── FleetMap.tsx // Mapa en tiempo real
│ │ ├── KPICards.tsx // Métricas clave
│ │ └── DecisionPanel.tsx // Panel de decisiones
│ ├── Analytics/
│ │ ├── PredictionHeatmap.tsx // Mapa de calor predictivo
│ │ └── FlowAnalysis.tsx // Análisis de flujos
│ └── Decisions/
│ ├── WhatIfCalculator.tsx // Calculadora de escenarios
│ └── RecommendationList.tsx // Lista de recomendaciones
# Kong API gateway configuration for the decision service:
# routes /api/v1/decisions to the decision-engine backend with
# rate limiting and JWT verification applied at the edge.
services:
  - name: decision-service
    url: http://decision-engine:8000
    routes:
      - name: decision-routes
        paths:
          - /api/v1/decisions
        methods:
          - GET
          - POST
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          hour: 5000
      - name: jwt
        config:
          claims_to_verify:
            - exp
El corazón del sistema que evalúa opciones y genera recomendaciones.
# Arquitectura del Decision Engine
decision_engine/
├── api/
│ ├── endpoints/
│ │ ├── decisions.py # POST /decisions/evaluate
│ │ ├── positions.py # GET /positions/value
│ │ └── recommendations.py # GET /recommendations
├── core/
│ ├── value_calculator.py # Calcula valor de posiciones
│ ├── position_evaluator.py # Evalúa opciones disponibles
│ └── decision_maker.py # Lógica de decisión
├── models/
│ ├── decision.py # Modelo de decisión
│ ├── vehicle.py # Modelo de vehículo
│ └── cargo.py # Modelo de carga
└── services/
├── rtracktor_client.py # Cliente API Rtracktor
└── cache_service.py # Gestión de cache
Servicio de ML para predicciones de demanda y patrones.
# Pipeline de predicción
class DemandPredictor:
    """Demand prediction pipeline: feature extraction followed by model inference."""

    def __init__(self):
        # Model artifact is loaded once at construction time; load_model is
        # defined elsewhere in the service — not visible in this snippet.
        self.model = self.load_model()
        self.feature_pipeline = FeaturePipeline()

    def predict(self, zone: str, horizon_hours: int) -> PredictionResult:
        """Predict demand for ``zone`` over the next ``horizon_hours`` hours.

        The model output is indexed as a 3-vector:
        [demand probability, expected revenue, confidence score].
        """
        feature_vector = self.feature_pipeline.extract(zone, horizon_hours)
        raw = self.model.predict(feature_vector)
        return PredictionResult(
            zone=zone,
            demand_probability=raw[0],
            expected_revenue=raw[1],
            confidence_score=raw[2],
            # NOTE(review): naive local time — confirm whether UTC is expected here.
            timestamp=datetime.now(),
        )
-- Main application schema.
CREATE SCHEMA siga;
-- Fleet vehicles: one row per vehicle with its last-known position and status.
CREATE TABLE siga.vehicles (
id SERIAL PRIMARY KEY,
plate VARCHAR(20) UNIQUE NOT NULL, -- license plate, natural unique key
capacity_kg INTEGER, -- payload capacity in kilograms
current_position GEOGRAPHY(POINT), -- PostGIS point (requires the postgis extension)
current_zone VARCHAR(50),
status VARCHAR(50),
next_available TIMESTAMP, -- when the vehicle is expected to be free again
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW() -- NOTE(review): no trigger visible here to refresh this on UPDATE — confirm
);
-- Decision audit log: one row per decision evaluated for a vehicle.
CREATE TABLE siga.decisions (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP DEFAULT NOW(), -- when the decision was recorded
vehicle_id INTEGER REFERENCES siga.vehicles(id),
decision_type VARCHAR(50),
current_position GEOGRAPHY(POINT), -- vehicle position at decision time
options_available JSONB, -- candidate options considered
decision_taken JSONB, -- option actually chosen
expected_value DECIMAL(10,2), -- predicted value of the chosen option
actual_outcome JSONB, -- presumably written after the fact — verify against the writer
created_by VARCHAR(100) -- user or service that recorded the decision
);
-- Learned origin→destination demand patterns, bucketed by weekday and hour.
CREATE TABLE siga.flow_patterns (
id SERIAL PRIMARY KEY,
origin_zone VARCHAR(50),
destination_zone VARCHAR(50),
day_of_week INTEGER, -- NOTE(review): 0- vs 1-based convention not visible here — confirm with the writer
hour_of_day INTEGER,
avg_demand DECIMAL(5,2),
avg_revenue DECIMAL(10,2),
confidence DECIMAL(3,2), -- presumably a 0..1 score (type allows up to 9.99) — confirm
last_updated TIMESTAMP DEFAULT NOW()
);
-- Indexes backing the hot query paths.
-- Latest decisions per vehicle; DESC matches "most recent first" scans.
CREATE INDEX idx_decisions_vehicle_timestamp
ON siga.decisions(vehicle_id, timestamp DESC);
-- Pattern lookups by (origin, destination) pair.
CREATE INDEX idx_flow_patterns_zones
ON siga.flow_patterns(origin_zone, destination_zone);
-- High-frequency GPS position stream (becomes a TimescaleDB hypertable below).
CREATE TABLE vehicle_positions (
time TIMESTAMPTZ NOT NULL,
vehicle_id INTEGER,
position GEOGRAPHY(POINT),
speed_kmh FLOAT,
heading INTEGER, -- presumably compass degrees — confirm with the producer
status VARCHAR(50)
);
-- Convert the plain table into a hypertable partitioned on `time`.
SELECT create_hypertable('vehicle_positions', 'time');
-- Continuous aggregate: per-vehicle hourly stats, maintained incrementally by TimescaleDB.
CREATE MATERIALIZED VIEW hourly_vehicle_stats
WITH (timescaledb.continuous) AS
SELECT
vehicle_id,
time_bucket('1 hour', time) AS hour,
AVG(speed_kmh) as avg_speed,
COUNT(*) as position_count,
ST_MakeLine(position::geometry ORDER BY time) as route_line -- reconstructed path for the hour
FROM vehicle_positions
GROUP BY vehicle_id, hour
WITH NO DATA;
-- Retention policy: drop raw position chunks older than 6 months.
SELECT add_retention_policy('vehicle_positions', INTERVAL '6 months');
// Express middleware factory: verifies the Bearer JWT and enforces a required role.
// Responses: 401 when the token is missing or invalid, 403 when the role is
// insufficient, 500 when the server is misconfigured (no JWT_SECRET).
const authorizationMiddleware = (requiredRole: string) => {
  return async (req: Request, res: Response, next: NextFunction) => {
    // Expect "Authorization: Bearer <token>".
    const token = req.headers.authorization?.split(' ')[1];
    if (!token) {
      return res.status(401).json({ error: 'No token provided' });
    }
    // BUG FIX: previously jwt.verify was called with a possibly-undefined
    // secret; the resulting throw was misreported to the client as an
    // invalid token. Surface misconfiguration explicitly instead.
    const secret = process.env.JWT_SECRET;
    if (!secret) {
      return res.status(500).json({ error: 'Server misconfigured' });
    }
    try {
      const decoded = jwt.verify(token, secret);
      // jwt.verify may return a plain string for non-JSON payloads; those
      // carry no role claim and must be rejected.
      if (typeof decoded === 'string') {
        return res.status(401).json({ error: 'Invalid token' });
      }
      const userRole = decoded.role;
      if (!hasPermission(userRole, requiredRole)) {
        return res.status(403).json({ error: 'Insufficient permissions' });
      }
      req.user = decoded;
      next();
    } catch (error) {
      // Covers expired signatures, bad signatures, and malformed tokens.
      return res.status(401).json({ error: 'Invalid token' });
    }
  };
};
class CircuitBreaker:
    """Circuit breaker wrapper for calls to external services.

    States: CLOSED (normal operation), OPEN (rejecting calls after repeated
    failures), HALF_OPEN (probing whether the dependency has recovered).
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # one of: CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Invoke ``func``, updating breaker state on success or failure.

        Raises CircuitOpenError while the breaker is OPEN and the recovery
        window has not elapsed; otherwise re-raises whatever ``func`` raises.
        Helper methods (_should_attempt_reset, _on_success, _on_failure) are
        defined elsewhere — not visible in this snippet.
        """
        if self.state == 'OPEN':
            if not self._should_attempt_reset():
                raise CircuitOpenError("Circuit breaker is OPEN")
            self.state = 'HALF_OPEN'
        try:
            outcome = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return outcome
# docker-compose.monitoring.yml — observability stack: metrics, dashboards, tracing.
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana:latest
    environment:
      # NOTE(review): default admin password committed here — override in real deployments.
      - GF_SECURITY_ADMIN_PASSWORD=admin
    ports:
      - "3000:3000"
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
# Prometheus metric definitions for the decision service.
from prometheus_client import Counter, Histogram, Gauge
# Counter: total decision requests, labelled by endpoint and response status.
decision_requests = Counter(
'siga_decision_requests_total',
'Total decision requests',
['endpoint', 'status']
)
# Histogram: decision processing latency in seconds, labelled by decision type.
decision_latency = Histogram(
'siga_decision_latency_seconds',
'Decision processing latency',
['decision_type']
)
# Gauge: current ratio of kilometers driven empty (set elsewhere by the service).
empty_km_ratio = Gauge(
'siga_empty_km_ratio',
'Current ratio of empty kilometers'
)
# Kubernetes HPA: scales the decision-engine deployment between 2 and 10
# replicas based on average CPU (70%) and memory (80%) utilization.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: decision-engine-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: decision-engine
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
class CacheStrategy:
    """Two-tier cache: a process-local dict (L1) in front of Redis (L2).

    NOTE(review): the L1 dict is unbounded and ignores TTLs — entries live
    for the life of the process. Confirm this is acceptable for callers.
    """

    def __init__(self):
        self.l1_cache = {}            # in-process, fastest tier
        self.l2_cache = RedisCache()  # shared tier; honors TTLs

    def get(self, key: str):
        """Return the cached value for ``key``, or None on a miss in both tiers."""
        # L1 hit: membership test (not truthiness) so stored falsy values
        # (0, '', [], False) are still served.
        if key in self.l1_cache:
            return self.l1_cache[key]
        # L2 hit: promote into L1 for later lookups. BUG FIX: compare against
        # None explicitly — the previous `if value:` treated cached falsy
        # values as misses and dropped them.
        value = self.l2_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value
            return value
        return None

    def set(self, key: str, value: Any, ttl: int = 300):
        """Write through both tiers; ``ttl`` (seconds) applies to L2 only."""
        self.l1_cache[key] = value
        self.l2_cache.set(key, value, ttl)
⬅️ Volver a Arquitectura | ➡️ Siguiente: Componentes Principales