fix8  version 1.4.0
Open Source C++ FIX Framework
FIX8::Session Class Referenceabstract

Fix8 Base Session. User sessions are derived from this class. More...

#include <session.hpp>

Inheritance diagram for FIX8::Session:
hf_session_client hf_session_server myfix_session_client myfix_session_server

Classes

struct  RetransmissionContext
 Provides context to your retrans handler. More...
 

Public Types

enum  SessionControl {
  shutdown, print, printnohb, debug,
  count
}
 
using Control = ebitset_r< SessionControl >
 
using SequencePair = std::pair< const unsigned, const f8String >
 

Public Member Functions

F8API Session (const F8MetaCntx &ctx, const SessionID &sid, Persister *persist=nullptr, Logger *logger=nullptr, Logger *plogger=nullptr)
 
F8API Session (const F8MetaCntx &ctx, const sender_comp_id &sci=sender_comp_id(), Persister *persist=nullptr, Logger *logger=nullptr, Logger *plogger=nullptr)
 
virtual F8API ~Session ()
 Dtor. More...
 
F8API int start (Connection *connection, bool wait=true, const unsigned send_seqnum=0, const unsigned recv_seqnum=0, const f8String davi=f8String())
 
void clear_connection (const Connection *connection)
 
virtual F8API bool process (const f8String &from)
 
virtual F8API bool retrans_callback (const SequencePair &with, RetransmissionContext &rctx)
 
