Coverage for  / home / cdot / bussinFR / webservices / bussinAPIs.py: 96%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-05 02:13 +0000

1#!/usr/bin/env python 

2 

3from fastapi import FastAPI, Query 

4from pydantic import BaseModel 

5from typing import List 

6 

7import os 

8import time 

9import datetime 

10 

11# Database imports. 

12from sqlalchemy import create_engine, Column, String, Float, Integer, UniqueConstraint, or_ 

13from sqlalchemy.orm import declarative_base 

14from sqlalchemy.orm import sessionmaker 

15from sqlalchemy.dialects.mysql import BIGINT 

16 

17### Add middleware for access. 

18###from fastapi.middleware.cors import CORSMiddleware 

19 

20### Set up origins so anyone can get at the APIs. 

21###origins = ["*"] 

22 

23# Now we do this instead : 

24from fastapi.staticfiles import StaticFiles 

25# so we can serve out the static files on the same port so 

26# there's no issue that needs middleware. 

27 

28# Small function that waits if a file exists. 

29# We do this to be sure we don't access a database while it's 

30# being updated. 

def waitOnFile(file, poll_interval=0.25, timeout=None):
    """
    Block until the given file no longer exists.

    The database updaters drop a marker file next to a database while
    they are rewriting it; callers use this to avoid reading a database
    that is mid-update.

    Args:
        file: Path of the marker file to watch.
        poll_interval: Seconds to sleep between existence checks.
            Defaults to 0.25, the interval this function always used.
        timeout: Maximum number of seconds to wait, or None (the
            default) to wait for as long as the file exists.

    Returns:
        None, once the file does not exist.

    Raises:
        TimeoutError: Only when a timeout was given and the file still
            exists after that many seconds.
    """
    # Use the monotonic clock so wall-clock adjustments cannot shorten
    # or lengthen the wait.
    deadline = None if timeout is None else time.monotonic() + timeout

    while os.path.exists(file):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                "Timed out after " + str(timeout) + " seconds waiting for "
                + str(file) + " to be removed."
            )
        time.sleep(poll_interval)
    return

35 

36# Set up tags that appear in the documentation pages that FastAPI generates. 

# One entry per tag shown on the documentation pages that FastAPI
# generates; handed to the application through openapi_tags.
tags_metadata = [
    dict(
        name="bussinAPIs",
        description="Fast API endpoints in support of bussinFR",
        externalDocs=dict(
            description="bussinFR repo",
            url="https://github.com/nilesOien/bussinFR",
        ),
    ),
    dict(
        name="bus-stop-service",
        description="Serves out locations and descriptions of bus stops in a specified area. For RTD, test with a minimum latitude of 40 (Baseline road)",
    ),
    dict(
        name="vehicle-service",
        description="Serves out locations and descriptions of vehicles in a specified area. For the current_status field, 2=Moving 1=Stopped. Can also specify a comma separated list of routes (default is all routes). Internally spaces are removed from the list of routes and it is converted to upper case, so that \"bolt, jump\" becomes \"BOLT,JUMP\". To test for RTD, enter a minimum latitude of 40 (Baseline road).",
    ),
    dict(
        name="trip-service",
        description="Serves out trip updates for a specified stop ID. Union Station in Denver has stop ID 34343 which may be a good test for that region.",
    ),
]

59 

# The agency name comes from the BFR_AGENCY_NAME environment variable and
# is lower-cased before use. Refuse to start without it.
try:
    agency_name = os.environ["BFR_AGENCY_NAME"]
except KeyError:
    # Raise SystemExit directly rather than calling quit(): quit() is a
    # helper the site module installs for interactive sessions, and it
    # exits with status 0. Passing the message to SystemExit prints it
    # on stderr and exits with status 1, so a missing setting is
    # reported as a failure.
    raise SystemExit("Error: BFR_AGENCY_NAME environment variable not set.") from None
agency_name = agency_name.lower()

66 

67# Get a FastAPI application object 

bussinApp = FastAPI(
    title="bussinAPIs",
    # We deploy behind a gateway, so the application is rooted at the
    # agency name. This must match the nginx settings.
    root_path=f"/{agency_name}",
    summary="End points for bussinFR.",
    description="Used by javaScript to get the data.",
    contact=dict(
        name="Niles Oien",
        url="https://github.com/nilesOien",
        email="nilesoien@gmail.com",
    ),
    version="1.0.0",
    openapi_tags=tags_metadata,
)

79 

80### Add middleware to allow all origins. 

81###bussinApp.add_middleware( 

82### CORSMiddleware, 

83### allow_origins=origins, 

84### allow_methods=["*"], 

85### allow_headers=["*"]) 

86 

87 

88# Bus stop end point. 

class busStopServiceResponseClass(BaseModel):
    """
    Pydantic model for one element of the list that the bus stop
    service returns.
    """
    # Stop identifier, name and description, all served as strings.
    stopid: str
    stopname: str
    stopdesc: str
    # Position of the stop.
    lat: float
    lon: float

98 

99# Serve out bus stop location, description. 

@bussinApp.get("/busStopService", tags=['bus-stop-service'], response_model=List[busStopServiceResponseClass])
async def get_bus_stops(minLat: float = Query(default=None),
                        minLon: float = Query(default=None),
                        maxLat: float = Query(default=None),
                        maxLon: float = Query(default=None)):
    """
    Returns bus stop information for a specified area.

    Each bound is optional; a bound that is not supplied is simply not
    applied, so calling with no arguments returns every stop.

    Args:
        minLat: Only return stops with lat >= this value.
        minLon: Only return stops with lon >= this value.
        maxLat: Only return stops with lat <= this value.
        maxLon: Only return stops with lon <= this value.

    Returns:
        The matching rows of the stops table, ordered by latitude.

    NOTE(review): waitOnFile() polls with time.sleep(), so this
    coroutine blocks for as long as the marker file exists.
    """

    # Database table ORM model.
    Base = declarative_base()

    class stopsTable(Base):
        __tablename__ = 'stops'
        stopid = Column(String, nullable=False, primary_key=True)
        stopname = Column(String, nullable=False)
        stopdesc = Column(String, nullable=False)
        lat = Column(Float, nullable=False)
        lon = Column(Float, nullable=False)
        __table_args__ = (UniqueConstraint('stopid', name='unique_constraint'),)

    # Database URL and block file. Depends on if we're in testing mode, which
    # is set through the BFR_TEST_MODE env var (which has to be set to either
    # ON or TRUE (case insensitive) to activate test mode).
    test_env = os.getenv('BFR_TEST_MODE', 'OFF')
    testMode = test_env.lower() in ('on', 'true')
    db_dir = 'test_databases' if testMode else 'databases'

    db_url = "sqlite:///../" + db_dir + "/stops/database.db"
    db_block_file = "../" + db_dir + "/stops/db_offline.marker"

    # Connect to the database.
    engine = create_engine(db_url)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db = SessionLocal()

    try:
        # Set up basic query.
        query = db.query(stopsTable)

        # Add filters.
        if minLat is not None:
            query = query.filter(stopsTable.lat >= minLat)

        if minLon is not None:
            query = query.filter(stopsTable.lon >= minLon)

        if maxLat is not None:
            query = query.filter(stopsTable.lat <= maxLat)

        if maxLon is not None:
            query = query.filter(stopsTable.lon <= maxLon)

        query = query.order_by(stopsTable.lat)

        # Don't read the database while it's being updated.
        waitOnFile(db_block_file)

        db_results = query.all()
    finally:
        # Release the session and this request's engine even if the
        # query fails, so a failed request does not leak a connection.
        db.close()
        engine.dispose()

    return db_results

165 

166 

167 

168 

169 

170# Vehicle end point. 

class vehicleServiceResponseClass(BaseModel):
    """
    Pydantic model for one element of the list that the vehicle
    service returns.
    """
    # Route the vehicle is on.
    route: str
    # Integer timestamp taken from the vehicles table.
    timestamp: int
    # Per the vehicle-service tag description: 2=Moving 1=Stopped.
    current_status: int
    # Position and bearing of the vehicle.
    lat: float
    lon: float
    bearing: float

