fix8  version 1.4.0
Open Source C++ FIX Framework
FIX8::FIXWriter Class Reference

Fix message writer. More...

#include <connection.hpp>

Inheritance diagram for FIX8::FIXWriter:
FIX8::AsyncSocket< Message * >

Public Member Functions

 FIXWriter (Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
 
virtual ~FIXWriter ()
 Dtor. More...
 
bool write (Message *from, bool destroy)
 
size_t write_batch (const std::vector< Message * > &msgs, bool destroy)
 
int join ()
 
bool write (Message &from)
 
bool poll (const Poco::Timespan &ts=Poco::Timespan()) const
 
int send (const char *data, size_t remaining, bool nb=false)
 
virtual void start ()
 Start the processing threads. More...
 
virtual void stop ()
 Send a message to the processing method instructing it to quit. More...
 
virtual F8API int execute (f8_thread_cancellation_token &cancellation_token)
 
virtual void quit ()
 Stop the processing threads and quit. More...
 
- Public Member Functions inherited from FIX8::AsyncSocket< Message * >
 AsyncSocket (Poco::Net::StreamSocket *sock, Session &session, const ProcessModel pmodel=pm_pipeline)
 
virtual ~AsyncSocket ()=default
 Dtor. More...
 
size_t queued () const
 
bool started () const
 
int operator() ()
 
virtual void request_stop ()
 Start the processing thread. More...
 
Poco::Net::StreamSocket * socket ()
 
int join ()
 
f8_thread_cancellation_tokencancellation_token ()
 

Private Attributes

f8_spin_lock _con_spl
 

Additional Inherited Members

- Protected Attributes inherited from FIX8::AsyncSocket< Message * >
coroutine _coro
 
Poco::Net::StreamSocket * _sock
 
f8_concurrent_queue< Message * > _msg_queue
 
Session_session
 
ProcessModel _pmodel
 
f8_thread_cancellation_token _cancellation_token
 
volatile bool _started
 
f8_mutex _start_mutex
 

Detailed Description

Fix message writer.

Definition at line 340 of file connection.hpp.

Constructor & Destructor Documentation

FIX8::FIXWriter::FIXWriter ( Poco::Net::StreamSocket *  sock,
Session session,
const ProcessModel  pmodel = pm_pipeline 
)
inline

Ctor.

Parameters
sockconnected socket
sessionsession
pmodelprocess model

Definition at line 349 of file connection.hpp.

350  : AsyncSocket<Message *>(sock, session, pmodel) {}
virtual FIX8::FIXWriter::~FIXWriter ( )
inlinevirtual

Dtor.

Definition at line 353 of file connection.hpp.

References quit().

354  {
355  quit();
356  }
virtual void quit()
Stop the processing threads and quit.
Definition: connection.hpp:498

Member Function Documentation

int FIXWriter::execute ( f8_thread_cancellation_token cancellation_token)
virtual

Writer thread method. Reads messages from the queue and sends them over the socket.

Returns
0 on success

Reimplemented from FIX8::AsyncSocket< Message * >.

Definition at line 269 of file connection.cpp.

References scout_error, scout_info, and FIX8::f8Exception::what().

Referenced by FIX8::Connection::writer_execute().

270 {
271  int result(0), processed(0), invalid(0);
272 
273  while (!cancellation_token && !_session.is_shutdown())
274  {
275  try
276  {
277  Message *inmsg(0);
278  _msg_queue.pop (inmsg); // will block
279  if (!inmsg)
280  break;
281  unique_ptr<Message> msg(inmsg);
282  _session.send_process(msg.get());
283  ++processed;
284  }
285  catch (PeerResetConnection& e)
286  {
287  scout_error << e.what();
288  result = -1;
289  break;
290  }
291  catch (Poco::Net::NetException& e)
292  {
293  scout_error << e.what();
294  ++invalid;
295  break;
296  }
297  catch (exception& e)
298  {
299  scout_error << e.what();
300  ++invalid;
301  break; //?
302  }
303  }
304 
305  scout_info << "FIXWriter: " << processed << " messages processed, " << invalid << " invalid";
306 
307  {
309  _started = false;
310  }
311  return result;
312 }
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
F8API bool send_process(Message *msg)
Definition: session.cpp:1009
f8_concurrent_queue< Message * > _msg_queue
Definition: connection.hpp:58
A connected peer has reset the connection (disconnected).
#define scout_info
Definition: connection.hpp:755
const char * what() const
Definition: f8exception.hpp:85
bool is_shutdown()
Definition: session.hpp:778
#define scout_error
Definition: connection.hpp:757
int FIX8::FIXWriter::join ( )
inline

Wait till writer thead has finished.

Returns
0 on success

Definition at line 412 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_pmodel, FIX8::AsyncSocket< T >::join(), and FIX8::pm_pipeline.

bool FIX8::FIXWriter::poll ( const Poco::Timespan &  ts = Poco::Timespan()) const
inline

Check to see if a write would block

Parameters
tstimeout
Returns
true if a write would block

Definition at line 428 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_sock.

Referenced by FIX8::Connection::writer_poll().

429  {
430  return _sock->poll(ts, Poco::Net::Socket::SELECT_WRITE);
431  }
Poco::Net::StreamSocket * _sock
Definition: connection.hpp:57
virtual void FIX8::FIXWriter::quit ( )
inlinevirtual

Stop the processing threads and quit.

Reimplemented from FIX8::AsyncSocket< Message * >.

Definition at line 498 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_pmodel, FIX8::pm_pipeline, and FIX8::AsyncSocket< T >::quit().

Referenced by ~FIXWriter().

499  {
500  if (_pmodel == pm_pipeline)
502  }
virtual void quit()
Stop the processing thread and quit.
Definition: connection.hpp:117
int FIX8::FIXWriter::send ( const char *  data,
size_t  remaining,
bool  nb = false 
)
inline

Send message over socket.

Parameters
datachar * buffer to send
remainingnumber of bytes
nb- if true, don't block
Returns
number of bytes sent

Definition at line 438 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_sock.

Referenced by FIX8::Connection::send().

439  {
440  unsigned wrdone(0);
441 
442  while (remaining > 0)
443  {
444  const int wrtSz(_sock->sendBytes(data + wrdone, static_cast<int>(remaining)));
445  if (wrtSz < 0)
446  {
447  if (errno == EAGAIN
448 #if defined EWOULDBLOCK && EAGAIN != EWOULDBLOCK
449  || errno == EWOULDBLOCK
450 #endif
451  )
452  {
453  if (nb)
454  return wrdone;
455  continue;
456  }
457  throw PeerResetConnection("send: connection gone");
458  }
459 
460  wrdone += wrtSz;
461  remaining -= wrtSz;
462  }
463 
464  return wrdone;
465  }
f8String data
Definition: field.hpp:2036
Poco::Net::StreamSocket * _sock
Definition: connection.hpp:57
virtual void FIX8::FIXWriter::start ( )
inlinevirtual

Start the processing threads.

Reimplemented from FIX8::AsyncSocket< Message * >.

Definition at line 468 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_pmodel, FIX8::pm_pipeline, and FIX8::AsyncSocket< T >::start().

469  {
470  if (_pmodel == pm_pipeline)
472  }
virtual void start()
Start the processing thread.
Definition: connection.hpp:96
virtual void FIX8::FIXWriter::stop ( )
inlinevirtual

Send a message to the processing method instructing it to quit.

Definition at line 475 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_msg_queue, FIX8::AsyncSocket< Message * >::_pmodel, FIX8::AsyncSocket< Message * >::_start_mutex, FIX8::AsyncSocket< Message * >::_started, glout_warn, FIX8::pm_pipeline, and FIX8::AsyncSocket< T >::request_stop().

476  {
477  if (_started)
478  {
480  if (_started)
481  {
482  _msg_queue.try_push(0);
483  if (_pmodel == pm_pipeline)
485  }
486  }
487  else
488  {
489  glout_warn << "FIXWriter: AsyncSocket already stopped.";
490  }
491  }
f8_scoped_lock_impl< f8_mutex > f8_scoped_lock
Definition: thread.hpp:451
#define glout_warn
Definition: logger.hpp:604
f8_concurrent_queue< Message * > _msg_queue
Definition: connection.hpp:58
virtual void request_stop()
Start the processing thread.
Definition: connection.hpp:114
bool FIX8::FIXWriter::write ( Message from,
bool  destroy 
)
inline

Place Fix message on outbound message queue.

Parameters
frommessage to send
destroyif true delete after send
Returns
true in success

Definition at line 362 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_msg_queue, FIX8::AsyncSocket< Message * >::_pmodel, FIX8::AsyncSocket< Message * >::_session, FIX8::pm_pipeline, and FIX8::Session::send_process().

Referenced by FIX8::Connection::write(), and write_batch().

363  {
364  if (_pmodel == pm_pipeline) // pipeline mode ignores destroy flag
365  return _msg_queue.try_push(from);
367  if (destroy)
368  {
369  std::unique_ptr<Message> msg(from);
370  return _session.send_process(msg.get());
371  }
372  return _session.send_process(from);
373  }
f8_scoped_lock_impl< f8_spin_lock > f8_scoped_spin_lock
Definition: thread.hpp:452
f8_spin_lock _con_spl
Definition: connection.hpp:342
F8API bool send_process(Message *msg)
Definition: session.cpp:1009
f8_concurrent_queue< Message * > _msg_queue
Definition: connection.hpp:58
bool FIX8::FIXWriter::write ( Message from)
inline

Send Fix message directly

Parameters
frommessage to send
Returns
true in success

Definition at line 417 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_pmodel, FIX8::AsyncSocket< Message * >::_session, FIX8::pm_pipeline, and FIX8::Session::send_process().

418  {
419  if (_pmodel == pm_pipeline) // not permitted if pipeling
420  throw f8Exception("cannot send message directly if pipelining");
422  return _session.send_process(&from);
423  }
f8_scoped_lock_impl< f8_spin_lock > f8_scoped_spin_lock
Definition: thread.hpp:452
f8_spin_lock _con_spl
Definition: connection.hpp:342
F8API bool send_process(Message *msg)
Definition: session.cpp:1009
size_t FIX8::FIXWriter::write_batch ( const std::vector< Message * > &  msgs,
bool  destroy 
)
inline

Place Fix messages on outbound message queue as a single batch.

Parameters
msgsmessages to send
destroyif true delete after send
Returns
count of messages written
Todo:
: need assert on result==msgs.size()

Definition at line 379 of file connection.hpp.

References FIX8::AsyncSocket< Message * >::_msg_queue, FIX8::AsyncSocket< Message * >::_pmodel, FIX8::AsyncSocket< Message * >::_session, FIX8::pm_pipeline, FIX8::Session::send_process(), FIX8::Message::set_end_of_batch(), and write().

Referenced by FIX8::Connection::write_batch().

380  {
381  if (msgs.empty())
382  return 0;
383  if (msgs.size() == 1)
384  return write(msgs.front(), destroy) ? 1 : 0;
385  size_t result(0);
387  for (std::vector<Message *>::const_iterator itr(msgs.begin()), eitr(msgs.end()), litr(eitr-1); itr != eitr; ++itr)
388  {
389  Message* msg = *itr;
390  msg->set_end_of_batch(itr == litr);
391  if (_pmodel == pm_pipeline) // pipeline mode ignores destroy flag
392  {
393  _msg_queue.try_push(msg);
394  ++result;
395  continue;
396  }
397  if (_session.send_process(msg))
398  ++result;
399  }
400  if (destroy && _pmodel != pm_pipeline)
401  {
402  for (std::vector<Message *>::const_iterator itr(msgs.begin()), eitr(msgs.end()); itr != eitr; ++itr)
403  {
404  std::unique_ptr<Message> smsg(*itr);
405  }
406  }
408  return result;
409  }
f8_scoped_lock_impl< f8_spin_lock > f8_scoped_spin_lock
Definition: thread.hpp:452
bool write(Message *from, bool destroy)
Definition: connection.hpp:362
f8_spin_lock _con_spl
Definition: connection.hpp:342
F8API bool send_process(Message *msg)
Definition: session.cpp:1009
f8_concurrent_queue< Message * > _msg_queue
Definition: connection.hpp:58

Member Data Documentation

f8_spin_lock FIX8::FIXWriter::_con_spl
private

Definition at line 342 of file connection.hpp.


The documentation for this class was generated from the following files: