Coverage for  / home / rtd / bussinFR / webservices / bussinAPIs.py: 96%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-05 03:12 +0000

1#!/usr/bin/env python 

2 

3from fastapi import FastAPI, Query 

4from pydantic import BaseModel 

5from typing import List 

6 

7import os 

8import time 

9import datetime 

10 

11# Database imports. 

12from sqlalchemy import create_engine, Column, String, Float, Integer, UniqueConstraint, or_ 

13from sqlalchemy.orm import declarative_base 

14from sqlalchemy.orm import sessionmaker 

15from sqlalchemy.dialects.mysql import BIGINT 

16 

17### Add middleware for access. 

18###from fastapi.middleware.cors import CORSMiddleware 

19 

20### Set up origins so anyone can get at the APIs. 

21###origins = ["*"] 

22 

23# Now we do this instead : 

24from fastapi.staticfiles import StaticFiles 

25# so we can serve out the static files on the same port so 

26# there's no issue that needs middleware. 

27 

28# Small function that waits if a file exists. 

29# We do this to be sure we don't access a database while it's 

30# being updated. 

31def waitOnFile(file) : 

32 while os.path.exists(file) : 

33 time.sleep(0.25) 

34 return 

35 

36# Set up tags that appear in the documentation pages that FastAPI generates. 

37tags_metadata = [ 

38 { 

39 "name":"bussinAPIs", 

40 "description":"Fast API endpoints in support of bussinFR", 

41 "externalDocs": { 

42 "description": "bussinFR repo", 

43 "url": "https://github.com/nilesOien/bussinFR", 

44 }, 

45 }, 

46 { 

47 "name":"bus-stop-service", 

48 "description":"Serves out locations and descriptions of bus stops in a specified area. For RTD, test with a minimum latitude of 40 (Baseline road). A hard coded limit of 1000 stops returned is imposed." 

49 }, 

50 { 

51 "name":"vehicle-service", 

52 "description":"Serves out locations and descriptions of vehicles in a specified area. For the current_status field, 2=Moving 1=Stopped. Can also specify a comma separated list of routes (default is all routes). Internally spaces are removed from the list of routes and it is converted to upper case, so that \"bolt, jump\" becomes \"BOLT,JUMP\". To test for RTD, enter a minimum latitude of 40 (Baseline road). A hard coded limit of 1000 vehicles returned is imposed." 

53 }, 

54 { 

55 "name":"trip-service", 

56 "description":"Serves out trip updates for a specified stop ID. Union Station in Denver has stop ID 34343 which may be a good test for that region." 

57 } 

58 ] 

59 

60try: 

61 agency_name = os.environ["BFR_AGENCY_NAME"] 

62except KeyError: 

63 print("Error: BFR_AGENCY_NAME environment variable not set.") 

64 quit() 

65agency_name=agency_name.lower() 

66 

67# Get a FastAPI application object 

68bussinApp = FastAPI(title="bussinAPIs", 

69 root_path="/" + agency_name, # Because we're deploying behind a gateway. Must match nginx settings. 

70 summary="End points for bussinFR.", 

71 description="Used by javaScript to get the data.", 

72 contact={ 

73 "name": "Niles Oien", 

74 "url": "https://github.com/nilesOien", 

75 "email": "nilesoien@gmail.com", 

76 }, 

77 version="1.0.0", 

78 openapi_tags=tags_metadata) 

79 

80### Add middleware to allow all origins. 

81###bussinApp.add_middleware( 

82### CORSMiddleware, 

83### allow_origins=origins, 

84### allow_methods=["*"], 

85### allow_headers=["*"]) 

86 

87 

88# Bus stop end point. 

89class busStopServiceResponseClass(BaseModel) : 

90 """ 

91 Pydantic class that defines the format of what the bus_stop_service serves out. 

92 """ 

93 stopid: str 

94 stopname: str 

95 stopdesc: str 

96 lat: float 

97 lon: float 

98 

99# Serve out bus stop location, description. 

100@bussinApp.get("/busStopService", tags=['bus-stop-service'], response_model=List[busStopServiceResponseClass]) 

101async def get_bus_stops(minLat: float = Query(default=None), 

102 minLon: float = Query(default=None), 

103 maxLat: float = Query(default=None), 

104 maxLon: float = Query(default=None)): 

105 """ 

106 Returns bus stop information for a specified area. 

107 """ 

108 

109 # Database table ORM model. 

110 Base = declarative_base() 

111 class stopsTable(Base): 

112 __tablename__='stops' 

113 stopid = Column(String, nullable=False, primary_key=True) 

114 stopname = Column(String, nullable=False) 

115 stopdesc = Column(String, nullable=False) 

116 lat = Column(Float, nullable=False) 

117 lon = Column(Float, nullable=False) 

118 __table_args__ = (UniqueConstraint('stopid', name='unique_constraint'),) 

119 

120 # Database URL and block file. Depends on if we're in testing mode, which 