181 

182# Serve out vehicle information. 

@bussinApp.get("/vehicleService", tags=['vehicle-service'], response_model=List[vehicleServiceResponseClass])
async def get_vehicles(minLat: float = Query(default=None),
                       minLon: float = Query(default=None),
                       maxLat: float = Query(default=None),
                       maxLon: float = Query(default=None),
                       routesCSV: str = Query(default=None)):
    """
    Returns vehicle information for a specified area.

    Each bound is optional; a bound that is not supplied is simply not
    applied.

    Args:
        minLat: Only return vehicles with lat >= this value.
        minLon: Only return vehicles with lon >= this value.
        maxLat: Only return vehicles with lat <= this value.
        maxLon: Only return vehicles with lon <= this value.
        routesCSV: Comma separated list of routes of interest. It is
            converted to upper case and all whitespace is removed, so
            "bolt, 205" selects routes BOLT and 205. If it is omitted,
            or names no routes at all (empty string, only commas),
            all routes are served.

    Returns:
        Rows of (route, timestamp, current_status, lat, lon, bearing)
        for the matching vehicles, ordered by latitude.

    NOTE(review): waitOnFile() polls with time.sleep(), so this
    coroutine blocks for as long as the marker file exists.
    """

    Base = declarative_base()

    class vehiclesTable(Base):
        __tablename__ = 'vehicles'
        id = Column(Integer, nullable=False, primary_key=True)
        route = Column(String, nullable=False)
        schedule_relationship = Column(Integer, nullable=False)
        direction_id = Column(Integer, nullable=False)
        current_status = Column(Integer, nullable=False)
        timestamp = Column(BIGINT(unsigned=True), nullable=False)
        lat = Column(Float, nullable=False)
        lon = Column(Float, nullable=False)
        bearing = Column(Float, nullable=False)

    # Database URL and block file. Depends on if we're in testing mode, which
    # is set through the BFR_TEST_MODE env var (which has to be set to either
    # ON or TRUE (case insensitive) to activate test mode).
    test_env = os.getenv('BFR_TEST_MODE', 'OFF')
    testMode = test_env.lower() in ('on', 'true')
    db_dir = 'test_databases' if testMode else 'databases'

    db_url = "sqlite:///../" + db_dir + "/vehicles/database.db"
    db_block_file = "../" + db_dir + "/vehicles/db_offline.marker"

    # Connect to the database.
    engine = create_engine(db_url)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db = SessionLocal()

    try:
        # Set up query but not all columns - only selected ones.
        query = db.query(vehiclesTable).with_entities(
            vehiclesTable.route, vehiclesTable.timestamp,
            vehiclesTable.current_status, vehiclesTable.lat,
            vehiclesTable.lon, vehiclesTable.bearing)

        # Add filters.
        if minLat is not None:
            query = query.filter(vehiclesTable.lat >= minLat)

        if minLon is not None:
            query = query.filter(vehiclesTable.lon >= minLon)

        if maxLat is not None:
            query = query.filter(vehiclesTable.lat <= maxLat)

        if maxLon is not None:
            query = query.filter(vehiclesTable.lon <= maxLon)

        # Routes is a comma separated list of routes of interest.
        # Default is to serve all routes, but if this is specified,
        # then only serve these routes.
        if routesCSV is not None:
            routesCSV = routesCSV.upper()           # Convert entered route list to upper case
            routesCSV = "".join(routesCSV.split())  # Remove whitespaces
            # str.split(',') never returns an empty list ("" gives ['']),
            # so drop empty entries explicitly. Otherwise an empty value
            # or a stray comma would filter on route == '' instead of
            # falling back to all routes.
            routes = [route for route in routesCSV.split(',') if route]
            if routes:
                # If the caller specified routesCSV="bolt, 205"
                # then routes is now the list [ "BOLT", "205" ].
                # Build one equality filter per route, then use the
                # unpacking operator (*) to hand them to or_() as
                # separate positional arguments.
                route_filters = [vehiclesTable.route == route for route in routes]
                query = query.filter(or_(*route_filters))

        query = query.order_by(vehiclesTable.lat)

        # Don't read the database while it's being updated.
        waitOnFile(db_block_file)

        db_results = query.all()
    finally:
        # Release the session and this request's engine even if the
        # query fails, so a failed request does not leak a connection.
        db.close()
        engine.dispose()

    return db_results

