looper/rpc.hpp
2024-09-16 15:05:53 -07:00

241 lines
8.5 KiB
C++

#pragma once
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <any>
#include <atomic>
#include <chrono>
#include <cstdarg>
#include <filesystem>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <variant>
#include <vector>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/generated_message_reflection.h>
#include <google/protobuf/message.h>
#include <grpc++/grpc++.h>
#include <grpc++/server.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/support/byte_buffer.h>
#include <json/json.h>

#include "log.hpp"
#include "thirdparty/CRC.hpp"
// Makes the standard literal suffixes (e.g. "..."s, 30s) usable without qualification.
// NOTE(review): this file is a header (#pragma once), so the directive also applies to
// every translation unit that includes it.
using namespace std::literals;
/// @brief Expands to a `switch` over `fdesc->cpp_type()` that stores a textual rendering
///        of a protobuf field value into `value`.
///
/// For each C++ type the accessor name is built by token-pasting `prefix` with the token
/// supplied for that type (`int32`, `int64`, ... `enum`) and is called with `__VA_ARGS__`.
/// For instance, a prefix of `fdesc->default_value_` with the lowercase names would call
/// `fdesc->default_value_int32()`.
///
/// Rendering rules:
///  - bool    -> "true" / "false"
///  - numbers -> `fmt::format("{0}", ...)`
///  - string  -> assigned as-is
///  - enum    -> "<full_name> = <index>" taken from the descriptor pointer the accessor returns
///  - message -> the fixed placeholder "(Message)"; the `message` parameter is never pasted
///
/// @param fdesc  Pointer-like expression providing `cpp_type()`.
/// @param value  Assignable string-like lvalue that receives the result.
/// @param prefix Token sequence whose last token is pasted with the per-type name.
///
/// The expansion is a bare `switch` statement with no `default` label. `fmt::format` must be
/// visible at the point of use (presumably provided through log.hpp — TODO confirm).
/// NOTE(review): identifiers starting with an underscore followed by an uppercase letter are
/// reserved for the implementation.
#define _FORMAT_CPP_TYPE(fdesc, value, prefix, int32, int64, uint32, uint64, float, double, bool, string, enum, message, ...) \
switch (fdesc->cpp_type()) { \
case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: \
value = prefix##bool(__VA_ARGS__) ? "true" : "false"; \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: \
value = fmt::format("{0}", prefix##double(__VA_ARGS__)); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: \
value = fmt::format("{0}", prefix##float(__VA_ARGS__)); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_INT32: \
value = fmt::format("{0}", prefix##int32(__VA_ARGS__)); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: \
value = fmt::format("{0}", prefix##uint32(__VA_ARGS__)); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_INT64: \
value = fmt::format("{0}", prefix##int64(__VA_ARGS__)); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: \
value = fmt::format("{0}", prefix##uint64(__VA_ARGS__)); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_STRING: \
value = prefix##string(__VA_ARGS__); \
break; \
case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: {\
const auto *enum_desc = prefix##enum(__VA_ARGS__); \
value = fmt::format("{0} = {1}", enum_desc->full_name(), enum_desc->index()); \
} break; \
case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: \
value = "(Message)"; \
break; \
}
/// @brief `_FORMAT_CPP_TYPE` with lowercase accessor suffixes (`int32`, `uint64`, `bool`, ...),
///        so the pasted call looks like `<prefix>int32(args...)`.
/// Extra arguments are forwarded to every accessor call. `__VA_OPT__` is a C++20 preprocessor
/// feature.
#define FORMAT_CPP_TYPE_LOWERCASE(fdesc, value, prefix, ...) _FORMAT_CPP_TYPE(fdesc, value, prefix, int32, int64, uint32, uint64, float, double, bool, string, enum, message __VA_OPT__(,) __VA_ARGS__)
/// @brief `_FORMAT_CPP_TYPE` with title-case accessor suffixes (`Int32`, `UInt64`, `Bool`, ...),
///        so the pasted call looks like `<prefix>Int32(args...)`.
/// Extra arguments are forwarded to every accessor call. `__VA_OPT__` is a C++20 preprocessor
/// feature.
#define FORMAT_CPP_TYPE_TITLECASE(fdesc, value, prefix, ...) _FORMAT_CPP_TYPE(fdesc, value, prefix, Int32, Int64, UInt32, UInt64, Float, Double, Bool, String, Enum, Message __VA_OPT__(,) __VA_ARGS__)
/// Label for the current process; defined in another translation unit.
/// generate_address() embeds it in the generated socket file name.
extern std::string current_process_type;
/// @brief Builds a fresh gRPC unix-domain-socket address inside the system temp directory.
///
/// The result has the form
/// `unix:<tmpdir>/looper_<current_process_type>.<64 random symbols>.sock`.
///
/// The random part is drawn from a per-thread Mersenne Twister seeded from
/// `std::random_device`. The previous implementation used `rand()`, which yields the same
/// sequence in every process unless `srand()` is called and is not required to be
/// thread-safe, so two processes of the same type could produce colliding paths.
///
/// @return The address string, including the `unix:` scheme prefix.
/// @throws std::filesystem::filesystem_error if the temp directory cannot be determined.
inline std::string generate_address() {
    // Same 64-symbol set the socket names have always been drawn from.
    static constexpr char kAlphabet[] =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890._";
    constexpr size_t kSuffixLength = 64;
    constexpr size_t kExtensionLength = 5;  // strlen(".sock")

    thread_local std::mt19937 engine{std::random_device{}()};
    // sizeof counts the terminating NUL, hence "- 2" for the last valid index.
    std::uniform_int_distribution<size_t> pick(0, sizeof(kAlphabet) - 2);

    // NOTE(review): the file name alone is at least 77 characters; verify that the full path
    // fits the platform's sockaddr_un::sun_path limit when the temp directory is long.
    std::string filename = "looper_" + current_process_type + ".";
    filename.reserve(filename.size() + kSuffixLength + kExtensionLength);
    for (size_t i = 0; i < kSuffixLength; i++) {
        filename += kAlphabet[pick(engine)];
    }
    filename += ".sock";

    const std::filesystem::path sockpath = std::filesystem::temp_directory_path() / filename;
    return "unix:" + sockpath.string();
}
template<class C>
class IPCClient {
public:
using Stub = std::unique_ptr<typename C::Stub>;
std::shared_ptr<grpc::ChannelInterface> channel;
IPCClient(std::string address) {
DEBUG.writefln("Connecting to '%s'...", address.c_str());
channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials());
channel->WaitForConnected(gpr_time_from_seconds(30, gpr_clock_type::GPR_CLOCK_MONOTONIC));
DEBUG.writefln("Connection successful!");
}
Stub client_stub() {
return C::NewStub(channel);
}
};
template<class S>
class IPCServer {
std::optional<std::shared_ptr<grpc::Server>> server;
std::string address;
using Service = S;
public:
using ServiceConstructor = std::function<grpc::Service*()>;
private:
void init_address(std::string address) {
this->address = address;
}
void init_address() {
init_address(generate_address());
}
void init(ServiceConstructor custom_service_constructor) {
this->address = address;
grpc::ServerBuilder builder;
grpc::Service *service = nullptr;
service = custom_service_constructor();
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(service);
server = std::shared_ptr<grpc::Server>(builder.BuildAndStart().release());
DEBUG.writefln("Server listening on '%s'...", address.c_str());
}
template<class SI>
void init() {
return init([]() {
return new SI();
});
}
public:
inline std::shared_ptr<grpc::Server> get_server() {
return server.value();
}
std::string get_address() {
return address;
}
template<class SI>
IPCServer(std::string address) {
init_address(address);
init<SI>();
}
template<class SI>
IPCServer() {
init_address();
init<SI>();
}
IPCServer(ServiceConstructor custom_service_constructor) {
init_address();
init(custom_service_constructor);
}
IPCServer(std::string address, ServiceConstructor custom_service_constructor) {
init_address(address);
init(custom_service_constructor);
}
};
template<class S, class C>
class IPCChannel {
using Client = IPCClient<C>;
using Server = IPCServer<S>;
using ClientPtr = std::shared_ptr<Client>;
using ServerPtr = std::shared_ptr<Server>;
using ServiceConstructor = typename Server::ServiceConstructor;
std::optional<ClientPtr> client = {};
ServerPtr server;
void init_client(ClientPtr client) {
this->client = client;
}
void init_client(Client *client) {
init_client(ClientPtr(client));
}
void init_server(ServerPtr server) {
this->server = server;
}
void init_server(Server *server) {
init_server(ServerPtr(server));
}
void init_client(std::string client_address) {
init_client(new Client(client_address));
}
void init_server(std::string server_address, typename Server::ServiceConstructor constructor) {
init_server(new Server(server_address, constructor));
}
void init_server(typename Server::ServiceConstructor constructor) {
init_server(new Server(constructor));
}
public:
using Stub = typename Client::Stub;
bool has_client() const {
return client.has_value();
}
Stub get_stub() {
if (!has_client()) {
ERROR.writeln("Attempt to get client stub for nonexistant client!");
throw std::exception();
}
return get_client()->client_stub();
}
ServerPtr get_server() {
return server;
}
const ServerPtr get_server() const {
return server;
}
ClientPtr get_client() {
return client.value();
}
const ClientPtr get_client() const {
return client.value();
}
void set_client(ClientPtr client) {
if (this->client.has_value()) return;
init_client(client);
}
void construct_client(std::string client_address) {
if (this->client.has_value()) {
ERROR.writefln("Invalid attempt to construct client for address %s", client_address.c_str());
return;
} else {
DEBUG.writefln("Constructing client for address %s...", client_address.c_str());
}
init_client(client_address);
}
/// @brief Constructs an IP channel with a newly-created server with a generated address.
IPCChannel(ServiceConstructor custom_service_constructor) {
init_server(custom_service_constructor);
}
/// @brief Constructs an IPC channel with a specific address for the server, creating a new server in the process
/// @param address The address to use
IPCChannel(std::string address, ServiceConstructor custom_service_constructor = {}) {
init_server(address, custom_service_constructor);
}
/// @brief Constructs an IPC channel with only an already-existing server.
/// @param server_init The existing server to use
IPCChannel(ServerPtr server_init) {
init_server(server_init);
}
};