nrt_stream_process.py
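"""Near-real-time route request processor.

Reads route requests from the Kafka topic `route_request_tpp` with Spark
Structured Streaming, enriches each request with directions from the
DirectionsAPI helper (open_route_manager) plus the latest traffic-flow and
weather readings cached in Redis (falling back to MongoDB), and publishes
the enriched responses to the `route_response_tp` topic.
"""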
import json
import logging
import os

from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StringType, StructType, StructField

from database_manager import DatabaseManager
from open_route_manager import DirectionsAPI
from redis_manager import RedisManager
def main():
    # Pull in the Kafka connector packages before the SparkSession (and the JVM) is created.
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,'
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
    )
    # Work around fork-safety aborts when running the worker processes on macOS.
    os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'

    # Set log level to ERROR for KafkaDataConsumer and urllib3
    logging.getLogger("org.apache.spark.streaming.kafka010.KafkaDataConsumer").setLevel(logging.ERROR)
    logging.getLogger("urllib3").setLevel(logging.ERROR)

    spark = SparkSession.builder.appName("KafkaStructuredStreamingExample").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    bootstrap_servers = ['localhost:29092']
    topic = 'route_request_tpp'
    # Subscribe to the route-request topic as a structured stream.
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", ",".join(bootstrap_servers)) \
        .option("subscribe", topic) \
        .load()

    # Kafka values arrive as binary; cast them to JSON strings for parsing.
    df = df.selectExpr("CAST(value AS STRING)")
    # Define the schema for parsing route request JSON data
    schema = StructType([
        StructField("id", StringType()),
        StructField("src", StructType([
            StructField("longitude", StringType()),
            StructField("latitude", StringType())
        ])),
        StructField("dst", StructType([
            StructField("longitude", StringType()),
            StructField("latitude", StringType())
        ]))
    ])
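    # For reference, a message matching this schema might look like the
    # following (illustrative values only; the id and coordinates are
    # hypothetical, not taken from the pipeline):
    #   {"id": "req-001",
    #    "src": {"longitude": "28.9784", "latitude": "41.0082"},
    #    "dst": {"longitude": "29.0260", "latitude": "41.1058"}}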
    # redis_manager = RedisManager().get_instance()
    root_manager = DirectionsAPI()
    def my_udf(src_point, dst_point):
        # Build the directions request from the source and destination coordinates.
        route_request = {
            'src': [src_point["longitude"], src_point["latitude"]],
            'dst': [dst_point["longitude"], dst_point["latitude"]],
        }
        response = root_manager.get_directions(route_request)

        # Managers are created per call so the UDF stays serializable on Spark workers.
        redis_manager = RedisManager().get_instance()
        mongo_db_manager = DatabaseManager()

        #################################
        # Check if the traffic flow data is cached in Redis; fall back to MongoDB.
        traffic_flow_key = "traffic_flow_tp"
        mongo_db_manager.connect_to_database("traffic_management", traffic_flow_key)
        traffic_flow_data = redis_manager.get(traffic_flow_key)
        if traffic_flow_data:
            # Traffic flow data fetched from Redis.
            traffic_flow_data = json.loads(traffic_flow_data.decode('utf-8'))
        else:
            # Traffic flow data fetched from MongoDB.
            traffic_flow_data = mongo_db_manager.get_latest_data(traffic_flow_key)
            # Cache the traffic flow data in Redis
            # redis_manager.set(traffic_flow_key, json.dumps(traffic_flow_data))

        #################################
        # Same caching pattern for the latest weather reading.
        weather_key = "weather_tp"
        mongo_db_manager.connect_to_database("traffic_management", weather_key)
        weather_data = redis_manager.get(weather_key)
        if weather_data:
            # Weather data fetched from Redis.
            weather_data = json.loads(weather_data.decode('utf-8'))
        else:
            # Weather data fetched from MongoDB.
            weather_data = mongo_db_manager.get_latest_data(weather_key)
            # Cache the weather data in Redis
            # redis_manager.set(weather_key, json.dumps(weather_data))

        # Enrich the directions response with the latest context data.
        response['congestion_level'] = traffic_flow_data["Vol"]  # after ml
        response['weather_condition'] = weather_data["temperature"]
        return json.dumps(response)
    # Register the enrichment function as a Spark UDF returning a JSON string.
    my_udf_spark = udf(my_udf, StringType())
    processed_stream = df.select(from_json(col("value"), schema).alias("route_request")) \
        .select("route_request.id",
                my_udf_spark("route_request.src", "route_request.dst").alias("processed_dst"))
    # Print the received data to the console for debugging.
    console_query = processed_stream.writeStream \
        .outputMode("append") \
        .format("console") \
        .start()
    def send_to_kafka(record):
        # Save the processed stream into Kafka, one row at a time.
        # foreach creates a producer per record; flush before returning so the
        # asynchronous send is not lost when the function exits.
        kafka_producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        value = json.dumps(record.asDict(), ensure_ascii=False).encode('utf-8')
        kafka_producer.send('route_response_tp', value=value)
        kafka_producer.flush()

    kafka_query = processed_stream.writeStream \
        .outputMode("append") \
        .foreach(send_to_kafka) \
        .start()

    # Block until the Kafka sink query terminates.
    kafka_query.awaitTermination()
if __name__ == "__main__":
    main()
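
# One way to run this locally (a sketch, assuming Kafka is reachable on
# localhost:29092 and the route_request_tpp topic exists): the script sets
# PYSPARK_SUBMIT_ARGS itself, so a plain interpreter launch should pull in the
# Kafka connector packages, e.g.
#   python nrt_stream_process.py
# Alternatively, the same packages can be supplied to spark-submit:
#   spark-submit \
#     --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 \
#     nrt_stream_process.py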