feat: implemented consumer thread, all tests pass
This commit is contained in:
parent
8e471d3534
commit
dfd82b1619
11
README.md
11
README.md
@ -34,3 +34,14 @@ The reader never throws on I/O errors; every outcome is expressed through the en
|
|||||||
`UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple.
|
`UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple.
|
||||||
|
|
||||||
**Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly.
|
**Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly.
|
||||||
|
|
||||||
|
## ConsumerThread
|
||||||
|
|
||||||
|
`ConsumerThread` ([include/ConsumerThread.hpp](include/ConsumerThread.hpp), [src/core/ConsumerThread.cxx](src/core/ConsumerThread.cxx)) is a `QObject` that listens on a UNIX domain socket in a background `std::thread`. On each received integer it:
|
||||||
|
|
||||||
|
1. Prints the value to `stdout`.
|
||||||
|
2. Emits the `valueReceived(int)` Qt signal.
|
||||||
|
|
||||||
|
The server socket is created and bound inside `start()` **before** the thread is spawned, so the socket is guaranteed to be ready by the time `start()` returns — eliminating race conditions with the producer. Graceful shutdown is handled by `stop()`, which shuts down the file descriptor to unblock the blocking `accept()` call.
|
||||||
|
|
||||||
|
**Tests:** [tests/test_consumer_thread.cxx](tests/test_consumer_thread.cxx) — uses `QSignalSpy` to verify single-value, multi-value, negative, and zero reception.
|
||||||
|
|||||||
104
src/core/Consumer.cxx
Normal file
104
src/core/Consumer.cxx
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
// ConsumerThread.cxx
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
// Author: Unai Blazquez <unaibg2000@gmail.com>
|
||||||
|
|
||||||
|
#include "Consumer.hpp"
|
||||||
|
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <cstring>
|
||||||
|
#include <iostream>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
ConsumerThread::ConsumerThread(const std::string& socket_path, QObject* parent)
|
||||||
|
: QObject(parent), m_socket_path(socket_path)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ConsumerThread::~ConsumerThread() { stop(); }
|
||||||
|
|
||||||
|
void ConsumerThread::start()
|
||||||
|
{
|
||||||
|
// Remove stale socket from previous runs
|
||||||
|
unlink(m_socket_path.c_str());
|
||||||
|
|
||||||
|
// Create, bind and listen BEFORE spawning the thread so the socket
|
||||||
|
// is guaranteed ready when start() returns — no race with the producer.
|
||||||
|
m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (m_server_fd < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error("ConsumerThread: socket() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
struct sockaddr_un addr = {};
|
||||||
|
addr.sun_family = AF_UNIX;
|
||||||
|
std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1);
|
||||||
|
|
||||||
|
if (bind(m_server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
|
||||||
|
{
|
||||||
|
close(m_server_fd);
|
||||||
|
m_server_fd = -1;
|
||||||
|
throw std::runtime_error("ConsumerThread: bind() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen(m_server_fd, 5) < 0)
|
||||||
|
{
|
||||||
|
close(m_server_fd);
|
||||||
|
m_server_fd = -1;
|
||||||
|
throw std::runtime_error("ConsumerThread: listen() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
m_running.store(true);
|
||||||
|
m_thread = std::thread(&ConsumerThread::run_loop, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConsumerThread::stop()
|
||||||
|
{
|
||||||
|
if (!m_running.exchange(false))
|
||||||
|
{
|
||||||
|
return; // already stopped or never started
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown the server fd to unblock the blocking accept() call
|
||||||
|
if (m_server_fd >= 0)
|
||||||
|
{
|
||||||
|
shutdown(m_server_fd, SHUT_RDWR);
|
||||||
|
close(m_server_fd);
|
||||||
|
m_server_fd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_thread.joinable())
|
||||||
|
{
|
||||||
|
m_thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
unlink(m_socket_path.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConsumerThread::run_loop()
|
||||||
|
{
|
||||||
|
while (m_running.load())
|
||||||
|
{
|
||||||
|
int client_fd = accept(m_server_fd, nullptr, nullptr);
|
||||||
|
if (client_fd < 0)
|
||||||
|
{
|
||||||
|
// accept() failed — most likely stop() closed the fd
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
int value = 0;
|
||||||
|
ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL);
|
||||||
|
close(client_fd);
|
||||||
|
|
||||||
|
if (n == static_cast<ssize_t>(sizeof(value)))
|
||||||
|
{
|
||||||
|
// 1) Print to console (spec requirement)
|
||||||
|
std::cout << "ConsumerThread received: " << value << std::endl;
|
||||||
|
|
||||||
|
// 2) Emit Qt signal (spec requirement)
|
||||||
|
emit valueReceived(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
120
tests/test_consumer.cxx
Normal file
120
tests/test_consumer.cxx
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <QCoreApplication>
|
||||||
|
#include <QSignalSpy>
|
||||||
|
|
||||||
|
#include "Consumer.hpp"
|
||||||
|
#include "UnixIpcBridge.hpp"
|
||||||
|
|
||||||
|
// QSignalSpy needs a QCoreApplication to dispatch queued signals
|
||||||
|
static int argc_ = 0;
|
||||||
|
static QCoreApplication app_(argc_, nullptr);
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, ReceivesSingleValue)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_single.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
|
||||||
|
// QSignalSpy records every emission of the given signal
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(42);
|
||||||
|
|
||||||
|
// spy.wait() pumps the event loop for up to 1s until a signal arrives
|
||||||
|
spy.wait(1000);
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), 1);
|
||||||
|
EXPECT_EQ(spy.at(0).at(0).toInt(), 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, ReceivesMultipleValues)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_multi.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
constexpr int kMessages = 5;
|
||||||
|
for (int i = 0; i < kMessages; ++i)
|
||||||
|
{
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(i * 10);
|
||||||
|
// Small delay so the consumer can re-enter accept() between sends
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all signals arrive (or timeout after 5s)
|
||||||
|
for (int attempt = 0; spy.count() < kMessages && attempt < 50; ++attempt)
|
||||||
|
{
|
||||||
|
spy.wait(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), kMessages);
|
||||||
|
for (int i = 0; i < kMessages; ++i)
|
||||||
|
{
|
||||||
|
EXPECT_EQ(spy.at(i).at(0).toInt(), i * 10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, ReceivesNegativeAndZero)
|
||||||
|
{
|
||||||
|
// Zero
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_zero.sock";
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(0);
|
||||||
|
|
||||||
|
spy.wait(1000);
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), 1);
|
||||||
|
EXPECT_EQ(spy.at(0).at(0).toInt(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Negative
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_neg.sock";
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(-999);
|
||||||
|
|
||||||
|
spy.wait(1000);
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), 1);
|
||||||
|
EXPECT_EQ(spy.at(0).at(0).toInt(), -999);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, StopsCleanlyWithoutDeadlock)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_stop.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
consumer.start();
|
||||||
|
// stop() must return without hanging, even with no connections
|
||||||
|
consumer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, StopsCleanlyWhenNeverStarted)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_nostart.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
// stop() on a consumer that was never started must not crash
|
||||||
|
consumer.stop();
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user