Merge branch 'main' of https://github.com/jeanGaston/RF-AD
This commit is contained in:
commit
e66fa709aa
163
.gitignore
vendored
163
.gitignore
vendored
@ -1,2 +1,163 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
Server/.env
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
|
||||
.pdm.toml
|
||||
.pdm-python
|
||||
.pdm-build/
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
Server/Program/env.py
|
||||
|
||||
@ -7,10 +7,10 @@ from mfrc522 import MFRC522
|
||||
from ssd1306 import SSD1306_I2C
|
||||
|
||||
# Global variables
|
||||
DOOR_ID = '[Your door ID]'
|
||||
WLAN_SSID = '[Your SSID]'
|
||||
WLAN_PASS = '[Your password]'
|
||||
SERVER_IP = '[Your server IP]'
|
||||
DOOR_ID = "[Your door ID]"
|
||||
WLAN_SSID = "[Your SSID]"
|
||||
WLAN_PASS = "[Your password]"
|
||||
SERVER_IP = "[Your server IP]"
|
||||
SERVER_PORT = 5000
|
||||
|
||||
# Initialize RFID reader
|
||||
@ -31,23 +31,26 @@ redled.on()
|
||||
time.sleep(0.5)
|
||||
redled.off()
|
||||
|
||||
|
||||
def init_oled():
|
||||
global oled
|
||||
try:
|
||||
oled = SSD1306_I2C(128, 64, i2c)
|
||||
oled.fill(0)
|
||||
oled.text('Initializing...', 0, 0)
|
||||
oled.text("Initializing...", 0, 0)
|
||||
oled.show()
|
||||
except Exception as e:
|
||||
print("display error:", e)
|
||||
#init_oled()
|
||||
# init_oled()
|
||||
|
||||
|
||||
def display_message(message, ip_address):
|
||||
try:
|
||||
oled.fill(0)
|
||||
oled.text(f'Door ID: {DOOR_ID}', 0, 0) # Display Door ID at the top
|
||||
oled.text(f"Door ID: {DOOR_ID}", 0, 0) # Display Door ID at the top
|
||||
oled.text("___________________", 0, 3)
|
||||
|
||||
lines = message.split('\n')
|
||||
lines = message.split("\n")
|
||||
for i, line in enumerate(lines):
|
||||
oled.text(line, 0, 20 + i * 10) # Adjust the y position for each line
|
||||
oled.text("__________________", 0, 47)
|
||||
@ -58,7 +61,8 @@ def display_message(message, ip_address):
|
||||
redled.off()
|
||||
print("display error:", e)
|
||||
init_oled()
|
||||
|
||||
|
||||
|
||||
# Connect to WiFi
|
||||
def connect_wifi(ssid, password):
|
||||
wlan = network.WLAN(network.STA_IF)
|
||||
@ -69,28 +73,30 @@ def connect_wifi(ssid, password):
|
||||
print("Connecting to WiFi...")
|
||||
ip_address = wlan.ifconfig()[0]
|
||||
print("Connected to WiFi:", ip_address)
|
||||
display_message('WiFi Connected', ip_address)
|
||||
display_message("WiFi Connected", ip_address)
|
||||
|
||||
# Test connection to the server
|
||||
|
||||
|
||||
try:
|
||||
response = requests.get(f"http://{SERVER_IP}:{SERVER_PORT}/")
|
||||
if response.status_code == 200:
|
||||
print("Server connection successful")
|
||||
display_message('Server Connected', ip_address)
|
||||
display_message("Server Connected", ip_address)
|
||||
else:
|
||||
print("Server connection failed")
|
||||
display_message('Server Fail', ip_address)
|
||||
display_message("Server Fail", ip_address)
|
||||
time.sleep(5)
|
||||
while response.status_code != 200 :
|
||||
while response.status_code != 200:
|
||||
wlan.connect(ssid, password)
|
||||
response = requests.get(f"http://{SERVER_IP}:{SERVER_PORT}/")
|
||||
display_message('Reconnecting ...', ip_address)
|
||||
display_message("Reconnecting ...", ip_address)
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
print("Server connection error:", e)
|
||||
display_message(f'Server Error \n {e}', ip_address)
|
||||
display_message(f"Server Error \n {e}", ip_address)
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
# while response.status_code != 200 :
|
||||
# wlan.connect(ssid, password)
|
||||
# try :
|
||||
@ -102,14 +108,12 @@ def connect_wifi(ssid, password):
|
||||
# Function to send RFID UID to the server
|
||||
def send_rfid_to_server(rfid_uid):
|
||||
url = f"http://{SERVER_IP}:{SERVER_PORT}/access"
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
data = {
|
||||
'rfid_uid': rfid_uid,
|
||||
'door_id': DOOR_ID
|
||||
}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
data = {"rfid_uid": rfid_uid, "door_id": DOOR_ID}
|
||||
response = requests.post(url, headers=headers, data=json.dumps(data))
|
||||
return response.json()
|
||||
|
||||
|
||||
# Main loop to scan RFID tags
|
||||
def main():
|
||||
# Retry mechanism for OLED initialization
|
||||
@ -120,39 +124,39 @@ def main():
|
||||
except Exception as e:
|
||||
print("OLED init error:", e)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
connect_wifi(WLAN_SSID, WLAN_PASS)
|
||||
ip_address = network.WLAN(network.STA_IF).ifconfig()[0]
|
||||
display_message('Scan your tag', ip_address)
|
||||
display_message("Scan your tag", ip_address)
|
||||
|
||||
while True:
|
||||
|
||||
(status, tag_type) = reader.request(reader.REQIDL)
|
||||
if status == reader.OK:
|
||||
(status, uid) = reader.SelectTagSN()
|
||||
if status == reader.OK:
|
||||
rfid_uid_decimal = ''.join([str(i) for i in uid])
|
||||
rfid_uid_decimal = "".join([str(i) for i in uid])
|
||||
print("RFID UID:", rfid_uid_decimal)
|
||||
display_message('Checking...', ip_address)
|
||||
|
||||
display_message("Checking...", ip_address)
|
||||
|
||||
response = send_rfid_to_server(rfid_uid_decimal)
|
||||
|
||||
if response.get('access_granted'):
|
||||
user_upn = response.get('upn')
|
||||
|
||||
if response.get("access_granted"):
|
||||
user_upn = response.get("upn")
|
||||
print("Access Granted:", user_upn)
|
||||
display_message(f'Access Granted\n{user_upn}', ip_address)
|
||||
display_message(f"Access Granted\n{user_upn}", ip_address)
|
||||
# Turn on the LED to indicate door open
|
||||
greenled.on()
|
||||
# Add code here to open the door (e.g., trigger a relay)
|
||||
else:
|
||||
print("Access Denied")
|
||||
display_message('Access Denied', ip_address)
|
||||
display_message("Access Denied", ip_address)
|
||||
redled.on()
|
||||
|
||||
time.sleep(2) # Delay to avoid rapid repeated reads
|
||||
greenled.off()
|
||||
redled.off() # Turn off the LED
|
||||
display_message('Scan your tag', ip_address)
|
||||
display_message("Scan your tag", ip_address)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@ -1,111 +1,116 @@
|
||||
from machine import Pin, SPI
|
||||
from os import uname
|
||||
|
||||
|
||||
|
||||
|
||||
class MFRC522:
|
||||
|
||||
DEBUG = False
|
||||
OK = 0
|
||||
NOTAGERR = 1
|
||||
ERR = 2
|
||||
|
||||
|
||||
REQIDL = 0x26
|
||||
REQALL = 0x52
|
||||
AUTHENT1A = 0x60
|
||||
AUTHENT1B = 0x61
|
||||
|
||||
|
||||
PICC_ANTICOLL1 = 0x93
|
||||
PICC_ANTICOLL2 = 0x95
|
||||
PICC_ANTICOLL3 = 0x97
|
||||
|
||||
|
||||
def __init__(self, sck, mosi, miso, rst, cs,baudrate=1000000,spi_id=0):
|
||||
|
||||
|
||||
def __init__(self, sck, mosi, miso, rst, cs, baudrate=1000000, spi_id=0):
|
||||
self.sck = Pin(sck, Pin.OUT)
|
||||
self.mosi = Pin(mosi, Pin.OUT)
|
||||
self.miso = Pin(miso)
|
||||
self.rst = Pin(rst, Pin.OUT)
|
||||
self.cs = Pin(cs, Pin.OUT)
|
||||
|
||||
|
||||
self.rst.value(0)
|
||||
self.cs.value(1)
|
||||
|
||||
|
||||
board = uname()[0]
|
||||
|
||||
if board == 'WiPy' or board == 'LoPy' or board == 'FiPy':
|
||||
|
||||
if board == "WiPy" or board == "LoPy" or board == "FiPy":
|
||||
self.spi = SPI(0)
|
||||
self.spi.init(SPI.MASTER, baudrate=1000000, pins=(self.sck, self.mosi, self.miso))
|
||||
elif (board == 'esp8266') or (board == 'esp32'):
|
||||
self.spi = SPI(baudrate=100000, polarity=0, phase=0, sck=self.sck, mosi=self.mosi, miso=self.miso)
|
||||
self.spi.init(
|
||||
SPI.MASTER, baudrate=1000000, pins=(self.sck, self.mosi, self.miso)
|
||||
)
|
||||
elif (board == "esp8266") or (board == "esp32"):
|
||||
self.spi = SPI(
|
||||
baudrate=100000,
|
||||
polarity=0,
|
||||
phase=0,
|
||||
sck=self.sck,
|
||||
mosi=self.mosi,
|
||||
miso=self.miso,
|
||||
)
|
||||
self.spi.init()
|
||||
elif board == 'rp2':
|
||||
self.spi = SPI(spi_id,baudrate=baudrate,sck=self.sck, mosi= self.mosi, miso= self.miso)
|
||||
elif board == "rp2":
|
||||
self.spi = SPI(
|
||||
spi_id, baudrate=baudrate, sck=self.sck, mosi=self.mosi, miso=self.miso
|
||||
)
|
||||
else:
|
||||
raise RuntimeError("Unsupported platform")
|
||||
|
||||
|
||||
self.rst.value(1)
|
||||
self.init()
|
||||
|
||||
|
||||
def _wreg(self, reg, val):
|
||||
|
||||
self.cs.value(0)
|
||||
self.spi.write(b'%c' % int(0xff & ((reg << 1) & 0x7e)))
|
||||
self.spi.write(b'%c' % int(0xff & val))
|
||||
self.spi.write(b"%c" % int(0xFF & ((reg << 1) & 0x7E)))
|
||||
self.spi.write(b"%c" % int(0xFF & val))
|
||||
self.cs.value(1)
|
||||
|
||||
|
||||
def _rreg(self, reg):
|
||||
|
||||
self.cs.value(0)
|
||||
self.spi.write(b'%c' % int(0xff & (((reg << 1) & 0x7e) | 0x80)))
|
||||
self.spi.write(b"%c" % int(0xFF & (((reg << 1) & 0x7E) | 0x80)))
|
||||
val = self.spi.read(1)
|
||||
self.cs.value(1)
|
||||
|
||||
|
||||
return val[0]
|
||||
|
||||
|
||||
def _sflags(self, reg, mask):
|
||||
self._wreg(reg, self._rreg(reg) | mask)
|
||||
|
||||
|
||||
def _cflags(self, reg, mask):
|
||||
self._wreg(reg, self._rreg(reg) & (~mask))
|
||||
|
||||
|
||||
def _tocard(self, cmd, send):
|
||||
|
||||
recv = []
|
||||
bits = irq_en = wait_irq = n = 0
|
||||
stat = self.ERR
|
||||
|
||||
|
||||
if cmd == 0x0E:
|
||||
irq_en = 0x12
|
||||
wait_irq = 0x10
|
||||
elif cmd == 0x0C:
|
||||
irq_en = 0x77
|
||||
wait_irq = 0x30
|
||||
|
||||
|
||||
self._wreg(0x02, irq_en | 0x80)
|
||||
self._cflags(0x04, 0x80)
|
||||
self._sflags(0x0A, 0x80)
|
||||
self._wreg(0x01, 0x00)
|
||||
|
||||
|
||||
for c in send:
|
||||
self._wreg(0x09, c)
|
||||
self._wreg(0x01, cmd)
|
||||
|
||||
|
||||
if cmd == 0x0C:
|
||||
self._sflags(0x0D, 0x80)
|
||||
|
||||
|
||||
i = 2000
|
||||
while True:
|
||||
n = self._rreg(0x04)
|
||||
i -= 1
|
||||
if ~((i != 0) and ~(n & 0x01) and ~(n & wait_irq)):
|
||||
break
|
||||
|
||||
|
||||
self._cflags(0x0D, 0x80)
|
||||
|
||||
|
||||
if i:
|
||||
if (self._rreg(0x06) & 0x1B) == 0x00:
|
||||
stat = self.OK
|
||||
|
||||
|
||||
if n & irq_en & 0x01:
|
||||
stat = self.NOTAGERR
|
||||
elif cmd == 0x0C:
|
||||
@ -115,40 +120,38 @@ class MFRC522:
|
||||
bits = (n - 1) * 8 + lbits
|
||||
else:
|
||||
bits = n * 8
|
||||
|
||||
|
||||
if n == 0:
|
||||
n = 1
|
||||
elif n > 16:
|
||||
n = 16
|
||||
|
||||
|
||||
for _ in range(n):
|
||||
recv.append(self._rreg(0x09))
|
||||
else:
|
||||
stat = self.ERR
|
||||
|
||||
|
||||
return stat, recv, bits
|
||||
|
||||
|
||||
def _crc(self, data):
|
||||
|
||||
self._cflags(0x05, 0x04)
|
||||
self._sflags(0x0A, 0x80)
|
||||
|
||||
|
||||
for c in data:
|
||||
self._wreg(0x09, c)
|
||||
|
||||
|
||||
self._wreg(0x01, 0x03)
|
||||
|
||||
|
||||
i = 0xFF
|
||||
while True:
|
||||
n = self._rreg(0x05)
|
||||
i -= 1
|
||||
if not ((i != 0) and not (n & 0x04)):
|
||||
break
|
||||
|
||||
|
||||
return [self._rreg(0x22), self._rreg(0x21)]
|
||||
|
||||
|
||||
def init(self):
|
||||
|
||||
self.reset()
|
||||
self._wreg(0x2A, 0x8D)
|
||||
self._wreg(0x2B, 0x3E)
|
||||
@ -157,35 +160,32 @@ class MFRC522:
|
||||
self._wreg(0x15, 0x40)
|
||||
self._wreg(0x11, 0x3D)
|
||||
self.antenna_on()
|
||||
|
||||
|
||||
def reset(self):
|
||||
self._wreg(0x01, 0x0F)
|
||||
|
||||
|
||||
def antenna_on(self, on=True):
|
||||
|
||||
if on and ~(self._rreg(0x14) & 0x03):
|
||||
self._sflags(0x14, 0x03)
|
||||
else:
|
||||
self._cflags(0x14, 0x03)
|
||||
|
||||
|
||||
def request(self, mode):
|
||||
|
||||
self._wreg(0x0D, 0x07)
|
||||
(stat, recv, bits) = self._tocard(0x0C, [mode])
|
||||
|
||||
|
||||
if (stat != self.OK) | (bits != 0x10):
|
||||
stat = self.ERR
|
||||
|
||||
|
||||
return stat, bits
|
||||
|
||||
def anticoll(self,anticolN):
|
||||
|
||||
|
||||
def anticoll(self, anticolN):
|
||||
ser_chk = 0
|
||||
ser = [anticolN, 0x20]
|
||||
|
||||
|
||||
self._wreg(0x0D, 0x00)
|
||||
(stat, recv, bits) = self._tocard(0x0C, ser)
|
||||
|
||||
|
||||
if stat == self.OK:
|
||||
if len(recv) == 5:
|
||||
for i in range(4):
|
||||
@ -194,134 +194,128 @@ class MFRC522:
|
||||
stat = self.ERR
|
||||
else:
|
||||
stat = self.ERR
|
||||
|
||||
|
||||
return stat, recv
|
||||
|
||||
|
||||
def PcdSelect(self, serNum,anticolN):
|
||||
|
||||
def PcdSelect(self, serNum, anticolN):
|
||||
backData = []
|
||||
buf = []
|
||||
buf.append(anticolN)
|
||||
buf.append(0x70)
|
||||
#i = 0
|
||||
# i = 0
|
||||
###xorsum=0;
|
||||
for i in serNum:
|
||||
buf.append(i)
|
||||
#while i<5:
|
||||
# while i<5:
|
||||
# buf.append(serNum[i])
|
||||
# i = i + 1
|
||||
pOut = self._crc(buf)
|
||||
buf.append(pOut[0])
|
||||
buf.append(pOut[1])
|
||||
(status, backData, backLen) = self._tocard( 0x0C, buf)
|
||||
(status, backData, backLen) = self._tocard(0x0C, buf)
|
||||
if (status == self.OK) and (backLen == 0x18):
|
||||
return 1
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
def SelectTag(self, uid):
|
||||
byte5 = 0
|
||||
|
||||
#(status,puid)= self.anticoll(self.PICC_ANTICOLL1)
|
||||
#print("uid",uid,"puid",puid)
|
||||
|
||||
# (status,puid)= self.anticoll(self.PICC_ANTICOLL1)
|
||||
# print("uid",uid,"puid",puid)
|
||||
for i in uid:
|
||||
byte5 = byte5 ^ i
|
||||
puid = uid + [byte5]
|
||||
|
||||
if self.PcdSelect(puid,self.PICC_ANTICOLL1) == 0:
|
||||
return (self.ERR,[])
|
||||
return (self.OK , uid)
|
||||
|
||||
def tohexstring(self,v):
|
||||
s="["
|
||||
|
||||
if self.PcdSelect(puid, self.PICC_ANTICOLL1) == 0:
|
||||
return (self.ERR, [])
|
||||
return (self.OK, uid)
|
||||
|
||||
def tohexstring(self, v):
|
||||
s = "["
|
||||
for i in v:
|
||||
if i != v[0]:
|
||||
s = s+ ", "
|
||||
s=s+ "0x{:02X}".format(i)
|
||||
s= s+ "]"
|
||||
s = s + ", "
|
||||
s = s + "0x{:02X}".format(i)
|
||||
s = s + "]"
|
||||
return s
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def SelectTagSN(self):
|
||||
valid_uid=[]
|
||||
(status,uid)= self.anticoll(self.PICC_ANTICOLL1)
|
||||
#print("Select Tag 1:",self.tohexstring(uid))
|
||||
valid_uid = []
|
||||
(status, uid) = self.anticoll(self.PICC_ANTICOLL1)
|
||||
# print("Select Tag 1:",self.tohexstring(uid))
|
||||
if status != self.OK:
|
||||
return (self.ERR,[])
|
||||
|
||||
if self.DEBUG: print("anticol(1) {}".format(uid))
|
||||
if self.PcdSelect(uid,self.PICC_ANTICOLL1) == 0:
|
||||
return (self.ERR,[])
|
||||
if self.DEBUG: print("pcdSelect(1) {}".format(uid))
|
||||
|
||||
#check if first byte is 0x88
|
||||
if uid[0] == 0x88 :
|
||||
#ok we have another type of card
|
||||
return (self.ERR, [])
|
||||
|
||||
if self.DEBUG:
|
||||
print("anticol(1) {}".format(uid))
|
||||
if self.PcdSelect(uid, self.PICC_ANTICOLL1) == 0:
|
||||
return (self.ERR, [])
|
||||
if self.DEBUG:
|
||||
print("pcdSelect(1) {}".format(uid))
|
||||
|
||||
# check if first byte is 0x88
|
||||
if uid[0] == 0x88:
|
||||
# ok we have another type of card
|
||||
valid_uid.extend(uid[1:4])
|
||||
(status,uid)=self.anticoll(self.PICC_ANTICOLL2)
|
||||
#print("Select Tag 2:",self.tohexstring(uid))
|
||||
(status, uid) = self.anticoll(self.PICC_ANTICOLL2)
|
||||
# print("Select Tag 2:",self.tohexstring(uid))
|
||||
if status != self.OK:
|
||||
return (self.ERR,[])
|
||||
if self.DEBUG: print("Anticol(2) {}".format(uid))
|
||||
rtn = self.PcdSelect(uid,self.PICC_ANTICOLL2)
|
||||
if self.DEBUG: print("pcdSelect(2) return={} uid={}".format(rtn,uid))
|
||||
return (self.ERR, [])
|
||||
if self.DEBUG:
|
||||
print("Anticol(2) {}".format(uid))
|
||||
rtn = self.PcdSelect(uid, self.PICC_ANTICOLL2)
|
||||
if self.DEBUG:
|
||||
print("pcdSelect(2) return={} uid={}".format(rtn, uid))
|
||||
if rtn == 0:
|
||||
return (self.ERR,[])
|
||||
if self.DEBUG: print("PcdSelect2() {}".format(uid))
|
||||
#now check again if uid[0] is 0x88
|
||||
if uid[0] == 0x88 :
|
||||
return (self.ERR, [])
|
||||
if self.DEBUG:
|
||||
print("PcdSelect2() {}".format(uid))
|
||||
# now check again if uid[0] is 0x88
|
||||
if uid[0] == 0x88:
|
||||
valid_uid.extend(uid[1:4])
|
||||
(status , uid) = self.anticoll(self.PICC_ANTICOLL3)
|
||||
#print("Select Tag 3:",self.tohexstring(uid))
|
||||
(status, uid) = self.anticoll(self.PICC_ANTICOLL3)
|
||||
# print("Select Tag 3:",self.tohexstring(uid))
|
||||
if status != self.OK:
|
||||
return (self.ERR,[])
|
||||
if self.DEBUG: print("Anticol(3) {}".format(uid))
|
||||
if self.MFRC522_PcdSelect(uid,self.PICC_ANTICOLL3) == 0:
|
||||
return (self.ERR,[])
|
||||
if self.DEBUG: print("PcdSelect(3) {}".format(uid))
|
||||
return (self.ERR, [])
|
||||
if self.DEBUG:
|
||||
print("Anticol(3) {}".format(uid))
|
||||
if self.MFRC522_PcdSelect(uid, self.PICC_ANTICOLL3) == 0:
|
||||
return (self.ERR, [])
|
||||
if self.DEBUG:
|
||||
print("PcdSelect(3) {}".format(uid))
|
||||
valid_uid.extend(uid[0:5])
|
||||
# if we are here than the uid is ok
|
||||
# let's remove the last BYTE whic is the XOR sum
|
||||
|
||||
return (self.OK , valid_uid[:len(valid_uid)-1])
|
||||
#return (self.OK , valid_uid)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
return (self.OK, valid_uid[: len(valid_uid) - 1])
|
||||
# return (self.OK , valid_uid)
|
||||
|
||||
def auth(self, mode, addr, sect, ser):
|
||||
return self._tocard(0x0E, [mode, addr] + sect + ser[:4])[0]
|
||||
|
||||
def authKeys(self,uid,addr,keyA=None, keyB=None):
|
||||
|
||||
def authKeys(self, uid, addr, keyA=None, keyB=None):
|
||||
status = self.ERR
|
||||
if keyA is not None:
|
||||
status = self.auth(self.AUTHENT1A, addr, keyA, uid)
|
||||
elif keyB is not None:
|
||||
status = self.auth(self.AUTHENT1B, addr, keyB, uid)
|
||||
return status
|
||||
|
||||
|
||||
|
||||
def stop_crypto1(self):
|
||||
self._cflags(0x08, 0x08)
|
||||
|
||||
|
||||
def read(self, addr):
|
||||
|
||||
data = [0x30, addr]
|
||||
data += self._crc(data)
|
||||
(stat, recv, _) = self._tocard(0x0C, data)
|
||||
return stat, recv
|
||||
|
||||
|
||||
def write(self, addr, data):
|
||||
|
||||
buf = [0xA0, addr]
|
||||
buf += self._crc(buf)
|
||||
(stat, recv, bits) = self._tocard(0x0C, buf)
|
||||
|
||||
|
||||
if not (stat == self.OK) or not (bits == 4) or not ((recv[0] & 0x0F) == 0x0A):
|
||||
stat = self.ERR
|
||||
else:
|
||||
@ -330,51 +324,59 @@ class MFRC522:
|
||||
buf.append(data[i])
|
||||
buf += self._crc(buf)
|
||||
(stat, recv, bits) = self._tocard(0x0C, buf)
|
||||
if not (stat == self.OK) or not (bits == 4) or not ((recv[0] & 0x0F) == 0x0A):
|
||||
if (
|
||||
not (stat == self.OK)
|
||||
or not (bits == 4)
|
||||
or not ((recv[0] & 0x0F) == 0x0A)
|
||||
):
|
||||
stat = self.ERR
|
||||
return stat
|
||||
|
||||
|
||||
def writeSectorBlock(self,uid, sector, block, data, keyA=None, keyB = None):
|
||||
absoluteBlock = sector * 4 + (block % 4)
|
||||
if absoluteBlock > 63 :
|
||||
|
||||
def writeSectorBlock(self, uid, sector, block, data, keyA=None, keyB=None):
|
||||
absoluteBlock = sector * 4 + (block % 4)
|
||||
if absoluteBlock > 63:
|
||||
return self.ERR
|
||||
if len(data) != 16:
|
||||
return self.ERR
|
||||
if self.authKeys(uid,absoluteBlock,keyA,keyB) != self.ERR :
|
||||
if self.authKeys(uid, absoluteBlock, keyA, keyB) != self.ERR:
|
||||
return self.write(absoluteBlock, data)
|
||||
return self.ERR
|
||||
|
||||
def readSectorBlock(self,uid ,sector, block, keyA=None, keyB = None):
|
||||
absoluteBlock = sector * 4 + (block % 4)
|
||||
if absoluteBlock > 63 :
|
||||
|
||||
def readSectorBlock(self, uid, sector, block, keyA=None, keyB=None):
|
||||
absoluteBlock = sector * 4 + (block % 4)
|
||||
if absoluteBlock > 63:
|
||||
return self.ERR, None
|
||||
if self.authKeys(uid,absoluteBlock,keyA,keyB) != self.ERR :
|
||||
if self.authKeys(uid, absoluteBlock, keyA, keyB) != self.ERR:
|
||||
return self.read(absoluteBlock)
|
||||
return self.ERR, None
|
||||
|
||||
def MFRC522_DumpClassic1K(self,uid, Start=0, End=64, keyA=None, keyB=None):
|
||||
for absoluteBlock in range(Start,End):
|
||||
status = self.authKeys(uid,absoluteBlock,keyA,keyB)
|
||||
|
||||
def MFRC522_DumpClassic1K(self, uid, Start=0, End=64, keyA=None, keyB=None):
|
||||
for absoluteBlock in range(Start, End):
|
||||
status = self.authKeys(uid, absoluteBlock, keyA, keyB)
|
||||
# Check if authenticated
|
||||
print("{:02d} S{:02d} B{:1d}: ".format(absoluteBlock, absoluteBlock//4 , absoluteBlock % 4),end="")
|
||||
if status == self.OK:
|
||||
print(
|
||||
"{:02d} S{:02d} B{:1d}: ".format(
|
||||
absoluteBlock, absoluteBlock // 4, absoluteBlock % 4
|
||||
),
|
||||
end="",
|
||||
)
|
||||
if status == self.OK:
|
||||
status, block = self.read(absoluteBlock)
|
||||
if status == self.ERR:
|
||||
break
|
||||
else:
|
||||
for value in block:
|
||||
print("{:02X} ".format(value),end="")
|
||||
print(" ",end="")
|
||||
print("{:02X} ".format(value), end="")
|
||||
print(" ", end="")
|
||||
for value in block:
|
||||
if (value > 0x20) and (value < 0x7f):
|
||||
print(chr(value),end="")
|
||||
if (value > 0x20) and (value < 0x7F):
|
||||
print(chr(value), end="")
|
||||
else:
|
||||
print('.',end="")
|
||||
print(".", end="")
|
||||
print("")
|
||||
else:
|
||||
break
|
||||
if status == self.ERR:
|
||||
print("Authentication error")
|
||||
return self.ERR
|
||||
return self.OK
|
||||
return self.OK
|
||||
|
||||
@ -1,35 +1,50 @@
|
||||
from threading import Thread
|
||||
from flask import Flask, render_template, send_file, Response, request, redirect, jsonify
|
||||
from flask import (
|
||||
Flask,
|
||||
render_template,
|
||||
send_file,
|
||||
Response,
|
||||
request,
|
||||
redirect,
|
||||
jsonify,
|
||||
)
|
||||
import io
|
||||
from ldapSync import sync_ldap_to_database
|
||||
from database import *
|
||||
from env import *
|
||||
from database import (add_door_to_database, check_access, delete_group_from_database,
|
||||
get_doors, get_existing_groups, get_latest_logs, get_logs, get_users,
|
||||
log_access_attempt)
|
||||
from env import DBFILE, WebServerPORT
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
# Route to the home
|
||||
@app.route('/')
|
||||
@app.route("/")
|
||||
def index():
|
||||
existing_groups = get_existing_groups(DBFILE) # Update with your database file path
|
||||
logs = get_latest_logs(DBFILE,5)
|
||||
#print(logs[0])
|
||||
return render_template('./index.html', existing_groups=existing_groups, logs=logs)
|
||||
logs = get_latest_logs(DBFILE, 5)
|
||||
# print(logs[0])
|
||||
return render_template("./index.html", existing_groups=existing_groups, logs=logs)
|
||||
|
||||
|
||||
# Route to display the fuser db
|
||||
@app.route('/UserDB')
|
||||
@app.route("/UserDB")
|
||||
def usersdb():
|
||||
users = get_users()
|
||||
return render_template('userdb.html', users=users)
|
||||
return render_template("userdb.html", users=users)
|
||||
|
||||
|
||||
# Route to display the fuser db
|
||||
@app.route('/LogsDB')
|
||||
@app.route("/LogsDB")
|
||||
def logsdb():
|
||||
logs = get_logs()
|
||||
return render_template('logsdb.html', logs=logs)
|
||||
return render_template("logsdb.html", logs=logs)
|
||||
|
||||
@app.route('/export_logs')
|
||||
|
||||
@app.route("/export_logs")
|
||||
def export_logs():
|
||||
logs = get_logs()
|
||||
|
||||
|
||||
# Create a file-like string to write logs
|
||||
log_output = io.StringIO()
|
||||
log_line = "TimeStamp,User,Tag UID,Door ID,Granted,\n"
|
||||
@ -37,75 +52,83 @@ def export_logs():
|
||||
for log in logs:
|
||||
log_line = f"{log[0]},{log[1]},{log[2]},{log[4]},{'Yes' if log[3] else 'No'},\n"
|
||||
log_output.write(log_line)
|
||||
|
||||
|
||||
# Set the position to the beginning of the stream
|
||||
log_output.seek(0)
|
||||
|
||||
|
||||
# Create a response with the file data
|
||||
return Response(
|
||||
log_output,
|
||||
mimetype="text/plain",
|
||||
headers={"Content-disposition": "attachment; filename=logs.csv"}
|
||||
headers={"Content-disposition": "attachment; filename=logs.csv"},
|
||||
)
|
||||
|
||||
@app.route('/GroupsDB')
|
||||
|
||||
@app.route("/GroupsDB")
|
||||
def groupsdb():
|
||||
doors = get_doors()
|
||||
groups = get_existing_groups(DBFILE)
|
||||
return render_template('groupsdb.html', doors=doors, groups=groups)
|
||||
return render_template("groupsdb.html", doors=doors, groups=groups)
|
||||
|
||||
@app.route('/delete_group/<group_cn>', methods=['POST'])
|
||||
|
||||
@app.route("/delete_group/<group_cn>", methods=["POST"])
|
||||
def delete_group(group_cn):
|
||||
delete_group_from_database(group_cn)
|
||||
return render_template('./index.html')
|
||||
return render_template("./index.html")
|
||||
|
||||
|
||||
# Route to handle form submission and add the door to the database
|
||||
@app.route('/add_door', methods=['POST'])
|
||||
@app.route("/add_door", methods=["POST"])
|
||||
def add_door():
|
||||
Door_id = request.form["Door_id"]
|
||||
group_cn = request.form["group_cn"]
|
||||
|
||||
# Update with your database file path
|
||||
|
||||
# Update with your database file path
|
||||
exec = add_door_to_database(DBFILE, group_cn, Door_id)
|
||||
if add_door_to_database(DBFILE, group_cn, Door_id):
|
||||
return redirect('/')
|
||||
return redirect("/")
|
||||
else:
|
||||
return f"Failed to add door to the database."
|
||||
|
||||
|
||||
# Route to handle sync button click
|
||||
@app.route('/sync')
|
||||
@app.route("/sync")
|
||||
def sync():
|
||||
sync_ldap_to_database(DBFILE)
|
||||
return render_template('./LDAP.html')
|
||||
return render_template("./LDAP.html")
|
||||
|
||||
|
||||
# Route to handle door access requests
|
||||
@app.route('/access', methods=['POST'])
|
||||
@app.route("/access", methods=["POST"])
|
||||
def door_access():
|
||||
data = request.get_json()
|
||||
rfid_uid = data.get('rfid_uid')
|
||||
door_id = data.get('door_id')
|
||||
rfid_uid = data.get("rfid_uid")
|
||||
door_id = data.get("door_id")
|
||||
|
||||
if rfid_uid is None or door_id is None:
|
||||
return jsonify({'error': 'RFID UID and door ID are required'}), 400
|
||||
return jsonify({"error": "RFID UID and door ID are required"}), 400
|
||||
|
||||
access_granted, upn = check_access(rfid_uid, door_id)
|
||||
if access_granted:
|
||||
print('')
|
||||
log_access_attempt(DBFILE,upn,rfid_uid,True,door_id)
|
||||
return jsonify({'access_granted': True, 'upn': upn}), 200
|
||||
|
||||
print("")
|
||||
log_access_attempt(DBFILE, upn, rfid_uid, True, door_id)
|
||||
return jsonify({"access_granted": True, "upn": upn}), 200
|
||||
|
||||
else:
|
||||
log_access_attempt(DBFILE,upn,rfid_uid,False,door_id)
|
||||
return jsonify({'access_granted': False}), 403
|
||||
|
||||
log_access_attempt(DBFILE, upn, rfid_uid, False, door_id)
|
||||
return jsonify({"access_granted": False}), 403
|
||||
|
||||
|
||||
def run_flask_app():
|
||||
app.run(debug=True, use_reloader=False, port=5000, host="0.0.0.0")
|
||||
app.run(debug=True, use_reloader=False, port=WebServerPORT, host="0.0.0.0")
|
||||
|
||||
|
||||
def run_webServer_thread():
|
||||
print(f'STARTING WEB SERVER ON PORT {WebServerPORT}')
|
||||
print(f"STARTING WEB SERVER ON PORT {WebServerPORT}")
|
||||
flask_thread = Thread(target=run_flask_app, daemon=True)
|
||||
flask_thread.start()
|
||||
# flask_thread.join()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=True)
|
||||
|
||||
0
Server/Program/__init__.py
Normal file
0
Server/Program/__init__.py
Normal file
@ -1,42 +1,50 @@
|
||||
from datetime import datetime
|
||||
import sqlite3
|
||||
from env import *
|
||||
from env import DBFILE
|
||||
|
||||
|
||||
# Function to check if a table exists in the database
|
||||
def table_exists(cursor, table_name):
|
||||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
|
||||
cursor.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,)
|
||||
)
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
|
||||
# Function to create the Users table
|
||||
def create_users_table(cursor):
|
||||
cursor.execute('''CREATE TABLE Users (
|
||||
cursor.execute("""CREATE TABLE Users (
|
||||
upn TEXT PRIMARY KEY,
|
||||
rFIDUID TEXT,
|
||||
MemberOf TEXT,
|
||||
FOREIGN KEY (MemberOf) REFERENCES Groups(cn)
|
||||
)''')
|
||||
)""")
|
||||
|
||||
|
||||
# Function to create the Groups table
|
||||
def create_groups_table(cursor):
|
||||
cursor.execute('''CREATE TABLE Groups (
|
||||
cursor.execute("""CREATE TABLE Groups (
|
||||
cn TEXT PRIMARY KEY
|
||||
)''')
|
||||
)""")
|
||||
|
||||
|
||||
# Function to create the Doors table
|
||||
def create_doors_table(cursor):
|
||||
cursor.execute('''CREATE TABLE Doors (
|
||||
cursor.execute("""CREATE TABLE Doors (
|
||||
id INTEGER PRIMARY KEY,
|
||||
GroupCn TEXT,
|
||||
FOREIGN KEY (GroupCn) REFERENCES Groups(cn)
|
||||
)''')
|
||||
)""")
|
||||
|
||||
|
||||
# Function to create the logs table
|
||||
def create_logs_table(cursor):
|
||||
"""
|
||||
Create a log table with columns id, timestamp, user, and granted.
|
||||
|
||||
|
||||
:param db_file: The database file path.
|
||||
"""
|
||||
cursor.execute('''
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS log (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT ,
|
||||
@ -48,7 +56,9 @@ def create_logs_table(cursor):
|
||||
FOREIGN KEY (user) REFERENCES Users (upn)
|
||||
FOREIGN KEY (rFIDUID) REFERENCES Users (rFIDUID)
|
||||
)
|
||||
''')
|
||||
""")
|
||||
|
||||
|
||||
# Function to setup the database
|
||||
def setup_database(db_file):
|
||||
# Connect to the SQLite database
|
||||
@ -85,10 +95,11 @@ def setup_database(db_file):
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
|
||||
"""
|
||||
Log an access attempt to the log table.
|
||||
|
||||
|
||||
:param db_file: The database file path.
|
||||
:param user: The user attempting access.
|
||||
:param rFIDUID: The user's tag uid
|
||||
@ -97,15 +108,19 @@ def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
|
||||
"""
|
||||
conn = sqlite3.connect(db_file)
|
||||
cursor = conn.cursor()
|
||||
|
||||
|
||||
print(f'[{datetime.now()}] User {user} get granted : {granted} on door : {doorID}')
|
||||
cursor.execute('''
|
||||
print(f"[{datetime.now()}] User {user} get granted : {granted} on door : {doorID}")
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO log (timestamp, user, rFIDUID, granted, door_id) VALUES (?, ?, ?, ?, ?)
|
||||
''', (datetime.now(), user, rFIDUID, granted, doorID))
|
||||
|
||||
""",
|
||||
(datetime.now(), user, rFIDUID, granted, doorID),
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def print_users_table(cursor):
|
||||
cursor.execute("SELECT * FROM Users")
|
||||
rows = cursor.fetchall()
|
||||
@ -113,6 +128,7 @@ def print_users_table(cursor):
|
||||
for row in rows:
|
||||
print(row)
|
||||
|
||||
|
||||
# Function to print the content of the Groups table
|
||||
def print_groups_table(cursor):
|
||||
cursor.execute("SELECT * FROM Groups")
|
||||
@ -121,6 +137,7 @@ def print_groups_table(cursor):
|
||||
for row in rows:
|
||||
print(row)
|
||||
|
||||
|
||||
# Function to print the content of the Doors table
|
||||
def print_doors_table(cursor):
|
||||
cursor.execute("SELECT * FROM Doors")
|
||||
@ -128,6 +145,8 @@ def print_doors_table(cursor):
|
||||
print("Doors:")
|
||||
for row in rows:
|
||||
print(row)
|
||||
|
||||
|
||||
# Function to print the content of the Log table
|
||||
def print_log_table(cursor):
|
||||
cursor.execute("SELECT * FROM log")
|
||||
@ -136,59 +155,65 @@ def print_log_table(cursor):
|
||||
for row in rows:
|
||||
print(row)
|
||||
|
||||
|
||||
# Function to print the content of the entire database
|
||||
def print_database_content(db_file):
|
||||
conn = sqlite3.connect(db_file)
|
||||
cursor = conn.cursor()
|
||||
|
||||
|
||||
print_users_table(cursor)
|
||||
print_groups_table(cursor)
|
||||
print_doors_table(cursor)
|
||||
#print_log_table(cursor)
|
||||
|
||||
conn.close()
|
||||
|
||||
def get_logs():
|
||||
# print_log_table(cursor)
|
||||
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_logs():
|
||||
"""
|
||||
Fetch all logs from the log table in the database.
|
||||
:return: List of log records.
|
||||
"""
|
||||
conn = sqlite3.connect(DBFILE)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
|
||||
cursor.execute("""
|
||||
SELECT timestamp, user, rFIDUID, granted, door_id
|
||||
FROM log
|
||||
ORDER BY id DESC
|
||||
''')
|
||||
""")
|
||||
|
||||
logs = cursor.fetchall()
|
||||
|
||||
|
||||
conn.close()
|
||||
return logs
|
||||
|
||||
|
||||
def get_latest_logs(db_file,limit=10):
|
||||
def get_latest_logs(db_file, limit=10):
|
||||
"""
|
||||
Fetch the latest logs from the database.
|
||||
|
||||
|
||||
:param limit: The number of latest logs to fetch.
|
||||
:return: List of log entries.
|
||||
"""
|
||||
conn = sqlite3.connect(db_file)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT timestamp, user, rFIDUID, granted, door_id
|
||||
FROM log
|
||||
ORDER BY id DESC
|
||||
LIMIT ?
|
||||
''', (limit,))
|
||||
|
||||
""",
|
||||
(limit,),
|
||||
)
|
||||
|
||||
logs = cursor.fetchall()
|
||||
conn.close()
|
||||
return logs
|
||||
|
||||
|
||||
# Function to fetch list of existing groups from the database
|
||||
def get_existing_groups(db_file):
|
||||
try:
|
||||
@ -201,13 +226,17 @@ def get_existing_groups(db_file):
|
||||
except sqlite3.Error as e:
|
||||
print(f"SQLite Error: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def delete_group_from_database(group_cn):
|
||||
conn = sqlite3.connect(DBFILE)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DELETE FROM Groups WHERE cn = ?", (group_cn,))
|
||||
cursor.execute("DELETE FROM Doors WHERE GroupCn = ?", (group_cn,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_doors():
|
||||
conn = sqlite3.connect(DBFILE)
|
||||
cursor = conn.cursor()
|
||||
@ -216,6 +245,7 @@ def get_doors():
|
||||
conn.close()
|
||||
return doors
|
||||
|
||||
|
||||
def get_users():
|
||||
"""
|
||||
Fetch all users from the Users table in the database.
|
||||
@ -223,28 +253,36 @@ def get_users():
|
||||
"""
|
||||
conn = sqlite3.connect(DBFILE)
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('SELECT upn, rFIDUID, MemberOf FROM Users')
|
||||
|
||||
cursor.execute("SELECT upn, rFIDUID, MemberOf FROM Users")
|
||||
users = cursor.fetchall()
|
||||
|
||||
|
||||
conn.close()
|
||||
return users
|
||||
|
||||
|
||||
# Function to add a door to the database
|
||||
def add_door_to_database(db_file, group_cn, Door_id):
|
||||
try:
|
||||
conn = sqlite3.connect(db_file)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("INSERT INTO Doors (id, GroupCn) VALUES (?,?)", (Door_id,group_cn,))
|
||||
cursor.execute(
|
||||
"INSERT INTO Doors (id, GroupCn) VALUES (?,?)",
|
||||
(
|
||||
Door_id,
|
||||
group_cn,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
#print_database_content(DBFILE)
|
||||
# print_database_content(DBFILE)
|
||||
return True
|
||||
except sqlite3.Error as e:
|
||||
#print_database_content(DBFILE)
|
||||
# print_database_content(DBFILE)
|
||||
print(f"SQLite Error: {e}")
|
||||
return (False, e)
|
||||
|
||||
|
||||
|
||||
# Function to verify if the user is allowed to open the door
|
||||
def check_access(rfid_uid_str, door_id):
|
||||
try:
|
||||
@ -252,10 +290,12 @@ def check_access(rfid_uid_str, door_id):
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Convert the received RFID UID string to bytes
|
||||
rfid_uid_bytes = rfid_uid_str.encode('utf-8')
|
||||
rfid_uid_bytes = rfid_uid_str.encode("utf-8")
|
||||
|
||||
# Get the user's UPN and group memberships based on the RFID UID
|
||||
cursor.execute("SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?", (rfid_uid_bytes,))
|
||||
cursor.execute(
|
||||
"SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?", (rfid_uid_bytes,)
|
||||
)
|
||||
user_data = cursor.fetchone()
|
||||
if user_data is None:
|
||||
return False, None # User not found
|
||||
@ -263,7 +303,7 @@ def check_access(rfid_uid_str, door_id):
|
||||
upn_bytes, user_groups = user_data
|
||||
|
||||
# Decode the UPN bytes to string
|
||||
upn = upn_bytes.decode('utf-8')
|
||||
upn = upn_bytes.decode("utf-8")
|
||||
|
||||
# Get the group associated with the door
|
||||
cursor.execute("SELECT GroupCn FROM Doors WHERE id = ?", (door_id,))
|
||||
@ -274,11 +314,11 @@ def check_access(rfid_uid_str, door_id):
|
||||
door_group = door_group[0]
|
||||
|
||||
# Check if the user's group is allowed to open the door
|
||||
if door_group in user_groups.split(','):
|
||||
if door_group in user_groups.split(","):
|
||||
return True, upn # Access granted
|
||||
else:
|
||||
return False, None # Access denied
|
||||
|
||||
except sqlite3.Error as e:
|
||||
print(f"SQLite Error: {e}")
|
||||
return False, None
|
||||
return False, None
|
||||
|
||||
@ -3,7 +3,8 @@ import ldap
|
||||
import sqlite3
|
||||
import threading
|
||||
import schedule
|
||||
from env import *
|
||||
from env import DOOR_ACCESS_GROUPS_DN, LDAPPASS, LDAPUSER, LDAP_SERVER, USERS_DN
|
||||
|
||||
|
||||
# Function to initialize LDAP connection
|
||||
def initialize_ldap_connection():
|
||||
@ -11,8 +12,8 @@ def initialize_ldap_connection():
|
||||
## Settings :
|
||||
None
|
||||
## Behavior
|
||||
Init the connection to the LDAP server.
|
||||
Return LDAPobjet instance when connected
|
||||
Init the connection to the LDAP server.
|
||||
Return LDAPobjet instance when connected
|
||||
if it fail, return None and print error code
|
||||
"""
|
||||
try:
|
||||
@ -22,43 +23,50 @@ def initialize_ldap_connection():
|
||||
print(f"[{datetime.now()}] LDAP connection successful.")
|
||||
return connect
|
||||
except ldap.LDAPError as e:
|
||||
print(f'[{datetime.now()}] LDAP Error: {e}')
|
||||
print(f"[{datetime.now()}] LDAP Error: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# Function to retrieve users from LDAP
|
||||
def retrieve_users_from_ldap(ldap_connection):
|
||||
"""
|
||||
## Settings :
|
||||
- ldap_connection : LDAPobjet instance
|
||||
## Behavior
|
||||
retrieve the users in the specified OU
|
||||
Return result when it success
|
||||
if it fail, return empty list and print error code
|
||||
## Settings :
|
||||
- ldap_connection : LDAPobjet instance
|
||||
## Behavior
|
||||
retrieve the users in the specified OU
|
||||
Return result when it success
|
||||
if it fail, return empty list and print error code
|
||||
"""
|
||||
try:
|
||||
result = ldap_connection.search_s(USERS_DN, ldap.SCOPE_SUBTREE, '(objectClass=user)')
|
||||
result = ldap_connection.search_s(
|
||||
USERS_DN, ldap.SCOPE_SUBTREE, "(objectClass=user)"
|
||||
)
|
||||
return result
|
||||
except ldap.LDAPError as e:
|
||||
print(f'[{datetime.now()}] LDAP Error: {e}')
|
||||
print(f"[{datetime.now()}] LDAP Error: {e}")
|
||||
return []
|
||||
|
||||
|
||||
# Function to retrieve groups from LDAP
|
||||
def retrieve_groups_from_ldap(ldap_connection):
|
||||
"""
|
||||
## Settings :
|
||||
- ldap_connection : LDAPobjet instance
|
||||
## Behavior
|
||||
retrieve the groups in the specified OU
|
||||
Return result when it success
|
||||
retrieve the groups in the specified OU
|
||||
Return result when it success
|
||||
if it fail, return empty list and print error code
|
||||
"""
|
||||
try:
|
||||
result = ldap_connection.search_s(DOOR_ACCESS_GROUPS_DN, ldap.SCOPE_SUBTREE, '(objectClass=group)')
|
||||
result = ldap_connection.search_s(
|
||||
DOOR_ACCESS_GROUPS_DN, ldap.SCOPE_SUBTREE, "(objectClass=group)"
|
||||
)
|
||||
return result
|
||||
except ldap.LDAPError as e:
|
||||
print(f'[{datetime.now()}]LDAP Error: {e}')
|
||||
print(f"[{datetime.now()}]LDAP Error: {e}")
|
||||
return []
|
||||
|
||||
|
||||
# Function to add user to the database or update if already exists
|
||||
def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
|
||||
try:
|
||||
@ -67,18 +75,27 @@ def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
|
||||
if existing_user:
|
||||
# User already exists, check if data needs to be updated
|
||||
if existing_user[1] != rfid_uid or existing_user[2] != member_of:
|
||||
cursor.execute("UPDATE Users SET rFIDUID=?, MemberOf=? WHERE upn=?", (rfid_uid, member_of, upn))
|
||||
cursor.execute(
|
||||
"UPDATE Users SET rFIDUID=?, MemberOf=? WHERE upn=?",
|
||||
(rfid_uid, member_of, upn),
|
||||
)
|
||||
conn.commit()
|
||||
print(f"[{datetime.now()}] User '{upn}' updated in the database.")
|
||||
else:
|
||||
print(f"[{datetime.now()}] User '{upn}' already exists in the database with the same data.")
|
||||
print(
|
||||
f"[{datetime.now()}] User '{upn}' already exists in the database with the same data."
|
||||
)
|
||||
else:
|
||||
# User doesn't exist, insert new user
|
||||
cursor.execute("INSERT INTO Users (upn, rFIDUID, MemberOf) VALUES (?, ?, ?)", (upn, rfid_uid, member_of))
|
||||
cursor.execute(
|
||||
"INSERT INTO Users (upn, rFIDUID, MemberOf) VALUES (?, ?, ?)",
|
||||
(upn, rfid_uid, member_of),
|
||||
)
|
||||
conn.commit()
|
||||
print(f"[{datetime.now()}] User '{upn}' added to the database.")
|
||||
except sqlite3.Error as e:
|
||||
print(f'SQLite Error: {e}')
|
||||
print(f"SQLite Error: {e}")
|
||||
|
||||
|
||||
# Function to add group to the database or update if already exists
|
||||
def add_group_to_database(conn, cursor, cn):
|
||||
@ -94,7 +111,8 @@ def add_group_to_database(conn, cursor, cn):
|
||||
conn.commit()
|
||||
print(f"[{datetime.now()}] Group '{cn}' added to the database.")
|
||||
except sqlite3.Error as e:
|
||||
print(f'SQLite Error: {e}')
|
||||
print(f"SQLite Error: {e}")
|
||||
|
||||
|
||||
# Function to sync LDAP users and groups to the database
|
||||
def sync_ldap_to_database(db_file):
|
||||
@ -124,41 +142,52 @@ def sync_ldap_to_database(db_file):
|
||||
# Retrieve users from LDAP and add them to the database
|
||||
users = retrieve_users_from_ldap(ldap_conn)
|
||||
for dn, user_info in users:
|
||||
upn = user_info.get('userPrincipalName', [''])[0]
|
||||
rfid_uid = user_info.get('rFIDUID', [''])[0]
|
||||
member_of = [group.decode('utf-8').split(',')[0].split('=')[1] for group in user_info.get('memberOf', [])]
|
||||
upn = user_info.get("userPrincipalName", [""])[0]
|
||||
rfid_uid = user_info.get("rFIDUID", [""])[0]
|
||||
member_of = [
|
||||
group.decode("utf-8").split(",")[0].split("=")[1]
|
||||
for group in user_info.get("memberOf", [])
|
||||
]
|
||||
|
||||
# Check if the user is disabled in LDAP
|
||||
user_account_control = user_info.get('userAccountControl', [0])[0]
|
||||
if user_account_control == b'514' or user_account_control == b'66050': # Check if the 9th bit is set (ADS_UF_ACCOUNTDISABLE flag)
|
||||
user_account_control = user_info.get("userAccountControl", [0])[0]
|
||||
if (
|
||||
user_account_control == b"514" or user_account_control == b"66050"
|
||||
): # Check if the 9th bit is set (ADS_UF_ACCOUNTDISABLE flag)
|
||||
# User is disabled, check if user exists in the database and remove if present
|
||||
cursor.execute("SELECT * FROM Users WHERE upn=?", (upn,))
|
||||
existing_user = cursor.fetchone()
|
||||
if existing_user:
|
||||
cursor.execute("DELETE FROM Users WHERE upn=?", (upn,))
|
||||
conn.commit()
|
||||
print(f"[{datetime.now()}] User '{upn}' disabled in LDAP and removed from the database.")
|
||||
print(
|
||||
f"[{datetime.now()}] User '{upn}' disabled in LDAP and removed from the database."
|
||||
)
|
||||
else:
|
||||
print(f"[{datetime.now()}] User '{upn}' disabled in LDAP but not present in the database.")
|
||||
print(
|
||||
f"[{datetime.now()}] User '{upn}' disabled in LDAP but not present in the database."
|
||||
)
|
||||
continue # Skip adding the disabled user to the database
|
||||
|
||||
# User is not disabled, add or update user in the database
|
||||
add_user_to_database(conn, cursor, upn, rfid_uid, ', '.join(member_of))
|
||||
add_user_to_database(conn, cursor, upn, rfid_uid, ", ".join(member_of))
|
||||
|
||||
# Retrieve groups from LDAP and add them to the database
|
||||
groups = retrieve_groups_from_ldap(ldap_conn)
|
||||
for dn, group_info in groups:
|
||||
cn = group_info.get('cn', [''])[0].decode('utf-8')
|
||||
cn = group_info.get("cn", [""])[0].decode("utf-8")
|
||||
add_group_to_database(conn, cursor, cn)
|
||||
|
||||
# Close connections
|
||||
conn.close()
|
||||
ldap_conn.unbind()
|
||||
|
||||
|
||||
def run_sync_ldap_to_database_thread(db_file):
|
||||
print(f"[{datetime.now()}] Running LDAP sync")
|
||||
threading.Thread(target=sync_ldap_to_database, args=(db_file,), daemon=True).start()
|
||||
|
||||
|
||||
def schedule_sync_ldap_to_database(db_file):
|
||||
run_sync_ldap_to_database_thread(db_file) # Run immediately
|
||||
schedule.every(5).minutes.do(run_sync_ldap_to_database_thread, db_file)
|
||||
schedule.every(5).minutes.do(run_sync_ldap_to_database_thread, db_file)
|
||||
|
||||
@ -1,15 +1,14 @@
|
||||
from ldapSync import *
|
||||
from database import *
|
||||
from ldapSync import schedule_sync_ldap_to_database
|
||||
from database import setup_database, print_database_content
|
||||
from Webserver import run_webServer_thread
|
||||
from env import *
|
||||
from env import DBFILE
|
||||
import schedule
|
||||
|
||||
|
||||
|
||||
setup_database(DBFILE)
|
||||
print_database_content(DBFILE)
|
||||
#print_database_content(DBFILE)
|
||||
run_webServer_thread()
|
||||
schedule_sync_ldap_to_database(DBFILE)
|
||||
|
||||
while True :
|
||||
schedule.run_pending()
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
|
||||
@ -99,4 +99,14 @@ form input[type="submit"]:hover {
|
||||
.navbar a:hover {
|
||||
background-color: #ddd;
|
||||
color: black;
|
||||
}
|
||||
|
||||
.delete-btn {
|
||||
width: 100%;
|
||||
padding: 10px;
|
||||
background-color: #b2424a;
|
||||
color: white;
|
||||
border: none;
|
||||
border-radius: 4px;
|
||||
cursor: pointer;
|
||||
}
|
||||
@ -3,7 +3,7 @@
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>Access Logs</title>
|
||||
<title>Doors and Groups Associations</title>
|
||||
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
|
||||
</head>
|
||||
<body>
|
||||
@ -14,7 +14,8 @@
|
||||
<a href="/LogsDB">Logs</a>
|
||||
|
||||
</div>
|
||||
<div class="container"><h1>Doors and Groups Associations</h1>
|
||||
<div class="container">
|
||||
<h1>Doors and Groups Associations</h1>
|
||||
<h2>Doors</h2>
|
||||
<table>
|
||||
<thead>
|
||||
@ -45,7 +46,7 @@
|
||||
<tr>
|
||||
<td>{{group}}</td>
|
||||
<td>
|
||||
<form action="{{ url_for('delete_group', group_cn=group['cn']) }}" method="post">
|
||||
<form action="{{ url_for('delete_group', group_cn=group) }}" method="post">
|
||||
<button type="submit" class="delete-btn">Delete</button>
|
||||
</form>
|
||||
</td>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user