virtual F8API bool send (Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
 
virtual F8API bool send (Message &msg, const unsigned custom_seqnum=0, const bool no_increment=false)
 
virtual F8API size_t send_batch (const std::vector< Message * > &msgs, bool destroy=true)
 
F8API bool send_process (Message *msg)
 
virtual F8API int modify_header (MessageBase *msg)
 
F8API void update_persist_seqnums ()
 Force persister to sync next send/receive seqnums. More...
 
F8API void stop ()
 stop the session. More...
 
Connectionget_connection ()
 
Timer< Session > & get_timer ()
 
const F8MetaCntxget_ctx () const
 
bool is_loggable (Logger::Level level) const
 
bool enqueue (const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
 
bool log (const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
 
bool plog (const std::string &what, Logger::Level lev, const unsigned direction=0) const
 
const Tickvalget_last_received () const
 
const Tickvalget_last_sent () const
 
void update_sent ()
 Update the last sent time. More...
 
void update_received ()
 Update the last received time. More...
 
F8API void compid_check (const unsigned seqnum, const Message *msg, const SessionID &id) const
 
F8API bool sequence_check (const unsigned seqnum, const Message *msg)
 
virtual bool activation_check (const unsigned seqnum, const Message *msg)
 
F8API bool enforce (const unsigned seqnum, const Message *msg)
 
const SessionIDget_sid () const
 
unsigned get_next_send_seq () const
 
void set_login_parameters (const LoginParameters &loginParamaters)
 
void get_login_parameters (LoginParameters &loginParamaters) const
 
const LoginParametersget_login_parameters () const
 
void set_reset_sequence_numbers_flag (bool flag)
 
void set_persister (Persister *pst)
 
Controlcontrol ()
 
bool is_shutdown ()
 
void set_session_config (struct SessionConfig *sf)
 
virtual F8API Messagegenerate_logout (const char *msgstr=nullptr)
 
virtual F8API Messagegenerate_heartbeat (const f8String &testReqID)
 
virtual F8API Messagegenerate_resend_request (const unsigned begin, const unsigned end=0)
 
virtual F8API Messagegenerate_sequence_reset (const unsigned newseqnum, const bool gapfillflag=false)
 
virtual F8API Messagegenerate_test_request (const f8String &testReqID)
 
virtual F8API Messagegenerate_reject (const unsigned seqnum, const char *what, const char *msgtype=nullptr)
 
virtual F8API Messagegenerate_business_reject (const unsigned seqnum, const Message *msg, const int reason, const char *what)
 
void do_state_change (const States::SessionStates new_state)
 
States::SessionStates get_session_state () const
 

Static Public Member Functions

static const Messagedetach (const Message *&msg)
 
static const f8Stringget_session_state_string (const States::SessionStates state)
 
static F8API const f8String copyright_string ()
 

Protected Member Functions

F8API bool heartbeat_service ()
 Heartbeat generation service thread method. More...
 
F8API bool activation_service ()
 Session start/stop service thread method. More...
 
virtual F8API bool handle_logon (const unsigned seqnum, const Message *msg)
 
virtual F8API Messagegenerate_logon (const unsigned heartbeat_interval, const f8String davi=f8String())
 
virtual F8API bool handle_logout (const unsigned seqnum, const Message *msg)
 
virtual F8API bool handle_heartbeat (const unsigned seqnum, const Message *msg)
 
virtual F8API bool handle_resend_request (const unsigned seqnum, const Message *msg)
 
virtual F8API bool handle_sequence_reset (const unsigned seqnum, const Message *msg)
 
virtual F8API bool handle_test_request (const unsigned seqnum, const Message *msg)
 
virtual bool handle_reject (const unsigned seqnum, const Message *msg)
 
virtual bool handle_admin (const unsigned seqnum, const Message *msg)
 
virtual F8API bool handle_outbound_reject (const unsigned seqnum, const Message *msg, const char *errstr)
 
virtual bool handle_application (const unsigned seqnum, const Message *&msg)=0
 
virtual void state_change (const States::SessionStates before, const States::SessionStates after)
 
virtual void modify_outbound (Message *msg)
 
virtual bool authenticate (SessionID &id, const Message *msg)
 
virtual F8API void recover_seqnums ()
 Recover next expected and next to send sequence numbers from persitence layer. More...
 
Messagecreate_msg (const f8String &msg_type) const
 
void set_scheduler (int priority)
 
void set_affinity (int core_id)
 

Protected Attributes

Control _control
 
f8_atomic< unsigned > _next_send_seq
 
f8_atomic< unsigned > _next_receive_seq
 
f8_atomic< States::SessionStates_state
 
f8_atomic< bool > _active
 
Tickval _last_sent
 
Tickval _last_received
 
const F8MetaCntx_ctx
 
sender_comp_id _sci
 
Connection_connection
 
unsigned _req_next_send_seq
 
unsigned _req_next_receive_seq
 
SessionID _sid
 
struct SessionConfig_sf
 
LoginParameters _loginParameters
 
f8_spin_lock _per_spl
 
Persister_persist
 
Logger_logger
 
Logger_plogger
 
Timer< Session_timer
 
TimerEvent< Session_hb_processor
 
TimerEvent< Session_session_scheduler
 
std::string _batchmsgs_buffer
 
Session_Schedule_schedule
 

Static Protected Attributes

static F8API const std::vector< f8String_state_names
 string representation of Sessionstates More...
 

Private Member Functions

void atomic_init (States::SessionStates st)
 

Detailed Description

Fix8 Base Session. User sessions are derived from this class.

Definition at line 394 of file session.hpp.

Member Typedef Documentation

Definition at line 403 of file session.hpp.

using FIX8::Session::SequencePair = std::pair<const unsigned, const f8String>

Definition at line 615 of file session.hpp.

Member Enumeration Documentation

Enumerator
shutdown 
print 
printnohb 
debug 
count 

Definition at line 401 of file session.hpp.

Constructor & Destructor Documentation

Session::Session ( const F8MetaCntx ctx,
const SessionID sid,
Persister persist = nullptr,
Logger logger = nullptr,
Logger plogger = nullptr 
)

Ctor. Initiator.

Parameters
ctxreference to generated metadata
sidsessionid of connecting session
persistpersister for this session
loggerlogger for this session
ploggerprotocol logger for this session

Definition at line 105 of file session.cpp.

References _batchmsgs_buffer, _logger, _persist, _plogger, _sid, _timer, FIX8_MAX_MSG_LENGTH, glout_warn, and FIX8::HEADER_CALC_OFFSET().

105  :
108  _sid(sid), _sf(), _persist(persist), _logger(logger), _plogger(plogger), // initiator
109  _timer(*this, 10), _hb_processor(&Session::heartbeat_service, true),
111 {
112  _timer.start();
114 
115  if (!_logger)
116  {
117  glout_warn << "Warning: no session logger defined for " << _sid;
118  }
119 
120  if (!_plogger)
121  {
122  glout_warn << "Warning: no protocol logger defined for " << _sid;
123  }
124 
125  if (!_persist)
126  {
127  glout_warn << "Warning: no persister defined for " << _sid;
128  }
129 }
Logger * _logger
Definition: session.hpp:422
unsigned _req_next_send_seq
Definition: session.hpp:414
Logger * _plogger
Definition: session.hpp:422
TimerEvent< Session > _hb_processor
Definition: session.hpp:425
Persister * _persist
Definition: session.hpp:421
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
const size_t HEADER_CALC_OFFSET(32)
TimerEvent< Session > _session_scheduler
Definition: session.hpp:425
SessionID _sid
Definition: session.hpp:415
Session_Schedule * _schedule
Definition: session.hpp:427
F8API bool activation_service()
Session start/stop service thread method.
Definition: session.cpp:788
std::string _batchmsgs_buffer
Definition: session.hpp:426
#define glout_warn
Definition: logger.hpp:604
Connection * _connection
Definition: session.hpp:413
struct SessionConfig * _sf
Definition: session.hpp:416
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
unsigned _req_next_receive_seq
Definition: session.hpp:414
F8API bool heartbeat_service()
Heartbeat generation service thread method.
Definition: session.cpp:808
const F8MetaCntx & _ctx
Definition: session.hpp:411
Timer< Session > _timer
Definition: session.hpp:424
Session::Session ( const F8MetaCntx ctx,
const sender_comp_id sci = sender_comp_id(),
Persister persist = nullptr,
Logger logger = nullptr,
Logger plogger = nullptr 
)

Ctor. Acceptor.

Parameters
ctxreference to generated metadata
scisender comp id of hosting session
persistpersister for this session
loggerlogger for this session
ploggerprotocol logger for this session

Definition at line 132 of file session.cpp.

References _batchmsgs_buffer, _timer, FIX8_MAX_MSG_LENGTH, and FIX8::HEADER_CALC_OFFSET().

132  :
135  _sf(), _persist(persist), _logger(logger), _plogger(plogger), // acceptor
136  _timer(*this, 10), _hb_processor(&Session::heartbeat_service, true),
138 {
139  _timer.start();
141 }
Logger * _logger
Definition: session.hpp:422
unsigned _req_next_send_seq
Definition: session.hpp:414
Logger * _plogger
Definition: session.hpp:422
TimerEvent< Session > _hb_processor
Definition: session.hpp:425
Persister * _persist
Definition: session.hpp:421
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
const size_t HEADER_CALC_OFFSET(32)
TimerEvent< Session > _session_scheduler
Definition: session.hpp:425
Session_Schedule * _schedule
Definition: session.hpp:427
F8API bool activation_service()
Session start/stop service thread method.
Definition: session.cpp:788
std::string _batchmsgs_buffer
Definition: session.hpp:426
sender_comp_id _sci
Definition: session.hpp:412
Connection * _connection
Definition: session.hpp:413
struct SessionConfig * _sf
Definition: session.hpp:416
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
unsigned _req_next_receive_seq
Definition: session.hpp:414
F8API bool heartbeat_service()
Heartbeat generation service thread method.
Definition: session.cpp:808
const F8MetaCntx & _ctx
Definition: session.hpp:411
Timer< Session > _timer
Definition: session.hpp:424
Session::~Session ( )
virtual

Dtor.

Definition at line 152 of file session.cpp.

References _connection, _logger, _per_spl, _persist, _plogger, _schedule, FIX8::Connection::cn_acceptor, FIX8::Connection::get_role(), FIX8::hypersleep< h_seconds >(), slout_info, and FIX8::Logger::stop().

153 {
154  slout_info << "Session terminating";
155  if (_logger)
156  _logger->stop();
157  if (_plogger)
158  _plogger->stop();
159  hypersleep<h_seconds>(1); // needed for service threads to exit gracefully
160 
162  { f8_scoped_spin_lock guard(_per_spl); delete _persist; _persist = 0; }
163  delete _schedule;
164 }
Logger * _logger
Definition: session.hpp:422
void stop()
Stop the logging thread.
Definition: logger.hpp:310
Logger * _plogger
Definition: session.hpp:422
Persister * _persist
Definition: session.hpp:421
Session_Schedule * _schedule
Definition: session.hpp:427
Role get_role() const
Definition: connection.hpp:549
Connection * _connection
Definition: session.hpp:413
f8_spin_lock _per_spl
Definition: session.hpp:420
#define slout_info
Definition: session.hpp:883
int hypersleep< h_seconds >(unsigned amt)
Definition: hypersleep.hpp:83

Member Function Documentation

virtual bool FIX8::Session::activation_check ( const unsigned  seqnum,
const Message msg 
)
inlinevirtual

Check that the session is active for this application message

Parameters
seqnummessage sequence number
msgMessage
Returns
true if active

Definition at line 736 of file session.hpp.

References _active.

Referenced by process().

736 { return _active; }
f8_atomic< bool > _active
Definition: session.hpp:409
bool Session::activation_service ( )
protected

Session start/stop service thread method.

Definition at line 788 of file session.cpp.

References _active, _connection, FIX8::Session_Schedule::_sch, _schedule, FIX8::Connection::is_connected(), is_shutdown(), slout_info, and FIX8::Schedule::test().

789 {
790  //cout << "activation_service()" << endl;
791  if (is_shutdown())
792  return false;
793 
795  {
796  const bool curr(_active);
797  _active = _schedule->_sch.test(curr);
798  if (curr != _active)
799  {
800  slout_info << "Session activation transitioned to " << (_active ? "active" : "inactive");
801  }
802  }
803 
804  return true;
805 }
Session_Schedule * _schedule
Definition: session.hpp:427
bool test(bool prev=false) const
Definition: session.hpp:247
f8_atomic< bool > _active
Definition: session.hpp:409
Connection * _connection
Definition: session.hpp:413
bool is_connected() const
Definition: connection.hpp:571
#define slout_info
Definition: session.hpp:883
bool is_shutdown()
Definition: session.hpp:778
void Session::atomic_init ( States::SessionStates  st)
private

Initialise atomic members.

Parameters
stthe initial session state

Definition at line 144 of file session.cpp.

References _active, _next_receive_seq, _next_send_seq, and do_state_change().

Referenced by start().

145 {
146  do_state_change(st);
148  _active = true;
149 }
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
f8_atomic< bool > _active
Definition: session.hpp:409
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
virtual bool FIX8::Session::authenticate ( SessionID id,
const Message msg 
)
inlineprotectedvirtual

Call user defined authentication with logon message.

Parameters
idSession id of inbound connection
msgMessage
Returns
true on success

Definition at line 520 of file session.hpp.

Referenced by handle_logon().

520 { return true; }
void FIX8::Session::clear_connection ( const Connection connection)
inline

Clear reference to connection. Called by ~Connection() to clear reference.

Parameters
connectionbeing deleted

Definition at line 586 of file session.hpp.

Referenced by FIX8::Connection::~Connection().

587  {
588  if (connection == _connection)
589  _connection = nullptr;
590  }
Connection * _connection
Definition: session.hpp:413
void Session::compid_check ( const unsigned  seqnum,
const Message msg,
const SessionID id 
) const

Check that a message has the correct sender/target compid for this session. Throws BadCompidId on error.

Parameters
seqnummessage sequence number
msgMessage
idSession id of inbound connection

Definition at line 415 of file session.cpp.

References FIX8::LoginParameters::_enforce_compids, _loginParameters, FIX8::MessageBase::get(), and FIX8::Message::Header().

Referenced by enforce().

416 {
418  {
419  if (!id.same_sender_comp_id(msg->Header()->get<target_comp_id>()->get()))
420  throw BadCompidId(msg->Header()->get<target_comp_id>()->get());
421  if (!id.same_target_comp_id(msg->Header()->get<sender_comp_id>()->get()))
422  throw BadCompidId(msg->Header()->get<sender_comp_id>()->get());
423  }
424 }
LoginParameters _loginParameters
Definition: session.hpp:418
bool get(T &to) const
Definition: message.hpp:671
MessageBase * Header() const
Definition: message.hpp:1098
A message was received with an invalid sender/target compid id.
Control& FIX8::Session::control ( )
inline

Get the control object for this session.

Returns
the control object

Definition at line 774 of file session.hpp.

References _control.

774 { return _control; }
Control _control
Definition: session.hpp:406
const f8String Session::copyright_string ( )
static

Return the version and copyright for this version

Returns
string

Definition at line 1207 of file session.cpp.

Referenced by start().

1208 {
1209  time_t now(time(0));
1210 #ifdef _MSC_VER
1211  struct tm *ptim(localtime (&now));
1212 #else
1213  struct tm tim;
1214  localtime_r(&now, &tim);
1215  struct tm *ptim(&tim);
1216 #endif
1217  ostringstream ostr;
1218  ostr << endl << package_version << ' ' << copyright_short << setw(2) << (ptim->tm_year - 100) << copyright_short2;
1219  return ostr.str();
1220 }
Message* FIX8::Session::create_msg ( const f8String msg_type) const
inlineprotected

Create a new Fix message from metadata layer.

Parameters
msg_typemessage type string
Returns
new Message

Definition at line 528 of file session.hpp.

References FIX8::F8MetaCntx::_bme, FIX8::BaseMsgEntry::_create, FIX8::Minst::_do, and FIX8::GeneratedTable< Key, Val >::find_ptr().

Referenced by generate_business_reject(), generate_heartbeat(), generate_logon(), generate_logout(), generate_reject(), generate_resend_request(), generate_sequence_reset(), and generate_test_request().

529  {
530  const BaseMsgEntry *bme(_ctx._bme.find_ptr(msg_type.c_str()));
531  if (!bme)
532  throw InvalidMetadata<f8String>(msg_type);
533  return bme->_create._do(true);
534  }
const MsgTable & _bme
Framework generated lookup table to generate Fix messages.
Definition: message.hpp:216
Field< f8String, Common_MsgType > msg_type
Definition: field.hpp:2147
const F8MetaCntx & _ctx
Definition: session.hpp:411
const Val * find_ptr(const Key &key) const
Definition: f8types.hpp:154
static const Message* FIX8::Session::detach ( const Message *&  msg)
inlinestatic

Detach message passed to handle_application. Will set source to 0; Not thread safe and should never be called across threads. It should only be called from Session::handle_application().

Parameters
msgref to ptr containing message
Returns
detached Message *

Definition at line 843 of file session.hpp.

844  {
845  const Message *tmp(msg);
846  msg = 0;
847  return tmp;
848  }
void FIX8::Session::do_state_change ( const States::SessionStates  new_state)
inline

Call the virtual state_change method with before and after, then set the new state

Parameters
new_statenew session state to set

Definition at line 827 of file session.hpp.

References state_change().

Referenced by atomic_init(), handle_heartbeat(), handle_logon(), handle_resend_request(), handle_sequence_reset(), heartbeat_service(), process(), retrans_callback(), sequence_check(), and start().

828  {
829  const States::SessionStates old_state(_state.exchange(new_state));
830  if (old_state != new_state)
831  state_change(old_state, new_state);
832  }
virtual void state_change(const States::SessionStates before, const States::SessionStates after)
Definition: session.hpp:510
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
bool Session::enforce ( const unsigned  seqnum,
const Message msg 
)

Enforce session semantics. Checks compids, sequence numbers.

Parameters
seqnummessage sequence number
msgMessage
Returns
true if message FAILS enforce rules

Definition at line 252 of file session.cpp.

References _sid, _state, FIX8::Common_MsgType_SEQUENCE_RESET(), compid_check(), FIX8::MessageBase::get_msgtype(), FIX8::States::is_established(), sequence_check(), and FIX8::States::st_logon_received.

Referenced by handle_heartbeat(), handle_logon(), handle_logout(), handle_resend_request(), handle_sequence_reset(), and handle_test_request().

253 {
255  {
257  compid_check(seqnum, msg, _sid);
258  if (msg->get_msgtype() != Common_MsgType_SEQUENCE_RESET && sequence_check(seqnum, msg))
259  return false;
260  }
261 
262  return true;
263 }
F8API bool sequence_check(const unsigned seqnum, const Message *msg)
Definition: session.cpp:427
F8API void compid_check(const unsigned seqnum, const Message *msg, const SessionID &id) const
Definition: session.cpp:415
const f8String Common_MsgType_SEQUENCE_RESET("4")
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
SessionID _sid
Definition: session.hpp:415
const f8String & get_msgtype() const
Definition: message.hpp:538
static bool is_established(SessionStates ss)
Definition: session.hpp:175
bool FIX8::Session::enqueue ( const std::string &  what,
Logger::Level  lev,
const char *  fl = nullptr,
unsigned  value = 0 
) const
inline

Log a message to the session logger. Do not check for level permission

Parameters
whatstring to log
levlog level
flpointer to fileline
valueoptional value for the logger to use
Returns
true on success

Definition at line 686 of file session.hpp.

References FIX8::Logger::enqueue().

687  { return _logger ? _logger->enqueue(what, lev, fl, value) : false; }
Logger * _logger
Definition: session.hpp:422
bool enqueue(const std::string &what, Level lev=Logger::Info, const char *fl=nullptr, const unsigned val=0)
Definition: logger.hpp:294
Message * Session::generate_business_reject ( const unsigned  seqnum,
const Message msg,
const int  reason,
const char *  what 
)
virtual

Generate a business_reject message.

Parameters
seqnummessage sequence number
msgsource message
reasonrejection reason code
whatrejection text
Returns
new Message

Definition at line 901 of file session.cpp.

References FIX8::Common_MsgType_BUSINESS_REJECT(), create_msg(), and FIX8::MessageBase::get_msgtype().

902 {
903  Message *msg;
904  try
905  {
907  }
909  {
910  // since this is an application message, it may not be supported in supplied schema
911  return nullptr;
912  }
913 
914  *msg << new ref_seq_num(seqnum);
915  *msg << new ref_msg_type(imsg->get_msgtype());
916  *msg << new business_reject_reason(reason);
917  if (what)
918  *msg << new text(what);
919 
920  return msg;
921 }
Indicates a static metadata lookup failed. With the exception of user defined fields there should nev...
const f8String Common_MsgType_BUSINESS_REJECT("j")
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Field< f8String, Common_Text > text
Definition: field.hpp:2151
Field< int, Common_BusinessRejectReason > business_reject_reason
Definition: field.hpp:2164
Field< SeqNum, Common_RefSeqNum > ref_seq_num
Definition: field.hpp:2141
Field< f8String, Common_RefMsgType > ref_msg_type
Definition: field.hpp:2153
Message * Session::generate_heartbeat ( const f8String testReqID)
virtual

Generate a heartbeat message.

Parameters
testReqIDtest request id
Returns
new Message

Definition at line 878 of file session.cpp.

References FIX8::Common_MsgType_HEARTBEAT(), and create_msg().

Referenced by handle_test_request(), and heartbeat_service().

879 {
881  if (!testReqID.empty())
882  *msg << new test_request_id(testReqID);
883 
884  return msg;
885 }
const f8String Common_MsgType_HEARTBEAT("0")
Field< f8String, Common_TestReqID > test_request_id
Definition: field.hpp:2150
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Message * Session::generate_logon ( const unsigned  heartbeat_interval,
const f8String  davi = f8String() 
)
protectedvirtual

Generate a logon message.

Parameters
heartbeat_intervalheartbeat interval
davidefault appl version id (FIXT)
Returns
new Message

Definition at line 933 of file session.cpp.

References _loginParameters, FIX8::LoginParameters::_reset_sequence_numbers, FIX8::Common_MsgType_LOGON(), create_msg(), and FIX8::MessageBase::is_legal().

Referenced by handle_logon(), and start().

934 {
936  *msg << new heartbeat_interval(heartbtint)
937  << new encrypt_method(0); // FIXME
938  if (!davi.empty() && msg->is_legal<default_appl_ver_id>())
939  *msg << new default_appl_ver_id(davi);
941  *msg << new reset_seqnum_flag(true);
942 
943  return msg;
944 }
Field< int, Common_EncryptMethod > encrypt_method
Definition: field.hpp:2163
const f8String Common_MsgType_LOGON("A")
LoginParameters _loginParameters
Definition: session.hpp:418
Field< f8String, Common_DefaultApplVerID > default_appl_ver_id
Definition: field.hpp:2152
Field< Boolean, Common_ResetSeqNumFlag > reset_seqnum_flag
Definition: field.hpp:2160
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Field< int, Common_HeartBtInt > heartbeat_interval
Definition: field.hpp:2162
Message * Session::generate_logout ( const char *  msgstr = nullptr)
virtual

Generate a logout message.

Returns
new Message

Definition at line 947 of file session.cpp.

References FIX8::Common_MsgType_LOGOUT(), and create_msg().

Referenced by heartbeat_service(), and process().

948 {
950  if (msgstr)
951  *msg << new text(msgstr);
952 
953  return msg;
954 }
const f8String Common_MsgType_LOGOUT("5")
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Field< f8String, Common_Text > text
Definition: field.hpp:2151
Message * Session::generate_reject ( const unsigned  seqnum,
const char *  what,
const char *  msgtype = nullptr 
)
virtual

Generate a reject message.

Parameters
seqnummessage sequence number
whatrejection text
msgtypeoffending msgtype
Returns
new Message

Definition at line 888 of file session.cpp.

References FIX8::Common_MsgType_REJECT(), and create_msg().

Referenced by handle_logon(), and handle_outbound_reject().

889 {
891  *msg << new ref_seq_num(seqnum);
892  if (what)
893  *msg << new text(what);
894  if (msgtype)
895  *msg << new ref_msg_type(msgtype);
896 
897  return msg;
898 }
const f8String Common_MsgType_REJECT("3")
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Field< f8String, Common_Text > text
Definition: field.hpp:2151
Field< SeqNum, Common_RefSeqNum > ref_seq_num
Definition: field.hpp:2141
Field< f8String, Common_RefMsgType > ref_msg_type
Definition: field.hpp:2153
Message * Session::generate_resend_request ( const unsigned  begin,
const unsigned  end = 0 
)
virtual

Generate a resend request message.

Parameters
beginbegin sequence number
endsequence number
Returns
new Message

Definition at line 957 of file session.cpp.

References FIX8::Common_MsgType_RESEND_REQUEST(), and create_msg().

Referenced by sequence_check().

958 {
960  *msg << new begin_seq_num(begin) << new end_seq_num(end);
961 
962  return msg;
963 }
Field< SeqNum, Common_EndSeqNo > end_seq_num
Definition: field.hpp:2139
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
const f8String Common_MsgType_RESEND_REQUEST("2")
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Field< SeqNum, Common_BeginSeqNo > begin_seq_num
Definition: field.hpp:2138
Message * Session::generate_sequence_reset ( const unsigned  newseqnum,
const bool  gapfillflag = false 
)
virtual

Generate a sequence reset message.

Parameters
newseqnumnew sequence number
gapfillflaggap fill flag
Returns
new Message

Definition at line 966 of file session.cpp.

References FIX8::Common_MsgType_SEQUENCE_RESET(), and create_msg().

Referenced by handle_resend_request(), and retrans_callback().

967 {
969  *msg << new new_seq_num(newseqnum);
970 
971  if (gapfillflag)
972  *msg << new gap_fill_flag(true);
973 
974  return msg;
975 }
const f8String Common_MsgType_SEQUENCE_RESET("4")
Field< Boolean, Common_GapFillFlag > gap_fill_flag
Definition: field.hpp:2158
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Field< SeqNum, Common_NewSeqNo > new_seq_num
Definition: field.hpp:2140
Message * Session::generate_test_request ( const f8String testReqID)
virtual

Generate a test request message.

Parameters
testReqIDtest request id
Returns
new Message

Definition at line 924 of file session.cpp.

References FIX8::Common_MsgType_TEST_REQUEST(), and create_msg().

Referenced by heartbeat_service().

925 {
927  *msg << new test_request_id(testReqID);
928 
929  return msg;
930 }
const f8String Common_MsgType_TEST_REQUEST("1")
Field< f8String, Common_TestReqID > test_request_id
Definition: field.hpp:2150
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
Connection* FIX8::Session::get_connection ( )
inline

Get the connection object.

Returns
the connection object

Definition at line 665 of file session.hpp.

References _connection.

Referenced by MyMenu::send_all_preloaded().

665 { return _connection; }
Connection * _connection
Definition: session.hpp:413
const F8MetaCntx& FIX8::Session::get_ctx ( ) const
inline

Get the metadata context object.

Returns
the context object

Definition at line 673 of file session.hpp.

References _ctx.

673 { return _ctx; }
const F8MetaCntx & _ctx
Definition: session.hpp:411
const Tickval& FIX8::Session::get_last_received ( ) const
inline

Return the last received timstamp

Returns
Tickval on success

Definition at line 708 of file session.hpp.

References _last_received.

708 { return _last_received; }
Tickval _last_received
Definition: session.hpp:410
const Tickval& FIX8::Session::get_last_sent ( ) const
inline

Return the last sent timstamp

Returns
Tickval on success

Definition at line 712 of file session.hpp.

References _last_sent.

712 { return _last_sent; }
Tickval _last_sent
Definition: session.hpp:410
void FIX8::Session::get_login_parameters ( LoginParameters loginParamaters) const
inline

Get the LoginParameters

Parameters
loginParamatersto populate

Definition at line 758 of file session.hpp.

References _loginParameters.

758 { loginParamaters = _loginParameters; }
LoginParameters _loginParameters
Definition: session.hpp:418
const LoginParameters& FIX8::Session::get_login_parameters ( ) const
inline

Get the LoginParameters

Returns
loginParamaters

Definition at line 762 of file session.hpp.

References _loginParameters.

762 { return _loginParameters; }
LoginParameters _loginParameters
Definition: session.hpp:418
unsigned FIX8::Session::get_next_send_seq ( ) const
inline

Get the next sender sequence number

Returns
next sender sequence number

Definition at line 750 of file session.hpp.

References _next_send_seq.

Referenced by FIX8::MemoryPersister::get(), and FIX8::FilePersister::get().

750 { return _next_send_seq; }
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
States::SessionStates FIX8::Session::get_session_state ( ) const
inline

Get the current session state enumeration

Returns
States::SessionStates value

Definition at line 836 of file session.hpp.

References _state.

836 { return _state; }
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
static const f8String& FIX8::Session::get_session_state_string ( const States::SessionStates  state)
inlinestatic

Find the string representation for the given session state

Parameters
statesession state
Returns
string found or "unknown"

Definition at line 853 of file session.hpp.

854  {
855  static const f8String unknown("Unknown");
856  return state < _state_names.size() ? _state_names[state] : unknown;
857  }
static F8API const std::vector< f8String > _state_names
string representation of Sessionstates
Definition: session.hpp:430
std::string f8String
Definition: f8types.hpp:47
const SessionID& FIX8::Session::get_sid ( ) const
inline

Get the session id for this session.

Returns
the session id

Definition at line 746 of file session.hpp.

References _sid.

746 { return _sid; }
SessionID _sid
Definition: session.hpp:415
Timer<Session>& FIX8::Session::get_timer ( )
inline

Get the timer object.

Returns
the timer object

Definition at line 669 of file session.hpp.

References _timer.

669 { return _timer; }
Timer< Session > _timer
Definition: session.hpp:424
virtual bool FIX8::Session::handle_admin ( const unsigned  seqnum,
const Message msg 
)
inlineprotectedvirtual

Administrative message callback. Called on receipt of all admin messages.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 490 of file session.hpp.

Referenced by process().

490 { return true; }
virtual bool FIX8::Session::handle_application ( const unsigned  seqnum,
const Message *&  msg 
)
protectedpure virtual

Application message callback. Called on receipt of all non-admin messages. You must implement this method. The message is passed as a reference to a pointer. Your application can detach and take ownership. If you want to take ownership, take a copy of the pointer and then set msg to 0. See Session::detach()

Parameters
seqnummessage sequence number
msgreference to Message ptr
Returns
true on success

Implemented in hf_session_server, myfix_session_server, hf_session_client, and myfix_session_client.

Referenced by process().

bool Session::handle_heartbeat ( const unsigned  seqnum,
const Message msg 
)
protectedvirtual

Heartbeat callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 868 of file session.cpp.

References _state, do_state_change(), enforce(), FIX8::States::st_continuous, and FIX8::States::st_test_request_sent.

Referenced by process().

869 {
870  enforce(seqnum, msg);
871 
874  return true;
875 }
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
bool Session::handle_logon ( const unsigned  seqnum,
const Message msg 
)
protectedvirtual

Logon callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 470 of file session.cpp.

References FIX8::F8MetaCntx::_beginStr, FIX8::LoginParameters::_clients, _connection, _ctx, FIX8::LoginParameters::_enforce_compids, _hb_processor, _logger, FIX8::LoginParameters::_login_schedule, _loginParameters, _next_receive_seq, _next_send_seq, _per_spl, _persist, _plogger, _req_next_receive_seq, _req_next_send_seq, _schedule, _sci, FIX8::SessionConfig::_ses, _sf, _sid, _state, _timer, authenticate(), FIX8::Connection::cn_initiator, FIX8::Common_ResetSeqNumFlag(), FIX8::Configuration::create_logger(), FIX8::Configuration::create_persister(), do_state_change(), enforce(), generate_logon(), generate_reject(), FIX8::MessageBase::get(), FIX8::Connection::get_hb_interval(), FIX8::MessageBase::get_msgtype(), FIX8::Connection::get_peer_socket_address(), FIX8::Connection::get_pmodel(), FIX8::Connection::get_role(), FIX8::SessionID::get_senderCompID(), glout_error, glout_info, glout_warn, FIX8::MessageBase::have(), FIX8::Message::Header(), FIX8::Schedule::is_valid(), FIX8::pm_coro, FIX8::Configuration::protocol_log, recover_seqnums(), send(), FIX8::Configuration::session_log, FIX8::Connection::set_hb_interval(), slout_error, slout_info, FIX8::States::st_continuous, FIX8::States::st_logon_received, FIX8::States::st_session_terminated, stop(), and FIX8::Schedule::test().

Referenced by process().

471 {
473  {
474  send(generate_reject(seqnum, "Already logged on"), msg->get_msgtype().c_str());
475  return true;
476  }
478  const bool reset_given(msg->have(Common_ResetSeqNumFlag) && msg->get<reset_seqnum_flag>()->get());
479  sender_comp_id sci; // so this should be our tci
480  msg->Header()->get(sci);
481  target_comp_id tci; // so this should be our sci
482  msg->Header()->get(tci);
483  SessionID id(_ctx._beginStr, tci(), sci());
484 
486  {
487  if (id != _sid)
488  {
489  glout_warn << "Inbound TargetCompID not recognised (" << tci << "), expecting (" << _sid.get_senderCompID() << ')';
491  {
492  stop();
494  return false;
495  }
496  }
497 
498  enforce(seqnum, msg);
500  }
501  else // acceptor
502  {
503  default_appl_ver_id davi;
504  msg->get(davi);
505 
506  if (_sci() != tci())
507  {
508  glout_warn << "Inbound TargetCompID not recognised (" << tci << "), expecting (" << _sci << ')';
510  {
511  stop();
513  return false;
514  }
515  }
516 
517  if (!_loginParameters._clients.empty())
518  {
519  auto itr(_loginParameters._clients.find(sci()));
520  bool iserr(false);
521  if (itr == _loginParameters._clients.cend())
522  {
523  glout_error << "Remote (" << sci << ") not found (" << id << "). NOT authorised to proceed.";
524  iserr = true;
525  }
526 
527  if (!iserr && get<1>(itr->second) != Poco::Net::IPAddress()
528  && get<1>(itr->second) != _connection->get_peer_socket_address().host())
529  {
530  glout_error << "Remote (" << get<0>(itr->second) << ", " << sci << ") NOT authorised to proceed ("
531  << _connection->get_peer_socket_address().toString() << ").";
532  iserr = true;
533  }
534 
535  if (iserr)
536  {
537  stop();
539  return false;
540  }
541 
542  glout_info << "Remote (" << get<0>(itr->second) << ", " << sci << ") authorised to proceed ("
543  << _connection->get_peer_socket_address().toString() << ").";
544  }
545 
546  // important - these objects can't be created until we have a valid SessionID
547  if (_sf)
548  {
550  {
551  glout_warn << "Warning: no session logger defined for " << id;
552  }
553 
555  {
556  glout_warn << "Warning: no protocol logger defined for " << id;
557  }
558 
559  if (!_persist)
560  {
562  if (!(_persist = _sf->create_persister(_sf->_ses, &id, reset_given)))
563  {
564  glout_warn << "Warning: no persister defined for " << id;
565  }
566  }
567 
568 #if 0
569  if (_schedule)
570  slout_info << *_schedule;
571 #endif
572  }
573  else
574  {
575  glout_error << "Error: SessionConfig object missing in session";
576  }
577 
578  slout_info << "Connection from " << _connection->get_peer_socket_address().toString();
579 
580  if (reset_given) // ignore version restrictions on this behaviour
581  {
582  slout_info << "Resetting sequence numbers";
584  }
585  else
586  {
587  recover_seqnums();
588  if (_req_next_send_seq)
592  }
593 
594  if (authenticate(id, msg))
595  {
596  _sid = id;
597  enforce(seqnum, msg);
598  heartbeat_interval hbi;
599  msg->get(hbi);
601  send(generate_logon(hbi(), davi()));
603  slout_info << "Client setting heartbeat interval to " << hbi();
604  }
605  else
606  {
607  slout_error << id << " failed authentication";
608  stop();
610  return false;
611  }
612 
614  {
615  slout_error << id << " Session unavailable. Login not accepted.";
616  stop();
618  return false;
619  }
620  }
621 
622  slout_info << "Heartbeat interval is " << _connection->get_hb_interval();
623 
624  _timer.schedule(_hb_processor, 1000); // check every second
625 
626  return true;
627 }
Logger * _logger
Definition: session.hpp:422
unsigned _req_next_send_seq
Definition: session.hpp:414
unsigned get_hb_interval() const
Definition: connection.hpp:608
bool have(const unsigned short fnum) const
Definition: message.hpp:725
bool is_valid() const
Definition: session.hpp:242
void set_hb_interval(const unsigned hb_interval)
Definition: connection.hpp:603
Logger * _plogger
Definition: session.hpp:422
TimerEvent< Session > _hb_processor
Definition: session.hpp:425
Persister * _persist
Definition: session.hpp:421
ProcessModel get_pmodel() const
Definition: connection.hpp:553
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
SessionID _sid
Definition: session.hpp:415
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
const sender_comp_id & get_senderCompID() const
Definition: session.hpp:101
Session_Schedule * _schedule
Definition: session.hpp:427
Quickfix style sessionid.
Definition: session.hpp:46
Role get_role() const
Definition: connection.hpp:549
bool test(bool prev=false) const
Definition: session.hpp:247
LoginParameters _loginParameters
Definition: session.hpp:418
#define glout_error
Definition: logger.hpp:606
F8API Persister * create_persister(const XmlElement *from, const SessionID *sid=nullptr, bool flag=false) const
bool get(T &to) const
Definition: message.hpp:671
Schedule _login_schedule
Definition: session.hpp:359
const f8String _beginStr
Fix header beginstring.
Definition: message.hpp:228
sender_comp_id _sci
Definition: session.hpp:412
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
#define glout_warn
Definition: logger.hpp:604
Connection * _connection
Definition: session.hpp:413
const f8String & get_msgtype() const
Definition: message.hpp:538
MessageBase * Header() const
Definition: message.hpp:1098
F8API void stop()
stop the session.
Definition: session.cpp:230
virtual F8API Message * generate_logon(const unsigned heartbeat_interval, const f8String davi=f8String())
Definition: session.cpp:933
struct SessionConfig * _sf
Definition: session.hpp:416
f8_spin_lock _per_spl
Definition: session.hpp:420
#define glout_info
Definition: logger.hpp:601
unsigned _req_next_receive_seq
Definition: session.hpp:414
virtual F8API Message * generate_reject(const unsigned seqnum, const char *what, const char *msgtype=nullptr)
Definition: session.cpp:888
#define slout_info
Definition: session.hpp:883
Field template. There will ONLY be partial template specialisations of this template.
Definition: field.hpp:256
virtual bool authenticate(SessionID &id, const Message *msg)
Definition: session.hpp:520
Poco::Net::SocketAddress get_peer_socket_address() const
Definition: connection.hpp:616
const F8MetaCntx & _ctx
Definition: session.hpp:411
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
const XmlElement * _ses
Timer< Session > _timer
Definition: session.hpp:424
const unsigned short Common_ResetSeqNumFlag(141)
F8API Logger * create_logger(const XmlElement *from, const Logtype ltype, const SessionID *sid=nullptr) const
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
#define slout_error
Definition: session.hpp:885
virtual F8API void recover_seqnums()
Recover next expected and next to send sequence numbers from persitence layer.
Definition: session.cpp:1124
bool Session::handle_logout ( const unsigned  seqnum,
const Message msg 
)
protectedvirtual

Logout callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 630 of file session.cpp.

References enforce(), and slout_info.

Referenced by process().

631 {
632  enforce(seqnum, msg);
633 
634  //if (_state != States::st_logoff_sent)
635  // send(generate_logout());
636  slout_info << "peer has logged out";
637  // stop(); // FX-615
638  return true;
639 }
#define slout_info
Definition: session.hpp:883
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
bool Session::handle_outbound_reject ( const unsigned  seqnum,
const Message msg,
const char *  errstr 
)
protectedvirtual

Outbound Reject callback. Override to receive callback when an inbound message has caused a reject

Parameters
seqnummessage sequence number
msgMessage
errstrreject message text
Returns
true on success

Definition at line 782 of file session.cpp.

References generate_reject(), FIX8::MessageBase::get_msgtype(), and send().

Referenced by handle_resend_request(), and process().

783 {
784  return send(generate_reject(seqnum, errstr, msg && !msg->get_msgtype().empty() ? msg->get_msgtype().c_str() : nullptr));
785 }
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
const f8String & get_msgtype() const
Definition: message.hpp:538
virtual F8API Message * generate_reject(const unsigned seqnum, const char *what, const char *msgtype=nullptr)
Definition: session.cpp:888
virtual bool FIX8::Session::handle_reject ( const unsigned  seqnum,
const Message msg 
)
inlineprotectedvirtual

Reject callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 484 of file session.hpp.

Referenced by process().

484 { return false; }
bool Session::handle_resend_request ( const unsigned  seqnum,
const Message msg 
)
protectedvirtual

Resend request callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 663 of file session.cpp.

References _next_send_seq, _persist, _state, do_state_change(), enforce(), generate_sequence_reset(), FIX8::Persister::get(), FIX8::MessageBase::get(), handle_outbound_reject(), retrans_callback(), send(), slout_debug, slout_warn, and FIX8::States::st_resend_request_received.

Referenced by process().

664 {
665  enforce(seqnum, msg);
666 
668  {
669  begin_seq_num begin;
670  end_seq_num end;
671 
672  if (!msg->get(begin))
673  {
674  slout_warn << "handle_resend_request: can't obtain BeginSeqNo from request";
675  }
676  if (!msg->get(end))
677  {
678  slout_warn << "handle_resend_request: can't obtain EndSeqNo from request";
679  }
680 
681  if ((begin() > end() && end()) || begin() == 0)
682  handle_outbound_reject(seqnum, msg, "Invalid resend range: Begin > End or Begin = 0");
683  else if (!_persist)
684  {
685  const int nxt(static_cast<int>(_next_send_seq)), nseq(begin() >= nxt ? begin() + 1 : nxt);
686  send(generate_sequence_reset(nseq, true), true, begin());
687  _next_send_seq = nseq;
688  slout_debug << "handle_resend_request scenario #" << (nseq == nxt ? 7 : 8);
689  }
690  else
691  {
692  //cout << "got resend request:" << begin() << " to " << end() << endl;
694  //f8_scoped_spin_lock guard(_per_spl, _connection->get_pmodel() == pm_coro); // no no nanette!
695  _persist->get(begin(), end(), *this, &Session::retrans_callback);
696  }
697  }
698  else
699  {
700  slout_warn << "resend request received during an existing resend sequence";
701  }
702 
703  return true;
704 }
Persister * _persist
Definition: session.hpp:421
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
bool get(T &to) const
Definition: message.hpp:671
virtual bool get(const unsigned seqnum, f8String &to) const =0
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
Field template. There will ONLY be partial template specialisations of this template.
Definition: field.hpp:256
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
#define slout_warn
Definition: session.hpp:884
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
virtual F8API Message * generate_sequence_reset(const unsigned newseqnum, const bool gapfillflag=false)
Definition: session.cpp:966
virtual F8API bool handle_outbound_reject(const unsigned seqnum, const Message *msg, const char *errstr)
Definition: session.cpp:782
#define slout_debug
Definition: session.hpp:887
virtual F8API bool retrans_callback(const SequencePair &with, RetransmissionContext &rctx)
Definition: session.cpp:707
bool Session::handle_sequence_reset ( const unsigned  seqnum,
const Message msg 
)
protectedvirtual

Sequence reset callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 642 of file session.cpp.

References _next_receive_seq, _state, do_state_change(), enforce(), FIX8::MessageBase::get(), slout_debug, FIX8::States::st_continuous, and FIX8::States::st_resend_request_sent.

Referenced by process().

643 {
644  enforce(seqnum, msg);
645 
646  new_seq_num nsn;
647  if (msg->get(nsn))
648  {
649  slout_debug << "newseqnum = " << nsn() << ", _next_receive_seq = " << _next_receive_seq << " seqnum:" << seqnum;
650  if (nsn() >= static_cast<int>(_next_receive_seq))
651  _next_receive_seq = nsn() - 1;
652  else if (nsn() < static_cast<int>(_next_receive_seq))
653  throw MsgSequenceTooLow(nsn(), _next_receive_seq);
654  }
655 
658 
659  return true;
660 }
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
bool get(T &to) const
Definition: message.hpp:671
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
Field template. There will ONLY be partial template specialisations of this template.
Definition: field.hpp:256
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
A message was received with an out of sequence sequence number.
#define slout_debug
Definition: session.hpp:887
bool Session::handle_test_request ( const unsigned  seqnum,
const Message msg 
)
protectedvirtual

Test request callback.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 771 of file session.cpp.

References enforce(), generate_heartbeat(), FIX8::MessageBase::get(), and send().

Referenced by process().

772 {
773  enforce(seqnum, msg);
774 
775  test_request_id testReqID;
776  msg->get(testReqID);
777  send(generate_heartbeat(testReqID()));
778  return true;
779 }
bool get(T &to) const
Definition: message.hpp:671
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
Field template. There will ONLY be partial template specialisations of this template.
Definition: field.hpp:256
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
virtual F8API Message * generate_heartbeat(const f8String &testReqID)
Definition: session.cpp:878
bool Session::heartbeat_service ( )
protected

Heartbeat generation service thread method.

Definition at line 808 of file session.cpp.

References _connection, _last_received, _last_sent, _loginParameters, FIX8::LoginParameters::_silent_disconnect, _state, do_state_change(), FIX8::Logger::Error, generate_heartbeat(), generate_logout(), generate_test_request(), FIX8::Connection::get_hb_interval(), FIX8::Connection::get_hb_interval20pc(), FIX8::Connection::is_connected(), is_shutdown(), log(), FIX8::Tickval::now(), FIX8::Tickval::secs(), send(), slout_error, FIX8::States::st_logoff_sent, FIX8::States::st_session_terminated, FIX8::States::st_test_request_sent, stop(), and FIX8::Logger::Warn.

809 {
810  //cout << "heartbeat_service()" << endl;
811  if (is_shutdown())
812  return false;
813 
815  {
816  Tickval now(true);
817  if ((now - _last_sent).secs() >= static_cast<time_t>(_connection->get_hb_interval()))
818  {
819  const f8String testReqID;
820  send(generate_heartbeat(testReqID));
821  }
822 
823  now.now();
824  if ((now - _last_received).secs() > static_cast<time_t>(_connection->get_hb_interval20pc()))
825  {
826  if (_state == States::st_test_request_sent) // already sent
827  {
828  ostringstream ostr;
829  ostr << "Remote has ignored my test request. Aborting session...";
830  send(generate_logout(_loginParameters._silent_disconnect ? 0 : ostr.str().c_str()), true, 0, true); // so it won't increment
832  log(ostr.str(), Logger::Error);
833  try
834  {
835  stop();
836  }
837  catch (Poco::Net::NetException& e)
838  {
839  slout_error << e.what();
840  }
841  catch (exception& e)
842  {
843  slout_error << e.what();
844  }
845  return true;
846  }
848  {
849  ostringstream ostr;
850  ostr << "Have not received anything from remote for ";
851  if (_last_received.secs())
852  ostr << (now - _last_received).secs();
853  else
854  ostr << "more than " << _connection->get_hb_interval20pc();
855  ostr << " secs. Sending test request";
856  log(ostr.str(), Logger::Warn);
857  const f8String testReqID("TEST");
858  send(generate_test_request(testReqID));
860  }
861  }
862  }
863 
864  return true;
865 }
unsigned get_hb_interval() const
Definition: connection.hpp:608
Tickval _last_received
Definition: session.hpp:410
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
LoginParameters _loginParameters
Definition: session.hpp:418
virtual F8API Message * generate_logout(const char *msgstr=nullptr)
Definition: session.cpp:947
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
Connection * _connection
Definition: session.hpp:413
bool is_connected() const
Definition: connection.hpp:571
Tickval _last_sent
Definition: session.hpp:410
F8API void stop()
stop the session.
Definition: session.cpp:230
time_t secs() const
Definition: tickval.hpp:141
bool log(const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
Definition: session.hpp:695
virtual F8API Message * generate_test_request(const f8String &testReqID)
Definition: session.cpp:924
unsigned get_hb_interval20pc() const
Definition: connection.hpp:612
virtual F8API Message * generate_heartbeat(const f8String &testReqID)
Definition: session.cpp:878
#define slout_error
Definition: session.hpp:885
bool is_shutdown()
Definition: session.hpp:778
std::string f8String
Definition: f8types.hpp:47
bool FIX8::Session::is_loggable ( Logger::Level  level) const
inline

Check if the given log level is set for the session logger

Parameters
levellevel to test
Returns
true if available

Definition at line 678 of file session.hpp.

References FIX8::Logger::is_loggable().

678 { return _logger ? _logger->is_loggable(level) : false; }
Logger * _logger
Definition: session.hpp:422
bool is_loggable(Level level) const
Definition: logger.hpp:262
bool FIX8::Session::is_shutdown ( )
inline

See if this session is being shutdown.

Returns
true if shutdown is underway

Definition at line 778 of file session.hpp.

References FIX8::ebitset_r< T, B >::has(), shutdown, and FIX8::States::st_session_terminated.

Referenced by activation_service(), client_process(), and heartbeat_service().

f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
Control _control
Definition: session.hpp:406
integral_type has(const T sbit) const
Definition: f8utils.hpp:975
bool FIX8::Session::log ( const std::string &  what,
Logger::Level  lev,
const char *  fl = nullptr,
unsigned  value = 0 
) const
inline

Log a message to the session logger.

Parameters
whatstring to log
levlog level
flpointer to fileline
valueoptional value for the logger to use
Returns
true on success

Definition at line 695 of file session.hpp.

References FIX8::Logger::send().

Referenced by heartbeat_service().

696  { return _logger ? _logger->send(what, lev, fl, value) : false; }
Logger * _logger
Definition: session.hpp:422
bool send(const std::string &what, Level lev=Logger::Info, const char *fl=nullptr, const unsigned val=0)
Definition: logger.hpp:306
int Session::modify_header ( MessageBase msg)
virtual

Modify the header if desired. Called when message is sent.

Parameters
msgMessage
Returns
number of fields added/modifed

Definition at line 1003 of file session.cpp.

Referenced by send_process().

1004 {
1005  return 0;
1006 }
virtual void FIX8::Session::modify_outbound ( Message msg)
inlineprotectedvirtual

Permit modification of message just prior to sending.

Parameters
msgMessage

Definition at line 514 of file session.hpp.

Referenced by send_process().

514 {}
bool FIX8::Session::plog ( const std::string &  what,
Logger::Level  lev,
const unsigned  direction = 0 
) const
inline

Log a message to the protocol logger.

Parameters
whatFix message (string) to log
levlog level
direction0=out, 1=in
Returns
true on success

Definition at line 703 of file session.hpp.

References FIX8::Logger::send().

Referenced by process(), and send_process().

704  { return _plogger ? _plogger->send(what, lev, nullptr, direction) : false; }
Logger * _plogger
Definition: session.hpp:422
bool send(const std::string &what, Level lev=Logger::Info, const char *fl=nullptr, const unsigned val=0)
Definition: logger.hpp:306
bool Session::process ( const f8String from)
virtual

Process inbound messages. Called by connection object.

Parameters
fromraw fix message
Returns
true on success

Definition at line 277 of file session.cpp.

References _control, _ctx, _loginParameters, _next_receive_seq, FIX8::LoginParameters::_no_chksum_flag, FIX8::LoginParameters::_permissive_mode_flag, _plogger, FIX8::LoginParameters::_reliable, FIX8::LoginParameters::_silent_disconnect, _state, activation_check(), FIX8::Common_MsgByte_HEARTBEAT(), FIX8::Common_MsgByte_LOGON(), FIX8::Common_MsgByte_LOGOUT(), FIX8::Common_MsgByte_REJECT(), FIX8::Common_MsgByte_RESEND_REQUEST(), FIX8::Common_MsgByte_SEQUENCE_RESET(), FIX8::Common_MsgByte_TEST_REQUEST(), FIX8::Common_MsgType_HEARTBEAT(), FIX8::default_field_separator(), do_state_change(), FIX8::Message::factory(), FILE_LINE, FIX8::f8Exception::force_logoff(), generate_logout(), FIX8::MessageBase::get_msgtype(), glout_fatal, handle_admin(), handle_application(), handle_heartbeat(), handle_logon(), handle_logout(), handle_outbound_reject(), handle_reject(), handle_resend_request(), handle_sequence_reset(), handle_test_request(), FIX8::Logger::has_flag(), FIX8::Logger::inbound, FIX8::Logger::Info, FIX8::Message::is_admin(), plog(), print, printnohb, send(), slout_debug, slout_error, slout_fatal, FIX8::States::st_logoff_sent, FIX8::States::st_logon_received, FIX8::States::st_session_terminated, FIX8::States::st_wait_for_logon, stop(), update_persist_seqnums(), and FIX8::f8Exception::what().

278 {
279  unsigned seqnum(0);
280  bool remote_logged_out {};
281  const Message *msg = nullptr;
282 
283  try
284  {
285  const f8String::size_type fpos(from.find("34="));
286  if (fpos == f8String::npos)
287  {
288  slout_debug << "Session::process throwing for " << from;
289  throw InvalidMessage(from, FILE_LINE);
290  }
291 
292  seqnum = fast_atoi<unsigned>(from.data() + fpos + 3, default_field_separator);
293 
294  bool retry_plog(false);
296  {
298  plog(from, Logger::Info, 1);
299  else
300  retry_plog = true;
301  }
302 
304  {
305  glout_fatal << "Fatal: factory failed to generate a valid message";
306  return false;
307  }
308 
310  cout << *msg << endl;
311  else if (_control & print)
312  cout << *msg << endl;
313 
314  bool result(false), admin_result(msg->is_admin() ? handle_admin(seqnum, msg) : true);
315  if (msg->get_msgtype().size() > 1)
316  goto application_call;
317  else switch(msg->get_msgtype()[0])
318  {
319  default:
320 application_call:
321  if (activation_check(seqnum, msg))
322  result = handle_application(seqnum, msg);
323  break;
325  result = handle_heartbeat(seqnum, msg);
326  break;
328  result = handle_test_request(seqnum, msg);
329  break;
331  result = handle_resend_request(seqnum, msg);
332  break;
334  result = handle_reject(seqnum, msg);
335  break;
337  result = handle_sequence_reset(seqnum, msg);
338  break;
340  result = handle_logout(seqnum, msg);
341  remote_logged_out = true;
342  break;
344  result = handle_logon(seqnum, msg);
345  break;
346  }
347 
349  if (retry_plog)
350  plog(from, Logger::Info, 1);
351 
353 
354  if (remote_logged_out) // FX-615; permit logout seqnum to be persisted
355  {
356  slout_debug << "logout received from remote";
357  stop();
358  }
359 
360  delete msg;
361  return result && admin_result;
362  }
363  catch (LogfileException& e)
364  {
365  cerr << e.what() << endl;
366  }
367  catch (f8Exception& e)
368  {
369  slout_debug << "process: f8exception" << ' ' << seqnum << ' ' << e.what();
370 
371  if (e.force_logoff())
372  {
374  plog(from, Logger::Info, 1);
375  slout_fatal << e.what() << " - will logoff";
377  {
379  send(generate_logout(e.what()), true, 0, true); // so it won't increment
381  }
383  {
384  slout_debug << "rethrowing ...";
385  throw;
386  }
387  stop();
388  }
389  else
390  {
391  slout_error << e.what() << " - inbound message rejected";
392  handle_outbound_reject(seqnum, msg, e.what());
395  plog(from, Logger::Info, 1);
396  delete msg;
397  return true; // message is handled but has errors
398  }
399  }
400  catch (Poco::Net::NetException& e)
401  {
402  slout_debug << "process:: Poco::Net::NetException";
403  slout_error << e.what();
404  }
405  catch (exception& e)
406  {
407  slout_debug << "process:: std::exception";
408  slout_error << e.what();
409  }
410 
411  return false;
412 }
const char Common_MsgByte_SEQUENCE_RESET('4')
F8API void update_persist_seqnums()
Force persister to sync next send/receive seqnums.
Definition: session.cpp:266
const char Common_MsgByte_REJECT('3')
virtual bool activation_check(const unsigned seqnum, const Message *msg)
Definition: session.hpp:736
bool force_logoff() const
Definition: f8exception.hpp:89
bool plog(const std::string &what, Logger::Level lev, const unsigned direction=0) const
Definition: session.hpp:703
Logger * _plogger
Definition: session.hpp:422
virtual F8API bool handle_logon(const unsigned seqnum, const Message *msg)
Definition: session.cpp:470
const char Common_MsgByte_LOGON('A')
#define FILE_LINE
Definition: f8utils.hpp:68
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
bool has_flag(const Flags flg) const
Definition: logger.hpp:338
const f8String Common_MsgType_HEARTBEAT("0")
virtual bool handle_admin(const unsigned seqnum, const Message *msg)
Definition: session.hpp:490
virtual bool is_admin() const
Definition: message.hpp:1151
virtual F8API bool handle_resend_request(const unsigned seqnum, const Message *msg)
Definition: session.cpp:663
LoginParameters _loginParameters
Definition: session.hpp:418
virtual F8API Message * generate_logout(const char *msgstr=nullptr)
Definition: session.cpp:947
virtual F8API bool handle_sequence_reset(const unsigned seqnum, const Message *msg)
Definition: session.cpp:642
const char Common_MsgByte_RESEND_REQUEST('2')
virtual F8API bool handle_logout(const unsigned seqnum, const Message *msg)
Definition: session.cpp:630
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
virtual bool handle_reject(const unsigned seqnum, const Message *msg)
Definition: session.hpp:484
Base exception class.
Definition: f8exception.hpp:49
const f8String & get_msgtype() const
Definition: message.hpp:538
Control _control
Definition: session.hpp:406
F8API void stop()
stop the session.
Definition: session.cpp:230
virtual bool handle_application(const unsigned seqnum, const Message *&msg)=0
const char Common_MsgByte_HEARTBEAT('0')
virtual F8API bool handle_test_request(const unsigned seqnum, const Message *msg)
Definition: session.cpp:771
const char Common_MsgByte_TEST_REQUEST('1')
virtual F8API bool handle_heartbeat(const unsigned seqnum, const Message *msg)
Definition: session.cpp:868
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
static Message * factory(const F8MetaCntx &ctx, const char *from, bool no_chksum=false, bool permissive_mode=false)
Definition: message.hpp:1238
An invalid message was requested or decoded.
const F8MetaCntx & _ctx
Definition: session.hpp:411
#define glout_fatal
Definition: logger.hpp:608
const unsigned char default_field_separator(0x1)
default FIX field separator (^A)
const char * what() const
Definition: f8exception.hpp:85
#define slout_fatal
Definition: session.hpp:886
#define slout_error
Definition: session.hpp:885
virtual F8API bool handle_outbound_reject(const unsigned seqnum, const Message *msg, const char *errstr)
Definition: session.cpp:782
const char Common_MsgByte_LOGOUT('5')
#define slout_debug
Definition: session.hpp:887
Could not open a logfile.
void Session::recover_seqnums ( )
protectedvirtual

Recover next expected and next to send sequence numbers from persitence layer.

Definition at line 1124 of file session.cpp.

References _connection, _next_receive_seq, _next_send_seq, _per_spl, _persist, FIX8::Persister::get(), FIX8::Connection::get_pmodel(), FIX8::pm_coro, and slout_info.

Referenced by handle_logon(), and start().

1125 {
1127  if (_persist)
1128  {
1129  unsigned send_seqnum, receive_seqnum;
1130  if (_persist->get(send_seqnum, receive_seqnum))
1131  {
1132  slout_info << "Last sent: " << send_seqnum << ", last received: " << receive_seqnum;
1133  _next_send_seq = send_seqnum; // + 1;
1134  _next_receive_seq = receive_seqnum; // + 1;
1135  }
1136  }
1137 }
Persister * _persist
Definition: session.hpp:421
ProcessModel get_pmodel() const
Definition: connection.hpp:553
virtual bool get(const unsigned seqnum, f8String &to) const =0
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
Connection * _connection
Definition: session.hpp:413
f8_spin_lock _per_spl
Definition: session.hpp:420
#define slout_info
Definition: session.hpp:883
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
bool Session::retrans_callback ( const SequencePair with,
RetransmissionContext rctx 
)
virtual

Retransmission callback. Called by framework with each message to be resent.

Parameters
withpair of sequence number and raw fix message
rctxretransmission context
Returns
true on success

Definition at line 707 of file session.cpp.

References FIX8::Session::RetransmissionContext::_begin, _ctx, FIX8::Session::RetransmissionContext::_interrupted_seqnum, FIX8::Session::RetransmissionContext::_last, _next_send_seq, FIX8::Session::RetransmissionContext::_no_more_records, do_state_change(), FIX8::Message::factory(), generate_sequence_reset(), send(), slout_debug, and FIX8::States::st_continuous.

Referenced by handle_resend_request().

708 {
709  //cout << "first:" << with.first << ' ' << rctx << endl;
710 
711  if (rctx._no_more_records)
712  {
713  /*
714  if (rctx._end)
715  {
716  _next_send_seq = rctx._interrupted_seqnum - 1;
717  send(generate_sequence_reset(rctx._interrupted_seqnum, true), true, rctx._last + 1);
718  //cout << "#1" << endl;
719  }
720  else if (!rctx._last)
721  {
722  _next_send_seq = rctx._interrupted_seqnum;
723  send(generate_sequence_reset(rctx._interrupted_seqnum, true), true, rctx._begin);
724  //cout << "#4" << endl;
725  }
726  */
727  if (!rctx._last) // start to infinity requested
728  {
729  // handle case where requested seq is greater than current last sent seq (interrupted)
730  const unsigned nseq(rctx._begin >= rctx._interrupted_seqnum ? rctx._begin + 1 : rctx._interrupted_seqnum);
731  send(generate_sequence_reset(nseq, true), true, rctx._begin);
732  _next_send_seq = nseq;
733  slout_debug << "retrans_callback scenario #" << (nseq == rctx._interrupted_seqnum ? 4 : 5) << ' ' << rctx;
734  }
735  else // range requested // was: if (rctx._end)
736  {
737  // handle case where requested seq is greater than current last sent seq (interrupted)
738  const unsigned nseq(rctx._last + 1 >= rctx._interrupted_seqnum ? rctx._last + 2 : rctx._interrupted_seqnum);
739  send(generate_sequence_reset(nseq, true), true, rctx._last + 1);
740  _next_send_seq = nseq;
741  slout_debug << "retrans_callback scenario #" << (nseq == rctx._interrupted_seqnum ? 1 : 6) << ' ' << rctx;
742  }
744  return true;
745  }
746 
747  if (rctx._last)
748  {
749  if (rctx._last + 1 < with.first)
750  {
751  send(generate_sequence_reset(with.first, true), true, _next_send_seq);
752  slout_debug << "retrans_callback scenario #2, " << rctx;
753  }
754  }
755  else
756  {
757  if (with.first > rctx._begin)
758  {
759  send(generate_sequence_reset(with.first, true));
760  slout_debug << "retrans_callback scenario #3, " << rctx;
761  }
762  }
763 
764  rctx._last = with.first;
765 
766  Message *msg(Message::factory(_ctx, with.second));
767  return send(msg);
768 }
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
static Message * factory(const F8MetaCntx &ctx, const char *from, bool no_chksum=false, bool permissive_mode=false)
Definition: message.hpp:1238
const F8MetaCntx & _ctx
Definition: session.hpp:411
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
virtual F8API Message * generate_sequence_reset(const unsigned newseqnum, const bool gapfillflag=false)
Definition: session.cpp:966
#define slout_debug
Definition: session.hpp:887
bool Session::send ( Message msg,
bool  destroy = true,
const unsigned  custom_seqnum = 0,
const bool  no_increment = false 
)
virtual

Send message.

Parameters
msgMessage
destroyif true, destroy message after send
custom_seqnumoverride sequence number with this value
no_incrementif true, don't increment the seqnum after sending
Returns
true on success

Definition at line 978 of file session.cpp.

References _connection, FIX8::Message::set_custom_seqnum(), FIX8::Message::set_no_increment(), and FIX8::Connection::write().

Referenced by handle_logon(), handle_outbound_reject(), handle_resend_request(), handle_test_request(), heartbeat_service(), process(), retrans_callback(), sequence_check(), and start().

979 {
980  if (custom_seqnum)
981  tosend->set_custom_seqnum(custom_seqnum);
982  if (no_increment)
983  tosend->set_no_increment(no_increment);
984  return _connection && _connection->write(tosend, destroy);
985 }
Connection * _connection
Definition: session.hpp:413
virtual bool write(Message *from, bool destroy=true)
Definition: connection.hpp:577
bool Session::send ( Message msg,
const unsigned  custom_seqnum = 0,
const bool  no_increment = false 
)
virtual

Send message - non-pipelined version. WARNING: be sure you don't inadvertently use this method. Symptoms will be out of sequence messages (seqnum==1) and core dumping.

Parameters
msgMessage
custom_seqnumoverride sequence number with this value
no_incrementif true, don't increment the seqnum after sending
Returns
true on success

Definition at line 987 of file session.cpp.

References _connection, FIX8::Message::set_custom_seqnum(), FIX8::Message::set_no_increment(), and FIX8::Connection::write().

988 {
989  if (custom_seqnum)
990  tosend.set_custom_seqnum(custom_seqnum);
991  if (no_increment)
992  tosend.set_no_increment(no_increment);
993  return _connection && _connection->write(tosend);
994 }
Connection * _connection
Definition: session.hpp:413
virtual bool write(Message *from, bool destroy=true)
Definition: connection.hpp:577
size_t Session::send_batch ( const std::vector< Message * > &  msgs,
bool  destroy = true 
)
virtual

Send a batch of messages. During this call HB and test requests are suspended.

Parameters
msgsvector of Message ptrs
destroyif true, destroy message after send
Returns
size_t number of messages sent - if destroy was true those sent messages will have been destroyed with the reamining messages in the vector still allocated

Definition at line 997 of file session.cpp.

References _connection, and FIX8::Connection::write_batch().

998 {
999  return _connection->write_batch(msgs, destroy);
1000 }
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
Definition: connection.hpp:588
Connection * _connection
Definition: session.hpp:413
bool Session::send_process ( Message msg)

Process message (encode) and send.

Parameters
msgMessage
Returns
true on success

Definition at line 1009 of file session.cpp.

References FIX8::LoginParameters::_always_seqnum_assign, _batchmsgs_buffer, _connection, _last_sent, _loginParameters, _next_receive_seq, _next_send_seq, _per_spl, _persist, _plogger, _sid, FIX8::Common_MsgSeqNum(), FIX8::Common_MsgType_SEQUENCE_RESET(), FIX8::Common_PossDupFlag(), FIX8::Common_SenderCompID(), FIX8::Common_TargetCompID(), FIX8_MAX_MSG_LENGTH, FIX8::MessageBase::get(), FIX8::Message::get_custom_seqnum(), FIX8::Connection::get_pmodel(), FIX8::SessionID::get_senderCompID(), FIX8::SessionID::get_targetCompID(), FIX8::Logger::has_flag(), FIX8::MessageBase::have(), FIX8::Message::Header(), FIX8::HEADER_CALC_OFFSET(), FIX8::Logger::Info, modify_header(), modify_outbound(), FIX8::Tickval::now(), FIX8::Logger::outbound, plog(), FIX8::pm_coro, FIX8::Persister::put(), FIX8::MessageBase::remove(), FIX8::Connection::send(), slout_debug, slout_error, and FIX8::f8Exception::what().

Referenced by FIX8::FIXWriter::write(), and FIX8::FIXWriter::write_batch().

1010 {
1011  //cout << "send_process()" << endl;
1012  bool is_dup(msg->Header()->have(Common_PossDupFlag));
1013  if (!msg->Header()->have(Common_SenderCompID))
1014  *msg->Header() << new sender_comp_id(_sid.get_senderCompID());
1015  if (!msg->Header()->have(Common_TargetCompID))
1016  *msg->Header() << new target_comp_id(_sid.get_targetCompID());
1017 
1018  if (msg->Header()->have(Common_MsgSeqNum))
1019  {
1020  if (is_dup)
1021  {
1023  delete msg->Header()->remove(Common_PossDupFlag);
1024  }
1025  else
1026  {
1028  {
1029  *msg->Header() << new poss_dup_flag(true);
1030  is_dup = true;
1031  }
1032  }
1033 
1034  sending_time sendtime;
1035  msg->Header()->get(sendtime);
1036  *msg->Header() << new orig_sending_time(sendtime());
1037 
1039  {
1040  slout_debug << "send_process: _next_send_seq = " << _next_send_seq;
1041  *msg->Header() << new msg_seq_num(msg->get_custom_seqnum() ? msg->get_custom_seqnum() : static_cast<unsigned int>(_next_send_seq));
1042  }
1043  }
1044  else
1045  {
1046  slout_debug << "send_process: _next_send_seq = " << _next_send_seq;
1047  *msg->Header() << new msg_seq_num(msg->get_custom_seqnum() ? msg->get_custom_seqnum() : static_cast<unsigned int>(_next_send_seq));
1048  }
1049  *msg->Header() << new sending_time;
1050 
1051  // allow session to modify the header of this message before sending
1052  const int fields_modified(modify_header(msg->Header()));
1053  if (fields_modified)
1054  {
1055  slout_debug << "send_process: " << fields_modified << " header fields added/modified";
1056  }
1057 
1058  try
1059  {
1060  slout_debug << "Sending:" << *msg;
1061  modify_outbound(msg);
1062  char output[FIX8_MAX_MSG_LENGTH + HEADER_CALC_OFFSET], *ptr(output);
1063  size_t enclen(msg->encode(&ptr));
1064  const char *optr(ptr);
1065  if (msg->get_end_of_batch())
1066  {
1067  if (!_batchmsgs_buffer.empty())
1068  {
1069  _batchmsgs_buffer.append(ptr);
1070  ptr = &_batchmsgs_buffer[0];
1071  enclen = _batchmsgs_buffer.size();
1072  }
1073  if (!_connection->send(ptr, enclen))
1074  {
1075  slout_error << "Message write failed: " << enclen << " bytes";
1076  _batchmsgs_buffer.clear();
1077  return false;
1078  }
1079  _last_sent.now();
1080  _batchmsgs_buffer.clear();
1081  }
1082  else
1083  {
1084  _batchmsgs_buffer.append(ptr);
1085  }
1086 
1088  plog(optr, Logger::Info);
1089 
1090  //cout << "send_process" << endl;
1091 
1092  if (!is_dup)
1093  {
1094  if (_persist)
1095  {
1096  f8_scoped_spin_lock guard(_per_spl, _connection->get_pmodel() == pm_coro); // not needed for coroutine mode
1097  if (!msg->is_admin())
1098  _persist->put(_next_send_seq, ptr);
1099  _persist->put(_next_send_seq + 1, _next_receive_seq);
1100  //cout << "Persisted (send):" << (_next_send_seq + 1) << " and " << _next_receive_seq << endl;
1101  }
1102  if (!msg->get_custom_seqnum() && !msg->get_no_increment() && msg->get_msgtype() != Common_MsgType_SEQUENCE_RESET)
1103  {
1104  ++_next_send_seq;
1105  //cout << "Seqnum now:" << _next_send_seq << " and " << _next_receive_seq << endl;
1106  }
1107  }
1108  }
1109  catch (f8Exception& e)
1110  {
1111  slout_error << e.what();
1112  return false;
1113  }
1114  catch (Poco::Exception& e)
1115  {
1116  slout_error << e.displayText();
1117  return false;
1118  }
1119 
1120  return true;
1121 }
bool have(const unsigned short fnum) const
Definition: message.hpp:725
bool plog(const std::string &what, Logger::Level lev, const unsigned direction=0) const
Definition: session.hpp:703
Logger * _plogger
Definition: session.hpp:422
Field< UTCTimestamp, Common_SendingTime > sending_time
Definition: field.hpp:2155
const f8String Common_MsgType_SEQUENCE_RESET("4")
F8API BaseField * remove(const unsigned short fnum, Presence::const_iterator itr)
Definition: message.cpp:646
const unsigned short Common_TargetCompID(56)
virtual void modify_outbound(Message *msg)
Definition: session.hpp:514
Field< SeqNum, Common_MsgSeqNum > msg_seq_num
Definition: field.hpp:2137
Field< Boolean, Common_PossDupFlag > poss_dup_flag
Definition: field.hpp:2159
Persister * _persist
Definition: session.hpp:421
ProcessModel get_pmodel() const
Definition: connection.hpp:553
const size_t HEADER_CALC_OFFSET(32)
SessionID _sid
Definition: session.hpp:415
const sender_comp_id & get_senderCompID() const
Definition: session.hpp:101
bool has_flag(const Flags flg) const
Definition: logger.hpp:338
std::string _batchmsgs_buffer
Definition: session.hpp:426
LoginParameters _loginParameters
Definition: session.hpp:418
Field< UTCTimestamp, Common_OrigSendingTime > orig_sending_time
Definition: field.hpp:2156
const unsigned short Common_SenderCompID(49)
bool get(T &to) const
Definition: message.hpp:671
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
Connection * _connection
Definition: session.hpp:413
Base exception class.
Definition: f8exception.hpp:49
Tickval _last_sent
Definition: session.hpp:410
MessageBase * Header() const
Definition: message.hpp:1098
virtual bool put(const unsigned seqnum, const f8String &what)=0
const unsigned short Common_PossDupFlag(43)
Field< f8String, Common_SenderCompID > sender_comp_id
Definition: field.hpp:2145
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
virtual F8API int modify_header(MessageBase *msg)
Definition: session.cpp:1003
Tickval & now()
Definition: tickval.hpp:133
f8_spin_lock _per_spl
Definition: session.hpp:420
const unsigned short Common_MsgSeqNum(34)
const target_comp_id & get_targetCompID() const
Definition: session.hpp:105
Field template. There will ONLY be partial template specialisations of this template.
Definition: field.hpp:256
unsigned get_custom_seqnum() const
Definition: message.hpp:1259
int send(const char *from, size_t sz)
Definition: connection.hpp:594
const char * what() const
Definition: f8exception.hpp:85
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
#define slout_error
Definition: session.hpp:885
Field< f8String, Common_TargetCompID > target_comp_id
Definition: field.hpp:2146
#define slout_debug
Definition: session.hpp:887
bool Session::sequence_check ( const unsigned  seqnum,
const Message msg 
)

Check that a message is in the correct sequence for this session. Will generated resend request if required. Throws InvalidMsgSequence, MissingMandatoryField, BadSendingTime.

Parameters
seqnummessage sequence number
msgMessage
Returns
true on success

Definition at line 427 of file session.cpp.

References _next_receive_seq, FIX8::SessionConfig::_ses, _sf, _state, do_state_change(), generate_resend_request(), FIX8::MessageBase::get(), FIX8::Configuration::get_ignore_logon_sequence_check_flag(), FIX8::Message::Header(), FIX8::BaseField::print(), send(), slout_warn, FIX8::States::st_continuous, and FIX8::States::st_resend_request_sent.

Referenced by enforce().

428 {
429  //cout << "seqnum:" << seqnum << " next_target_seq:" << _next_receive_seq
430  //<< " next_sender_seq:" << _next_send_seq << " state:" << _state << " next_receive_seq:" << _next_receive_seq << endl;
431 
432  if (seqnum > _next_receive_seq)
433  {
435  {
436  slout_warn << "Resend request already sent";
437  }
439  {
442  }
443  // If SessionConfig has *not* been set, assume wrong logon sequence is checked.
445  throw InvalidMsgSequence(seqnum, _next_receive_seq);
446  return false;
447  }
448 
449  if (seqnum < _next_receive_seq)
450  {
451  poss_dup_flag pdf(false);
452  msg->Header()->get(pdf);
453  if (!pdf()) // poss dup not set so bad
454  throw MsgSequenceTooLow(seqnum, _next_receive_seq);
455  sending_time st;
456  msg->Header()->get(st);
457  orig_sending_time ost;
458  if (msg->Header()->get(ost) && ost() > st())
459  {
460  ostringstream ostr;
461  ost.print(ostr);
462  throw BadSendingTime(ostr.str());
463  }
464  }
465 
466  return true;
467 }
A message was received with an invalid sending time.
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
bool get(T &to) const
Definition: message.hpp:671
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
MessageBase * Header() const
Definition: message.hpp:1098
A message was received with an out of sequence sequence number.
struct SessionConfig * _sf
Definition: session.hpp:416
Field template. There will ONLY be partial template specialisations of this template.
Definition: field.hpp:256
bool get_ignore_logon_sequence_check_flag(const XmlElement *from, const bool def=false) const
const XmlElement * _ses
#define slout_warn
Definition: session.hpp:884
virtual F8API Message * generate_resend_request(const unsigned begin, const unsigned end=0)
Definition: session.cpp:957
virtual std::ostream & print(std::ostream &os) const =0
A message was received with an out of sequence sequence number.
void Session::set_affinity ( int  core_id)
protected

Set the CPU affinity mask for the given thread

Parameters
core_idcore to mask on

Definition at line 1200 of file session.cpp.

References slout_error.

1201 {
1202  slout_error << "set_affinity: not implemented";
1203 }
#define slout_error
Definition: session.hpp:885
void FIX8::Session::set_login_parameters ( const LoginParameters loginParamaters)
inline

Set the LoginParameters

Parameters
loginParamatersto populate from

Definition at line 754 of file session.hpp.

754 { _loginParameters = loginParamaters; }
LoginParameters _loginParameters
Definition: session.hpp:418
void FIX8::Session::set_persister ( Persister pst)
inline

Set the persister.

Parameters
pstpointer to persister object

Definition at line 770 of file session.hpp.

770 { _persist = pst; }
Persister * _persist
Definition: session.hpp:421
void FIX8::Session::set_reset_sequence_numbers_flag ( bool  flag)
inline

Set the reset_sequence_numbers flag, defaults to value set in config file (default false)

Parameters
flagtrue or false

Definition at line 766 of file session.hpp.

References FIX8::LoginParameters::_reset_sequence_numbers.

LoginParameters _loginParameters
Definition: session.hpp:418
void Session::set_scheduler ( int  priority)
protected

Set the scheduling policy for the current thread

Parameters
priorityscheduler priority

Definition at line 1194 of file session.cpp.

References slout_error.

1195 {
1196  slout_error << "set_scheduler: not implemented";
1197 }
#define slout_error
Definition: session.hpp:885
void FIX8::Session::set_session_config ( struct SessionConfig sf)
inline

Definition at line 782 of file session.hpp.

782 { _sf = sf; }
struct SessionConfig * _sf
Definition: session.hpp:416
int Session::start ( Connection connection,
bool  wait = true,
const unsigned  send_seqnum = 0,
const unsigned  recv_seqnum = 0,
const f8String  davi = f8String() 
)

Start the session.

Parameters
connectionestablished connection
waitif true, thread will wait till session ends before returning
send_seqnumif supplied, override the send login sequence number, set next send to seqnum+1
recv_seqnumif supplied, override the receive login sequence number, set next recv to seqnum+1
davidefault appl version id (FIXT)
Returns
-1 on error, 0 on success

Definition at line 167 of file session.cpp.

References _connection, _control, _logger, _loginParameters, _next_receive_seq, _next_send_seq, _plogger, _req_next_receive_seq, _req_next_send_seq, FIX8::LoginParameters::_reset_sequence_numbers, _schedule, FIX8::SessionConfig::_ses, _session_scheduler, _sf, _timer, atomic_init(), FIX8::ebitset_r< T, B >::clear(), FIX8::Connection::cn_acceptor, FIX8::Connection::cn_initiator, FIX8::Connection::connect(), copyright_string(), FIX8::Configuration::create_session_schedule(), do_state_change(), generate_logon(), FIX8::Connection::get_hb_interval(), FIX8::Connection::get_role(), glout_info, FIX8::Connection::join(), FIX8::Logger::purge_thread_codes(), recover_seqnums(), send(), shutdown, slout_info, FIX8::States::st_logon_sent, FIX8::States::st_not_logged_in, FIX8::States::st_wait_for_logon, and FIX8::Connection::start().

169 {
171  if (_logger)
173 
174  if (_plogger)
176 
178  slout_info << "Starting session";
179  _connection = connection; // takes owership
180  if (!_connection->connect()) // if already connected returns true
181  return -1;
183  atomic_init(States::st_wait_for_logon); // important for server that this is done before connect
184  _connection->start();
185  slout_info << "Session connected";
186 
188  {
192  else
193  {
194  recover_seqnums();
195  if (send_seqnum)
196  _next_send_seq = send_seqnum;
197  if (recv_seqnum)
198  _next_receive_seq = recv_seqnum;
199  }
200 
203  }
204  else
205  {
206  if (send_seqnum)
207  _req_next_send_seq = send_seqnum;
208  if (recv_seqnum)
209  _req_next_receive_seq = recv_seqnum;
210  }
211 
213  {
215  {
216  slout_info << *_schedule;
217  }
218  _timer.schedule(_session_scheduler, 1000); // check every second
219  }
220 
221  if (wait) // wait for
222  {
223  _connection->join();
224  }
225 
226  return 0;
227 }
Logger * _logger
Definition: session.hpp:422
unsigned _req_next_send_seq
Definition: session.hpp:414
unsigned get_hb_interval() const
Definition: connection.hpp:608
Logger * _plogger
Definition: session.hpp:422
TimerEvent< Session > _session_scheduler
Definition: session.hpp:425
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
Session_Schedule * _schedule
Definition: session.hpp:427
Role get_role() const
Definition: connection.hpp:549
static F8API const f8String copyright_string()
Definition: session.cpp:1207
LoginParameters _loginParameters
Definition: session.hpp:418
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
Connection * _connection
Definition: session.hpp:413
Control _control
Definition: session.hpp:406
virtual F8API Message * generate_logon(const unsigned heartbeat_interval, const f8String davi=f8String())
Definition: session.cpp:933
struct SessionConfig * _sf
Definition: session.hpp:416
F8API void start()
Start the reader and writer threads.
Definition: connection.cpp:315
void clear(const T sbit)
Definition: f8utils.hpp:1011
F8API void purge_thread_codes()
Remove dead threads from the thread code cache.
Definition: logger.cpp:206
#define glout_info
Definition: logger.hpp:601
virtual bool connect()
Definition: connection.hpp:567
unsigned _req_next_receive_seq
Definition: session.hpp:414
#define slout_info
Definition: session.hpp:883
F8API Session_Schedule * create_session_schedule(const XmlElement *from) const
const XmlElement * _ses
Timer< Session > _timer
Definition: session.hpp:424
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
virtual F8API void recover_seqnums()
Recover next expected and next to send sequence numbers from persitence layer.
Definition: session.cpp:1124
void atomic_init(States::SessionStates st)
Definition: session.cpp:144
virtual void FIX8::Session::state_change ( const States::SessionStates  before,
const States::SessionStates  after 
)
inlineprotectedvirtual

This method id called whenever a session state change occurs

Parameters
beforeprevious session state
afternew session state

Reimplemented in myfix_session_server, and myfix_session_client.

Definition at line 510 of file session.hpp.

Referenced by do_state_change().

510 {}
void Session::stop ( )

stop the session.

Definition at line 230 of file session.cpp.

References _connection, _control, _per_spl, _persist, _timer, FIX8::Connection::cn_initiator, FIX8::Connection::get_pmodel(), FIX8::Connection::get_role(), FIX8::hypersleep< h_milliseconds >(), FIX8::pm_coro, shutdown, FIX8::Persister::stop(), and FIX8::Connection::stop().

Referenced by client_process(), handle_logon(), heartbeat_service(), and process().

231 {
232  if (_control & shutdown)
233  return;
234  _control |= shutdown;
235 
236  if (_connection)
237  {
239  _timer.clear();
240  else
241  {
243  if (_persist)
244  _persist->stop();
245  }
246  _connection->stop();
247  }
249 }
F8API void stop()
Stop the reader and writer threads.
Definition: connection.cpp:322
Persister * _persist
Definition: session.hpp:421
ProcessModel get_pmodel() const
Definition: connection.hpp:553
Role get_role() const
Definition: connection.hpp:549
Connection * _connection
Definition: session.hpp:413
Control _control
Definition: session.hpp:406
int hypersleep< h_milliseconds >(unsigned amt)
Definition: hypersleep.hpp:105
f8_spin_lock _per_spl
Definition: session.hpp:420
virtual void stop()
Stop the persister thread.
Definition: persist.hpp:142
Timer< Session > _timer
Definition: session.hpp:424
void Session::update_persist_seqnums ( )

Force persister to sync next send/receive seqnums.

Definition at line 266 of file session.cpp.

References _connection, _next_receive_seq, _next_send_seq, _per_spl, _persist, FIX8::Connection::get_pmodel(), FIX8::pm_coro, and FIX8::Persister::put().

Referenced by process().

267 {
268  if (_persist)
269  {
272  //cout << "Persisted:" << _next_send_seq << " and " << _next_receive_seq << endl;
273  }
274 }
Persister * _persist
Definition: session.hpp:421
ProcessModel get_pmodel() const
Definition: connection.hpp:553
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
Connection * _connection
Definition: session.hpp:413
virtual bool put(const unsigned seqnum, const f8String &what)=0
f8_spin_lock _per_spl
Definition: session.hpp:420
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
void FIX8::Session::update_received ( )
inline

Update the last received time.

Definition at line 718 of file session.hpp.

References FIX8::Tickval::now().

718 { _last_received.now(); }
Tickval _last_received
Definition: session.hpp:410
Tickval & now()
Definition: tickval.hpp:133
void FIX8::Session::update_sent ( )
inline

Update the last sent time.

Definition at line 715 of file session.hpp.

References FIX8::Tickval::now().

715 { _last_sent.now(); }
Tickval _last_sent
Definition: session.hpp:410
Tickval & now()
Definition: tickval.hpp:133

Member Data Documentation

f8_atomic<bool> FIX8::Session::_active
protected

Definition at line 409 of file session.hpp.

Referenced by activation_check(), activation_service(), and atomic_init().

std::string FIX8::Session::_batchmsgs_buffer
protected

Definition at line 426 of file session.hpp.

Referenced by send_process(), and Session().

Control FIX8::Session::_control
protected

Definition at line 406 of file session.hpp.

Referenced by control(), process(), start(), and stop().

const F8MetaCntx& FIX8::Session::_ctx
protected

Definition at line 411 of file session.hpp.

Referenced by get_ctx(), handle_logon(), process(), and retrans_callback().

TimerEvent<Session> FIX8::Session::_hb_processor
protected

Definition at line 425 of file session.hpp.

Referenced by handle_logon().

Tickval FIX8::Session::_last_received
protected

Definition at line 410 of file session.hpp.

Referenced by get_last_received(), and heartbeat_service().

Tickval FIX8::Session::_last_sent
protected

Definition at line 410 of file session.hpp.

Referenced by get_last_sent(), heartbeat_service(), and send_process().

Logger* FIX8::Session::_logger
protected

Definition at line 422 of file session.hpp.

Referenced by handle_logon(), Session(), start(), and ~Session().

LoginParameters FIX8::Session::_loginParameters
protected
f8_atomic<unsigned> FIX8::Session::_next_receive_seq
protected
f8_atomic<unsigned> FIX8::Session::_next_send_seq
protected
f8_spin_lock FIX8::Session::_per_spl
protected
Persister* FIX8::Session::_persist
protected
Logger * FIX8::Session::_plogger
protected

Definition at line 422 of file session.hpp.

Referenced by handle_logon(), process(), send_process(), Session(), start(), and ~Session().

unsigned FIX8::Session::_req_next_receive_seq
protected

Definition at line 414 of file session.hpp.

Referenced by handle_logon(), and start().

unsigned FIX8::Session::_req_next_send_seq
protected

Definition at line 414 of file session.hpp.

Referenced by handle_logon(), and start().

Session_Schedule* FIX8::Session::_schedule
protected

Definition at line 427 of file session.hpp.

Referenced by activation_service(), handle_logon(), start(), and ~Session().

sender_comp_id FIX8::Session::_sci
protected

Definition at line 412 of file session.hpp.

Referenced by handle_logon().

TimerEvent<Session> FIX8::Session::_session_scheduler
protected

Definition at line 425 of file session.hpp.

Referenced by start().

struct SessionConfig* FIX8::Session::_sf
protected

Definition at line 416 of file session.hpp.

Referenced by handle_logon(), sequence_check(), and start().

SessionID FIX8::Session::_sid
protected

Definition at line 415 of file session.hpp.

Referenced by enforce(), get_sid(), handle_logon(), send_process(), and Session().

const vector< f8String > Session::_state_names
staticprotected
Initial value:
{
"none", "continuous", "session_terminated",
"wait_for_logon", "not_logged_in", "logon_sent", "logon_received", "logoff_sent",
"logoff_received", "test_request_sent", "sequence_reset_sent",
"sequence_reset_received", "resend_request_sent", "resend_request_received"
}

string representation of Sessionstates

Definition at line 430 of file session.hpp.

Timer<Session> FIX8::Session::_timer
protected

Definition at line 424 of file session.hpp.

Referenced by get_timer(), handle_logon(), Session(), start(), and stop().


The documentation for this class was generated from the following files: