// test_race_conditions.cxx // SPDX-License-Identifier: GPL-3.0-or-later // Author: Unai Blazquez #include #include #include #include #include #include #include #include #include #include #include #include #include #include "Consumer.hpp" #include "Producer.hpp" #include "UnixIpcBridge.hpp" static int argc_ = 0; static QCoreApplication app_(argc_, nullptr); TEST(RaceConditionTest, RepeatedStartStopWhileProducerSends) { const std::string sock = "/tmp/test_race.sock"; constexpr int kCycles = 20; // Watchdog: if the test takes longer than 15s, declare deadlock. std::atomic test_done{false}; std::thread watchdog([&test_done]() { for (int i = 0; i < 150 && !test_done.load(); ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } if (!test_done.load()) { std::cerr << "DEADLOCK DETECTED: RepeatedStartStopWhileProducerSends timed out" << std::endl; std::abort(); } }); std::atomic producer_running{true}; std::thread producer([&]() { while (producer_running.load()) { try { UnixIpcBridge bridge(sock); bridge.send(42); } catch (const std::runtime_error&) { // Expected: consumer socket not ready or just torn down. } std::this_thread::sleep_for(std::chrono::milliseconds(5)); } }); for (int i = 0; i < kCycles; ++i) { ConsumerThread consumer(sock); consumer.start(); // Let it run briefly so the producer can connect during some cycles. std::this_thread::sleep_for(std::chrono::milliseconds(10 + (i % 5) * 5)); // stop() must return without deadlock every single time. consumer.stop(); } producer_running.store(false); producer.join(); test_done.store(true); watchdog.join(); // If we reach here, no deadlock across kCycles start/stop cycles. SUCCEED(); } TEST(RaceConditionTest, ProducerSurvivesConsumerCrash) { const std::string sock = "/tmp/test_crash.sock"; const std::string sysfs = "./fake_sysfs_race"; // Prepare sysfs file so the producer is in Enabled state. { std::ofstream(sysfs) << "1\n"; } // Track what the producer sends. std::vector sent_values; std::mutex sent_mutex; std::vector logs; std::mutex log_mutex; auto make_safe_send = [&](const std::string& path) { return [&, path](int value) { try { UnixIpcBridge bridge(path); bridge.send(value); std::lock_guard lk(sent_mutex); sent_values.push_back(value); } catch (const std::runtime_error&) { // Consumer is down — expected during the "crash" window. } }; }; Producer producer( sysfs, make_safe_send(sock), []() { return 123; }, [&](const std::string& msg) { std::lock_guard lk(log_mutex); logs.push_back(msg); }, [](std::chrono::milliseconds) { // Use a short sleep so the test runs fast. std::this_thread::sleep_for(std::chrono::milliseconds(20)); }); // Phase 1: start consumer, start producer, let a few values flow. { ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); producer.start(); // Wait for at least 2 values to arrive. for (int attempt = 0; spy.count() < 2 && attempt < 50; ++attempt) { spy.wait(100); } ASSERT_GE(spy.count(), 2) << "Phase 1: producer should have delivered values"; // Simulate a hard crash: force-close the consumer's server fd from // outside its thread, causing accept() to fail with EBADF. This is // what happens when the kernel reclaims fds on SIGKILL / abort(). // // We find the server fd by calling getsockname() on open fds and // matching against our socket path. for (int fd = 3; fd < 1024; ++fd) { struct sockaddr_un addr = {}; socklen_t len = sizeof(addr); if (getsockname(fd, reinterpret_cast(&addr), &len) == 0 && addr.sun_family == AF_UNIX && std::string(addr.sun_path) == sock) { ::close(fd); // Yank the fd — consumer thread crashes out of accept() break; } } // Destructor calls stop(), which joins the (now-exited) thread and // cleans up. In a real crash no cleanup runs, but we can't leak // threads in a test process. } // Phase 2: producer is still running with no consumer (sends will fail). std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Phase 3: bring up a fresh consumer. Producer should resume delivering. { ConsumerThread consumer2(sock); QSignalSpy spy2(&consumer2, &ConsumerThread::valueReceived); consumer2.start(); for (int attempt = 0; spy2.count() < 2 && attempt < 50; ++attempt) { spy2.wait(100); } consumer2.stop(); ASSERT_GE(spy2.count(), 2) << "Phase 3: producer must deliver to a new consumer after crash"; // Values received by the second consumer should all be 123. for (int i = 0; i < spy2.count(); ++i) { EXPECT_EQ(spy2.at(i).at(0).toInt(), 123); } } producer.stop(); // Producer logged throughout all three phases. { std::lock_guard lk(log_mutex); EXPECT_GE(logs.size(), 3u) << "Producer should have kept logging"; } }