Refactoring
This commit is contained in:
@ -20,7 +20,7 @@ async def getWeek(weekstart: datetime, file: str, showsat: bool):
|
||||
:param weekstart:
|
||||
:param file:
|
||||
:param showsat:
|
||||
:return:
|
||||
:return (Event-Liste, Essens-Liste, voheriges Wochendatum, nächstes Wochendatum, Datum des Montags):
|
||||
"""
|
||||
if weekstart == "today":
|
||||
start_date = datetime.date.today()
|
||||
@ -80,15 +80,15 @@ async def getWeek(weekstart: datetime, file: str, showsat: bool):
|
||||
"teacher": teacher,
|
||||
}
|
||||
eventl += [eventdict]
|
||||
return eventl, await daylist(start_date, showsat), prevw, nextw, mon
|
||||
return eventl, await MensaDayList(start_date, showsat), prevw, nextw, mon
|
||||
|
||||
|
||||
async def daylist(weekstart: datetime, showsat: bool):
|
||||
async def MensaDayList(weekstart: datetime, showsat: bool):
|
||||
"""
|
||||
Gibt die Essen einer Woche zurück.
|
||||
:param weekstart:
|
||||
:param showsat:
|
||||
:return:
|
||||
:return Essens-Liste:
|
||||
"""
|
||||
weekday = weekstart
|
||||
dayl = []
|
||||
|
||||
@ -25,10 +25,10 @@ async def checkUser(email: str, password: str):
|
||||
:return (Token, Cookie):
|
||||
"""
|
||||
async with httpx.AsyncClient() as s:
|
||||
fpw = urllib.parse.quote(password, safe='', encoding=None, errors=None)
|
||||
fmail = urllib.parse.quote(email, safe='', encoding=None, errors=None)
|
||||
content = (f'usrname={fmail}&pass={fpw}&ARGUMENTS=clino%2Cusrname%2Cpass%2Cmenuno%2Cmenu_type%2Cbrowser'
|
||||
f'%2Cplatform&APPNAME=CampusNet&PRGNAME=LOGINCHECK')
|
||||
formattedPassword = urllib.parse.quote(password, safe='', encoding=None, errors=None)
|
||||
formattedEmail = urllib.parse.quote(email, safe='', encoding=None, errors=None)
|
||||
content = (f"usrname={formattedEmail}&pass={formattedPassword}&ARGUMENTS=clino%2Cusrname%2Cpass%2C"
|
||||
f"menuno%2Cmenu_type%2Cbrowser%2Cplatform&APPNAME=CampusNet&PRGNAME=LOGINCHECK")
|
||||
response = await s.post(url=url, headers=headers, content=content)
|
||||
header = response.headers
|
||||
try:
|
||||
@ -48,7 +48,7 @@ async def getKurs(token: int, cookie: str):
|
||||
TODO: Umstellen auf Bezeichner INKL. Standort
|
||||
:param token:
|
||||
:param cookie:
|
||||
:return Kurs-Bezeichner:
|
||||
:return Kurs-Bezeichner ODER 0 bei Fehler:
|
||||
"""
|
||||
try:
|
||||
headers["Cookie"] = "cnsc=" + cookie
|
||||
@ -97,9 +97,9 @@ async def getSem(token: int, cookie: str):
|
||||
select = select.find_all(value=True)
|
||||
optlist = []
|
||||
for i in select:
|
||||
t = i.text.replace("Wi", "Winter").replace("So", "Sommer")
|
||||
t = t.replace("Se", "semester")
|
||||
optlist += [[t, i['value']]]
|
||||
text = i.text.replace("Wi", "Winter").replace("So", "Sommer")
|
||||
text = text.replace("Se", "semester")
|
||||
optlist += [[text, i['value']]]
|
||||
return optlist
|
||||
|
||||
|
||||
@ -119,25 +119,25 @@ async def getResults(token, cookie: str, resl: str):
|
||||
html = BeautifulSoup(response.content.decode("utf-8"), 'lxml')
|
||||
table = html.find('table', attrs={"class": "nb list"})
|
||||
body = table.find("tbody")
|
||||
vorl = body.find_all("tr")
|
||||
vorlist = []
|
||||
vorlesungen = body.find_all("tr")
|
||||
vorlesungenList = []
|
||||
tasks = []
|
||||
i = 0
|
||||
for row in vorl:
|
||||
cols = row.find_all("td")
|
||||
col = [[e.text.strip()] for e in cols]
|
||||
if len(col) != 0:
|
||||
if len(col[4][0]) == 0 or len(col[2][0]) == 0:
|
||||
for row in vorlesungen:
|
||||
columns = row.find_all("td")
|
||||
column = [[e.text.strip()] for e in columns]
|
||||
if len(column) != 0:
|
||||
if len(column[4][0]) == 0 or len(column[2][0]) == 0:
|
||||
tasks += [getPruefung(s, row.find("a")["href"])]
|
||||
col[2] = i
|
||||
column[2] = i
|
||||
i += 1
|
||||
vorlist += [col[1:4]]
|
||||
vorlesungenList += [column[1:4]]
|
||||
notlisted = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for i in vorlist:
|
||||
for i in vorlesungenList:
|
||||
for e in range(0, len(i)):
|
||||
if isinstance(i[e], int):
|
||||
i[e] = notlisted[i[e]]
|
||||
return vorlist[:-1]
|
||||
return vorlesungenList[:-1]
|
||||
|
||||
|
||||
async def getPruefung(s, url):
|
||||
@ -146,24 +146,24 @@ async def getPruefung(s, url):
|
||||
TODO: Namen der spezifischen Prüfungen auch zurückgeben, um Zusammensetzung zu spezifizieren.
|
||||
:param s:
|
||||
:param url:
|
||||
:return list:
|
||||
:return Noten-Liste:
|
||||
"""
|
||||
response = await s.get("https://dualis.dhbw.de" + url, headers=headers)
|
||||
html = BeautifulSoup(response.content.decode("utf-8"), 'lxml')
|
||||
table = html.find('table')
|
||||
pruefung = table.find_all("tr")
|
||||
ret = []
|
||||
returnList = []
|
||||
for row in pruefung:
|
||||
cols = row.find_all("td")
|
||||
col = [e.text.strip() for e in cols]
|
||||
if len(col) == 6 and len(col[3]) <= 13:
|
||||
if len(col[3]) != 0:
|
||||
ret += [col[0] + ": " + col[3].split("\xa0")[0]]
|
||||
if ret[-1][0] == ':':
|
||||
ret[-1] = "Gesamt" + ret[-1]
|
||||
if len(ret) == 0:
|
||||
ret = ["Noch nicht gesetzt"]
|
||||
return ret
|
||||
columns = row.find_all("td")
|
||||
column = [e.text.strip() for e in columns]
|
||||
if len(column) == 6 and len(column[3]) <= 13:
|
||||
if len(column[3]) != 0:
|
||||
returnList += [column[0] + ": " + column[3].split("\xa0")[0]]
|
||||
if returnList[-1][0] == ':':
|
||||
returnList[-1] = "Gesamt" + returnList[-1]
|
||||
if len(returnList) == 0:
|
||||
returnList = ["Noch nicht gesetzt"]
|
||||
return returnList
|
||||
|
||||
|
||||
def checkLifetime(timecode: float):
|
||||
|
||||
@ -2,7 +2,7 @@ import json
|
||||
|
||||
import asyncio
|
||||
|
||||
from init import db, Meals, scheduler, flask_app
|
||||
from init import db, Meals, scheduler, flaskApp
|
||||
import datetime
|
||||
import time
|
||||
import httpx
|
||||
@ -41,36 +41,37 @@ async def getMealsFromAPI(day: str, dbentry: bool = False):
|
||||
async with httpx.AsyncClient() as s:
|
||||
response = await s.get(url=f"https://dh-api.paulmartin.cloud/plans/{day}?canteens=erzberger")
|
||||
response = response.content
|
||||
jres = json.loads(response.decode("utf-8"))
|
||||
jsonResponse = json.loads(response.decode("utf-8"))
|
||||
essen = []
|
||||
try:
|
||||
num = len(jres["data"][0]["lines"])
|
||||
for i in range(num):
|
||||
number = len(jsonResponse["data"][0]["lines"])
|
||||
for i in range(number):
|
||||
try:
|
||||
jsmeal = jres["data"][0]["lines"][i]["meals"]
|
||||
cont = True
|
||||
jsonMeals = jsonResponse["data"][0]["lines"][i]["meals"]
|
||||
hasContent = True
|
||||
except IndexError:
|
||||
essen = []
|
||||
cont = False
|
||||
if cont:
|
||||
for e in range(len(jsmeal)):
|
||||
ji = jsmeal[e]
|
||||
name = ji["name"]
|
||||
if pricetofloat(ji["price"]) >= 1.1:
|
||||
vegan = ji["classifiers"].count("VG") == 1
|
||||
schwein = ji["classifiers"].count("S") == 1
|
||||
hasContent = False
|
||||
if hasContent:
|
||||
for e in range(len(jsonMeals)):
|
||||
jsonEntry = jsonMeals[e]
|
||||
name = jsonEntry["name"]
|
||||
if pricetofloat(jsonEntry["price"]) >= 1.1:
|
||||
vegan = jsonEntry["classifiers"].count("VG") == 1
|
||||
schwein = jsonEntry["classifiers"].count("S") == 1
|
||||
if vegan:
|
||||
veget = True
|
||||
vegetarian = True
|
||||
else:
|
||||
veget = ji["classifiers"].count("VEG") == 1
|
||||
if veget:
|
||||
vegetarian = jsonEntry["classifiers"].count("VEG") == 1
|
||||
if vegetarian:
|
||||
if name.count("Reibekäse") > 0:
|
||||
vegan = True
|
||||
|
||||
essen += [name]
|
||||
if dbentry:
|
||||
mid = int(time.time() * 1000) % 100000
|
||||
neu = Meals(date=day, name=name, id=mid, vegan=vegan, vegetarian=veget, schwein=schwein)
|
||||
neu = Meals(date=day, name=name, id=mid, vegan=vegan, vegetarian=vegetarian,
|
||||
schwein=schwein)
|
||||
db.session.add(neu)
|
||||
db.session.commit()
|
||||
if not essen:
|
||||
@ -102,20 +103,20 @@ def formatDay(day: datetime):
|
||||
:return str:
|
||||
"""
|
||||
if day.month < 10:
|
||||
mon = "0" + str(day.month)
|
||||
monat = "0" + str(day.month)
|
||||
else:
|
||||
mon = str(day.month)
|
||||
monat = str(day.month)
|
||||
if day.day < 10:
|
||||
tag = "0" + str(day.day)
|
||||
else:
|
||||
tag = str(day.day)
|
||||
day = str(day.year) + "-" + mon + "-" + tag
|
||||
return day
|
||||
formattedDay = str(day.year) + "-" + monat + "-" + tag
|
||||
return formattedDay
|
||||
|
||||
|
||||
async def refreshMeals():
|
||||
"""
|
||||
Aktualisiert immer vormittags alle Mahlzeiten in der Datenbank. \n
|
||||
Aktualisiert alle Mahlzeiten in der Datenbank. \n
|
||||
Datenbankeinträge werden ersetzt, wenn die API andere Mahlzeiten liefert.
|
||||
"""
|
||||
print("Aktualisiere Essenspläne...\n")
|
||||
@ -128,20 +129,22 @@ async def refreshMeals():
|
||||
for i in range(len(dates)):
|
||||
dates[i] = formatDay(dates[i])
|
||||
for i in dates:
|
||||
apinames = await getMealsFromAPI(i)
|
||||
apinames = await getMealsFromAPI(i)
|
||||
dbmeals = Meals.query.filter_by(date=i).all()
|
||||
dbnames = []
|
||||
for m in dbmeals:
|
||||
dbnames += [m.name]
|
||||
if set(dbnames) != set(apinames) and nomeal not in apinames:
|
||||
for n in dbnames:
|
||||
db.session.delete(Meals.query.filter_by(date=i, name=n).first())
|
||||
apiNames = await getMealsFromAPI(i)
|
||||
dbMeals = Meals.query.filter_by(date=i).all()
|
||||
dbNames = []
|
||||
for meal in dbMeals:
|
||||
dbNames += [meal.name]
|
||||
if set(dbNames) != set(apiNames) and nomeal not in apiNames:
|
||||
for name in dbNames:
|
||||
db.session.delete(Meals.query.filter_by(date=i, name=name).first())
|
||||
db.session.commit()
|
||||
await getMealsFromAPI(i, True)
|
||||
|
||||
|
||||
@scheduler.task('cron', id="mensaschedule", hour='8-11', day_of_week='*', minute='*/15', week='*', second='5')
|
||||
def mensaschedule():
|
||||
with flask_app.app_context():
|
||||
@scheduler.task('cron', id="mensaSchedule", hour='8-11', day_of_week='*', minute='*/15', week='*', second='5')
|
||||
def mensaSchedule():
|
||||
"""
|
||||
Nutzt vormittags die Funktion refreshMeals(), um die Essen zu aktualisieren
|
||||
"""
|
||||
with flaskApp.app_context():
|
||||
asyncio.run(refreshMeals())
|
||||
|
||||
105
fetchRAPLA.py
105
fetchRAPLA.py
@ -9,29 +9,45 @@ import icalendar
|
||||
from icalendar import Calendar, Event
|
||||
import json
|
||||
|
||||
from init import scheduler, flask_app, Rapla, db
|
||||
from init import scheduler, flaskApp, Rapla, db
|
||||
|
||||
|
||||
async def fetchPlan(session, url):
|
||||
"""
|
||||
Hilfsfunktion, liefert die Response auf einen GET-Request zur angegebenen URL
|
||||
:param session:
|
||||
:param url:
|
||||
:return Response:
|
||||
"""
|
||||
return await session.get(url=url)
|
||||
|
||||
|
||||
def writeToFile(filename, data):
|
||||
with open(filename, 'w+') as f:
|
||||
f.write(data.text)
|
||||
f.close()
|
||||
"""
|
||||
Schreibt die Daten in die angegebene Datei. Erstellt die Datei, falls sie noch nicht existiert.
|
||||
:param filename:
|
||||
:param data:
|
||||
"""
|
||||
with open(filename, 'w+') as file:
|
||||
file.write(data.text)
|
||||
file.close()
|
||||
|
||||
|
||||
def writeToDB(kurs, url):
|
||||
def writeKursToDB(kurs, url):
|
||||
"""
|
||||
Schreibt Kurs und URL in die Datenbank
|
||||
:param kurs:
|
||||
:param url:
|
||||
"""
|
||||
if Rapla.query.filter_by(name=kurs).first() is None:
|
||||
new_kurs = Rapla(name=kurs, link=url, file=f"rapla{kurs}.ical")
|
||||
db.session.add(new_kurs)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def parseURL(url: str):
|
||||
def parseRaplaURL(url: str):
|
||||
"""
|
||||
Konvertiert URLs ins korrekte Format. \n
|
||||
Konvertiert Rapla-URLs ins korrekte Format. \n
|
||||
Konvertiert werden: http, www.; page=calendar \n
|
||||
In: https; page=ical
|
||||
:param url:
|
||||
@ -47,12 +63,12 @@ def parseURL(url: str):
|
||||
url = f"https{url[http:]}"
|
||||
elif http == -1:
|
||||
url = f"https://{url}"
|
||||
p = url.find("page=")
|
||||
u = url.find("&")
|
||||
if (url[p + 5:u]).lower() == "ical":
|
||||
pageInURL = url.find("page=")
|
||||
andInURL = url.find("&")
|
||||
if (url[pageInURL + 5:andInURL]).lower() == "ical":
|
||||
return 1, url
|
||||
elif p != -1:
|
||||
return 1, f"{url[:p + 5]}ical{url[u:]}"
|
||||
elif pageInURL != -1:
|
||||
return 1, f"{url[:pageInURL + 5]}ical{url[andInURL:]}"
|
||||
elif url.find("key") != -1:
|
||||
return 2, url
|
||||
else:
|
||||
@ -67,15 +83,15 @@ async def getNewRapla(url: str):
|
||||
:param url:
|
||||
:return str:
|
||||
"""
|
||||
parsed = parseURL(url)
|
||||
parsed = parseRaplaURL(url)
|
||||
if parsed[0] == 0:
|
||||
return 0
|
||||
elif parsed[0] == 1:
|
||||
url = parsed[1]
|
||||
elif parsed[0] == 2:
|
||||
return await buildFromKey(parsed[1], onlyUpdate=False)
|
||||
urlfile = url.find("file=")
|
||||
kurs = url[urlfile + 5:].upper()
|
||||
return await buildICALfromKey(parsed[1], onlyUpdate=False)
|
||||
fileInURL = url.find("file=")
|
||||
kurs = url[fileInURL + 5:].upper()
|
||||
if url[-5:] != ".ical":
|
||||
try:
|
||||
async with httpx.AsyncClient() as s:
|
||||
@ -83,7 +99,7 @@ async def getNewRapla(url: str):
|
||||
writeToFile(f"calendars/rapla{kurs}.ical", response)
|
||||
except urllib.error.URLError:
|
||||
return -1
|
||||
writeToDB(kurs, url)
|
||||
writeKursToDB(kurs, url)
|
||||
return f"rapla{kurs}.ical"
|
||||
else:
|
||||
return url
|
||||
@ -104,43 +120,52 @@ def getIcal(kurs: str):
|
||||
|
||||
def getRaplas():
|
||||
"""
|
||||
Liefert alle auf dem Server gespeicherten Raplas.
|
||||
Liefert alle in der Datenbank gespeicherten Raplas.
|
||||
:return (Kursliste, Dateiliste, URL-Liste):
|
||||
"""
|
||||
raplas = Rapla.query.all()
|
||||
kursl = [rapla.name for rapla in raplas]
|
||||
filel = [rapla.file for rapla in raplas]
|
||||
urll = [rapla.link for rapla in raplas]
|
||||
return kursl, filel, urll
|
||||
kursList = [rapla.name for rapla in raplas]
|
||||
fileList = [rapla.file for rapla in raplas]
|
||||
urlList = [rapla.link for rapla in raplas]
|
||||
return kursList, fileList, urlList
|
||||
|
||||
|
||||
async def refreshRapla():
|
||||
"""
|
||||
Aktualisiert alle 5 Minuten alle gespeicherten Raplas.
|
||||
Aktualisiert alle gespeicherten Raplas.
|
||||
"""
|
||||
filel = getRaplas()[1]
|
||||
urll = getRaplas()[2]
|
||||
jobl = []
|
||||
fileList = getRaplas()[1]
|
||||
urlList = getRaplas()[2]
|
||||
jobList = []
|
||||
async with httpx.AsyncClient() as s:
|
||||
for i in range(len(filel)):
|
||||
print(f"Update Rapla: {filel[i][:-5]}")
|
||||
if urll[i].find("file") != -1:
|
||||
jobl += [fetchPlan(s, urll[i])]
|
||||
for i in range(len(fileList)):
|
||||
print(f"Update Rapla: {fileList[i][:-5]}")
|
||||
if urlList[i].find("file") != -1:
|
||||
jobList += [fetchPlan(s, urlList[i])]
|
||||
else:
|
||||
jobl += [buildFromKey(urll[i], onlyUpdate=True)]
|
||||
callist = await asyncio.gather(*jobl, return_exceptions=True)
|
||||
for cal in range(len(callist)):
|
||||
if callist[cal] != 200:
|
||||
writeToFile(f"calendars/{filel[cal]}", callist[cal])
|
||||
jobList += [buildICALfromKey(urlList[i], onlyUpdate=True)]
|
||||
calendarList = await asyncio.gather(*jobList, return_exceptions=True)
|
||||
for calendar in range(len(calendarList)):
|
||||
if calendarList[calendar] != 200:
|
||||
writeToFile(f"calendars/{fileList[calendar]}", calendarList[calendar])
|
||||
|
||||
|
||||
@scheduler.task('cron', id="raplaschedule", hour='*', day_of_week='*', minute='*/3', week='*', second='40')
|
||||
@scheduler.task('cron', id="raplaSchedule", hour='*', day_of_week='*', minute='*/3', week='*', second='40')
|
||||
def raplaSchedule():
|
||||
with flask_app.app_context():
|
||||
"""
|
||||
Nutzt alle 3 Minuten refreshRapla() um die Stundenpläne zu aktualisieren.
|
||||
"""
|
||||
with flaskApp.app_context():
|
||||
asyncio.run(refreshRapla())
|
||||
|
||||
|
||||
async def buildFromKey(url, onlyUpdate):
|
||||
async def buildICALfromKey(url, onlyUpdate):
|
||||
"""
|
||||
Baut eine .ical-Datei aus der mitgegebenen URL, die ein Key-Parameter enthalten muss.
|
||||
:param url:
|
||||
:param onlyUpdate:
|
||||
:return Dateinamen:
|
||||
"""
|
||||
async with httpx.AsyncClient() as s:
|
||||
page = await s.get(url=url)
|
||||
if page.text[:10] == "BEGIN:VCAL":
|
||||
@ -148,7 +173,7 @@ async def buildFromKey(url, onlyUpdate):
|
||||
kursname = info[info.find('filename=')+9:-4].upper()
|
||||
writeToFile(f"rapla{kursname}.ical", page)
|
||||
if not onlyUpdate:
|
||||
writeToDB(kursname, url)
|
||||
writeKursToDB(kursname, url)
|
||||
return url
|
||||
else:
|
||||
return 200
|
||||
@ -185,7 +210,7 @@ async def buildFromKey(url, onlyUpdate):
|
||||
f.write(cal.to_ical())
|
||||
f.close()
|
||||
if not onlyUpdate:
|
||||
writeToDB(kursname, url)
|
||||
writeKursToDB(kursname, url)
|
||||
return f"rapla{kursname}.ical"
|
||||
else:
|
||||
return 200
|
||||
|
||||
42
genSCSSstarts.py
Normal file
42
genSCSSstarts.py
Normal file
@ -0,0 +1,42 @@
|
||||
"""
|
||||
Generiert das SCSS für cal.scss
|
||||
"""
|
||||
|
||||
css = ""
|
||||
|
||||
for startEnd in ("start", "end"):
|
||||
acht = 0
|
||||
for i in range(0, 28):
|
||||
if i % 4 == 0:
|
||||
acht += 100
|
||||
height = str(acht)
|
||||
else:
|
||||
height = str(acht + (i % 4 * 15))
|
||||
if len(height) == 3:
|
||||
height = "0" + height
|
||||
css += "." + startEnd + "-" + height + " {\n"
|
||||
if startEnd == "start":
|
||||
css += "grid-row-" + startEnd + ": " + "1" + "\n}\n"
|
||||
else:
|
||||
css += "grid-row-" + startEnd + ": " + "4" + "\n}\n"
|
||||
css += "\n\n\n"
|
||||
|
||||
|
||||
for startEnd in ("start", "end"):
|
||||
acht = 700
|
||||
for i in range(0, 45):
|
||||
if i % 4 == 0:
|
||||
acht += 100
|
||||
height = str(acht)
|
||||
else:
|
||||
height = str(acht + (i % 4 * 15))
|
||||
if len(height) == 3:
|
||||
height = "0" + height
|
||||
css += "." + startEnd + "-" + height + " {\n"
|
||||
css += "grid-row-" + startEnd + ": " + str(i + 1) + "\n}\n"
|
||||
css += "\n\n\n"
|
||||
|
||||
file = open("static/cal.scss", "a")
|
||||
file.write ("\n // Generated by genSCSSstarts.py\n\n")
|
||||
file.write(css)
|
||||
file.close()
|
||||
40
genstarts.py
40
genstarts.py
@ -1,40 +0,0 @@
|
||||
from flask import url_for
|
||||
|
||||
css = ""
|
||||
|
||||
for se in ("start", "end"):
|
||||
acht = 0
|
||||
for i in range(0, 28):
|
||||
if i % 4 == 0:
|
||||
acht += 100
|
||||
h = str(acht)
|
||||
else:
|
||||
h = str(acht + (i % 4 * 15))
|
||||
if len(h) == 3:
|
||||
h = "0" + h
|
||||
css += "." + se + "-" + h + " {\n"
|
||||
if se == "start":
|
||||
css += "grid-row-" + se + ": " + "1" + "\n}\n"
|
||||
else:
|
||||
css += "grid-row-" + se + ": " + "4" + "\n}\n"
|
||||
css += "\n\n\n"
|
||||
|
||||
|
||||
for se in ("start", "end"):
|
||||
acht = 700
|
||||
for i in range(0, 45):
|
||||
if i % 4 == 0:
|
||||
acht += 100
|
||||
h = str(acht)
|
||||
else:
|
||||
h = str(acht + (i % 4 * 15))
|
||||
if len(h) == 3:
|
||||
h = "0" + h
|
||||
css += "." + se + "-" + h + " {\n"
|
||||
css += "grid-row-" + se + ": " + str(i+1) + "\n}\n"
|
||||
css += "\n\n\n"
|
||||
|
||||
f = open("static/cal.scss", "a")
|
||||
f.write ("\n // Generated by genstarts.py\n\n")
|
||||
f.write(css)
|
||||
f.close()
|
||||
23
getMySQL.py
Normal file
23
getMySQL.py
Normal file
@ -0,0 +1,23 @@
|
||||
import getpass
|
||||
import sys
|
||||
|
||||
|
||||
# noinspection PyPackageRequirements
|
||||
def get_mysql():
|
||||
"""
|
||||
Extrahiert die MySQL-Anmeldedaten aus ~/.my.cnf . \n
|
||||
Funktioniert wahrscheinlich nur auf Linux, vor allem für den Server gedacht.
|
||||
:return: [username: str, password: str]:
|
||||
"""
|
||||
if sys.platform == "linux":
|
||||
systemUser = getpass.getuser()
|
||||
file = open("/home/"+systemUser+"/.my.cnf", "r")
|
||||
content = file.read()
|
||||
username = content.find("user=")
|
||||
password = content.find("password=")
|
||||
username = content[username+5:password-1]
|
||||
readOnly = content.find("[clientreadonly]")
|
||||
password = content[password+9:readOnly-2]
|
||||
return username, password
|
||||
else:
|
||||
return "username-goes-here", "password-goes-here"
|
||||
21
get_mysql.py
21
get_mysql.py
@ -1,21 +0,0 @@
|
||||
import getpass
|
||||
import sys
|
||||
|
||||
|
||||
def get_mysql():
|
||||
"""
|
||||
Extrahiert die MySQL-Anmeldedaten aus ~/.my.cnf . \n
|
||||
Funktioniert wahrscheinlich nur auf Linux, vor allem für den Server gedacht.
|
||||
"""
|
||||
if sys.platform == "linux":
|
||||
u = getpass.getuser()
|
||||
f = open("/home/"+u+"/.my.cnf", "r")
|
||||
i = f.read()
|
||||
u = i.find("user=")
|
||||
p = i.find("password=")
|
||||
u = i[u+5:p-1]
|
||||
ro = i.find("[clientreadonly]")
|
||||
p = i[p+9:ro-2]
|
||||
return u, p
|
||||
else:
|
||||
return "username-goes-here", "password-goes-here"
|
||||
10
init.py
10
init.py
@ -4,7 +4,7 @@ from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_talisman import Talisman
|
||||
from sqlalchemy import ForeignKey
|
||||
|
||||
from get_mysql import get_mysql
|
||||
from getMySQL import get_mysql
|
||||
import atexit
|
||||
from flask_apscheduler import APScheduler
|
||||
|
||||
@ -97,13 +97,13 @@ class Meals(db.Model):
|
||||
|
||||
|
||||
scheduler = APScheduler()
|
||||
flask_app = create()
|
||||
with flask_app.app_context():
|
||||
flaskApp = create()
|
||||
with flaskApp.app_context():
|
||||
print("Creating Tables....")
|
||||
db.create_all()
|
||||
def_src = ["*.paulmartin.cloud", '\'self\'']
|
||||
Talisman(flask_app, content_security_policy={"default-src": def_src, "script-src": def_src # + ["'unsafe-inline'"]
|
||||
Talisman(flaskApp, content_security_policy={"default-src": def_src, "script-src": def_src # + ["'unsafe-inline'"]
|
||||
})
|
||||
scheduler.init_app(flask_app)
|
||||
scheduler.init_app(flaskApp)
|
||||
scheduler.start()
|
||||
scheduler.api_enabled = True
|
||||
|
||||
@ -5,7 +5,7 @@ def getCookie(cookies):
|
||||
"""
|
||||
Liefert (letzten) Cookie der Cookies-Liste zurück.
|
||||
:param cookies:
|
||||
:return:
|
||||
:return Cookie:
|
||||
"""
|
||||
cookie = 0
|
||||
for c in cookies:
|
||||
@ -14,13 +14,23 @@ def getCookie(cookies):
|
||||
|
||||
|
||||
def getSemesterList(uid):
|
||||
semesterlist = Semesterlist.query.filter_by(uid=uid).all()
|
||||
semester = []
|
||||
for s in semesterlist:
|
||||
semester += [[s.semestername, s.semesterid]]
|
||||
semester.sort(key=lambda x: x[-1], reverse=True)
|
||||
return semester
|
||||
"""
|
||||
Liefert die IDs der Semester für den User
|
||||
:param uid:
|
||||
:return Semester-ID-Liste:
|
||||
"""
|
||||
dbSemesterList = Semesterlist.query.filter_by(uid=uid).all()
|
||||
semesterList = []
|
||||
for semester in dbSemesterList:
|
||||
semesterList += [[semester.semestername, semester.semesterid]]
|
||||
semesterList.sort(key=lambda x: x[-1], reverse=True)
|
||||
return semesterList
|
||||
|
||||
|
||||
def loadUser(uid):
|
||||
"""
|
||||
Hilfsfunktion, die den User für die UID zurückgibt.
|
||||
:param uid:
|
||||
:return User:
|
||||
"""
|
||||
return User.query.filter_by(id=uid).first()
|
||||
|
||||
233
routing.py
233
routing.py
@ -1,16 +1,14 @@
|
||||
#!/usr/bin/env python3.6
|
||||
from flask import make_response
|
||||
from flask import render_template, url_for, redirect, request
|
||||
from flask_login import login_user, login_required, current_user, logout_user
|
||||
from flask_login import (login_user as loginUser, login_required as loginRequired,
|
||||
logout_user as logoutUser, current_user as currentUser)
|
||||
from werkzeug.exceptions import HTTPException
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
import hashlib
|
||||
import datetime
|
||||
import time
|
||||
import random
|
||||
|
||||
import fetchDUALIS
|
||||
import fetchMENSA
|
||||
import fetchRAPLA
|
||||
from requesthelpers import *
|
||||
from fetchRAPLA import *
|
||||
@ -18,9 +16,10 @@ from calendar_generation import getWeek
|
||||
from init import *
|
||||
|
||||
|
||||
def init_routes(app: Flask):
|
||||
def initRoutes(app: Flask):
|
||||
"""
|
||||
Initialisiert die App-Routen. Nötig für Tests.
|
||||
:param app:
|
||||
"""
|
||||
|
||||
@app.route("/")
|
||||
@ -32,59 +31,60 @@ def init_routes(app: Flask):
|
||||
return redirect(url_for("login"))
|
||||
|
||||
@app.route("/dashboard")
|
||||
@login_required
|
||||
@loginRequired
|
||||
def welcome():
|
||||
"""
|
||||
Dashboard
|
||||
:return HTML:
|
||||
"""
|
||||
if not current_user.kurs:
|
||||
if not currentUser.kurs:
|
||||
return redirect(url_for("getKurs", next=url_for(request.endpoint)))
|
||||
sel = request.args.get("sel")
|
||||
if not sel:
|
||||
sel = "theorie"
|
||||
kurs = current_user.kurs
|
||||
name = current_user.name
|
||||
if sel == "theorie":
|
||||
t = ""
|
||||
p = "hidden"
|
||||
selectedPhase = request.args.get("sel")
|
||||
if not selectedPhase:
|
||||
selectedPhase = "theorie"
|
||||
kurs = currentUser.kurs
|
||||
name = currentUser.name
|
||||
if selectedPhase == "theorie":
|
||||
theorie = ""
|
||||
praxis = "hidden"
|
||||
else:
|
||||
t = "hidden"
|
||||
p = ""
|
||||
return render_template('dashboard.html', kurs=kurs, name=name, theorie=t, praxis=p)
|
||||
theorie = "hidden"
|
||||
praxis = ""
|
||||
return render_template('dashboard.html', kurs=kurs, name=name, theorie=theorie, praxis=praxis)
|
||||
|
||||
@app.route("/theorie/noten", methods=["GET", "POST"])
|
||||
@login_required
|
||||
@loginRequired
|
||||
async def displayNoten():
|
||||
"""
|
||||
Zeigt die Noten aus Dualis an. Hierfür ist ein aktives Token nötig.
|
||||
:return HTML:
|
||||
"""
|
||||
d = Dualis.query.filter_by(uid=current_user.id).first()
|
||||
dualisUser = Dualis.query.filter_by(uid=currentUser.id).first()
|
||||
if request.method == "POST":
|
||||
d.semester = request.form.get("sem")
|
||||
dualisUser.semester = request.form.get("sem")
|
||||
db.session.commit()
|
||||
if not d.semester:
|
||||
if not dualisUser.semester:
|
||||
return redirect(url_for("getSemester", next=url_for(request.endpoint)))
|
||||
t = d.token
|
||||
chosensemester = d.semester
|
||||
c = request.cookies.get("cnsc")
|
||||
timeout = fetchDUALIS.timeOut(d, c, "displayNoten")
|
||||
token = dualisUser.token
|
||||
chosenSemester = dualisUser.semester
|
||||
cookie = request.cookies.get("cnsc")
|
||||
timeout = fetchDUALIS.timeOut(dualisUser, cookie, "displayNoten")
|
||||
if timeout:
|
||||
return timeout
|
||||
semester = getSemesterList(current_user.id)
|
||||
noten = await fetchDUALIS.getResults(t, c, chosensemester)
|
||||
return render_template("noten.html", noten=noten, semester=semester, sel=chosensemester, s="n", praxis="hidden")
|
||||
semester = getSemesterList(currentUser.id)
|
||||
noten = await fetchDUALIS.getResults(token, cookie, chosenSemester)
|
||||
return render_template("noten.html", noten=noten, semester=semester, sel=chosenSemester, s="n",
|
||||
praxis="hidden")
|
||||
|
||||
@app.route("/plan", methods=["GET"])
|
||||
@login_required
|
||||
@loginRequired
|
||||
async def displayRapla():
|
||||
"""
|
||||
Zeigt den Stundenplan für eingeloggte User an. \n
|
||||
TODO: Persönliche Filter, Notizen, Essensvorlieben etc. berücksichtigen.
|
||||
:return HTML:
|
||||
"""
|
||||
if not current_user.kurs:
|
||||
if not currentUser.kurs:
|
||||
return redirect(url_for("getKurs", next=url_for(request.endpoint)))
|
||||
week = request.args.get("week")
|
||||
if week:
|
||||
@ -94,9 +94,9 @@ def init_routes(app: Flask):
|
||||
samstag = request.args.get("samstag")
|
||||
if not samstag:
|
||||
samstag = False
|
||||
events = await getWeek(week, fetchRAPLA.getIcal(current_user.kurs), samstag)
|
||||
events = await getWeek(week, fetchRAPLA.getIcal(currentUser.kurs), samstag)
|
||||
return render_template("plan-user.html", events=events[0], eventdays=events[1],
|
||||
name=current_user.name, prev=str(events[2])[:10], next=str(events[3])[:10],
|
||||
name=currentUser.name, prev=str(events[2])[:10], next=str(events[3])[:10],
|
||||
mon=events[4],
|
||||
s="p", praxis="hidden")
|
||||
|
||||
@ -114,7 +114,7 @@ def init_routes(app: Flask):
|
||||
else:
|
||||
week = "today"
|
||||
try:
|
||||
if current_user.kurs == kurs.upper():
|
||||
if currentUser.kurs == kurs.upper():
|
||||
return redirect(url_for("displayRapla"))
|
||||
except AttributeError:
|
||||
pass
|
||||
@ -139,88 +139,89 @@ def init_routes(app: Flask):
|
||||
return redirect(url_for("getKurs"))
|
||||
|
||||
@app.route("/set-up/kurs")
|
||||
@login_required
|
||||
@loginRequired
|
||||
async def getKurs():
|
||||
"""
|
||||
Automatische Kurs-Auswahl. \n
|
||||
Aktives Dualis-Token benötigt.
|
||||
:return HTML:
|
||||
"""
|
||||
d = Dualis.query.filter_by(uid=current_user.id).first()
|
||||
if d:
|
||||
dualisUser = Dualis.query.filter_by(uid=currentUser.id).first()
|
||||
if dualisUser:
|
||||
cookie = request.cookies.get("cnsc")
|
||||
timeout = fetchDUALIS.timeOut(d, cookie, "getKurs")
|
||||
timeout = fetchDUALIS.timeOut(dualisUser, cookie, "getKurs")
|
||||
if timeout:
|
||||
return timeout
|
||||
e = False
|
||||
if not current_user.kurs:
|
||||
kurs = await fetchDUALIS.getKurs(d.token, cookie)
|
||||
dualisError = False
|
||||
if not currentUser.kurs:
|
||||
kurs = await fetchDUALIS.getKurs(dualisUser.token, cookie)
|
||||
if kurs != 0:
|
||||
if not fetchRAPLA.getIcal(kurs):
|
||||
return render_template('kurs.html', detected=(kurs, e), s="s", theorie="hidden",
|
||||
praxis="hidden",
|
||||
file=False)
|
||||
current_user.kurs = kurs
|
||||
return render_template('kurs.html', detected=(kurs, dualisError), s="s",
|
||||
theorie="hidden", praxis="hidden", file=False)
|
||||
currentUser.kurs = kurs
|
||||
db.session.commit()
|
||||
else:
|
||||
e = True
|
||||
dualisError = True
|
||||
else:
|
||||
kurs = current_user.kurs
|
||||
current_user.kurs = kurs
|
||||
kurs = currentUser.kurs
|
||||
currentUser.kurs = kurs
|
||||
db.session.commit()
|
||||
else:
|
||||
e = True
|
||||
dualisError = True
|
||||
kurs = ""
|
||||
return render_template('kurs.html', detected=(kurs, e), s="s", theorie="hidden", praxis="hidden", file=True)
|
||||
return render_template('kurs.html', detected=(kurs, dualisError), s="s", theorie="hidden",
|
||||
praxis="hidden", file=True)
|
||||
|
||||
@app.route("/set-up/semester")
|
||||
@login_required
|
||||
@loginRequired
|
||||
async def getSemester():
|
||||
"""
|
||||
Manuelle Semester-Auswahl.
|
||||
:return HTML:
|
||||
"""
|
||||
t = Dualis.query.filter_by(uid=current_user.id).first().token
|
||||
c = request.cookies.get("cnsc")
|
||||
semesterlist = Semesterlist.query.filter_by(uid=current_user.id).all()
|
||||
if not semesterlist:
|
||||
semester = await fetchDUALIS.getSem(t, c)
|
||||
token = Dualis.query.filter_by(uid=currentUser.id).first().token
|
||||
cookie = request.cookies.get("cnsc")
|
||||
semesterList = Semesterlist.query.filter_by(uid=currentUser.id).all()
|
||||
if not semesterList:
|
||||
semester = await fetchDUALIS.getSem(token, cookie)
|
||||
for i in semester:
|
||||
semitem = Semesterlist(semestername=i[0], semesterid=i[1], uid=current_user.id,
|
||||
itemid=current_user.id * int(i[1][-7:]) // 1000000)
|
||||
db.session.add(semitem)
|
||||
semsterItem = Semesterlist(semestername=i[0], semesterid=i[1], uid=currentUser.id,
|
||||
itemid=currentUser.id * int(i[1][-7:]) // 1000000)
|
||||
db.session.add(semsterItem)
|
||||
db.session.commit()
|
||||
else:
|
||||
semester = getSemesterList(current_user.id)
|
||||
return render_template("semester.html", semester=semester, s="s", theorie="hidden", praxis="hidden")
|
||||
semester = getSemesterList(currentUser.id)
|
||||
return render_template("semester.html", semester=semester, s="s",
|
||||
theorie="hidden", praxis="hidden")
|
||||
|
||||
@app.route("/set-up/semester", methods=["POST"])
|
||||
@login_required
|
||||
@loginRequired
|
||||
def setSemester():
|
||||
"""
|
||||
Speichern der Semester-Auswahl.
|
||||
:return HTML:
|
||||
"""
|
||||
n = request.args.get("next")
|
||||
if not n:
|
||||
n = url_for("welcome")
|
||||
d = Dualis.query.filter_by(uid=current_user.id).first()
|
||||
d.semester = request.form.get("sem")
|
||||
nextArg = request.args.get("next")
|
||||
if not nextArg:
|
||||
nextArg = url_for("welcome")
|
||||
dualisUser = Dualis.query.filter_by(uid=currentUser.id).first()
|
||||
dualisUser.semester = request.form.get("sem")
|
||||
db.session.commit()
|
||||
return redirect(n)
|
||||
return redirect(nextArg)
|
||||
|
||||
@app.route("/set-up/rapla")
|
||||
@login_required
|
||||
@loginRequired
|
||||
def chooseRaplas():
|
||||
"""
|
||||
Manuelle Rapla-Auswahl.
|
||||
:return HTML:
|
||||
"""
|
||||
r = getRaplas()
|
||||
return render_template("rapla.html", raplas=r, s="s", theorie="hidden", praxis="hidden")
|
||||
raplas = getRaplas()
|
||||
return render_template("rapla.html", raplas=raplas, s="s", theorie="hidden", praxis="hidden")
|
||||
|
||||
@app.route("/set-up/rapla", methods=["POST"])
|
||||
@login_required
|
||||
@loginRequired
|
||||
async def getRapla():
|
||||
"""
|
||||
Verarbeitet die Eingabe von chooseRaplas().
|
||||
@ -231,12 +232,12 @@ def init_routes(app: Flask):
|
||||
if file == url == "None":
|
||||
return redirect(url_for("chooseRaplas"))
|
||||
if file != "None":
|
||||
loadUser(current_user.id).kurs = file[5:-5]
|
||||
loadUser(currentUser.id).kurs = file[5:-5]
|
||||
db.session.commit()
|
||||
elif url != "None":
|
||||
file = await getNewRapla(url)
|
||||
if type(file) is not int:
|
||||
loadUser(current_user.id).kurs = file[5:-5]
|
||||
loadUser(currentUser.id).kurs = file[5:-5]
|
||||
db.session.commit()
|
||||
else:
|
||||
return redirect(url_for("error", ecode=900))
|
||||
@ -260,89 +261,93 @@ def init_routes(app: Flask):
|
||||
"""
|
||||
email = request.form.get("email")
|
||||
password = request.form.get("password")
|
||||
n = request.args.get("next")
|
||||
if n:
|
||||
success = make_response(redirect(n))
|
||||
nextArg = request.args.get("next")
|
||||
if nextArg:
|
||||
success = make_response(redirect(nextArg))
|
||||
else:
|
||||
success = make_response(redirect(url_for("getKurs")))
|
||||
|
||||
user = User.query.filter_by(email=email).first()
|
||||
t = await fetchDUALIS.checkUser(email, password)
|
||||
if t[0] == -2:
|
||||
tokenAndCookie = await fetchDUALIS.checkUser(email, password)
|
||||
if tokenAndCookie[0] == -2:
|
||||
return redirect(url_for("login", code=-2))
|
||||
if user:
|
||||
dualis = Dualis.query.filter_by(uid=user.id).first()
|
||||
dualis.token = t[0]
|
||||
newcookie = t[1]
|
||||
dualis.token_created = time.time()
|
||||
dualisUser = Dualis.query.filter_by(uid=user.id).first()
|
||||
dualisUser.token = tokenAndCookie[0]
|
||||
newCookie = tokenAndCookie[1]
|
||||
dualisUser.token_created = time.time()
|
||||
db.session.commit()
|
||||
login_user(user)
|
||||
loginUser(user)
|
||||
if user.kurs:
|
||||
if not dualis.semester:
|
||||
if not dualisUser.semester:
|
||||
success = make_response(redirect(url_for("getSemester")))
|
||||
elif not n:
|
||||
elif not nextArg:
|
||||
success = make_response(redirect(url_for("welcome")))
|
||||
success.set_cookie("cnsc", value=newcookie, httponly=True, secure=True)
|
||||
success.set_cookie("cnsc", value=newCookie, httponly=True, secure=True)
|
||||
else:
|
||||
hashid = int(hashlib.sha1(email.encode("utf-8")).hexdigest(), 16) % (10 ** 8)
|
||||
pname = email.find(".") + 1
|
||||
ename = min(email[pname:].find("."), email[pname:].find("@"))
|
||||
name = email[pname:pname + ename].capitalize()
|
||||
new_user = User(email=email, name=name, id=hashid)
|
||||
hashedID = int(hashlib.sha1(email.encode("utf-8")).hexdigest(), 16) % (10 ** 8)
|
||||
vorname = email.find(".") + 1
|
||||
nachname = min(email[vorname:].find("."), email[vorname:].find("@"))
|
||||
name = email[vorname:(vorname + nachname)].capitalize()
|
||||
new_user = User(email=email, name=name, id=hashedID)
|
||||
db.session.add(new_user)
|
||||
cookie = t[1]
|
||||
cookie = tokenAndCookie[1]
|
||||
|
||||
new_dualis = Dualis(uid=hashid, token=t[0], token_created=int(time.time()))
|
||||
db.session.add(new_dualis)
|
||||
newDualis = Dualis(uid=hashedID, token=tokenAndCookie[0], token_created=int(time.time()))
|
||||
db.session.add(newDualis)
|
||||
db.session.commit()
|
||||
login_user(new_user)
|
||||
loginUser(new_user)
|
||||
success.set_cookie("cnsc", value=cookie, httponly=True, secure=True)
|
||||
return success
|
||||
|
||||
@app.route("/log-out")
|
||||
@login_required
|
||||
@loginRequired
|
||||
async def logout():
|
||||
"""
|
||||
Loggt den User aus.
|
||||
:return Empty Token:
|
||||
"""
|
||||
cookie = request.cookies.get("cnsc")
|
||||
dualis = Dualis.query.filter_by(uid=current_user.id).first()
|
||||
await fetchDUALIS.logOut(dualis.token, cookie)
|
||||
dualis.token = None
|
||||
dualisUser = Dualis.query.filter_by(uid=currentUser.id).first()
|
||||
await fetchDUALIS.logOut(dualisUser.token, cookie)
|
||||
dualisUser.token = None
|
||||
db.session.commit()
|
||||
logout_user()
|
||||
red = make_response(redirect(url_for("login", code=1, next=url_for("welcome"))))
|
||||
red.set_cookie("cnsc", value="Logged out! Your temporary token "
|
||||
logoutUser()
|
||||
redirection = make_response(redirect(url_for("login", code=1, next=url_for("welcome"))))
|
||||
redirection.set_cookie("cnsc", value="Logged out! Your temporary token "
|
||||
"on our server and the cookie on your device have been deleted.", httponly=True,
|
||||
secure=True)
|
||||
return red
|
||||
return redirection
|
||||
|
||||
@app.route("/error")
|
||||
def error():
|
||||
"""
|
||||
Error Page für custom-Errors. \n
|
||||
TODO: Funktion depreciaten. Ersetzen durch Errors auf den entsprechenden Seiten.
|
||||
:return:
|
||||
:return HTML:
|
||||
"""
|
||||
error = request.args.get("ecode")
|
||||
if error == "900":
|
||||
msg = "Ungültige RAPLA-URL! Sicher, dass der Link zum DHBW-Rapla führt?"
|
||||
elif error == "899":
|
||||
msg = "Der Kalender wurde nicht gefunden! Sicher, dass der Link korrekt ist?"
|
||||
errorCode = request.args.get("ecode")
|
||||
if errorCode == "900":
|
||||
message = "Ungültige RAPLA-URL! Sicher, dass der Link zum DHBW-Rapla führt?"
|
||||
elif errorCode == "899":
|
||||
message = "Der Kalender wurde nicht gefunden! Sicher, dass der Link korrekt ist?"
|
||||
else:
|
||||
msg = str(error)
|
||||
return render_template('display-message.html', message=msg)
|
||||
message = str(errorCode)
|
||||
return render_template('display-message.html', message=message)
|
||||
|
||||
@app.route("/error")
|
||||
@app.errorhandler(HTTPException)
|
||||
def handle(e):
|
||||
""""
|
||||
HTTP-Exception-Handler
|
||||
:param e:
|
||||
:return HTML:
|
||||
"""
|
||||
return render_template('display-message.html', message=e)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_routes(flask_app)
|
||||
flask_app.run(host='0.0.0.0', port=2024, debug=True)
|
||||
initRoutes(flaskApp)
|
||||
flaskApp.run(host='0.0.0.0', port=2024, debug=True)
|
||||
else:
|
||||
initRoutes(flaskApp)
|
||||
|
||||
@ -277,7 +277,7 @@ a.footer:hover {
|
||||
// Place on Timeline
|
||||
|
||||
|
||||
// Generated by genstarts.py
|
||||
// Generated by genSCSSstarts.py
|
||||
|
||||
.start-0100 {
|
||||
grid-row-start: 1
|
||||
|
||||
@ -8,25 +8,34 @@ from tests_examples import login_data
|
||||
|
||||
@pytest.fixture()
|
||||
def app():
|
||||
"""
|
||||
Erstellt die App und konfiguriert sie zum Test-Modus
|
||||
:yield app:
|
||||
"""
|
||||
app = init.create(testing=True)
|
||||
app.config.update({
|
||||
"TESTING": True,
|
||||
})
|
||||
routing.init_routes(app)
|
||||
routing.initRoutes(app)
|
||||
yield app
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def client(app):
|
||||
"""
|
||||
Liefert einen Test-Client
|
||||
:param app:
|
||||
:return client:
|
||||
"""
|
||||
return app.test_client(use_cookies=True)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def runner(app):
|
||||
return app.test_cli_runner()
|
||||
|
||||
|
||||
def login(client):
|
||||
"""
|
||||
Hilfsfunktion, die den Client einloggt
|
||||
:param client:
|
||||
:return Bool (true if successful, false otherwise):
|
||||
"""
|
||||
client.post('/log-in', data=dict(email=login_data.email, password=login_data.password),
|
||||
follow_redirects=True)
|
||||
cookie = client.get_cookie("cnsc")
|
||||
@ -37,6 +46,10 @@ def login(client):
|
||||
|
||||
|
||||
def test_login(client):
|
||||
"""
|
||||
Testet die Login-Funktion
|
||||
:param client:
|
||||
"""
|
||||
loginpage = client.get("/log-in", follow_redirects=True)
|
||||
assert b"Einloggen" in loginpage.data
|
||||
assert loginpage.status_code == 200
|
||||
@ -51,6 +64,10 @@ def test_login(client):
|
||||
|
||||
|
||||
def test_kurssetup(client):
|
||||
"""
|
||||
Testet die Konfiguration eines Kurses
|
||||
:param client:
|
||||
"""
|
||||
if login(client):
|
||||
kurspage = client.get("/set-up", follow_redirects=True)
|
||||
assert kurspage.status_code == 200
|
||||
@ -70,14 +87,18 @@ def test_kurssetup(client):
|
||||
|
||||
|
||||
def test_semestersetup(client):
|
||||
"""
|
||||
Testet die Konfiguration eines Semesters
|
||||
:param client:
|
||||
"""
|
||||
if login(client):
|
||||
semesterpage = client.get("/set-up/semester", follow_redirects=True)
|
||||
assert semesterpage.status_code == 200
|
||||
semesterpage_html = BeautifulSoup(semesterpage.text, "lxml")
|
||||
semesterform = semesterpage_html.find("form")
|
||||
semesterform_action = semesterform.get("action")
|
||||
semesterform_options = semesterform.find_all("option")
|
||||
nextpage = client.post(semesterform_action, data=dict(sem=semesterform_options[-1].get("value")),
|
||||
semesterpageHTML = BeautifulSoup(semesterpage.text, "lxml")
|
||||
semesterform = semesterpageHTML.find("form")
|
||||
semesterformAction = semesterform.get("action")
|
||||
semesterformOptions = semesterform.find_all("option")
|
||||
nextpage = client.post(semesterformAction, data=dict(sem=semesterformOptions[-1].get("value")),
|
||||
follow_redirects=True)
|
||||
assert nextpage.status_code == 200
|
||||
assert b"Willkommen, " in nextpage.data
|
||||
@ -86,30 +107,38 @@ def test_semestersetup(client):
|
||||
|
||||
|
||||
def test_noten(client):
|
||||
"""
|
||||
Testet das Abrufen der Noten aus zwei verschiedenen Semestern
|
||||
:param client:
|
||||
"""
|
||||
if login(client):
|
||||
notenpage = client.get("/theorie/noten", follow_redirects=True)
|
||||
assert notenpage.status_code == 200
|
||||
notenpage_html = BeautifulSoup(notenpage.text, "lxml")
|
||||
notenpage_heading = notenpage_html.find("h1")
|
||||
notenpage_form = notenpage_html.find("form")
|
||||
notenpage_action = notenpage_form.get("action")
|
||||
notenpage_selection = notenpage_form.find("select")
|
||||
notenpage_options = notenpage_selection.find_all("option")
|
||||
notenpage_semester = "Not found!"
|
||||
notenpageHTML = BeautifulSoup(notenpage.text, "lxml")
|
||||
notenpageHeading = notenpageHTML.find("h1")
|
||||
notenpageForm = notenpageHTML.find("form")
|
||||
notenpageAction = notenpageForm.get("action")
|
||||
notenpageSelection = notenpageForm.find("select")
|
||||
notenpageOptions = notenpageSelection.find_all("option")
|
||||
notenpageSemester = "Not found!"
|
||||
nextpage = "Not found!"
|
||||
for i in notenpage_options:
|
||||
for i in notenpageOptions:
|
||||
if i.get("selected") == "":
|
||||
notenpage_semester = i.text[:-1]
|
||||
notenpageSemester = i.text[:-1]
|
||||
else:
|
||||
nextpage = i.get("value")
|
||||
assert notenpage_semester.encode("utf-8") in notenpage_heading.encode("utf-8")
|
||||
nextpage = client.post(notenpage_action, data=dict(sem=nextpage), follow_redirects=True)
|
||||
assert notenpageSemester.encode("utf-8") in notenpageHeading.encode("utf-8")
|
||||
nextpage = client.post(notenpageAction, data=dict(sem=nextpage), follow_redirects=True)
|
||||
assert nextpage.status_code == 200
|
||||
else:
|
||||
assert False
|
||||
|
||||
|
||||
def test_logout(client):
|
||||
"""
|
||||
Testet die Logout-Funktion
|
||||
:param client:
|
||||
"""
|
||||
if login(client):
|
||||
loginpage = client.get("/log-out", follow_redirects=True)
|
||||
assert loginpage.status_code == 200
|
||||
|
||||
@ -93,6 +93,7 @@ async def checkUser_async():
|
||||
"""
|
||||
# noinspection DuplicatedCode
|
||||
async with httpx.AsyncClient() as s:
|
||||
# noinspection DuplicatedCode
|
||||
content = (f'usrname={fmail}&pass={fpw}&ARGUMENTS=clino%2Cusrname%2Cpass%2Cmenuno%2Cmenu_type%2Cbrowser'
|
||||
f'%2Cplatform&APPNAME=CampusNet&PRGNAME=LOGINCHECK')
|
||||
response = await s.post(url=url, headers=headers, content=content)
|
||||
@ -167,6 +168,7 @@ async def getResults_async(token, cookie, resl):
|
||||
async with httpx.AsyncClient() as s:
|
||||
response = await s.get(url=url + "?APPNAME=CampusNet&PRGNAME=COURSERESULTS&ARGUMENTS=-N" + token +
|
||||
",-N000307," + ",-N" + resl, headers=headers)
|
||||
# noinspection DuplicatedCode
|
||||
html = BeautifulSoup(response.content.decode("utf-8"), 'lxml')
|
||||
table = html.find('table', attrs={"class": "nb list"})
|
||||
body = table.find("tbody")
|
||||
@ -183,7 +185,7 @@ async def getResults_async(token, cookie, resl):
|
||||
col[2] = i
|
||||
i += 1
|
||||
vorlist += [col[1:4]]
|
||||
|
||||
# noinspection DuplicatedCode
|
||||
extrakurse = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
for i in vorlist:
|
||||
@ -192,6 +194,7 @@ async def getResults_async(token, cookie, resl):
|
||||
i[e] = extrakurse[i[e]]
|
||||
return vorlist[:-1]
|
||||
|
||||
|
||||
# noinspection DuplicatedCode
|
||||
async def getPruefung_async(s, url):
|
||||
response = await s.get("https://dualis.dhbw.de" + url, headers=headers)
|
||||
|
||||
Reference in New Issue
Block a user