SIGA se integra con múltiples sistemas externos para obtener datos en tiempo real, sincronizar operaciones y proporcionar valor agregado. Cada integración está diseñada con resiliencia, seguridad y escalabilidad en mente.
Rtracktor es el TMS (Transport Management System) actual de la empresa, que gestiona órdenes de transporte, tracking básico y documentación.
// Rtracktor API client: the operations SIGA calls on the Rtracktor TMS.
// NOTE(review): every method below is a body-less signature (an API sketch);
// the implementation is not part of this excerpt.
class RtracktorClient {
// Transport orders
/**
 * Retrieves transport orders. Every field of `params` is optional.
 *
 * @param params - Optional filters: date range, order status and truck license.
 * @returns The matching transport orders.
 */
async getTransportOrders(params: {
from_date?: Date;
to_date?: Date;
status?: OrderStatus;
truck_license?: string;
}): Promise<TransportOrder[]>
// Vehicle tracking
/**
 * Retrieves the position of a single vehicle.
 *
 * @param vehicleId - Identifier of the vehicle to look up.
 */
async getVehiclePosition(vehicleId: string): Promise<VehiclePosition>
// Status update
/**
 * Updates the delivery status of an order.
 *
 * @param orderId - Identifier of the order being updated.
 * @param status - New delivery status.
 * @param location - Optional location attached to the update.
 * @param notes - Optional free-text notes attached to the update.
 */
async updateDeliveryStatus(
orderId: string,
status: DeliveryStatus,
location?: Location,
notes?: string
): Promise<void>
// Documents
/**
 * Uploads a POD document (presumably "proof of delivery" — confirm) for an order.
 *
 * @param orderId - Identifier of the order the document belongs to.
 * @param document - Raw document bytes.
 * @param metadata - Descriptive metadata sent along with the document.
 * @returns The identifier of the uploaded document.
 */
async uploadPOD(
orderId: string,
document: Buffer,
metadata: DocumentMetadata
): Promise<DocumentId>
}
# Sync job configuration.
# One entry per synchronised resource; each entry carries a schedule expression,
# the endpoint to call, the query parameters to send and the name of the
# transform to apply to the payload.
sync_config = dict(
    orders=dict(
        frequency='*/5 * * * *',  # Every 5 minutes
        endpoint='/api/v1/transport-orders',
        params=dict(
            status=['PENDING', 'IN_TRANSIT'],
            # Placeholder text; presumably substituted by the sync runner — confirm.
            modified_after='{last_sync_time}',
        ),
        transform='transform_rtracktor_order_to_siga',
    ),
    vehicles=dict(
        frequency='*/1 * * * *',  # Every minute
        endpoint='/api/v1/fleet/positions',
        params=dict(active=True),
        transform='transform_rtracktor_vehicle_to_siga',
    ),
)
@retry(max_attempts=3, backoff_factor=2)
async def sync_rtracktor_data():
    """Fetch transport orders from Rtracktor, then validate, transform and store them.

    ``RtracktorAPIError`` is handled by status code:

    * 429 – waits through ``exponential_backoff`` using the error's ``retry_after``.
    * 500 or above – records a failure on ``circuit_breaker``.
    * anything else – logs the error and re-raises it.

    NOTE(review): the 429 and 5xx paths return without re-raising, so the
    ``@retry`` decorator never sees those failures — confirm this is intended.
    """
    try:
        async with RtracktorClient() as client:
            fetched = await client.get_transport_orders()
            # Validate -> transform -> store, in that order.
            await store_orders(transform_orders(validate_orders(fetched)))
    except RtracktorAPIError as e:
        if e.status_code == 429:  # Rate limit
            await exponential_backoff(e.retry_after)
            return
        if e.status_code >= 500:  # Server error
            await circuit_breaker.record_failure()
            return
        logger.error(f"Unrecoverable error: {e}")
        raise
# EDI Adapter (EDIFACT)
class EDIAdapter:
    """Parses EDIFACT order messages into ``Order`` objects.

    Uses the default EDIFACT service characters: ``'`` ends a segment and
    ``?`` releases (escapes) the character that follows it.
    NOTE(review): a UNA service-string advice that redefines these characters
    is not honoured — confirm whether any trading partner sends one.
    """

    SEGMENT_TERMINATOR = "'"
    RELEASE_CHARACTER = "?"

    def parse_edifact_order(self, edi_message: str) -> "Order":
        """Build an ``Order`` from the DTM, LOC and MEA segments of a message.

        Args:
            edi_message: Raw EDIFACT interchange text.

        Returns:
            An ``Order`` whose ``delivery_date`` comes from the last DTM
            segment, ``locations`` from every LOC segment, and
            ``measurements`` from the last MEA segment. Other segments are
            ignored.
        """
        order = Order()
        for segment in self._split_segments(edi_message):
            if segment.startswith('DTM'):
                order.delivery_date = self.parse_dtm(segment)
            elif segment.startswith('LOC'):
                order.locations.append(self.parse_loc(segment))
            elif segment.startswith('MEA'):
                order.measurements = self.parse_mea(segment)
        return order

    @classmethod
    def _split_segments(cls, edi_message: str) -> list:
        """Split a message into segments, honouring the release character.

        A plain ``split("'")`` breaks data such as ``O?'Hare`` in two and
        leaves the CR/LF that commonly follows each terminator glued to the
        next segment, so its tag is never recognised. Escape sequences are
        kept verbatim for the per-segment parsers; leading whitespace and
        empty segments are dropped.
        """
        segments = []
        current = []
        chars = iter(edi_message)
        for char in chars:
            if char == cls.RELEASE_CHARACTER:
                # Keep the escape and the escaped character together.
                current.append(char)
                current.append(next(chars, ''))
            elif char == cls.SEGMENT_TERMINATOR:
                segments.append(''.join(current))
                current = []
            else:
                current.append(char)
        segments.append(''.join(current))  # text after the last terminator

        cleaned = (segment.lstrip() for segment in segments)
        return [segment for segment in cleaned if segment]
# REST API Adapter
class RESTAdapter:
    """Fetches orders from a customer's REST endpoint."""

    async def fetch_orders(self, customer_config: CustomerConfig):
        """GET the customer's endpoint and return the decoded JSON body.

        Args:
            customer_config: Supplies ``endpoint`` and whatever
                ``build_auth_headers`` needs to authenticate.

        Returns:
            The JSON-decoded response body.

        Raises:
            aiohttp.ClientResponseError: If the endpoint answers with an HTTP
                error status (400 or above).
        """
        headers = self.build_auth_headers(customer_config)
        async with aiohttp.ClientSession() as session:
            async with session.get(
                customer_config.endpoint,
                headers=headers
            ) as response:
                # Fail loudly on HTTP errors instead of handing an error
                # payload to callers as if it were a list of orders.
                response.raise_for_status()
                return await response.json()
# FTP/CSV Adapter
class FTPAdapter:
    """Reads order files from a customer's FTP server."""

    def poll_ftp_server(self, ftp_config: FTPConfig):
        """Yield the orders found in new files under ``ftp_config.path``.

        This is a generator: the FTP connection is opened on first iteration
        and stays open until the generator is exhausted or closed.

        Args:
            ftp_config: Supplies ``host``, ``user``, ``password`` and ``path``.

        Yields:
            Each order produced by ``parse_csv_orders`` for every file that
            ``filter_new_files`` keeps, in listing order.
        """
        with FTP(ftp_config.host) as connection:
            connection.login(ftp_config.user, ftp_config.password)
            pending = self.filter_new_files(connection.nlst(ftp_config.path))
            for remote_name in pending:
                payload = self.download_file(connection, remote_name)
                yield from self.parse_csv_orders(payload)
# Per-customer field mappings: for each customer, where to find every SIGA
# order field in that customer's payload.
# Without nesting, the repeated field keys (order_number, pickup_address, ...)
# collide at the top level, so each customer's fields are indented under it.
customer_mappings:
  # JSONPath-style selectors.
  customer_a:
    order_number: "$.OrderRef"
    pickup_address: "$.CollectionPoint.Address"
    delivery_address: "$.DeliveryPoint.Address"
    weight: "$.GrossWeight"

  # JSONPath-style selectors.
  customer_b:
    order_number: "$.reference_id"
    pickup_address: "$.from_location"
    delivery_address: "$.to_location"
    weight: "$.cargo.weight_kg"

  # "EDI:"-prefixed selectors naming an EDIFACT segment and its qualifiers.
  customer_c:
    order_number: "EDI:BGM+220"
    pickup_address: "EDI:LOC+5"
    delivery_address: "EDI:LOC+7"
    weight: "EDI:MEA+AAE+G+KGM"
/**
 * Map operations exposed to the rest of the system: routing, geocoding,
 * traffic lookup and distance matrices. Every operation is asynchronous.
 */
interface MapService {
  // Routing
  /** Computes a route from `origin` to `destination`; `options` may be omitted. */
  calculateRoute(origin: Location, destination: Location, options?: RouteOptions): Promise<Route>;

  // Geocoding
  /** Resolves a textual address to a location. */
  geocode(address: string): Promise<Location>;
  /** Resolves a location back to an address. */
  reverseGeocode(location: Location): Promise<Address>;

  // Traffic
  /** Returns traffic information for `route` given a departure time. */
  getTrafficConditions(route: Route, departureTime: Date): Promise<TrafficInfo>;

  // Distance Matrix
  /** Returns the distance matrix between the given origins and destinations. */
  getDistanceMatrix(origins: Location[], destinations: Location[]): Promise<DistanceMatrix>;
}
// Implementation with fallback
/**
 * Routes requests through an ordered list of map providers, moving on to the
 * next provider whenever one throws.
 *
 * NOTE(review): only `calculateRoute` is implemented in this excerpt; the
 * other `MapService` members are not shown here.
 */
class MapServiceManager implements MapService {
  /** Providers in priority order; the last entry is the fallback. */
  private providers: MapProvider[] = [
    new GraphHopperProvider(),
    new TomTomProvider(),
    new OSRMProvider(), // Fallback
  ];

  /**
   * Asks each provider in turn for a route and returns the first success.
   * A failing provider is logged as a warning and skipped.
   *
   * @throws Error when every provider has failed.
   */
  async calculateRoute(
    origin: Location,
    destination: Location,
    options?: RouteOptions,
  ): Promise<Route> {
    // Work on a copy so the configured provider order is never mutated.
    const queue = [...this.providers];
    let provider = queue.shift();

    while (provider !== undefined) {
      try {
        return await provider.calculateRoute(origin, destination, options);
      } catch (error) {
        logger.warn(`Provider ${provider.name} failed: ${error}`);
      }
      provider = queue.shift();
    }

    throw new Error('All map providers failed');
  }
}
class RouteOptimizer:
    """Builds a truck-ready route by refining a base route in successive passes."""

    def optimize_route(self, request: RouteRequest) -> OptimizedRoute:
        """Compute the optimised route for ``request``.

        The base route from the map service is refined, in order, by truck
        restrictions, traffic at the requested departure time, and weather.
        The weather forecast is looked up for the truck-restricted route (the
        one before the traffic adjustment).

        Args:
            request: Supplies ``origin``, ``destination``, ``vehicle``,
                ``cargo`` and ``departure_time``.

        Returns:
            An ``OptimizedRoute`` with the final route, its distance and
            duration, and the toll and fuel cost computed for it.
        """
        cargo = request.cargo

        # Base route from map service
        route = self.map_service.calculate_route(request.origin, request.destination)

        # Apply truck-specific restrictions
        restricted = self.apply_truck_restrictions(
            route,
            truck_type=request.vehicle.type,
            weight=cargo.weight,
            hazmat=cargo.is_hazardous,
        )

        # Consider traffic patterns
        with_traffic = self.adjust_for_traffic(
            restricted, departure_time=request.departure_time
        )

        # Weather impact
        forecast = self.get_weather_forecast(restricted)
        final_route = self.apply_weather_conditions(
            with_traffic, weather_forecast=forecast
        )

        return OptimizedRoute(
            route=final_route,
            distance_km=final_route.total_distance,
            duration_minutes=final_route.estimated_duration,
            toll_cost=self.calculate_tolls(final_route),
            fuel_cost=self.estimate_fuel_cost(final_route),
        )
