Files
rfid/app.py
2026-05-14 17:17:11 +00:00

364 lines
8.8 KiB
Python

from flask import Flask, request, jsonify, render_template_string, redirect, render_template
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone, timedelta
# Application object and database handle shared by every model and route below.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI="sqlite:///rfid.db",
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)
# --------------------------------------------------
# MODELS
# --------------------------------------------------
class Student(db.Model):
    """A card holder tracked by the RFID readers, with their last known position."""

    id = db.Column(db.Integer, primary_key=True)
    # Card identifier reported by the readers; unique per row.
    uid = db.Column(db.String(64), unique=True)
    # Display name; scan() creates rows for unseen cards with the name "Unknown".
    name = db.Column(db.String(128))
    # Room / location id the student was most recently placed in.
    current_room = db.Column(db.String(64))
    # Value of current_room before the latest move recorded by process_scan().
    previous_room = db.Column(db.String(64))
    # Room the student is expected to come back to after a bathroom trip.
    expected_return = db.Column(db.String(64))
    # Set by scan() to the originating room's bathroom_id on a "bathroom" action.
    expected_room = db.Column(db.String(64))
    # Movement state, "OUT" by default.
    # NOTE(review): process_scan() writes IN_BATHROOM / IN_HALLWAY / IN_ROOM while
    # scan() writes lowercase "out" / "hallway" / "in class" -- confirm which
    # vocabulary is canonical.
    state = db.Column(db.String(32), default="OUT")
    # Time and reader of the most recent scan.
    last_scan = db.Column(db.DateTime)
    last_reader = db.Column(db.String(64))
class Event(db.Model):
    """One movement record written by process_scan()."""

    id = db.Column(db.Integer, primary_key=True)
    # Card uid of the student the event belongs to (plain string, no foreign key).
    uid = db.Column(db.String(64))
    # Reader / room id where the scan happened.
    room = db.Column(db.String(64))
    # process_scan() writes "move", "bathroom_enter" or "bathroom_exit".
    event_type = db.Column(db.String(64))
    timestamp = db.Column(db.DateTime)
class Room(db.Model):
    """A scannable location together with its occupancy counters."""

    id = db.Column(db.Integer, primary_key=True)
    # Location id as sent by the readers; unique per row.
    room = db.Column(db.String(64), unique=True)
    # Current occupant count, adjusted by scan().
    count = db.Column(db.Integer, default=0)
    # Capacity compared against count in lightsBathroom().
    max = db.Column(db.Integer, default=2)
    # True by default; scan() sets it to False on a "bathroom" action and
    # lightsBathroom() requires it to be truthy to answer 202.
    bathroom = db.Column(db.Boolean, default=True)
    # Value of the `room` column of the Room that serves as this room's bathroom.
    bathroom_id = db.Column(db.String)
class Anomaly(db.Model):
    """A flagged irregularity for a card, listed on the admin anomalies page."""

    id = db.Column(db.Integer, primary_key=True)
    # Card uid of the student involved (plain string, no foreign key).
    uid = db.Column(db.String(64))
    # Anomaly kind; process_scan() writes "WRONG_RETURN".
    type = db.Column(db.String(64))
    timestamp = db.Column(db.DateTime)
# --------------------------------------------------
# UTIL
# --------------------------------------------------
# Window, in seconds, within which recent_scan() treats the last scan as recent.
COOLDOWN = 5
def now():
    """Return the current local time as a naive ``datetime``."""
    current = datetime.now()
    return current
def recent_scan(student):
    """Tell whether ``student`` was scanned less than ``COOLDOWN`` seconds ago.

    A student with no recorded ``last_scan`` is never considered recent.
    """
    last_seen = student.last_scan
    if last_seen:
        elapsed = now() - last_seen
        return elapsed.total_seconds() < COOLDOWN
    return False
