libfly  6.2.2
C++20 utility library for Linux, macOS, and Windows
task_runner.hpp
1 #pragma once
2 
3 #include "fly/fly.hpp"
4 #include "fly/task/types.hpp"
5 
6 #include <chrono>
7 #include <cstdint>
8 #include <functional>
9 #include <memory>
10 #include <mutex>
11 #include <queue>
12 #include <type_traits>
13 
17 #define FROM_HERE \
18  fly::task::TaskLocation({__FILE__, __FUNCTION__, static_cast<std::uint32_t>(__LINE__)})
19 
20 namespace fly::task {
21 
22 class TaskManager;
23 
112 class TaskRunner : public std::enable_shared_from_this<TaskRunner>
113 {
114  friend class TaskManager;
115 
116 public:
120  virtual ~TaskRunner() = default;
121 
132  template <typename TaskType>
133  bool post_task(TaskLocation &&location, TaskType &&task);
134 
150  template <typename OwnerType, typename TaskType>
151  bool post_task(TaskLocation &&location, std::weak_ptr<OwnerType> weak_owner, TaskType &&task);
152 
169  template <typename TaskType, typename ReplyType>
170  bool post_task_with_reply(TaskLocation &&location, TaskType &&task, ReplyType &&reply);
171 
195  template <typename OwnerType, typename TaskType, typename ReplyType>
197  TaskLocation &&location,
198  std::weak_ptr<OwnerType> weak_owner,
199  TaskType &&task,
200  ReplyType &&reply);
201 
213  template <typename TaskType>
214  bool
215  post_task_with_delay(TaskLocation &&location, std::chrono::milliseconds delay, TaskType &&task);
216 
233  template <typename OwnerType, typename TaskType>
235  TaskLocation &&location,
236  std::weak_ptr<OwnerType> weak_owner,
237  std::chrono::milliseconds delay,
238  TaskType &&task);
239 
257  template <typename TaskType, typename ReplyType>
259  TaskLocation &&location,
260  std::chrono::milliseconds delay,
261  TaskType &&task,
262  ReplyType &&reply);
263 
288  template <typename OwnerType, typename TaskType, typename ReplyType>
290  TaskLocation &&location,
291  std::weak_ptr<OwnerType> weak_owner,
292  std::chrono::milliseconds delay,
293  TaskType &&task,
294  ReplyType &&reply);
295 
296 protected:
302  TaskRunner(std::shared_ptr<TaskManager> task_manager) noexcept;
303 
312  virtual bool post_task_internal(TaskLocation &&location, Task &&task) = 0;
313 
319  virtual void task_complete(TaskLocation &&location) = 0;
320 
329  bool post_task_to_task_manager(TaskLocation &&location, Task &&task);
330 
343  TaskLocation &&location,
344  std::chrono::milliseconds delay,
345  Task &&task);
346 
347 private:
351  template <typename TaskType>
352  struct TaskHolder
353  {
354  TaskType m_task;
355  };
356 
366  template <typename TaskType>
367  Task wrap_task(TaskType &&task);
368 
382  template <typename OwnerType, typename TaskType>
383  Task wrap_task(std::weak_ptr<OwnerType> weak_owner, TaskType &&task);
384 
399  template <typename TaskType, typename ReplyType>
400  Task wrap_task(TaskType &&task, ReplyType &&reply);
401 
421  template <typename OwnerType, typename TaskType, typename ReplyType>
422  Task wrap_task(std::weak_ptr<OwnerType> weak_owner, TaskType &&task, ReplyType &&reply);
423 
430  void execute(TaskLocation &&location, Task &&task);
431 
432  std::weak_ptr<TaskManager> m_weak_task_manager;
433 };
434 
443 {
444 public:
452  static std::shared_ptr<ParallelTaskRunner> create(std::shared_ptr<TaskManager> task_manager);
453 
454 protected:
455  explicit ParallelTaskRunner(std::shared_ptr<TaskManager> task_manager) noexcept;
456 
465  bool post_task_internal(TaskLocation &&location, Task &&task) override;
466 
472  void task_complete(TaskLocation &&location) override;
473 };
474 
488 {
489 public:
497  static std::shared_ptr<SequencedTaskRunner> create(std::shared_ptr<TaskManager> task_manager);
498 
499 protected:
500  explicit SequencedTaskRunner(std::shared_ptr<TaskManager> task_manager) noexcept;
501 
512  bool post_task_internal(TaskLocation &&location, Task &&task) override;
513 
519  void task_complete(TaskLocation &&location) override;
520 
521 private:
525  struct PendingTask
526  {
527  TaskLocation m_location;
528  Task m_task;
529  };
530 
541  bool maybe_post_task(TaskLocation &&location, Task &&task);
542 
543  std::mutex m_pending_tasks_mutex;
544  std::queue<PendingTask> m_pending_tasks;
545  bool m_has_running_task {false};
546 };
547 
548 //==================================================================================================
549 template <typename TaskType>
550 bool TaskRunner::post_task(TaskLocation &&location, TaskType &&task)
551 {
552  return post_task_internal(std::move(location), wrap_task(std::forward<TaskType>(task)));
553 }
554 
555 //==================================================================================================
556 template <typename OwnerType, typename TaskType>
558  TaskLocation &&location,
559  std::weak_ptr<OwnerType> weak_owner,
560  TaskType &&task)
561 {
562  return post_task_internal(
563  std::move(location),
564  wrap_task(std::move(weak_owner), std::forward<TaskType>(task)));
565 }
566 
567 //==================================================================================================
568 template <typename TaskType, typename ReplyType>
569 bool TaskRunner::post_task_with_reply(TaskLocation &&location, TaskType &&task, ReplyType &&reply)
570 {
571  return post_task_internal(
572  std::move(location),
573  wrap_task(std::forward<TaskType>(task), std::forward<ReplyType>(reply)));
574 }
575 
576 //==================================================================================================
577 template <typename OwnerType, typename TaskType, typename ReplyType>
579  TaskLocation &&location,
580  std::weak_ptr<OwnerType> weak_owner,
581  TaskType &&task,
582  ReplyType &&reply)
583 {
584  return post_task_internal(
585  std::move(location),
586  wrap_task(
587  std::move(weak_owner),
588  std::forward<TaskType>(task),
589  std::forward<ReplyType>(reply)));
590 }
591 
592 //==================================================================================================
593 template <typename TaskType>
595  TaskLocation &&location,
596  std::chrono::milliseconds delay,
597  TaskType &&task)
598 {
600  std::move(location),
601  std::move(delay),
602  wrap_task(std::forward<TaskType>(task)));
603 }
604 
605 //==================================================================================================
606 template <typename OwnerType, typename TaskType>
608  TaskLocation &&location,
609  std::weak_ptr<OwnerType> weak_owner,
610  std::chrono::milliseconds delay,
611  TaskType &&task)
612 {
614  std::move(location),
615  std::move(delay),
616  wrap_task(std::move(weak_owner), std::forward<TaskType>(task)));
617 }
618 
619 //==================================================================================================
620 template <typename TaskType, typename ReplyType>
622  TaskLocation &&location,
623  std::chrono::milliseconds delay,
624  TaskType &&task,
625  ReplyType &&reply)
626 {
628  std::move(location),
629  std::move(delay),
630  wrap_task(std::forward<TaskType>(task), std::forward<ReplyType>(reply)));
631 }
632 
633 //==================================================================================================
634 template <typename OwnerType, typename TaskType, typename ReplyType>
636  TaskLocation &&location,
637  std::weak_ptr<OwnerType> weak_owner,
638  std::chrono::milliseconds delay,
639  TaskType &&task,
640  ReplyType &&reply)
641 {
643  std::move(location),
644  std::move(delay),
645  wrap_task(std::move(weak_owner), std::forward<TaskType>(task), std::move(reply)));
646 }
647 
648 //==================================================================================================
649 template <typename TaskType>
650 Task TaskRunner::wrap_task(TaskType &&task)
651 {
652  static_assert(std::is_invocable_v<TaskType>, "Task must be invocable without any arguments");
653 
654  TaskHolder<TaskType> holder {std::forward<TaskType>(task)};
655 
656  return [holder = std::move(holder)](TaskRunner *, TaskLocation) mutable {
657  FLY_UNUSED(std::invoke(std::move(holder.m_task)));
658  };
659 }
660 
661 //==================================================================================================
662 template <typename OwnerType, typename TaskType>
663 Task TaskRunner::wrap_task(std::weak_ptr<OwnerType> weak_owner, TaskType &&task)
664 {
665  using StrongOwnerType = std::shared_ptr<OwnerType>;
666 
667  static_assert(
668  std::is_invocable_v<TaskType, StrongOwnerType>,
669  "Task must be invocable with only a strong pointer to its owner");
670 
671  TaskHolder<TaskType> holder {std::forward<TaskType>(task)};
672 
673  return [weak_owner = std::move(weak_owner),
674  holder = std::move(holder)](TaskRunner *, TaskLocation) mutable {
675  if (StrongOwnerType owner = weak_owner.lock(); owner)
676  {
677  FLY_UNUSED(std::invoke(std::move(holder.m_task), std::move(owner)));
678  }
679  };
680 }
681 
682 //==================================================================================================
683 template <typename TaskType, typename ReplyType>
684 Task TaskRunner::wrap_task(TaskType &&task, ReplyType &&reply)
685 {
686  static_assert(std::is_invocable_v<TaskType>, "Task must be invocable without any arguments");
687 
688  using ResultType = std::invoke_result_t<TaskType>;
689  static constexpr bool s_result_is_void = std::is_void_v<ResultType>;
690 
691  static_assert(
692  (s_result_is_void && std::is_invocable_v<ReplyType>) ||
693  (!s_result_is_void && std::is_invocable_v<ReplyType, ResultType>),
694  "Either the task must return a non-void type and the reply must be invocable with only "
695  "that type, or the task must return void and the reply must be invocable without any "
696  "arguments");
697 
698  TaskHolder<TaskType> task_holder {std::forward<TaskType>(task)};
699  TaskHolder<ReplyType> reply_holder {std::forward<ReplyType>(reply)};
700 
701  return [task_holder = std::move(task_holder),
702  reply_holder =
703  std::move(reply_holder)](TaskRunner *runner, TaskLocation location) mutable {
704  if constexpr (s_result_is_void)
705  {
706  std::invoke(std::move(task_holder.m_task));
707  runner->post_task(std::move(location), std::move(reply_holder.m_task));
708  }
709  else
710  {
711  auto result = std::invoke(std::move(task_holder.m_task));
712 
713  runner->post_task(
714  std::move(location),
715  std::bind(std::move(reply_holder.m_task), std::move(result)));
716  }
717  };
718 }
719 
720 //==================================================================================================
721 template <typename OwnerType, typename TaskType, typename ReplyType>
722 Task TaskRunner::wrap_task(std::weak_ptr<OwnerType> weak_owner, TaskType &&task, ReplyType &&reply)
723 {
724  using StrongOwnerType = std::shared_ptr<OwnerType>;
725 
726  static_assert(
727  std::is_invocable_v<TaskType, StrongOwnerType>,
728  "Task must be invocable with only a strong pointer to its owner");
729 
730  using ResultType = std::invoke_result_t<TaskType, StrongOwnerType>;
731  static constexpr bool s_result_is_void = std::is_void_v<ResultType>;
732 
733  static_assert(
734  (s_result_is_void && std::is_invocable_v<ReplyType, StrongOwnerType>) ||
735  (!s_result_is_void && std::is_invocable_v<ReplyType, ResultType, StrongOwnerType>),
736  "Either the task must return a non-void type and the reply must be invocable with that "
737  "type and a strong pointer to its owner, or the task must return void and the reply must "
738  "be invocable with only a strong pointer to its owner");
739 
740  TaskHolder<TaskType> task_holder {std::forward<TaskType>(task)};
741  TaskHolder<ReplyType> reply_holder {std::forward<ReplyType>(reply)};
742 
743  return [weak_owner = std::move(weak_owner),
744  task_holder = std::move(task_holder),
745  reply_holder =
746  std::move(reply_holder)](TaskRunner *runner, TaskLocation location) mutable {
747  if (StrongOwnerType owner = weak_owner.lock(); owner)
748  {
749  if constexpr (s_result_is_void)
750  {
751  std::invoke(std::move(task_holder.m_task), std::move(owner));
752 
753  runner->post_task(
754  std::move(location),
755  std::move(weak_owner),
756  std::move(reply_holder.m_task));
757  }
758  else
759  {
760  auto result = std::invoke(std::move(task_holder.m_task), std::move(owner));
761 
762  runner->post_task(
763  std::move(location),
764  std::move(weak_owner),
765  std::bind(
766  std::move(reply_holder.m_task),
767  std::move(result),
768  std::placeholders::_1));
769  }
770  }
771  };
772 }
773 
774 } // namespace fly::task
Definition: task_runner.hpp:443
bool post_task_internal(TaskLocation &&location, Task &&task) override
Definition: task_runner.cpp:76
void task_complete(TaskLocation &&location) override
Definition: task_runner.cpp:82
static std::shared_ptr< ParallelTaskRunner > create(std::shared_ptr< TaskManager > task_manager)
Definition: task_runner.cpp:54
Definition: task_runner.hpp:488
bool post_task_internal(TaskLocation &&location, Task &&task) override
Definition: task_runner.cpp:110
void task_complete(TaskLocation &&location) override
Definition: task_runner.cpp:116
static std::shared_ptr< SequencedTaskRunner > create(std::shared_ptr< TaskManager > task_manager)
Definition: task_runner.cpp:88
Definition: task_manager.hpp:30
Definition: task_runner.hpp:113
bool post_task_with_delay(TaskLocation &&location, std::chrono::milliseconds delay, TaskType &&task)
Definition: task_runner.hpp:594
TaskRunner(std::shared_ptr< TaskManager > task_manager) noexcept
Definition: task_runner.cpp:8
bool post_task_to_task_manager(TaskLocation &&location, Task &&task)
Definition: task_runner.cpp:14
virtual void task_complete(TaskLocation &&location)=0
bool post_task_with_delay_and_reply(TaskLocation &&location, std::chrono::milliseconds delay, TaskType &&task, ReplyType &&reply)
Definition: task_runner.hpp:621
bool post_task_to_task_manager_with_delay(TaskLocation &&location, std::chrono::milliseconds delay, Task &&task)
Definition: task_runner.cpp:26
bool post_task(TaskLocation &&location, TaskType &&task)
Definition: task_runner.hpp:550
virtual ~TaskRunner()=default
virtual bool post_task_internal(TaskLocation &&location, Task &&task)=0
bool post_task_with_reply(TaskLocation &&location, TaskType &&task, ReplyType &&reply)
Definition: task_runner.hpp:569
Definition: types.hpp:17