class TelemetryProcessor:
    """Processes vehicle telemetry: parse, enrich, detect events, store and publish.

    The detection thresholds are class attributes so they can be tuned per
    subclass or per instance without editing ``detect_events``.
    """

    # Acceleration (m/s²) below which a sample counts as harsh braking.
    HARSH_BRAKING_THRESHOLD = -4.5
    # Speeding is flagged above this multiple of the road speed limit.
    SPEEDING_TOLERANCE_FACTOR = 1.1
    # Idle duration above which idling is excessive (presumably seconds — confirm).
    EXCESSIVE_IDLE_THRESHOLD = 300
    # Inefficient driving is flagged above this multiple of expected consumption.
    FUEL_OVERCONSUMPTION_FACTOR = 1.2

    def __init__(self):
        self.mqtt_client = MQTTClient()
        self.stream_processor = KafkaStreams()

    async def process_telemetry(self, message: "TelemetryMessage"):
        """Run one telemetry message through the whole pipeline.

        The three final steps (time-series store, vehicle-state update and
        event publication) are awaited together with ``asyncio.gather``.
        """
        # Parse device data
        data = self.parse_device_data(message)
        # Enrich with context
        enriched = await self.enrich_data(data)
        # Detect events
        events = self.detect_events(enriched)
        # Store and forward
        await asyncio.gather(
            self.store_timeseries(enriched),
            self.update_vehicle_state(enriched),
            self.publish_events(events)
        )

    def detect_events(self, data: "EnrichedTelemetry") -> "List[Event]":
        """Return the driving events triggered by one enriched sample.

        Checks are independent, so a single sample can yield several events.
        Results are ordered: harsh braking, speeding, excessive idle,
        inefficient driving.
        """
        events = []
        # Harsh braking
        if data.acceleration < self.HARSH_BRAKING_THRESHOLD:
            events.append(HarshBrakingEvent(data))
        # Speeding
        if data.speed > data.road_speed_limit * self.SPEEDING_TOLERANCE_FACTOR:
            events.append(SpeedingEvent(data))
        # Idle time
        if (
            data.speed == 0
            and data.engine_on
            and data.idle_duration > self.EXCESSIVE_IDLE_THRESHOLD
        ):
            events.append(ExcessiveIdleEvent(data))
        # Fuel efficiency
        if data.fuel_consumption > data.expected_consumption * self.FUEL_OVERCONSUMPTION_FACTOR:
            # NOTE(review): "Ineffcient" is misspelt, but the class is defined
            # elsewhere; rename it there first, then here.
            events.append(IneffcientDrivingEvent(data))
        return events
/**
 * Builds a weather forecast along a route and summarises its impact.
 */
class WeatherIntegration {
  /** Spacing, in kilometres, between the route points that get a forecast. */
  private static readonly WAYPOINT_INTERVAL_KM = 50;

  private providers = {
    primary: new OpenWeatherAPI(),
    fallback: new WeatherAPIService(),
  };

  /**
   * Forecasts the weather at regularly spaced points of `route`, each at the
   * time the vehicle is expected to reach it, and analyses the combined impact.
   *
   * @param route - Route to sample.
   * @param departureTime - Departure time used to compute each point's ETA.
   * @returns The impact analysis for the sampled forecasts.
   */
  async getRouteWeatherForecast(
    route: Route,
    departureTime: Date
  ): Promise<RouteWeatherForecast> {
    // Get weather for key points along route.
    // The interval is passed positionally: `every_km=50` is Python keyword
    // syntax and in TypeScript assigns to an undeclared identifier.
    const waypoints = this.extractWaypoints(
      route,
      WeatherIntegration.WAYPOINT_INTERVAL_KM
    );

    // Points are independent, so their forecasts are fetched concurrently.
    const forecasts = await Promise.all(
      waypoints.map(async (point) => {
        const eta = this.calculateETA(route.origin, point, departureTime);
        return {
          location: point,
          time: eta,
          weather: await this.getWeatherAt(point, eta),
        };
      })
    );

    // Analyze impact
    return this.analyzeWeatherImpact(forecasts);
  }

