El flujo de datos en SIGA está diseñado para capturar, procesar y transformar información operativa en decisiones inteligentes en tiempo real. Cada dato sigue un camino específico desde su origen hasta generar valor para el negocio.
1. Trigger (T+0ms)
{
"event": "delivery_completed",
"vehicle_id": "TR-2847",
"timestamp": "2024-01-15T14:30:00Z",
"location": {
"lat": 39.4699,
"lng": -0.3763
},
"delivery_id": "DEL-45678"
}
2. Enriquecimiento (T+50ms)
{
"vehicle": {
"id": "TR-2847",
"capacity": 24000,
"type": "TRAILER",
"driver": "DRV-123"
},
"current_zone": "VALENCIA_PORT",
"available_loads": [
{
"id": "LD-789",
"origin": "VALENCIA",
"destination": "BARCELONA",
"revenue": 650,
"pickup_time": "16:00"
}
],
"fleet_context": {
"vehicles_in_zone": 5,
"avg_wait_time": 8.5
}
}
3. Predicción (T+200ms)
{
"demand_forecast": {
"VALENCIA": {
"next_24h": 0.3,
"next_48h": 0.5
},
"BARCELONA": {
"next_24h": 0.85,
"next_48h": 0.9
}
},
"position_values": {
"VALENCIA": 320,
"BARCELONA": 890,
"MADRID": 750
}
}
4. Recomendación (T+500ms)
{
"recommendations": [
{
"rank": 1,
"action": "REPOSITION",
"destination": "BARCELONA",
"expected_value": 890,
"confidence": 0.87,
"justification": "Alta demanda predicha BCN próximas 24h"
}
]
}
// Position/telemetry report for a single vehicle.
message VehiclePosition {
  // Engine readings carried alongside each position report.
  message EngineData {
    float fuel_level = 1;
    float temperature = 2;
    int32 rpm = 3;
  }

  // Operational state of the vehicle at report time.
  // NOTE(review): fields have no labels, so this appears to be proto3, where an
  // unset enum decodes to its zero value, i.e. a report without a status reads
  // as MOVING. A STATUS_UNSPECIFIED = 0 entry would avoid that, but renumbering
  // MOVING is not wire-compatible, so the numbering is left unchanged.
  enum Status {
    MOVING = 0;
    STOPPED = 1;
    LOADING = 2;
    UNLOADING = 3;
  }

  string vehicle_id = 1;
  double latitude = 2;
  double longitude = 3;
  float speed_kmh = 4;
  int32 heading = 5;     // NOTE(review): presumably degrees; confirm unit/range.
  int64 timestamp = 6;   // NOTE(review): epoch unit (s vs ms) not stated here; confirm.
  EngineData engine = 7;
  Status status = 8;
}
# Feature engineering pipeline.
# Maps each feature family to {feature_name: expression}. The expressions are
# plain strings, presumably evaluated by a downstream feature engine (not shown).
features_config = dict(
    temporal=dict(
        hour_of_day='extract_hour(timestamp)',
        day_of_week='extract_dow(timestamp)',
        # NOTE(review): (5, 6) means Saturday/Sunday only if the day index is
        # Monday=0 (like Python's weekday()); with a Sunday=0 convention it
        # would select Friday/Saturday. Confirm against extract_dow.
        is_weekend='dow IN (5,6)',
        is_holiday='check_holiday_calendar(date)',
    ),
    spatial=dict(
        zone_id='get_zone(lat, lng)',
        distance_to_port='calculate_distance(position, nearest_port)',
        urban_rural='classify_area_type(position)',
    ),
    historical=dict(
        avg_wait_time_7d='AVG(wait_time) OVER last_7_days',
        demand_trend='linear_regression(demand, 30d)',
        seasonal_factor='extract_seasonality(historical_demand)',
    ),
    contextual=dict(
        vehicles_in_zone='COUNT(vehicles) WHERE zone = current_zone',
        weather_impact='weather_severity_score(forecast)',
        traffic_index='get_traffic_conditions(zone, time)',
    ),
)
class IntegrationFlow:
    """Calls external services through a circuit breaker wrapped around a retry policy.

    When the breaker raises ``CircuitOpenError`` the cached fallback for the
    request is returned instead of calling the service.
    """

    def __init__(self):
        # Build both resilience helpers first, then attach them to the instance.
        breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60,
            expected_exception=RequestException,
        )
        retries = RetryPolicy(max_attempts=3, backoff_multiplier=2, max_backoff=30)
        self.circuit_breaker = breaker
        self.retry_policy = retries

    async def fetch_external_data(self, source: str, params: dict):
        """Fetch ``source`` with ``params`` and return the validated response.

        Args:
            source: identifier of the external resource, passed to ``external_api.get``.
            params: request parameters, passed through unchanged.

        Returns:
            ``self.validate_response(...)`` of the service response, or
            ``self.get_cached_fallback(source, params)`` if the circuit breaker
            raises ``CircuitOpenError``. Any other exception propagates.
        """

        async def _fetch():
            raw = await external_api.get(source, params)
            return self.validate_response(raw)

        # Same as stacking @self.circuit_breaker above @self.retry_policy: the
        # retry wrapper is innermost, so every retry attempt happens inside one
        # circuit-breaker call. The inner name `_fetch` is kept on purpose in
        # case the wrappers read the function's __name__.
        guarded = self.circuit_breaker(self.retry_policy(_fetch))
        try:
            return await guarded()
        except CircuitOpenError:
            return self.get_cached_fallback(source, params)
