"""Download, normalise and periodically refresh Rapla timetables as iCal files."""
import asyncio
import json
import urllib.error
from datetime import datetime, timedelta

import httpx
import icalendar
from dateutil.parser import *  # noqa: F401,F403 - provides parse()
from dateutil.relativedelta import relativedelta
from icalendar import Calendar, Event

from init import scheduler, flaskApp, Rapla, db, HiddenVL, User
from calendar_generation import createCustomCalendar

# Service that turns a key-based Rapla page into JSON events.
_RAPLA_API_URL = "https://dh-api.paulmartin.cloud/rapla"


async def fetchPlan(session, url):
    """
    Helper that returns the response of a GET request to the given URL.

    :param session: client object exposing an awaitable ``get(url=...)``
    :param url: URL to request
    :return Response:
    """
    return await session.get(url=url)


def writeToFile(filename, data):
    """
    Write the text of a response to the given file, creating it if needed.

    The payload is validated *before* the file is opened, so an invalid
    response can no longer truncate an already stored calendar.

    :param filename: target path
    :param data: object with a ``text`` attribute holding the iCal payload
    :raises AssertionError: if the text does not contain "BEGIN:VCALENDAR"
    """
    text = data.text
    if "BEGIN:VCALENDAR" not in text:
        # Raised explicitly (same type as the former assert) so the check
        # also runs when Python is started with -O.
        raise AssertionError(f"{filename}: response is not an iCal calendar")
    with open(filename, 'w+') as file:
        file.write(text)


def writeKursToDB(kurs, url):
    """
    Store course name and URL in the database unless the course already exists.

    :param kurs: course name
    :param url: URL the calendar is fetched from
    """
    if Rapla.query.filter_by(name=kurs).first() is not None:
        return
    new_kurs = Rapla(name=kurs, link=url, file=f"rapla{kurs}.ical")
    db.session.add(new_kurs)
    db.session.commit()


def parseRaplaURL(url: str):
    """
    Convert a Rapla URL into the canonical form (https scheme, page=ical).

    :param url: URL as entered by the user
    :return (status, url): ``(1, url)`` for an iCal URL, ``(2, url)`` for a
        URL that only carries a "key", ``(0, 0)`` if the URL is not usable.
    """
    rapla = url.find("rapla.")
    if rapla == -1:
        # Always return a pair: callers index the result.
        return 0, 0
    if rapla == 4 or rapla > 6:
        # Drop whatever precedes "rapla." (e.g. "www." or "http://www.").
        url = url[rapla:]

    http = url.find(":")
    if url[:http] == "http":
        url = f"https{url[http:]}"
    elif http == -1:
        url = f"https://{url}"

    pageInURL = url.find("page=")
    andInURL = url.find("&")
    if url[pageInURL + 5:andInURL].lower() == "ical":
        return 1, url
    if pageInURL != -1:
        return 1, f"{url[:pageInURL + 5]}ical{url[andInURL:]}"
    if url.find("key") != -1:
        return 2, url
    return 0, 0


async def getNewRapla(url: str, testing=False):
    """
    Store the iCal of a Rapla on the server and return the file name.

    TODO: add the campus to the file identifier to avoid name conflicts.

    :param url: Rapla URL as entered by the user
    :param testing: enables the assertions used by the test-suite
    :return: file name on success, ``0`` for an unusable URL, ``-1`` if the
        download failed, or the URL itself if it already ends in ".ical"
    """
    status, parsedURL = parseRaplaURL(url)
    if status == 0:
        return 0
    if testing:
        assert "https" in parsedURL
        try:
            assert "ical" in parsedURL
        except AssertionError:
            assert "key" in parsedURL
    if status == 2:
        return await buildICALfromKey(parsedURL, onlyUpdate=False, testing=testing)

    url = parsedURL
    if url.endswith(".ical"):
        return url

    fileInURL = url.find("file=")
    kurs = url[fileInURL + 5:].replace(" ", "").upper()
    try:
        async with httpx.AsyncClient() as s:
            response = await fetchPlan(s, url)
    except (httpx.HTTPError, urllib.error.URLError):
        # httpx reports transport problems as httpx.HTTPError subclasses;
        # URLError is kept for backward compatibility.
        return -1
    if testing:
        assert "Vollmer" in response.text
    writeToFile(f"calendars/rapla{kurs}.ical", response)
    writeKursToDB(kurs, url)
    if testing:
        assert "TINF22B3" in Rapla.query.filter(Rapla.name == "TINF22B3").first().file
    return f"rapla{kurs}.ical"


def getIcal(uid: int | None = None, kurs: str | None = None):
    """
    Return the calendar file name for the given user or course.

    :param uid: user id; takes precedence over ``kurs`` when truthy
    :param kurs: course name, used when no ``uid`` is given
    :return: ``"<uid>.ical"`` if the user has HiddenVL entries, otherwise the
        file of the matching Rapla row, or ``None`` if nothing matches
    """
    if uid:
        if HiddenVL.query.filter_by(uid=uid).first():
            return f"{uid}.ical"
        user = User.query.filter_by(id=uid).first()
        if user is None:
            return None
        kurs = user.kurs
    rapla = Rapla.query.filter(Rapla.name == kurs).first()
    return rapla.file if rapla is not None else None


def getRaplas():
    """
    Return all Raplas stored in the database.

    :return (course list, file list, URL list):
    """
    raplas = Rapla.query.all()
    kursList = [rapla.name for rapla in raplas]
    fileList = [rapla.file for rapla in raplas]
    urlList = [rapla.link for rapla in raplas]
    return kursList, fileList, urlList


async def refreshRapla():
    """
    Refresh all stored Raplas and rebuild the custom calendars afterwards.

    A failure for one calendar is logged and skipped so that it neither
    overwrites the stored file nor aborts the remaining updates.
    """
    # One query only, so that files and URLs are guaranteed to line up.
    _, fileList, urlList = getRaplas()
    jobList = []
    async with httpx.AsyncClient() as s:
        for filename, url in zip(fileList, urlList):
            print(f"Update Rapla: {filename[:-5]}")
            if url.find("file") != -1:
                jobList.append(fetchPlan(s, url))
            else:
                jobList.append(buildICALfromKey(url, onlyUpdate=True))
        calendarList = await asyncio.gather(*jobList, return_exceptions=True)

    for filename, result in zip(fileList, calendarList):
        if isinstance(result, BaseException):
            print(f"Update failed for {filename}: {result!r}")
            continue
        if not hasattr(result, "text"):
            # buildICALfromKey wrote the file itself (200) or gave up (0).
            continue
        try:
            writeToFile(f"calendars/{filename}", result)
        except AssertionError as err:
            print(f"Update failed for {filename}: {err}")

    hiddenUsers = {event.uid for event in HiddenVL.query.all()}
    for user in hiddenUsers:
        uid = await createCustomCalendar(user)
        print (f"Update custom calendar: {uid}")


@scheduler.task('cron', id="raplaSchedule", hour='*', day_of_week='*', minute='*/3', week='*', second='40')
def raplaSchedule():
    """
    Run refreshRapla() every three minutes to update the timetables.

    Does nothing while the Flask app is configured with TESTING.
    """
    if flaskApp.config["TESTING"]:
        return
    with flaskApp.app_context():
        asyncio.run(refreshRapla())


def _eventFromJSON(jsonday):
    """Build one iCal event from one JSON entry of the Rapla API."""
    event = Event()
    event.add('SUMMARY', jsonday['title'])
    event.add('LOCATION', jsonday['ressources'])
    event.add('DTSTART', parse(jsonday['startDate'], dayfirst=True))
    event.add('DTEND', parse(jsonday['endDate'], dayfirst=True))
    event.add('UID', jsonday['UID'])
    # Entries without lecturer get an empty attendee.
    person = jsonday.get('persons', "")
    teacher = icalendar.vCalAddress(value=person)
    teacher.params["CN"] = person
    event.add('ATTENDEE', teacher)
    return event


async def buildICALfromKey(url, onlyUpdate, testing=False):
    """
    Build an .ical file from the given URL, which must contain a key parameter.

    If the URL already serves an iCal document it is stored directly;
    otherwise the events are requested from the Rapla API and converted.

    :param url: key-based Rapla URL
    :param onlyUpdate: if true, only rewrite the file and skip the database
    :param testing: enables the assertions used by the test-suite
    :return: file name, ``200`` after a pure update, or ``0`` if no plausible
        course name could be extracted
    """
    async with httpx.AsyncClient() as s:
        page = await s.get(url=url)
        if page.text[:10] == "BEGIN:VCAL":
            info = page.headers['content-disposition']
            kursname = info[info.find('filename=')+9:-4].upper()
            if testing:
                assert "TMB22" in kursname
            writeToFile(f"calendars/rapla{kursname}.ical", page)
            if onlyUpdate:
                return 200
            writeKursToDB(kursname, url)
            if testing:
                # Test-only check; unguarded it failed for every other course.
                assert "TMB22" in Rapla.query.filter(Rapla.name == kursname).first().file
            return f"rapla{kursname}.ical"

        # The course name is the <title> of the HTML page (7 == len("<title>")).
        html = page.text
        kursname = html[html.find("<title>") + 7:html.find("</title>")].replace(" ", "")
        if testing:
            assert "TMT22B1" in kursname
        if len(kursname) > 15:
            return 0

        now = datetime.now()
        payload = {
            "url": url,
            "start": f'{(now - relativedelta(days=100)):%Y-%m-%d}',
            "end": f'{(now + relativedelta(days=100)):%Y-%m-%d}',
        }
        req = await s.post(
            url=_RAPLA_API_URL,
            data=payload,
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            timeout=20 if testing else 10,
        )

    jsonresp = json.loads(req.text)
    if testing:
        assert len(jsonresp) > 0

    cal = Calendar()
    cal.add('prodid', '-//Rapla//iCal Plugin//EN')
    cal.add('version', '2.0')
    for key in jsonresp:
        for jsonday in jsonresp[key]:
            cal.add_component(_eventFromJSON(jsonday))

    with open(f"calendars/rapla{kursname}.ical", "wb") as f:
        f.write(cal.to_ical())

    if onlyUpdate:
        return 200
    writeKursToDB(kursname, url)
    if testing:
        assert "TMT22B1" in Rapla.query.filter(Rapla.name == "TMT22B1").first().file
    return f"rapla{kursname}.ical"