// (C) Copyright 2008 Anthony Williams // // Distributed under the Boost Software License, Version 1.0. (See // accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #ifndef N2561_FUTURE_HPP #define N2561_FUTURE_HPP #include #include #include #include #include #include #include #include #include namespace jss { class future_uninitialized: public std::logic_error { public: future_uninitialized(): std::logic_error("Future Uninitialized") {} }; class future_moved: public std::logic_error { public: future_moved(): std::logic_error("Future moved") {} }; class broken_promise: public std::logic_error { public: broken_promise(): std::logic_error("Broken promise") {} }; class future_already_retrieved: public std::logic_error { public: future_already_retrieved(): std::logic_error("Future already retrieved") {} }; class promise_already_satisfied: public std::logic_error { public: promise_already_satisfied(): std::logic_error("Promise already satisfied") {} }; class task_already_started: public std::logic_error { public: task_already_started(): std::logic_error("Task already started") {} }; class task_moved: public std::logic_error { public: task_moved(): std::logic_error("Task moved") {} }; namespace future_state { enum state { uninitialized, waiting, ready, moved }; } namespace detail { struct future_object_base { boost::exception_ptr exception; bool done; boost::mutex mutex; boost::condition_variable waiters; future_object_base(): done(false) {} virtual ~future_object_base() {} void mark_finished_internal() { done=true; waiters.notify_all(); } void wait_internal(boost::unique_lock& lock) { while(!done) { waiters.wait(lock); } } void wait() { boost::unique_lock lock(mutex); wait_internal(lock); } bool timed_wait_until_internal(boost::unique_lock& lock,boost::system_time const& target_time) { while(!done) { bool const success=waiters.timed_wait(lock,target_time); if(!success && !done) { return false; } } return true; } bool timed_wait_until(boost::system_time const& target_time) { boost::unique_lock lock(mutex); return timed_wait_until_internal(lock,target_time); } void mark_exceptional_finish_internal(boost::exception_ptr const& e) { exception=e; mark_finished_internal(); } void mark_exceptional_finish() { boost::lock_guard lock(mutex); mark_exceptional_finish_internal(boost::current_exception()); } private: future_object_base(future_object_base const&); future_object_base& operator=(future_object_base const&); }; template struct future_traits { typedef boost::scoped_ptr storage_type; typedef T const& source_reference_type; typedef boost::detail::thread_move_t move_source_type; static void init(storage_type& storage,T const& t) { storage.reset(new T(t)); } static void init(storage_type& storage,boost::detail::thread_move_t t) { storage.reset(new T(t)); } static T move(storage_type& storage) { T res(*storage); cleanup(storage); return res; } static void move(storage_type& storage,T& dest) { dest=*storage; cleanup(storage); } static void cleanup(storage_type& storage) { storage.reset(); } }; template struct future_traits { typedef T* storage_type; typedef T& source_reference_type; struct move_source_type {}; static void init(storage_type& storage,T& t) { storage=&t; } static T& move(storage_type& storage) { T& res=*storage; cleanup(storage); return res; } static void cleanup(storage_type& storage) { storage=0; } }; template<> struct future_traits { typedef bool storage_type; static void init(storage_type& storage) { storage=true; } static void move(storage_type& storage) { cleanup(storage); } static void cleanup(storage_type& storage) { storage=false; } }; template struct future_object: detail::future_object_base { typedef typename future_traits::storage_type storage_type; typedef typename future_traits::source_reference_type source_reference_type; typedef typename future_traits::move_source_type move_source_type; typedef typename boost::add_reference::type reference; storage_type result; future_object(): result(0) {} void mark_finished_with_result_internal(source_reference_type result_) { future_traits::init(result,result_); mark_finished_internal(); } void mark_finished_with_result_internal(move_source_type result_) { future_traits::init(result,result_); mark_finished_internal(); } void mark_finished_with_result(source_reference_type result_) { boost::lock_guard lock(mutex); mark_finished_with_result_internal(result_); } void mark_finished_with_result(move_source_type result_) { boost::lock_guard lock(mutex); mark_finished_with_result_internal(result_); } T move() { boost::unique_lock lock(mutex); wait_internal(lock); if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } return future_traits::move(result); } bool try_move(reference dest) { boost::lock_guard lock(mutex); if(!done) { return false; } if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } future_traits::move(result,dest); return true; } bool timed_move_until(reference dest,boost::system_time const& target_time) { boost::unique_lock lock(mutex); if(!timed_wait_until_internal(lock,target_time)) { return false; } if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } future_traits::move(result,dest); return true; } reference get() { boost::unique_lock lock(mutex); wait_internal(lock); if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } return *result; } bool try_get(reference dest) { boost::lock_guard lock(mutex); if(!done) { return false; } if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } dest=*result; return true; } bool timed_get_until(reference dest,boost::system_time const& target_time) { boost::unique_lock lock(mutex); if(!timed_wait_until_internal(lock,target_time)) { return false; } if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } dest=*result; return true; } future_state::state get_state() { boost::lock_guard guard(mutex); if(!done) { return future_state::waiting; } else { return (!result && !exception)?future_state::moved:future_state::ready; } } private: future_object(future_object const&); future_object& operator=(future_object const&); }; template<> struct future_object: detail::future_object_base { bool result; future_object(): result(false) {} void mark_finished_with_result_internal() { result=true; mark_finished_internal(); } void mark_finished_with_result() { boost::lock_guard lock(mutex); mark_finished_with_result_internal(); } void move() { boost::unique_lock lock(mutex); wait_internal(lock); if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } result=false; } void get() { boost::unique_lock lock(mutex); wait_internal(lock); if(exception) { boost::rethrow_exception(exception); } if(!result) { throw future_moved(); } } future_state::state get_state() { boost::lock_guard guard(mutex); if(!done) { return future_state::waiting; } else { return (!result && !exception)?future_state::moved:future_state::ready; } } private: future_object(future_object const&); future_object& operator=(future_object const&); }; } template class shared_future; template class unique_future { unique_future(unique_future & rhs);// = delete; unique_future& operator=(unique_future& rhs);// = delete; typedef boost::shared_ptr > future_ptr; future_ptr future; friend class shared_future; typedef typename boost::add_reference::type reference; public: unique_future(future_ptr future_): future(future_) {} typedef future_state::state state; unique_future() {} // unique_future(unique_future &&); unique_future(boost::detail::thread_move_t other): future(other->future) { other->future.reset(); } ~unique_future() {} // unique_future& operator=(unique_future &&); unique_future& operator=(boost::detail::thread_move_t other) { future=other->future; other->future.reset(); return *this; } operator boost::detail::thread_move_t() { return boost::detail::thread_move_t(*this); } void swap(unique_future& other) { future.swap(other.future); } // retrieving the value R move() { if(!future) { throw future_uninitialized(); } return future->move(); } bool try_move(reference dest) { if(!future) { throw future_uninitialized(); } return future->try_move(dest); } template bool timed_move(reference dest, Duration const& rel_time) { return timed_move_until(dest,boost::get_system_time()+rel_time); } bool timed_move_until(reference dest, boost::system_time const& abs_time) { if(!future) { throw future_uninitialized(); } return future->timed_move_until(dest,abs_time); } reference get() { if(!future) { throw future_uninitialized(); } return future->get(); } bool try_get(reference dest) { if(!future) { throw future_uninitialized(); } return future->try_get(dest); } template bool timed_get(reference dest, Duration const& rel_time) { return timed_get_until(dest,boost::get_system_time()+rel_time); } bool timed_get_until(reference dest, boost::system_time const& abs_time) { if(!future) { throw future_uninitialized(); } return future->timed_get_until(dest,abs_time); } // functions to check state, and wait for ready state get_state() const { if(!future) { return future_state::uninitialized; } return future->get_state(); } bool is_ready() const { return get_state()==future_state::ready; } bool has_exception() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->exception; } bool has_value() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->result; } bool was_moved() const { return get_state()==future_state::moved; } void wait() const { if(!future) { throw future_uninitialized(); } future->wait(); } template bool timed_wait(Duration const& rel_time) const { return timed_wait_until(boost::get_system_time()+rel_time); } bool timed_wait_until(boost::system_time const& abs_time) const { if(!future) { throw future_uninitialized(); } return future->timed_wait_until(abs_time); } }; template <> class unique_future { unique_future(unique_future & rhs);// = delete; unique_future& operator=(unique_future& rhs);// = delete; typedef boost::shared_ptr > future_ptr; future_ptr future; friend class shared_future; public: unique_future(future_ptr future_): future(future_) {} typedef future_state::state state; unique_future() {} // unique_future(unique_future &&); unique_future(boost::detail::thread_move_t other): future(other->future) { other->future.reset(); } ~unique_future() {} // unique_future& operator=(unique_future &&); unique_future& operator=(boost::detail::thread_move_t other) { future=other->future; other->future.reset(); return *this; } operator boost::detail::thread_move_t() { return boost::detail::thread_move_t(*this); } void swap(unique_future& other) { future.swap(other.future); } // retrieving the value void move() { if(!future) { throw future_uninitialized(); } future->move(); } void get() { if(!future) { throw future_uninitialized(); } future->get(); } // functions to check state, and wait for ready state get_state() const { if(!future) { return future_state::uninitialized; } return future->get_state(); } bool is_ready() const { return get_state()==future_state::ready; } bool has_exception() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->exception; } bool has_value() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->result; } bool was_moved() const { return get_state()==future_state::moved; } void wait() const { if(!future) { throw future_uninitialized(); } future->wait(); } template bool timed_wait(Duration const& rel_time) const { return timed_wait_until(boost::get_system_time()+rel_time); } bool timed_wait_until(boost::system_time const& abs_time) const { if(!future) { throw future_uninitialized(); } return future->timed_wait_until(abs_time); } }; template class shared_future { typedef boost::shared_ptr > future_ptr; future_ptr future; shared_future(unique_future& other); shared_future& operator=(unique_future& other); typedef typename boost::add_reference::type reference; public: shared_future(future_ptr future_): future(future_) {} shared_future(shared_future const& other): future(other.future) {} typedef future_state::state state; shared_future() {} // shared_future(shared_future &&); shared_future(boost::detail::thread_move_t other): future(other->future) { other->future.reset(); } // shared_future(unique_future &&); // shared_future(const unique_future &) = delete; shared_future(boost::detail::thread_move_t > other): future(other->future) { other->future.reset(); } ~shared_future() {} shared_future& operator=(shared_future const& other) { future=other.future; return *this; } // shared_future& operator=(shared_future &&); shared_future& operator=(boost::detail::thread_move_t other) { future.swap(other->future); other->future.reset(); return *this; } shared_future& operator=(boost::detail::thread_move_t > other) { future.swap(other->future); other->future.reset(); return *this; } operator boost::detail::thread_move_t() { return boost::detail::thread_move_t(*this); } void swap(shared_future& other) { future.swap(other.future); } // retrieving the value reference get() { if(!future) { throw future_uninitialized(); } return future->get(); } bool try_get(reference dest) { if(!future) { throw future_uninitialized(); } return future->try_get(dest); } template bool timed_get(reference dest, Duration const& rel_time) { return timed_get_until(dest,boost::get_system_time()+rel_time); } bool timed_get_until(reference dest, boost::system_time const& abs_time) { if(!future) { throw future_uninitialized(); } return future->timed_get_until(dest,abs_time); } // functions to check state, and wait for ready state get_state() const { if(!future) { return future_state::uninitialized; } return future->get_state(); } bool is_ready() const { return get_state()==future_state::ready; } bool has_exception() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->exception; } bool has_value() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->result; } bool was_moved() const { return get_state()==future_state::moved; } void wait() const { if(!future) { throw future_uninitialized(); } future->wait(); } template bool timed_wait(Duration const& rel_time) const { return timed_wait_until(boost::get_system_time()+rel_time); } bool timed_wait_until(boost::system_time const& abs_time) const { if(!future) { throw future_uninitialized(); } return future->timed_wait_until(abs_time); } }; template <> class shared_future { typedef boost::shared_ptr > future_ptr; future_ptr future; shared_future(unique_future& other); shared_future& operator=(unique_future& other); public: shared_future(future_ptr future_): future(future_) {} shared_future(shared_future const& other): future(other.future) {} typedef future_state::state state; shared_future() {} // shared_future(shared_future &&); shared_future(boost::detail::thread_move_t other): future(other->future) { other->future.reset(); } // shared_future(unique_future &&); // shared_future(const unique_future &) = delete; shared_future(boost::detail::thread_move_t > other): future(other->future) { other->future.reset(); } ~shared_future() {} shared_future& operator=(shared_future const& other) { future=other.future; return *this; } // shared_future& operator=(shared_future &&); shared_future& operator=(boost::detail::thread_move_t other) { future.swap(other->future); other->future.reset(); return *this; } shared_future& operator=(boost::detail::thread_move_t > other) { future.swap(other->future); other->future.reset(); return *this; } operator boost::detail::thread_move_t() { return boost::detail::thread_move_t(*this); } void swap(shared_future& other) { future.swap(other.future); } // retrieving the value void get() { if(!future) { throw future_uninitialized(); } future->get(); } // functions to check state, and wait for ready state get_state() const { if(!future) { return future_state::uninitialized; } return future->get_state(); } bool is_ready() const { return get_state()==future_state::ready; } bool has_exception() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->exception; } bool has_value() const { if(!future) { return false; } boost::lock_guard guard(future->mutex); return future->done && future->result; } bool was_moved() const { return get_state()==future_state::moved; } void wait() const { if(!future) { throw future_uninitialized(); } future->wait(); } template bool timed_wait(Duration const& rel_time) const { return timed_wait_until(boost::get_system_time()+rel_time); } bool timed_wait_until(boost::system_time const& abs_time) const { if(!future) { throw future_uninitialized(); } return future->timed_wait_until(abs_time); } }; template class promise { typedef boost::shared_ptr > future_ptr; future_ptr future; bool future_obtained; promise(promise & rhs);// = delete; promise & operator=(promise & rhs);// = delete; public: // template explicit promise(Allocator a); promise(): future(new detail::future_object),future_obtained(false) {} // promise(promise && rhs); promise(boost::detail::thread_move_t rhs): future(rhs->future),future_obtained(rhs->future_obtained) { rhs->future.reset(); } ~promise() { if(future) { boost::lock_guard lock(future->mutex); if(!future->done) { try { throw broken_promise(); } catch(...) { future->mark_exceptional_finish_internal(boost::current_exception()); } } } } // Assignment promise & operator=(boost::detail::thread_move_t rhs) { future=rhs->future; future_obtained=rhs->future_obtained; rhs->future.reset(); return *this; } void swap(promise& other) { future.swap(other.future); std::swap(future_obtained,other.future_obtained); } // Result retrieval unique_future get_future() { if(!future) { throw future_moved(); } if(future_obtained) { throw future_already_retrieved(); } future_obtained=true; return unique_future(future); } void set_value(typename detail::future_traits::source_reference_type r) { if(!future) { throw future_moved(); } boost::lock_guard lock(future->mutex); if(future->done) { throw promise_already_satisfied(); } future->mark_finished_with_result_internal(r); } // void set_value(R && r); void set_value(typename detail::future_traits::move_source_type r) { if(!future) { throw future_moved(); } boost::lock_guard lock(future->mutex); if(future->done) { throw promise_already_satisfied(); } future->mark_finished_with_result_internal(r); } void set_exception(boost::exception_ptr p) { if(!future) { throw future_moved(); } boost::lock_guard lock(future->mutex); if(future->done) { throw promise_already_satisfied(); } future->mark_exceptional_finish_internal(p); } }; template <> class promise { typedef boost::shared_ptr > future_ptr; future_ptr future; bool future_obtained; promise(promise & rhs);// = delete; promise & operator=(promise & rhs);// = delete; public: // template explicit promise(Allocator a); promise(): future(new detail::future_object),future_obtained(false) {} // promise(promise && rhs); promise(boost::detail::thread_move_t rhs): future(rhs->future),future_obtained(rhs->future_obtained) { rhs->future.reset(); } ~promise() { if(future) { boost::lock_guard lock(future->mutex); if(!future->done) { try { throw broken_promise(); } catch(...) { future->mark_exceptional_finish_internal(boost::current_exception()); } } } } // Assignment promise & operator=(boost::detail::thread_move_t rhs) { future=rhs->future; future_obtained=rhs->future_obtained; rhs->future.reset(); return *this; } void swap(promise& other) { future.swap(other.future); std::swap(future_obtained,other.future_obtained); } // Result retrieval unique_future get_future() { if(!future) { throw future_moved(); } if(future_obtained) { throw future_already_retrieved(); } future_obtained=true; return unique_future(future); } void set_value() { if(!future) { throw future_moved(); } boost::lock_guard lock(future->mutex); if(future->done) { throw promise_already_satisfied(); } future->mark_finished_with_result_internal(); } void set_exception(boost::exception_ptr p) { if(!future) { throw future_moved(); } boost::lock_guard lock(future->mutex); if(future->done) { throw promise_already_satisfied(); } future->mark_exceptional_finish_internal(p); } }; namespace detail { template struct task_base: detail::future_object { bool started; task_base(): started(false) {} void run() { { boost::lock_guard lk(this->mutex); if(started) { throw task_already_started(); } started=true; } do_run(); } virtual void do_run()=0; }; template struct task_object: task_base { F f; task_object(F const& f_): f(f_) {} task_object(boost::detail::thread_move_t f_): f(f_) {} void do_run() { try { this->mark_finished_with_result(f()); } catch(...) { this->mark_exceptional_finish(); } } }; template struct task_object: task_base { F f; task_object(F const& f_): f(f_) {} task_object(boost::detail::thread_move_t f_): f(f_) {} void do_run() { try { f(); this->mark_finished_with_result(); } catch(...) { this->mark_exceptional_finish(); } } }; } template class packaged_task { boost::shared_ptr > task; bool future_obtained; packaged_task(packaged_task&);// = delete; packaged_task& operator=(packaged_task&);// = delete; public: // construction and destruction template explicit packaged_task(F const& f): task(new detail::task_object(f)),future_obtained(false) {} explicit packaged_task(R(*f)()): task(new detail::task_object(f)),future_obtained(false) {} template explicit packaged_task(boost::detail::thread_move_t f): task(new detail::task_object(f)),future_obtained(false) {} // template // explicit packaged_task(F const& f, Allocator a); // template // explicit packaged_task(F&& f, Allocator a); // packaged_task(packaged_task&& other); packaged_task(boost::detail::thread_move_t other): future_obtained(other->future_obtained) { task.swap(other->task); } ~packaged_task() {} // assignment // packaged_task& operator=(packaged_task&& other); packaged_task& operator=(boost::detail::thread_move_t other) { packaged_task temp(other); swap(temp); return *this; } void swap(packaged_task& other) { task.swap(other.task); std::swap(future_obtained,other.future_obtained); } // result retrieval unique_future get_future() { if(!task) { throw task_moved(); } else if(!future_obtained) { future_obtained=true; return unique_future(task); } else { throw future_already_retrieved(); } } // execution void operator()() { if(!task) { throw task_moved(); } task->run(); } }; } #endif