121 # is set through the BFR_TEST_MODE env var (which has to be set to either ON or TRUE 

122 # (case insensitive) to activate test mode). 

123 db_dir='databases' 

124 testMode=False 

125 test_env = os.getenv('BFR_TEST_MODE', 'OFF') 

126 if test_env.lower() == 'on' or test_env.lower() == 'true' : 

127 testMode=True 

128 

129 if testMode : 

130 db_dir='test_databases' 

131 

132 db_url="sqlite:///../" + db_dir + "/stops/database.db" 

133 db_block_file="../" + db_dir + "/stops/db_offline.marker" 

134 

135 # Connect to the database. 

136 engine = create_engine(db_url) 

137 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) 

138 db = SessionLocal() 

139 

140 # Set up basic query. 

141 query = db.query(stopsTable) 

142 

143 # Add filters. 

144 if minLat is not None : 

145 query = query.filter(stopsTable.lat >= minLat) 

146 

147 if minLon is not None : 

148 query = query.filter(stopsTable.lon >= minLon) 

149 

150 if maxLat is not None : 

151 query = query.filter(stopsTable.lat <= maxLat) 

152 

153 if maxLon is not None : 

154 query = query.filter(stopsTable.lon <= maxLon) 

155 

156 # In the interests of speed if someone uses the API directly, 

157 # decided not to do this. 

158 #query = query.order_by(stopsTable.lat) 

159 # 

160 # In fact, decided to do this instead 

161 query = query.limit(1000) 

162 

163 waitOnFile(db_block_file) 

164 

165 db_results = query.all() 

166 

167 db.close() 

168 

169 return db_results 

170 

171 

172 

173 

174 

175# Vehicle end point. 

176class vehicleServiceResponseClass(BaseModel) : 

177 """ 

178 Pydantic class that defines the format of what the vehicle end point serves out. 

179 """ 

180 route: str 

181 timestamp: int 

182 current_status: int 

183 lat: float 

184 lon: float 

185 bearing: float 

186 

187# Serve out vehicle information. 

188@bussinApp.get("/vehicleService", tags=['vehicle-service'], response_model=List[vehicleServiceResponseClass]) 

189async def get_vehicles(minLat: float = Query(default=None), 

190 minLon: float = Query(default=None), 

191 maxLat: float = Query(default=None), 

192 maxLon: float = Query(default=None), 

193 routesCSV: str = Query(default=None)): 

194 """ 

195 Returns vehicle information for a specified area. 

196 """ 

197 

198 Base = declarative_base() 

199 

200 class vehiclesTable(Base): 

201 __tablename__='vehicles' 

202 id = Column(Integer, nullable=False, primary_key=True) 

203 route = Column(String, nullable=False) 

204 schedule_relationship = Column(Integer, nullable=False) 

205 direction_id = Column(Integer, nullable=False) 

206 current_status = Column(Integer, nullable=False) 

207 timestamp = Column(BIGINT(unsigned=True), nullable=False) 

208 lat = Column(Float, nullable=False) 

209 lon = Column(Float, nullable=False) 

210 bearing = Column(Float, nullable=False) 

211 

212 

213 # Database URL and block file. Depends on if we're in testing mode, which 

214 # is set through the BFR_TEST_MODE env var (which has to be set to either ON or TRUE 

215 # (case insensitive) to activate test mode). 

216 db_dir='databases' 

217 testMode=False 

218 test_env = os.getenv('BFR_TEST_MODE', 'OFF') 

219 if test_env.lower() == 'on' or test_env.lower() == 'true' : 

220 testMode=True 

221 

222 if testMode : 

223 db_dir='test_databases' 

224 

225 db_url="sqlite:///../" + db_dir + "/vehicles/database.db" 

226 db_block_file="../" + db_dir + "/vehicles/db_offline.marker" 

227 

228 # Connect to the database. 

229 engine = create_engine(db_url) 

230 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) 

231 db = SessionLocal() 

232 

233 # Set up query but not all columns - only selected ones. 

234 query = db.query(vehiclesTable).with_entities(vehiclesTable.route, vehiclesTable.timestamp, 

235 vehiclesTable.current_status, vehiclesTable.lat, vehiclesTable.lon, vehiclesTable.bearing) 

236 

237 # Add filters. 

238 if minLat is not None : 

239 query = query.filter(vehiclesTable.lat >= minLat) 

240 

241 if minLon is not None : 

242 query = query.filter(vehiclesTable.lon >= minLon) 

243 

244 if maxLat is not None : 

245 query = query.filter(vehiclesTable.lat <= maxLat) 

246 

247 if maxLon is not None : 

248 query = query.filter(vehiclesTable.lon <= maxLon) 

249 

250 # Routes is a comma separated list of routes of interest. 

251 # Default is to serve all routes, but if this is specified, 

252 # the only serve these routes. 

253 

254 if routesCSV is not None : 

255 routesCSV = routesCSV.upper() # Convert entered route list to upper case 

