diff --git a/FWCore/SharedMemory/interface/ControllerChannel.h b/FWCore/SharedMemory/interface/ControllerChannel.h index 54b1ed80c925f..0cbf49e1d4f54 100644 --- a/FWCore/SharedMemory/interface/ControllerChannel.h +++ b/FWCore/SharedMemory/interface/ControllerChannel.h @@ -189,13 +189,43 @@ namespace edm::shared_memory { int id_; unsigned int maxWaitInSeconds_; std::string smName_; + struct SMORemover { + //handle removing the shared memory object from the system even + // if an exception happens during construction + SMORemover(const std::string& iName) : m_name(iName) { + //remove an object which was left from a previous failed job + boost::interprocess::shared_memory_object::remove(m_name.c_str()); + } + ~SMORemover() { boost::interprocess::shared_memory_object::remove(m_name.c_str()); }; + //ControllerChannel passes in smName_ so it owns the string + std::string const& m_name; + } smRemover_; boost::interprocess::managed_shared_memory managed_sm_; BufferInfo* toWorkerBufferInfo_; BufferInfo* fromWorkerBufferInfo_; + struct MutexRemover { + MutexRemover(std::string iName) : m_name(std::move(iName)) { + boost::interprocess::named_mutex::remove(m_name.c_str()); + } + ~MutexRemover() { boost::interprocess::named_mutex::remove(m_name.c_str()); }; + std::string const m_name; + }; + MutexRemover mutexRemover_; boost::interprocess::named_mutex mutex_; + + struct ConditionRemover { + ConditionRemover(std::string iName) : m_name(std::move(iName)) { + boost::interprocess::named_condition::remove(m_name.c_str()); + } + ~ConditionRemover() { boost::interprocess::named_condition::remove(m_name.c_str()); }; + std::string const m_name; + }; + + ConditionRemover cndFromMainRemover_; boost::interprocess::named_condition cndFromMain_; + ConditionRemover cndToMainRemover_; boost::interprocess::named_condition cndToMain_; edm::Transition* transitionType_; diff --git a/FWCore/SharedMemory/interface/WriteBuffer.h b/FWCore/SharedMemory/interface/WriteBuffer.h index 1080d678df262..f4ba2995e23ff 100644 --- a/FWCore/SharedMemory/interface/WriteBuffer.h +++ b/FWCore/SharedMemory/interface/WriteBuffer.h @@ -69,7 +69,20 @@ namespace edm::shared_memory { char* buffer_; BufferInfo* bufferInfo_; std::array bufferNames_; - std::unique_ptr sm_; + struct SMOwner { + SMOwner() = default; + SMOwner(std::string const& iName, std::size_t iLength); + ~SMOwner(); + SMOwner& operator=(SMOwner&&) = default; + boost::interprocess::managed_shared_memory* operator->() { return sm_.get(); } + boost::interprocess::managed_shared_memory* get() { return sm_.get(); } + operator bool() const { return bool(sm_); } + void reset(); + + private: + std::string name_; + std::unique_ptr sm_; + } sm_; }; } // namespace edm::shared_memory diff --git a/FWCore/SharedMemory/src/ControllerChannel.cc b/FWCore/SharedMemory/src/ControllerChannel.cc index cfcfce46527f8..97af82efeecaa 100644 --- a/FWCore/SharedMemory/src/ControllerChannel.cc +++ b/FWCore/SharedMemory/src/ControllerChannel.cc @@ -35,24 +35,25 @@ ControllerChannel::ControllerChannel(std::string const& iName, int id, unsigned : id_{id}, maxWaitInSeconds_{iMaxWaitInSeconds}, smName_{uniqueName(iName)}, - managed_sm_{open_or_create, smName_.c_str(), 1024}, + smRemover_{smName_}, + managed_sm_{create_only, smName_.c_str(), 1024}, toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)}, fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)}, - mutex_{open_or_create, uniqueName(channel_names::kMutex).c_str()}, - cndFromMain_{open_or_create, uniqueName(channel_names::kConditionFromMain).c_str()}, - cndToMain_{open_or_create, uniqueName(channel_names::kConditionToMain).c_str()} { - managed_sm_.destroy(channel_names::kStop); + mutexRemover_{uniqueName(channel_names::kMutex)}, + mutex_{create_only, uniqueName(channel_names::kMutex).c_str()}, + cndFromMainRemover_{uniqueName(channel_names::kConditionFromMain)}, + cndFromMain_{create_only, uniqueName(channel_names::kConditionFromMain).c_str()}, + cndToMainRemover_{uniqueName(channel_names::kConditionToMain)}, + cndToMain_{create_only, uniqueName(channel_names::kConditionToMain).c_str()} { stop_ = managed_sm_.construct(channel_names::kStop)(false); assert(stop_); keepEvent_ = managed_sm_.construct(channel_names::kKeepEvent)(true); assert(keepEvent_); - managed_sm_.destroy(channel_names::kTransitionType); transitionType_ = managed_sm_.construct(channel_names::kTransitionType)(edm::Transition::NumberOfTransitions); assert(transitionType_); - managed_sm_.destroy(channel_names::kTransitionID); transitionID_ = managed_sm_.construct(channel_names::kTransitionID)(0); assert(transitionID_); } @@ -64,10 +65,6 @@ ControllerChannel::~ControllerChannel() { managed_sm_.destroy(channel_names::kTransitionID); managed_sm_.destroy(channel_names::kToWorkerBufferInfo); managed_sm_.destroy(channel_names::kFromWorkerBufferInfo); - - named_mutex::remove(uniqueName(channel_names::kMutex).c_str()); - named_condition::remove(uniqueName(channel_names::kConditionFromMain).c_str()); - named_condition::remove(uniqueName(channel_names::kConditionToMain).c_str()); } // diff --git a/FWCore/SharedMemory/src/WorkerChannel.cc b/FWCore/SharedMemory/src/WorkerChannel.cc index 2659646b88e90..ad5488e363320 100644 --- a/FWCore/SharedMemory/src/WorkerChannel.cc +++ b/FWCore/SharedMemory/src/WorkerChannel.cc @@ -39,14 +39,14 @@ namespace { WorkerChannel::WorkerChannel(std::string const& iName, const std::string& iUniqueID) : managed_shm_{open_only, iName.c_str()}, - mutex_{open_or_create, unique_name(channel_names::kMutex, iUniqueID).c_str()}, - cndFromController_{open_or_create, unique_name(channel_names::kConditionFromMain, iUniqueID).c_str()}, + mutex_{open_only, unique_name(channel_names::kMutex, iUniqueID).c_str()}, + cndFromController_{open_only, unique_name(channel_names::kConditionFromMain, iUniqueID).c_str()}, stop_{managed_shm_.find(channel_names::kStop).first}, transitionType_{managed_shm_.find(channel_names::kTransitionType).first}, transitionID_{managed_shm_.find(channel_names::kTransitionID).first}, toWorkerBufferInfo_{managed_shm_.find(channel_names::kToWorkerBufferInfo).first}, fromWorkerBufferInfo_{managed_shm_.find(channel_names::kFromWorkerBufferInfo).first}, - cndToController_{open_or_create, unique_name(channel_names::kConditionToMain, iUniqueID).c_str()}, + cndToController_{open_only, unique_name(channel_names::kConditionToMain, iUniqueID).c_str()}, keepEvent_{managed_shm_.find(channel_names::kKeepEvent).first}, lock_{mutex_} { assert(stop_); diff --git a/FWCore/SharedMemory/src/WriteBuffer.cc b/FWCore/SharedMemory/src/WriteBuffer.cc index 7687cae9eceb9..79809a677255a 100644 --- a/FWCore/SharedMemory/src/WriteBuffer.cc +++ b/FWCore/SharedMemory/src/WriteBuffer.cc @@ -27,12 +27,25 @@ using namespace edm::shared_memory; // // constructors and destructor // +WriteBuffer::SMOwner::SMOwner(std::string const& iName, std::size_t iLength) : name_(iName) { + //do a remove first just in case a previous job had a failure and left a same named + // memory object + boost::interprocess::shared_memory_object::remove(iName.c_str()); + sm_ = std::make_unique( + boost::interprocess::create_only, iName.c_str(), iLength); +} + +WriteBuffer::SMOwner::~SMOwner() { + if (sm_) { + boost::interprocess::shared_memory_object::remove(name_.c_str()); + } +} + +void WriteBuffer::SMOwner::reset() { *this = SMOwner(); } WriteBuffer::~WriteBuffer() { if (sm_) { sm_->destroy(buffer_names::kBuffer); - sm_.reset(); - boost::interprocess::shared_memory_object::remove(bufferNames_[bufferInfo_->index_].c_str()); } } // @@ -51,9 +64,8 @@ void WriteBuffer::growBuffer(std::size_t iLength) { << "in growBuffer while destroying the shared memory object the following exception was caught\n" << iExcept.what(); } - sm_.reset(); try { - boost::interprocess::shared_memory_object::remove(bufferNames_[bufferInfo_->index_].c_str()); + sm_.reset(); } catch (boost::interprocess::interprocess_exception const& iExcept) { throw cms::Exception("SharedMemory") << "in growBuffer while removing the shared memory object named '" << bufferNames_[bufferInfo_->index_] @@ -62,8 +74,7 @@ void WriteBuffer::growBuffer(std::size_t iLength) { } } try { - sm_ = std::make_unique( - boost::interprocess::open_or_create, bufferNames_[newBuffer].c_str(), iLength + 1024); + sm_ = SMOwner(bufferNames_[newBuffer], iLength + 1024); } catch (boost::interprocess::interprocess_exception const& iExcept) { throw cms::Exception("SharedMemory") << "in growBuffer while creating the shared memory object '" << bufferNames_[newBuffer] << "' of length " << iLength + 1024