uv_ipc_common.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. #include "uv_ipc_common.h"
  2. #include <map>
  3. #ifdef _WIN32
  4. #else
  5. #include <unistd.h>
  6. #include <string.h>
  7. #endif
  8. void UVIPCLocker::Lock(bool write/* = true*/)
  9. {
  10. write ? uv_rwlock_wrlock(&uv_rw_locker) : uv_rwlock_rdlock(&uv_rw_locker);
  11. }
  12. void UVIPCLocker::Unlock(bool write/* = true*/)
  13. {
  14. write ? uv_rwlock_wrunlock(&uv_rw_locker) : uv_rwlock_rdunlock(&uv_rw_locker);
  15. }
  16. UVIPCLocker g_owner_locker;
  17. std::map<void*, UVIPC* > g_owner_list;
  18. //////////////////////////////////////////////////////////////////////////
  19. UVIPCMessage::UVIPCMessage(const char* in_buf, int in_len, bool using_external_buffer_)
  20. {
  21. buf = NULL;
  22. len = 0;
  23. using_external_buffer = using_external_buffer_;
  24. if (using_external_buffer)
  25. {
  26. buf = const_cast<char*>(in_buf);
  27. len = in_len;
  28. }
  29. else
  30. {
  31. if (in_len > 0)
  32. {
  33. //owner|buffer
  34. char* real_buf = (char*)malloc(in_len + UVIPC_MSG_HEADER_LEN);
  35. if (real_buf)
  36. {
  37. unsigned long long& owner_ = *(unsigned long long*)real_buf;
  38. owner_ = (unsigned long long)this;
  39. int& buf_len = *(int*)(real_buf + sizeof(unsigned long long));
  40. buf_len = in_len;
  41. buf = real_buf + UVIPC_MSG_HEADER_LEN;
  42. in_buf ? memcpy(buf, in_buf, in_len) : memset(buf, 0, in_len); //checked safe
  43. len = in_len;
  44. }
  45. }
  46. }
  47. ref_count.Inc();
  48. }
  49. UVIPCMessage::~UVIPCMessage()
  50. {
  51. if (buf && !using_external_buffer)
  52. {
  53. char* real_buf = buf - UVIPC_MSG_HEADER_LEN;
  54. free(real_buf);
  55. }
  56. }
  57. char* UVIPCMessage::GetRealBuf()
  58. {
  59. if (buf && !using_external_buffer)
  60. {
  61. return buf - UVIPC_MSG_HEADER_LEN;
  62. }
  63. return NULL;
  64. }
  65. int UVIPCMessage::GetRealLen()
  66. {
  67. if (buf && !using_external_buffer)
  68. {
  69. return len + UVIPC_MSG_HEADER_LEN;
  70. }
  71. return 0;
  72. }
  73. char* UVIPCMessage::GetBuf()
  74. {
  75. return buf;
  76. }
  77. int UVIPCMessage::GetLen()
  78. {
  79. return len;
  80. }
  81. //////////////////////////////////////////////////////////////////////////
  82. UVIPC::UVIPC()
  83. {
  84. buffered_ipc_msg = NULL;
  85. buffered_ipc_msg_offset = 0;
  86. _msg_callback_loop = NULL;
  87. }
  88. UVIPC::~UVIPC()
  89. {
  90. Stop();
  91. }
  92. bool UVIPC::IsRunning(bool check_connection/* = false*/)
  93. {
  94. if (check_connection)
  95. {
  96. if (uv_data.is_server_mode)
  97. return uv_data.ut.part_s.client ? true : false;
  98. else
  99. return uv_data.ut.part_c.connect ? true : false;
  100. }
  101. else
  102. {
  103. return uv_data.owner ? true : false;
  104. }
  105. }
  106. void uv_ipc_timer_cb(uv_timer_t* handle)
  107. {
  108. UVIPC::UVIPCTimerData* timer_data = (UVIPC::UVIPCTimerData*)handle;
  109. if (timer_data && timer_data->owener)
  110. {
  111. switch (timer_data->type)
  112. {
  113. case UVIPC::send_msg_timer:
  114. {
  115. timer_data->owener->uv_cb_ipc_sendmsg_timer(handle);
  116. }
  117. break;
  118. #ifndef USING_ASYNC_IN_MSGCALLBACK
  119. case UVIPC::recv_msg_timer:
  120. {
  121. timer_data->owener->uv_cb_ipc_recvmsg_timer(handle);
  122. }
  123. break;
  124. #endif
  125. default:
  126. break;
  127. }
  128. }
  129. }
  130. bool UVIPC::Start(char* name_, bool is_server, uv_loop_t* loop_, UVIPCSink* sink_, uv_loop_t* loop_msg_callback/* = NULL*/)
  131. {
  132. bool succ = false;
  133. do
  134. {
  135. if (IsRunning())
  136. {
  137. break;
  138. }
  139. if (NULL == name_ || NULL == loop_)
  140. break;
  141. succ = is_server ? StartWithServer(name_, is_server, loop_) : StartWithClient(name_, is_server, loop_);
  142. if (!succ)
  143. break;
  144. uv_data.uv_pipe_name = name_;
  145. uv_data.external_sink = sink_;
  146. //outgoing msg timer
  147. uv_timer_init(loop_, &uv_timer_data.uv_timer_req);
  148. uv_timer_start(&uv_timer_data.uv_timer_req, uv_ipc_timer_cb, 100, 30);
  149. uv_timer_data.owener = this;
  150. uv_timer_data.owner_ipc_data = &uv_data;
  151. uv_timer_data.type = send_msg_timer;
  152. //incoming msg timer if need
  153. if (loop_msg_callback)
  154. {
  155. _msg_callback_loop = loop_msg_callback;
  156. #ifdef USING_ASYNC_IN_MSGCALLBACK
  157. uv_async_init(_msg_callback_loop, &_msg_callback_event, msg_callback_thread_sync_proc);
  158. _msg_callback_event.data = (void*)this;
  159. #else
  160. StartMsgCallbackTimer(_msg_callback_loop);
  161. #endif
  162. }
  163. } while (false);
  164. return succ;
  165. }
  166. #ifdef USING_ASYNC_IN_MSGCALLBACK
  167. void UVIPC::SignalMsgCallbackThread()
  168. {
  169. uv_async_send(&_msg_callback_event);
  170. }
  171. void UVIPC::msg_callback_thread_sync_proc(uv_async_t* handle)
  172. {
  173. if (handle)
  174. {
  175. UVIPC* owner = (UVIPC*)handle->data;
  176. if (owner)
  177. owner->HandleUVMsgCallback(owner->uv_data.external_sink);
  178. }
  179. }
  180. #else
  181. void UVIPC::StartMsgCallbackTimer(uv_loop_t* msg_callback_loop)
  182. {
  183. if (msg_callback_loop)
  184. {
  185. uv_async_t* timer_async_t = new uv_async_t;
  186. if (timer_async_t)
  187. {
  188. #ifndef USING_ASYNC_IN_MSGCALLBACK
  189. async_handle_list.push_back(timer_async_t);
  190. #endif
  191. uv_async_init(msg_callback_loop, timer_async_t, start_msg_callback_timer_sync_proc);
  192. timer_async_t->data = (void*)this;
  193. uv_async_send(timer_async_t);
  194. }
  195. }
  196. }
  197. void UVIPC::start_msg_callback_timer_sync_proc(uv_async_t* handle)
  198. {
  199. if (handle && handle->data)
  200. {
  201. UVIPC* owner = (UVIPC*)handle->data;
  202. uv_timer_init(owner->_msg_callback_loop, &owner->msg_callback_uv_timer_data.uv_timer_req);
  203. uv_timer_start(&owner->msg_callback_uv_timer_data.uv_timer_req, uv_ipc_timer_cb, 100, 30);
  204. owner->msg_callback_uv_timer_data.owener = owner;
  205. owner->msg_callback_uv_timer_data.owner_ipc_data = &owner->uv_data;
  206. owner->msg_callback_uv_timer_data.type = recv_msg_timer;
  207. }
  208. }
  209. #endif
  210. bool UVIPC::Stop()
  211. {
  212. bool succ = false;
  213. do
  214. {
  215. if (!IsRunning())
  216. {
  217. succ = true;
  218. break;
  219. }
  220. if (uv_data.is_server_mode)
  221. {
  222. if (uv_data.ut.part_s.client)
  223. {
  224. uv_read_stop((uv_stream_t*)uv_data.ut.part_s.client);
  225. uv_timer_stop(&uv_timer_data.uv_timer_req);
  226. uv_close((uv_handle_t*)uv_data.ut.part_s.client, NULL);
  227. free(uv_data.ut.part_s.client);
  228. uv_data.ut.part_s.client = NULL;
  229. }
  230. uv_close((uv_handle_t*)&uv_data.handle, NULL);
  231. }
  232. else
  233. {
  234. if (uv_data.ut.part_c.connect)
  235. {
  236. uv_read_stop((uv_stream_t*)uv_data.ut.part_c.connect->handle);
  237. uv_timer_stop(&uv_timer_data.uv_timer_req);
  238. uv_close((uv_handle_t*)uv_data.ut.part_c.connect->handle, NULL);
  239. free(uv_data.ut.part_c.connect);
  240. uv_data.ut.part_c.connect= NULL;
  241. }
  242. }
  243. uv_data.uv_pipe_name.clear();
  244. uv_data.is_server_mode = false;
  245. uv_data.uv_looper = NULL;
  246. uv_data.external_sink = NULL;
  247. uv_data.owner = NULL;
  248. } while (false);
  249. if (buffered_ipc_msg)
  250. {
  251. free(buffered_ipc_msg);
  252. buffered_ipc_msg = NULL;
  253. }
  254. buffered_ipc_msg_offset = 0;
  255. _msg_callback_loop = NULL;
  256. #ifndef USING_ASYNC_IN_MSGCALLBACK
  257. uv_async_t* item = NULL;
  258. for (int i = 0; i < async_handle_list.size(); i++)
  259. {
  260. item = async_handle_list[i];
  261. if (item)
  262. delete item;
  263. }
  264. #endif
  265. return succ;
  266. }
  267. void on_pipe_client_connection_cb(uv_stream_t* server,int status)
  268. {
  269. UVIPC::UVIPCData* ipc_data = (UVIPC::UVIPCData*)server;
  270. if (NULL == ipc_data || NULL == ipc_data->owner)
  271. return;
  272. ipc_data->owner->uv_cb_client_connection_notify(server, status);
  273. }
  274. bool UVIPC::StartWithServer(char* name_, bool is_server, uv_loop_t* loop_)
  275. {
  276. bool succ = false;
  277. do
  278. {
  279. if (NULL == name_ || NULL == loop_ || !is_server)
  280. break;
  281. uv_fs_t req;
  282. uv_fs_unlink(loop_, &req, name_, NULL);
  283. uv_pipe_init(loop_, (uv_pipe_t*)&uv_data, 0);
  284. uv_data.owner = this;
  285. int ret=uv_pipe_bind((uv_pipe_t*)&uv_data, name_);
  286. if(ret)
  287. {
  288. break;
  289. }
  290. ret=uv_listen((uv_stream_t*)&uv_data, 128, on_pipe_client_connection_cb);
  291. if(ret)
  292. {
  293. break;
  294. }
  295. uv_data.is_server_mode = is_server;
  296. uv_data.uv_looper = loop_;
  297. succ = true;
  298. } while (false);
  299. return succ;
  300. }
  301. void on_server_connected_cb(uv_connect_t* req,int status)
  302. {
  303. UVIPC::UVIPCData* ipc_data = (UVIPC::UVIPCData*)req->handle;
  304. if (ipc_data && ipc_data->owner)
  305. ipc_data->owner->uv_cb_server_connected_notify(req, status);
  306. }
  307. bool UVIPC::StartWithClient(char* name_, bool is_server, uv_loop_t* loop_)
  308. {
  309. bool succ = false;
  310. do
  311. {
  312. if (is_server || NULL == name_
  313. || NULL == loop_ || NULL != uv_data.ut.part_c.connect)
  314. break;
  315. uv_data.owner = this;
  316. int ret = uv_pipe_init(loop_, (uv_pipe_t*)&uv_data, 0);
  317. uv_data.ut.part_c.connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
  318. uv_pipe_connect(uv_data.ut.part_c.connect, (uv_pipe_t*)&uv_data, name_, on_server_connected_cb);
  319. uv_data.is_server_mode = is_server;
  320. uv_data.uv_looper = loop_;
  321. succ = true;
  322. } while (false);
  323. return succ;
  324. }
  325. void alloc_buffer_for_uv_pipe(uv_handle_t* handle,size_t suggested_size,uv_buf_t* buf)
  326. {
  327. buf->base = (char*)malloc(suggested_size);
  328. buf->len = suggested_size;
  329. }
  330. void on_pipe_data_read_cb(uv_stream_t* client, ssize_t nread,const uv_buf_t* buf)
  331. {
  332. UVIPC* owner_ = NULL;
  333. {
  334. g_owner_locker.Lock(false);
  335. std::map<void *, UVIPC *>::iterator iter_ = g_owner_list.find(client);
  336. if (g_owner_list.end() != iter_)
  337. {
  338. owner_ = iter_->second;
  339. }
  340. g_owner_locker.Unlock(false);
  341. }
  342. if (owner_)
  343. owner_->uv_cb_pipe_data_read(client, nread, buf);
  344. }
  345. void UVIPC::uv_cb_client_connection_notify(uv_stream_t* server, int status)
  346. {
  347. if(status < 0 || NULL == uv_data.uv_looper
  348. || NULL != uv_data.ut.part_s.client)
  349. {
  350. return;
  351. }
  352. uv_data.ut.part_s.client = (uv_pipe_t*)malloc(sizeof(uv_pipe_t));
  353. if (NULL == uv_data.ut.part_s.client)
  354. {
  355. return;
  356. }
  357. uv_pipe_init(uv_data.uv_looper, uv_data.ut.part_s.client ,0);
  358. int ret=uv_accept(server, (uv_stream_t*)uv_data.ut.part_s.client);
  359. if(ret==0)
  360. {
  361. {
  362. g_owner_locker.Lock(true);
  363. g_owner_list.insert(std::make_pair((void*)uv_data.ut.part_s.client, this));
  364. g_owner_locker.Unlock(true);
  365. }
  366. if (uv_data.external_sink)
  367. uv_data.external_sink->onConnect();
  368. uv_read_start((uv_stream_t*)uv_data.ut.part_s.client, alloc_buffer_for_uv_pipe, on_pipe_data_read_cb);
  369. }
  370. }
  371. #define BUFFERED_MSG_MAX_LEN (30*1024*1024)
  372. void UVIPC::uv_cb_pipe_data_read(uv_stream_t* client, ssize_t nread,const uv_buf_t* buf)
  373. {
  374. if(nread<0)
  375. {
  376. if(nread!=UV_EOF)
  377. {
  378. }
  379. if (uv_data.external_sink)
  380. uv_data.external_sink->onDisconnect();
  381. Stop();
  382. }
  383. else if(nread > 0 && uv_data.external_sink)
  384. {
  385. if (NULL == buffered_ipc_msg)
  386. {
  387. buffered_ipc_msg = (char*)malloc(BUFFERED_MSG_MAX_LEN);
  388. buffered_ipc_msg_offset = 0;
  389. }
  390. const char* p(NULL), *end(NULL);
  391. if (buffered_ipc_msg_offset == 0)
  392. {
  393. p = buf->base;
  394. end = p + nread;
  395. }
  396. else
  397. {
  398. ssize_t buffer_remain = BUFFERED_MSG_MAX_LEN - buffered_ipc_msg_offset;
  399. if (buffer_remain > nread)
  400. {
  401. memcpy(buffered_ipc_msg + buffered_ipc_msg_offset, buf->base, nread); //checked safe
  402. buffered_ipc_msg_offset += nread;
  403. p = buffered_ipc_msg;
  404. end = p + buffered_ipc_msg_offset;
  405. }
  406. else
  407. {
  408. //fix me.
  409. }
  410. }
  411. while (p < end) {
  412. if (p + UVIPC_MSG_HEADER_LEN >= end)
  413. break;
  414. int msgLen = *(int*)(p+sizeof(unsigned long long));
  415. int dataLen = msgLen + UVIPC_MSG_HEADER_LEN;
  416. if (p + dataLen <= end)
  417. {
  418. char* real_buf = (char*)(p + UVIPC_MSG_HEADER_LEN);
  419. UVIPCMessage* msg = new UVIPCMessage(real_buf, msgLen);
  420. if (msg)
  421. {
  422. uv_data.owner->HandleMessageRecvNotification(msg);
  423. msg->Release();
  424. }
  425. p = p + dataLen;
  426. }
  427. else {
  428. // Last message is partial.
  429. break;
  430. }
  431. }
  432. memmove(buffered_ipc_msg, p, end - p); //checked safe
  433. buffered_ipc_msg_offset = end - p;
  434. }
  435. if (buf && buf->base)
  436. {
  437. free(buf->base);
  438. }
  439. }
  440. void UVIPC::uv_cb_pipe_data_write(uv_write_t* req, int status)
  441. {
  442. }
  443. void on_write_data_cb(uv_write_t* req,int status)
  444. {
  445. if (status == 0) {
  446. }
  447. uv_buf_t* w_buf = (uv_buf_t*)req->data;
  448. if (w_buf)
  449. {
  450. unsigned long long* real_buf = (unsigned long long*)(w_buf->base);
  451. UVIPCMessage* msg = (UVIPCMessage*)*real_buf;
  452. msg->Release();
  453. free(w_buf);
  454. }
  455. free(req);
  456. }
  457. void UVIPC::uv_cb_server_connected_notify(uv_connect_t* req,int status)
  458. {
  459. do
  460. {
  461. {
  462. g_owner_locker.Lock(true);
  463. g_owner_list.insert(std::make_pair((void*)req->handle, this));
  464. g_owner_locker.Unlock(true);
  465. }
  466. if (uv_data.external_sink)
  467. {
  468. (0 == status) ? uv_data.external_sink->onConnect() : uv_data.external_sink->onDisconnect();
  469. }
  470. if (0 == status)
  471. uv_read_start((uv_stream_t*)req->handle, alloc_buffer_for_uv_pipe, on_pipe_data_read_cb);
  472. else
  473. Stop();
  474. } while (false);
  475. }
  476. bool UVIPC::SendMessage(const char* buf, int len)
  477. {
  478. if (NULL == buf || len <= 0)
  479. return false;
  480. if (!IsRunning(true))
  481. return false;
  482. UVIPCMessage* msg = new UVIPCMessage(buf, len);
  483. outmsg_locker.Lock();
  484. outmsg_list.push_back(msg);
  485. outmsg_locker.Unlock();
  486. return true;
  487. }
  488. bool UVIPC::SendMessage(UVIPCMessage* msg)
  489. {
  490. if (!IsRunning(true))
  491. return false;
  492. outmsg_locker.Lock();
  493. outmsg_list.push_back(msg);
  494. outmsg_locker.Unlock();
  495. return true;
  496. }
  497. void UVIPC::uv_cb_ipc_sendmsg_timer(uv_timer_t* handle)
  498. {
  499. if (outmsg_list.size() <= 0
  500. || (uv_data.is_server_mode && NULL == uv_data.ut.part_s.client)
  501. || (!uv_data.is_server_mode && NULL == uv_data.ut.part_c.connect))
  502. {
  503. if (outmsg_list.size() <= 0 && uv_data.external_sink && IsRunning(true))
  504. uv_data.external_sink->onIdle();
  505. return;
  506. }
  507. std::list<UVIPCMessage* > tmp_outmsg_list;
  508. outmsg_locker.Lock();
  509. tmp_outmsg_list.swap(outmsg_list);
  510. outmsg_locker.Unlock();
  511. std::list<UVIPCMessage* >::iterator it_msg = tmp_outmsg_list.begin();
  512. while(tmp_outmsg_list.end() != it_msg)
  513. {
  514. UVIPCMessage* msg = *it_msg;
  515. if (msg)
  516. {
  517. uv_write_t* w_req = (uv_write_t*)malloc(sizeof(uv_write_t));
  518. uv_buf_t* w_buf = (uv_buf_t*)malloc(sizeof(uv_buf_t));
  519. w_buf->base = msg->GetRealBuf();
  520. w_buf->len = msg->GetRealLen();
  521. w_req->data = w_buf;
  522. int ret = uv_write(w_req, (uv_data.is_server_mode ? (uv_stream_t*)uv_data.ut.part_s.client : uv_data.ut.part_c.connect->handle),
  523. w_buf, 1, on_write_data_cb);
  524. it_msg = tmp_outmsg_list.erase(it_msg);
  525. }
  526. }
  527. }
  528. void UVIPC::HandleMessageRecvNotification(UVIPCMessage* msg)
  529. {
  530. if (NULL == _msg_callback_loop)
  531. {
  532. if (uv_data.external_sink)
  533. uv_data.external_sink->onMessageRecvNotification(msg);
  534. }
  535. else
  536. {
  537. msg->AddRef();
  538. inmsg_locker.Lock();
  539. inmsg_list.push_back(msg);
  540. inmsg_locker.Unlock();
  541. #ifdef USING_ASYNC_IN_MSGCALLBACK
  542. SignalMsgCallbackThread();
  543. #endif
  544. }
  545. }
  546. #ifndef USING_ASYNC_IN_MSGCALLBACK
  547. void UVIPC::uv_cb_ipc_recvmsg_timer(uv_timer_t* handle)
  548. {
  549. HandleUVMsgCallback(uv_data.external_sink);
  550. }
  551. #endif
  552. void UVIPC::HandleUVMsgCallback(UVIPCSink* external_sink)
  553. {
  554. if (external_sink)
  555. {
  556. std::list<UVIPCMessage* > tmp_inmsg_list;
  557. inmsg_locker.Lock();
  558. tmp_inmsg_list.swap(inmsg_list);
  559. inmsg_locker.Unlock();
  560. std::list<UVIPCMessage* >::iterator it_msg = tmp_inmsg_list.begin();
  561. while (tmp_inmsg_list.end() != it_msg)
  562. {
  563. UVIPCMessage* msg = *it_msg;
  564. if (msg)
  565. {
  566. external_sink->onMessageRecvNotification(msg);
  567. msg->Release();
  568. }
  569. it_msg++;
  570. }
  571. }
  572. }
  573. /////////////////////////////////////////
  574. static void z_sleep(unsigned int millisecond) {
  575. #ifdef _WIN32
  576. ::Sleep(millisecond);
  577. #else
  578. usleep(millisecond * 1000);
  579. #endif
  580. }
  581. void UVIPCInSubThread::work_thread_sync_proc(uv_async_t* handle) {
  582. if (handle && handle->data)
  583. {
  584. UVIPCInSubThread* p_work = (UVIPCInSubThread*)handle->data;
  585. if (0xccdd == p_work->_action)
  586. {
  587. p_work->_action = 0;
  588. uv_stop(p_work->_sub_thread_loop);
  589. }
  590. }
  591. }
  592. void UVIPCInSubThread::work_thread_proc(void* param)
  593. {
  594. if (NULL == param)
  595. return;
  596. UVIPCInSubThread* p_work = (UVIPCInSubThread*)param;
  597. p_work->_sub_thread_loop = new uv_loop_t;
  598. if (p_work->_sub_thread_loop)
  599. {
  600. uv_loop_init(p_work->_sub_thread_loop);
  601. uv_async_init(p_work->_sub_thread_loop, &p_work->_async, work_thread_sync_proc);
  602. p_work->BeforeSubThreadLoopRun();
  603. p_work->_running = true;
  604. uv_sem_post(&p_work->sem_start_thread);
  605. uv_run(p_work->_sub_thread_loop, UV_RUN_DEFAULT);
  606. p_work->AfterSubThreadLoopRun();
  607. uv_close((uv_handle_t*)&p_work->_async, NULL);
  608. uv_loop_close(p_work->_sub_thread_loop);
  609. p_work->_running = false;
  610. uv_sem_post(&p_work->sem_stop_thread);
  611. }
  612. if (p_work->_sub_thread_loop)
  613. {
  614. delete p_work->_sub_thread_loop;
  615. p_work->_sub_thread_loop = NULL;
  616. }
  617. }
  618. UVIPCInSubThread::UVIPCInSubThread()
  619. {
  620. Reset();
  621. }
  622. void UVIPCInSubThread::Reset()
  623. {
  624. _is_server_mode = false;
  625. memset(&_async, 0, sizeof(_async)); //checked safe
  626. _pipe_name.clear();
  627. _sub_thread_loop = NULL;
  628. _msg_callback_loop = NULL;
  629. _sink = NULL;
  630. _running = false;
  631. _action = 0;
  632. }
  633. UVIPCInSubThread::~UVIPCInSubThread()
  634. {
  635. }
  636. bool UVIPCInSubThread::StartInSubThread(char* name_, bool is_server, UVIPCSink* sink_, uv_loop_t* msg_callback_loop_ /*= NULL*/)
  637. {
  638. if (NULL == name_)
  639. return false;
  640. if (_running)
  641. return false;
  642. _pipe_name = name_;
  643. _sink = sink_;
  644. _is_server_mode = is_server;
  645. uv_thread_t work_thread;
  646. _msg_callback_loop = msg_callback_loop_;
  647. uv_sem_init(&sem_start_thread, 0);
  648. uv_thread_create(&work_thread, work_thread_proc, this);
  649. uv_sem_wait(&sem_start_thread);
  650. return true;
  651. }
  652. bool UVIPCInSubThread::Stop()
  653. {
  654. if (_running && 0xccdd != _action)
  655. {
  656. uv_sem_init(&sem_stop_thread, 0);
  657. _action = 0xccdd;
  658. _async.data = (void*)this;
  659. uv_async_send(&_async);
  660. uv_sem_wait(&sem_stop_thread);
  661. }
  662. return true;
  663. }
  664. void UVIPCInSubThread::BeforeSubThreadLoopRun()
  665. {
  666. UVIPC::Start(const_cast<char*>(_pipe_name.c_str()), _is_server_mode, _sub_thread_loop, _sink, _msg_callback_loop);
  667. }
  668. void UVIPCInSubThread::AfterSubThreadLoopRun()
  669. {
  670. UVIPC::Stop();
  671. }