# Input document for the fleet optimizer: current fleet state, forecast demand,
# hard constraints and the objective weighting.
optimization_input:
  fleet_state:
    - vehicle_id: TR-001
      current_location: VALENCIA
      available_from: "2024-01-15T16:00:00"
      capacity: 24000          # NOTE(review): unit not stated (presumably kg); confirm.
      restrictions: ["NO_ADR"]
  demand_forecast:
    VALENCIA:
      - time_window: "2024-01-16T08:00-12:00"
        probability: 0.75
        expected_loads: 3
        avg_revenue: 650
  constraints:
    - type: DRIVER_HOURS
      # NOTE(review): EU Regulation 561/2006 caps continuous driving at 4.5 h and
      # daily driving at 9 h (10 h twice a week); confirm what these values mean.
      max_continuous: 9
      max_daily: 11
    - type: VEHICLE_CAPACITY
      max_weight: 24000
      max_volume: 80           # NOTE(review): unit not stated (presumably m3); confirm.
  objectives:
    primary: MAXIMIZE_REVENUE
    secondary: MINIMIZE_EMPTY_KM
    weights: [0.7, 0.3]        # [primary, secondary]
/**
 * Notification rules keyed by event name: delivery channels, priority,
 * template id and trigger conditions, plus an optional throttle or recipients.
 *
 * NOTE(review): because this is an `interface`, every property is a literal
 * *type* (e.g. `priority: 'high'`, `channels` is a tuple type), so only one
 * exact object satisfies it. If a runtime configuration value was intended,
 * a `const` object declared `as const` would express that better.
 */
interface EventConfiguration {
  NEW_LOAD_AVAILABLE: {
    channels: ['push', 'dashboard'];
    priority: 'high';
    template: 'new_load_notification';
    // Thresholds are written as comparison strings.
    conditions: {
      distance_km: '<100';
      revenue: '>500';
      vehicle_compatible: true;
    };
  };
  FLEET_IMBALANCE_DETECTED: {
    channels: ['email', 'dashboard'];
    priority: 'medium';
    template: 'imbalance_alert';
    conditions: {
      imbalance_ratio: '>0.3';
      affected_zones: '>2';
    };
    throttle: '1 per hour';
  };
  PREDICTION_ACCURACY_LOW: {
    channels: ['email'];
    priority: 'low';
    template: 'ml_performance_alert';
    recipients: ['data-team@company.com'];
    conditions: {
      accuracy: '<0.75';
      consecutive_failures: '>3';
    };
  };
}
-- Latency per flow stage over the last hour: p50/p95/p99 of latency_ms,
-- the number of traced requests, and the fraction whose success flag is true.
CREATE VIEW flow_latency_metrics AS
SELECT
    ft.flow_type,
    ft.stage,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY ft.latency_ms) AS p50_latency,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY ft.latency_ms) AS p95_latency,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY ft.latency_ms) AS p99_latency,
    COUNT(*) AS total_requests,
    -- Rows with success = NULL or FALSE are excluded from the numerator.
    (COUNT(*) FILTER (WHERE ft.success))::float / COUNT(*) AS success_rate