  /**
   * Assesses visibility, precipitation, wind and temperature impact for the
   * forecasts and derives the overall risk, speed reduction factor, safety
   * warnings and a suggested alternative departure.
   */
  private analyzeWeatherImpact(
    forecasts: PointForecast[]
  ): RouteWeatherForecast {
    const impacts = {
      visibility: this.assessVisibilityImpact(forecasts),
      precipitation: this.assessPrecipitationImpact(forecasts),
      wind: this.assessWindImpact(forecasts),
      temperature: this.assessTemperatureImpact(forecasts),
    };

    return {
      overall_risk: this.calculateOverallRisk(impacts),
      speed_reduction_factor: this.calculateSpeedReduction(impacts),
      safety_warnings: this.generateWarnings(impacts),
      alternative_departure: this.suggestBetterTime(forecasts),
    };
  }
}
class BillingIntegration:
    """Turns completed deliveries into ERP invoices."""

    def __init__(self):
        self.erp_client = ERPClient()
        self.cost_calculator = CostCalculator()

    async def process_completed_delivery(self, delivery: CompletedDelivery):
        """Invoice a completed delivery and mark it as invoiced.

        Steps, in order: calculate the delivery costs, assemble the invoice
        payload, create the invoice in the ERP, then record the invoice id on
        the delivery.

        Args:
            delivery: The completed delivery to invoice.
        """
        # Calculate all costs
        # NOTE(review): the result is never read below — confirm whether it
        # should feed the invoice.
        costs = await self.cost_calculator.calculate(
            distance_km=delivery.actual_distance,
            duration_hours=delivery.duration,
            toll_costs=delivery.tolls,
            fuel_consumed=delivery.fuel_liters,
            driver_hours=delivery.driver_time,
            vehicle_type=delivery.vehicle.type,
        )

        # Prepare invoice data (keys are added in the payload's original order)
        invoice_data = {
            'customer_id': delivery.customer_id,
            'reference': delivery.order_reference,
        }
        transport_line = {
            'description': f'Transport {delivery.origin} → {delivery.destination}',
            'quantity': 1,
            'unit_price': delivery.agreed_price,
            'tax_rate': 0.21,
        }
        invoice_data['line_items'] = [transport_line]
        invoice_data['additional_charges'] = self.calculate_extras(delivery)
        invoice_data['supporting_docs'] = [
            delivery.pod_document_id,
            delivery.tracking_report_id,
        ]

        # Send to ERP
        invoice_id = await self.erp_client.create_invoice(invoice_data)

        # Update delivery record
        await self.update_delivery_invoiced(delivery.id, invoice_id)
# Vault configuration
# Every value is a "vault:"-prefixed reference to a secret path, never the
# secret itself. Without nesting, the repeated api_key keys collide, so each
# credential is indented under the service it belongs to.
# NOTE(review): the original indentation was lost; customer_apis and
# map_services are placed under `integrations` here — confirm they are not
# meant to be siblings of it.
secrets:
  integrations:
    rtracktor:
      api_key: vault:secret/integrations/rtracktor/api_key
      refresh_token: vault:secret/integrations/rtracktor/refresh_token

    customer_apis:
      customer_a:
        username: vault:secret/customers/a/username
        password: vault:secret/customers/a/password

    map_services:
      graphhopper:
        api_key: vault:secret/maps/graphhopper/key
      tomtom:
        api_key: vault:secret/maps/tomtom/key
class IntegrationSecurity:
    """Builds authenticated clients for external integrations from Vault credentials."""

    def __init__(self):
        self.vault_client = VaultClient()
        self.token_manager = TokenManager()

    async def get_authenticated_client(self, service: str):
        """Return an authenticated client for ``service``.

        The auth scheme is chosen from the service name: ``rtracktor`` uses
        OAuth2, names starting with ``customer_`` use basic auth, and
        ``graphhopper`` / ``tomtom`` use an API key.

        Args:
            service: Integration name; also the last segment of the Vault
                path ``integrations/<service>``.

        Raises:
            ValueError: If ``service`` matches no known auth scheme.
        """
        # Pick the auth scheme first, so an unknown name fails before any
        # Vault read instead of silently returning None afterwards.
        if service == 'rtracktor':
            build_client = self.oauth2_client
        elif service.startswith('customer_'):
            build_client = self.basic_auth_client
        elif service in ('graphhopper', 'tomtom'):
            build_client = self.api_key_client
        else:
            raise ValueError(f'Unsupported integration service: {service!r}')

        # Get credentials from vault
        creds = await self.vault_client.get_secret(f'integrations/{service}')
        return await build_client(creds)

    async def oauth2_client(self, creds):
        """Return an OAuth2 client, refreshing the access token first if it expired.

        A refreshed token is written back to Vault and onto ``creds``.
        """
        # Check if token needs refresh
        if self.token_manager.is_expired(creds.access_token):
            new_token = await self.refresh_oauth_token(creds.refresh_token)
            await self.vault_client.update_secret(
                'integrations/rtracktor/access_token',
                new_token
            )
            creds.access_token = new_token
        return OAuth2Client(creds.access_token)
