Support Class Library
A set of tools providing classes and utility
Channel.h
Go to the documentation of this file.
1 #pragma once
2 #include <mutex>
3 #include <queue>
4 #include <deque>
5 #include <condition_variable>
6 #include <chrono>
7 #include <tuple>
8 #include <utility>
9 #include <scl/macros.h>
12 
13 //TODO: Refactor names i.e. sender -> emitter, send -> emit
14 
15 namespace scl{
16  namespace async{
17  template <class, class, template<class> class, class>
18  class Channel;
19 
20  namespace details{
21  template <class>
23 
24  template <class>
26 
31  template <class Chan>
37  using channel_type = Chan;
38 
43  using sender_type = typename channel_type::sender_type;
44 
49  using receiver_type = typename channel_type::receiver_type;
50 
55  using type = std::tuple<sender_type&, receiver_type&>;
56 
63  static type factory(sender_type& sender, receiver_type& receiver){
64  return type{sender, receiver};
65  }
66  };
67 
72  template <class Actor>
78  using actor_type = Actor;
79 
85 
90  using lock_type = typename channel_type::lock_type;
91 
97  template <class L>
98  using guard_type = typename channel_type::template guard_type<L>;
99 
104  using value_type = typename channel_type::value_type;
105 
110  using queue_type = typename channel_type::queue_type;
111 
116 
120  std::unique_lock<lock_type> lock_;
121 
126  explicit channel_actor_traits(actor_type& act) : actor{&act} {
127  this->lock_ = {this->actor->channel->lock, std::defer_lock};
128  }
129 
133  void lock(){
134  this->lock_.lock();
135  }
136 
140  void unlock(){
141  this->lock_.unlock();
142  }
143 
147  void notify(){
148  this->actor->channel->condition.notify_one();
149  }
150 
156  void wait(){
157  this->lock();
158  this->actor->channel->condition.wait(this->lock_);
159  }
160 
168  template <class Pred>
169  void waitUntil(Pred&& pred){
170  this->lock();
171  this->actor->channel->condition.wait(
172  this->lock_,
173  std::forward<Pred>(pred)
174  );
175  }
176 
186  template <class Rep, class Period>
187  std::cv_status waitFor(const std::chrono::duration<Rep, Period>& time){
188  this->lock();
189  return this->actor->channel->condition.wait_for(this->lock_, time);
190  }
191 
202  template <class Rep, class Period, class Pred>
203  bool waitFor(const std::chrono::duration<Rep, Period>& time, Pred&& pred){
204  this->lock();
205  return this->actor->channel->condition.wait_for(
206  this->lock_,
207  time,
208  std::forward<Pred>(pred)
209  );
210  }
211  };
212  }
213 
221  template <class T, class Lock = std::mutex, template<class> class Guard = std::lock_guard, class Container = std::deque<T>>
222  class Channel{
223  public:
228  using lock_type = Lock;
229 
235  template <class L>
236  using guard_type = Guard<L>;
237 
242  using queue_type = std::queue<T, Container>;
243 
248  using value_type = typename queue_type::value_type;
249 
254  using size_type = typename queue_type::size_type;
255 
261 
267 
273 
279 
280  friend sender_type;
284 
285  template <size_t I, class U, class L, template<class> class G, class C>
286  friend auto std::get(Channel<U, L, G, C>& chan) -> META::tuple_element_t<I, typename Channel<U, L, G, C>::transport_type>;
287 
288  protected:
292  mutable lock_type lock;
293 
297  mutable std::condition_variable_any condition;
298 
303 
308 
313 
319  return transport_traits::factory(this->sender(), this->receiver());
320  }
321 
322  public:
326  Channel() : lock{}, condition{}, queue{}, sender_{*this}, receiver_{*this} {
327  }
328 
329  Channel(const Channel&) = delete;
330  Channel(Channel&&) = delete;
331  Channel& operator=(const Channel&) = delete;
332  Channel& operator=(Channel&&) = delete;
333 
338  size_type size() const{
339  guard_type<lock_type> _{lock};
340  return this->queue.size();
341  }
342 
346  size_type length() const{
347  return this->size();
348  }
349 
354  bool isEmpty() const{
355  return this->size() <= 0;
356  }
357 
363  return this->receiver_;
364 // return receiver_type{this};
365  }
366 
372  return this->sender_;
373 // return sender_type{this};
374  }
375 
382  template <class U = value_type>
383  sender_type& operator<<(U&& value){
384  return this->sender() << std::forward<U>(value);
385  }
386 
393  template <class U = value_type>
395  return this->receiver() >> value;
396  }
397  };
398 
409  template <class U, class T, class L, template<class> class G, class C>
410  void operator>>(U&& value, Channel<T, L, G, C>& channel) {
411  channel << std::forward<U>(value);
412  }
413 
424  template <class U, class T, class L, template<class> class G, class C>
425  void operator<<(U& value, Channel<T, L, G, C>& channel){
426  channel >> value;
427  }
428 
429  namespace details{
434  template <class Chan>
435  class ChannelSender{
436  public:
441  using channel_type = Chan;
442 
448 
454 
459  template <class L>
460  using guard_type = typename sender_traits::template guard_type<L>;
461 
467 
473 
475  friend channel_type;
476 
477  protected:
482 
487 
492  explicit ChannelSender(channel_type& chan) : channel{&chan}, traits{*this} {
493  }
494 
501  template <class... Args>
502  ChannelSender& doPush(Args&&... args){
503  guard_type<sender_traits> _{this->traits};
504  this->channel->queue.emplace(std::forward<Args>(args)...);
505  this->traits.notify();
506  return *this;
507  }
508 
509  public:
510  ChannelSender(const ChannelSender&) = delete;
511  ChannelSender& operator=(const ChannelSender&) = delete;
512  ChannelSender(ChannelSender&&) = delete;
513  ChannelSender& operator=(ChannelSender&&) = delete;
514 
521  template <class U = value_type>
522  ChannelSender& push(U&& value){
523  return this->doPush(std::forward<U>(value));
524  }
525 
532  template <class... Args>
533  ChannelSender& pushEmplace(Args&&... args){
534  return this->doPush(std::forward<Args&&>(args)...);
535  }
536 
540  template <class U = value_type>
541  ChannelSender& queue(U&& value){
542  return this->push(std::forward<U>(value));
543  }
544 
548  template <class U = value_type>
549  ChannelSender& enqueue(U&& value){
550  return this->push(std::forward<U>(value));
551  }
552 
556  template <class U = value_type>
557  ChannelSender& send(U&& value){ return this->push(value); }
558 
562  template <class U = value_type>
563  ChannelSender& operator<<(U&& value){ return this->push(value); }
564  };
565 
566  template <class Chan>
567  class ChannelReceiver{
568  public:
573  using channel_type = Chan;
574 
580 
586 
592  template <class L>
593  using guard_type = typename receiver_traits::template guard_type<L>;
594 
600 
606 
612 
614  friend channel_type;
615 
616  protected:
621 
626 
631  explicit ChannelReceiver(channel_type& chan) : channel{&chan}, traits{*this} {
632  }
633 
634  public:
635  ChannelReceiver(const ChannelReceiver&) = delete;
636  ChannelReceiver& operator=(const ChannelReceiver&) = delete;
637  ChannelReceiver(ChannelReceiver&&) = delete;
638  ChannelReceiver& operator=(ChannelReceiver&&) = delete;
639 
645  this->traits.waitUntil([&]{
646  return !this->channel->queue.empty();
647  });
648 
649  auto value = this->channel->queue.front(); //get front
650  this->channel->queue.pop(); //remove from queue
651  this->traits.unlock();
652  return value;
653  }
654 
662  template <class Rep, class Period>
663  optional_type tryPop(const std::chrono::duration<Rep, Period>& duration){
664  bool isEmpty = this->traits.waitFor(duration, [&]{
665  return !this->channel->queue.empty();
666  });
667 
668  if(isEmpty)
669  return optional_type{};
670 
671  auto value = this->channel->queue.front(); //get front
672  this->channel->queue.pop(); //remove from queue
673  this->traits.unlock();
674  return optional_type{std::move(value)};
675  }
676 
680  value_type dequeue(){ return this->pop(); }
681 
685  value_type receive(){ return this->pop(); }
686 
690  template <class Rep, class Period>
691  optional_type tryDeque(const std::chrono::duration<Rep, Period>& duration){
692  return this->tryPop(duration);
693  }
694 
698  template <class Rep, class Period>
699  optional_type tryReceive(const std::chrono::duration<Rep, Period>& duration){
700  return this->tryPop(duration);
701  }
702 
708  template <class U = value_type>
710  value = std::move(this->receive());
711  return *this;
712  }
713  };
714  }
715  }
716 }
717 
718 namespace std{
729  template <size_t I, class T, class Lock, template<class> class Guard, class Container>
731  return std::get<I>(channel.interface());
732  }
733 
734  template <class T, class Lock, template<class> class Guard, class Container>
735  struct tuple_size<scl::async::Channel<T, Lock, Guard, Container>>
736  : std::tuple_size<typename scl::async::Channel<T, Lock, Guard, Container>::transport_type>{
737  };
738 }
typename std::tuple_element< I, Tuple >::type tuple_element_t
Definition: type_query.h:38
void lock()
Lock the channel.
Definition: Channel.h:133
size_type size() const
Retrieve the size of the queue.
Definition: Channel.h:338
typename sender_traits::queue_type queue_type
The type of queue used in the channel.
Definition: Channel.h:466
typename channel_type::queue_type queue_type
The type of the queue used in the channel.
Definition: Channel.h:110
bool isEmpty() const
Determine whether or not the channel is currently empty.
Definition: Channel.h:354
optional_type tryPop(const std::chrono::duration< Rep, Period > &duration)
Wait for a given duration before popping.
Definition: Channel.h:663
typename receiver_traits::queue_type queue_type
The type of queue used by the channel.
Definition: Channel.h:599
ChannelReceiver(channel_type &chan)
Construct a receiver from its channel.
Definition: Channel.h:631
ChannelSender & enqueue(U &&value)
Alias for ChannelSender::push.
Definition: Channel.h:549
typename receiver_traits::lock_type lock_type
The type of locks used by the traits.
Definition: Channel.h:585
sender_traits traits
The traits for the sender.
Definition: Channel.h:486
std::unique_lock< lock_type > lock_
The handle to the channel's lock.
Definition: Channel.h:120
channel_type * channel
A (safe) pointer to the channel.
Definition: Channel.h:620
void unlock()
Unlock the channel.
Definition: Channel.h:140
Chan channel_type
The type of the channel.
Definition: Channel.h:37
Global namespace of the SCL.
Definition: alias.hpp:3
typename sender_traits::lock_type lock_type
The type of lock used in the sender.
Definition: Channel.h:453
lock_type lock
The lock being used for the channel's queue.
Definition: Channel.h:292
void operator>>(U &&value, Channel< T, L, G, C > &channel)
Forward a value into a channel.
Definition: Channel.h:410
std::mutex lock_type
The type of locks used in the channel.
Definition: Channel.h:228
typename receiver_traits::value_type value_type
The type of values used by the channel.
Definition: Channel.h:605
sender_type & operator<<(U &&value)
Forward a value into the channel.
Definition: Channel.h:383
STL namespace.
receiver_type & receiver()
Get a receiver for this channel.
Definition: Channel.h:362
std::lock_guard< L > guard_type
Definition: Channel.h:236
A channel to use for asynchronous communication of data in a queued manner.
Definition: Channel.h:18
Traits for actors that interact with the channel.
Definition: Channel.h:73
Traits for the implementation of the transport of information in a scl::async::Channel.
Definition: Channel.h:32
std::queue< message_type, std::deque< message_type > > queue_type
The type of the channel's queue.
Definition: Channel.h:242
void notify()
Notify the channel.
Definition: Channel.h:147
static type factory(sender_type &sender, receiver_type &receiver)
Create a transport payload from a sender and a receiver.
Definition: Channel.h:63
typename channel_type::template guard_type< L > guard_type
Definition: Channel.h:98
value_type receive()
Alias for ChannelReceiver::pop.
Definition: Channel.h:685
typename queue_type::value_type value_type
The type of values exposed through the channel.
Definition: Channel.h:248
ChannelSender & send(U &&value)
Alias for ChannelSender::push.
Definition: Channel.h:557
std::tuple< sender_type &, receiver_type & > type
The type of transport payload.
Definition: Channel.h:55
channel_actor_traits(actor_type &act)
Construct a trait from a reference to its actor.
Definition: Channel.h:126
ChannelSender & queue(U &&value)
Alias for ChannelSender::push.
Definition: Channel.h:541
std::cv_status waitFor(const std::chrono::duration< Rep, Period > &time)
Wait for a specific duration for the channel.
Definition: Channel.h:187
std::condition_variable_any condition
The condition used to wait for the channel's queue.
Definition: Channel.h:297
receiver_type receiver_
The receiver for this channel.
Definition: Channel.h:312
void waitUntil(Pred &&pred)
Wait for the channel until a condition is met.
Definition: Channel.h:169
ChannelSender & pushEmplace(Args &&... args)
Construct and push in-place the value.
Definition: Channel.h:533
optional_type tryReceive(const std::chrono::duration< Rep, Period > &duration)
Alias for ChannelReceiver::tryPop.
Definition: Channel.h:699
typename channel_type::receiver_type receiver_type
The type of the receiver.
Definition: Channel.h:49
typename transport_traits::type transport_type
The type of the transport payload.
Definition: Channel.h:278
typename sender_traits::value_type value_type
The type of values used in the channel.
Definition: Channel.h:472
typename channel_type::value_type value_type
The type of values transmitted through channels.
Definition: Channel.h:104
typename queue_type::size_type size_type
The type used to quantize the size of the queue.
Definition: Channel.h:254
typename channel_type::sender_type sender_type
The type of the sender.
Definition: Channel.h:43
constexpr scl::utils::Placeholder _
Definition: Placeholder.h:39
typename actor_type::channel_type channel_type
The type of the channel.
Definition: Channel.h:84
value_type pop()
Get a value from the channel.
Definition: Channel.h:644
ChannelSender & push(U &&value)
Forward a value into the queue.
Definition: Channel.h:522
ChannelSender & doPush(Args &&... args)
Implementation details to push data onto the channel.
Definition: Channel.h:502
typename sender_traits::template guard_type< L > guard_type
The type of guards used in the sender.
Definition: Channel.h:460
sender_type sender_
The sender for this channel.
Definition: Channel.h:307
transport_type interface()
Implementation details to get a receiver and sender for this channel.
Definition: Channel.h:318
Class used to send data to a Channel.
Definition: Channel.h:22
channel_type * channel
A (safe) pointer to the channel.
Definition: Channel.h:481
actor_type * actor
A (safe) pointer to the actor.
Definition: Channel.h:115
typename channel_type::lock_type lock_type
The type of lock used by the channel.
Definition: Channel.h:90
ChannelSender & operator<<(U &&value)
Alias for ChannelSender::push.
Definition: Channel.h:563
void wait()
Wait for the channel.
Definition: Channel.h:156
Channel()
Construct a channel.
Definition: Channel.h:326
value_type dequeue()
Alias for ChannelReceiver::pop.
Definition: Channel.h:680
size_type length() const
Alias for Channel::size.
Definition: Channel.h:346
ChannelSender(channel_type &chan)
Construct a sender from a reference to its channel.
Definition: Channel.h:492
friend receiver_type
Definition: Channel.h:281
receiver_traits traits
The traits for this receiver.
Definition: Channel.h:625
optional_type tryDeque(const std::chrono::duration< Rep, Period > &duration)
Alias for ChannelReceiver::tryPop.
Definition: Channel.h:691
receiver_type & operator>>(U &value)
Extract a value from the channel.
Definition: Channel.h:394
friend sender_type
Definition: Channel.h:280
sender_type & sender()
Get a sender for this channel.
Definition: Channel.h:371
Chan channel_type
The type of the channel.
Definition: Channel.h:573
typename receiver_traits::template guard_type< L > guard_type
Definition: Channel.h:593
ChannelReceiver & operator>>(U &value)
Alias for ChannelReceiver::receive.
Definition: Channel.h:709
queue_type queue
The underlying queue used to store data.
Definition: Channel.h:302
bool waitFor(const std::chrono::duration< Rep, Period > &time, Pred &&pred)
Wait for a given period of time until the predicate is satisfied.
Definition: Channel.h:203