From 6db58d07cbdda30cda6ba15c4a93e80e77cbb21c Mon Sep 17 00:00:00 2001 From: Chris Josten Date: Fri, 22 Jan 2021 19:06:44 +0100 Subject: [PATCH] Reimplement based on systemd's inhibit system --- ober/aiuo-shutdown-sleep.service | 13 -- ober/dub.json | 15 ++ ober/dub.selections.json | 14 ++ ober/ober/autoshutdown.py | 149 -------------- ober/setup.py | 16 -- ober/source/app.d | 187 ++++++++++++++++++ ober/source/msgpackrpc/client.d | 134 +++++++++++++ ober/source/msgpackrpc/package.d | 3 + ober/source/msgpackrpc/protocol.d | 32 +++ .../msgpackrpc/server.d} | 0 10 files changed, 385 insertions(+), 178 deletions(-) delete mode 100644 ober/aiuo-shutdown-sleep.service create mode 100644 ober/dub.json create mode 100644 ober/dub.selections.json delete mode 100755 ober/ober/autoshutdown.py delete mode 100644 ober/setup.py create mode 100644 ober/source/app.d create mode 100644 ober/source/msgpackrpc/client.d create mode 100644 ober/source/msgpackrpc/package.d create mode 100644 ober/source/msgpackrpc/protocol.d rename ober/{ober/__init__.py => source/msgpackrpc/server.d} (100%) diff --git a/ober/aiuo-shutdown-sleep.service b/ober/aiuo-shutdown-sleep.service deleted file mode 100644 index 7d2bda6..0000000 --- a/ober/aiuo-shutdown-sleep.service +++ /dev/null @@ -1,13 +0,0 @@ -[Unit] -Description=AIUO sleep hook -Before=sleep.target -StopWhenUnneeded=yes - -[Service] -Type=oneshot -RemainAfterExit=yes -ExecStart=-aiuo-shutdown --notify sleep -ExecStop=-aiuo-shutdown --notify wakeup - -[Install] -WantedBy=sleep.target diff --git a/ober/dub.json b/ober/dub.json new file mode 100644 index 0000000..990bdda --- /dev/null +++ b/ober/dub.json @@ -0,0 +1,15 @@ +{ + "authors": [ + "Chris Josten" + ], + "copyright": "Copyright © 2021, Chris Josten", + "dependencies": { + "ddbus": {"path": "/home/chris/Programmeren/D/ddbus"}, + "msgpack-d": "~>1.0.3", + "vibe-core": "~>1.13.0" + }, + "description": "Automatisch In-Uitschakelen Ober", + "license": "MIT", + "name": "aiuo-shutdown", + "versions": ["WithRPC"] +} diff --git a/ober/dub.selections.json b/ober/dub.selections.json new file mode 100644 index 0000000..299396f --- /dev/null +++ b/ober/dub.selections.json @@ -0,0 +1,14 @@ +{ + "fileVersion": 1, + "versions": { + "ddbus": {"path":"../../../D/ddbus"}, + "dunit": "1.0.16", + "eventcore": "0.9.13", + "libasync": "0.8.6", + "memutils": "1.0.4", + "msgpack-d": "1.0.3", + "stdx-allocator": "2.77.5", + "taggedalgebraic": "0.11.19", + "vibe-core": "1.13.0" + } +} diff --git a/ober/ober/autoshutdown.py b/ober/ober/autoshutdown.py deleted file mode 100755 index c2f000f..0000000 --- a/ober/ober/autoshutdown.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -import msgpackrpc -import argparse -import asyncio -import logging -import os -import signal -import sys -import subprocess -import time - -CLIENT_IP="example.org" -LOGGER = logging.getLogger(__name__) -TIMEOUT: float = 30 * 60 -last_checked: float = time.time() -last_checked_value: bool = False -cur_timeout: float = TIMEOUT -client = None - -def count_users() -> int: - """Checks if any users are logged in""" - cmd = "who | wc -l" - ps = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) - output = ps.communicate()[0] - try: - user_count = int(output) - except Exception as e: - LOGGER.exception(f"{output} could not be parsed as an int") - return user_count - -def count_minecraft_players() -> int: - """Checks if there any minecraft players""" - cmd = "netstat | grep 25565 | grep -v TIME_WAIT | wc -l" - ps = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) - output = ps.communicate()[0] - try: - minecraft_players = int(output) - except Exception as e: - LOGGER.exception(f"{output} could not be parsed as an int") - return minecraft_players - -def should_shutdown() -> bool: - user_count = count_users() - player_count = count_minecraft_players() - res = user_count == 0 and player_count == 0 - LOGGER.debug(f"Player count: {player_count}") - LOGGER.debug(f"User count: {user_count}") - LOGGER.info("Should shut down" if res else "Should not shut down") - return res - -def trusty_sleep(amount: float): - while time.time() - last_checked < amount: - LOGGER.debug("Woke up early") - time.sleep(amount - (time.time() - last_checked)) - -def sigterm_handler(signum, frame): - LOGGER.debug("Notifying of shutdown") - prepareShutdown() - sys.exit(0) - -def sigusr1_handler(signum, frame): - LOGGER.debug("Going to sleep") - if client: - client.notify("NotifySleep") - -def sigusr2_handler(signum, frame): - LOGGER.debug("Waking up") - if client: - client.notify("NotifyWakeup") - -def prepareShutdown(): - client.notify("NotifyShutdown") - -def main() -> None: - global TIMEOUT - global CLIENT_IP - global last_checked - global last_checked_value - global client - - - LOGGER.debug("Trying to connect via RPC") - - success = False - while not success: - try: - client = msgpackrpc.Client(msgpackrpc.Address(CLIENT_IP, 18002)) - success = True - except Exception as e: - LOGGER.debug(f"Connection failed: {e}. Retrying in 10s") - time.sleep(10) - - LOGGER.debug(client.call("GetBootReason")) - # Alleen de klant op de hoogte stellen nadat we hebben laten weten dat we er zijn. - LOGGER.debug("signalen registreren") - signal.signal(signal.SIGTERM, sigterm_handler) - signal.signal(signal.SIGUSR1, sigusr1_handler) - signal.signal(signal.SIGUSR2, sigusr2_handler) - - - final_verdict = False - while not final_verdict: - LOGGER.debug("Stayling alive") - trusty_sleep(TIMEOUT / 2) - last_checked = time.time() - - shutdown = should_shutdown() - if last_checked_value and shutdown: - break - elif shutdown: - last_checked_value = True - else: - last_checked_value = False - prepareShutdown() - os.system("shutdown +1 The system is going to shut down because no active sessions were found and no one is on the server") - - -def notify(what) -> None: - if what == "sleep": - sig_to_send = "USR1" - elif what == "wakeup": - sig_to_send = "USR2" - else: - return -1 - os.system(f"systemctl kill -s {sig_to_send} aiuo-shutdown.service") - - return 0 - - -def start_main(): - global CLIENT_IP - global TIMEOUT - - logging.basicConfig(format='[%(asctime)s %(levelname)s] %(name)s: %(message)s', level=logging.DEBUG) - argparser = argparse.ArgumentParser(description="AIUO - Automatisch In- en Uitschakelen Ober (Oberkant)") - argparser.add_argument("wachttijd", type=float, nargs="?", help="Wachttijd voor het uitschakelen van ober, in secondes") - argparser.add_argument("ip_klant", type=str, nargs="?", help="IP-adres van klant") - argparser.add_argument("--notify", type=str, help="Notify before sleep (sleep) or after wakeup (wakeup)") - args = argparser.parse_args() - if args.notify: - return notify(args.notify) - else: - CLIENT_IP = args.ip_klant - TIMEOUT = args.wachttijd - - return main() - -if __name__ == "__main__": - start_main() diff --git a/ober/setup.py b/ober/setup.py deleted file mode 100644 index a77680b..0000000 --- a/ober/setup.py +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env python3 -from setuptools import setup - -setup(name="AIUO-ober", - version="0.3", - description="Automatisch In- en Uitschakelen Ober Ober", - author="Chris Josten", - author_email="chris@netsoj.nl", - packages=["ober"], - install_requires=["msgpack-rpc-python"], - entry_points={ - "console_scripts": { - "aiuo-shutdown = ober.autoshutdown:start_main" - } - } - ) diff --git a/ober/source/app.d b/ober/source/app.d new file mode 100644 index 0000000..db621bf --- /dev/null +++ b/ober/source/app.d @@ -0,0 +1,187 @@ +import std.conv; +import std.datetime; +import std.experimental.logger; +import std.stdio; +import std.string; + +import vibe.core.core; +import vibe.core.net; +import vibe.core.process; + +import ddbus; +import ddbus.c_lib; +import msgpackrpc; + +string appName = "Automatisch In- en Uitschakelen Ober"; + +extern (C) void close(int fileNo); + +/** + * Represententeerd de toepassing + */ +struct Toep { +public: + immutable ushort PORT = 18_002; + immutable int m_waitingTime; + immutable string m_clientAddress; + immutable int CHECK_COUNT = 2; + Connection m_dbusCon; + bool m_keepRunning = true; + version(WithRPC) Client m_client; + + this(int waitingTime, string clientAddress) { + this.m_waitingTime = waitingTime; + this.m_clientAddress = clientAddress; + this.m_dbusCon = connectToBus(DBusBusType.DBUS_BUS_SYSTEM); + } + + int exec() { + info ("Can shut down: ", canShutdown()); + version(WithRPC) { + TCPConnection conn = connectTCP(m_clientAddress, PORT); + Duration sleepTime = 500.msecs; + while(!conn) { + infof("Not connected, trying again in %s", sleepTime); + sleep(sleepTime); + conn = connectTCP(m_clientAddress, PORT); + } + info("Connected!"); + m_client = new Client(conn); + writeln("Receive response: '", m_client.call!(string)("GetBootReason"), "'"); + } + + runTask({ + import vibe.core.core : yield; + while(m_dbusCon.tick() && m_keepRunning) { + yield(); + } + }); + + runTask({ + FileDescriptor inhibitLock = FileDescriptor.none; + version(WithRPC) { + FileDescriptor sleepLock = FileDescriptor.none; + FileDescriptor shutdownLock = FileDescriptor.none; + } + + // Get interfaces + BusName loginBus = busName("org.freedesktop.login1"); + ObjectPath loginPath = ObjectPath("/org/freedesktop/login1"); + InterfaceName loginIFace = interfaceName("org.freedesktop.login1.Manager"); + PathIface loginManager = new PathIface(m_dbusCon, loginBus, loginPath, loginIFace); + PathIface loginManagerProps = new PathIface(m_dbusCon, loginBus, loginPath, interfaceName("org.freedesktop.DBus.Properties")); + + void releaseLock(ref FileDescriptor fd) { + if (fd != FileDescriptor.none) { + close(cast(int) fd); + fd = FileDescriptor.none; + } + } + + version(WithRPC) { + // Register signal listeners + // FIXME: this does not work yet. + MessageRouter router = new MessageRouter(); + MessagePattern sleepPattern = MessagePattern(loginPath, loginIFace, "PrepareForSleep", true); + MessagePattern shutdownPattern = MessagePattern(loginPath, loginIFace, "PrepareForShutdown", true); + router.setHandler!(void, bool)(sleepPattern, (bool active) { + logf("Preparing for sleep: %b", active); + if (active) { + m_client.notify("NotifySleep"); + releaseLock(sleepLock); + } else { + m_client.notify("NotifyWakeup"); + sleepLock = loginManager.Inhibit("sleep", appName, "Systeem op de hoogte brengen dat de ober gaat slapen", "delay").to!FileDescriptor; + } + }); + router.setHandler!(void, bool)(shutdownPattern, (bool active) { + logf("Preparing for shutdown: %b", active); + if (active) { + m_client.notify("NotifyShutdown"); + releaseLock(shutdownLock); + } + }); + registerRouter(m_dbusCon, router); + + // Take the sleep lock + sleepLock = loginManager.Inhibit("sleep", appName, "Systeem op de hoogte brengen dat de ober gaat slapen", + "delay").to!FileDescriptor; + shutdownLock = loginManager.Inhibit("shutdown", appName, "Systeem op de hoogte brengen dat de ober gaat sluiten", + "delay").to!FileDescriptor; + } + + + void block() { + if (inhibitLock != FileDescriptor.none) return; + Message mesg = loginManager.Inhibit("shutdown:sleep:idle:handle-suspend-key:handle-power-key", appName, "Er zijn spelers op de server", "block"); + inhibitLock = mesg.to!FileDescriptor; + } + + + scope (exit) { + // Als we om een of andere redenen deze functie verlaten, laat het slot los! + releaseLock(inhibitLock); + releaseLock(sleepLock); + releaseLock(shutdownLock); + } + + int checkCount = CHECK_COUNT; + while(m_keepRunning) { + // Check if we are already preventing shutdown + if (inhibitLock != FileDescriptor.none) { + // We are. Check if we can shutdown (as in, no players are on the server) + if (canShutdown()) { + + if (checkCount == 1) { + // Release the lock + releaseLock(inhibitLock); + info("Stop preventing shutdown"); + } else { + // Check 1? more time + checkCount--; + tracef("Checking %d more time(s)", checkCount); + } + } else { + // We cannot shut down. Reset the check counter. + checkCount = CHECK_COUNT; + tracef("Still players out there. Keeping lock"); + } + } else if (!canShutdown()) { + try { + block(); + } catch(DBusException e) { + warning("Could not take lock and prevent sleep/shutdown: ", e); + } + info("Start preventing shutdown"); + } else { + trace("Nothing to do"); + } + sleep(seconds(m_waitingTime)); + } + }); + + int exitCode = runEventLoop(); + // cleanup + info("Cleanup"); + return exitCode; + } + + /** + * Checks if the system can shut down. + */ + bool canShutdown() { + auto result = execute(["ss", "-H", "--query=tcp", "state", "established", "sport", "=", ":minecraft"]); + return result.output.splitLines().length == 0; + } +} + + +int main(string[] args) { + scope(failure) stderr.writefln("GEBRUIK: %s WACHTTIJD KLANTADRES", args[0]); + if (args.length < 3) { + stderr.writefln("GEBRUIK: %s WACHTTIJD KLANTADRES", args[0]); + return -1; + } + int waitingTime = to!int(args[1]); + return Toep(waitingTime, args[2]).exec(); +} diff --git a/ober/source/msgpackrpc/client.d b/ober/source/msgpackrpc/client.d new file mode 100644 index 0000000..d8fc470 --- /dev/null +++ b/ober/source/msgpackrpc/client.d @@ -0,0 +1,134 @@ +import core.atomic; + +import std.array; +import std.concurrency : receiveOnly; +import std.exception; +import std.experimental.logger; +import std.traits; +import std.typecons; + +import msgpack; + +import vibe.core.concurrency; +import vibe.core.core; +import vibe.core.net; +import vibe.core.task; + +import protocol; + +/** + * MessagePack RPC client + */ +class Client { +private: + TCPConnection m_connection; + shared int m_nextRequestId = 0; + + /// Map of sent request, mapping the requestId to a future. + Tid[int] m_requests; +public: + this(TCPConnection connection) { + this.m_connection = connection; + runTask(&receiveLoop); + } + + /** + * Notifies the server, not waiting for a response. Although it waits for the request to be sent + * + * params: + * method = The name of the method to be notified + * arguments = The arguments to pass to the server + */ + void notify(Args...)(string method, Args arguments) { + tracef("Notify '%s'", method); + auto pack = packer(Appender!(ubyte[])()); + pack.beginArray(3).pack(MessageType.notify, method, pack.packArray(arguments)); + sendData(pack.stream.data); + } + + /** + * Calls a method on the other side, awaiting a response. + * + * The difference between this and notify(string, Args) is that call expects a result from the + * other endpoint and will actually wait for it. + */ + R call(R, Args...)(string method, Args arguments) { + R res = sendRequest!R(method, arguments).getResult(); + return res; + } + + /** + * Calls a method on the other endpoint, but will not wait for a result. Instead, a Future!R is + * returned. + */ + Future!R callAsync(R, Args...)(string method, Args arguments) { + return sendRequest!R(method, arguments); + } + +private: + void receiveLoop() { + StreamingUnpacker unpacker = StreamingUnpacker([]); + while(m_connection.connected) { + m_connection.waitForData(); + ubyte[] buf = new ubyte[m_connection.leastSize]; + m_connection.read(buf); + unpacker.feed(buf); + + foreach(ref message; unpacker) { + enforce!ProtocolException(message.length == 3 || message.length == 4, "Protocol error: incoming message size mismatch"); + immutable uint messageType = message[0].as!uint; + switch(messageType) { + case MessageType.response: + enforce!ProtocolException(message.length == 4, "Protocol error: Response must be of size 4"); + immutable error = message[2]; + immutable result = message[3]; + onResponse(message[1].as!int, error, result); + break; + default: + tracef("Received unhandled messageType %s", cast(MessageType) messageType); + break; + } + } + } + } + + void onResponse(int id, immutable Value error, immutable Value result) { + if (id in m_requests) { + tracef("Reply for %d; Error: %s; Result: %s", id, error.type, result.type); + send(m_requests[id], error, result); + m_requests.remove(id); + } else { + warningf("No task for id %d", id); + } + } + + void sendData(T)(T data) { + m_connection.write(data); + m_connection.flush(); + } + + /** + * Sends a request and returns a future containing the result. + */ + Future!R sendRequest(R, Args...)(string method, Args arguments) { + tracef("Call '%s'", method); + auto pack = packer(Appender!(ubyte[])()); + immutable int requestId = atomicFetchAdd(m_nextRequestId, 1); + pack.beginArray(4) + .pack(MessageType.request, requestId, method) + .packArray(arguments); + + sendData(pack.stream.data); + + return async(delegate R() { + m_requests[requestId] = Task.getThis().tid; + // error result + Tuple!(immutable Value, immutable Value) reply = receiveOnly!(immutable Value, immutable Value); + if (reply[0].type != Value.Type.nil) { + throw new RPCException(reply[0]); + } + Value result = reply[1]; + return result.as!R; + }); + } +} diff --git a/ober/source/msgpackrpc/package.d b/ober/source/msgpackrpc/package.d new file mode 100644 index 0000000..d51a182 --- /dev/null +++ b/ober/source/msgpackrpc/package.d @@ -0,0 +1,3 @@ +module msgpackrpc; +public import client; +public import server; diff --git a/ober/source/msgpackrpc/protocol.d b/ober/source/msgpackrpc/protocol.d new file mode 100644 index 0000000..63153ae --- /dev/null +++ b/ober/source/msgpackrpc/protocol.d @@ -0,0 +1,32 @@ +import std.exception; + +import msgpack; + +enum MessageType { + request = 0, + response = 1, + notify = 2 +} + +/** + * Thrown when the wire protocol could not be parsed. + */ +class ProtocolException : Exception { + mixin basicExceptionCtors; +} + +/** + * Thrown when the other endpoint reports an error. + */ +class RPCException : Exception { +private: + Value m_value; +public: + this(Value value, string file = __FILE__, int line = __LINE__, Throwable next = null) { + super(value.as!string, file, line, next); + this.m_value = value; + } + + @property Value value() { return m_value; } + +} diff --git a/ober/ober/__init__.py b/ober/source/msgpackrpc/server.d similarity index 100% rename from ober/ober/__init__.py rename to ober/source/msgpackrpc/server.d