WIP: add: base files #1

Draft
fil wants to merge 3 commits from developing into main
8 changed files with 500 additions and 0 deletions

Database.py Normal file · 81 additions

@@ -0,0 +1,81 @@
from config import Config
import psycopg2.extras
import psycopg2
class Database():
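    """Thin psycopg2 wrapper around the vm and "group" tables."""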
def __init__(self):
self.config = Config()
self.connect()
def connect(self):
self.conn = psycopg2.connect(
host=self.config["DB_HOST"],
database=self.config["DB_DATABASE"],
user=self.config["DB_USER"],
password=self.config["DB_PASSWORD"]
)
self.cur = self.conn.cursor(
cursor_factory=psycopg2.extras.RealDictCursor)
def get_vm(self, id):
self.cur.execute("SELECT * FROM vm WHERE id = %s;", (str(id),))
return self.cur.fetchall()
def get_all_vm(self):
self.cur.execute("SELECT * FROM vm;")
return self.cur.fetchall()
    def get_group_by_id(self, spgid):
        # "group" is a reserved word in SQL, so the table name must be quoted
        self.cur.execute('SELECT * FROM "group" WHERE spgid = %s;',
                         (str(spgid),))
return self.cur.fetchall()
def get_all_groups(self):
self.cur.execute("SELECT * FROM \"group\";")
return self.cur.fetchall()
def enumerate_vm(self):
self.cur.execute("SELECT COUNT(*) FROM vm;")
return self.cur.fetchall()
    def add_vm(self, values):
        self.cur.execute("""INSERT INTO vm
            (id, name, last_seen, spid, ipaddr, spgid)
            VALUES (%s,%s,%s,%s,%s,%s) RETURNING *;""", values)
self.conn.commit()
return self.cur.fetchall()
    def add_group(self, values):
        self.cur.execute("""INSERT INTO "group" (spgid, name, gid)
            VALUES (%s,%s,%s) RETURNING *;""", values)
self.conn.commit()
return self.cur.fetchall()
def set_last_seen(self, id, time):
self.cur.execute("""UPDATE vm SET last_seen = %s
WHERE id = %s RETURNING *;""", (time, id))
self.conn.commit()
return self.cur.fetchall()
def set_ipaddr(self, id, ipaddr):
self.cur.execute("""UPDATE vm SET ipaddr = %s
WHERE id = %s RETURNING *;""", (ipaddr, id))
self.conn.commit()
return self.cur.fetchall()
def delete_vm(self, id):
self.cur.execute("DELETE FROM vm WHERE id = %s RETURNING *;", (id,))
self.conn.commit()
return self.cur.fetchall()
def delete_all_vm(self):
self.cur.execute("DELETE FROM vm RETURNING *;")
self.conn.commit()
return self.cur.fetchall()
def delete_all_groups(self):
self.cur.execute("DELETE FROM \"group\" RETURNING *;")
self.conn.commit()
return self.cur.fetchall()

Statping.py Normal file · 84 additions

@@ -0,0 +1,84 @@
from requests.adapters import HTTPAdapter, Retry
from config import Config
import requests
class Statping():
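    """Minimal client for the Statping HTTP API."""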
def __init__(self):
self.config = Config()
self.api_key = self.config['SP_API_KEY']
self.endpoint = self.config['SP_ENDPOINT']
self.service_config = {
"name": "",
"domain": "",
"expected": "",
"expected_status": 200,
"check_interval": 60,
"type": "icmp",
"post_data": "",
"port": 0,
"timeout": 30,
"order_id": 0
}
self.session = requests.Session()
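        # retry transient connect/read failures with exponential backoff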
retry = Retry(connect=3, read=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
def get_vm(self, spid):
r = self.session.get(self.endpoint +
f"/api/services/{spid}?api={self.api_key}")
return r.json()
def get_groups(self):
r = self.session.get(self.endpoint +
f"/api/groups?api={self.api_key}")
return r.json()
    def set_description(self, desc):
        payload = {"description": desc}
        r = self.session.post(self.endpoint +
                              f"/api/core?api={self.api_key}", json=payload)
return r.json()
def add_vm(self, name, ipaddr, pos, group):
config = self.service_config.copy()
config["name"] = name
config["domain"] = ipaddr
config["order_id"] = pos
if group:
config["group_id"] = int(group)
print(config, group)
r = self.session.post(self.endpoint +
f"/api/services?api={self.api_key}", json=config)
print(r.json())
return r.json().get("id")
def add_group(self, name):
r = self.session.post(self.endpoint +
f"/api/groups?api={self.api_key}",
json={"name": name})
print(r.json())
return r.json().get('id')
def delete_vm(self, spid):
r = self.session.delete(self.endpoint +
f"/api/services/{spid}?api={self.api_key}",
timeout=30)
return r.json()
def delete_all_vm(self, all_spid):
for spid in all_spid:
print(self.delete_vm(spid))
def delete_group(self, spgid):
r = self.session.delete(self.endpoint +
f"/api/groups/{spgid}?api={self.api_key}",
timeout=30)
return r.json()
    def delete_all_groups(self, all_spgid):
        for spgid in all_spgid:
            print(self.delete_group(spgid))

Uptime.py Normal file · 213 additions

@@ -0,0 +1,213 @@
from proxmoxer import ProxmoxAPI
from Database import Database
from Statping import Statping
from datetime import datetime
from config import Config
from utils import ping
from log import logger
import subprocess
import urllib3
import signal
import time
import pytz
import sys
urllib3.disable_warnings()
class Uptime():
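    """Synchronizes the guests running on Proxmox with Statping services."""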
def __init__(self):
signal.signal(signal.SIGTERM, self.__signal_handler)
signal.signal(signal.SIGINT, self.__signal_handler)
self.ssh = None
self.proxmox = None
self.db = Database()
self.sp = Statping()
self.config = Config()
    def __signal_handler(self, signum, frame):
logger.warning("Force terminating the process")
self.__disconnect()
sys.exit(0)
def __init_groups(self):
groups = self.sp.get_groups()
logger.info("Creating groups if needed on Statping")
for gid, name in self.config['ID_POLICY'].items():
if name in [el['name'] for el in groups]:
continue
logger.info(f"Adding {name} ({gid}) to Database")
spid = self.sp.add_group(name)
self.db.add_group([spid, name, gid])
def __connect(self, pve):
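        # tunnel a local port through the jump host to the PVE so the
        # Proxmox API is reachable on localhost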
logger.info(f"Connecting to PVE wiht IP {pve}")
local_port = self.config['PROXMOXPORT']
remote_host = self.config['PROXMOXHOST']
remote_port = self.config['PROXMOXPORT']
forward = f"{local_port}:{remote_host}:{remote_port}"
self.ssh = subprocess.Popen(
f"""ssh -i {self.config['SSHKEYFILE']} -L {forward}
-J {self.config['LDAPUSER']}@{self.config['JUMP']}
{self.config['LDAPUSER']}@{pve} sleep infinity""".split(),
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
)
logger.info("Waiting 5 seconds to ensure ssh conenction")
time.sleep(5)
def __disconnect(self):
if self.ssh:
self.ssh.kill()
logger.info("SSH connection killed")
def __get_pve(self):
logger.info("Getting PVE to connect")
for pve in self.config["PVES"]:
logger.info(f"Pinging PVE with IP {pve}")
r = ping(pve)
if r:
logger.info(f"PVE with IP {pve} is reachable")
return pve
raise Exception("Cannot find a running PVE")
    def __get_qemu_ipaddr(self, node, vmid):
        # ipconfig0 looks like "ip=10.0.0.5/24,gw=10.0.0.1": strip the
        # leading "ip=" and the trailing "/NN" mask (a two-digit prefix
        # length is assumed)
        try:
            return self.proxmox.nodes(node["node"]).qemu(vmid) \
                .config.get().get('ipconfig0').split(",")[0][3:-3]
        except Exception:
            return None

    def __get_lxc_ipaddr(self, node, vmid):
        # net0 looks like "name=eth0,bridge=vmbr0,ip=10.0.0.5/24,type=veth":
        # the ip= field is expected to be the second-to-last entry
        try:
            return self.proxmox.nodes(node["node"]).lxc(vmid) \
                .config.get().get('net0').split(",")[-2][3:-3]
        except Exception:
            return None
def __get_data(self, vm):
vmid = str(vm["vmid"])
standard = True
        if len(vmid) < 9:
standard = False
# remove when standard is fulfilled
vmid = vmid + "0"*(9-len(vmid))
if vmid[0] == "9" or vmid[3] == "8":
return None, standard
return vmid, standard
def __get_category(self, vmid, standard):
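        # the first two digits of a standard vmid pick the group; the gid
        # prefixes come from ID_POLICY in config.py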
if not standard:
return None
for group in self.db.get_all_groups():
if vmid[:2] == group['gid']:
return group['spgid']
return None
def __update(self, name, vmid, update_time, ipaddr, running, spgid):
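        # a vm that is no longer running keeps its stale last_seen, so the
        # __clear() pass at the end of the run removes it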
vm = self.db.get_vm(vmid)
if running:
if not vm:
pos = self.db.enumerate_vm()[0]["count"]
spid = self.sp.add_vm(name, ipaddr, pos, spgid)
self.db.add_vm([vmid, name, update_time, spid, ipaddr, spgid])
logger.info(f"[{vmid}] {name} added to Database and Statping")
else:
if vm[0].get('ipaddr') != ipaddr:
self.db.set_ipaddr(vmid, ipaddr)
logger.warning(f"[{vmid}] {name} changed its IP address. \
Updated it correspondingly")
self.db.set_last_seen(vmid, update_time)
logger.info(f"[{vmid}] {name} is still active")
def __clear(self):
logger.info("Synchronizing Database and Statping with Proxmox")
for vm in self.db.get_all_vm():
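            # last_seen holds a stringified epoch timestamp, so comparing it
            # as a string against update_time is enough to spot stale entries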
if vm.get('last_seen') < self.update_time:
self.sp.delete_vm(vm.get('spid'))
self.db.delete_vm(vm.get('id'))
logger.info(
f"[{vm.get('id')}] {vm.get('name')} has been deleted")
def __run(self):
logger.info("Initiate brokerage")
all_nodes = sorted(self.proxmox.nodes.get(), key=lambda x: x['node'])
for node in all_nodes:
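            # nodes get a synthetic 9-digit id built from the trailing digit
            # of the node name (e.g. a hypothetical "pve3" -> "000000003")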
nodeid = "0"*8 + node["node"][-1]
ip_addr = self.proxmox.nodes(node["node"]).network \
.vmbr0.get().get("address")
spgid = self.__get_category(nodeid, True)
self.__update(node["node"], nodeid, self.update_time,
ip_addr, True, spgid)
for vm in self.proxmox.nodes(node["node"]).qemu.get():
vmid, standard = self.__get_data(vm)
if vmid is None:
continue
spgid = self.__get_category(vmid, standard)
ipaddr = self.__get_qemu_ipaddr(node, vm["vmid"])
if ipaddr is None:
continue
self.__update(vm["name"], vmid, self.update_time, ipaddr,
vm["status"] == "running", spgid)
for vm in self.proxmox.nodes(node["node"]).lxc.get():
vmid, standard = self.__get_data(vm)
if vmid is None:
continue
spgid = self.__get_category(vmid, standard)
ipaddr = self.__get_lxc_ipaddr(node, vm["vmid"])
if ipaddr is None:
continue
self.__update(vm["name"], vmid, self.update_time, ipaddr,
vm["status"] == "running", spgid)
self.__clear()
self.__disconnect()
def start(self):
logger.info("Connecting to Database")
self.db.connect()
        now = datetime.now(pytz.UTC)
        self.update_time = str(now.timestamp())
        logger.info(f"Update time is {now} ({self.update_time})")
try:
self.__init_groups()
self.__connect(self.__get_pve())
logger.info("Connecting to Proxmox API")
self.proxmox = ProxmoxAPI(
"localhost:8006",
user=f"{self.config['LDAPUSER']}@{self.config['REALM']}",
password=self.config["KEY"], verify_ssl=False
)
self.__run()
except Exception as e:
logger.error(e)
self.__disconnect()
def purge(self):
logger.info("Deleting all services from Statping and Database")
all_spid = [vm.get('spid') for vm in self.db.get_all_vm()]
self.sp.delete_all_vm(all_spid)
self.db.delete_all_vm()
logger.info("Deleting all groups from Statping and Database")
all_spgid = [group.get('spgid') for group in self.db.get_all_groups()]
self.sp.delete_all_groups(all_spgid)
self.db.delete_all_groups()
logger.info("Purge completed")
def calculate_sla(self):
logger.info("Connecting to Database")
self.db.connect()
hits, failures, total = 0, 0, 0
logger.info("Calculating SLA (hits/failures)")
for vm in self.db.get_all_vm():
r = self.sp.get_vm(vm.get('spid')).get('stats')
hits += int(r.get('hits'))
failures += int(r.get('failures'))
logger.info(f"[{vm.get('id')}] {vm.get('name')} {hits}/{failures}")
sla = 100.0
if (hits + failures) != 0:
sla = round((hits / (hits + failures)) * 100, 3)
        self.sp.set_description(f"SLA {sla}%")
logger.info(f"Calculated SLA: {sla}%")
return sla

config.py Normal file · 53 additions

@@ -0,0 +1,53 @@
import os
from dotenv import load_dotenv
defConfig = {
"PROXMOXHOST": "localhost",
"PROXMOXPORT": "8006",
"LDAPUSER": "fil",
"SSHKEYFILE": "fil",
"REALM": "ldap.students.cs.unibo.it",
"KEY": "",
"JUMP": "",
"DB_HOST": "",
"DB_DATABASE": "",
"DB_USER": "",
"DB_PASSWORD": "",
"SP_ENDPOINT": "https://status.students.cs.unibo.it",
"SP_API_KEY": "",
"PVES": ["130.136.3.3", "130.136.3.4", "130.136.3.5",
"130.136.3.6", "130.136.3.7"],
"ID_POLICY": {
"00": "Bare Metal",
"10": "Adm Staff Core Services",
"20": "CSUNIBO",
"30": "Student Projects",
"40": "Kubernetes",
"70": "Other"
},
"check_interval_cron": "0 * * * *",
"sla_interval_cron": "5-55/5 * * * *"
}
class Config:
config_data: dict
__instance = None
def __getitem__(self, name):
if name in self.config_data:
return self.config_data[name]
return None
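    # Config is a process-wide singleton: the first Config() call loads the
    # .env file once and every later call returns the same instance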
def __new__(cls):
if cls.__instance is None:
cls.__instance = super().__new__(cls)
cls.__instance.__init()
return cls.__instance
    def __init(self):
        load_dotenv()
        # copy the defaults so the module-level dict stays untouched
        self.config_data = dict(defConfig)
        for key in defConfig:
            if key in os.environ:
                # environment overrides arrive as plain strings
                self.config_data[key] = os.environ[key]

log.py Normal file · 24 additions

@@ -0,0 +1,24 @@
import colorlog
import logging
colorlogformatter = colorlog.ColoredFormatter(
"%(asctime)s | %(log_color)s%(levelname)-8s%(reset)s %(blue)s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
},
secondary_log_colors={},
style='%'
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
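# drop the default stderr handler installed by basicConfig and replace it
# with the colorized one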
logger.handlers.pop()
handler = logging.StreamHandler()
handler.setFormatter(colorlogformatter)
logger.addHandler(handler)

main.py Normal file · 17 additions

@@ -0,0 +1,17 @@
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from config import Config
from Uptime import Uptime
if __name__ == "__main__":
uptime = Uptime()
    scheduler = BlockingScheduler()
    config = Config()
    scheduler.add_job(uptime.start,
                      CronTrigger.from_crontab(config["check_interval_cron"]))
    scheduler.add_job(uptime.calculate_sla,
                      CronTrigger.from_crontab(config["sla_interval_cron"]))
    scheduler.start()

requirements.txt Normal file · 21 additions

@@ -0,0 +1,21 @@
APScheduler==3.10.4
bcrypt==4.1.2
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
colorlog==6.8.2
cryptography==42.0.2
idna==3.6
paramiko==3.4.0
proxmoxer==2.0.1
psycopg2-binary==2.9.9
pycodestyle==2.11.1
pycparser==2.21
PyNaCl==1.5.0
pyping==0.0.6
python-dotenv==1.0.1
pytz==2024.1
requests==2.31.0
six==1.16.0
tzlocal==5.2
urllib3==2.2.0

utils.py Normal file · 7 additions

@@ -0,0 +1,7 @@
import subprocess
def ping(host):
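    # -c 1 sends a single probe; -t 1 is a one-second timeout on BSD/macOS
    # ping but would set the TTL on GNU/Linux, where -W is the timeout flag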
command = f"ping -c 1 -t 1 {host}".split()
return subprocess.call(command, stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT) == 0