FROM flow_traces AS ft
WHERE ft.timestamp > NOW() - INTERVAL '1 hour'
GROUP BY ft.flow_type, ft.stage;
# Grafana dashboards for data-flow health; each panel names a metric query and
# the threshold it is compared against.
grafana_dashboards:
  - name: "Data Flow Health"
    panels:
      # NOTE(review): flow_latency_metrics is grouped by flow_type and stage, so
      # this p99 is per stage rather than truly end-to-end. Its values derive
      # from latency_ms, so the threshold is in milliseconds.
      - title: "End-to-End Latency"
        query: "flow_latency_metrics.p99_latency"
        threshold: 1000
      - title: "Data Quality Score"
        query: "data_quality_checks.pass_rate"
        threshold: 0.95
      - title: "Integration Health"
        query: "external_api_health.availability"
        threshold: 0.99
      - title: "ML Pipeline Status"
        query: "ml_pipeline_runs.success_rate"
        threshold: 0.90
Encriptación en Tránsito
Encriptación en Reposo
Control de Acceso
Auditoría
Privacidad
{
"event": "data_access",
"timestamp": "2024-01-15T14:30:45.123Z",
"user": "dispatcher-123",
"service": "decision-engine",
"action": "read",
"resource": "vehicle_position",
"filters": {
"vehicle_id": "TR-2847"
},
"ip_address": "10.0.1.45",
"result": "success"
}
# Cache policy per data type: 'ttl' is the time-to-live in seconds and
# 'invalidation' names the invalidation strategy (presumably interpreted by the
# cache layer, which is not shown here).
cache_strategy = dict(
    vehicle_positions=dict(ttl=30, invalidation='time-based'),           # 30 seconds
    demand_predictions=dict(ttl=60 * 60, invalidation='event-based'),    # 1 hour
    route_calculations=dict(ttl=30 * 60, invalidation='traffic-change'), # 30 minutes
)
# Batch similar requests: batch_processor is configured with max_batch_size=100
# and max_wait_ms=50 (its batching semantics are defined elsewhere).
@batch_processor(max_batch_size=100, max_wait_ms=50)
async def process_position_updates(updates: List[PositionUpdate]):
    """Write a batch of position updates within a single ``db.transaction()``.

    Args:
        updates: the batched updates, written one by one in the given order
            via ``db.update_position``.
    """
    async with db.transaction():
        for pending_update in updates:
            await db.update_position(pending_update)
// Kafka Streams topology: per-zone metrics over 5-minute tumbling windows.
val positionStream = builder.stream[String, VehiclePosition]("vehicle-positions")

// Re-key each position by zone, fold each zone's positions into a ZoneMetrics
// per 5-minute window, and expose the windowed results as a stream.
//  - Scala DSL: `groupByKey` takes only an implicit Grouped, so it is called without `()`.
//  - `aggregate` is curried: initializer and aggregator go in separate parameter
//    lists; the Materialized/serdes are resolved implicitly.
//  - `toStream` is a parameterless method, so it is also called without `()`.
// NOTE(review): the VehiclePosition protobuf message in this document has no
// `zone` field; confirm where `position.zone` comes from (wrapper type or a
// zone lookup from latitude/longitude).
// NOTE(review): TimeWindows.of is deprecated since Kafka 3.0 (ofSizeWithNoGrace /
// ofSizeAndGrace); kept to preserve the current grace-period behaviour.
val enrichedStream = positionStream
  .selectKey((_, position) => position.zone)
  .groupByKey
  .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
  .aggregate(ZoneMetrics())((_, position, metrics) => metrics.update(position))
  .toStream

// `to` returns Unit, so the sink is attached separately and `enrichedStream`
// remains a usable stream. Keys here are Windowed[String]; a matching implicit
// Produced/serde must be in scope.
enrichedStream.to("zone-metrics")
⬅️ Volver a Componentes Principales | ➡️ Siguiente: Integraciones