/** Health states an integration can report. */
type IntegrationStatus = 'healthy' | 'degraded' | 'down';

/** Health snapshot of one external integration. */
interface IntegrationHealth {
  /** Name of the integration this snapshot describes. */
  service: string;
  status: IntegrationStatus;
  /** Latency in milliseconds. */
  latency_ms: number;
  error_rate: number;
  last_success: Date;
  /** Details of the most recent error; absent when there is none to report. */
  last_error?: {
    timestamp: Date;
    message: string;
    code: string;
  };
}
/**
 * Reports the health of every monitored integration from its recent metrics.
 */
class IntegrationMonitor {
  /**
   * Checks all monitored integrations concurrently.
   *
   * @returns One health snapshot per integration, in the order listed below.
   */
  async getHealthStatus(): Promise<IntegrationHealth[]> {
    const integrations = [
      'rtracktor',
      'customer_a_edi',
      'customer_b_api',
      'graphhopper',
      'tomtom',
      'weather_api',
    ];

    return Promise.all(
      integrations.map((service) => this.checkHealth(service))
    );
  }

  /**
   * Builds the health snapshot of one integration from its metrics over the
   * `'5m'` window.
   */
  private async checkHealth(service: string): Promise<IntegrationHealth> {
    const metrics = await this.getMetrics(service, '5m');
    const { error_count: errors, total_requests: total } = metrics;

    return {
      service,
      status: this.calculateStatus(metrics),
      latency_ms: metrics.avg_latency,
      // A window with no traffic has no errors; dividing 0 by 0 would yield
      // NaN, which compares false against every threshold.
      error_rate: total > 0 ? errors / total : 0,
      last_success: metrics.last_success_time,
      last_error: metrics.last_error,
    };
  }
}
# Alert rules for integration health. Each rule has a name, a condition that
# must hold for the stated duration, a severity and the channels to notify.
# Every rule's keys are indented under its own list item so they stay part
# of that rule.
alerts:
  - name: integration_down
    condition: status == 'down' for 5m
    severity: critical
    channels: ['pagerduty', 'slack']

  - name: high_latency
    condition: latency_ms > 5000 for 10m
    severity: warning
    channels: ['slack']

  - name: high_error_rate
    condition: error_rate > 0.05 for 15m
    severity: warning
    channels: ['email', 'slack']
# Prevent cascade failures.
# Module-level breaker, configured to react to IntegrationError with a
# failure threshold of 5 and a recovery timeout of 60 (presumably seconds —
# confirm against the CircuitBreaker implementation in use).
circuit_breaker = CircuitBreaker(
    expected_exception=IntegrationError,
    failure_threshold=5,
    recovery_timeout=60,
)
# Exponential backoff for transient failures
@retry(
    retry=retry_if_exception_type(TransientError),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    stop=stop_after_attempt(3),
)
async def call_external_service():
    """Template for a retried external call; the body is intentionally empty.

    The decorator retries only on ``TransientError``, stops after three
    attempts, and waits between attempts with an exponential policy bounded
    by ``min=4`` and ``max=10``.
    """
    return None
# Respect external API limits: at most 100 requests per 60-second window.
rate_limiter = RateLimiter(max_requests=100, time_window=60)  # time_window in seconds

# Validate all external data against the 'customer_order_v2' schema before use.
validator = DataValidator(schema='customer_order_v2')
validated_data = validator.validate(external_data)