Add worker thread
authorPeng Li <seudut@gmail.com>
Sun, 22 Jul 2018 07:04:48 +0000 (15:04 +0800)
committerPeng Li <seudut@gmail.com>
Sun, 22 Jul 2018 07:15:17 +0000 (15:15 +0800)
include/Engine.h
include/MultiTracker.h
include/VideoReader.h
include/VideoSource.h [deleted file]
include/WorkerThread.h [new file with mode: 0644]
main.cpp
src/Engine.cpp
src/MultiTracker.cpp
src/WorkerThread.cpp [new file with mode: 0644]

index 5405539..93d6c9c 100644 (file)
@@ -9,12 +9,16 @@
 #include "MultiTracker.h"
 #include "VideoReader.h"
 #include "SharedPtr.h"
+#include "WorkerThread.h"
 
 namespace suanzi{
 
 class EngineObserver;
 
 TK_DECLARE_PTR(Engine);
+TK_DECLARE_PTR(MultiTracker);
+
+struct Person;
 
 class Engine
 {
@@ -24,16 +28,24 @@ public:
     virtual ~Engine();
 
     virtual void start();
+    //virtual void getCurrentCount();
+    // virtual void capture(bool bb = false);
     void addObserver(EngineObserver* o);
     void setVideoSrc(VideoSrcType type, const std::string& url);
 
+protected:
+    friend class MultiTracker;
+    void onStatusChanged();
+    void onPersonsIn(const std::vector<Person>& p);
+    void onPersonsOut(const std::vector<Person>& p);
+
 private:
     Engine();
     void run();
+    WorkerThread eventThread {"EventThread"};
     DetectorPtr detector;
     MultiTrackerPtr multiTracker;
     std::set<EngineObserver *> observer_list;
-    //std::string videoSrc;
     VideoReaderPtr reader;
 };
 
@@ -58,7 +70,7 @@ struct Person
     Gender gender = Female;
     Ages age = Kid;
 
-    std::string ageToString (Ages age){
+    std::string ageToString (Ages age) const {
         switch (age){
         case Kid: return "Kid";
         case Teenager: return "Teenager";
@@ -66,7 +78,7 @@ struct Person
         }
     }
 
-    std::string toString(){
+    std::string toString() const {
         std::stringstream ss;
         ss << "Person: id=" << id << ". Gender:" << (gender == Gender::Male ? "Male" : "Female" ) <<
             ". Age: " << ageToString(age);
@@ -77,9 +89,8 @@ struct Person
 class EngineObserver
 {
 public:
-    //virtual void onPersonIn(std::set<Person> persons) = 0;
-    virtual void onPersonIn(Person& p) = 0;
-    virtual void onPersonOut(Person& p) = 0;
+    virtual void onPersonsIn(const std::vector<Person>& p) = 0;
+    virtual void onPersonsOut(const std::vector<Person>& p) = 0;
 };
 
 } // namespace suanzi
index 58ef922..66ea04e 100644 (file)
@@ -7,17 +7,19 @@
 #include "PredictorWrapper.h"
 #include <opencv2/opencv.hpp>
 #include <utility>
+#include "Engine.h"
 
 namespace suanzi {
 
     TK_DECLARE_PTR(Patch);
     TK_DECLARE_PTR(MultiTracker);
     TK_DECLARE_PTR(Tracker);
+    TK_DECLARE_PTR(Engine);
 
     class MultiTracker 
     {
     public:
-        MultiTracker();
+        MultiTracker(EngineWPtr e);
         virtual ~MultiTracker();
         void update(unsigned int total, const Detection* d, const cv::Mat& image);
 
@@ -28,6 +30,7 @@ namespace suanzi {
         double distance(TrackerPtr t, const cv::Mat& image, const Detection& d);
         PredictorWrapperPtr predictor;
         cv::HOGDescriptor descriptor;
+        EngineWPtr engine;
     };
 
     class Patch
index 79d39d0..5b5c5a0 100644 (file)
@@ -16,6 +16,7 @@ namespace suanzi {
     TK_DECLARE_PTR(VideoReaderFactory);
     TK_DECLARE_PTR(VideoReader);
     TK_DECLARE_PTR(URLReader);
+    TK_DECLARE_PTR(Engine);
 
     class VideoReaderFactory
     {
@@ -33,6 +34,7 @@ namespace suanzi {
 
         private:
             VideoSrcType type;
+            EngineWPtr engine;
 
         protected:
             std::string url;
diff --git a/include/VideoSource.h b/include/VideoSource.h
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/include/WorkerThread.h b/include/WorkerThread.h
new file mode 100644 (file)
index 0000000..83c174c
--- /dev/null
@@ -0,0 +1,45 @@
+#ifndef _WORKER_THREAD_H
+#define _WORKER_THREAD_H
+
+#include <string>
+#include <thread>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include "SharedPtr.h"
+
+namespace suanzi {
+
+class WorkItem
+{
+public:
+    virtual ~WorkItem(){}
+    virtual void run(){}
+
+protected:
+    WorkItem(){}
+};
+
+TK_DECLARE_PTR(WorkItem);
+
+class WorkerThread
+{
+public:
+    WorkerThread(const std::string& name);
+    ~WorkerThread();
+    void enqueue(WorkItemPtr w);
+    void enqueue(WorkItem* w);
+
+private:
+    std::string m_name;
+    std::thread m_thread;
+    void run();
+    std::condition_variable m_cond;
+    std::mutex m_mutex;
+    std::queue<WorkItemPtr> m_queue;
+};
+
+}
+
+
+#endif // _WORKER_THREAD_H
index bfc076d..577c3fc 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -9,13 +9,18 @@ using namespace suanzi;
 
 class Callback : public EngineObserver
 {
-    void onPersonIn(Person& p){
-        LOG_DEBUG(TAG, "OnPersonIn " << p.toString())
-
+    void onPersonsIn(const std::vector<Person>& p){
+        LOG_DEBUG(TAG, "onPersonsIn");
+        for (const auto& i : p){
+            LOG_DEBUG(TAG, "OnPersonIn " << i.toString());
+        }
     };
 
-    void onPersonOut(Person& p) {
-        LOG_DEBUG(TAG, "OnPersonIn " << p.toString())
+    void onPersonsOut(const std::vector<Person>& p) {
+        LOG_DEBUG(TAG, "onPersonsOut");
+        for (const auto& i : p){
+            LOG_DEBUG(TAG, "OnPersonOut " << i.toString());
+        }
     };
 };
 
index 74659a0..4f67103 100644 (file)
@@ -1,17 +1,20 @@
 #include <mutex>
 #include <thread>
+#include <condition_variable>
 #include "Engine.h"
 #include "Logger.h"
 #include "PredictorWrapper.h"
 
 using namespace suanzi;
+using namespace std;
 
 static const std::string TAG = "Engine";
-
 static std::mutex g_mutex;
 static EngineWPtr g_instance;
 
+typedef std::shared_ptr<std::vector<Person>> PersonsInfoPtr;
 
+// class Engine
 EnginePtr Engine::create()
 {
     LOG_DEBUG(TAG, "create");
@@ -28,7 +31,6 @@ EnginePtr Engine::create()
 Engine::Engine()
 {
     detector = std::make_shared<Detector>();
-    multiTracker = std::make_shared<MultiTracker>();
 }
 
 Engine::~Engine()
@@ -66,6 +68,7 @@ void Engine::run()
 void Engine::start()
 {
     LOG_DEBUG(TAG, "start");
+    multiTracker = std::make_shared<MultiTracker>(g_instance);
     if (!reader){
         LOG_ERROR(TAG, "reader is null. exit");
         return;
@@ -78,3 +81,58 @@ void Engine::addObserver(EngineObserver *observer)
 {
     observer_list.insert(observer);
 }
+
+
+// WorkItem class for event thread
+class PersonInEventWorkItem : public WorkItem
+{
+public:
+    PersonInEventWorkItem(std::set<EngineObserver*> obs, PersonsInfoPtr info) : obs(obs), info(info){
+    }
+    ~PersonInEventWorkItem(){}
+    void run (){
+        for (auto o : obs){
+            o->onPersonsIn(*(info.get()));
+        }
+    }
+private:
+    std::set<EngineObserver*> obs;
+    PersonsInfoPtr info;
+};
+
+class PersonOutEventWorkItem : public WorkItem
+{
+public:
+    PersonOutEventWorkItem(std::set<EngineObserver*> obs, PersonsInfoPtr info) : obs(obs), info(info){
+    }
+    ~PersonOutEventWorkItem(){}
+    void run (){
+        for (auto o : obs){
+            o->onPersonsIn(*(info.get()));
+        }
+    }
+private:
+    std::set<EngineObserver*> obs;
+    PersonsInfoPtr info;
+};
+
+void Engine::onPersonsOut(const std::vector<Person>& p)
+{
+    PersonsInfoPtr pp = std::make_shared<std::vector<Person>>(p);
+    eventThread.enqueue(new PersonOutEventWorkItem(this->observer_list, pp));
+}
+
+void Engine::onPersonsIn(const std::vector<Person>& p)
+{
+    PersonsInfoPtr pp = std::make_shared<std::vector<Person>>(p);
+    eventThread.enqueue(new PersonInEventWorkItem(this->observer_list, pp));
+}
+
+void Engine::onStatusChanged()
+{
+    Person pm, p2;
+    std::vector<Person> ps;
+    ps.push_back(pm);
+    ps.push_back(p2);
+    onPersonsOut(ps);
+}
index b1f380e..0b4dab5 100644 (file)
@@ -13,7 +13,8 @@ static const cv::Size PREFERRED_SIZE = Size(64, 128);
 
 #define MaxCost  100000
 
-MultiTracker::MultiTracker()
+MultiTracker::MultiTracker(EngineWPtr e)
+: engine(e)
 {
     LOG_DEBUG(TAG, "init - loading model.pkl");
     predictor = PredictorWrapper::create("./python/model.pkl");
@@ -30,6 +31,11 @@ MultiTracker::~MultiTracker()
     trackers.clear();
 }
 
+static double calc_iou_ratio(const Detection& d1, const Detection& d2)
+{
+    // TODO
+    return 0.1;
+}
 
 static std::vector<double> similarity(const PatchPtr p1, const PatchPtr p2)
 {
@@ -60,8 +66,7 @@ static std::vector<double> similarity(const PatchPtr p1, const PatchPtr p2)
     double center_distance = sqrt(pow((d1.center_x - d2.center_x), 2) + pow((d1.center_y - d2.center_y), 2));
     feature.push_back(center_distance / (d1.width + d1.height + d2.width + d2.height) * 4);
 
-    //TODO
-    double iou_ratio = 0.03;
+    double iou_ratio = calc_iou_ratio(d1, d2);
     feature.push_back(iou_ratio);
 
     return feature;
@@ -82,32 +87,41 @@ double MultiTracker::distance(TrackerPtr tracker, const cv::Mat& image, const De
     return prob; 
 }
 
+static long cc = 0;
+
 void MultiTracker::update(unsigned int total, const Detection* detections, const Mat& image)
 {
+    //////
+    if ((cc % 50) == 0){
+        if (EnginePtr e = engine.lock()){
+            e->onStatusChanged();
+        }
+    }
+    cc++;
+
+    //////
     int row = trackers.size();
     int col = total;
     Eigen::MatrixXi cost_matrix = Eigen::MatrixXi::Zero(row, col);
     for (int i = 0; i < row; i++){
         for (int j = 0; j < col; j++){
-            // TODO
-            cost_matrix(i, j) = distance(trackers[i], image, detections[j]);
+            //if (calc_iou_ratio(trackers[i], detections[j]) < -0.1)
+            //    cost_matrix(i, j) = MaxCost;
+            //else
+                cost_matrix(i, j) = distance(trackers[i], image, detections[j]);
         }
     }
 
-    // assignment
     Eigen::VectorXi tracker_inds, bb_inds;
     linear_sum_assignment(cost_matrix, tracker_inds, bb_inds);
 
     // handle unmatched trackers
-    vector<TrackerPtr> unmatched_trackers;
+    //vector<TrackerPtr> unmatched_trackers;
     for (int i = 0; i < row; i++){
         if (!(tracker_inds.array() == i).any()){
-            unmatched_trackers.push_back(trackers[i]);
+            trackers[i]->updateState(image);
         }
     }
-    for (auto t : unmatched_trackers){
-        t->updateState(image);
-    }
 
     // handle unmatched detections
     vector<int> unmatched_detection;
@@ -155,7 +169,6 @@ PatchPtr MultiTracker::createPatch(const Mat& image, const Detection& detect)
     float sranges[] = {0, 256};
     const float* ranges[] = {hranges, sranges};
     calcHist(&hsv, 1, channels, Mat(), hist, 2, histSize, ranges, true, false);
-    Size sm = hist.size();
 
     patch->image_crop = im.clone();
     patch->detection = detect;
diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp
new file mode 100644 (file)
index 0000000..c31d023
--- /dev/null
@@ -0,0 +1,50 @@
+#include "WorkerThread.h"
+#include <iostream>
+#include "Logger.h"
+
+using namespace suanzi;
+using namespace std;
+
+static const std::string TAG = "WorkerThread";
+
+WorkerThread::WorkerThread(const std::string& name) 
+: m_name(name)
+, m_thread (thread(&WorkerThread::run, this))
+{
+}
+
+WorkerThread::~WorkerThread()
+{
+}
+
+void WorkerThread::run()
+{
+    LOG_DEBUG(TAG, "start workerThread " + m_name);
+    WorkItemPtr workItem;
+    while(true){
+        unique_lock<mutex> l(m_mutex);
+        if (!m_queue.empty()){
+            workItem = m_queue.front();
+            m_queue.pop();
+            l.unlock();
+            workItem->run();
+            LOG_DEBUG(TAG, " ------ Queue Size: " << m_queue.size());
+        } else {
+            m_cond.wait(l);
+        }
+    }
+    LOG_DEBUG(TAG, "End workerThread " + m_name);
+}
+
+void WorkerThread::enqueue(WorkItemPtr item)
+{
+    unique_lock<mutex> l(m_mutex);
+    m_queue.push(item);
+    m_cond.notify_one();
+}
+
+void WorkerThread::enqueue(WorkItem* w)
+{
+    WorkItemPtr p (w);
+    enqueue(p);
+}