290 

291 

292 

293 

294 

295 

296# Trip update end point. 

class tripServiceResponseClass(BaseModel):
    """
    Pydantic model for one element of the list that the trip update
    service returns.
    """
    # Route the trip update belongs to.
    route: str
    # Arrival time as an integer; the trip end point compares it
    # against the current Unix time.
    arrivaltime: int

303 

304# Serve out trip update information. 

@bussinApp.get("/tripService", tags=['trip-service'], response_model=List[tripServiceResponseClass])
async def get_trips(stopID: str = Query(default=None)):
    """
    Returns trip update information for the specified stop ID.

    Args:
        stopID: Stop ID to report on. If it is omitted an empty list is
            returned without touching the database.

    Returns:
        Rows of (route, arrivaltime) for the stop, ordered by arrival
        time. Outside of test mode only arrival times at or after the
        current Unix time are returned.

    NOTE(review): waitOnFile() polls with time.sleep(), so this
    coroutine blocks for as long as the marker file exists.
    """

    if stopID is None:
        return []

    Base = declarative_base()

    class tripsTable(Base):
        __tablename__ = 'intrepid_trips'
        id = Column(Integer, nullable=False, primary_key=True)
        route = Column(String, nullable=False)
        schedule_relationship = Column(Integer, nullable=False)
        arrivaltime = Column(BIGINT(unsigned=True), nullable=False)
        stopid = Column(String, nullable=False)

    # Database URL and block file. Depends on if we're in testing mode, which
    # is set through the BFR_TEST_MODE env var (which has to be set to either
    # ON or TRUE (case insensitive) to activate test mode).
    test_env = os.getenv('BFR_TEST_MODE', 'OFF')
    testMode = test_env.lower() in ('on', 'true')
    db_dir = 'test_databases' if testMode else 'databases'

    db_url = "sqlite:///../" + db_dir + "/trip_updates/database.db"
    db_block_file = "../" + db_dir + "/trip_updates/db_offline.marker"

    # Connect to the database.
    engine = create_engine(db_url)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db = SessionLocal()

    try:
        # Set up query but not all columns - only selected ones.
        query = db.query(tripsTable).with_entities(tripsTable.route, tripsTable.arrivaltime)

        # Add filter on stop ID.
        query = query.filter(tripsTable.stopid == stopID)

        # Also add a filter so that we only serve out
        # arrival times that are in the future.
        # Don't do this in test mode.
        if not testMode:
            current_utc_time = datetime.datetime.now(datetime.timezone.utc)
            current_unix_time = int(current_utc_time.timestamp())
            query = query.filter(tripsTable.arrivaltime >= current_unix_time)

        query = query.order_by(tripsTable.arrivaltime)

        # Don't read the database while it's being updated.
        waitOnFile(db_block_file)

        db_results = query.all()
    finally:
        # Release the session and this request's engine even if the
        # query fails, so a failed request does not leak a connection.
        db.close()
        engine.dispose()

    return db_results

368 

369 

370# Mount for the static HTML/css/javaScript/favicon 

371# In the call below : 

372# 

373# * The first argument ("/") is the URL path where the files will be exposed, 

374# so it will pop up as "http://localhost:8000/" 

375# * The second argument is the actual directory on this server, 

376# and setting html=True will serve out index.html by default. 

377# * The third argument is an internal name that can be used for URL 

378# generation in templates, often with the url_for function. 

379# 

380# Note that order matters. FastAPI matches requests *sequentially* so 

381# we define this last so that it will try the API end points first. 

382# Follow sym links to show test coverage results. 

bussinApp.mount(
    path="/",
    app=StaticFiles(directory="../webpages", html=True, follow_symlink=True),
    name="webpages",
)

384 

385