#include <google/protobuf/descriptor.h> #include <google/protobuf/wire_format_lite.h> #include <ipc/common.pb.h> #include <ipc/internal.pb.h> #include "playback_process.hpp" #include "playback_backend.hpp" #include <functional> #include "rpc.hpp" #include "thirdparty/CRC.hpp" #include <stdlib.h> #include <string.h> #include <stdio.h> #include <unistd.h> #include <exception> #include <thread> #include <config.h> #include <signal.h> #include "backend.hpp" #include "util.hpp" #include "log.hpp" #include <chrono> #include <fmt/core.h> #include <fmt/format.h> #ifdef __WINDOWS__ #include <windows.h> #endif #ifdef __HAIKU__ #include <kernel/scheduler.h> #endif #include <google/protobuf/message.h> #include "util.hpp" using namespace google::protobuf; int sndfd; int rcvfd; extern char *executable_path; void print_field_descriptor(const google::protobuf::FieldDescriptor *fdesc, const google::protobuf::Message &msg, size_t level) { auto desc = msg.GetDescriptor(); auto reflection = msg.GetReflection(); static std::map<google::protobuf::FieldDescriptor::Label, const char*> labelNames = { {(google::protobuf::FieldDescriptor::Label)0, "(Error)"}, {google::protobuf::FieldDescriptor::LABEL_OPTIONAL, "Optional"}, {google::protobuf::FieldDescriptor::LABEL_REPEATED, "Repeated"}, {google::protobuf::FieldDescriptor::LABEL_REQUIRED, "Required"} }; DEBUG.writef_level(level, "- %s = %d", fdesc->full_name().c_str(), fdesc->number()); level++; DEBUG.writef_level(level, "Type: %s (C++: %s)", fdesc->type_name(), fdesc->cpp_type_name()); std::string value = "(None)"; if (fdesc->has_default_value()) { FORMAT_CPP_TYPE_LOWERCASE(fdesc, value, fdesc->default_value_); DEBUG.writef_level(level, "Default value: %s", value.c_str()); } DEBUG.writef_level(level, "Label: %s", labelNames.contains(fdesc->label()) ? labelNames[fdesc->label()] : labelNames[(google::protobuf::FieldDescriptor::Label)0]); DEBUG.writef_level(level, "Index: %d", fdesc->index()); if (fdesc->type() == google::protobuf::FieldDescriptor::TYPE_MESSAGE || fdesc->type() == google::protobuf::FieldDescriptor::TYPE_GROUP) { const google::protobuf::Descriptor *desc = fdesc->message_type(); DEBUG.writef_level(level, "Value: (Message)"); const Message &out_msg = reflection->GetMessage(msg, fdesc); print_ipc_message(out_msg, level + 1); return; } if (fdesc->type() == google::protobuf::FieldDescriptor::TYPE_BYTES) { value = "(bytes)"; } else { FORMAT_CPP_TYPE_TITLECASE(fdesc, value, reflection->Get, msg, fdesc); } DEBUG.writef_level(level, "Value: %s", value.c_str()); } void print_ipc_message(const google::protobuf::Message &msg, size_t level) { const google::protobuf::Descriptor *msgdesc = msg.GetDescriptor(); const google::protobuf::Reflection *reflect = msg.GetReflection(); DEBUG.writef_level(level, "Message type: %s", msg.GetTypeName().c_str()); std::vector<const FieldDescriptor *> descriptors; reflect->ListFields(msg, &descriptors); DEBUG.writef_level(level, "Message field count: %d", descriptors.size()); for (auto field : descriptors) { print_field_descriptor(field, msg, level); } auto &unknown_fields = reflect->GetUnknownFields(msg); DEBUG.writef_level(level, "Unknown fields: %d", unknown_fields.field_count()); } void show_command(const std::string &cmdid, const google::protobuf::Message &msg) { DEBUG.writefln("Command %s:", cmdid.c_str()); print_ipc_message(msg); } RenderResponseOrError PlaybackProcessServiceImpl::Render(const RenderCommand *cmd) { RenderResponseOrError response; auto cur_backend = cur_backend_lock.lock(); size_t maxlen = cmd->len(); auto lock = render_ptr.lock(); void *ptr = lock->get_byte_sized<void>(maxlen); size_t len = cur_backend->render(ptr, maxlen); if (len == 0) { DEBUG.writeln("Didn't get any audio when rendering"); } auto output = new RenderResponse(); output->set_data((const char*)ptr, len); output->set_len(len); response.set_allocated_output(output); return response; } PropertyDataOrError PlaybackProcessServiceImpl::Get(const GetProperty *request) { PropertyDataOrError response; auto cur_backend = cur_backend_lock.get_unsafe(); PropertyData *data = new PropertyData(); ErrorResponse *err_maybe = nullptr; switch (request->id()) { case PropertyId::PlaybackRate: { DoubleProperty property; property.set_value(cur_backend->get_rate()); data->mutable_value()->PackFrom(property); } break; case PropertyId::StreamIdProperty: { StreamId id; id.set_id(cur_backend->get_stream_idx()); data->mutable_value()->PackFrom(id); } break; case PropertyId::FilenameProperty: { StringProperty prop; prop.set_value(std::filesystem::path(cur_backend->get_current_file().value_or("None")).filename().string()); data->mutable_value()->PackFrom(prop); } break; case PropertyId::BackendId: { StringProperty prop; prop.set_value(cur_backend->get_id()); data->mutable_value()->PackFrom(prop); } break; case PropertyId::BackendName: { StringProperty prop; prop.set_value(cur_backend->get_name()); data->mutable_value()->PackFrom(prop); } case PropertyId::FilePathProperty: { StringProperty path; path.set_value(cur_backend->get_current_file().value_or("")); data->mutable_value()->PackFrom(path); } break; case PropertyId::SpecProperty: { AudioSpec spec; SDL_AudioSpec sdl_spec = cur_backend->get_spec(); audio_data_t sample_spec = sdl_to_sample_spec(sdl_spec.format); DEBUG.writefln("Sample_spec.size: %d", sample_spec.size); spec.set_bits(sample_spec.size * 8); spec.set_channel_count(sdl_spec.channels); spec.set_endian(sample_spec.endian ? EndianID::BIG : EndianID::LITTLE); spec.set_format_type(sample_spec.is_float ? FormatType::FLOAT : sample_spec.is_signed ? FormatType::SIGNED : FormatType::UNSIGNED); spec.set_sample_rate(sdl_spec.freq); data->mutable_value()->PackFrom(spec); } break; case PropertyId::StreamsProperty: { auto full_streams = cur_backend->get_streams(); if (request->has_idx()) { size_t idx = (size_t)request->idx(); Stream stream; if (idx >= full_streams.size()) { ErrorResponse *err = new ErrorResponse(); err->set_id("invalid_index"); err->set_desc("An attempt to access an array element outside the boundaries of that array was detected."); err->set_fatal(false); err_maybe = err; break; } auto backend_stream = full_streams[idx]; stream.set_id(backend_stream.id); stream.set_len(backend_stream.length); stream.set_title(backend_stream.name); data->mutable_value()->PackFrom(stream); } else { StreamList out_streams; for (auto backend_stream : full_streams) { Stream stream = *out_streams.add_streams(); stream.set_id(backend_stream.id); stream.set_len(backend_stream.length); stream.set_title(backend_stream.name); } data->mutable_value()->PackFrom(out_streams); } } break; case PropertyId::TitleProperty: { StringProperty title; title.set_value(cur_backend->get_title().value_or("")); data->mutable_value()->PackFrom(title); } break; case PropertyId::PositionProperty: { DoubleProperty pos; pos.set_value(cur_backend->get_position()); data->mutable_value()->PackFrom(pos); } break; case PropertyId::BackendSpecific: { if (!request->has_path()) { ErrorResponse *err = new ErrorResponse(); err->set_id("param_missing"); err->set_fatal(false); err->set_desc("Backend specific parameters require a path."); err_maybe = err; break; } auto output_maybe = cur_backend->get(request->path()); if (output_maybe.has_value()) { data->mutable_value()->PackFrom(output_maybe.value()); } else { ErrorResponse *err = new ErrorResponse; err->set_id("not_found"); err->set_fatal(false); err->set_desc("The backend reported that the property was not found."); err_maybe = err; } } break; case PropertyId::LengthProperty: { DoubleProperty output; output.set_value(cur_backend->get_length()); data->mutable_value()->PackFrom(output); } break; default: { ErrorResponse *err = new ErrorResponse(); err->set_id("invalid_property"); err->set_fatal(false); err->set_desc("The property requested was invalid or write-only"); err_maybe = err; } break; } if (err_maybe != nullptr) { delete data; response.set_allocated_err(err_maybe); } else { response.set_allocated_output(data); } return response; } MaybeError PlaybackProcessServiceImpl::Set(const SetProperty *request) { MaybeError response; ErrorResponse *err_maybe = nullptr; auto cur_backend = cur_backend_lock.lock(); switch (request->id()) { case PropertyId::PlaybackRate: { cur_backend->set_rate(resolve_any<DoubleProperty>(request->value())->value()); } break; case PropertyId::PositionProperty: { double value = resolve_any<DoubleProperty>(request->value())->value(); cur_backend->seek(value); } break; case PropertyId::StreamIdProperty: { cur_backend->switch_stream(resolve_any<StreamId>(request->value())->id()); } break; case PropertyId::BackendSpecific: { if (!request->has_path()) { ErrorResponse *err = new ErrorResponse(); err->set_id("param_missing"); err->set_fatal(false); err->set_desc("Backend specific parameters require a path."); err_maybe = err; break; } if (!cur_backend->set(request->path(), request->value())) { ErrorResponse *err = new ErrorResponse(); err->set_id("not_found"); err->set_fatal(false); err->set_desc("The backend reported that the property being set was invalid for it."); err_maybe = err; } } break; default: { ErrorResponse *err = new ErrorResponse(); err->set_id("invalid_property"); err->set_fatal(false); err->set_desc("The property requested was invalid or read-only"); err_maybe = err; } break; } if (err_maybe != nullptr) { response.set_allocated_err(err_maybe); } return response; } ResetResponse PlaybackProcessServiceImpl::Reset(const ResetProperty *request) { ResetResponse response; auto cur_backend = cur_backend_lock.lock(); auto path = request->path(); std::optional<uint64_t> idx = {}; if (request->has_idx()) { idx = request->idx(); } auto output = cur_backend->reset(path); if (output.has_value()) { PropertyDataOrError *real_output = new PropertyDataOrError(); PropertyData *data = new PropertyData(); data->mutable_value()->PackFrom(output.value()); data->set_id(PropertyId::BackendSpecific); data->clear_idx(); real_output->set_allocated_output(data); response.set_allocated_value(real_output); } return response; } MaybeError PlaybackProcessServiceImpl::Quit(const QuitCmd *request) { if (process == nullptr) return MaybeError(); process->done = true; cur_backend_lock.get_unsafe()->cleanup(); return MaybeError(); } PropertyList PlaybackProcessServiceImpl::GetPropertyList(const GetPropertyListCommand *cmd) { std::vector<Property> list; { list = cur_backend_lock.lock()->get_property_list(); } PropertyList output; for (auto el : list) { output.add_list()->CopyFrom(el); } return output; } MaybeError PlaybackProcessServiceImpl::Init(const InitCommand *cmd) { MaybeError ret; MaybeError *response = &ret; auto lock = cur_backend_lock.lock(); render_ptr.lock().set(new DynPtr(), true); lock.clear(); auto filename = cmd->filename(); if (!std::filesystem::exists(filename)) { ErrorResponse *maybe_error = response->mutable_err(); maybe_error->set_desc("File not found"); maybe_error->set_id("not_found"); maybe_error->set_fatal(true); process->done = true; return ret; } auto idx = cmd->idx(); for (auto &backend : PlaybackBackendHelper()) { DEBUG.writefln("Trying backend: %s", backend.second->get_name().c_str()); try { backend.second->init(filename.c_str(), idx); } catch (std::exception e) { DEBUG.writeln("Cleaning up backend."); backend.second->cleanup(); continue; } lock.set(backend.second, false); DEBUG.writefln("Using backend: %s", backend.second->get_name().c_str()); break; } if (!cur_backend_lock.has_value()) { ErrorResponse *maybe_error = response->mutable_err(); maybe_error->set_desc("Couldn't find a backend."); maybe_error->set_id("no_backend_for_file"); maybe_error->set_fatal(true); process->done = true; DEBUG.writefln("Couldn't find any backend."); return ret; } return ret; } void init_logging_subprocess(PlaybackProcess *proc) { Looper::Log::init_logging(); } PlaybackStream deserialize_stream(Stream serialized) { PlaybackStream stream; stream.id = serialized.id(); stream.length = serialized.len(); stream.name = serialized.title(); return stream; } Stream serialize_stream(PlaybackStream stream, std::optional<Stream*> stream_ptr = {}) { Stream serialized; serialized.set_id(stream.id); serialized.set_len(stream.length); serialized.set_title(stream.name); if (stream_ptr.has_value()) { *(stream_ptr.value()) = serialized; } return serialized; } std::vector<PlaybackStream> deserialize_stream_list(StreamList list) { int len = list.streams_size(); std::vector<PlaybackStream> output; output.reserve(len); for (int i = 0; i < len; i++) { Stream stream = list.streams(i); output.push_back(deserialize_stream(stream)); } return output; } StreamList serialize_stream_list(std::vector<PlaybackStream> streams) { StreamList list; for (auto input : streams) { Stream *stream = list.add_streams(); serialize_stream(input, stream); } return list; } PlaybackProcess::PlaybackProcess(PlaybackProcess *parent) { other_process = parent; DEBUG.writeln("Playback backends: "); for (auto &backend : PlaybackBackendHelper()) { DEBUG.writefln(" - %s", backend.second->get_id().c_str()); } DEBUG.writeln("Host process address: (in-process)"); } PlaybackProcess::PlaybackProcess(std::vector<std::string> args) { #ifdef __HAIKU__ set_thread_priority(find_thread(NULL), suggest_thread_priority(B_LIVE_AUDIO_MANIPULATION)); #endif SDL_InitSubSystem(SDL_INIT_AUDIO); done = false; is_playback_process = true; Looper::Log::init_logging(); init_playback_backends(); DEBUG.writeln("Playback backends: "); for (auto &backend : PlaybackBackendHelper()) { DEBUG.writefln(" - %s", backend.second->get_id().c_str()); } init_audio_data(); send_fd = std::stoi(args[0]); recv_fd = std::stoi(args[1]); this->impl.process = this; done = true; } PlaybackProcess::PlaybackProcess(std::string filename, int idx) { // multi_process = Looper::Options::get_option<bool>("playback.multi_process", true); multi_process = true; done = false; this->done = false; if (multi_process) { int fd1[2]; int fd2[2]; if (pipe(fd1) < 0) { throw CustomException("Pipe creation failed!"); } if (pipe(fd2) < 0) { throw CustomException("Pipe creation failed!"); } std::vector<std::string> args; args.push_back(fmt::format("{}", fd1[1])); args.push_back(fmt::format("{}", fd2[0])); send_fd = fd2[1]; recv_fd = fd1[0]; pid = launch_self("playback", args); } else { this->impl.process = this; other_process = new PlaybackProcess(this); } done = true; DEBUG.writeln("Playback process started."); InitCommand cmd; cmd.set_filename(filename); cmd.set_idx(idx); RPCCall call; call.mutable_init()->CopyFrom(cmd); RPCResponse response = SendCommand(&call); if (response.has_err()) { this->init_failed = true; delete other_process; other_process = nullptr; auto err = response.err(); throw CustomException(err.has_desc() ? err.desc() : err.id()); } } bool PlaybackProcess::process_running() { if (is_playback_process) return true; if (other_process != nullptr) return true; if (!multi_process) return other_process != nullptr; return kill(pid, 0) == 0; } RPCResponse PlaybackProcess::handle_command(RPCCall &call) { SimpleAckResponse *ack = new SimpleAckResponse(); RPCResponse resp; if (call.has_get()) { PropertyDataOrError output = impl.Get(&call.get()); if (output.has_err()) { resp.mutable_err()->CopyFrom(output.err()); } else { resp.mutable_data()->CopyFrom(output.output()); } } else if (call.has_init()) { MaybeError output = impl.Init(&call.init()); if (output.has_err()) { resp.mutable_err()->CopyFrom(output.err()); } else { resp.set_allocated_ack(ack); } } else if (call.has_quit()) { MaybeError output = impl.Quit(&call.quit()); if (output.has_err()) { resp.mutable_err()->CopyFrom(output.err()); } else { resp.set_allocated_ack(ack); } } else if (call.has_set()) { MaybeError output = impl.Set(&call.set()); if (output.has_err()) { resp.mutable_err()->CopyFrom(output.err()); } else { resp.set_allocated_ack(ack); } } else if (call.has_reset()) { ResetResponse output = impl.Reset(&call.reset()); resp.mutable_reset()->CopyFrom(output); } else if (call.has_render()) { RenderResponseOrError output = impl.Render(&call.render()); if (output.has_err()) { resp.mutable_err()->CopyFrom(output.err()); } else { resp.mutable_render()->CopyFrom(output.output()); } } else if (call.has_get_property_list()) { PropertyList output = impl.GetPropertyList(&call.get_property_list()); resp.mutable_property_list()->CopyFrom(output); } if (!resp.has_ack()) { delete ack; } return resp; } void PlaybackProcess::run_playback_process() { while (true) { uint64_t len; blocking_read(recv_fd, &len, sizeof(len)); std::string in_bytes = std::string(' ', len, std::allocator<char>()); in_bytes.resize(len); blocking_read(recv_fd, in_bytes.data(), len); RPCCall call; if (!call.ParseFromString(in_bytes)) { throw std::exception(); } RPCResponse resp = handle_command(call); std::string bytes = resp.SerializeAsString(); len = bytes.length(); ssize_t ret = -1; blocking_write(send_fd, &len, sizeof(len)); blocking_write(send_fd, bytes.data(), bytes.length()); } } int looper_run_playback_process(std::vector<std::string> args) { auto proc = PlaybackProcess(args); proc.run_playback_process(); SDL_QuitSubSystem(SDL_INIT_AUDIO); return 0; } PropertyData PlaybackProcess::get_property(PropertyId property, std::optional<uint64_t> idx) { GetProperty get_property; get_property.set_id(property); if (idx.has_value()) { get_property.set_idx(idx.value()); } else { get_property.clear_idx(); } RPCCall call; call.mutable_get()->CopyFrom(get_property); RPCResponse output = SendCommand(&call); if (output.has_err()) { throw std::exception(); } return output.data(); } void PlaybackProcess::set_property(PropertyId id, PropertyData data, std::optional<uint64_t> idx) { SetProperty set_property; set_property.set_id(id); set_property.mutable_value()->CopyFrom(data.value()); if (idx.has_value()) { set_property.set_idx(idx.value()); } else { set_property.clear_idx(); } RPCCall call; call.mutable_set()->CopyFrom(set_property); SendCommand(&call); } std::string PlaybackProcess::get_version_code() { return std::string(TAG) + #ifdef DEBUG_MODE std::string("-debugmode") + #endif std::string("-at") + std::string(__TIME__); } double PlaybackProcess::get_position() { return get_property_double(PropertyId::PositionProperty); } void PlaybackProcess::set_position(double value) { DoubleProperty *pos = new DoubleProperty(); pos->set_value(value); set_property_value<DoubleProperty>(PropertyId::PositionProperty, pos); delete pos; } double PlaybackProcess::get_length() { return get_property_double(PropertyId::LengthProperty); } size_t PlaybackProcess::get_stream_idx() { StreamId *id = get_property_value<StreamId>(PropertyId::StreamIdProperty); size_t output = id->id(); delete id; return output; } void PlaybackProcess::set_stream_idx(size_t idx) { StreamId *id = new StreamId(); id->set_id(idx); set_property_value<StreamId>(PropertyId::StreamIdProperty, id); delete id; } std::string PlaybackProcess::get_title() { return get_property_string(PropertyId::TitleProperty); } std::string PlaybackProcess::get_file_path() { return get_property_string(PropertyId::FilePathProperty); } std::string PlaybackProcess::get_file_name() { return get_property_string(PropertyId::FilenameProperty); } RPCResponse PlaybackProcess::SendCommand(RPCCall *call) { if (multi_process) { std::lock_guard guard(ipc_mutex); uint64_t len; std::string str = call->SerializeAsString(); len = str.length(); blocking_write(send_fd, &len, sizeof(len)); blocking_write(send_fd, str.data(), str.length()); blocking_read(recv_fd, &len, sizeof(len)); str.resize(len); blocking_read(recv_fd, str.data(), len); RPCResponse resp; resp.ParseFromString(str); return resp; } else { return handle_command(*call); } } PlaybackStream PlaybackProcess::get_playback_stream(size_t idx) { auto *stream = get_property_value<Stream>(PropertyId::StreamsProperty, idx); PlaybackStream output = deserialize_stream(*stream); delete stream; return output; } std::string PlaybackProcess::get_backend_id() { return get_property_string(PropertyId::BackendId); } std::string PlaybackProcess::get_backend_name() { return get_property_string(PropertyId::BackendName); } std::vector<PlaybackStream> PlaybackProcess::get_playback_streams() { auto *list = get_property_value<StreamList>(PropertyId::StreamsProperty); auto output = deserialize_stream_list(*list); delete list; return output; } bool PlaybackProcess::set_property(std::string path, google::protobuf::Any value) { SetProperty set_property; set_property.set_id(PropertyId::BackendSpecific); set_property.mutable_value()->CopyFrom(value); set_property.set_path(path); RPCCall call; call.mutable_set()->CopyFrom(set_property); RPCResponse output = SendCommand(&call); if (output.has_err()) { return false; } return true; } std::optional<google::protobuf::Any> PlaybackProcess::get_property(std::string path) { GetProperty get_property; get_property.set_id(PropertyId::BackendSpecific); get_property.set_path(path); RPCCall call; call.mutable_get()->CopyFrom(get_property); RPCResponse output = SendCommand(&call); if (output.has_err()) { return {}; } return output.data().value(); } size_t PlaybackProcess::render(void *buf, size_t maxlen) { RenderCommand rend_cmd = RenderCommand(); rend_cmd.set_len(maxlen); RPCResponse response; RPCCall call; call.mutable_render()->CopyFrom(rend_cmd); response = SendCommand(&call); if (response.has_err()) { ERROR.writefln("Error rendering audio: %s", response.err().id().c_str()); return 0; } else { std::string data = response.render().data(); if (data.length() == 0) { WARNING.writeln("Rendering audio didn't produce anything!"); } if (data.length() != response.render().len()) { WARNING.writefln("Rendering audio size mismatch: Intended != Provided\nIntended: %d\nProvided: %d", response.render().len(), data.length()); } memcpy(buf, data.data(), data.length()); return data.length(); } } AudioSpec *PlaybackProcess::get_audio_spec() { return get_property_value<AudioSpec>(PropertyId::SpecProperty); } PlaybackProcess::~PlaybackProcess() { if (init_failed || !process_running()) { done = true; return; } QuitCmd quit_cmd; RPCCall call; call.mutable_quit()->CopyFrom(quit_cmd); SendCommand(&call); done = true; if (multi_process) kill(pid, SIGHUP); } void PlaybackProcess::threadfunc() { while (!process_running()) { std::this_thread::yield(); if (done) return; } while (true) { if (!process_running()) { done = true; started.notify_all(); } if (done) { return; } std::this_thread::yield(); } } double PlaybackProcess::get_cached_rate() { return get_property_double(PropertyId::PlaybackRate); } void PlaybackProcess::set_rate(double value) { set_property_double(PropertyId::PlaybackRate, value); } std::vector<Property> PlaybackProcess::get_property_list() { RPCCall call; GetPropertyListCommand cmd; call.mutable_get_property_list()->CopyFrom(cmd); RPCResponse resp = SendCommand(&call); if (resp.has_property_list()) { std::vector<Property> output; for (size_t i = 0; i < resp.property_list().list_size(); i++) { output.push_back(resp.property_list().list(i)); } return output; } else { return std::vector<Property>(); } }