/* SPDX-License-Identifier: MPL-2.0 */

#include "precompiled.hpp"
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "likely.hpp"

// Layout of _pipes as maintained by the functions in this file:
//
//   [0, _matching)          pipes selected for the current send
//   [_matching, _active)    active pipes that are not currently matching
//   [_active, _eligible)    eligible pipes that are not yet active
//   [_eligible, size ())    remaining (passive) pipes
//
// Pipes change state by being swapped across these boundaries and the
// corresponding counter being adjusted.

// Creates an empty distributor: no pipes and no multipart message in flight.
zmq::dist_t::dist_t () :
    _matching (0),
    _active (0),
    _eligible (0),
    _more (false)
{
}

// All pipes must have been removed before the distributor is destroyed.
zmq::dist_t::~dist_t ()
{
    zmq_assert (_pipes.empty ());
}

// Adds a new pipe to the distributor.
void zmq::dist_t::attach (pipe_t *pipe_)
{
    // If we are in the middle of sending a message, we'll add new pipe
    // into the list of eligible pipes. Otherwise we add it to the list
    // of active pipes.
    if (_more) {
        _pipes.push_back (pipe_);
        _pipes.swap (_eligible, _pipes.size () - 1);
        _eligible++;
    } else {
        _pipes.push_back (pipe_);
        _pipes.swap (_active, _pipes.size () - 1);
        _active++;
        _eligible++;
    }
}

// Returns true if the slot the pipe claims to occupy actually holds it.
bool zmq::dist_t::has_pipe (pipe_t *pipe_)
{
    std::size_t claimed_index = _pipes.index (pipe_);

    // If pipe claims to be outside the available index space it can't be
    // in the distributor.
    if (claimed_index >= _pipes.size ()) {
        return false;
    }

    return _pipes[claimed_index] == pipe_;
}

// Marks an eligible pipe as matching by moving it into the matching range.
void zmq::dist_t::match (pipe_t *pipe_)
{
    // If pipe is already matching do nothing.
    if (_pipes.index (pipe_) < _matching)
        return;

    // If the pipe isn't eligible, ignore it.
    if (_pipes.index (pipe_) >= _eligible)
        return;

    // Mark the pipe as matching.
    _pipes.swap (_pipes.index (pipe_), _matching);
    _matching++;
}

// Inverts the matching set within the eligible pipes.
void zmq::dist_t::reverse_match ()
{
    const pipes_t::size_type prev_matching = _matching;

    // Reset matching to 0
    unmatch ();

    // Mark all matching pipes as not matching and vice-versa.
    // To do this, push all pipes that are eligible but not
    // matched - i.e. between "matching" and "eligible" -
    // to the beginning of the queue.
    for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
        _pipes.swap (i, _matching++);
    }
}

// Clears the matching set; no pipe is moved.
void zmq::dist_t::unmatch ()
{
    _matching = 0;
}

// Removes a pipe from the distributor.
void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
{
    // Remove the pipe from the list; adjust number of matching, active and/or
    // eligible pipes accordingly.
    // Each step moves the pipe to the last slot of the range it is in and
    // shrinks that range, so the pipe falls through to the next range.
    if (_pipes.index (pipe_) < _matching) {
        _pipes.swap (_pipes.index (pipe_), _matching - 1);
        _matching--;
    }
    if (_pipes.index (pipe_) < _active) {
        _pipes.swap (_pipes.index (pipe_), _active - 1);
        _active--;
    }
    if (_pipes.index (pipe_) < _eligible) {
        _pipes.swap (_pipes.index (pipe_), _eligible - 1);
        _eligible--;
    }

    _pipes.erase (pipe_);
}

// Moves a pipe from the passive range into the eligible range and, when no
// multipart message is in flight, on into the active range.
void zmq::dist_t::activated (pipe_t *pipe_)
{
    // Move the pipe from passive to eligible state.
    if (_eligible < _pipes.size ()) {
        _pipes.swap (_pipes.index (pipe_), _eligible);
        _eligible++;
    }

    // If there's no message being sent at the moment, move it to
    // the active state.
    if (!_more && _active < _pipes.size ()) {
        _pipes.swap (_eligible - 1, _active);
        _active++;
    }
}

// Selects every active pipe as matching and sends the message to them.
// Returns the result of send_to_matching ().
int zmq::dist_t::send_to_all (msg_t *msg_)
{
    _matching = _active;
    return send_to_matching (msg_);
}

// Sends the message to all matching pipes and records whether more parts of
// a multipart message are still to come. Always returns 0.
int zmq::dist_t::send_to_matching (msg_t *msg_)
{
    // Is this end of a multipart message?
    // Read the flag before distributing: distribute () re-initialises msg_.
    const bool msg_more = (msg_->flags () & msg_t::more) != 0;

    // Push the message to matching pipes.
    distribute (msg_);

    // If multipart message is fully sent, activate all the eligible pipes.
    if (!msg_more)
        _active = _eligible;

    _more = msg_more;

    return 0;
}

// Writes the message to every matching pipe and leaves msg_ re-initialised.
void zmq::dist_t::distribute (msg_t *msg_)
{
    // If there are no matching pipes available, simply drop the message.
    if (_matching == 0) {
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
        return;
    }

    // This branch writes the message to each pipe without touching
    // reference counts.
    if (msg_->is_vsm ()) {
        for (pipes_t::size_type i = 0; i < _matching;) {
            if (!write (_pipes[i], msg_)) {
                // Use same index again because entry will have been removed.
            } else {
                ++i;
            }
        }
        int rc = msg_->init ();
        errno_assert (rc == 0);
        return;
    }

    // Add matching-1 references to the message. We already hold one reference,
    // that's why -1.
    msg_->add_refs (static_cast<int> (_matching) - 1);

    // Push copy of the message to each matching pipe.
    int failed = 0;
    for (pipes_t::size_type i = 0; i < _matching;) {
        if (!write (_pipes[i], msg_)) {
            ++failed;
            // Use same index again because entry will have been removed.
        } else {
            ++i;
        }
    }
    // Give back the references reserved for pipes that rejected the message.
    if (unlikely (failed))
        msg_->rm_refs (failed);

    // Detach the original message from the data buffer. Note that we don't
    // close the message. That's because we've already used all the references.
    const int rc = msg_->init ();
    errno_assert (rc == 0);
}

// Always reports that output is possible.
bool zmq::dist_t::has_out ()
{
    return true;
}

// Writes the message to a single pipe. If the pipe rejects it, the pipe is
// moved out of the matching, active and eligible ranges and false is
// returned. Otherwise the pipe is flushed when the message is not flagged
// as having more parts, and true is returned.
bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
{
    if (!pipe_->write (msg_)) {
        _pipes.swap (_pipes.index (pipe_), _matching - 1);
        _matching--;
        _pipes.swap (_pipes.index (pipe_), _active - 1);
        _active--;
        _pipes.swap (_active, _eligible - 1);
        _eligible--;
        return false;
    }
    if (!(msg_->flags () & msg_t::more))
        pipe_->flush ();
    return true;
}

// Returns true only if check_hwm () succeeds on every matching pipe.
bool zmq::dist_t::check_hwm ()
{
    for (pipes_t::size_type i = 0; i < _matching; ++i)
        if (!_pipes[i]->check_hwm ())
            return false;

    return true;
}