From b8f65122758fbbecdb5574acfbca01fe8303c179 Mon Sep 17 00:00:00 2001 From: Peng Li Date: Sun, 22 Jul 2018 15:04:48 +0800 Subject: [PATCH] Add worker thread --- include/Engine.h | 23 ++++++++++++++----- include/MultiTracker.h | 5 +++- include/VideoReader.h | 2 ++ include/VideoSource.h | 0 include/WorkerThread.h | 45 ++++++++++++++++++++++++++++++++++++ main.cpp | 15 ++++++++---- src/Engine.cpp | 62 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/MultiTracker.cpp | 37 ++++++++++++++++++++---------- src/WorkerThread.cpp | 50 ++++++++++++++++++++++++++++++++++++++++ 9 files changed, 213 insertions(+), 26 deletions(-) delete mode 100644 include/VideoSource.h create mode 100644 include/WorkerThread.h create mode 100644 src/WorkerThread.cpp diff --git a/include/Engine.h b/include/Engine.h index 5405539..93d6c9c 100644 --- a/include/Engine.h +++ b/include/Engine.h @@ -9,12 +9,16 @@ #include "MultiTracker.h" #include "VideoReader.h" #include "SharedPtr.h" +#include "WorkerThread.h" namespace suanzi{ class EngineObserver; TK_DECLARE_PTR(Engine); +TK_DECLARE_PTR(MultiTracker); + +struct Person; class Engine { @@ -24,16 +28,24 @@ public: virtual ~Engine(); virtual void start(); + //virtual void getCurrentCount(); + // virtual void capture(bool bb = false); void addObserver(EngineObserver* o); void setVideoSrc(VideoSrcType type, const std::string& url); +protected: + friend class MultiTracker; + void onStatusChanged(); + void onPersonsIn(const std::vector& p); + void onPersonsOut(const std::vector& p); + private: Engine(); void run(); + WorkerThread eventThread {"EventThread"}; DetectorPtr detector; MultiTrackerPtr multiTracker; std::set observer_list; - //std::string videoSrc; VideoReaderPtr reader; }; @@ -58,7 +70,7 @@ struct Person Gender gender = Female; Ages age = Kid; - std::string ageToString (Ages age){ + std::string ageToString (Ages age) const { switch (age){ case Kid: return "Kid"; case Teenager: return "Teenager"; @@ -66,7 +78,7 @@ struct Person } } - std::string toString(){ + std::string toString() const { std::stringstream ss; ss << "Person: id=" << id << ". Gender:" << (gender == Gender::Male ? "Male" : "Female" ) << ". Age: " << ageToString(age); @@ -77,9 +89,8 @@ struct Person class EngineObserver { public: - //virtual void onPersonIn(std::set persons) = 0; - virtual void onPersonIn(Person& p) = 0; - virtual void onPersonOut(Person& p) = 0; + virtual void onPersonsIn(const std::vector& p) = 0; + virtual void onPersonsOut(const std::vector& p) = 0; }; } // namespace suanzi diff --git a/include/MultiTracker.h b/include/MultiTracker.h index 58ef922..66ea04e 100644 --- a/include/MultiTracker.h +++ b/include/MultiTracker.h @@ -7,17 +7,19 @@ #include "PredictorWrapper.h" #include #include +#include "Engine.h" namespace suanzi { TK_DECLARE_PTR(Patch); TK_DECLARE_PTR(MultiTracker); TK_DECLARE_PTR(Tracker); + TK_DECLARE_PTR(Engine); class MultiTracker { public: - MultiTracker(); + MultiTracker(EngineWPtr e); virtual ~MultiTracker(); void update(unsigned int total, const Detection* d, const cv::Mat& image); @@ -28,6 +30,7 @@ namespace suanzi { double distance(TrackerPtr t, const cv::Mat& image, const Detection& d); PredictorWrapperPtr predictor; cv::HOGDescriptor descriptor; + EngineWPtr engine; }; class Patch diff --git a/include/VideoReader.h b/include/VideoReader.h index 79d39d0..5b5c5a0 100644 --- a/include/VideoReader.h +++ b/include/VideoReader.h @@ -16,6 +16,7 @@ namespace suanzi { TK_DECLARE_PTR(VideoReaderFactory); TK_DECLARE_PTR(VideoReader); TK_DECLARE_PTR(URLReader); + TK_DECLARE_PTR(Engine); class VideoReaderFactory { @@ -33,6 +34,7 @@ namespace suanzi { private: VideoSrcType type; + EngineWPtr engine; protected: std::string url; diff --git a/include/VideoSource.h b/include/VideoSource.h deleted file mode 100644 index e69de29..0000000 diff --git a/include/WorkerThread.h b/include/WorkerThread.h new file mode 100644 index 0000000..83c174c --- /dev/null +++ b/include/WorkerThread.h @@ -0,0 +1,45 @@ +#ifndef _WORKER_THREAD_H +#define _WORKER_THREAD_H + +#include +#include +#include +#include +#include +#include "SharedPtr.h" + +namespace suanzi { + +class WorkItem +{ +public: + virtual ~WorkItem(){} + virtual void run(){} + +protected: + WorkItem(){} +}; + +TK_DECLARE_PTR(WorkItem); + +class WorkerThread +{ +public: + WorkerThread(const std::string& name); + ~WorkerThread(); + void enqueue(WorkItemPtr w); + void enqueue(WorkItem* w); + +private: + std::string m_name; + std::thread m_thread; + void run(); + std::condition_variable m_cond; + std::mutex m_mutex; + std::queue m_queue; +}; + +} + + +#endif // _WORKER_THREAD_H diff --git a/main.cpp b/main.cpp index bfc076d..577c3fc 100644 --- a/main.cpp +++ b/main.cpp @@ -9,13 +9,18 @@ using namespace suanzi; class Callback : public EngineObserver { - void onPersonIn(Person& p){ - LOG_DEBUG(TAG, "OnPersonIn " << p.toString()) - + void onPersonsIn(const std::vector& p){ + LOG_DEBUG(TAG, "onPersonsIn"); + for (const auto& i : p){ + LOG_DEBUG(TAG, "OnPersonIn " << i.toString()); + } }; - void onPersonOut(Person& p) { - LOG_DEBUG(TAG, "OnPersonIn " << p.toString()) + void onPersonsOut(const std::vector& p) { + LOG_DEBUG(TAG, "onPersonsOut"); + for (const auto& i : p){ + LOG_DEBUG(TAG, "OnPersonOut " << i.toString()); + } }; }; diff --git a/src/Engine.cpp b/src/Engine.cpp index 74659a0..4f67103 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -1,17 +1,20 @@ #include #include +#include #include "Engine.h" #include "Logger.h" #include "PredictorWrapper.h" using namespace suanzi; +using namespace std; static const std::string TAG = "Engine"; - static std::mutex g_mutex; static EngineWPtr g_instance; +typedef std::shared_ptr> PersonsInfoPtr; +// class Engine EnginePtr Engine::create() { LOG_DEBUG(TAG, "create"); @@ -28,7 +31,6 @@ EnginePtr Engine::create() Engine::Engine() { detector = std::make_shared(); - multiTracker = std::make_shared(); } Engine::~Engine() @@ -66,6 +68,7 @@ void Engine::run() void Engine::start() { LOG_DEBUG(TAG, "start"); + multiTracker = std::make_shared(g_instance); if (!reader){ LOG_ERROR(TAG, "reader is null. exit"); return; @@ -78,3 +81,58 @@ void Engine::addObserver(EngineObserver *observer) { observer_list.insert(observer); } + + +// WorkItem class for event thread +class PersonInEventWorkItem : public WorkItem +{ +public: + PersonInEventWorkItem(std::set obs, PersonsInfoPtr info) : obs(obs), info(info){ + } + ~PersonInEventWorkItem(){} + void run (){ + for (auto o : obs){ + o->onPersonsIn(*(info.get())); + } + } +private: + std::set obs; + PersonsInfoPtr info; +}; + +class PersonOutEventWorkItem : public WorkItem +{ +public: + PersonOutEventWorkItem(std::set obs, PersonsInfoPtr info) : obs(obs), info(info){ + } + ~PersonOutEventWorkItem(){} + void run (){ + for (auto o : obs){ + o->onPersonsIn(*(info.get())); + } + } +private: + std::set obs; + PersonsInfoPtr info; +}; + +void Engine::onPersonsOut(const std::vector& p) +{ + PersonsInfoPtr pp = std::make_shared>(p); + eventThread.enqueue(new PersonOutEventWorkItem(this->observer_list, pp)); +} + +void Engine::onPersonsIn(const std::vector& p) +{ + PersonsInfoPtr pp = std::make_shared>(p); + eventThread.enqueue(new PersonInEventWorkItem(this->observer_list, pp)); +} + +void Engine::onStatusChanged() +{ + Person pm, p2; + std::vector ps; + ps.push_back(pm); + ps.push_back(p2); + onPersonsOut(ps); +} diff --git a/src/MultiTracker.cpp b/src/MultiTracker.cpp index b1f380e..0b4dab5 100644 --- a/src/MultiTracker.cpp +++ b/src/MultiTracker.cpp @@ -13,7 +13,8 @@ static const cv::Size PREFERRED_SIZE = Size(64, 128); #define MaxCost 100000 -MultiTracker::MultiTracker() +MultiTracker::MultiTracker(EngineWPtr e) +: engine(e) { LOG_DEBUG(TAG, "init - loading model.pkl"); predictor = PredictorWrapper::create("./python/model.pkl"); @@ -30,6 +31,11 @@ MultiTracker::~MultiTracker() trackers.clear(); } +static double calc_iou_ratio(const Detection& d1, const Detection& d2) +{ + // TODO + return 0.1; +} static std::vector similarity(const PatchPtr p1, const PatchPtr p2) { @@ -60,8 +66,7 @@ static std::vector similarity(const PatchPtr p1, const PatchPtr p2) double center_distance = sqrt(pow((d1.center_x - d2.center_x), 2) + pow((d1.center_y - d2.center_y), 2)); feature.push_back(center_distance / (d1.width + d1.height + d2.width + d2.height) * 4); - //TODO - double iou_ratio = 0.03; + double iou_ratio = calc_iou_ratio(d1, d2); feature.push_back(iou_ratio); return feature; @@ -82,32 +87,41 @@ double MultiTracker::distance(TrackerPtr tracker, const cv::Mat& image, const De return prob; } +static long cc = 0; + void MultiTracker::update(unsigned int total, const Detection* detections, const Mat& image) { + ////// + if ((cc % 50) == 0){ + if (EnginePtr e = engine.lock()){ + e->onStatusChanged(); + } + } + cc++; + + ////// int row = trackers.size(); int col = total; Eigen::MatrixXi cost_matrix = Eigen::MatrixXi::Zero(row, col); for (int i = 0; i < row; i++){ for (int j = 0; j < col; j++){ - // TODO - cost_matrix(i, j) = distance(trackers[i], image, detections[j]); + //if (calc_iou_ratio(trackers[i], detections[j]) < -0.1) + // cost_matrix(i, j) = MaxCost; + //else + cost_matrix(i, j) = distance(trackers[i], image, detections[j]); } } - // assignment Eigen::VectorXi tracker_inds, bb_inds; linear_sum_assignment(cost_matrix, tracker_inds, bb_inds); // handle unmatched trackers - vector unmatched_trackers; + //vector unmatched_trackers; for (int i = 0; i < row; i++){ if (!(tracker_inds.array() == i).any()){ - unmatched_trackers.push_back(trackers[i]); + trackers[i]->updateState(image); } } - for (auto t : unmatched_trackers){ - t->updateState(image); - } // handle unmatched detections vector unmatched_detection; @@ -155,7 +169,6 @@ PatchPtr MultiTracker::createPatch(const Mat& image, const Detection& detect) float sranges[] = {0, 256}; const float* ranges[] = {hranges, sranges}; calcHist(&hsv, 1, channels, Mat(), hist, 2, histSize, ranges, true, false); - Size sm = hist.size(); patch->image_crop = im.clone(); patch->detection = detect; diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp new file mode 100644 index 0000000..c31d023 --- /dev/null +++ b/src/WorkerThread.cpp @@ -0,0 +1,50 @@ +#include "WorkerThread.h" +#include +#include "Logger.h" + +using namespace suanzi; +using namespace std; + +static const std::string TAG = "WorkerThread"; + +WorkerThread::WorkerThread(const std::string& name) +: m_name(name) +, m_thread (thread(&WorkerThread::run, this)) +{ +} + +WorkerThread::~WorkerThread() +{ +} + +void WorkerThread::run() +{ + LOG_DEBUG(TAG, "start workerThread " + m_name); + WorkItemPtr workItem; + while(true){ + unique_lock l(m_mutex); + if (!m_queue.empty()){ + workItem = m_queue.front(); + m_queue.pop(); + l.unlock(); + workItem->run(); + LOG_DEBUG(TAG, " ------ Queue Size: " << m_queue.size()); + } else { + m_cond.wait(l); + } + } + LOG_DEBUG(TAG, "End workerThread " + m_name); +} + +void WorkerThread::enqueue(WorkItemPtr item) +{ + unique_lock l(m_mutex); + m_queue.push(item); + m_cond.notify_one(); +} + +void WorkerThread::enqueue(WorkItem* w) +{ + WorkItemPtr p (w); + enqueue(p); +} -- 2.11.0