Compare commits

..

No commits in common. "dfd82b161911f6b3f6289f311cf6a046faa7d9f5" and "da875871efb6b846b0d1bc634693f020e6caad5f" have entirely different histories.

4 changed files with 0 additions and 279 deletions

View File

@ -34,14 +34,3 @@ The reader never throws on I/O errors; every outcome is expressed through the en
`UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple. `UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple.
**Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly. **Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly.
## ConsumerThread
`ConsumerThread` ([include/ConsumerThread.hpp](include/ConsumerThread.hpp), [src/core/ConsumerThread.cxx](src/core/ConsumerThread.cxx)) is a `QObject` that listens on a UNIX domain socket in a background `std::thread`. On each received integer it:
1. Prints the value to `stdout`.
2. Emits the `valueReceived(int)` Qt signal.
The server socket is created and bound inside `start()` **before** the thread is spawned, so the socket is guaranteed to be ready by the time `start()` returns — eliminating race conditions with the producer. Graceful shutdown is handled by `stop()`, which shuts down the file descriptor to unblock the blocking `accept()` call.
**Tests:** [tests/test_consumer_thread.cxx](tests/test_consumer_thread.cxx) — uses `QSignalSpy` to verify single-value, multi-value, negative, and zero reception.

View File

@ -1,44 +0,0 @@
#pragma once
// ConsumerThread.hpp
// SPDX-License-Identifier: GPL-3.0-or-later
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include <QObject>
#include <atomic>
#include <string>
#include <thread>
/// @brief Listens on a UNIX domain socket, receives integers from the
/// producer via IPC, prints them to console, and emits a Qt signal.
class ConsumerThread : public QObject
{
Q_OBJECT
public:
/// @brief Construct the consumer bound to a socket path.
/// @param socket_path UNIX domain socket path to listen on.
/// @param parent Optional QObject parent for memory management.
explicit ConsumerThread(const std::string& socket_path,
QObject* parent = nullptr);
~ConsumerThread() override;
/// @brief Start the listener thread. The server socket is ready
/// when this function returns.
void start();
/// @brief Stop the listener thread gracefully. Safe to call multiple times.
void stop();
signals:
/// @brief Emitted every time an integer is received from the producer.
void valueReceived(int value);
private:
/// @brief Main loop: accept → recv → print → emit. Runs in m_thread.
void run_loop();
std::string m_socket_path;
int m_server_fd = -1;
std::atomic<bool> m_running{false};
std::thread m_thread;
};

View File

@ -1,104 +0,0 @@
// ConsumerThread.cxx
// SPDX-License-Identifier: GPL-3.0-or-later
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include "Consumer.hpp"
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <stdexcept>
ConsumerThread::ConsumerThread(const std::string& socket_path, QObject* parent)
: QObject(parent), m_socket_path(socket_path)
{
}
ConsumerThread::~ConsumerThread() { stop(); }
void ConsumerThread::start()
{
// Remove stale socket from previous runs
unlink(m_socket_path.c_str());
// Create, bind and listen BEFORE spawning the thread so the socket
// is guaranteed ready when start() returns — no race with the producer.
m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (m_server_fd < 0)
{
throw std::runtime_error("ConsumerThread: socket() failed");
}
struct sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1);
if (bind(m_server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
{
close(m_server_fd);
m_server_fd = -1;
throw std::runtime_error("ConsumerThread: bind() failed");
}
if (listen(m_server_fd, 5) < 0)
{
close(m_server_fd);
m_server_fd = -1;
throw std::runtime_error("ConsumerThread: listen() failed");
}
m_running.store(true);
m_thread = std::thread(&ConsumerThread::run_loop, this);
}
void ConsumerThread::stop()
{
if (!m_running.exchange(false))
{
return; // already stopped or never started
}
// Shutdown the server fd to unblock the blocking accept() call
if (m_server_fd >= 0)
{
shutdown(m_server_fd, SHUT_RDWR);
close(m_server_fd);
m_server_fd = -1;
}
if (m_thread.joinable())
{
m_thread.join();
}
unlink(m_socket_path.c_str());
}
void ConsumerThread::run_loop()
{
while (m_running.load())
{
int client_fd = accept(m_server_fd, nullptr, nullptr);
if (client_fd < 0)
{
// accept() failed — most likely stop() closed the fd
break;
}
int value = 0;
ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL);
close(client_fd);
if (n == static_cast<ssize_t>(sizeof(value)))
{
// 1) Print to console (spec requirement)
std::cout << "ConsumerThread received: " << value << std::endl;
// 2) Emit Qt signal (spec requirement)
emit valueReceived(value);
}
}
}

View File

@ -1,120 +0,0 @@
#include <gtest/gtest.h>
#include <QCoreApplication>
#include <QSignalSpy>
#include "Consumer.hpp"
#include "UnixIpcBridge.hpp"
// QSignalSpy needs a QCoreApplication to dispatch queued signals
static int argc_ = 0;
static QCoreApplication app_(argc_, nullptr);
TEST(ConsumerThreadTest, ReceivesSingleValue)
{
const std::string sock = "/tmp/test_ct_single.sock";
ConsumerThread consumer(sock);
// QSignalSpy records every emission of the given signal
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(42);
// spy.wait() pumps the event loop for up to 1s until a signal arrives
spy.wait(1000);
consumer.stop();
ASSERT_EQ(spy.count(), 1);
EXPECT_EQ(spy.at(0).at(0).toInt(), 42);
}
TEST(ConsumerThreadTest, ReceivesMultipleValues)
{
const std::string sock = "/tmp/test_ct_multi.sock";
ConsumerThread consumer(sock);
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
constexpr int kMessages = 5;
for (int i = 0; i < kMessages; ++i)
{
UnixIpcBridge bridge(sock);
bridge.send(i * 10);
// Small delay so the consumer can re-enter accept() between sends
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Wait until all signals arrive (or timeout after 5s)
for (int attempt = 0; spy.count() < kMessages && attempt < 50; ++attempt)
{
spy.wait(100);
}
consumer.stop();
ASSERT_EQ(spy.count(), kMessages);
for (int i = 0; i < kMessages; ++i)
{
EXPECT_EQ(spy.at(i).at(0).toInt(), i * 10);
}
}
TEST(ConsumerThreadTest, ReceivesNegativeAndZero)
{
// Zero
{
const std::string sock = "/tmp/test_ct_zero.sock";
ConsumerThread consumer(sock);
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(0);
spy.wait(1000);
consumer.stop();
ASSERT_EQ(spy.count(), 1);
EXPECT_EQ(spy.at(0).at(0).toInt(), 0);
}
// Negative
{
const std::string sock = "/tmp/test_ct_neg.sock";
ConsumerThread consumer(sock);
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(-999);
spy.wait(1000);
consumer.stop();
ASSERT_EQ(spy.count(), 1);
EXPECT_EQ(spy.at(0).at(0).toInt(), -999);
}
}
TEST(ConsumerThreadTest, StopsCleanlyWithoutDeadlock)
{
const std::string sock = "/tmp/test_ct_stop.sock";
ConsumerThread consumer(sock);
consumer.start();
// stop() must return without hanging, even with no connections
consumer.stop();
}
TEST(ConsumerThreadTest, StopsCleanlyWhenNeverStarted)
{
const std::string sock = "/tmp/test_ct_nostart.sock";
ConsumerThread consumer(sock);
// stop() on a consumer that was never started must not crash
consumer.stop();
}