# --------------------------------------------------
# MOVEMENT ENGINE
# --------------------------------------------------
def process_scan(student, room):
    """Apply one reader scan to ``student`` and persist the resulting records.

    Args:
        student: The ``Student`` row that was scanned; mutated in place.
        room: Id of the reader / room that produced the scan. Ids starting
            with ``"bathroom"`` are handled as bathroom readers.

    Returns:
        Always ``None``. A repeat scan on the same reader inside the cooldown
        window is ignored without writing anything.

    Side effects:
        Adds an ``Event`` (and a ``WRONG_RETURN`` ``Anomaly`` when the student
        comes back from the bathroom into a room other than the expected one)
        and commits the session.
    """
    timestamp = now()

    # Duplicate suppression: same reader again within the cooldown window.
    if recent_scan(student) and student.last_reader == room:
        return None

    student.last_scan = timestamp
    student.last_reader = room

    event_type = "move"
    wrong_return = False

    if room.startswith("bathroom"):
        if student.state != "IN_BATHROOM":
            # Entering: remember where the student came from.
            student.previous_room = student.current_room
            student.current_room = room
            student.state = "IN_BATHROOM"
            student.expected_return = student.previous_room
            event_type = "bathroom_enter"
        else:
            # Second scan on a bathroom reader means the student walked out.
            student.current_room = "hallway"
            student.state = "IN_HALLWAY"
            event_type = "bathroom_exit"
    else:
        # The return check has to run BEFORE the state is overwritten below;
        # evaluated afterwards it can never see "IN_HALLWAY" for a room scan
        # and would instead trigger on every bathroom exit.
        if student.state == "IN_HALLWAY" and student.expected_return:
            if student.expected_return == room:
                student.expected_return = None
            else:
                wrong_return = True
        student.previous_room = student.current_room
        student.current_room = room
        student.state = "IN_ROOM"

    db.session.add(Event(
        uid=student.uid,
        room=room,
        event_type=event_type,
        timestamp=timestamp,
    ))

    if wrong_return:
        db.session.add(Anomaly(
            uid=student.uid,
            type="WRONG_RETURN",
            timestamp=timestamp,
        ))

    db.session.commit()
    return None
