// test_race_conditions.cxx // SPDX-License-Identifier: GPL-3.0-or-later // Author: Unai Blazquez #include #include #include #include #include #include #include #include #include #include #include "Consumer.hpp" #include "Producer.hpp" #include "UnixIpcBridge.hpp" static int argc_ = 0; static QCoreApplication app_(argc_, nullptr); TEST(RaceConditionTest, RepeatedStartStopWhileProducerSends) { const std::string sock = "/tmp/test_race.sock"; constexpr int kCycles = 20; // Watchdog: if the test takes longer than 15s, declare deadlock. std::atomic test_done{false}; std::thread watchdog([&test_done]() { for (int i = 0; i < 150 && !test_done.load(); ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } if (!test_done.load()) { std::cerr << "DEADLOCK DETECTED: RepeatedStartStopWhileProducerSends timed out" << std::endl; std::abort(); } }); std::atomic producer_running{true}; std::thread producer([&]() { while (producer_running.load()) { try { UnixIpcBridge bridge(sock); bridge.send(42); } catch (const std::runtime_error&) { // Expected: consumer socket not ready or just torn down. } std::this_thread::sleep_for(std::chrono::milliseconds(5)); } }); for (int i = 0; i < kCycles; ++i) { ConsumerThread consumer(sock); consumer.start(); // Let it run briefly so the producer can connect during some cycles. std::this_thread::sleep_for(std::chrono::milliseconds(10 + (i % 5) * 5)); // stop() must return without deadlock every single time. consumer.stop(); } producer_running.store(false); producer.join(); test_done.store(true); watchdog.join(); // If we reach here, no deadlock across kCycles start/stop cycles. SUCCEED(); } TEST(RaceConditionTest, ProducerSurvivesConsumerCrash) { const std::string sock = "/tmp/test_crash.sock"; const std::string sysfs = "./fake_sysfs_race"; // Prepare sysfs file so the producer is in Enabled state. { std::ofstream(sysfs) << "1\n"; } // Track what the producer sends. std::vector sent_values; std::mutex sent_mutex; std::vector logs; std::mutex log_mutex; auto make_safe_send = [&](const std::string& path) { return [&, path](int value) { try { UnixIpcBridge bridge(path); bridge.send(value); std::lock_guard lk(sent_mutex); sent_values.push_back(value); } catch (const std::runtime_error&) { // Consumer is down — expected during the "crash" window. } }; }; Producer producer( sysfs, make_safe_send(sock), []() { return 123; }, [&](const std::string& msg) { std::lock_guard lk(log_mutex); logs.push_back(msg); }, [](std::chrono::milliseconds) { // Use a short sleep so the test runs fast. std::this_thread::sleep_for(std::chrono::milliseconds(20)); }); // Phase 1: start consumer, start producer, let a few values flow. { ConsumerThread consumer(sock); QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); consumer.start(); producer.start(); // Wait for at least 2 values to arrive. for (int attempt = 0; spy.count() < 2 && attempt < 50; ++attempt) { spy.wait(100); } ASSERT_GE(spy.count(), 2) << "Phase 1: producer should have delivered values"; // "Crash" the consumer: stop + destroy. consumer.stop(); } // Phase 2: producer is still running with no consumer (sends will fail). std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Phase 3: bring up a fresh consumer. Producer should resume delivering. { ConsumerThread consumer2(sock); QSignalSpy spy2(&consumer2, &ConsumerThread::valueReceived); consumer2.start(); for (int attempt = 0; spy2.count() < 2 && attempt < 50; ++attempt) { spy2.wait(100); } consumer2.stop(); ASSERT_GE(spy2.count(), 2) << "Phase 3: producer must deliver to a new consumer after crash"; // Values received by the second consumer should all be 123. for (int i = 0; i < spy2.count(); ++i) { EXPECT_EQ(spy2.at(i).at(0).toInt(), 123); } } producer.stop(); // Producer logged throughout all three phases. { std::lock_guard lk(log_mutex); EXPECT_GE(logs.size(), 3u) << "Producer should have kept logging"; } }