From da875871efb6b846b0d1bc634693f020e6caad5f Mon Sep 17 00:00:00 2001 From: unai_71 Date: Tue, 10 Mar 2026 19:05:27 +0100 Subject: [PATCH 1/3] feat: prepared the cmake lists for qt dependencies --- CMakeLists.txt | 8 ++++++++ tests/CMakeLists.txt | 16 ++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 682d705..0c77dc1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,14 +8,22 @@ project(azkoyen_ipc_test LANGUAGES CXX) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +# Qt Setup — AUTOMOC runs moc automatically on Q_OBJECT headers +find_package(Qt5 REQUIRED COMPONENTS Core Widgets Test) +set(CMAKE_AUTOMOC ON) +set(CMAKE_INCLUDE_CURRENT_DIR ON) + # Core library add_library(core src/core/SysfsRead.cxx src/core/UnixIpcBridge.cxx src/core/Producer.cxx + src/core/Consumer.cxx + include/Consumer.hpp ) target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) +target_link_libraries(core PUBLIC Qt5::Core) #tests enable_testing() diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 981d9c8..0d90812 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -37,3 +37,19 @@ target_link_libraries(test_ipc ) add_test(NAME test_ipc COMMAND test_ipc) + + +add_executable(test_consumer + test_consumer.cxx +) + +target_link_libraries(test_consumer + PRIVATE + core + gtest + gtest_main + Qt5::Core + Qt5::Test +) + +add_test(NAME test_consumer COMMAND test_consumer) -- 2.49.1 From 8e471d3534e4ea73b6cc9aa5fc9fa623f74e1d77 Mon Sep 17 00:00:00 2001 From: unai_71 Date: Tue, 10 Mar 2026 19:07:22 +0100 Subject: [PATCH 2/3] feat: Consumer header file, consumes socket to connect to an passes signal to main window (in theory) --- include/Consumer.hpp | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 include/Consumer.hpp diff --git a/include/Consumer.hpp b/include/Consumer.hpp new file mode 100644 index 0000000..7577741 --- /dev/null +++ b/include/Consumer.hpp @@ -0,0 +1,44 @@ +#pragma once +// ConsumerThread.hpp +// SPDX-License-Identifier: GPL-3.0-or-later +// Author: Unai Blazquez +#include +#include +#include +#include + +/// @brief Listens on a UNIX domain socket, receives integers from the +/// producer via IPC, prints them to console, and emits a Qt signal. +class ConsumerThread : public QObject +{ + Q_OBJECT + + public: + /// @brief Construct the consumer bound to a socket path. + /// @param socket_path UNIX domain socket path to listen on. + /// @param parent Optional QObject parent for memory management. + explicit ConsumerThread(const std::string& socket_path, + QObject* parent = nullptr); + + ~ConsumerThread() override; + + /// @brief Start the listener thread. The server socket is ready + /// when this function returns. + void start(); + + /// @brief Stop the listener thread gracefully. Safe to call multiple times. + void stop(); + + signals: + /// @brief Emitted every time an integer is received from the producer. + void valueReceived(int value); + + private: + /// @brief Main loop: accept → recv → print → emit. Runs in m_thread. + void run_loop(); + + std::string m_socket_path; + int m_server_fd = -1; + std::atomic m_running{false}; + std::thread m_thread; +}; -- 2.49.1 From dfd82b161911f6b3f6289f311cf6a046faa7d9f5 Mon Sep 17 00:00:00 2001 From: unai_71 Date: Tue, 10 Mar 2026 19:14:48 +0100 Subject: [PATCH 3/3] feat: implemented consumer thread, all tests pass --- README.md | 11 ++++ src/core/Consumer.cxx | 104 ++++++++++++++++++++++++++++++++++ tests/test_consumer.cxx | 120 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 src/core/Consumer.cxx create mode 100644 tests/test_consumer.cxx diff --git a/README.md b/README.md index 8859ddb..bb572fb 100644 --- a/README.md +++ b/README.md @@ -34,3 +34,14 @@ The reader never throws on I/O errors; every outcome is expressed through the en `UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple. **Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly. + +## ConsumerThread + +`ConsumerThread` ([include/ConsumerThread.hpp](include/ConsumerThread.hpp), [src/core/ConsumerThread.cxx](src/core/ConsumerThread.cxx)) is a `QObject` that listens on a UNIX domain socket in a background `std::thread`. On each received integer it: + +1. Prints the value to `stdout`. +2. Emits the `valueReceived(int)` Qt signal. + +The server socket is created and bound inside `start()` **before** the thread is spawned, so the socket is guaranteed to be ready by the time `start()` returns — eliminating race conditions with the producer. Graceful shutdown is handled by `stop()`, which shuts down the file descriptor to unblock the blocking `accept()` call. + +**Tests:** [tests/test_consumer_thread.cxx](tests/test_consumer_thread.cxx) — uses `QSignalSpy` to verify single-value, multi-value, negative, and zero reception. diff --git a/src/core/Consumer.cxx b/src/core/Consumer.cxx new file mode 100644 index 0000000..18046e1 --- /dev/null +++ b/src/core/Consumer.cxx @@ -0,0 +1,104 @@ +// ConsumerThread.cxx +// SPDX-License-Identifier: GPL-3.0-or-later +// Author: Unai Blazquez + +#include "Consumer.hpp" + +#include +#include +#include + +#include +#include +#include + +ConsumerThread::ConsumerThread(const std::string& socket_path, QObject* parent) + : QObject(parent), m_socket_path(socket_path) +{ +} + +ConsumerThread::~ConsumerThread() { stop(); } + +void ConsumerThread::start() +{ + // Remove stale socket from previous runs + unlink(m_socket_path.c_str()); + + // Create, bind and listen BEFORE spawning the thread so the socket + // is guaranteed ready when start() returns — no race with the producer. + m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (m_server_fd < 0) + { + throw std::runtime_error("ConsumerThread: socket() failed"); + } + + struct sockaddr_un addr = {}; + addr.sun_family = AF_UNIX; + std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1); + + if (bind(m_server_fd, reinterpret_cast(&addr), sizeof(addr)) < 0) + { + close(m_server_fd); + m_server_fd = -1; + throw std::runtime_error("ConsumerThread: bind() failed"); + } + + if (listen(m_server_fd, 5) < 0) + { + close(m_server_fd); + m_server_fd = -1; + throw std::runtime_error("ConsumerThread: listen() failed"); + } + + m_running.store(true); + m_thread = std::thread(&ConsumerThread::run_loop, this); +} + +void ConsumerThread::stop() +{ + if (!m_running.exchange(false)) + { + return; // already stopped or never started + } + + // Shutdown the server fd to unblock the blocking accept() call + if (m_server_fd >= 0) + { + shutdown(m_server_fd, SHUT_RDWR); + close(m_server_fd); + m_server_fd = -1; + } + + if (m_thread.joinable()) + { + m_thread.join(); + } + + unlink(m_socket_path.c_str()); +} + +void ConsumerThread::run_loop() +{ + while (m_running.load()) + { + int client_fd = accept(m_server_fd, nullptr, nullptr); + if (client_fd < 0) + { + // accept() failed — most likely stop() closed the fd + break; + } + + int value = 0; + ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL); + close(client_fd); + + if (n == static_cast(sizeof(value))) + { + // 1) Print to console (spec requirement) + std::cout << "ConsumerThread received: " << value << std::endl; + + // 2) Emit Qt signal (spec requirement) + emit valueReceived(value); + } + } +} diff --git a/tests/test_consumer.cxx b/tests/test_consumer.cxx new file mode 100644 index 0000000..d330570 --- /dev/null +++ b/tests/test_consumer.cxx @@ -0,0 +1,120 @@ +#include + +#include +#include + +#include "Consumer.hpp" +#include "UnixIpcBridge.hpp" + +// QSignalSpy needs a QCoreApplication to dispatch queued signals +static int argc_ = 0; +static QCoreApplication app_(argc_, nullptr); + +TEST(ConsumerThreadTest, ReceivesSingleValue) +{ + const std::string sock = "/tmp/test_ct_single.sock"; + + ConsumerThread consumer(sock); + + // QSignalSpy records every emission of the given signal + QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); + consumer.start(); + + UnixIpcBridge bridge(sock); + bridge.send(42); + + // spy.wait() pumps the event loop for up to 1s until a signal arrives + spy.wait(1000); + consumer.stop(); + + ASSERT_EQ(spy.count(), 1); + EXPECT_EQ(spy.at(0).at(0).toInt(), 42); +} + +TEST(ConsumerThreadTest, ReceivesMultipleValues) +{ + const std::string sock = "/tmp/test_ct_multi.sock"; + + ConsumerThread consumer(sock); + QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); + consumer.start(); + + constexpr int kMessages = 5; + for (int i = 0; i < kMessages; ++i) + { + UnixIpcBridge bridge(sock); + bridge.send(i * 10); + // Small delay so the consumer can re-enter accept() between sends + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // Wait until all signals arrive (or timeout after 5s) + for (int attempt = 0; spy.count() < kMessages && attempt < 50; ++attempt) + { + spy.wait(100); + } + + consumer.stop(); + + ASSERT_EQ(spy.count(), kMessages); + for (int i = 0; i < kMessages; ++i) + { + EXPECT_EQ(spy.at(i).at(0).toInt(), i * 10); + } +} + +TEST(ConsumerThreadTest, ReceivesNegativeAndZero) +{ + // Zero + { + const std::string sock = "/tmp/test_ct_zero.sock"; + ConsumerThread consumer(sock); + QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); + consumer.start(); + + UnixIpcBridge bridge(sock); + bridge.send(0); + + spy.wait(1000); + consumer.stop(); + + ASSERT_EQ(spy.count(), 1); + EXPECT_EQ(spy.at(0).at(0).toInt(), 0); + } + + // Negative + { + const std::string sock = "/tmp/test_ct_neg.sock"; + ConsumerThread consumer(sock); + QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); + consumer.start(); + + UnixIpcBridge bridge(sock); + bridge.send(-999); + + spy.wait(1000); + consumer.stop(); + + ASSERT_EQ(spy.count(), 1); + EXPECT_EQ(spy.at(0).at(0).toInt(), -999); + } +} + +TEST(ConsumerThreadTest, StopsCleanlyWithoutDeadlock) +{ + const std::string sock = "/tmp/test_ct_stop.sock"; + + ConsumerThread consumer(sock); + consumer.start(); + // stop() must return without hanging, even with no connections + consumer.stop(); +} + +TEST(ConsumerThreadTest, StopsCleanlyWhenNeverStarted) +{ + const std::string sock = "/tmp/test_ct_nostart.sock"; + + ConsumerThread consumer(sock); + // stop() on a consumer that was never started must not crash + consumer.stop(); +} -- 2.49.1