Compare commits
9 Commits
149c3a22b7
...
feature/Co
| Author | SHA1 | Date | |
|---|---|---|---|
| dfd82b1619 | |||
| 8e471d3534 | |||
| da875871ef | |||
| 2d519937b7 | |||
| 228bb998f6 | |||
| 572905a879 | |||
| 544d50ee6a | |||
| f63d40cffb | |||
| e1360ccbb4 |
@@ -8,13 +8,22 @@ project(azkoyen_ipc_test LANGUAGES CXX)
|
|||||||
set(CMAKE_CXX_STANDARD 17)
|
set(CMAKE_CXX_STANDARD 17)
|
||||||
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
||||||
|
|
||||||
|
# Qt Setup — AUTOMOC runs moc automatically on Q_OBJECT headers
|
||||||
|
find_package(Qt5 REQUIRED COMPONENTS Core Widgets Test)
|
||||||
|
set(CMAKE_AUTOMOC ON)
|
||||||
|
set(CMAKE_INCLUDE_CURRENT_DIR ON)
|
||||||
|
|
||||||
# Core library
|
# Core library
|
||||||
add_library(core
|
add_library(core
|
||||||
src/core/SysfsRead.cxx
|
src/core/SysfsRead.cxx
|
||||||
|
src/core/UnixIpcBridge.cxx
|
||||||
src/core/Producer.cxx
|
src/core/Producer.cxx
|
||||||
|
src/core/Consumer.cxx
|
||||||
|
include/Consumer.hpp
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
||||||
|
target_link_libraries(core PUBLIC Qt5::Core)
|
||||||
|
|
||||||
#tests
|
#tests
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|||||||
20
README.md
20
README.md
@@ -25,3 +25,23 @@ The reader never throws on I/O errors; every outcome is expressed through the en
|
|||||||
## Producer class / thread
|
## Producer class / thread
|
||||||
|
|
||||||
`Producer` ([include/Producer.hpp](include/Producer.hpp), [src/core/Producer.cxx](src/core/Producer.cxx)) runs a worker `std::thread` that periodically polls the `SysfsReader` and, when the status is `Enabled`, generates a random integer and forwards it through an injected `send_fn` callback. The polling interval is 1 second under normal conditions and 7 seconds when the sysfs file reports `ErrorTempTooHigh` (cool-down).
|
`Producer` ([include/Producer.hpp](include/Producer.hpp), [src/core/Producer.cxx](src/core/Producer.cxx)) runs a worker `std::thread` that periodically polls the `SysfsReader` and, when the status is `Enabled`, generates a random integer and forwards it through an injected `send_fn` callback. The polling interval is 1 second under normal conditions and 7 seconds when the sysfs file reports `ErrorTempTooHigh` (cool-down).
|
||||||
|
|
||||||
|
## UnixIpcBridge
|
||||||
|
|
||||||
|
>[!note]
|
||||||
|
>why unix domain sockets? Because I have more experience with them under linux than with posix shared memmory and semaphore, and I find them easier to unit-test.
|
||||||
|
|
||||||
|
`UnixIpcBridge` ([include/UnixIpcBridge.hpp](include/UnixIpcBridge.hpp), [src/core/UnixIpcBridge.cxx](src/core/UnixIpcBridge.cxx)) is a small helper that connects to a UNIX domain socket and sends a single `int` per call. It opens a new connection for each value, which keeps the protocol stateless and simple.
|
||||||
|
|
||||||
|
**Tests:** [tests/test_unix_ipc.cxx](tests/test_unix_ipc.cxx) — spins up a fake socket server, sends values through the bridge, and asserts they arrive correctly.
|
||||||
|
|
||||||
|
## ConsumerThread
|
||||||
|
|
||||||
|
`ConsumerThread` ([include/ConsumerThread.hpp](include/ConsumerThread.hpp), [src/core/ConsumerThread.cxx](src/core/ConsumerThread.cxx)) is a `QObject` that listens on a UNIX domain socket in a background `std::thread`. On each received integer it:
|
||||||
|
|
||||||
|
1. Prints the value to `stdout`.
|
||||||
|
2. Emits the `valueReceived(int)` Qt signal.
|
||||||
|
|
||||||
|
The server socket is created and bound inside `start()` **before** the thread is spawned, so the socket is guaranteed to be ready by the time `start()` returns — eliminating race conditions with the producer. Graceful shutdown is handled by `stop()`, which shuts down the file descriptor to unblock the blocking `accept()` call.
|
||||||
|
|
||||||
|
**Tests:** [tests/test_consumer_thread.cxx](tests/test_consumer_thread.cxx) — uses `QSignalSpy` to verify single-value, multi-value, negative, and zero reception.
|
||||||
|
|||||||
44
include/Consumer.hpp
Normal file
44
include/Consumer.hpp
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
#pragma once
|
||||||
|
// ConsumerThread.hpp
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
// Author: Unai Blazquez <unaibg2000@gmail.com>
|
||||||
|
#include <QObject>
|
||||||
|
#include <atomic>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
/// @brief Listens on a UNIX domain socket, receives integers from the
|
||||||
|
/// producer via IPC, prints them to console, and emits a Qt signal.
|
||||||
|
class ConsumerThread : public QObject
|
||||||
|
{
|
||||||
|
Q_OBJECT
|
||||||
|
|
||||||
|
public:
|
||||||
|
/// @brief Construct the consumer bound to a socket path.
|
||||||
|
/// @param socket_path UNIX domain socket path to listen on.
|
||||||
|
/// @param parent Optional QObject parent for memory management.
|
||||||
|
explicit ConsumerThread(const std::string& socket_path,
|
||||||
|
QObject* parent = nullptr);
|
||||||
|
|
||||||
|
~ConsumerThread() override;
|
||||||
|
|
||||||
|
/// @brief Start the listener thread. The server socket is ready
|
||||||
|
/// when this function returns.
|
||||||
|
void start();
|
||||||
|
|
||||||
|
/// @brief Stop the listener thread gracefully. Safe to call multiple times.
|
||||||
|
void stop();
|
||||||
|
|
||||||
|
signals:
|
||||||
|
/// @brief Emitted every time an integer is received from the producer.
|
||||||
|
void valueReceived(int value);
|
||||||
|
|
||||||
|
private:
|
||||||
|
/// @brief Main loop: accept → recv → print → emit. Runs in m_thread.
|
||||||
|
void run_loop();
|
||||||
|
|
||||||
|
std::string m_socket_path;
|
||||||
|
int m_server_fd = -1;
|
||||||
|
std::atomic<bool> m_running{false};
|
||||||
|
std::thread m_thread;
|
||||||
|
};
|
||||||
31
include/UnixIpcBridge.hpp
Normal file
31
include/UnixIpcBridge.hpp
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
// UnixIpcBridge.hpp
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-only
|
||||||
|
// Author: Unai Blazquez <unaibg2000@gmail.com>
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
#include <fcntl.h> // non‑blocking
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h> // close()
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
///@brief Small bridge to allow the producer class to send data over UNIX domain
|
||||||
|
/// sockets
|
||||||
|
class UnixIpcBridge
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
///@brief constructor
|
||||||
|
///@param socket path pointing the socket
|
||||||
|
explicit UnixIpcBridge(const std::string& socket_path);
|
||||||
|
|
||||||
|
///@brief sending function, this goes into the producer
|
||||||
|
///@param integer to send over the socket
|
||||||
|
void send(int value);
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::string m_socket_path;
|
||||||
|
int m_socket_fd = -1;
|
||||||
|
|
||||||
|
void connect_to_consumer();
|
||||||
|
};
|
||||||
104
src/core/Consumer.cxx
Normal file
104
src/core/Consumer.cxx
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
// ConsumerThread.cxx
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
// Author: Unai Blazquez <unaibg2000@gmail.com>
|
||||||
|
|
||||||
|
#include "Consumer.hpp"
|
||||||
|
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <cstring>
|
||||||
|
#include <iostream>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
ConsumerThread::ConsumerThread(const std::string& socket_path, QObject* parent)
|
||||||
|
: QObject(parent), m_socket_path(socket_path)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ConsumerThread::~ConsumerThread() { stop(); }
|
||||||
|
|
||||||
|
void ConsumerThread::start()
|
||||||
|
{
|
||||||
|
// Remove stale socket from previous runs
|
||||||
|
unlink(m_socket_path.c_str());
|
||||||
|
|
||||||
|
// Create, bind and listen BEFORE spawning the thread so the socket
|
||||||
|
// is guaranteed ready when start() returns — no race with the producer.
|
||||||
|
m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (m_server_fd < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error("ConsumerThread: socket() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
struct sockaddr_un addr = {};
|
||||||
|
addr.sun_family = AF_UNIX;
|
||||||
|
std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1);
|
||||||
|
|
||||||
|
if (bind(m_server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
|
||||||
|
{
|
||||||
|
close(m_server_fd);
|
||||||
|
m_server_fd = -1;
|
||||||
|
throw std::runtime_error("ConsumerThread: bind() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen(m_server_fd, 5) < 0)
|
||||||
|
{
|
||||||
|
close(m_server_fd);
|
||||||
|
m_server_fd = -1;
|
||||||
|
throw std::runtime_error("ConsumerThread: listen() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
m_running.store(true);
|
||||||
|
m_thread = std::thread(&ConsumerThread::run_loop, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConsumerThread::stop()
|
||||||
|
{
|
||||||
|
if (!m_running.exchange(false))
|
||||||
|
{
|
||||||
|
return; // already stopped or never started
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown the server fd to unblock the blocking accept() call
|
||||||
|
if (m_server_fd >= 0)
|
||||||
|
{
|
||||||
|
shutdown(m_server_fd, SHUT_RDWR);
|
||||||
|
close(m_server_fd);
|
||||||
|
m_server_fd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_thread.joinable())
|
||||||
|
{
|
||||||
|
m_thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
unlink(m_socket_path.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConsumerThread::run_loop()
|
||||||
|
{
|
||||||
|
while (m_running.load())
|
||||||
|
{
|
||||||
|
int client_fd = accept(m_server_fd, nullptr, nullptr);
|
||||||
|
if (client_fd < 0)
|
||||||
|
{
|
||||||
|
// accept() failed — most likely stop() closed the fd
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
int value = 0;
|
||||||
|
ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL);
|
||||||
|
close(client_fd);
|
||||||
|
|
||||||
|
if (n == static_cast<ssize_t>(sizeof(value)))
|
||||||
|
{
|
||||||
|
// 1) Print to console (spec requirement)
|
||||||
|
std::cout << "ConsumerThread received: " << value << std::endl;
|
||||||
|
|
||||||
|
// 2) Emit Qt signal (spec requirement)
|
||||||
|
emit valueReceived(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
49
src/core/UnixIpcBridge.cxx
Normal file
49
src/core/UnixIpcBridge.cxx
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
// UnixIpcBridge.cxx
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-only
|
||||||
|
// Author: Unai Blazquez <unaibg2000@gmail.com>
|
||||||
|
#include "UnixIpcBridge.hpp"
|
||||||
|
|
||||||
|
#include <cstring>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
UnixIpcBridge::UnixIpcBridge(const std::string& socket_path)
|
||||||
|
: m_socket_path(socket_path)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void UnixIpcBridge::send(int value)
|
||||||
|
{
|
||||||
|
connect_to_consumer();
|
||||||
|
|
||||||
|
ssize_t n = ::send(m_socket_fd, &value, sizeof(value), 0);
|
||||||
|
if (n != sizeof(value))
|
||||||
|
{
|
||||||
|
close(m_socket_fd);
|
||||||
|
m_socket_fd = -1;
|
||||||
|
throw std::runtime_error("UnixIpcBridge::send: failed to write value");
|
||||||
|
}
|
||||||
|
|
||||||
|
close(m_socket_fd);
|
||||||
|
m_socket_fd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void UnixIpcBridge::connect_to_consumer()
|
||||||
|
{
|
||||||
|
m_socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (m_socket_fd < 0)
|
||||||
|
{
|
||||||
|
throw std::runtime_error("UnixIpcBridge: socket() failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
struct sockaddr_un addr = {};
|
||||||
|
addr.sun_family = AF_UNIX;
|
||||||
|
std::strncpy(addr.sun_path, m_socket_path.c_str(), sizeof(addr.sun_path) - 1);
|
||||||
|
|
||||||
|
if (connect(m_socket_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) <
|
||||||
|
0)
|
||||||
|
{
|
||||||
|
close(m_socket_fd);
|
||||||
|
m_socket_fd = -1;
|
||||||
|
throw std::runtime_error("UnixIpcBridge: connect() failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -25,3 +25,31 @@ target_link_libraries(test_producer
|
|||||||
|
|
||||||
add_test(NAME test_producer COMMAND test_producer)
|
add_test(NAME test_producer COMMAND test_producer)
|
||||||
|
|
||||||
|
add_executable(test_ipc
|
||||||
|
test_unix_ipc.cxx
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(test_ipc
|
||||||
|
PRIVATE
|
||||||
|
core
|
||||||
|
gtest
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(NAME test_ipc COMMAND test_ipc)
|
||||||
|
|
||||||
|
|
||||||
|
add_executable(test_consumer
|
||||||
|
test_consumer.cxx
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(test_consumer
|
||||||
|
PRIVATE
|
||||||
|
core
|
||||||
|
gtest
|
||||||
|
gtest_main
|
||||||
|
Qt5::Core
|
||||||
|
Qt5::Test
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(NAME test_consumer COMMAND test_consumer)
|
||||||
|
|||||||
120
tests/test_consumer.cxx
Normal file
120
tests/test_consumer.cxx
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <QCoreApplication>
|
||||||
|
#include <QSignalSpy>
|
||||||
|
|
||||||
|
#include "Consumer.hpp"
|
||||||
|
#include "UnixIpcBridge.hpp"
|
||||||
|
|
||||||
|
// QSignalSpy needs a QCoreApplication to dispatch queued signals
|
||||||
|
static int argc_ = 0;
|
||||||
|
static QCoreApplication app_(argc_, nullptr);
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, ReceivesSingleValue)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_single.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
|
||||||
|
// QSignalSpy records every emission of the given signal
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(42);
|
||||||
|
|
||||||
|
// spy.wait() pumps the event loop for up to 1s until a signal arrives
|
||||||
|
spy.wait(1000);
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), 1);
|
||||||
|
EXPECT_EQ(spy.at(0).at(0).toInt(), 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, ReceivesMultipleValues)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_multi.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
constexpr int kMessages = 5;
|
||||||
|
for (int i = 0; i < kMessages; ++i)
|
||||||
|
{
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(i * 10);
|
||||||
|
// Small delay so the consumer can re-enter accept() between sends
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all signals arrive (or timeout after 5s)
|
||||||
|
for (int attempt = 0; spy.count() < kMessages && attempt < 50; ++attempt)
|
||||||
|
{
|
||||||
|
spy.wait(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), kMessages);
|
||||||
|
for (int i = 0; i < kMessages; ++i)
|
||||||
|
{
|
||||||
|
EXPECT_EQ(spy.at(i).at(0).toInt(), i * 10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, ReceivesNegativeAndZero)
|
||||||
|
{
|
||||||
|
// Zero
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_zero.sock";
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(0);
|
||||||
|
|
||||||
|
spy.wait(1000);
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), 1);
|
||||||
|
EXPECT_EQ(spy.at(0).at(0).toInt(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Negative
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_neg.sock";
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
QSignalSpy spy(&consumer, &ConsumerThread::valueReceived);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(-999);
|
||||||
|
|
||||||
|
spy.wait(1000);
|
||||||
|
consumer.stop();
|
||||||
|
|
||||||
|
ASSERT_EQ(spy.count(), 1);
|
||||||
|
EXPECT_EQ(spy.at(0).at(0).toInt(), -999);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, StopsCleanlyWithoutDeadlock)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_stop.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
consumer.start();
|
||||||
|
// stop() must return without hanging, even with no connections
|
||||||
|
consumer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ConsumerThreadTest, StopsCleanlyWhenNeverStarted)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ct_nostart.sock";
|
||||||
|
|
||||||
|
ConsumerThread consumer(sock);
|
||||||
|
// stop() on a consumer that was never started must not crash
|
||||||
|
consumer.stop();
|
||||||
|
}
|
||||||
220
tests/test_unix_ipc.cxx
Normal file
220
tests/test_unix_ipc.cxx
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstring>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "UnixIpcBridge.hpp"
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Helper: a minimal UNIX-domain socket server that accepts one connection,
|
||||||
|
// reads `count` ints, then tears down cleanly. Uses a ready-flag so the
|
||||||
|
// client never races against bind/listen.
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
class FakeConsumer
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit FakeConsumer(const std::string& path, int count = 1)
|
||||||
|
: m_path(path), m_count(count)
|
||||||
|
{
|
||||||
|
// Remove stale socket from any previous failed run
|
||||||
|
unlink(m_path.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the server on a background thread.
|
||||||
|
void start()
|
||||||
|
{
|
||||||
|
m_thread = std::thread([this] { run(); });
|
||||||
|
|
||||||
|
// Spin until the server signals it is listening (bounded wait).
|
||||||
|
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
|
||||||
|
while (!m_ready.load(std::memory_order_acquire))
|
||||||
|
{
|
||||||
|
if (std::chrono::steady_clock::now() > deadline)
|
||||||
|
{
|
||||||
|
throw std::runtime_error("FakeConsumer: server failed to start");
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Block until the server thread finishes.
|
||||||
|
void join() { m_thread.join(); }
|
||||||
|
|
||||||
|
/// Values received in order.
|
||||||
|
const std::vector<int>& received() const { return m_received; }
|
||||||
|
|
||||||
|
~FakeConsumer() { unlink(m_path.c_str()); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
m_server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
ASSERT_GE(m_server_fd, 0) << "socket() failed: " << strerror(errno);
|
||||||
|
|
||||||
|
struct sockaddr_un addr = {};
|
||||||
|
addr.sun_family = AF_UNIX;
|
||||||
|
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
|
||||||
|
|
||||||
|
ASSERT_EQ(
|
||||||
|
bind(m_server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)), 0)
|
||||||
|
<< "bind() failed: " << strerror(errno);
|
||||||
|
ASSERT_EQ(listen(m_server_fd, 1), 0)
|
||||||
|
<< "listen() failed: " << strerror(errno);
|
||||||
|
|
||||||
|
// Signal that we are ready to accept connections.
|
||||||
|
m_ready.store(true, std::memory_order_release);
|
||||||
|
|
||||||
|
int client_fd = accept(m_server_fd, nullptr, nullptr);
|
||||||
|
ASSERT_GE(client_fd, 0) << "accept() failed: " << strerror(errno);
|
||||||
|
|
||||||
|
for (int i = 0; i < m_count; ++i)
|
||||||
|
{
|
||||||
|
int value = 0;
|
||||||
|
ssize_t n = recv(client_fd, &value, sizeof(value), MSG_WAITALL);
|
||||||
|
ASSERT_EQ(n, static_cast<ssize_t>(sizeof(value)))
|
||||||
|
<< "recv() short read on message " << i;
|
||||||
|
m_received.push_back(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
close(client_fd);
|
||||||
|
close(m_server_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string m_path;
|
||||||
|
int m_count;
|
||||||
|
int m_server_fd = -1;
|
||||||
|
std::atomic<bool> m_ready{false};
|
||||||
|
std::thread m_thread;
|
||||||
|
std::vector<int> m_received;
|
||||||
|
};
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// Sends a single integer value and verifies the consumer receives it.
|
||||||
|
TEST(UnixIpcBridgeTest, SendsSingleInt)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_single.sock";
|
||||||
|
|
||||||
|
FakeConsumer consumer(sock, /*count=*/1);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(42);
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
|
||||||
|
ASSERT_EQ(consumer.received().size(), 1u);
|
||||||
|
EXPECT_EQ(consumer.received()[0], 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends zero and a negative value — makes sure sign bits survive.
|
||||||
|
TEST(UnixIpcBridgeTest, SendsZeroAndNegativeValues)
|
||||||
|
{
|
||||||
|
// Zero
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_zero.sock";
|
||||||
|
FakeConsumer consumer(sock, 1);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(0);
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
ASSERT_EQ(consumer.received().size(), 1u);
|
||||||
|
EXPECT_EQ(consumer.received()[0], 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Negative
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_neg.sock";
|
||||||
|
FakeConsumer consumer(sock, 1);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(-1);
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
ASSERT_EQ(consumer.received().size(), 1u);
|
||||||
|
EXPECT_EQ(consumer.received()[0], -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends INT_MAX / INT_MIN to check for truncation or overflow.
|
||||||
|
TEST(UnixIpcBridgeTest, SendsExtremeBoundaryValues)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_max.sock";
|
||||||
|
FakeConsumer consumer(sock, 1);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(std::numeric_limits<int>::max());
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
ASSERT_EQ(consumer.received().size(), 1u);
|
||||||
|
EXPECT_EQ(consumer.received()[0], std::numeric_limits<int>::max());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_min.sock";
|
||||||
|
FakeConsumer consumer(sock, 1);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(std::numeric_limits<int>::min());
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
ASSERT_EQ(consumer.received().size(), 1u);
|
||||||
|
EXPECT_EQ(consumer.received()[0], std::numeric_limits<int>::min());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connecting to a non-existent socket must throw, not silently fail.
|
||||||
|
TEST(UnixIpcBridgeTest, ThrowsWhenNoConsumerListening)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_noserver.sock";
|
||||||
|
unlink(sock.c_str()); // make sure nothing is there
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
EXPECT_THROW(bridge.send(99), std::runtime_error);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multiple sequential sends (each reopens the connection).
|
||||||
|
TEST(UnixIpcBridgeTest, MultipleSendsSequentially)
|
||||||
|
{
|
||||||
|
const std::string sock = "/tmp/test_ipc_multi.sock";
|
||||||
|
constexpr int kMessages = 5;
|
||||||
|
|
||||||
|
// Server expects exactly kMessages ints from kMessages connections.
|
||||||
|
// Because the bridge reconnects every send(), we run kMessages
|
||||||
|
// single-message consumers sequentially.
|
||||||
|
std::vector<int> all_received;
|
||||||
|
for (int i = 0; i < kMessages; ++i)
|
||||||
|
{
|
||||||
|
FakeConsumer consumer(sock, 1);
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
UnixIpcBridge bridge(sock);
|
||||||
|
bridge.send(i * 10);
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
ASSERT_EQ(consumer.received().size(), 1u);
|
||||||
|
all_received.push_back(consumer.received()[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(all_received.size(), static_cast<size_t>(kMessages));
|
||||||
|
for (int i = 0; i < kMessages; ++i)
|
||||||
|
{
|
||||||
|
EXPECT_EQ(all_received[i], i * 10);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user