256 routesCSV = "".join(routesCSV.split()) # Remove whitespaces 

257 routes=routesCSV.split(',') 

258 if len(routes) > 0 : 

259 # This next part is a bit tricky. 

260 # If the caller specified routesCSV="bolt, 205" 

261 # then routes is now the list [ "BOLT", "205" ]. 

262 # 

263 # The * operator (the unpacking operator) 

264 # when used in a function call 

265 # takes an iterable (like a list or tuple) and 

266 # unpacks its elements as separate positional 

267 # arguments, so for example we can : 

268 # >>> x=[1,2,3] # A list 

269 # >>> print(x) # 

270 # [1, 2, 3] # Printed the list 

271 # >>> print(*x) # 

272 # 1 2 3 # Printed the list as positional parameters 

273 # 

274 # So we can pass the elements of a list 

275 # to the function as positional arguments 

276 # (in this case the or_() 

277 # function). 

278 # 

279 # So make the list of filters that we want to throw at or_() : 

280 route_filters = [vehiclesTable.route == route for route in routes] 

281 # And then use the unpacking operator to throw them all at or_() 

282 # as positional arguments : 

283 query = query.filter(or_(*route_filters)) 

284 

285 # Decided against doing this. 

286 #query = query.order_by(vehiclesTable.lat) 

287 # Did this in case anyone uses the API directly. 

288 query = query.limit(1000) 

289 

290 waitOnFile(db_block_file) 

291 

292 db_results = query.all() 

293 

294 db.close() 

295 

296 return db_results 

297 

298 

299 

300 

301 

302 

303# Trip update end point. 

304class tripServiceResponseClass(BaseModel) : 

305 """ 

306 Pydantic class that defines the format of what the trip update end point serves out. 

307 """ 

308 route: str 

309 arrivaltime: int 

310 

311# Serve out vehicle information. 

312@bussinApp.get("/tripService", tags=['trip-service'], response_model=List[tripServiceResponseClass]) 

313async def get_trips(stopID: str = Query(default=None)): 

314 """ 

315 Returns trip update information for the specified stop ID. 

316 """ 

317 

318 if stopID is None : 

319 return [] 

320 

321 Base = declarative_base() 

322 

323 class tripsTable(Base): 

324 __tablename__='intrepid_trips' 

325 id = Column(Integer, nullable=False, primary_key=True) 

326 route = Column(String, nullable=False) 

327 schedule_relationship = Column(Integer, nullable=False) 

328 arrivaltime = Column(BIGINT(unsigned=True), nullable=False) 

329 stopid = Column(String, nullable=False) 

330 

331 

332 # Database URL and block file. Depends on if we're in testing mode, which 

333 # is set through the BFR_TEST_MODE env var (which has to be set to either ON or TRUE 

334 # (case insensitive) to activate test mode). 

335 db_dir='databases' 

336 testMode=False 

337 test_env = os.getenv('BFR_TEST_MODE', 'OFF') 

338 if test_env.lower() == 'on' or test_env.lower() == 'true' : 

339 testMode=True 

340 

341 if testMode : 

342 db_dir='test_databases' 

343 

344 db_url="sqlite:///../" + db_dir + "/trip_updates/database.db" 

345 db_block_file="../" + db_dir + "/trip_updates/db_offline.marker" 

346 

347 # Connect to the database. 

348 engine = create_engine(db_url) 

349 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) 

350 db = SessionLocal() 

351 

352 # Set up query but not all columns - only selected ones. 

353 query = db.query(tripsTable).with_entities(tripsTable.route, tripsTable.arrivaltime) 

354 

355 # Add filter on stop ID. 

356 query = query.filter(tripsTable.stopid == stopID) 

357 

358 # Also add a filter so that we only serve out 

359 # arrival times that are in the future. 

360 # Don't do this in test mode. 

361 if not testMode : 

362 current_utc_time = datetime.datetime.now(datetime.timezone.utc) 

363 current_unix_time = int(current_utc_time.timestamp()) 

364 query = query.filter(tripsTable.arrivaltime >= current_unix_time) 

365 

366 query = query.order_by(tripsTable.arrivaltime) 

367 

368 waitOnFile(db_block_file) 

369 

370 db_results = query.all() 

371 

372 db.close() 

373 

374 return db_results 

375 

376 

377# Mount for the static HTML/css/javaScript/favicon 

378# In the call below : 

379# 

380# * The first argument ("/") is the URL path where the files will be exposed, 

381# so it will pop up as "http://localhost:8000/" 

382# * The second argument is the actual directory on this server, 

383# and setting html=True will serve out index.html by default. 

384# * The third argument is an internal name that can be used for URL 

385# generation in templates, often with the url_for function. 

386# 

387# Note that order matters. FastAPI matches requests *sequentially* so 

388# we define this last so that it will try the API end points first. 

389# Follow sym links to show test coverage results. 

390bussinApp.mount("/", StaticFiles(directory="../webpages", html=True, follow_symlink=True), name="webpages") 

391 

392