# --------------------------------------------------
# ROUTES
# --------------------------------------------------
@app.route("/scan", methods=["POST"])
def scan():
    """Handle a reader scan posted as JSON and update the student's state.

    Expected JSON keys: ``uid`` (card id), ``location_id`` (reader id) and an
    optional ``action`` (``""`` for a plain scan, ``"bathroom"`` for a bathroom
    request; defaults to ``""``).

    Returns:
        ``{"status": "ok"}`` on success, a 400 JSON error when ``uid`` or
        ``location_id`` is missing, or a 404 JSON error when the scan needs a
        ``Room`` row that does not exist. Nothing is committed on error.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    uid = payload.get("uid")
    location = payload.get("location_id")
    action = payload.get("action", "")
    if uid is None or not isinstance(location, str) or not location:
        return jsonify({
            "status": "error",
            "message": "uid and location_id are required",
        }), 400

    app.logger.debug("scan action=%r uid=%r location=%r", action, uid, location)

    student = Student.query.filter_by(uid=uid).first()
    if student is None:
        # Unseen card: register it so an admin can assign a name later.
        student = Student(uid=uid, name="Unknown")
        db.session.add(student)

    room = Room.query.filter_by(room=location).first()
    student.last_reader = location
    student.last_scan = now()

    # The column default is "OUT" and a not-yet-flushed row may still hold
    # None, so normalise before comparing with the lowercase state names.
    state = (student.state or "out").lower()
    at_door = "door" in location
    at_classroom = "classroom" in location

    def unknown_room():
        # Discard pending changes so a failed scan leaves no partial update.
        db.session.rollback()
        return jsonify({"status": "error", "message": "unknown room"}), 404

    if action == "":
        # One transition per scan: with independent `if`s the state written by
        # one branch immediately satisfied the next and undid it.
        if at_door and state == "out":  # entering school
            student.state = "hallway"
        elif at_door and state == "hallway":  # leaving school
            student.state = "out"
        elif at_classroom and state == "hallway":  # entering a classroom
            if room is None:
                return unknown_room()
            student.current_room = location
            student.state = "in class"
            room.count = (room.count or 0) + 1
        elif at_classroom and state == "in class":  # leaving a classroom
            # TODO: once schedules are implemented, record an anomaly when the
            # student leaves at the wrong time.
            if room is None:
                return unknown_room()
            student.state = "hallway"
            room.count = max((room.count or 0) - 1, 0)
    elif action == "bathroom":
        if room is None:
            return unknown_room()
        room.bathroom = False
        student.state = "hallway"
        student.expected_room = room.bathroom_id
        student.expected_return = location

    db.session.commit()
    return jsonify({"status": "ok"})
@app.route("/lights/bathroom/<id>", methods=["GET"])
def lightsBathroom(id):
    """Report whether room ``id`` may currently send a student to its bathroom.

    The room row is created on first request. The answer is encoded in the
    HTTP status: 202 when the room's ``bathroom`` flag is set and its linked
    bathroom is below capacity, 200 otherwise (including when no bathroom is
    linked or the linked one does not exist).

    Args:
        id: Value of the ``room`` column of the requesting room.

    Returns:
        ``({"status": "ok"}, status_code)``.
    """
    room = Room.query.filter_by(room=id).first()
    if room is None:
        # Must happen before touching room.bathroom_id below.
        room = Room(room=id)
        db.session.add(room)
        db.session.commit()

    bathroom = None
    if room.bathroom_id:
        bathroom = Room.query.filter_by(room=room.bathroom_id).first()

    has_capacity = (
        bathroom is not None
        and bathroom.max is not None
        and (bathroom.count or 0) < bathroom.max
    )
    code = 202 if room.bathroom and has_capacity else 200
    return jsonify({"status": "ok"}), code
# --------------------------------------------------
# ADMIN DASHBOARD
# --------------------------------------------------
@app.route("/admin")
def admin():
    """Serve the admin landing page."""
    page = render_template("admin/index.html")
    return page
@app.route("/admin/students")
def admin_students():
    """Render the admin students page with every ``Student`` row."""
    context = {"students": Student.query.all()}
    return render_template("admin/students.html", **context)
@app.route("/admin/unknown")
def admin_unknown():
    """Render the unknown-cards page.

    The template is handed every ``Student`` row; no filtering happens here.
    """
    everyone = Student.query.all()
    return render_template("admin/unknown_cards.html", students=everyone)
@app.route("/admin/rooms")
def admin_rooms():
    """Render the admin rooms page with every ``Room`` row."""
    all_rooms = Room.query.all()
    return render_template("admin/rooms.html", rooms=all_rooms)
@app.route("/admin/anomalies")
def admin_anomalies():
    """Render the admin anomalies page, newest anomaly first."""
    newest_first = Anomaly.timestamp.desc()
    records = Anomaly.query.order_by(newest_first).all()
    return render_template("admin/anomalies.html", anomalies=records)
@app.route("/admin/student/assign", methods=["POST"])
def assign_student():
    """Assign a display name to the student whose card is ``uid``.

    Form fields: ``uid`` and ``name``.

    Returns:
        The re-rendered unknown-cards page, or ``("Missing student", 404)``
        when no student has that uid.
    """
    uid = request.form["uid"]
    name = request.form["name"]

    student = Student.query.filter_by(uid=uid).first()
    if student is None:
        # Same response merge_student() gives for an unknown uid.
        return "Missing student", 404

    student.name = name
    db.session.commit()
    return admin_unknown()
@app.route("/admin/student/merge", methods=["POST"])
def merge_student():
    """Fold the student with ``new_uid`` into the student with ``old_uid``.

    Every ``Event`` and ``Anomaly`` recorded under ``new_uid`` is re-pointed
    at ``old_uid``, then the ``new_uid`` student row is deleted.

    Form fields: ``old_uid`` and ``new_uid``.

    Returns:
        The re-rendered unknown-cards page; ``("Missing student", 404)`` when
        either uid is unknown; a 400 when both uids are the same.
    """
    old_uid = request.form["old_uid"]
    new_uid = request.form["new_uid"]

    # Merging a card into itself would delete the only record of that student.
    if old_uid == new_uid:
        return "Cannot merge a student into itself", 400

    old_student = Student.query.filter_by(uid=old_uid).first()
    new_student = Student.query.filter_by(uid=new_uid).first()
    if not old_student or not new_student:
        return "Missing student", 404

    # Move the history over so nothing is left pointing at the removed uid.
    for event in Event.query.filter_by(uid=new_uid).all():
        event.uid = old_uid
    for anomaly in Anomaly.query.filter_by(uid=new_uid).all():
        anomaly.uid = old_uid

    db.session.delete(new_student)
    db.session.commit()
    return admin_unknown()
@app.route(
    "/admin/room/update/<int:id>",
    methods=["POST"]
)
def update_room(id):
    """Update the capacity and bathroom link of the room with primary key ``id``.

    Form fields: ``max`` (integer), ``bathroom_id`` and the optional checkbox
    ``tracks_bathroom``.

    Returns:
        The re-rendered rooms page; ``("Missing room", 404)`` when the room
        does not exist; ``("Invalid max", 400)`` when ``max`` is not an integer.
    """
    room = Room.query.get(id)
    if room is None:
        return "Missing room", 404

    # Read and validate everything before mutating the row.
    raw_max = request.form["max"]
    bathroom_id = request.form["bathroom_id"]
    try:
        new_max = int(raw_max)
    except ValueError:
        return "Invalid max", 400

    room.max = new_max
    room.bathroom_id = bathroom_id
    # NOTE(review): Room has no `tracks_bathroom` column, so this value is set
    # on the instance only and is not persisted -- confirm whether it should
    # map to `Room.bathroom` or get a column of its own.
    room.tracks_bathroom = "tracks_bathroom" in request.form

    db.session.commit()
    return admin_rooms()
# --------------------------------------------------
# TEACHER DASHBOARD
# --------------------------------------------------
# Static shell of the teacher dashboard. It loads htmx and Tailwind from their
# CDNs; the empty <div> fetches /teacher/class on load and again every 3s and
# swaps the response into itself.
TEACHER_HTML = """
<!doctype html>
<html>
<head>
<script src="https://unpkg.com/htmx.org"></script>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-blue-950 text-white p-6">
<h1 class="text-2xl font-bold">Teacher Dashboard</h1>
<div hx-get="/teacher/class" hx-trigger="load, every 3s" hx-swap="innerHTML"></div>
</body>
</html>
"""
@app.route("/teacher")
def teacher():
    """Serve the static teacher dashboard shell."""
    page = TEACHER_HTML
    return page
@app.route("/teacher/class")
def teacher_class():
    """Return an HTML fragment with one card per student.

    Each card shows the student's name, current room and state. The values
    come from the database (names from the admin form, rooms from reader
    payloads), so they are HTML-escaped before being placed in the markup.

    Returns:
        The concatenated cards, or an empty string when there are no students.
    """
    from html import escape

    cards = []
    for s in Student.query.all():
        # str() first: None renders as "None", as it would in an f-string.
        name, room, state = (
            escape(str(value)) for value in (s.name, s.current_room, s.state)
        )
        cards.append(
            "\n<div class='bg-blue-900 p-3 m-2 rounded'>\n"
            f"{name} - {room} - {state}\n"
            "</div>\n"
        )
    return "".join(cards)
# --------------------------------------------------
# INIT
# --------------------------------------------------
if __name__ == "__main__":
    import os

    # Create any missing tables before serving.
    with app.app_context():
        db.create_all()

    # The server listens on every interface, and the Werkzeug debugger lets
    # anyone who can reach it run arbitrary code. Debug mode is therefore
    # opt-in via FLASK_DEBUG instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1", "true", "yes", "on",
    }
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)