{"id":21477,"date":"2024-12-18T13:14:21","date_gmt":"2024-12-18T19:14:21","guid":{"rendered":"http:\/\/www.designandexecute.com\/designs\/?p=21477"},"modified":"2024-12-18T15:43:59","modified_gmt":"2024-12-18T21:43:59","slug":"walkthrough-of-a-databricks-real-time-data-processing-pipeline","status":"publish","type":"post","link":"https:\/\/www.designandexecute.com\/designs\/walkthrough-of-a-databricks-real-time-data-processing-pipeline\/","title":{"rendered":"Walkthrough of a Databricks Real-Time Data Processing Pipeline"},"content":{"rendered":"\n<h3 class=\"wp-block-heading\"><strong>Pipeline Overview:<\/strong><\/h3>\n\n\n\n<p>The goal of the pipeline was to process real-time telemetry data from industrial IoT (IIoT) sensors for predictive maintenance and anomaly detection. The data included temperature, pressure, and vibration metrics from hundreds of sensors.<\/p>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Steps:<\/strong><\/h4>\n\n\n\n<ol class=\"wp-block-list\"><li>\n<strong>Data Ingestion:<\/strong>\n<ul><li>Data was ingested using <strong>Apache Kafka<\/strong> topics. Each topic represented a sensor type (e.g., temperature, vibration).<\/li><li>Kafka brokers distributed the high-velocity sensor data across multiple partitions for scalability.<\/li><\/ul>\n<code>kafka_df = (\n    spark.readStream.format(\"kafka\")\n    .option(\"kafka.bootstrap.servers\", \"kafka_broker:9092\")\n    .option(\"subscribe\", \"sensor-data\")\n    .load()\n)\n<\/code>\n<\/li><li>\n<strong>Schema Parsing and Transformation:<\/strong>\n<ul><li>Raw sensor data was serialized as JSON. We parsed it into a structured format for processing.<\/li><li>Example fields: <code>sensor_id<\/code>, <code>timestamp<\/code>, <code>metric_type<\/code>, <code>value<\/code>.<\/li><\/ul>\n<code>from pyspark.sql.functions import from_json, col\nfrom pyspark.sql.types import StructType, StringType, FloatType, TimestampType\n\nschema = StructType([\n    StructField(\"sensor_id\", StringType(), True),\n    StructField(\"timestamp\", TimestampType(), True),\n    StructField(\"metric_type\", StringType(), True),\n    StructField(\"value\", FloatType(), True)\n])\n\nparsed_df = kafka_df.select(\n    from_json(col(\"value\").cast(\"string\"), schema).alias(\"data\")\n).select(\"data.*\")\n<\/code>\n<\/li><li>\n<strong>Anomaly Detection:<\/strong>\n<ul><li>Applied custom logic to detect anomalies, e.g., flagging if the temperature exceeded a threshold.<\/li><li>Used <strong>Spark SQL<\/strong> for time-window-based aggregations.<\/li><\/ul>\n<code>from pyspark.sql.functions import window\n\nanomaly_df = parsed_df.filter(col(\"metric_type\") == \"temperature\").filter(col(\"value\") &gt; 100)\nanomalies_with_window = (\n    anomaly_df.groupBy(window(\"timestamp\", \"1 minute\"), \"sensor_id\")\n    .count()\n    .filter(col(\"count\") &gt; 5)  # Example threshold\n)\n<\/code>\n<\/li><li>\n<strong>Output Storage:<\/strong>\n<ul><li>Anomalies were written to an <strong>Azure Data Lake<\/strong> for long-term storage and an <strong>Azure Event Hub<\/strong> for alert notifications.<\/li><\/ul>\n<code>anomalies_with_window.writeStream \\\n    .format(\"delta\") \\\n    .option(\"checkpointLocation\", \"\/mnt\/checkpoints\/anomalies\") \\\n    .start(\"\/mnt\/datalake\/anomalies\")\n<\/code>\n<\/li><li>\n<strong>Visualization:<\/strong>\n<ul><li>Data was fed into <strong>Power BI<\/strong> for anomaly trend visualization, using Azure Synapse Analytics as the integration layer.<\/li><\/ul>\n<\/li><\/ol>\n\n\n\n<hr 
class=\"wp-block-separator\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>2. Trade-Offs: Micro-Batch vs. Continuous Processing in Spark Streaming<\/strong><\/h3>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Micro-Batch Processing:<\/strong><\/h4>\n\n\n\n<ul class=\"wp-block-list\"><li><strong>How It Works:<\/strong> Data is processed in small, discrete time intervals (e.g., every second or minute).<\/li><li><strong>Pros:<\/strong>\n<ul><li>Easier to implement and debug.<\/li><li>Can use existing batch processing logic.<\/li><li>Achieves near real-time latency with less complexity.<\/li><\/ul>\n<\/li><li><strong>Cons:<\/strong>\n<ul><li>Slight latency due to batching intervals.<\/li><li>Inefficient for low-latency use cases like fraud detection or industrial automation.<\/li><\/ul>\n<\/li><\/ul>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Continuous Processing:<\/strong><\/h4>\n\n\n\n<ul class=\"wp-block-list\"><li><strong>How It Works:<\/strong> Data is processed record-by-record as it arrives.<\/li><li><strong>Pros:<\/strong>\n<ul><li>Lowest latency, ideal for real-time use cases.<\/li><li>Suitable for event-driven architectures.<\/li><\/ul>\n<\/li><li><strong>Cons:<\/strong>\n<ul><li>Harder to debug and maintain.<\/li><li>Requires careful resource tuning for scalability.<\/li><li>Supported in Spark Streaming, but not widely adopted due to implementation complexity.<\/li><\/ul>\n<\/li><\/ul>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Trade-Off:<\/strong><\/h4>\n\n\n\n<p>For most use cases, <strong>micro-batching<\/strong> strikes a balance between simplicity and performance. <strong>Continuous processing<\/strong> is chosen only for ultra-low-latency requirements, like stock trading or fraud detection.<\/p>\n\n\n\n<p>Here is a sample coding difference between <strong>Micro-Batch<\/strong> and <strong>Continuous Processing<\/strong> in Spark Streaming, using <strong>PySpark<\/strong> as an example.<\/p>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<h4 class=\"wp-block-heading\">2.<strong>1. 
---

#### 2.1 Micro-Batch Processing

Micro-batch processing divides the stream into small, fixed-duration batches.

Code example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length

# Create SparkSession
spark = SparkSession.builder \
    .appName("MicroBatchExample") \
    .getOrCreate()

# Define streaming DataFrame (e.g., reading from a Kafka topic)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Transformation: process the streaming data
transformed_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .withColumn("value_length", length(col("value")))

# Write output in micro-batches (e.g., to the console or a file sink);
# the trigger sets the micro-batch interval
query = transformed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()

query.awaitTermination()
```

**Key points:**

- Uses `trigger(processingTime="5 seconds")` to process in fixed 5-second intervals.
- Data is collected and processed in small, discrete batches.

---

#### 2.2 Continuous Processing

Continuous processing (introduced in Spark 2.3) processes records with very low latency, skipping micro-batch intervals.

Code example:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length

# Create SparkSession
spark = SparkSession.builder \
    .appName("ContinuousProcessingExample") \
    .getOrCreate()

# Define streaming DataFrame (e.g., reading from a Kafka topic)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

# Transformation: only map-like operations are allowed in continuous mode
transformed_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .withColumn("value_length", length(col("value")))

# Write output in continuous mode; the interval is the checkpoint interval
query = transformed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()

query.awaitTermination()
```

**Key points:**

- Uses `trigger(continuous="1 second")`; the interval controls how often progress is checkpointed, not how records are batched.
- Supports only the `append` output mode and map-like transformations (no aggregations).
- Lower latency, but fewer supported sinks and transformations than micro-batch.
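To see the difference at runtime, you can sample a query's built-in metrics instead of blocking on `awaitTermination()`. A minimal sketch, reusing the `query` handle from either example above:

```python
import time

time.sleep(15)  # let a few triggers (or continuous epochs) complete

print(query.status)        # current state, e.g., whether a trigger is active
print(query.lastProgress)  # most recent progress: input rows, durations, etc.
```

In micro-batch mode, `lastProgress` reports per-batch durations; in continuous mode, progress is reported per epoch.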
seconds\")<\/code><\/td><td><code>trigger(continuous=\"X seconds\")<\/code><\/td><\/tr><tr><td><strong>Latency<\/strong><\/td><td>Higher latency (batch intervals)<\/td><td>Lower latency (continuous flow)<\/td><\/tr><tr><td><strong>Output Modes<\/strong><\/td><td>Supports <code>append<\/code>, <code>complete<\/code>, <code>update<\/code><\/td><td>Only supports <code>append<\/code><\/td><\/tr><tr><td><strong>Use Case<\/strong><\/td><td>Suitable for batch-like processing<\/td><td>Best for low-latency real-time use<\/td><\/tr><\/tbody><\/table>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<p>Choose the mode depending on your latency requirements and support for transformations\/sinks.<\/p>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>3. Databricks Integration with Azure Services<\/strong><\/h3>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Azure Data Lake:<\/strong><\/h4>\n\n\n\n<ul class=\"wp-block-list\"><li>\nDatabricks seamlessly integrates with <strong>Azure Data Lake Storage Gen2<\/strong> for storing structured and unstructured data.\n<\/li><li>\nUsed <strong>Delta Lake<\/strong> format to ensure ACID compliance and efficient querying.\n<\/li><li>\nExample: Storing processed telemetry data for historical analysis.\n<code>delta_df.write.format(\"delta\").mode(\"append\").save(\"abfss:\/\/&lt;container&gt;@&lt;storage_account&gt;.dfs.core.windows.net\/sensor-data\")\n<\/code>\n<\/li><\/ul>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Azure Event Hub:<\/strong><\/h4>\n\n\n\n<ul class=\"wp-block-list\"><li>\nUsed for real-time alert notifications when anomalies were detected.\n<\/li><li>\nDatabricks wrote the alert data directly to Event Hub.\n<code>alerts_df.writeStream \\\n    .format(\"eventhubs\") \\\n    .option(\"eventhubs.connectionString\", \"&lt;EventHubConnectionString&gt;\") \\\n    .start()\n<\/code>\n<\/li><\/ul>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Azure Synapse Analytics:<\/strong><\/h4>\n\n\n\n<ul class=\"wp-block-list\"><li>\nProcessed data was fed into Synapse for integration with Power BI dashboards.\n<code>data.write.format(\"com.databricks.spark.sqldw\") \\\n    .option(\"url\", \"&lt;Synapse_SQL_Endpoint&gt;\") \\\n    .option(\"tempDir\", \"abfss:\/\/&lt;temp_dir&gt;.dfs.core.windows.net\/\") \\\n    .save()\n<\/code>\n<\/li><\/ul>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<h3 class=\"wp-block-heading\"><strong>4. Fault Tolerance and Scalability in a Kafka-Based System<\/strong><\/h3>\n\n\n\n<h4 class=\"wp-block-heading\"><strong>Fault Tolerance:<\/strong><\/h4>\n\n\n\n<ol class=\"wp-block-list\"><li> <strong>Replication:<\/strong> <ul><li>Kafka replicates partitions across brokers, ensuring data availability during broker failures. <\/li><\/ul><\/li><li> <strong>Consumer Offsets:<\/strong> <ul><li>Stored in Kafka or an external database (e.g., ZooKeeper), consumers can resume from the last processed record. <\/li><\/ul><\/li><li> <strong>Checkpointing:<\/strong> <ul><li>Spark Streaming checkpoints data to persistent storage (e.g., Azure Blob) to recover state after failures. 
---

### 4. Fault Tolerance and Scalability in a Kafka-Based System

#### Fault Tolerance

1. **Replication:**
   - Kafka replicates partitions across brokers, ensuring data availability during broker failures.
2. **Consumer Offsets:**
   - Offsets are stored in Kafka's internal `__consumer_offsets` topic (or in ZooKeeper in legacy deployments), so consumers can resume from the last processed record.
3. **Checkpointing:**
   - Spark Streaming checkpoints state to persistent storage (e.g., Azure Blob Storage) to recover after failures.

   ```python
   # The Delta sink also needs an output path (shown here as an example)
   stream_df.writeStream \
       .format("delta") \
       .option("checkpointLocation", "/mnt/checkpoints/sensor-data") \
       .start("/mnt/datalake/sensor-data")
   ```

#### Scalability

1. **Partitioning:**
   - Distribute sensor data across Kafka partitions for parallel processing by Spark.
2. **Scaling Consumers:**
   - Multiple Spark executors (or microservices) consume data from different partitions concurrently.
3. **Compression:**
   - Use Kafka compression (e.g., Snappy) to optimize storage and reduce network bandwidth.

---

### 5. Real-World Example of Event-Driven Architecture

#### Example: Predictive Maintenance in Manufacturing

1. **Event Source:**
   - IIoT sensors emit real-time events (e.g., temperature, vibration) to Kafka topics.
2. **Event Processing:**
   - Anomaly detection logic in Databricks processes these events.
3. **Trigger Actions:**
   - If an anomaly is detected, the pipeline sends alerts to Azure Event Hubs, which triggers:
     - A **Logic App** to notify the maintenance team.
     - A **Power Automate** workflow to create a ticket in a maintenance management system.
4. **Feedback Loop:**
   - Sensor data, anomalies, and outcomes are fed back into machine learning models for continuous improvement.

---

### 6. How IoT Edge Enhances Event-Driven Architectures

#### Key Benefits

1. **Local Processing:**
   - IoT Edge processes data locally on devices, reducing latency and dependency on cloud connectivity.
   - Example: perform real-time anomaly detection directly on manufacturing equipment.
2. **Event Filtering:**
   - Filters data before sending it to the cloud, reducing bandwidth usage.
3. **Offline Capability:**
   - IoT Edge stores events locally while offline and syncs with the cloud when connectivity is restored.
4. **Custom Modules:**
   - Developers can deploy custom Docker containers (e.g., ML models) on IoT Edge for event-driven processing (sketched below).

#### Example

An IoT Edge device in a factory detects abnormal vibrations using an on-device ML model. If an anomaly is found:

1. An alert is sent to the Kafka pipeline for cloud processing.
2. Local actions (e.g., stopping the machine) are triggered instantly, without waiting for the cloud.
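As a sketch of that custom-module idea, here is a hypothetical IoT Edge module using the `azure-iot-device` SDK. The vibration threshold, the `stop_machine` control hook, and the `alerts` output route are all illustrative assumptions:

```python
import json
import time

from azure.iot.device import IoTHubModuleClient, Message

VIBRATION_LIMIT = 7.0  # hypothetical threshold (mm/s)


def stop_machine(sensor_id):
    # Hypothetical local control hook; a real module would talk to the PLC
    print(f"Stopping machine attached to sensor {sensor_id}")


def main():
    # Uses the connection details IoT Edge injects into the module container
    client = IoTHubModuleClient.create_from_edge_environment()
    client.connect()

    def handle_reading(message):
        reading = json.loads(message.data)
        if reading.get("metric_type") == "vibration" and reading["value"] > VIBRATION_LIMIT:
            # Local action first: no cloud round trip required
            stop_machine(reading["sensor_id"])
            # Then forward the alert upstream toward the Kafka/Databricks pipeline
            alert = Message(json.dumps({"alert": "vibration_anomaly", **reading}))
            client.send_message_to_output(alert, "alerts")

    client.on_message_received = handle_reading

    # Keep the module alive so the handler keeps receiving messages
    while True:
        time.sleep(60)


if __name__ == "__main__":
    main()
```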