fix8  version 1.4.0
Open Source C++ FIX Framework
connection.hpp
Go to the documentation of this file.
1 //-------------------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-------------------------------------------------------------------------------------------------
37 #ifndef FIX8_CONNECTION_HPP_
38 #define FIX8_CONNECTION_HPP_
39 
40 #include <Poco/Net/StreamSocket.h>
41 #include <Poco/Timespan.h>
42 #include <Poco/Net/NetException.h>
43 
44 //----------------------------------------------------------------------------------------
45 namespace FIX8 {
46 
47 class Session;
48 
49 //----------------------------------------------------------------------------------------
51 
52 template <typename T>
54 {
55 protected:
57  Poco::Net::StreamSocket *_sock;
62  volatile bool _started;
64 
65 private:
67 
68 public:
73  AsyncSocket(Poco::Net::StreamSocket *sock, Session& session, const ProcessModel pmodel=pm_pipeline)
74  : _sock(sock), _session(session), _pmodel(pmodel), _started(false), _thread(std::ref(*this)) {}
75 
77  virtual ~AsyncSocket() = default;
78 
81  size_t queued() const { return _msg_queue.size(); }
82 
85  bool started() const { return _started; }
86 
89  int operator()() { return execute(_cancellation_token); }
90 
94 
96  virtual void start()
97  {
98  if (!_started)
99  {
100  f8_scoped_lock guard(_start_mutex);
101  if (!_started)
102  {
103  _started = true;
104  _thread.start();
105  }
106  }
107  else
108  {
109  glout_warn << "AsyncSocket already started.";
110  }
111  }
112 
114  virtual void request_stop() { _thread.request_stop(); }
115 
117  virtual void quit() { _thread.request_stop(); join(); }
118 
121  Poco::Net::StreamSocket *socket() { return _sock; }
122 
125  int join() { return _thread.join(); }
126 
130 };
131 
132 //----------------------------------------------------------------------------------------
134 class FIXReader : public AsyncSocket<f8String>
135 {
138 
141 
142 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
143  char _read_buffer[_max_msg_len*2];
144  char *_read_buffer_rptr, *_read_buffer_wptr;
145 #endif
146 
150 
151  size_t _bg_sz; // 8=FIXx.x^A9=x
152 
156  bool read(f8String& to);
157 
158 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
159 
163  int sockRead(char *where, size_t sz)
164  {
165  const size_t available_in_buffer(static_cast<size_t>(_read_buffer_wptr - _read_buffer_rptr));
166  if (available_in_buffer < sz)
167  realSockRead(sz - available_in_buffer, _max_msg_len);
168  sz = std::min((size_t)(_read_buffer_wptr-_read_buffer_rptr), sz);
169  memcpy(where, _read_buffer_rptr, sz);
170  _read_buffer_rptr += sz;
171  const size_t shift(_max_msg_len);
172  if (static_cast<size_t>(_read_buffer_rptr - _read_buffer) >= shift)
173  {
174  memcpy(_read_buffer, &_read_buffer[shift], sizeof(_read_buffer) - shift);
175  _read_buffer_rptr -= shift;
176  _read_buffer_wptr -= shift;
177  }
178  return sz;
179  }
180 
186  int realSockRead(size_t sz, size_t maxsz)
187  {
188  const size_t max_sz(_read_buffer + sizeof(_read_buffer) - _read_buffer_wptr);
189  int maxremaining(std::min(maxsz, max_sz)), remaining(std::min(sz, max_sz));
190  char *ptr(_read_buffer_wptr), *eptr(_read_buffer + sizeof(_read_buffer));
191 
192  int rdsz(0);
193  while (remaining > 0 && _read_buffer_wptr < eptr)
194  {
195  rdsz = _sock->receiveBytes(_read_buffer_wptr, maxremaining);
196  if (rdsz <= 0)
197  {
198  if (errno == EAGAIN
199 #if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
200  || errno == EWOULDBLOCK
201 #endif
202  )
203  continue;
204  throw PeerResetConnection("sockRead: connection gone");
205  }
206  _read_buffer_wptr += rdsz;
207  remaining -= rdsz;
208  maxremaining -= rdsz;
209  }
210  return _read_buffer_wptr - ptr;
211  }
212 #else
213 
217  int sockRead(char *where, const size_t sz)
218  {
219  unsigned remaining(static_cast<unsigned>(sz)), rddone(0);
220  while (remaining > 0)
221  {
222  const int rdSz(_sock->receiveBytes(where + rddone, remaining));
223  if (rdSz <= 0)
224  {
225  if (errno == EAGAIN
226 #if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
227  || errno == EWOULDBLOCK
228 #endif
229  )
230  continue;
231  throw PeerResetConnection("sockRead: connection gone");
232  }
233  rddone += rdSz;
234  remaining -= rdSz;
235  }
236  return rddone;
237  }
238 #endif
239 
240 public:
245  FIXReader(Poco::Net::StreamSocket *sock, Session& session, const ProcessModel pmodel=pm_pipeline)
246  : AsyncSocket<f8String>(sock, session, pmodel), _callback_thread(std::ref(*this), &FIXReader::callback_processor)
247 #if FIX8_EXPERIMENTAL_BUFFERED_SOCKET_READ
248  , _read_buffer(), _read_buffer_rptr(_read_buffer), _read_buffer_wptr(_read_buffer)
249 #endif
250  , _bg_sz()
251  {
252  set_preamble_sz();
253  }
254 
256  virtual ~FIXReader()
257  {
258  stop();
259  join();
260  }
261 
// Start the reader. Clears _socket_error first; in pipeline mode it then starts
// _callback_thread and sets _socket_error if that start() returns non-zero.
// NOTE(review): this listing jumps from line 266 to line 268, so the statement
// controlled by `if (_pmodel != pm_coro)` is not shown here. As displayed, that
// `if` would govern the pipeline check below instead - confirm against the
// original header before relying on this text.
263  virtual void start()
264  {
265  _socket_error = false;
266  if (_pmodel != pm_coro)
268  if (_pmodel == pm_pipeline)
269  {
270  if (_callback_thread.start())
271  _socket_error = true;
272  }
273  }
274 
// Stop the reader and quit. In pipeline mode the callback thread is asked to
// stop and then joined.
// NOTE(review): this listing jumps from line 283 to line 285, so the statement
// controlled by `if (_pmodel != pm_coro)` is not shown here - confirm against
// the original header.
276  virtual void quit()
277  {
278  if (_pmodel == pm_pipeline)
279  {
280  _callback_thread.request_stop();
281  _callback_thread.join();
282  }
283  if (_pmodel != pm_coro)
285  }
286 
// Ask the reader to stop. If the reader is not marked started, only a warning
// is logged. Otherwise, in pipeline mode, an empty f8String is pushed onto
// _msg_queue and _callback_thread is asked to stop.
// NOTE(review): this listing skips lines 292 and 302. The first gap sits
// between the two `_started` checks (presumably where a lock is taken -
// confirm); the second is the statement controlled by
// `if (_pmodel != pm_coro)`. Confirm both against the original header.
288  virtual void stop()
289  {
290  if (_started)
291  {
293  if (_started)
294  {
295  if (_pmodel == pm_pipeline)
296  {
// presumably the empty string wakes the queue consumer so it can notice the stop request - TODO confirm
297  const f8String from;
298  _msg_queue.try_push(from);
299  _callback_thread.request_stop();
300  }
301  if (_pmodel != pm_coro)
303  }
304  }
305  else
306  {
307  glout_warn << "FIXReader: AsyncSocket already stopped.";
308  }
309  }
310 
315 
318  int join() { return _pmodel != pm_coro ? AsyncSocket<f8String>::join() : -1; }
319 
321  F8API void set_preamble_sz();
322 
325  bool is_socket_error() const { return _socket_error; }
326 
330  bool poll(const Poco::Timespan &ts = Poco::Timespan()) const
331  {
332  return _sock->poll(ts, Poco::Net::Socket::SELECT_READ);
333  }
334 
336 };
337 
338 //----------------------------------------------------------------------------------------
340 class FIXWriter : public AsyncSocket<Message *>
341 {
343 
344 public:
349  FIXWriter(Poco::Net::StreamSocket *sock, Session& session, const ProcessModel pmodel=pm_pipeline)
350  : AsyncSocket<Message *>(sock, session, pmodel) {}
351 
353  virtual ~FIXWriter()
354  {
355  quit();
356  }
357 
362  bool write(Message *from, bool destroy)
363  {
364  if (_pmodel == pm_pipeline) // pipeline mode ignores destroy flag
365  return _msg_queue.try_push(from);
366  f8_scoped_spin_lock guard(_con_spl);
367  if (destroy)
368  {
369  std::unique_ptr<Message> msg(from);
370  return _session.send_process(msg.get());
371  }
372  return _session.send_process(from);
373  }
374 
379  size_t write_batch(const std::vector<Message *>& msgs, bool destroy)
380  {
381  if (msgs.empty())
382  return 0;
383  if (msgs.size() == 1)
384  return write(msgs.front(), destroy) ? 1 : 0;
385  size_t result(0);
386  f8_scoped_spin_lock guard(_con_spl);
387  for (std::vector<Message *>::const_iterator itr(msgs.begin()), eitr(msgs.end()), litr(eitr-1); itr != eitr; ++itr)
388  {
389  Message* msg = *itr;
390  msg->set_end_of_batch(itr == litr);
391  if (_pmodel == pm_pipeline) // pipeline mode ignores destroy flag
392  {
393  _msg_queue.try_push(msg);
394  ++result;
395  continue;
396  }
397  if (_session.send_process(msg))
398  ++result;
399  }
400  if (destroy && _pmodel != pm_pipeline)
401  {
402  for (std::vector<Message *>::const_iterator itr(msgs.begin()), eitr(msgs.end()); itr != eitr; ++itr)
403  {
404  std::unique_ptr<Message> smsg(*itr);
405  }
406  }
408  return result;
409  }
412  int join() { return _pmodel == pm_pipeline ? AsyncSocket<Message *>::join() : -1; }
413 
417  bool write(Message& from)
418  {
419  if (_pmodel == pm_pipeline) // not permitted if pipeling
420  throw f8Exception("cannot send message directly if pipelining");
421  f8_scoped_spin_lock guard(_con_spl);
422  return _session.send_process(&from);
423  }
424 
428  bool poll(const Poco::Timespan &ts = Poco::Timespan()) const
429  {
430  return _sock->poll(ts, Poco::Net::Socket::SELECT_WRITE);
431  }
432 
438  int send(const char *data, size_t remaining, bool nb=false)
439  {
440  unsigned wrdone(0);
441 
442  while (remaining > 0)
443  {
444  const int wrtSz(_sock->sendBytes(data + wrdone, static_cast<int>(remaining)));
445  if (wrtSz < 0)
446  {
447  if (errno == EAGAIN
448 #if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
449  || errno == EWOULDBLOCK
450 #endif
451  )
452  {
453  if (nb)
454  return wrdone;
455  continue;
456  }
457  throw PeerResetConnection("send: connection gone");
458  }
459 
460  wrdone += wrtSz;
461  remaining -= wrtSz;
462  }
463 
464  return wrdone;
465  }
466 
// Start the writer.
// NOTE(review): this listing jumps from line 470 to line 472, so the statement
// controlled by `if (_pmodel == pm_pipeline)` is not shown here - confirm
// against the original header.
468  virtual void start()
469  {
470  if (_pmodel == pm_pipeline)
472  }
473 
// Ask the writer to stop. If the writer is not marked started, only a warning
// is logged. Otherwise a null message pointer is pushed onto _msg_queue.
// NOTE(review): this listing skips lines 479 and 484. The first gap sits
// between the two `_started` checks (presumably where a lock is taken -
// confirm); the second is the statement controlled by
// `if (_pmodel == pm_pipeline)`. Confirm both against the original header.
475  virtual void stop()
476  {
477  if (_started)
478  {
480  if (_started)
481  {
// presumably the null pointer is a sentinel telling the queue consumer to exit - TODO confirm
482  _msg_queue.try_push(0);
483  if (_pmodel == pm_pipeline)
485  }
486  }
487  else
488  {
489  glout_warn << "FIXWriter: AsyncSocket already stopped.";
490  }
491  }
492 
496 
// Stop the writer and quit.
// NOTE(review): this listing jumps from line 500 to line 502, so the statement
// controlled by `if (_pmodel == pm_pipeline)` is not shown here - confirm
// against the original header.
498  virtual void quit()
499  {
500  if (_pmodel == pm_pipeline)
502  }
503 
504 };
505 
506 //----------------------------------------------------------------------------------------
509 {
510 public:
513 
514 protected:
515  Poco::Net::StreamSocket *_sock;
516  Poco::Net::SocketAddress _addr;
522 
525  bool _secured;
526 
527 public:
537  Connection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress& addr, Session &session, Role role,
538  const ProcessModel pmodel, unsigned hb_interval, bool secured)
539  : _sock(sock), _addr(addr), _connected(role == cn_acceptor), _session(session), _role(role), _pmodel(pmodel),
540  _hb_interval(hb_interval), _hb_interval20pc(hb_interval + hb_interval / 5),
541  _reader(sock, session, pmodel), _writer(sock, session, pmodel),
542  _secured(secured) {}
543 
545  virtual ~Connection() { stop(); _session.clear_connection(this); }
546 
549  Role get_role() const { return _role; }
550 
553  ProcessModel get_pmodel() const { return _pmodel; }
554 
557  bool is_secure() const { return _secured; }
558 
560  F8API void start();
561 
563  F8API void stop();
564 
567  virtual bool connect() { return _connected; }
568 
571  bool is_connected() const { return _connected; }
572 
577  virtual bool write(Message *from, bool destroy=true) { return _writer.write(from, destroy); }
578 
582  virtual bool write(Message& from) { return _writer.write(from); }
583 
588  size_t write_batch(const std::vector<Message *>& msgs, bool destroy) { return _writer.write_batch(msgs, destroy); }
589 
594  int send(const char *from, size_t sz) { return _writer.send(from, sz); }
595 
599  int send(const f8String& from) { return _writer.send(from.data(), from.size()); }
600 
603  void set_hb_interval(const unsigned hb_interval)
604  { _hb_interval = hb_interval; _hb_interval20pc = hb_interval + hb_interval / 5; }
605 
608  unsigned get_hb_interval() const { return _hb_interval; }
609 
612  unsigned get_hb_interval20pc() const { return _hb_interval20pc; }
613 
616  Poco::Net::SocketAddress get_peer_socket_address() const { return _sock->peerAddress(); }
617 
620  const Poco::Net::SocketAddress& get_socket_address() const { return _addr; }
621 
624  int join() { return _reader.join(); }
625 
628  bool is_socket_error() const { return _reader.is_socket_error(); }
629 
633  static void set_recv_buf_sz(const unsigned sz, Poco::Net::Socket *sock)
634  {
635  const unsigned current_sz(sock->getReceiveBufferSize());
636  sock->setReceiveBufferSize(sz);
637  glout_info << "fd(" << sock->impl()->sockfd() << ") ReceiveBufferSize old:" << current_sz
638  << " requested:" << sz << " new:" << sock->getReceiveBufferSize();
639  }
640 
644  static void set_send_buf_sz(const unsigned sz, Poco::Net::Socket *sock)
645  {
646  const unsigned current_sz(sock->getSendBufferSize());
647  sock->setSendBufferSize(sz);
648  glout_info << "fd(" << sock->impl()->sockfd() << ") SendBufferSize old:" << current_sz
649  << " requested:" << sz << " new:" << sock->getSendBufferSize();
650  }
653  void set_recv_buf_sz(const unsigned sz) const { set_recv_buf_sz(sz, _sock); }
654 
657  void set_send_buf_sz(const unsigned sz) const { set_send_buf_sz(sz, _sock); }
658 
661  void set_tcp_cork_flag(bool way) const
662  {
663 #if defined FIX8_HAVE_DECL_TCP_CORK && TCP_CORK != 0
664  _sock->setOption(IPPROTO_TCP, TCP_CORK, way ? 1 : 0);
665 #endif
666  }
667 
670  Session& get_session() { return _session; }
671 
674  int reader_execute() { return _reader.execute(_reader.cancellation_token()); }
675 
679  bool reader_poll(const Poco::Timespan &ts = Poco::Timespan()) const { return _reader.poll(ts); }
680 
683  int writer_execute() { return _writer.execute(_writer.cancellation_token()); }
684 
688  bool writer_poll(const Poco::Timespan &ts = Poco::Timespan()) const { return _writer.poll(ts); }
689 };
690 
691 //-------------------------------------------------------------------------------------------------
694 {
695  const bool _no_delay;
696 
697 public:
707  ClientConnection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress& addr,
708  Session &session, unsigned hb_interval, ProcessModel pmodel=pm_pipeline, bool no_delay=true,
709  bool secured=false)
710  : Connection(sock, addr, session, cn_initiator, pmodel, hb_interval, secured), _no_delay(no_delay) {}
711 
713  virtual ~ClientConnection() {}
714 
717  F8API bool connect();
718 };
719 
720 //-------------------------------------------------------------------------------------------------
723 {
724 public:
737  ServerConnection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress& addr,
738  Session &session, unsigned hb_interval, ProcessModel pmodel=pm_pipeline, bool no_delay=true,
739  bool reuse_addr=false, int linger=-1, bool keepalive=false, bool secured=false) :
740  Connection(sock, addr, session, cn_acceptor, pmodel, hb_interval, secured)
741  {
742  _sock->setLinger(linger >= 0, linger);
743  _sock->setNoDelay(no_delay);
744  _sock->setReuseAddress(reuse_addr);
745  _sock->setKeepAlive(keepalive);
746  }
747 
749  virtual ~ServerConnection() {}
750 };
751 
752 //-------------------------------------------------------------------------------------------------
753 // our buffered RAII ostream log target for Connection session member
754 #define scout ssout_info((&_session))
755 #define scout_info ssout_info((&_session))
756 #define scout_warn ssout_warn((&_session))
757 #define scout_error ssout_error((&_session))
758 #define scout_fatal ssout_fatal((&_session))
759 #define scout_debug ssout_debug((&_session))
760 
761 //-------------------------------------------------------------------------------------------------
762 
763 } // FIX8
764 
765 #endif // FIX8_CONNECTION_HPP_
virtual void start()
Start the processing threads.
Definition: connection.hpp:468
f8_atomic< bool > _connected
Definition: connection.hpp:517
Client (initiator) specialisation of Connection.
Definition: connection.hpp:693
unsigned get_hb_interval() const
Definition: connection.hpp:608
Session & _session
Definition: connection.hpp:59
ClientConnection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress &addr, Session &session, unsigned hb_interval, ProcessModel pmodel=pm_pipeline, bool no_delay=true, bool secured=false)
Definition: connection.hpp:707
virtual int execute(f8_thread_cancellation_token &cancellation_token)
Definition: connection.hpp:93
void set_hb_interval(const unsigned hb_interval)
Definition: connection.hpp:603
ProcessModel _pmodel
Definition: connection.hpp:60
f8_thread_cancellation_token & callback_cancellation_token()
Definition: connection.hpp:335
f8String data
Definition: field.hpp:2036
Fix message writer.
Definition: connection.hpp:340
virtual ~AsyncSocket()=default
Dtor.
FIXReader _reader
Definition: connection.hpp:523
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
Definition: connection.cpp:45
Fix8 Base Session. User sessions are derived from this class.
Definition: session.hpp:394
generic pthread_mutex wrapper
Definition: thread.hpp:299
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
Definition: connection.hpp:588
virtual ~ClientConnection()
Dtor.
Definition: connection.hpp:713
static void set_recv_buf_sz(const unsigned sz, Poco::Net::Socket *sock)
Definition: connection.hpp:633
AsyncSocket(Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
Definition: connection.hpp:73
bool reader_poll(const Poco::Timespan &ts=Poco::Timespan()) const
Definition: connection.hpp:679
bool write(Message *from, bool destroy)
Definition: connection.hpp:362
Fix message reader.
Definition: connection.hpp:134
F8API void stop()
Stop the reader and writer threads.
Definition: connection.cpp:322
Poco::Net::StreamSocket * _sock
Definition: connection.hpp:515
Server (acceptor) specialisation of Connection.
Definition: connection.hpp:722
ProcessModel get_pmodel() const
Definition: connection.hpp:553
FIXWriter _writer
Definition: connection.hpp:524
virtual void quit()
Stop the processing thread and quit.
Definition: connection.hpp:117
unsigned _hb_interval20pc
Definition: connection.hpp:521
virtual void quit()
Stop the processing threads and quit.
Definition: connection.hpp:498
int send(const f8String &from)
Definition: connection.hpp:599
Thread wrapper. Ctor provides T instance and specifies ptr to member to call or defaults to operator(...
Definition: thread.hpp:245
Role
Roles: acceptor, initiator or unknown.
Definition: connection.hpp:512
Role get_role() const
Definition: connection.hpp:549
Complete Fix connection (reader and writer).
Definition: connection.hpp:508
F8API void set_preamble_sz()
Calculate the length of the Fix message preamble, e.g. "8=FIX.4.4^A9=".
Definition: connection.cpp:201
Connection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress &addr, Session &session, Role role, const ProcessModel pmodel, unsigned hb_interval, bool secured)
Definition: connection.hpp:537
virtual void start()
Start the processing threads.
Definition: connection.hpp:263
ProcessModel
Supported session process models.
Definition: f8types.hpp:56
virtual void start()
Start the processing thread.
Definition: connection.hpp:96
Thread cancellation token.
Definition: thread.hpp:206
f8_thread_cancellation_token _cancellation_token
Definition: connection.hpp:61
f8_thread< AsyncSocket > _thread
Definition: connection.hpp:66
f8_spin_lock _con_spl
Definition: connection.hpp:342
#define glout_warn
Definition: logger.hpp:604
bool is_socket_error() const
Definition: connection.hpp:628
static void set_send_buf_sz(const unsigned sz, Poco::Net::Socket *sock)
Definition: connection.hpp:644
f8_mutex _start_mutex
Definition: connection.hpp:63
bool read(f8String &to)
Definition: connection.cpp:207
void clear_connection(const Connection *connection)
Definition: session.hpp:586
bool is_connected() const
Definition: connection.hpp:571
Half duplex async socket wrapper with thread.
Definition: connection.hpp:53
virtual int join(int timeoutInMs=0)
Definition: thread.hpp:126
Base exception class.
Definition: f8exception.hpp:49
virtual void quit()
Stop the processing threads and quit.
Definition: connection.hpp:276
#define F8API
Definition: f8dll.h:60
bool poll(const Poco::Timespan &ts=Poco::Timespan()) const
Definition: connection.hpp:428
Poco::Net::StreamSocket * _sock
Definition: connection.hpp:57
F8API void start()
Start the reader and writer threads.
Definition: connection.cpp:315
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
size_t queued() const
Definition: connection.hpp:81
bool write(Message &from)
Definition: connection.hpp:417
f8_thread_cancellation_token _callback_cancellation_token
Definition: connection.hpp:140
const Poco::Net::SocketAddress & get_socket_address() const
Definition: connection.hpp:620
Poco::Net::StreamSocket * socket()
Definition: connection.hpp:121
#define glout_info
Definition: logger.hpp:601
f8_thread_cancellation_token & cancellation_token()
Definition: connection.hpp:129
virtual ~FIXWriter()
Dtor.
Definition: connection.hpp:353
f8_atomic< bool > _socket_error
Definition: connection.hpp:137
bool is_secure() const
Definition: connection.hpp:557
virtual bool write(Message &from)
Definition: connection.hpp:582
void request_stop()
Definition: thread.hpp:293
void set_end_of_batch(bool is_end_of_batch)
Definition: message.hpp:1275
bool started() const
Definition: connection.hpp:85
virtual bool connect()
Definition: connection.hpp:567
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
std::atomic< T > f8_atomic
Definition: thread.hpp:55
generic spin_lock wrapper
Definition: thread.hpp:364
F8API bool send_process(Message *msg)
Definition: session.cpp:1009
tbb::concurrent_bounded_queue< T > f8_concurrent_queue
Definition: mpmc.hpp:48
virtual ~FIXReader()
Dtor.
Definition: connection.hpp:256
f8_concurrent_queue< T > _msg_queue
Definition: connection.hpp:58
void set_recv_buf_sz(const unsigned sz) const
Definition: connection.hpp:653
A connected peer has reset the connection (disconnected).
Poco::Net::SocketAddress get_peer_socket_address() const
Definition: connection.hpp:616
virtual void request_stop()
Request the processing thread to stop.
Definition: connection.hpp:114
Poco::Net::SocketAddress _addr
Definition: connection.hpp:516
void set_tcp_cork_flag(bool way) const
Definition: connection.hpp:661
bool writer_poll(const Poco::Timespan &ts=Poco::Timespan()) const
Definition: connection.hpp:688
void set_send_buf_sz(const unsigned sz) const
Definition: connection.hpp:657
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
Definition: connection.hpp:379
virtual ~Connection()
Dtor.
Definition: connection.hpp:545
Session & _session
Definition: connection.hpp:518
unsigned get_hb_interval20pc() const
Definition: connection.hpp:612
int send(const char *from, size_t sz)
Definition: connection.hpp:594
virtual ~ServerConnection()
Dtor.
Definition: connection.hpp:749
unsigned _hb_interval
Definition: connection.hpp:521
FIXWriter(Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
Definition: connection.hpp:349
virtual F8API int execute(f8_thread_cancellation_token &cancellation_token)
Definition: connection.cpp:269
FIXReader(Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
Definition: connection.hpp:245
ProcessModel _pmodel
Definition: connection.hpp:520
volatile bool _started
Definition: connection.hpp:62
ServerConnection(Poco::Net::StreamSocket *sock, Poco::Net::SocketAddress &addr, Session &session, unsigned hb_interval, ProcessModel pmodel=pm_pipeline, bool no_delay=true, bool reuse_addr=false, int linger=-1, bool keepalive=false, bool secured=false)
Definition: connection.hpp:737
int send(const char *data, size_t remaining, bool nb=false)
Definition: connection.hpp:438
virtual void stop()
Send a message to the processing method instructing it to quit.
Definition: connection.hpp:288
virtual void stop()
Send a message to the processing method instructing it to quit.
Definition: connection.hpp:475
int sockRead(char *where, const size_t sz)
Definition: connection.hpp:217
std::string f8String
Definition: f8types.hpp:47
Session & get_session()
Definition: connection.hpp:670
bool is_socket_error() const
Definition: connection.hpp:325
F8API int callback_processor()
Definition: connection.cpp:164
f8_thread< FIXReader > _callback_thread
Definition: connection.hpp:139
virtual bool write(Message *from, bool destroy=true)
Definition: connection.hpp:577
bool poll(const Poco::Timespan &ts=Poco::Timespan()) const
Definition: connection.hpp:330