16 Commits

Author SHA1 Message Date
dfd82b1619 feat: implemented consumer thread, all tests pass 2026-03-10 19:14:48 +01:00
8e471d3534 feat: Consumer header file, consumes socket to connect to an passes signal to main window (in theory) 2026-03-10 19:07:22 +01:00
da875871ef feat: prepared the cmake lists for qt dependencies 2026-03-10 19:05:27 +01:00
2d519937b7 Merge pull request 'fix: forgot to add the source file to the commit... disastrous' (#5) from feature/UnixIPCBridge into main
Reviewed-on: #5
2026-03-10 17:51:31 +00:00
228bb998f6 fix: forgot to add the source file to the commit... disastrous 2026-03-10 17:50:13 +00:00
572905a879 Merge pull request 'feature/UnixIPCBridge' (#4) from feature/UnixIPCBridge into main
Reviewed-on: #4
2026-03-10 17:48:39 +00:00
544d50ee6a fin: feature complete, all tests pass ready to merge 2026-03-10 17:47:45 +00:00
f63d40cffb feat: implemented unix domain socket helper header and tests 2026-03-10 17:47:09 +00:00
e1360ccbb4 feat: Updated readme for new feature 2026-03-10 17:45:53 +00:00
149c3a22b7 Merge pull request 'feature/Producer' (#3) from feature/Producer into main
Reviewed-on: #3
2026-03-10 17:34:17 +00:00
928bb5a5fb fix: reduced the time of test 1 again to 1ms 2026-03-10 17:33:01 +00:00
16bf4bccd4 fix: all tests pass now, there was a segfault for calling start and stop too quickly 2026-03-10 17:31:49 +00:00
5b6f20a70a fix: build now succeded. Added logging capabilities also 2026-03-10 17:26:32 +00:00
36093e6c73 of course it builded before, I didn't add the files to CmakeLists. Now build fails 2026-03-10 16:57:26 +00:00
9c2117e64b implemented Producer API and header, stub and tests. Builds cleanly, tests fail for now 2026-03-10 16:54:28 +00:00
a798caf1a8 feat: Updated readme to serve as guidance for dev 2026-03-10 16:45:51 +00:00
12 changed files with 898 additions and 0 deletions

View File

@@ -8,12 +8,22 @@ project(azkoyen_ipc_test LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Qt Setup — AUTOMOC runs moc automatically on Q_OBJECT headers
find_package(Qt5 REQUIRED COMPONENTS Core Widgets Test)
set(CMAKE_AUTOMOC ON)
set(CMAKE_INCLUDE_CURRENT_DIR ON)
# Core library
add_library(core
src/core/SysfsRead.cxx
src/core/UnixIpcBridge.cxx
src/core/Producer.cxx
src/core/Consumer.cxx
include/Consumer.hpp
)
target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(core PUBLIC Qt5::Core)
#tests
enable_testing()

View File

@@ -21,3 +21,27 @@ A Test-Driven Development (TDD) workflow was followed throughout the project. Ev
The reader never throws on I/O errors; every outcome is expressed through the enum so callers can react without exception handling. A helper `trim_in_place` strips trailing whitespace and newlines before comparison.
**Tests:** [tests/test_sysfs_read.cxx](tests/test_sysfs_read.cxx) — covers all five status branches by writing controlled content to a temporary file.
## Producer class / thread
`Producer` ([include/Producer.hpp](include/Producer.hpp), [src/core/Producer.cxx](src/core/Producer.cxx)) runs a worker `std::thread` that periodically polls the `SysfsReader` and, when the status is `Enabled`, generates a random integer and forwards it through an injected `send_fn` callback. The polling interval is 1 second under normal conditions and 7 seconds when the sysfs file reports `ErrorTempTooHigh` (cool-down).
## UnixIpcBridge
>[!note]
>why unix domain sockets? Because I have more experience with them under linux than with posix shared memmory and semaphore, and I find them easier to unit-test.
`UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple.
**Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly.
## ConsumerThread
`ConsumerThread` ([include/ConsumerThread.hpp](include/ConsumerThread.hpp), [src/core/ConsumerThread.cxx](src/core/ConsumerThread.cxx)) is a `QObject` that listens on a UNIX domain socket in a background `std::thread`. On each received integer it:
1. Prints the value to `stdout`.
2. Emits the `valueReceived(int)` Qt signal.
The server socket is created and bound inside `start()` **before** the thread is spawned, so the socket is guaranteed to be ready by the time `start()` returns — eliminating race conditions with the producer. Graceful shutdown is handled by `stop()`, which shuts down the file descriptor to unblock the blocking `accept()` call.
**Tests:** [tests/test_consumer_thread.cxx](tests/test_consumer_thread.cxx) — uses `QSignalSpy` to verify single-value, multi-value, negative, and zero reception.

44
include/Consumer.hpp Normal file
View File

@@ -0,0 +1,44 @@
#pragma once
// ConsumerThread.hpp
// SPDX-License-Identifier: GPL-3.0-or-later
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include <QObject>
#include <atomic>
#include <string>
#include <thread>
/// @brief Listens on a UNIX domain socket, receives integers from the
/// producer via IPC, prints them to console, and emits a Qt signal.
class ConsumerThread : public QObject
{
Q_OBJECT
public:
/// @brief Construct the consumer bound to a socket path.
/// @param socket_path UNIX domain socket path to listen on.
/// @param parent Optional QObject parent for memory management.
explicit ConsumerThread(const std::string& socket_path,
QObject* parent = nullptr);
~ConsumerThread() override;
/// @brief Start the listener thread. The server socket is ready
/// when this function returns.
void start();
/// @brief Stop the listener thread gracefully. Safe to call multiple times.
void stop();
signals:
/// @brief Emitted every time an integer is received from the producer.
void valueReceived(int value);
private:
/// @brief Main loop: accept → recv → print → emit. Runs in m_thread.
void run_loop();
std::string m_socket_path;
int m_server_fd = -1;
std::atomic<bool> m_running{false};
std::thread m_thread;
};

48
include/Producer.hpp Normal file
View File

@@ -0,0 +1,48 @@
#pragma once
// Producer.hpp
// SPDX-License-Identifier: GPL-3.0-or-later
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <functional>
#include <thread>
#include "SysfsRead.hpp"
using RandomFn = std::function<int()>;
using LogFn = std::function<void(const std::string&)>;
using SleepFn = std::function<void(std::chrono::milliseconds)>;
class Producer
{
public:
/// @brief Construct a Producer bound to a sysfs-like control file.
/// @param sysfs_path Path to the control file (e.g. "./fake_sysfs_input").
/// @param send_fn Function called whenever a new integer should be sent.
Producer(const std::filesystem::path& sysfs_path,
std::function<void(int)> send_fn, RandomFn random_fn,
LogFn log_fn = nullptr, SleepFn sleep_fn = default_sleep);
/// @brief Start the worker thread. Safe to call only once.
void start();
/// @brief Request the worker thread to stop and wait for it to finish.
void stop();
private:
/// @brief Main loop executed by the worker thread.
void run_loop();
std::thread m_thread;
std::atomic<bool> m_running;
SysfsReader m_reader;
RandomFn m_random;
SleepFn m_sleep;
static void default_sleep(std::chrono::milliseconds d)
{
std::this_thread::sleep_for(d);
}
LogFn m_log;
std::function<void(int)> m_send;
std::chrono::milliseconds compute_delay(SysfsStatus status) const;
};

31
include/UnixIpcBridge.hpp Normal file
View File

@@ -0,0 +1,31 @@
// UnixIpcBridge.hpp
// SPDX-License-Identifier: GPL-3.0-only
// Author: Unai Blazquez <unaibg2000@gmail.com>
#pragma once
#include <fcntl.h> // nonblocking
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h> // close()
#include <string>
///@brief Small bridge to allow the producer class to send data over UNIX domain
/// sockets
class UnixIpcBridge
{
public:
///@brief constructor
///@param socket path pointing the socket
explicit UnixIpcBridge(const std::string& socket_path);
///@brief sending function, this goes into the producer
///@param integer to send over the socket
void send(int value);
private:
std::string m_socket_path;
int m_socket_fd = -1;
void connect_to_consumer();
};

104
src/core/Consumer.cxx Normal file
View File

@@ -0,0 +1,104 @@
// ConsumerThread.cxx
// SPDX-License-Identifier: GPL-3.0-or-later
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include "Consumer.hpp"
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <stdexcept>
ConsumerThread::ConsumerThread(const std::string& socket_path, QObject* parent)
: QObject(parent), m_socket_path(socket_path)
{
}
ConsumerThread::~ConsumerThread() { stop(); }
void ConsumerThread::start()
{
// Remove stale socket from previous runs
unlink(m_socket_path.c_str());
// Create, bind and listen BEFORE spawning the thread so the socket
// is guaranteed ready when start() returns — no race with the producer.
m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (m_server_fd < 0)
{
throw std::runtime_error("ConsumerThread: socket() failed");
}
struct sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1);
if (bind(m_server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
{
close(m_server_fd);
m_server_fd = -1;
throw std::runtime_error("ConsumerThread: bind() failed");
}
if (listen(m_server_fd, 5) < 0)
{
close(m_server_fd);
m_server_fd = -1;
throw std::runtime_error("ConsumerThread: listen() failed");
}
m_running.store(true);
m_thread = std::thread(&ConsumerThread::run_loop, this);
}
void ConsumerThread::stop()
{
if (!m_running.exchange(false))
{
return; // already stopped or never started
}
// Shutdown the server fd to unblock the blocking accept() call
if (m_server_fd >= 0)
{
shutdown(m_server_fd, SHUT_RDWR);
close(m_server_fd);
m_server_fd = -1;
}
if (m_thread.joinable())
{
m_thread.join();
}
unlink(m_socket_path.c_str());
}
void ConsumerThread::run_loop()
{
while (m_running.load())
{
int client_fd = accept(m_server_fd, nullptr, nullptr);
if (client_fd < 0)
{
// accept() failed — most likely stop() closed the fd
break;
}
int value = 0;
ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL);
close(client_fd);
if (n == static_cast<ssize_t>(sizeof(value)))
{
// 1) Print to console (spec requirement)
std::cout << "ConsumerThread received: " << value << std::endl;
// 2) Emit Qt signal (spec requirement)
emit valueReceived(value);
}
}
}

84
src/core/Producer.cxx Normal file
View File

@@ -0,0 +1,84 @@
// Producer.cxx
// SPDX-License-Identifier: GPL-3.0-only
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include "Producer.hpp"
#include "SysfsRead.hpp"
Producer::Producer(const std::filesystem::path& sysfs_path,
std::function<void(int)> send_fn, RandomFn random_fn,
LogFn log_fn, SleepFn sleep_fn)
: m_reader(sysfs_path),
m_send(std::move(send_fn)),
m_random(std::move(random_fn)),
m_log(std::move(log_fn)),
m_sleep(std::move(sleep_fn))
{
}
std::chrono::milliseconds Producer::compute_delay(SysfsStatus status) const
{
using namespace std::chrono_literals;
auto standard = 1000ms; // example: 1s, must be < 7s
auto hot = 7000ms; // exactly 7s
if (status == SysfsStatus::ErrorTempTooHigh)
{ // when error = temp too high
return hot;
}
else
{
return standard;
}
}
void Producer::start()
{
m_running.store(true);
m_thread = std::thread(&Producer::run_loop, this);
}
void Producer::stop()
{
m_running.store(false);
if (m_thread.joinable())
{
m_thread.join();
}
}
void Producer::run_loop()
{
while (m_running.load())
{
auto status = m_reader.read_status();
switch (status)
{
case SysfsStatus::Enabled:
m_send(m_random());
if (m_log) m_log("Producer: Enabled");
break;
case SysfsStatus::Unreachable:
// do nothing for now
if (m_log) m_log("Producer: SysfsFile Unreachable");
break;
case SysfsStatus::Empty:
if (m_log) m_log("Producer: SysfsFile Empty");
break;
case SysfsStatus::ErrorTempTooHigh:
if (m_log) m_log("Producer: Error temp too high!!");
break;
case SysfsStatus::UnexpectedValue:
if (m_log) m_log("Producer: UnexpectedValue");
break;
}
auto delay = compute_delay(status);
m_sleep(delay);
// Thread will end here (for now) stop will join it
}
}

View File

@@ -0,0 +1,49 @@
// UnixIpcBridge.cxx
// SPDX-License-Identifier: GPL-3.0-only
// Author: Unai Blazquez <unaibg2000@gmail.com>
#include "UnixIpcBridge.hpp"
#include <cstring>
#include <stdexcept>
UnixIpcBridge::UnixIpcBridge(const std::string& socket_path)
: m_socket_path(socket_path)
{
}
void UnixIpcBridge::send(int value)
{
connect_to_consumer();
ssize_t n = ::send(m_socket_fd, &value, sizeof(value), 0);
if (n != sizeof(value))
{
close(m_socket_fd);
m_socket_fd = -1;
throw std::runtime_error("UnixIpcBridge::send: failed to write value");
}
close(m_socket_fd);
m_socket_fd = -1;
}
void UnixIpcBridge::connect_to_consumer()
{
m_socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (m_socket_fd < 0)
{
throw std::runtime_error("UnixIpcBridge: socket() failed");
}
struct sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1);
if (connect(m_socket_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) <
0)
{
close(m_socket_fd);
m_socket_fd = -1;
throw std::runtime_error("UnixIpcBridge: connect() failed");
}
}

View File

@@ -12,4 +12,44 @@ target_link_libraries(test_sysfs_reader
gtest_main
)
add_test(NAME test_sysfs_reader COMMAND test_sysfs_reader)
add_executable(test_producer
test_producer.cxx
)
target_link_libraries(test_producer
PRIVATE
core
gtest
gtest_main
)
add_test(NAME test_producer COMMAND test_producer)
add_executable(test_ipc
test_unix_ipc.cxx
)
target_link_libraries(test_ipc
PRIVATE
core
gtest
gtest_main
)
add_test(NAME test_ipc COMMAND test_ipc)
add_executable(test_consumer
test_consumer.cxx
)
target_link_libraries(test_consumer
PRIVATE
core
gtest
gtest_main
Qt5::Core
Qt5::Test
)
add_test(NAME test_consumer COMMAND test_consumer)

120
tests/test_consumer.cxx Normal file
View File

@@ -0,0 +1,120 @@
#include <gtest/gtest.h>
#include <QCoreApplication>
#include <QSignalSpy>
#include "Consumer.hpp"
#include "UnixIpcBridge.hpp"
// QSignalSpy needs a QCoreApplication to dispatch queued signals
static int argc_ = 0;
static QCoreApplication app_(argc_, nullptr);
TEST(ConsumerThreadTest, ReceivesSingleValue)
{
const std::string sock = "/tmp/test_ct_single.sock";
ConsumerThread consumer(sock);
// QSignalSpy records every emission of the given signal
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(42);
// spy.wait() pumps the event loop for up to 1s until a signal arrives
spy.wait(1000);
consumer.stop();
ASSERT_EQ(spy.count(), 1);
EXPECT_EQ(spy.at(0).at(0).toInt(), 42);
}
TEST(ConsumerThreadTest, ReceivesMultipleValues)
{
const std::string sock = "/tmp/test_ct_multi.sock";
ConsumerThread consumer(sock);
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
constexpr int kMessages = 5;
for (int i = 0; i < kMessages; ++i)
{
UnixIpcBridge bridge(sock);
bridge.send(i * 10);
// Small delay so the consumer can re-enter accept() between sends
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Wait until all signals arrive (or timeout after 5s)
for (int attempt = 0; spy.count() < kMessages && attempt < 50; ++attempt)
{
spy.wait(100);
}
consumer.stop();
ASSERT_EQ(spy.count(), kMessages);
for (int i = 0; i < kMessages; ++i)
{
EXPECT_EQ(spy.at(i).at(0).toInt(), i * 10);
}
}
TEST(ConsumerThreadTest, ReceivesNegativeAndZero)
{
// Zero
{
const std::string sock = "/tmp/test_ct_zero.sock";
ConsumerThread consumer(sock);
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(0);
spy.wait(1000);
consumer.stop();
ASSERT_EQ(spy.count(), 1);
EXPECT_EQ(spy.at(0).at(0).toInt(), 0);
}
// Negative
{
const std::string sock = "/tmp/test_ct_neg.sock";
ConsumerThread consumer(sock);
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(-999);
spy.wait(1000);
consumer.stop();
ASSERT_EQ(spy.count(), 1);
EXPECT_EQ(spy.at(0).at(0).toInt(), -999);
}
}
TEST(ConsumerThreadTest, StopsCleanlyWithoutDeadlock)
{
const std::string sock = "/tmp/test_ct_stop.sock";
ConsumerThread consumer(sock);
consumer.start();
// stop() must return without hanging, even with no connections
consumer.stop();
}
TEST(ConsumerThreadTest, StopsCleanlyWhenNeverStarted)
{
const std::string sock = "/tmp/test_ct_nostart.sock";
ConsumerThread consumer(sock);
// stop() on a consumer that was never started must not crash
consumer.stop();
}

124
tests/test_producer.cxx Normal file
View File

@@ -0,0 +1,124 @@
#include <gtest/gtest.h>
#include <chrono>
#include <fstream>
#include "Producer.hpp"
TEST(ProducerTest, ProducerCallsBackWhenEnabled)
{
// Arrange create the file and write "1\n" into it.
{
std::ofstream out("fake_sysfs_input");
out << "1\n";
}
// create a fake callback function
std::vector<int> outputs;
std::vector<std::string> logs;
// construct a producer with fake file and callback
Producer producer{"fake_sysfs_input", [&outputs](int value)
{ outputs.push_back(value); }, []() { return 42; },
[&logs](const std::string& msg) { logs.push_back(msg); }};
// Act: initialize producer and stop it.
producer.start();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
producer.stop();
// Assert: we expect one output being 42
EXPECT_EQ(outputs[0], 42);
EXPECT_NE(logs[0].find("Enabled"), std::string::npos);
}
TEST(ProducerTest, ProducerDoesNotCallWhenUnexpectedValue)
{
// Arrange create the file and write "0\n" into it.
{
std::ofstream out("fake_sysfs_input");
out << "0\n";
}
std::vector<int> outputs;
std::vector<std::string> logs;
Producer producer{"fake_sysfs_input", [&outputs](int value)
{ outputs.push_back(value); }, []() { return 42; },
[&logs](const std::string& msg) { logs.push_back(msg); },
[](std::chrono::milliseconds) {}};
producer.start();
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // ← 1ms window
producer.stop();
// Assert: we expect no output
EXPECT_TRUE(outputs.empty());
EXPECT_NE(logs[0].find("UnexpectedValue"), std::string::npos);
}
TEST(ProducerTest, ProducerDoesNotCallWhenEmpty)
{
// Arrange create the file and write "0\n" into it.
{
std::ofstream out("fake_sysfs_input");
out << " ";
}
std::vector<int> outputs;
std::vector<std::string> logs;
Producer producer{"fake_sysfs_input", [&outputs](int value)
{ outputs.push_back(value); }, []() { return 42; },
[&logs](const std::string& msg) { logs.push_back(msg); },
[](std::chrono::milliseconds) {}};
producer.start();
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // ← 1ms window
producer.stop();
// Assert: we expect no output
EXPECT_TRUE(outputs.empty());
EXPECT_NE(logs[0].find("Empty"), std::string::npos);
}
TEST(ProducerTest, ProducerDoesNotCallWhenUnreachable)
{
std::vector<int> outputs;
std::vector<std::string> logs;
Producer producer{"nonexistant_sysfs_input", [&outputs](int value)
{ outputs.push_back(value); }, []() { return 42; },
[&logs](const std::string& msg) { logs.push_back(msg); },
[](std::chrono::milliseconds) {}};
producer.start();
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // ← 1ms window
producer.stop();
// Assert: we expect no output
EXPECT_TRUE(outputs.empty());
EXPECT_NE(logs[0].find("Unreachable"), std::string::npos);
}
TEST(ProducerTest, ProducerDoesNotCallWhenTempTooHigh)
{
// create a file that contains error
{
std::ofstream out("fake_sysfs_input");
out << "error: temp too high";
}
std::vector<int> outputs;
std::vector<std::string> logs;
Producer producer{"fake_sysfs_input", [&outputs](int value)
{ outputs.push_back(value); }, []() { return 42; },
[&logs](const std::string& msg) { logs.push_back(msg); },
[](std::chrono::milliseconds) {}};
producer.start();
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // ← 1ms window
producer.stop();
// Assert: we expect no output
EXPECT_TRUE(outputs.empty());
EXPECT_NE(logs[0].find("Error"), std::string::npos);
}

220
tests/test_unix_ipc.cxx Normal file
View File

@@ -0,0 +1,220 @@
#include <gtest/gtest.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <cstring>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
#include "UnixIpcBridge.hpp"
// ---------------------------------------------------------------------------
// Helper: a minimal UNIX-domain socket server that accepts one connection,
// reads `count` ints, then tears down cleanly. Uses a ready-flag so the
// client never races against bind/listen.
// ---------------------------------------------------------------------------
class FakeConsumer
{
public:
explicit FakeConsumer(const std::string& path, int count = 1)
: m_path(path), m_count(count)
{
// Remove stale socket from any previous failed run
unlink(m_path.c_str());
}
/// Start the server on a background thread.
void start()
{
m_thread = std::thread([this] { run(); });
// Spin until the server signals it is listening (bounded wait).
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
while (!m_ready.load(std::memory_order_acquire))
{
if (std::chrono::steady_clock::now() > deadline)
{
throw std::runtime_error("FakeConsumer: server failed to start");
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
/// Block until the server thread finishes.
void join() { m_thread.join(); }
/// Values received in order.
const std::vector<int>& received() const { return m_received; }
~FakeConsumer() { unlink(m_path.c_str()); }
private:
void run()
{
m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
ASSERT_GE(m_server_fd, 0) << "socket() failed: " << strerror(errno);
struct sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
ASSERT_EQ(
bind(m_server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)), 0)
<< "bind() failed: " << strerror(errno);
ASSERT_EQ(listen(m_server_fd, 1), 0)
<< "listen() failed: " << strerror(errno);
// Signal that we are ready to accept connections.
m_ready.store(true, std::memory_order_release);
int client_fd = accept(m_server_fd, nullptr, nullptr);
ASSERT_GE(client_fd, 0) << "accept() failed: " << strerror(errno);
for (int i = 0; i < m_count; ++i)
{
int value = 0;
ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL);
ASSERT_EQ(n, static_cast<ssize_t>(sizeof(value)))
<< "recv() short read on message " << i;
m_received.push_back(value);
}
close(client_fd);
close(m_server_fd);
}
std::string m_path;
int m_count;
int m_server_fd = -1;
std::atomic<bool> m_ready{false};
std::thread m_thread;
std::vector<int> m_received;
};
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
/// Sends a single integer value and verifies the consumer receives it.
TEST(UnixIpcBridgeTest, SendsSingleInt)
{
const std::string sock = "/tmp/test_ipc_single.sock";
FakeConsumer consumer(sock, /*count=*/1);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(42);
consumer.join();
ASSERT_EQ(consumer.received().size(), 1u);
EXPECT_EQ(consumer.received()[0], 42);
}
/// Sends zero and a negative value — makes sure sign bits survive.
TEST(UnixIpcBridgeTest, SendsZeroAndNegativeValues)
{
// Zero
{
const std::string sock = "/tmp/test_ipc_zero.sock";
FakeConsumer consumer(sock, 1);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(0);
consumer.join();
ASSERT_EQ(consumer.received().size(), 1u);
EXPECT_EQ(consumer.received()[0], 0);
}
// Negative
{
const std::string sock = "/tmp/test_ipc_neg.sock";
FakeConsumer consumer(sock, 1);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(-1);
consumer.join();
ASSERT_EQ(consumer.received().size(), 1u);
EXPECT_EQ(consumer.received()[0], -1);
}
}
/// Sends INT_MAX / INT_MIN to check for truncation or overflow.
TEST(UnixIpcBridgeTest, SendsExtremeBoundaryValues)
{
{
const std::string sock = "/tmp/test_ipc_max.sock";
FakeConsumer consumer(sock, 1);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(std::numeric_limits<int>::max());
consumer.join();
ASSERT_EQ(consumer.received().size(), 1u);
EXPECT_EQ(consumer.received()[0], std::numeric_limits<int>::max());
}
{
const std::string sock = "/tmp/test_ipc_min.sock";
FakeConsumer consumer(sock, 1);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(std::numeric_limits<int>::min());
consumer.join();
ASSERT_EQ(consumer.received().size(), 1u);
EXPECT_EQ(consumer.received()[0], std::numeric_limits<int>::min());
}
}
/// Connecting to a non-existent socket must throw, not silently fail.
TEST(UnixIpcBridgeTest, ThrowsWhenNoConsumerListening)
{
const std::string sock = "/tmp/test_ipc_noserver.sock";
unlink(sock.c_str()); // make sure nothing is there
UnixIpcBridge bridge(sock);
EXPECT_THROW(bridge.send(99), std::runtime_error);
}
/// Multiple sequential sends (each reopens the connection).
TEST(UnixIpcBridgeTest, MultipleSendsSequentially)
{
const std::string sock = "/tmp/test_ipc_multi.sock";
constexpr int kMessages = 5;
// Server expects exactly kMessages ints from kMessages connections.
// Because the bridge reconnects every send(), we run kMessages
// single-message consumers sequentially.
std::vector<int> all_received;
for (int i = 0; i < kMessages; ++i)
{
FakeConsumer consumer(sock, 1);
consumer.start();
UnixIpcBridge bridge(sock);
bridge.send(i * 10);
consumer.join();
ASSERT_EQ(consumer.received().size(), 1u);
all_received.push_back(consumer.received()[0]);
}
ASSERT_EQ(all_received.size(), static_cast<size_t>(kMessages));
for (int i = 0; i < kMessages; ++i)
{
EXPECT_EQ(all_received[i], i * 10);
}
}