22 #ifndef __M2ETIS_PUBSUB_FILTER_GENERALBOOLEANEXPRESSIONSFILTER_H__
23 #define __M2ETIS_PUBSUB_FILTER_GENERALBOOLEANEXPRESSIONSFILTER_H__
37 #include "boost/tuple/tuple.hpp"
58 template<
typename EventType,
typename NetworkType>
75 addSubscription(self_, filter);
78 auto filters = std::set<boost::shared_ptr<FilterExp<EventType>>>();
80 for (
auto subscription : subscriptions_) {
81 filters.insert(boost::get<1>(subscription.second));
85 fInfo->dynamic_filter_ = filters;
86 fInfo->isUnsubscribe_ =
false;
91 for (
auto predicateID_subscription_pair : predicate_subscription_association_table_) {
92 predicate_identifier_factory_.
freeID(predicateID_subscription_pair.first);
95 for (
auto subscriptionID_subscription_pair : subscription_subscriber_association_table) {
96 subscription_identifier_factory_.
freeID(subscriptionID_subscription_pair.first);
99 has_new_subscription_ =
true;
101 subscription_subscriptionID_association_table.clear();
102 subscription_subscriber_association_table.clear();
103 subscriptions_.clear();
104 predicate_subscription_association_table_.clear();
105 predicate_indexes_.clear();
108 fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>();
112 bool hasMoreSubscribers;
113 bool hasMoreSubscriptions;
115 removeSubscription(self_, filter, &hasMoreSubscribers, &hasMoreSubscriptions);
117 if (!hasMoreSubscriptions) {
120 fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>();
125 if (hasMoreSubscribers) {
130 fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>({filter});
131 fInfo->isUnsubscribe_ =
true;
143 bool hasMoreSubscribers;
144 bool hasMoreSubscriptions;
146 if (fInfo->isUnsubscribe_) {
148 removeSubscription(sender, *(fInfo->dynamic_filter_.begin()), &hasMoreSubscribers, &hasMoreSubscriptions);
153 for (
auto filter : fInfo->dynamic_filter_) {
154 addSubscription(sender, filter);
159 bool is_filter_found =
false;
160 auto erasable_subscriptions = std::vector<boost::shared_ptr<FilterExp<EventType>>>();
162 for (
auto filter_filterID_pair : subscription_subscriptionID_association_table) {
163 for (
auto filter_ptr : fInfo->dynamic_filter_) {
164 if (*(filter_filterID_pair.first) == *filter_ptr) {
165 is_filter_found =
true;
170 if (!is_filter_found) {
171 erasable_subscriptions.push_back(filter_filterID_pair.first);
174 is_filter_found =
false;
177 for (
auto erasable_subscription : erasable_subscriptions) {
178 removeSubscription(sender, erasable_subscription, &hasMoreSubscribers, &hasMoreSubscriptions);
185 bool hasMoreSubscribers;
186 bool hasMoreSubscriptions;
190 for (
auto filter_filterID_pair : subscription_subscriptionID_association_table) {
191 removeSubscription(sender, (filter_filterID_pair.first), &hasMoreSubscribers, &hasMoreSubscriptions);
197 bool hasMoreSubscribers;
198 bool hasMoreSubscriptions;
200 for (
auto filter_filterID_pair : subscription_subscriptionID_association_table) {
201 removeSubscription(sender, (filter_filterID_pair.first), &hasMoreSubscribers, &hasMoreSubscriptions);
211 static std::set<typename NetworkType::Key> matching_subscribers;
213 if (!event || current_event_ != event || has_new_subscription_) {
214 matching_subscribers.clear();
215 has_new_subscription_ =
false;
216 current_event_ = event;
218 for (
auto predicate : fulfilled_predicate_vector_) {
222 for (
auto predicate_index : predicate_indexes_) {
223 predicate_index->determineMatchingPredicates(*event, fulfilled_predicate_vector_);
227 std::map<SubscriptionIdentifierFactory::SubscriptionID, int> hit_vector;
230 for (
auto predicate : fulfilled_predicate_vector_) {
232 auto hit_subscriptions = predicate_subscription_association_table_[current_predicate_id];
233 for (
auto hit_subscription : hit_subscriptions) {
235 auto previous_hit_number = 0;
236 auto iter_hit_vector = hit_vector.find(hit_subscription);
237 if (iter_hit_vector != hit_vector.end()) {
238 previous_hit_number = hit_vector.at(hit_subscription);
240 hit_vector[hit_subscription] = previous_hit_number + 1;
241 M2ETIS_LOG_INFO(
"GeneralBooleanExpression",
"belonging to predicate id " << current_predicate_id <<
"new hit number " << previous_hit_number + 1);
244 ++current_predicate_id;
250 for (
auto & subscription : subscriptions_) {
251 auto iter_hit_vector = hit_vector.find(subscription.first);
252 if (iter_hit_vector != hit_vector.end()) {
254 if (boost::get<0>(subscription.second) <= hit_vector.at(subscription.first)) {
256 (boost::get<1>(subscription.second))->Accept(dynamic_filter_visitor);
259 auto subscribers_to_subscriptionID = subscription_subscriber_association_table.find(subscription.first)->second;
260 matching_subscribers.insert(subscribers_to_subscriptionID.begin(), subscribers_to_subscriptionID.end());
267 return (matching_subscribers.find(to) == matching_subscribers.end() ?
false :
true);
275 virtual void setSelf(
const typename NetworkType::Key &
self)
override {
282 void addSubscription(
const typename NetworkType::Key & subscriber_key, boost::shared_ptr<
FilterExp<EventType>> filter) {
284 for (
auto filter_filterID_pair : subscription_subscriptionID_association_table) {
285 if (*(filter_filterID_pair.first) == *filter) {
286 subscription_subscriber_association_table[filter_filterID_pair.second].insert(subscriber_key);
291 has_new_subscription_ =
true;
294 subscription_subscriber_association_table[subscription_id] = std::set<typename NetworkType::Key>({subscriber_key});
296 GeneralBooleanExpressionsPreProcessVisitor<EventType, NetworkType> preprocess_visitor(&predicate_indexes_, &predicate_subscription_association_table_, subscription_id, &predicate_identifier_factory_);
298 filter->Accept(preprocess_visitor);
301 if (!predicate_subscription_association_table_.empty()) {
302 auto maxPredicateID = (--predicate_subscription_association_table_.end())->first;
303 fulfilled_predicate_vector_ = std::vector<bool>(maxPredicateID + 1,
false);
305 fulfilled_predicate_vector_ = std::vector<bool>();
312 boost::tuple<int, boost::shared_ptr<FilterExp<EventType>>> subscription_data = {0, filter};
315 GetMinPredicatesVisitor<EventType> get_min_predicates_visitor;
317 filter->Accept(get_min_predicates_visitor);
319 boost::get<0>(subscription_data) = get_min_predicates_visitor.get_result();
321 subscriptions_[subscription_id] = subscription_data;
323 subscription_subscriptionID_association_table.push_back(std::make_pair(filter, subscription_id));
330 void removeSubscription(
const typename NetworkType::Key & subscriber_key, boost::shared_ptr<FilterExp<EventType>> filter,
bool * hasMoreSubscribers,
bool * hasMoreSubscriptions) {
331 *hasMoreSubscriptions =
true;
332 *hasMoreSubscribers =
true;
336 auto erasable_subscription_iter = subscription_subscriptionID_association_table.end();
338 for (
auto filter_id_pair_iter = subscription_subscriptionID_association_table.begin(); filter_id_pair_iter != subscription_subscriptionID_association_table.end(); ++filter_id_pair_iter) {
339 if (*(filter_id_pair_iter->first) == *filter) {
340 subscriptionID = filter_id_pair_iter->second;
341 erasable_subscription_iter = filter_id_pair_iter;
345 if (erasable_subscription_iter == subscription_subscriptionID_association_table.end()) {
347 *hasMoreSubscribers =
true;
350 checkForSubscriptions(subscriber_key, hasMoreSubscriptions);
355 subscription_subscriber_association_table[subscriptionID].erase(subscriber_key);
357 if (subscription_subscriber_association_table[subscriptionID].
size() == 0) {
359 *hasMoreSubscribers =
false;
360 subscription_subscriber_association_table.erase(subscriptionID);
362 subscriptions_.erase(subscriptionID);
363 subscription_subscriptionID_association_table.erase(erasable_subscription_iter);
367 std::set<PredicateIdentifierFactory::PredicateID> erasable_predicates;
369 for (
auto & predicateID_subscriptionSet_pair : predicate_subscription_association_table_) {
370 auto number_of_subscriptions_to_predicate = predicateID_subscriptionSet_pair.second.count(subscriptionID);
372 if (number_of_subscriptions_to_predicate == 0) {
377 if (number_of_subscriptions_to_predicate == 1) {
384 erasable_predicates.insert(predicateID_subscriptionSet_pair.first);
386 for (
auto predicate_index : predicate_indexes_) {
387 predicate_index->removePredicate(predicateID_subscriptionSet_pair.first);
391 predicateID_subscriptionSet_pair.second.erase(predicateID_subscriptionSet_pair.second.find(subscriptionID));
394 for (
auto erasable_predicate : erasable_predicates) {
395 predicate_subscription_association_table_.erase(erasable_predicate);
398 subscription_identifier_factory_.
freeID(subscriptionID);
403 checkForSubscriptions(subscriber_key, hasMoreSubscriptions);
406 void checkForSubscriptions(
const typename NetworkType::Key & subscriber_key,
bool * hasMoreSubscriptions) {
408 *hasMoreSubscriptions =
false;
410 for (
auto subscription_subscriber_pair : subscription_subscriber_association_table) {
411 if (subscription_subscriber_pair.second.count(subscriber_key) > 0) {
412 *hasMoreSubscriptions =
true;
419 typename NetworkType::Key self_;
430 std::vector<std::shared_ptr<PredicateIndex<EventType>>> predicate_indexes_;
433 std::vector<bool> fulfilled_predicate_vector_;
438 std::map<PredicateIdentifierFactory::PredicateID, std::multiset<SubscriptionIdentifierFactory::SubscriptionID>> predicate_subscription_association_table_;
440 SubscriptionIdentifierFactory subscription_identifier_factory_;
441 PredicateIdentifierFactory predicate_identifier_factory_;
447 std::map<SubscriptionIdentifierFactory::SubscriptionID, boost::tuple<int, boost::shared_ptr<FilterExp<EventType>>>> subscriptions_;
449 std::map<SubscriptionIdentifierFactory::SubscriptionID, std::set<typename NetworkType::Key>> subscription_subscriber_association_table;
452 bool has_new_subscription_;
453 boost::shared_ptr<EventType> current_event_;
virtual ~GeneralBooleanExpressionsFilter()
virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key &sender) override
removes all filters of the sender with the key given intended for routing strategies to signal purged...
void freeID(SubscriptionID id)
FilterUnsubscribeInformation
virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override
removes all filters
std::vector< bool >::size_type PredicateID
boost::shared_ptr< EventType > PayloadPtr
void freeID(PredicateID id)
boost::shared_ptr< FilterInfo > Ptr
#define M2ETIS_LOG_INFO(module, message)
virtual bool match(const typename NetworkType::Key &to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter< EventType, NetworkType >::PayloadPtr event) override
virtual BaseFilter< EventType, NetworkType >::FilterUnsubscribeInformation getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo, boost::shared_ptr< FilterExp< EventType >> filter) override
virtual void getSubscribePayload(boost::shared_ptr< FilterExp< EventType >> filter, bool, typename message::FilterInfo::Ptr filterInfo) override
boost::shared_ptr< GeneralBooleanExpressionsFilterInfo< EventType > > Ptr
virtual std::string processSubscribePayload(const typename NetworkType::Key &sender, typename message::FilterInfo::Ptr filterInfo) override
processes the set of received filters from sender
#define M2ETIS_LOG_ERROR(module, message)
SubscriptionID createSubscriptionIdentifier()
virtual std::string getPublishPayload(const typename BaseFilter< EventType, NetworkType >::PayloadPtr message_text) const override
message::GeneralBooleanExpressionsFilterInfo< EventType > FilterInfoType
virtual void setSelf(const typename NetworkType::Key &self) override
virtual void processUnsubscribePayload(const typename NetworkType::Key &sender, typename message::FilterInfo::Ptr filterInfo) override
removes all filters of the sender with the key given
virtual bool match(typename BaseFilter< EventType, NetworkType >::PayloadPtr event) override
GeneralBooleanExpressionsFilter()