#include #include #include #include #include #include #include #include "Consumer.hpp" #include "UnixIpcBridge.hpp" // QSignalSpy needs a QCoreApplication to dispatch queued signals static int argc_ = 0; static QCoreApplication app_(argc_, nullptr); TEST(ConsumerThreadTest, ReceivesSingleValue) { const std::string sock = "/tmp/test_ct_single.sock"; ConsumerThread consumer(sock); // QSignalSpy records every emission of the given signal QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); UnixIpcBridge bridge(sock); bridge.send(42); // spy.wait() pumps the event loop for up to 1s until a signal arrives spy.wait(1000); consumer.stop(); ASSERT_EQ(spy.count(), 1); EXPECT_EQ(spy.at(0).at(0).toInt(), 42); } TEST(ConsumerThreadTest, ReceivesMultipleValues) { const std::string sock = "/tmp/test_ct_multi.sock"; ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); constexpr int kMessages = 5; for (int i = 0; i < kMessages; ++i) { UnixIpcBridge bridge(sock); bridge.send(i * 10); // Small delay so the consumer can re-enter accept() between sends std::this_thread::sleep_for(std::chrono::milliseconds(10)); } // Wait until all signals arrive (or timeout after 5s) for (int attempt = 0; spy.count() < kMessages && attempt < 50; ++attempt) { spy.wait(100); } consumer.stop(); ASSERT_EQ(spy.count(), kMessages); for (int i = 0; i < kMessages; ++i) { EXPECT_EQ(spy.at(i).at(0).toInt(), i * 10); } } TEST(ConsumerThreadTest, ReceivesNegativeAndZero) { // Zero { const std::string sock = "/tmp/test_ct_zero.sock"; ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); UnixIpcBridge bridge(sock); bridge.send(0); spy.wait(1000); consumer.stop(); ASSERT_EQ(spy.count(), 1); EXPECT_EQ(spy.at(0).at(0).toInt(), 0); } // Negative { const std::string sock = "/tmp/test_ct_neg.sock"; ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); UnixIpcBridge bridge(sock); bridge.send(-999); spy.wait(1000); consumer.stop(); ASSERT_EQ(spy.count(), 1); EXPECT_EQ(spy.at(0).at(0).toInt(), -999); } } TEST(ConsumerThreadTest, StopsCleanlyWithoutDeadlock) { const std::string sock = "/tmp/test_ct_stop.sock"; ConsumerThread consumer(sock); consumer.start(); // stop() must return without hanging, even with no connections consumer.stop(); } TEST(ConsumerThreadTest, StopsCleanlyWhenNeverStarted) { const std::string sock = "/tmp/test_ct_nostart.sock"; ConsumerThread consumer(sock); // stop() on a consumer that was never started must not crash consumer.stop(); } // --------------------------------------------------------------------------- // Requirement 2: Consumer receiving corrupted data (non-numeric) // --------------------------------------------------------------------------- /// Helper: raw-connect to a UNIX socket and send arbitrary bytes. static void send_raw_bytes(const std::string& path, const void* data, size_t len) { int fd = socket(AF_UNIX, SOCK_STREAM, 0); ASSERT_GE(fd, 0); struct sockaddr_un addr = {}; addr.sun_family = AF_UNIX; std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1); ASSERT_EQ( connect(fd, reinterpret_cast(&addr), sizeof(addr)), 0); if (len > 0) { ::send(fd, data, len, 0); } close(fd); } TEST(ConsumerThreadTest, DropsCorruptedShortMessage) { const std::string sock = "/tmp/test_ct_corrupt_short.sock"; ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); // Send only 2 bytes instead of sizeof(int)==4 — corrupted / partial message uint16_t garbage = 0xBEEF; send_raw_bytes(sock, &garbage, sizeof(garbage)); // Give the consumer time to process (or not) spy.wait(500); consumer.stop(); // No signal should have been emitted EXPECT_EQ(spy.count(), 0); } TEST(ConsumerThreadTest, DropsEmptyConnection) { const std::string sock = "/tmp/test_ct_corrupt_empty.sock"; ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); // Connect and immediately close — zero bytes sent send_raw_bytes(sock, nullptr, 0); spy.wait(500); consumer.stop(); EXPECT_EQ(spy.count(), 0); } TEST(ConsumerThreadTest, SurvivesCorruptedThenReceivesValid) { const std::string sock = "/tmp/test_ct_corrupt_then_valid.sock"; ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); // First: send corrupted (1 byte) uint8_t one_byte = 0xFF; send_raw_bytes(sock, &one_byte, sizeof(one_byte)); std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Then: send a valid int via the normal bridge UnixIpcBridge bridge(sock); bridge.send(777); // Wait for the valid signal for (int attempt = 0; spy.count() < 1 && attempt < 20; ++attempt) { spy.wait(100); } consumer.stop(); // The corrupted message must have been dropped, valid one received ASSERT_EQ(spy.count(), 1); EXPECT_EQ(spy.at(0).at(0).toInt(), 777); }