Coverage for / home / rtd / bussinFR / webservices / bussinAPIs.py: 96%
159 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 03:12 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 03:12 +0000
1#!/usr/bin/env python
3from fastapi import FastAPI, Query
4from pydantic import BaseModel
5from typing import List
7import os
8import time
9import datetime
11# Database imports.
12from sqlalchemy import create_engine, Column, String, Float, Integer, UniqueConstraint, or_
13from sqlalchemy.orm import declarative_base
14from sqlalchemy.orm import sessionmaker
15from sqlalchemy.dialects.mysql import BIGINT
17### Add middleware for access.
18###from fastapi.middleware.cors import CORSMiddleware
20### Set up origins so anyone can get at the APIs.
21###origins = ["*"]
23# Now we do this instead :
24from fastapi.staticfiles import StaticFiles
25# so we can serve out the static files on the same port so
26# there's no issue that needs middleware.
28# Small function that waits if a file exists.
29# We do this to be sure we don't access a database while it's
30# being updated.
31def waitOnFile(file) :
32 while os.path.exists(file) :
33 time.sleep(0.25)
34 return
36# Set up tags that appear in the documentation pages that FastAPI generates.
37tags_metadata = [
38 {
39 "name":"bussinAPIs",
40 "description":"Fast API endpoints in support of bussinFR",
41 "externalDocs": {
42 "description": "bussinFR repo",
43 "url": "https://github.com/nilesOien/bussinFR",
44 },
45 },
46 {
47 "name":"bus-stop-service",
48 "description":"Serves out locations and descriptions of bus stops in a specified area. For RTD, test with a minimum latitude of 40 (Baseline road). A hard coded limit of 1000 stops returned is imposed."
49 },
50 {
51 "name":"vehicle-service",
52 "description":"Serves out locations and descriptions of vehicles in a specified area. For the current_status field, 2=Moving 1=Stopped. Can also specify a comma separated list of routes (default is all routes). Internally spaces are removed from the list of routes and it is converted to upper case, so that \"bolt, jump\" becomes \"BOLT,JUMP\". To test for RTD, enter a minimum latitude of 40 (Baseline road). A hard coded limit of 1000 vehicles returned is imposed."
53 },
54 {
55 "name":"trip-service",
56 "description":"Serves out trip updates for a specified stop ID. Union Station in Denver has stop ID 34343 which may be a good test for that region."
57 }
58 ]
60try:
61 agency_name = os.environ["BFR_AGENCY_NAME"]
62except KeyError:
63 print("Error: BFR_AGENCY_NAME environment variable not set.")
64 quit()
65agency_name=agency_name.lower()
67# Get a FastAPI application object
68bussinApp = FastAPI(title="bussinAPIs",
69 root_path="/" + agency_name, # Because we're deploying behind a gateway. Must match nginx settings.
70 summary="End points for bussinFR.",
71 description="Used by javaScript to get the data.",
72 contact={
73 "name": "Niles Oien",
74 "url": "https://github.com/nilesOien",
75 "email": "nilesoien@gmail.com",
76 },
77 version="1.0.0",
78 openapi_tags=tags_metadata)
80### Add middleware to allow all origins.
81###bussinApp.add_middleware(
82### CORSMiddleware,
83### allow_origins=origins,
84### allow_methods=["*"],
85### allow_headers=["*"])
88# Bus stop end point.
89class busStopServiceResponseClass(BaseModel) :
90 """
91 Pydantic class that defines the format of what the bus_stop_service serves out.
92 """
93 stopid: str
94 stopname: str
95 stopdesc: str
96 lat: float
97 lon: float
99# Serve out bus stop location, description.
100@bussinApp.get("/busStopService", tags=['bus-stop-service'], response_model=List[busStopServiceResponseClass])
101async def get_bus_stops(minLat: float = Query(default=None),
102 minLon: float = Query(default=None),
103 maxLat: float = Query(default=None),
104 maxLon: float = Query(default=None)):
105 """
106 Returns bus stop information for a specified area.
107 """
109 # Database table ORM model.
110 Base = declarative_base()
111 class stopsTable(Base):
112 __tablename__='stops'
113 stopid = Column(String, nullable=False, primary_key=True)
114 stopname = Column(String, nullable=False)
115 stopdesc = Column(String, nullable=False)
116 lat = Column(Float, nullable=False)
117 lon = Column(Float, nullable=False)
118 __table_args__ = (UniqueConstraint('stopid', name='unique_constraint'),)
120 # Database URL and block file. Depends on if we're in testing mode, which
121 # is set through the BFR_TEST_MODE env var (which has to be set to either ON or TRUE
122 # (case insensitive) to activate test mode).
123 db_dir='databases'
124 testMode=False
125 test_env = os.getenv('BFR_TEST_MODE', 'OFF')
126 if test_env.lower() == 'on' or test_env.lower() == 'true' :
127 testMode=True
129 if testMode :
130 db_dir='test_databases'
132 db_url="sqlite:///../" + db_dir + "/stops/database.db"
133 db_block_file="../" + db_dir + "/stops/db_offline.marker"
135 # Connect to the database.
136 engine = create_engine(db_url)
137 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
138 db = SessionLocal()
140 # Set up basic query.
141 query = db.query(stopsTable)
143 # Add filters.
144 if minLat is not None :
145 query = query.filter(stopsTable.lat >= minLat)
147 if minLon is not None :
148 query = query.filter(stopsTable.lon >= minLon)
150 if maxLat is not None :
151 query = query.filter(stopsTable.lat <= maxLat)
153 if maxLon is not None :
154 query = query.filter(stopsTable.lon <= maxLon)
156 # In the interests of speed if someone uses the API directly,
157 # decided not to do this.
158 #query = query.order_by(stopsTable.lat)
159 #
160 # In fact, decided to do this instead
161 query = query.limit(1000)
163 waitOnFile(db_block_file)
165 db_results = query.all()
167 db.close()
169 return db_results
175# Vehicle end point.
176class vehicleServiceResponseClass(BaseModel) :
177 """
178 Pydantic class that defines the format of what the vehicle end point serves out.
179 """
180 route: str
181 timestamp: int
182 current_status: int
183 lat: float
184 lon: float
185 bearing: float
187# Serve out vehicle information.
188@bussinApp.get("/vehicleService", tags=['vehicle-service'], response_model=List[vehicleServiceResponseClass])
189async def get_vehicles(minLat: float = Query(default=None),
190 minLon: float = Query(default=None),
191 maxLat: float = Query(default=None),
192 maxLon: float = Query(default=None),
193 routesCSV: str = Query(default=None)):
194 """
195 Returns vehicle information for a specified area.
196 """
198 Base = declarative_base()
200 class vehiclesTable(Base):
201 __tablename__='vehicles'
202 id = Column(Integer, nullable=False, primary_key=True)
203 route = Column(String, nullable=False)
204 schedule_relationship = Column(Integer, nullable=False)
205 direction_id = Column(Integer, nullable=False)
206 current_status = Column(Integer, nullable=False)
207 timestamp = Column(BIGINT(unsigned=True), nullable=False)
208 lat = Column(Float, nullable=False)
209 lon = Column(Float, nullable=False)
210 bearing = Column(Float, nullable=False)
213 # Database URL and block file. Depends on if we're in testing mode, which
214 # is set through the BFR_TEST_MODE env var (which has to be set to either ON or TRUE
215 # (case insensitive) to activate test mode).
216 db_dir='databases'
217 testMode=False
218 test_env = os.getenv('BFR_TEST_MODE', 'OFF')
219 if test_env.lower() == 'on' or test_env.lower() == 'true' :
220 testMode=True
222 if testMode :
223 db_dir='test_databases'
225 db_url="sqlite:///../" + db_dir + "/vehicles/database.db"
226 db_block_file="../" + db_dir + "/vehicles/db_offline.marker"
228 # Connect to the database.
229 engine = create_engine(db_url)
230 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
231 db = SessionLocal()
233 # Set up query but not all columns - only selected ones.
234 query = db.query(vehiclesTable).with_entities(vehiclesTable.route, vehiclesTable.timestamp,
235 vehiclesTable.current_status, vehiclesTable.lat, vehiclesTable.lon, vehiclesTable.bearing)
237 # Add filters.
238 if minLat is not None :
239 query = query.filter(vehiclesTable.lat >= minLat)
241 if minLon is not None :
242 query = query.filter(vehiclesTable.lon >= minLon)
244 if maxLat is not None :
245 query = query.filter(vehiclesTable.lat <= maxLat)
247 if maxLon is not None :
248 query = query.filter(vehiclesTable.lon <= maxLon)
250 # Routes is a comma separated list of routes of interest.
251 # Default is to serve all routes, but if this is specified,
252 # the only serve these routes.
254 if routesCSV is not None :
255 routesCSV = routesCSV.upper() # Convert entered route list to upper case
256 routesCSV = "".join(routesCSV.split()) # Remove whitespaces
257 routes=routesCSV.split(',')
258 if len(routes) > 0 :
259 # This next part is a bit tricky.
260 # If the caller specified routesCSV="bolt, 205"
261 # then routes is now the list [ "BOLT", "205" ].
262 #
263 # The * operator (the unpacking operator)
264 # when used in a function call
265 # takes an iterable (like a list or tuple) and
266 # unpacks its elements as separate positional
267 # arguments, so for example we can :
268 # >>> x=[1,2,3] # A list
269 # >>> print(x) #
270 # [1, 2, 3] # Printed the list
271 # >>> print(*x) #
272 # 1 2 3 # Printed the list as positional parameters
273 #
274 # So we can pass the elements of a list
275 # to the function as positional arguments
276 # (in this case the or_()
277 # function).
278 #
279 # So make the list of filters that we want to throw at or_() :
280 route_filters = [vehiclesTable.route == route for route in routes]
281 # And then use the unpacking operator to throw them all at or_()
282 # as positional arguments :
283 query = query.filter(or_(*route_filters))
285 # Decided against doing this.
286 #query = query.order_by(vehiclesTable.lat)
287 # Did this in case anyone uses the API directly.
288 query = query.limit(1000)
290 waitOnFile(db_block_file)
292 db_results = query.all()
294 db.close()
296 return db_results
303# Trip update end point.
304class tripServiceResponseClass(BaseModel) :
305 """
306 Pydantic class that defines the format of what the trip update end point serves out.
307 """
308 route: str
309 arrivaltime: int
311# Serve out vehicle information.
312@bussinApp.get("/tripService", tags=['trip-service'], response_model=List[tripServiceResponseClass])
313async def get_trips(stopID: str = Query(default=None)):
314 """
315 Returns trip update information for the specified stop ID.
316 """
318 if stopID is None :
319 return []
321 Base = declarative_base()
323 class tripsTable(Base):
324 __tablename__='intrepid_trips'
325 id = Column(Integer, nullable=False, primary_key=True)
326 route = Column(String, nullable=False)
327 schedule_relationship = Column(Integer, nullable=False)
328 arrivaltime = Column(BIGINT(unsigned=True), nullable=False)
329 stopid = Column(String, nullable=False)
332 # Database URL and block file. Depends on if we're in testing mode, which
333 # is set through the BFR_TEST_MODE env var (which has to be set to either ON or TRUE
334 # (case insensitive) to activate test mode).
335 db_dir='databases'
336 testMode=False
337 test_env = os.getenv('BFR_TEST_MODE', 'OFF')
338 if test_env.lower() == 'on' or test_env.lower() == 'true' :
339 testMode=True
341 if testMode :
342 db_dir='test_databases'
344 db_url="sqlite:///../" + db_dir + "/trip_updates/database.db"
345 db_block_file="../" + db_dir + "/trip_updates/db_offline.marker"
347 # Connect to the database.
348 engine = create_engine(db_url)
349 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
350 db = SessionLocal()
352 # Set up query but not all columns - only selected ones.
353 query = db.query(tripsTable).with_entities(tripsTable.route, tripsTable.arrivaltime)
355 # Add filter on stop ID.
356 query = query.filter(tripsTable.stopid == stopID)
358 # Also add a filter so that we only serve out
359 # arrival times that are in the future.
360 # Don't do this in test mode.
361 if not testMode :
362 current_utc_time = datetime.datetime.now(datetime.timezone.utc)
363 current_unix_time = int(current_utc_time.timestamp())
364 query = query.filter(tripsTable.arrivaltime >= current_unix_time)
366 query = query.order_by(tripsTable.arrivaltime)
368 waitOnFile(db_block_file)
370 db_results = query.all()
372 db.close()
374 return db_results
377# Mount for the static HTML/css/javaScript/favicon
378# In the call below :
379#
380# * The first argument ("/") is the URL path where the files will be exposed,
381# so it will pop up as "http://localhost:8000/"
382# * The second argument is the actual directory on this server,
383# and setting html=True will serve out index.html by default.
384# * The third argument is an internal name that can be used for URL
385# generation in templates, often with the url_for function.
386#
387# Note that order matters. FastAPI matches requests *sequentially* so
388# we define this last so that it will try the API end points first.
389# Follow sym links to show test coverage results.
390bussinApp.mount("/", StaticFiles(directory="../webpages", html=True, follow_symlink=True), name="webpages")