AI Data Pipeline
Enterprise-grade, multi-sector AI data intelligence platform. It ingests heterogeneous data streams (REST APIs, database CDC feeds, IoT sensors, enterprise systems) at millions of events per hour, processes them through a Kafka/Flink pipeline enriched by custom LLMs, and delivers real-time analytical reports, anomaly alerts, and decision-grade insights to stakeholders across retail, energy, healthcare, and financial services.
The platform ingests data from six source categories, each with dedicated connector modules:
- REST/GraphQL APIs — OAuth 2.0 authenticated, rate-limit-aware polling with exponential backoff. Normalised to canonical event schemas via a JSONata transformation layer.
- Database CDC (Change Data Capture) — Debezium connectors stream row-level changes from PostgreSQL, MySQL, and Oracle databases directly into Kafka topics; log-based capture keeps the load on the source databases minimal. Captures inserts, updates, and deletes with before/after row images and transaction timestamps.
- IoT / MQTT — Eclipse Mosquitto MQTT broker bridges sensor telemetry (energy meters, environmental sensors, industrial IoT) into Kafka via a Kafka MQTT connector. Schema Registry enforces Avro schemas for binary payload efficiency.
- File / Batch Ingest — Apache Spark on Databricks processes large batch files (CSV, Parquet, Excel) via Delta Lake, with ACID transaction guarantees and schema evolution support. CDC-equivalent watermarking enables incremental processing on append-only datasets.
- Webhook / Event Push — An NGINX-fronted webhook receiver (Go microservice) validates HMAC signatures, normalises payloads, and publishes to Kafka with sub-50ms ingestion latency.
- Enterprise ERP/CRM — SAP and Salesforce native connectors using official streaming APIs (SAP Event Mesh, Salesforce Platform Events).
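The webhook receiver's signature check can be sketched in a few lines. The real service is written in Go; this is a stdlib-only Python sketch, and the function name and header format are illustrative, not the service's actual API:

```python
import hmac
import hashlib

def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Recompute the HMAC-SHA256 of the raw request body and compare it
    against the hex signature sent by the source system."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    # compare_digest performs a constant-time comparison, avoiding
    # timing side channels on the signature check
    return hmac.compare_digest(expected, signature_hex)
```

Validating against the raw body bytes (before any JSON parsing or normalisation) matters: re-serialised payloads rarely match the sender's exact byte stream.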
All sources feed into a central Apache Kafka cluster (Confluent Cloud, multi-region) with topic-level schema governance enforced by Confluent Schema Registry (Avro/Protobuf). Peak throughput: 4.5 million events/hour at sustained load.
Apache Flink stateful stream processing jobs consume Kafka topics and apply a modular enrichment pipeline:
- Entity Extraction & Classification: A fine-tuned BERT-NER model (multilingual, 12-language coverage) extracts named entities from unstructured text fields — product names, company references, location mentions, monetary values. Runs as an async I/O Flink operator calling the Triton Inference Server endpoint, maintaining full pipeline throughput without blocking.
- Sentiment Analysis: A DistilBERT sentiment model (fine-tuned on domain-specific labelled corpora per sector) classifies customer-facing text (reviews, support tickets, social mentions) into fine-grained sentiment categories (positive/neutral/negative × intensity). Results feed real-time customer experience dashboards with sub-2-second latency from event emission to dashboard update.
- Anomaly Detection: An LSTM autoencoder trained per data stream detects statistical anomalies in time-series signals (energy consumption spikes, transaction velocity outliers, inventory depletion rates). Each window is reconstructed by the autoencoder and compared against the original input; reconstruction errors above a dynamically calibrated threshold trigger Kafka alert events consumed by the notification microservice (PagerDuty + Slack webhooks).
- LLM-Driven Enrichment: A RAG pipeline built on a fine-tuned Llama 3 70B model (quantised to 4-bit GPTQ, served via vLLM with PagedAttention) augments structured data records with natural language context — generating product category summaries, regulatory classification notes, and risk flags grounded in a vector database of sector-specific reference documents (pgvector, HNSW index, hybrid BM25 + dense retrieval).
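The anomaly-scoring step above reduces to comparing reconstruction error against a rolling threshold. A minimal sketch of that logic, with the autoencoder itself omitted and the class name and window sizes chosen for illustration (the platform's actual calibration scheme is not specified):

```python
from collections import deque
from statistics import mean, stdev

class ReconstructionScorer:
    """Scores each time-series window by autoencoder reconstruction error
    (MSE against the original input) and flags it as anomalous when the
    error exceeds a dynamically calibrated threshold: rolling mean + k*std
    of recent errors."""

    def __init__(self, window: int = 500, k: float = 3.0, warmup: int = 30):
        self.errors = deque(maxlen=window)  # recent reconstruction errors
        self.k = k
        self.warmup = warmup                # samples needed before flagging

    def score(self, original: list[float], reconstructed: list[float]) -> tuple[float, bool]:
        err = mean((o - r) ** 2 for o, r in zip(original, reconstructed))
        is_anomaly = False
        if len(self.errors) >= self.warmup:
            threshold = mean(self.errors) + self.k * stdev(self.errors)
            is_anomaly = err > threshold
        self.errors.append(err)
        return err, is_anomaly
```

The bounded deque keeps the threshold adaptive: a stream whose baseline shifts gradually recalibrates itself, while sudden departures still score far above the rolling band.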
Processed and enriched events land in Apache Iceberg tables on cloud object storage (multi-cloud: AWS S3 + GCS), providing ACID transactions, schema evolution, time-travel queries, and partition pruning for petabyte-scale historical analysis. Iceberg tables are registered in Apache Polaris (Iceberg REST Catalog), queryable from Spark, Flink, Trino, and BigQuery Omni simultaneously.
Hot-path serving data is materialised into PostgreSQL (for transactional queries) and TimescaleDB (for time-series metric aggregations). A Redis 7 cluster provides sub-millisecond caching for dashboard metric endpoints, with event-driven cache invalidation triggered by the relevant Kafka update events; consumer-group lag on the invalidation consumer is monitored and alerted on so stale entries are caught early.
A dbt (data build tool) project manages 400+ SQL transformations across the semantic layer — with automated data lineage, column-level documentation, and Great Expectations data quality contract enforcement on every scheduled run.
The reporting engine generates board-level analytical reports entirely autonomously on configurable schedules. For each report:
- Relevant KPIs and time-series data are retrieved from the Iceberg/PostgreSQL analytical layer.
- A RAG retrieval step pulls relevant context from the client's document knowledge base (strategy documents, previous reports, benchmarks) via Weaviate vector search.
- A GPT-4-class LLM (via Azure OpenAI Service with PTU provisioned throughput) synthesises narrative analysis, trend interpretation, and forward-looking commentary grounded strictly in the retrieved data — Constitutional AI guardrails reduce the risk of hallucinated statistics.
- Charts and visualisations are rendered via Vega-Lite (server-side) and embedded in PDF reports generated by Playwright-driven headless Chromium.
- Reports are delivered via email (SendGrid), Slack, and an authenticated web portal — with interactive drill-down on any cited data point.
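The per-report steps above form a fixed pipeline: retrieve KPIs, retrieve context, synthesise, render, deliver. A sketch of that orchestration with each step injected as a callable — the function and parameter names are illustrative, and in the real system the callables would bind to the warehouse, Weaviate, Azure OpenAI, Vega-Lite, and the delivery channels:

```python
def generate_report(kpis, retrieve_context, synthesize, render_charts, deliver):
    """One scheduled report run. Dependency injection keeps the pipeline
    testable: each stage is a plain callable, so the flow can be exercised
    with stubs before wiring in the live services."""
    context = retrieve_context(kpis)        # RAG step over the knowledge base
    narrative = synthesize(kpis, context)   # LLM grounded in KPIs + context
    charts = render_charts(kpis)            # server-side Vega-Lite rendering
    deliver(narrative, charts)              # email / Slack / web portal
    return narrative
```

Keeping the stages as injected callables also makes the grounding constraint auditable: the synthesis step only ever sees the retrieved KPIs and context, nothing else.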
Model training runs on AWS SageMaker HyperPod with distributed PyTorch FSDP (Fully Sharded Data Parallel) across multi-node H100 SXM5 clusters. MLflow (on Databricks) tracks every experiment: hyperparameters, dataset versions, evaluation metrics, and model artefacts. Kubeflow Pipelines (KFP) orchestrates the end-to-end training-to-deployment pipeline — triggered automatically when data drift is detected (using Evidently AI for feature distribution monitoring).
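Evidently's own API is not reproduced here, but the kind of feature-distribution check that triggers retraining can be illustrated with one widely used drift statistic, the Population Stability Index over binned feature distributions (the 0.2 rule of thumb below is a common convention, not the platform's actual gate):

```python
from math import log

def psi(expected: list[float], actual: list[float], eps: float = 1e-4) -> float:
    """Population Stability Index between a reference (training-time) and a
    current binned feature distribution. Each argument is a list of bin
    proportions summing to ~1. PSI > 0.2 is a common rule of thumb for
    drift significant enough to warrant retraining."""
    total = 0.0
    for e, a in zip(expected, actual):
        e, a = max(e, eps), max(a, eps)  # floor the bins to avoid log(0)
        total += (a - e) * log(a / e)
    return total
```

A monitoring job would compute this per feature on a schedule and publish a retraining trigger (e.g. a Kafka event that starts the KFP pipeline) when any feature crosses the threshold.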
Model deployment follows a progressive canary strategy via KServe on GKE: new model versions receive 5% canary traffic and are promoted to full traffic once A/B success metrics are met (F1 improvement, latency SLA compliance).
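The promotion gate reduces to a two-condition check. A sketch of that decision, with thresholds and the function name chosen for illustration (the platform's actual gates are not specified beyond "F1 improvement" and "latency SLA"):

```python
def should_promote(canary_f1: float, baseline_f1: float,
                   canary_p99_ms: float, latency_sla_ms: float,
                   min_f1_gain: float = 0.0) -> bool:
    """Promote the canary from 5% to full traffic only if it improves F1
    by at least min_f1_gain over the serving baseline AND its p99 latency
    stays within the SLA. Both conditions must hold."""
    improves_quality = (canary_f1 - baseline_f1) >= min_f1_gain
    meets_latency_sla = canary_p99_ms <= latency_sla_ms
    return improves_quality and meets_latency_sla
```

Making latency a hard gate rather than a weighted score is deliberate: a model that is more accurate but breaches the SLA should never auto-promote.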