diff --git a/tests/test_race_conditions.cxx b/tests/test_race_conditions.cxx index b367719..7b06a30 100644 --- a/tests/test_race_conditions.cxx +++ b/tests/test_race_conditions.cxx @@ -5,13 +5,17 @@ #include #include +#include #include #include +#include #include #include #include +#include #include "Consumer.hpp" +#include "Producer.hpp" #include "UnixIpcBridge.hpp" static int argc_ = 0; @@ -39,9 +43,6 @@ TEST(RaceConditionTest, RepeatedStartStopWhileProducerSends) } }); - // Producer thread: keeps trying to send values. connect() failures - // (consumer mid-restart) are expected and silently ignored. - std::atomic producer_running{true}; std::thread producer([&]() { while (producer_running.load()) @@ -59,7 +60,6 @@ TEST(RaceConditionTest, RepeatedStartStopWhileProducerSends) } }); - // Main thread: repeatedly start/stop the consumer. for (int i = 0; i < kCycles; ++i) { ConsumerThread consumer(sock); @@ -81,3 +81,97 @@ TEST(RaceConditionTest, RepeatedStartStopWhileProducerSends) // If we reach here, no deadlock across kCycles start/stop cycles. SUCCEED(); } + +TEST(RaceConditionTest, ProducerSurvivesConsumerCrash) +{ + const std::string sock = "/tmp/test_crash.sock"; + const std::string sysfs = "./fake_sysfs_race"; + + // Prepare sysfs file so the producer is in Enabled state. + { std::ofstream(sysfs) << "1\n"; } + + // Track what the producer sends. + std::vector sent_values; + std::mutex sent_mutex; + std::vector logs; + std::mutex log_mutex; + + auto make_safe_send = [&](const std::string& path) { + return [&, path](int value) { + try + { + UnixIpcBridge bridge(path); + bridge.send(value); + std::lock_guard lk(sent_mutex); + sent_values.push_back(value); + } + catch (const std::runtime_error&) + { + // Consumer is down — expected during the "crash" window. + } + }; + }; + + Producer producer( + sysfs, make_safe_send(sock), []() { return 123; }, + [&](const std::string& msg) { + std::lock_guard lk(log_mutex); + logs.push_back(msg); + }, + [](std::chrono::milliseconds) { + // Use a short sleep so the test runs fast. + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + }); + + // Phase 1: start consumer, start producer, let a few values flow. + { + ConsumerThread consumer(sock); + QSignalSpy spy(&consumer, &ConsumerThread::valueReceived); + consumer.start(); + producer.start(); + + // Wait for at least 2 values to arrive. + for (int attempt = 0; spy.count() < 2 && attempt < 50; ++attempt) + { + spy.wait(100); + } + ASSERT_GE(spy.count(), 2) << "Phase 1: producer should have delivered values"; + + // "Crash" the consumer: stop + destroy. + consumer.stop(); + } + + // Phase 2: producer is still running with no consumer (sends will fail). + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // Phase 3: bring up a fresh consumer. Producer should resume delivering. + { + ConsumerThread consumer2(sock); + QSignalSpy spy2(&consumer2, &ConsumerThread::valueReceived); + consumer2.start(); + + for (int attempt = 0; spy2.count() < 2 && attempt < 50; ++attempt) + { + spy2.wait(100); + } + + consumer2.stop(); + + ASSERT_GE(spy2.count(), 2) + << "Phase 3: producer must deliver to a new consumer after crash"; + + // Values received by the second consumer should all be 123. + for (int i = 0; i < spy2.count(); ++i) + { + EXPECT_EQ(spy2.at(i).at(0).toInt(), 123); + } + } + + producer.stop(); + + // Producer logged throughout all three phases. + { + std::lock_guard lk(log_mutex); + EXPECT_GE(logs.size(), 3u) << "Producer should have kept logging"; + } +}