RF-AD/Server/Program/database.py

324 lines
8.6 KiB
Python

from datetime import datetime
import sqlite3
from env import DBFILE
# Function to check if a table exists in the database
def table_exists(cursor, table_name):
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,)
)
return cursor.fetchone() is not None
# Function to create the Users table
def create_users_table(cursor):
cursor.execute("""CREATE TABLE Users (
upn TEXT PRIMARY KEY,
rFIDUID TEXT,
MemberOf TEXT,
FOREIGN KEY (MemberOf) REFERENCES Groups(cn)
)""")
# Function to create the Groups table
def create_groups_table(cursor):
cursor.execute("""CREATE TABLE Groups (
cn TEXT PRIMARY KEY
)""")
# Function to create the Doors table
def create_doors_table(cursor):
cursor.execute("""CREATE TABLE Doors (
id INTEGER PRIMARY KEY,
GroupCn TEXT,
FOREIGN KEY (GroupCn) REFERENCES Groups(cn)
)""")
# Function to create the logs table
def create_logs_table(cursor):
"""
Create a log table with columns id, timestamp, user, and granted.
:param db_file: The database file path.
"""
cursor.execute("""
CREATE TABLE IF NOT EXISTS log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT ,
user TEXT ,
rFIDUID TEXT,
door_id INTEGER ,
granted BOOLEAN ,
FOREIGN KEY (door_id) REFERENCES Doors (id)
FOREIGN KEY (user) REFERENCES Users (upn)
FOREIGN KEY (rFIDUID) REFERENCES Users (rFIDUID)
)
""")
# Function to setup the database
def setup_database(db_file):
# Connect to the SQLite database
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# Check and create Users table
if not table_exists(cursor, "Users"):
create_users_table(cursor)
print(f"[{datetime.now()}] Users table created successfully.")
else:
print(f"[{datetime.now()}] Users table already exists.")
# Check and create Groups table
if not table_exists(cursor, "Groups"):
create_groups_table(cursor)
print(f"[{datetime.now()}] Groups table created successfully.")
else:
print(f"[{datetime.now()}] Groups table already exists.")
# Check and create Doors table
if not table_exists(cursor, "Doors"):
create_doors_table(cursor)
print(f"[{datetime.now()}] Doors table created successfully.")
else:
print(f"[{datetime.now()}] Doors table already exists.")
# Check and create Doors table
if not table_exists(cursor, "Log"):
create_logs_table(cursor)
print(f"[{datetime.now()}] Log table created successfully.")
else:
print(f"[{datetime.now()}] Log table already exists.")
# Commit changes and close connection
conn.commit()
conn.close()
def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
"""
Log an access attempt to the log table.
:param db_file: The database file path.
:param user: The user attempting access.
:param rFIDUID: The user's tag uid
:param granted: Whether access was granted (True/False).
:param doorID: The door id
"""
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
print(f"[{datetime.now()}] User {user} get granted : {granted} on door : {doorID}")
cursor.execute(
"""
INSERT INTO log (timestamp, user, rFIDUID, granted, door_id) VALUES (?, ?, ?, ?, ?)
""",
(datetime.now(), user, rFIDUID, granted, doorID),
)
conn.commit()
conn.close()
def print_users_table(cursor):
cursor.execute("SELECT * FROM Users")
rows = cursor.fetchall()
print("Users:")
for row in rows:
print(row)
# Function to print the content of the Groups table
def print_groups_table(cursor):
cursor.execute("SELECT * FROM Groups")
rows = cursor.fetchall()
print("Groups:")
for row in rows:
print(row)
# Function to print the content of the Doors table
def print_doors_table(cursor):
cursor.execute("SELECT * FROM Doors")
rows = cursor.fetchall()
print("Doors:")
for row in rows:
print(row)
# Function to print the content of the Log table
def print_log_table(cursor):
cursor.execute("SELECT * FROM log")
rows = cursor.fetchall()
print("Logs:")
for row in rows:
print(row)
# Function to print the content of the entire database
def print_database_content(db_file):
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
print_users_table(cursor)
print_groups_table(cursor)
print_doors_table(cursor)
# print_log_table(cursor)
conn.close()
def get_logs():
"""
Fetch all logs from the log table in the database.
:return: List of log records.
"""
conn = sqlite3.connect(DBFILE)
cursor = conn.cursor()
cursor.execute("""
SELECT timestamp, user, rFIDUID, granted, door_id
FROM log
ORDER BY id DESC
""")
logs = cursor.fetchall()
conn.close()
return logs
def get_latest_logs(db_file, limit=10):
"""
Fetch the latest logs from the database.
:param limit: The number of latest logs to fetch.
:return: List of log entries.
"""
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute(
"""
SELECT timestamp, user, rFIDUID, granted, door_id
FROM log
ORDER BY id DESC
LIMIT ?
""",
(limit,),
)
logs = cursor.fetchall()
conn.close()
return logs
# Function to fetch list of existing groups from the database
def get_existing_groups(db_file):
try:
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute("SELECT cn FROM Groups")
groups = cursor.fetchall()
conn.close()
return [group[0] for group in groups]
except sqlite3.Error as e:
print(f"SQLite Error: {e}")
return []
def delete_group_from_database(group_cn):
conn = sqlite3.connect(DBFILE)
cursor = conn.cursor()
cursor.execute("DELETE FROM Groups WHERE cn = ?", (group_cn,))
conn.commit()
conn.close()
def get_doors():
conn = sqlite3.connect(DBFILE)
cursor = conn.cursor()
cursor.execute("SELECT * FROM Doors")
doors = cursor.fetchall()
conn.close()
return doors
def get_users():
"""
Fetch all users from the Users table in the database.
:return: List of user records.
"""
conn = sqlite3.connect(DBFILE)
cursor = conn.cursor()
cursor.execute("SELECT upn, rFIDUID, MemberOf FROM Users")
users = cursor.fetchall()
conn.close()
return users
# Function to add a door to the database
def add_door_to_database(db_file, group_cn, Door_id):
try:
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO Doors (id, GroupCn) VALUES (?,?)",
(
Door_id,
group_cn,
),
)
conn.commit()
conn.close()
# print_database_content(DBFILE)
return True
except sqlite3.Error as e:
# print_database_content(DBFILE)
print(f"SQLite Error: {e}")
return (False, e)
# Function to verify if the user is allowed to open the door
def check_access(rfid_uid_str, door_id):
try:
conn = sqlite3.connect(DBFILE) # Update with your database file path
cursor = conn.cursor()
# Convert the received RFID UID string to bytes
rfid_uid_bytes = rfid_uid_str.encode("utf-8")
# Get the user's UPN and group memberships based on the RFID UID
cursor.execute(
"SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?", (rfid_uid_bytes,)
)
user_data = cursor.fetchone()
if user_data is None:
return False, None # User not found
upn_bytes, user_groups = user_data
# Decode the UPN bytes to string
upn = upn_bytes.decode("utf-8")
# Get the group associated with the door
cursor.execute("SELECT GroupCn FROM Doors WHERE id = ?", (door_id,))
door_group = cursor.fetchone()
if door_group is None:
return False, None # Door not found
door_group = door_group[0]
# Check if the user's group is allowed to open the door
if door_group in user_groups.split(","):
return True, upn # Access granted
else:
return False, None # Access denied
except sqlite3.Error as e:
print(f"SQLite Error: {e}")
return False, None