fix8  version 1.4.0
Open Source C++ FIX Framework
session.cpp
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-----------------------------------------------------------------------------------------
37 #include "precomp.hpp"
38 #include <fix8/f8includes.hpp>
39 
40 //-------------------------------------------------------------------------------------------------
41 using namespace FIX8;
42 using namespace std;
43 
44 
45 //-------------------------------------------------------------------------------------------------
46 namespace
47 {
48  const string package_version { FIX8_PACKAGE_NAME " version " FIX8_PACKAGE_VERSION };
49  const string copyright_short { "Copyright (c) 2010-" };
50  const string copyright_short2 { ", David L. Dight <" FIX8_PACKAGE_BUGREPORT ">, All rights reserved. [" FIX8_PACKAGE_URL "]"};
51 }
52 
53 //-------------------------------------------------------------------------------------------------
54 RegExp SessionID::_sid("([^:]+):([^-]+)->(.+)");
55 
56 //-------------------------------------------------------------------------------------------------
57 #if defined(_MSC_VER) && !defined(BUILD_F8API)
58 // no need in definition since it is in dll already
59 #else
60 const vector<f8String> Session::_state_names
61 {
62  "none", "continuous", "session_terminated",
63  "wait_for_logon", "not_logged_in", "logon_sent", "logon_received", "logoff_sent",
64  "logoff_received", "test_request_sent", "sequence_reset_sent",
65  "sequence_reset_received", "resend_request_sent", "resend_request_received"
66 };
67 #endif
68 #if defined(_MSC_VER)
69 #pragma warning(push)
70 #pragma warning(disable: 4273)
71 #endif
72 //-------------------------------------------------------------------------------------------------
74 {
75  ostringstream ostr;
76  ostr << _beginString << ':' << _senderCompID << "->" << _targetCompID;
77  _id = ostr.str();
78 }
79 
80 //-------------------------------------------------------------------------------------------------
82 {
83  return SessionID(_beginString(), _targetCompID(), _senderCompID());
84 }
85 
86 //-------------------------------------------------------------------------------------------------
88 {
89  RegMatch match;
90  if (_sid.SearchString(match, from, 4) == 4)
91  {
92  f8String bstr, scid, tcid;
93  _sid.SubExpr(match, from, bstr, 0, 1);
94  _beginString.set(bstr);
95  _sid.SubExpr(match, from, scid, 0, 2);
96  _senderCompID.set(scid);
97  _sid.SubExpr(match, from, tcid, 0, 3);
98  _targetCompID.set(tcid);
99  make_id();
100  }
101 }
102 
103 //-------------------------------------------------------------------------------------------------
104 //-------------------------------------------------------------------------------------------------
105 Session::Session(const F8MetaCntx& ctx, const SessionID& sid, Persister *persist, Logger *logger, Logger *plogger) :
106 _state(States::st_none),
107 _ctx(ctx), _connection(), _req_next_send_seq(), _req_next_receive_seq(),
108  _sid(sid), _sf(), _persist(persist), _logger(logger), _plogger(plogger), // initiator
109  _timer(*this, 10), _hb_processor(&Session::heartbeat_service, true),
110  _session_scheduler(&Session::activation_service, true), _schedule()
111 {
112  _timer.start();
114 
115  if (!_logger)
116  {
117  glout_warn << "Warning: no session logger defined for " << _sid;
118  }
119 
120  if (!_plogger)
121  {
122  glout_warn << "Warning: no protocol logger defined for " << _sid;
123  }
124 
125  if (!_persist)
126  {
127  glout_warn << "Warning: no persister defined for " << _sid;
128  }
129 }
130 
131 //-------------------------------------------------------------------------------------------------
132 Session::Session(const F8MetaCntx& ctx, const sender_comp_id& sci, Persister *persist, Logger *logger, Logger *plogger) :
133 _state(States::st_none),
134 _ctx(ctx), _sci(sci), _connection(), _req_next_send_seq(), _req_next_receive_seq(),
135  _sf(), _persist(persist), _logger(logger), _plogger(plogger), // acceptor
136  _timer(*this, 10), _hb_processor(&Session::heartbeat_service, true),
137  _session_scheduler(&Session::activation_service, true), _schedule()
138 {
139  _timer.start();
141 }
142 
143 //-------------------------------------------------------------------------------------------------
145 {
146  do_state_change(st);
148  _active = true;
149 }
150 
151 //-------------------------------------------------------------------------------------------------
153 {
154  slout_info << "Session terminating";
155  if (_logger)
156  _logger->stop();
157  if (_plogger)
158  _plogger->stop();
159  hypersleep<h_seconds>(1); // needed for service threads to exit gracefully
160 
162  { f8_scoped_spin_lock guard(_per_spl); delete _persist; _persist = 0; }
163  delete _schedule;
164 }
165 
166 //-------------------------------------------------------------------------------------------------
167 int Session::start(Connection *connection, bool wait, const unsigned send_seqnum,
168  const unsigned recv_seqnum, const f8String davi)
169 {
171  if (_logger)
173 
174  if (_plogger)
176 
178  slout_info << "Starting session";
179  _connection = connection; // takes owership
180  if (!_connection->connect()) // if already connected returns true
181  return -1;
183  atomic_init(States::st_wait_for_logon); // important for server that this is done before connect
184  _connection->start();
185  slout_info << "Session connected";
186 
188  {
192  else
193  {
194  recover_seqnums();
195  if (send_seqnum)
196  _next_send_seq = send_seqnum;
197  if (recv_seqnum)
198  _next_receive_seq = recv_seqnum;
199  }
200 
203  }
204  else
205  {
206  if (send_seqnum)
207  _req_next_send_seq = send_seqnum;
208  if (recv_seqnum)
209  _req_next_receive_seq = recv_seqnum;
210  }
211 
213  {
215  {
216  slout_info << *_schedule;
217  }
218  _timer.schedule(_session_scheduler, 1000); // check every second
219  }
220 
221  if (wait) // wait for
222  {
223  _connection->join();
224  }
225 
226  return 0;
227 }
228 
229 //-------------------------------------------------------------------------------------------------
231 {
232  if (_control & shutdown)
233  return;
234  _control |= shutdown;
235 
236  if (_connection)
237  {
239  _timer.clear();
240  else
241  {
243  if (_persist)
244  _persist->stop();
245  }
246  _connection->stop();
247  }
249 }
250 
251 //-------------------------------------------------------------------------------------------------
252 bool Session::enforce(const unsigned seqnum, const Message *msg)
253 {
255  {
257  compid_check(seqnum, msg, _sid);
258  if (msg->get_msgtype() != Common_MsgType_SEQUENCE_RESET && sequence_check(seqnum, msg))
259  return false;
260  }
261 
262  return true;
263 }
264 
265 //-------------------------------------------------------------------------------------------------
267 {
268  if (_persist)
269  {
272  //cout << "Persisted:" << _next_send_seq << " and " << _next_receive_seq << endl;
273  }
274 }
275 
276 //-------------------------------------------------------------------------------------------------
277 bool Session::process(const f8String& from)
278 {
279  unsigned seqnum(0);
280  bool remote_logged_out {};
281  const Message *msg = nullptr;
282 
283  try
284  {
285  const f8String::size_type fpos(from.find("34="));
286  if (fpos == f8String::npos)
287  {
288  slout_debug << "Session::process throwing for " << from;
289  throw InvalidMessage(from, FILE_LINE);
290  }
291 
292  seqnum = fast_atoi<unsigned>(from.data() + fpos + 3, default_field_separator);
293 
294  bool retry_plog(false);
296  {
298  plog(from, Logger::Info, 1);
299  else
300  retry_plog = true;
301  }
302 
304  {
305  glout_fatal << "Fatal: factory failed to generate a valid message";
306  return false;
307  }
308 
310  cout << *msg << endl;
311  else if (_control & print)
312  cout << *msg << endl;
313 
314  bool result(false), admin_result(msg->is_admin() ? handle_admin(seqnum, msg) : true);
315  if (msg->get_msgtype().size() > 1)
316  goto application_call;
317  else switch(msg->get_msgtype()[0])
318  {
319  default:
320 application_call:
321  if (activation_check(seqnum, msg))
322  result = handle_application(seqnum, msg);
323  break;
325  result = handle_heartbeat(seqnum, msg);
326  break;
328  result = handle_test_request(seqnum, msg);
329  break;
331  result = handle_resend_request(seqnum, msg);
332  break;
334  result = handle_reject(seqnum, msg);
335  break;
337  result = handle_sequence_reset(seqnum, msg);
338  break;
340  result = handle_logout(seqnum, msg);
341  remote_logged_out = true;
342  break;
344  result = handle_logon(seqnum, msg);
345  break;
346  }
347 
349  if (retry_plog)
350  plog(from, Logger::Info, 1);
351 
353 
354  if (remote_logged_out) // FX-615; permit logout seqnum to be persisted
355  {
356  slout_debug << "logout received from remote";
357  stop();
358  }
359 
360  delete msg;
361  return result && admin_result;
362  }
363  catch (LogfileException& e)
364  {
365  cerr << e.what() << endl;
366  }
367  catch (f8Exception& e)
368  {
369  slout_debug << "process: f8exception" << ' ' << seqnum << ' ' << e.what();
370 
371  if (e.force_logoff())
372  {
374  plog(from, Logger::Info, 1);
375  slout_fatal << e.what() << " - will logoff";
377  {
379  send(generate_logout(e.what()), true, 0, true); // so it won't increment
381  }
383  {
384  slout_debug << "rethrowing ...";
385  throw;
386  }
387  stop();
388  }
389  else
390  {
391  slout_error << e.what() << " - inbound message rejected";
392  handle_outbound_reject(seqnum, msg, e.what());
395  plog(from, Logger::Info, 1);
396  delete msg;
397  return true; // message is handled but has errors
398  }
399  }
400  catch (Poco::Net::NetException& e)
401  {
402  slout_debug << "process:: Poco::Net::NetException";
403  slout_error << e.what();
404  }
405  catch (exception& e)
406  {
407  slout_debug << "process:: std::exception";
408  slout_error << e.what();
409  }
410 
411  return false;
412 }
413 
414 //-------------------------------------------------------------------------------------------------
415 void Session::compid_check(const unsigned seqnum, const Message *msg, const SessionID& id) const
416 {
418  {
419  if (!id.same_sender_comp_id(msg->Header()->get<target_comp_id>()->get()))
420  throw BadCompidId(msg->Header()->get<target_comp_id>()->get());
421  if (!id.same_target_comp_id(msg->Header()->get<sender_comp_id>()->get()))
422  throw BadCompidId(msg->Header()->get<sender_comp_id>()->get());
423  }
424 }
425 
426 //-------------------------------------------------------------------------------------------------
427 bool Session::sequence_check(const unsigned seqnum, const Message *msg)
428 {
429  //cout << "seqnum:" << seqnum << " next_target_seq:" << _next_receive_seq
430  //<< " next_sender_seq:" << _next_send_seq << " state:" << _state << " next_receive_seq:" << _next_receive_seq << endl;
431 
432  if (seqnum > _next_receive_seq)
433  {
435  {
436  slout_warn << "Resend request already sent";
437  }
439  {
442  }
443  // If SessionConfig has *not* been set, assume wrong logon sequence is checked.
445  throw InvalidMsgSequence(seqnum, _next_receive_seq);
446  return false;
447  }
448 
449  if (seqnum < _next_receive_seq)
450  {
451  poss_dup_flag pdf(false);
452  msg->Header()->get(pdf);
453  if (!pdf()) // poss dup not set so bad
454  throw MsgSequenceTooLow(seqnum, _next_receive_seq);
455  sending_time st;
456  msg->Header()->get(st);
457  orig_sending_time ost;
458  if (msg->Header()->get(ost) && ost() > st())
459  {
460  ostringstream ostr;
461  ost.print(ostr);
462  throw BadSendingTime(ostr.str());
463  }
464  }
465 
466  return true;
467 }
468 
469 //-------------------------------------------------------------------------------------------------
470 bool Session::handle_logon(const unsigned seqnum, const Message *msg)
471 {
473  {
474  send(generate_reject(seqnum, "Already logged on"), msg->get_msgtype().c_str());
475  return true;
476  }
478  const bool reset_given(msg->have(Common_ResetSeqNumFlag) && msg->get<reset_seqnum_flag>()->get());
479  sender_comp_id sci; // so this should be our tci
480  msg->Header()->get(sci);
481  target_comp_id tci; // so this should be our sci
482  msg->Header()->get(tci);
483  SessionID id(_ctx._beginStr, tci(), sci());
484 
486  {
487  if (id != _sid)
488  {
489  glout_warn << "Inbound TargetCompID not recognised (" << tci << "), expecting (" << _sid.get_senderCompID() << ')';
491  {
492  stop();
494  return false;
495  }
496  }
497 
498  enforce(seqnum, msg);
500  }
501  else // acceptor
502  {
503  default_appl_ver_id davi;
504  msg->get(davi);
505 
506  if (_sci() != tci())
507  {
508  glout_warn << "Inbound TargetCompID not recognised (" << tci << "), expecting (" << _sci << ')';
510  {
511  stop();
513  return false;
514  }
515  }
516 
517  if (!_loginParameters._clients.empty())
518  {
519  auto itr(_loginParameters._clients.find(sci()));
520  bool iserr(false);
521  if (itr == _loginParameters._clients.cend())
522  {
523  glout_error << "Remote (" << sci << ") not found (" << id << "). NOT authorised to proceed.";
524  iserr = true;
525  }
526 
527  if (!iserr && get<1>(itr->second) != Poco::Net::IPAddress()
528  && get<1>(itr->second) != _connection->get_peer_socket_address().host())
529  {
530  glout_error << "Remote (" << get<0>(itr->second) << ", " << sci << ") NOT authorised to proceed ("
531  << _connection->get_peer_socket_address().toString() << ").";
532  iserr = true;
533  }
534 
535  if (iserr)
536  {
537  stop();
539  return false;
540  }
541 
542  glout_info << "Remote (" << get<0>(itr->second) << ", " << sci << ") authorised to proceed ("
543  << _connection->get_peer_socket_address().toString() << ").";
544  }
545 
546  // important - these objects can't be created until we have a valid SessionID
547  if (_sf)
548  {
550  {
551  glout_warn << "Warning: no session logger defined for " << id;
552  }
553 
555  {
556  glout_warn << "Warning: no protocol logger defined for " << id;
557  }
558 
559  if (!_persist)
560  {
562  if (!(_persist = _sf->create_persister(_sf->_ses, &id, reset_given)))
563  {
564  glout_warn << "Warning: no persister defined for " << id;
565  }
566  }
567 
568 #if 0
569  if (_schedule)
570  slout_info << *_schedule;
571 #endif
572  }
573  else
574  {
575  glout_error << "Error: SessionConfig object missing in session";
576  }
577 
578  slout_info << "Connection from " << _connection->get_peer_socket_address().toString();
579 
580  if (reset_given) // ignore version restrictions on this behaviour
581  {
582  slout_info << "Resetting sequence numbers";
584  }
585  else
586  {
587  recover_seqnums();
588  if (_req_next_send_seq)
592  }
593 
594  if (authenticate(id, msg))
595  {
596  _sid = id;
597  enforce(seqnum, msg);
598  heartbeat_interval hbi;
599  msg->get(hbi);
601  send(generate_logon(hbi(), davi()));
603  slout_info << "Client setting heartbeat interval to " << hbi();
604  }
605  else
606  {
607  slout_error << id << " failed authentication";
608  stop();
610  return false;
611  }
612 
614  {
615  slout_error << id << " Session unavailable. Login not accepted.";
616  stop();
618  return false;
619  }
620  }
621 
622  slout_info << "Heartbeat interval is " << _connection->get_hb_interval();
623 
624  _timer.schedule(_hb_processor, 1000); // check every second
625 
626  return true;
627 }
628 
629 //-------------------------------------------------------------------------------------------------
630 bool Session::handle_logout(const unsigned seqnum, const Message *msg)
631 {
632  enforce(seqnum, msg);
633 
634  //if (_state != States::st_logoff_sent)
635  // send(generate_logout());
636  slout_info << "peer has logged out";
637  // stop(); // FX-615
638  return true;
639 }
640 
641 //-------------------------------------------------------------------------------------------------
642 bool Session::handle_sequence_reset(const unsigned seqnum, const Message *msg)
643 {
644  enforce(seqnum, msg);
645 
646  new_seq_num nsn;
647  if (msg->get(nsn))
648  {
649  slout_debug << "newseqnum = " << nsn() << ", _next_receive_seq = " << _next_receive_seq << " seqnum:" << seqnum;
650  if (nsn() >= static_cast<int>(_next_receive_seq))
651  _next_receive_seq = nsn() - 1;
652  else if (nsn() < static_cast<int>(_next_receive_seq))
653  throw MsgSequenceTooLow(nsn(), _next_receive_seq);
654  }
655 
658 
659  return true;
660 }
661 
662 //-------------------------------------------------------------------------------------------------
663 bool Session::handle_resend_request(const unsigned seqnum, const Message *msg)
664 {
665  enforce(seqnum, msg);
666 
668  {
669  begin_seq_num begin;
670  end_seq_num end;
671 
672  if (!msg->get(begin))
673  {
674  slout_warn << "handle_resend_request: can't obtain BeginSeqNo from request";
675  }
676  if (!msg->get(end))
677  {
678  slout_warn << "handle_resend_request: can't obtain EndSeqNo from request";
679  }
680 
681  if ((begin() > end() && end()) || begin() == 0)
682  handle_outbound_reject(seqnum, msg, "Invalid resend range: Begin > End or Begin = 0");
683  else if (!_persist)
684  {
685  const int nxt(static_cast<int>(_next_send_seq)), nseq(begin() >= nxt ? begin() + 1 : nxt);
686  send(generate_sequence_reset(nseq, true), true, begin());
687  _next_send_seq = nseq;
688  slout_debug << "handle_resend_request scenario #" << (nseq == nxt ? 7 : 8);
689  }
690  else
691  {
692  //cout << "got resend request:" << begin() << " to " << end() << endl;
694  //f8_scoped_spin_lock guard(_per_spl, _connection->get_pmodel() == pm_coro); // no no nanette!
695  _persist->get(begin(), end(), *this, &Session::retrans_callback);
696  }
697  }
698  else
699  {
700  slout_warn << "resend request received during an existing resend sequence";
701  }
702 
703  return true;
704 }
705 
706 //-------------------------------------------------------------------------------------------------
708 {
709  //cout << "first:" << with.first << ' ' << rctx << endl;
710 
711  if (rctx._no_more_records)
712  {
713  /*
714  if (rctx._end)
715  {
716  _next_send_seq = rctx._interrupted_seqnum - 1;
717  send(generate_sequence_reset(rctx._interrupted_seqnum, true), true, rctx._last + 1);
718  //cout << "#1" << endl;
719  }
720  else if (!rctx._last)
721  {
722  _next_send_seq = rctx._interrupted_seqnum;
723  send(generate_sequence_reset(rctx._interrupted_seqnum, true), true, rctx._begin);
724  //cout << "#4" << endl;
725  }
726  */
727  if (!rctx._last) // start to infinity requested
728  {
729  // handle case where requested seq is greater than current last sent seq (interrupted)
730  const unsigned nseq(rctx._begin >= rctx._interrupted_seqnum ? rctx._begin + 1 : rctx._interrupted_seqnum);
731  send(generate_sequence_reset(nseq, true), true, rctx._begin);
732  _next_send_seq = nseq;
733  slout_debug << "retrans_callback scenario #" << (nseq == rctx._interrupted_seqnum ? 4 : 5) << ' ' << rctx;
734  }
735  else // range requested // was: if (rctx._end)
736  {
737  // handle case where requested seq is greater than current last sent seq (interrupted)
738  const unsigned nseq(rctx._last + 1 >= rctx._interrupted_seqnum ? rctx._last + 2 : rctx._interrupted_seqnum);
739  send(generate_sequence_reset(nseq, true), true, rctx._last + 1);
740  _next_send_seq = nseq;
741  slout_debug << "retrans_callback scenario #" << (nseq == rctx._interrupted_seqnum ? 1 : 6) << ' ' << rctx;
742  }
744  return true;
745  }
746 
747  if (rctx._last)
748  {
749  if (rctx._last + 1 < with.first)
750  {
751  send(generate_sequence_reset(with.first, true), true, _next_send_seq);
752  slout_debug << "retrans_callback scenario #2, " << rctx;
753  }
754  }
755  else
756  {
757  if (with.first > rctx._begin)
758  {
759  send(generate_sequence_reset(with.first, true));
760  slout_debug << "retrans_callback scenario #3, " << rctx;
761  }
762  }
763 
764  rctx._last = with.first;
765 
766  Message *msg(Message::factory(_ctx, with.second));
767  return send(msg);
768 }
769 
770 //-------------------------------------------------------------------------------------------------
771 bool Session::handle_test_request(const unsigned seqnum, const Message *msg)
772 {
773  enforce(seqnum, msg);
774 
775  test_request_id testReqID;
776  msg->get(testReqID);
777  send(generate_heartbeat(testReqID()));
778  return true;
779 }
780 
781 //-------------------------------------------------------------------------------------------------
782 bool Session::handle_outbound_reject(const unsigned seqnum, const Message *msg, const char *errstr)
783 {
784  return send(generate_reject(seqnum, errstr, msg && !msg->get_msgtype().empty() ? msg->get_msgtype().c_str() : nullptr));
785 }
786 
787 //-------------------------------------------------------------------------------------------------
788 bool Session::activation_service() // called on the timer threead
789 {
790  //cout << "activation_service()" << endl;
791  if (is_shutdown())
792  return false;
793 
795  {
796  const bool curr(_active);
797  _active = _schedule->_sch.test(curr);
798  if (curr != _active)
799  {
800  slout_info << "Session activation transitioned to " << (_active ? "active" : "inactive");
801  }
802  }
803 
804  return true;
805 }
806 
807 //-------------------------------------------------------------------------------------------------
809 {
810  //cout << "heartbeat_service()" << endl;
811  if (is_shutdown())
812  return false;
813 
815  {
816  Tickval now(true);
817  if ((now - _last_sent).secs() >= static_cast<time_t>(_connection->get_hb_interval()))
818  {
819  const f8String testReqID;
820  send(generate_heartbeat(testReqID));
821  }
822 
823  now.now();
824  if ((now - _last_received).secs() > static_cast<time_t>(_connection->get_hb_interval20pc()))
825  {
826  if (_state == States::st_test_request_sent) // already sent
827  {
828  ostringstream ostr;
829  ostr << "Remote has ignored my test request. Aborting session...";
830  send(generate_logout(_loginParameters._silent_disconnect ? 0 : ostr.str().c_str()), true, 0, true); // so it won't increment
832  log(ostr.str(), Logger::Error);
833  try
834  {
835  stop();
836  }
837  catch (Poco::Net::NetException& e)
838  {
839  slout_error << e.what();
840  }
841  catch (exception& e)
842  {
843  slout_error << e.what();
844  }
845  return true;
846  }
848  {
849  ostringstream ostr;
850  ostr << "Have not received anything from remote for ";
851  if (_last_received.secs())
852  ostr << (now - _last_received).secs();
853  else
854  ostr << "more than " << _connection->get_hb_interval20pc();
855  ostr << " secs. Sending test request";
856  log(ostr.str(), Logger::Warn);
857  const f8String testReqID("TEST");
858  send(generate_test_request(testReqID));
860  }
861  }
862  }
863 
864  return true;
865 }
866 
867 //-------------------------------------------------------------------------------------------------
868 bool Session::handle_heartbeat(const unsigned seqnum, const Message *msg)
869 {
870  enforce(seqnum, msg);
871 
874  return true;
875 }
876 
877 //-------------------------------------------------------------------------------------------------
879 {
881  if (!testReqID.empty())
882  *msg << new test_request_id(testReqID);
883 
884  return msg;
885 }
886 
887 //-------------------------------------------------------------------------------------------------
888 Message *Session::generate_reject(const unsigned seqnum, const char *what, const char *msgtype)
889 {
891  *msg << new ref_seq_num(seqnum);
892  if (what)
893  *msg << new text(what);
894  if (msgtype)
895  *msg << new ref_msg_type(msgtype);
896 
897  return msg;
898 }
899 
900 //-------------------------------------------------------------------------------------------------
901 Message *Session::generate_business_reject(const unsigned seqnum, const Message *imsg, const int reason, const char *what)
902 {
903  Message *msg;
904  try
905  {
907  }
909  {
910  // since this is an application message, it may not be supported in supplied schema
911  return nullptr;
912  }
913 
914  *msg << new ref_seq_num(seqnum);
915  *msg << new ref_msg_type(imsg->get_msgtype());
916  *msg << new business_reject_reason(reason);
917  if (what)
918  *msg << new text(what);
919 
920  return msg;
921 }
922 
923 //-------------------------------------------------------------------------------------------------
925 {
927  *msg << new test_request_id(testReqID);
928 
929  return msg;
930 }
931 
932 //-------------------------------------------------------------------------------------------------
933 Message *Session::generate_logon(const unsigned heartbtint, const f8String davi)
934 {
936  *msg << new heartbeat_interval(heartbtint)
937  << new encrypt_method(0); // FIXME
938  if (!davi.empty() && msg->is_legal<default_appl_ver_id>())
939  *msg << new default_appl_ver_id(davi);
941  *msg << new reset_seqnum_flag(true);
942 
943  return msg;
944 }
945 
946 //-------------------------------------------------------------------------------------------------
947 Message *Session::generate_logout(const char *msgstr)
948 {
950  if (msgstr)
951  *msg << new text(msgstr);
952 
953  return msg;
954 }
955 
956 //-------------------------------------------------------------------------------------------------
957 Message *Session::generate_resend_request(const unsigned begin, const unsigned end)
958 {
960  *msg << new begin_seq_num(begin) << new end_seq_num(end);
961 
962  return msg;
963 }
964 
965 //-------------------------------------------------------------------------------------------------
966 Message *Session::generate_sequence_reset(const unsigned newseqnum, const bool gapfillflag)
967 {
969  *msg << new new_seq_num(newseqnum);
970 
971  if (gapfillflag)
972  *msg << new gap_fill_flag(true);
973 
974  return msg;
975 }
976 
977 //-------------------------------------------------------------------------------------------------
978 bool Session::send(Message *tosend, bool destroy, const unsigned custom_seqnum, const bool no_increment)
979 {
980  if (custom_seqnum)
981  tosend->set_custom_seqnum(custom_seqnum);
982  if (no_increment)
983  tosend->set_no_increment(no_increment);
984  return _connection && _connection->write(tosend, destroy);
985 }
986 
987 bool Session::send(Message& tosend, const unsigned custom_seqnum, const bool no_increment)
988 {
989  if (custom_seqnum)
990  tosend.set_custom_seqnum(custom_seqnum);
991  if (no_increment)
992  tosend.set_no_increment(no_increment);
993  return _connection && _connection->write(tosend);
994 }
995 
996 //-------------------------------------------------------------------------------------------------
997 size_t Session::send_batch(const vector<Message *>& msgs, bool destroy)
998 {
999  return _connection->write_batch(msgs, destroy);
1000 }
1001 
1002 //-------------------------------------------------------------------------------------------------
1004 {
1005  return 0;
1006 }
1007 
1008 //-------------------------------------------------------------------------------------------------
1009 bool Session::send_process(Message *msg) // called from the connection (possibly on separate thread)
1010 {
1011  //cout << "send_process()" << endl;
1012  bool is_dup(msg->Header()->have(Common_PossDupFlag));
1013  if (!msg->Header()->have(Common_SenderCompID))
1014  *msg->Header() << new sender_comp_id(_sid.get_senderCompID());
1015  if (!msg->Header()->have(Common_TargetCompID))
1016  *msg->Header() << new target_comp_id(_sid.get_targetCompID());
1017 
1018  if (msg->Header()->have(Common_MsgSeqNum))
1019  {
1020  if (is_dup)
1021  {
1023  delete msg->Header()->remove(Common_PossDupFlag);
1024  }
1025  else
1026  {
1028  {
1029  *msg->Header() << new poss_dup_flag(true);
1030  is_dup = true;
1031  }
1032  }
1033 
1034  sending_time sendtime;
1035  msg->Header()->get(sendtime);
1036  *msg->Header() << new orig_sending_time(sendtime());
1037 
1039  {
1040  slout_debug << "send_process: _next_send_seq = " << _next_send_seq;
1041  *msg->Header() << new msg_seq_num(msg->get_custom_seqnum() ? msg->get_custom_seqnum() : static_cast<unsigned int>(_next_send_seq));
1042  }
1043  }
1044  else
1045  {
1046  slout_debug << "send_process: _next_send_seq = " << _next_send_seq;
1047  *msg->Header() << new msg_seq_num(msg->get_custom_seqnum() ? msg->get_custom_seqnum() : static_cast<unsigned int>(_next_send_seq));
1048  }
1049  *msg->Header() << new sending_time;
1050 
1051  // allow session to modify the header of this message before sending
1052  const int fields_modified(modify_header(msg->Header()));
1053  if (fields_modified)
1054  {
1055  slout_debug << "send_process: " << fields_modified << " header fields added/modified";
1056  }
1057 
1058  try
1059  {
1060  slout_debug << "Sending:" << *msg;
1061  modify_outbound(msg);
1062  char output[FIX8_MAX_MSG_LENGTH + HEADER_CALC_OFFSET], *ptr(output);
1063  size_t enclen(msg->encode(&ptr));
1064  const char *optr(ptr);
1065  if (msg->get_end_of_batch())
1066  {
1067  if (!_batchmsgs_buffer.empty())
1068  {
1069  _batchmsgs_buffer.append(ptr);
1070  ptr = &_batchmsgs_buffer[0];
1071  enclen = _batchmsgs_buffer.size();
1072  }
1073  if (!_connection->send(ptr, enclen))
1074  {
1075  slout_error << "Message write failed: " << enclen << " bytes";
1076  _batchmsgs_buffer.clear();
1077  return false;
1078  }
1079  _last_sent.now();
1080  _batchmsgs_buffer.clear();
1081  }
1082  else
1083  {
1084  _batchmsgs_buffer.append(ptr);
1085  }
1086 
1088  plog(optr, Logger::Info);
1089 
1090  //cout << "send_process" << endl;
1091 
1092  if (!is_dup)
1093  {
1094  if (_persist)
1095  {
1096  f8_scoped_spin_lock guard(_per_spl, _connection->get_pmodel() == pm_coro); // not needed for coroutine mode
1097  if (!msg->is_admin())
1098  _persist->put(_next_send_seq, ptr);
1100  //cout << "Persisted (send):" << (_next_send_seq + 1) << " and " << _next_receive_seq << endl;
1101  }
1102  if (!msg->get_custom_seqnum() && !msg->get_no_increment() && msg->get_msgtype() != Common_MsgType_SEQUENCE_RESET)
1103  {
1104  ++_next_send_seq;
1105  //cout << "Seqnum now:" << _next_send_seq << " and " << _next_receive_seq << endl;
1106  }
1107  }
1108  }
1109  catch (f8Exception& e)
1110  {
1111  slout_error << e.what();
1112  return false;
1113  }
1114  catch (Poco::Exception& e)
1115  {
1116  slout_error << e.displayText();
1117  return false;
1118  }
1119 
1120  return true;
1121 }
1122 
1123 //-------------------------------------------------------------------------------------------------
1125 {
1127  if (_persist)
1128  {
1129  unsigned send_seqnum, receive_seqnum;
1130  if (_persist->get(send_seqnum, receive_seqnum))
1131  {
1132  slout_info << "Last sent: " << send_seqnum << ", last received: " << receive_seqnum;
1133  _next_send_seq = send_seqnum; // + 1;
1134  _next_receive_seq = receive_seqnum; // + 1;
1135  }
1136  }
1137 }
1138 
1139 //-------------------------------------------------------------------------------------------------
1140 #if (FIX8_THREAD_SYSTEM == FIX8_THREAD_PTHREAD) && !defined _MSC_VER && defined _GNU_SOURCE && defined __linux__
1141 f8String Session::get_thread_policy_string(thread_id_t id)
1142 {
1143  int policy;
1144  ostringstream ostr;
1145  sched_param param {};
1146  if (!pthread_getschedparam(id, &policy, &param))
1147  return policy == SCHED_OTHER ? "SCHED_OTHER" : policy == SCHED_RR ? "SCHED_RR"
1148  : policy == SCHED_FIFO ? "SCHED_FIFO" : "UNKNOWN";
1149 
1150  ostr << "Could not get scheduler parameters: " << Str_error(errno);
1151  return ostr.str();
1152 }
1153 
1154 //-------------------------------------------------------------------------------------------------
1155 void Session::set_scheduler(int priority)
1156 {
1157  pthread_t thread(pthread_self());
1158  sched_param param { priority };
1159 
1160  slout_info << "Current scheduler policy: " << get_thread_policy_string(thread);
1161 
1162  if (pthread_setschedparam(thread, SCHED_RR, &param))
1163  {
1164  slout_error << "Could not set new scheduler priority: " << get_thread_policy_string(thread)
1165  << " (" << Str_error(errno) << ") " << priority;
1166  return;
1167  }
1168 
1169  slout_info << "New scheduler policy: " << get_thread_policy_string(thread);
1170 }
1171 
1172 //-------------------------------------------------------------------------------------------------
1173 void Session::set_affinity(int core_id)
1174 {
1175  const int num_cores(sysconf(_SC_NPROCESSORS_ONLN));
1176  if (core_id >= num_cores)
1177  {
1178  slout_error << "Invalid core id: " << core_id;
1179  return;
1180  }
1181 
1182  cpu_set_t cpuset;
1183  CPU_ZERO(&cpuset);
1184  CPU_SET(core_id, &cpuset);
1185  const int error(pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset));
1186 
1187  if (error)
1188  slout_error << "Could not set thread affinity for core " << core_id << " (" << Str_error(errno) << ')';
1189  else
1190  slout_info << "Set thread affinity to " << core_id << " core for thread " << pthread_self();
1191 }
1192 #else
1193 //-------------------------------------------------------------------------------------------------
1194 void Session::set_scheduler(int priority)
1195 {
1196  slout_error << "set_scheduler: not implemented";
1197 }
1198 
1199 //-------------------------------------------------------------------------------------------------
1200 void Session::set_affinity(int core_id)
1201 {
1202  slout_error << "set_affinity: not implemented";
1203 }
1204 #endif
1205 
1206 //-------------------------------------------------------------------------------------------------
1208 {
1209  time_t now(time(0));
1210 #ifdef _MSC_VER
1211  struct tm *ptim(localtime (&now));
1212 #else
1213  struct tm tim;
1214  localtime_r(&now, &tim);
1215  struct tm *ptim(&tim);
1216 #endif
1217  ostringstream ostr;
1218  ostr << endl << package_version << ' ' << copyright_short << setw(2) << (ptim->tm_year - 100) << copyright_short2;
1219  return ostr.str();
1220 }
1221 
1222 
1223 //-------------------------------------------------------------------------------------------------
1224 #ifdef FIX8_HAVE_OPENSSL
1225 void Fix8CertificateHandler::onInvalidCertificate(const void*, Poco::Net::VerificationErrorArgs& errorCert)
1226 {
1227  const Poco::Net::X509Certificate& cert(errorCert.certificate());
1228  glout_warn << "WARNING: Certificate verification failed";
1229  glout_warn << "----------------------------------------";
1230  glout_warn << "Issuer Name: " << cert.issuerName();
1231  glout_warn << "Subject Name: " << cert.subjectName();
1232  glout_warn << "The certificate yielded the error: " << errorCert.errorMessage();
1233  glout_warn << "The error occurred in the certificate chain at position " << errorCert.errorDepth();
1234  errorCert.setIgnoreError(true);
1235 }
1236 
1237 void Fix8PassPhraseHandler::onPrivateKeyRequested(const void*, std::string& privateKey)
1238 {
1239  glout_warn << "warning: privatekey passphrase requested and ignored!";
1240 }
1241 
1242 #endif // FIX8_HAVE_OPENSSL
1243 //-------------------------------------------------------------------------------------------------
1244 #if defined(_MSC_VER)
1245 #pragma warning(pop)
1246 #endif
Logger * _logger
Definition: session.hpp:422
void stop()
Stop the logging thread.
Definition: logger.hpp:310
const char Common_MsgByte_SEQUENCE_RESET('4')
unsigned _req_next_send_seq
Definition: session.hpp:414
A message was received with an invalid sending time.
unsigned get_hb_interval() const
Definition: connection.hpp:608
F8API void update_persist_seqnums()
Force persister to sync next send/receive seqnums.
Definition: session.cpp:266
bool have(const unsigned short fnum) const
Definition: message.hpp:725
Field< int, Common_EncryptMethod > encrypt_method
Definition: field.hpp:2163
const char Common_MsgByte_REJECT('3')
bool is_valid() const
Definition: session.hpp:242
Indicates a static metadata lookup failed. With the exception of user defined fields there should nev...
void set_hb_interval(const unsigned hb_interval)
Definition: connection.hpp:603
const f8String Common_MsgType_BUSINESS_REJECT("j")
virtual bool activation_check(const unsigned seqnum, const Message *msg)
Definition: session.hpp:736
bool force_logoff() const
Definition: f8exception.hpp:89
bool plog(const std::string &what, Logger::Level lev, const unsigned direction=0) const
Definition: session.hpp:703
f8_thread delegated async logging class
Definition: logger.hpp:153
F8API bool sequence_check(const unsigned seqnum, const Message *msg)
Definition: session.cpp:427
Logger * _plogger
Definition: session.hpp:422
Field< UTCTimestamp, Common_SendingTime > sending_time
Definition: field.hpp:2155
F8API std::string Str_error(const int err, const char *str=0)
Definition: f8utils.cpp:165
POSIX regex wrapper class.
Definition: f8utils.hpp:370
const f8String Common_MsgType_LOGON("A")
F8API void compid_check(const unsigned seqnum, const Message *msg, const SessionID &id) const
Definition: session.cpp:415
Fix8 Base Session. User sessions are derived from this class.
Definition: session.hpp:394
virtual F8API bool handle_logon(const unsigned seqnum, const Message *msg)
Definition: session.cpp:470
size_t write_batch(const std::vector< Message * > &msgs, bool destroy)
Definition: connection.hpp:588
const f8String Common_MsgType_SEQUENCE_RESET("4")
Field< Boolean, Common_GapFillFlag > gap_fill_flag
Definition: field.hpp:2158
std::pair< const unsigned, const f8String > SequencePair
Definition: session.hpp:615
TimerEvent< Session > _hb_processor
Definition: session.hpp:425
F8API BaseField * remove(const unsigned short fnum, Presence::const_iterator itr)
Definition: message.cpp:646
const char Common_MsgByte_LOGON('A')
#define FILE_LINE
Definition: f8utils.hpp:68
const unsigned short Common_TargetCompID(56)
Base (ABC) Persister class.
Definition: persist.hpp:55
F8API void stop()
Stop the reader and writer threads.
Definition: connection.cpp:322
F8API SessionID make_reverse_id() const
Definition: session.cpp:81
Provides context to your retrans handler.
Definition: session.hpp:598
virtual F8API bool process(const f8String &from)
Definition: session.cpp:277
virtual void modify_outbound(Message *msg)
Definition: session.hpp:514
Field< SeqNum, Common_MsgSeqNum > msg_seq_num
Definition: field.hpp:2137
Field< Boolean, Common_PossDupFlag > poss_dup_flag
Definition: field.hpp:2159
Tickval _last_received
Definition: session.hpp:410
Persister * _persist
Definition: session.hpp:421
ProcessModel get_pmodel() const
Definition: connection.hpp:553
f8_atomic< States::SessionStates > _state
Definition: session.hpp:408
#define FIX8_PACKAGE_VERSION
Definition: f8config.h:631
virtual void set_custom_seqnum(unsigned seqnum)
Definition: message.hpp:1255
const size_t HEADER_CALC_OFFSET(32)
TimerEvent< Session > _session_scheduler
Definition: session.hpp:425
SessionID _sid
Definition: session.hpp:415
void do_state_change(const States::SessionStates new_state)
Definition: session.hpp:827
const sender_comp_id & get_senderCompID() const
Definition: session.hpp:101
bool has_flag(const Flags flg) const
Definition: logger.hpp:338
Field< SeqNum, Common_EndSeqNo > end_seq_num
Definition: field.hpp:2139
Session_Schedule * _schedule
Definition: session.hpp:427
F8API bool activation_service()
Session start/stop service thread method.
Definition: session.cpp:788
const f8String Common_MsgType_TEST_REQUEST("1")
const f8String Common_MsgType_HEARTBEAT("0")
const f8String Common_MsgType_LOGOUT("5")
Quickfix style sessionid.
Definition: session.hpp:46
virtual bool handle_admin(const unsigned seqnum, const Message *msg)
Definition: session.hpp:490
virtual bool is_admin() const
Definition: message.hpp:1151
#define FIX8_PACKAGE_URL
Definition: f8config.h:626
Role get_role() const
Definition: connection.hpp:549
virtual F8API bool handle_resend_request(const unsigned seqnum, const Message *msg)
Definition: session.cpp:663
Complete Fix connection (reader and writer).
Definition: connection.hpp:508
bool test(bool prev=false) const
Definition: session.hpp:247
static F8API const f8String copyright_string()
Definition: session.cpp:1207
std::string _batchmsgs_buffer
Definition: session.hpp:426
LoginParameters _loginParameters
Definition: session.hpp:418
Field< UTCTimestamp, Common_OrigSendingTime > orig_sending_time
Definition: field.hpp:2156
virtual F8API Message * generate_logout(const char *msgstr=nullptr)
Definition: session.cpp:947
#define FIX8_PACKAGE_NAME
Definition: f8config.h:611
const unsigned short Common_SenderCompID(49)
#define glout_error
Definition: logger.hpp:606
virtual F8API bool handle_sequence_reset(const unsigned seqnum, const Message *msg)
Definition: session.cpp:642
const f8String Common_MsgType_REJECT("3")
F8API Persister * create_persister(const XmlElement *from, const SessionID *sid=nullptr, bool flag=false) const
const char Common_MsgByte_RESEND_REQUEST('2')
bool get(T &to) const
Definition: message.hpp:671
Schedule _login_schedule
Definition: session.hpp:359
const f8String _beginStr
Fix header beginstring.
Definition: message.hpp:228
virtual F8API bool handle_logout(const unsigned seqnum, const Message *msg)
Definition: session.cpp:630
#define FIX8_PACKAGE_BUGREPORT
Definition: f8config.h:606
sender_comp_id _sci
Definition: session.hpp:412
virtual bool get(const unsigned seqnum, f8String &to) const =0
static F8API const std::vector< f8String > _state_names
string representation of Sessionstates
Definition: session.hpp:430
f8_atomic< unsigned > _next_receive_seq
Definition: session.hpp:407
virtual F8API bool send(Message *msg, bool destroy=true, const unsigned custom_seqnum=0, const bool no_increment=false)
Definition: session.cpp:978
#define glout_warn
Definition: logger.hpp:604
f8_atomic< bool > _active
Definition: session.hpp:409
virtual bool handle_reject(const unsigned seqnum, const Message *msg)
Definition: session.hpp:484
Connection * _connection
Definition: session.hpp:413
bool is_connected() const
Definition: connection.hpp:571
const F8MetaCntx & ctx()
Compiler generated metadata object, accessed through this function.
Base exception class.
Definition: f8exception.hpp:49
Tickval _last_sent
Definition: session.hpp:410
const f8String & get_msgtype() const
Definition: message.hpp:538
Field< f8String, Common_TestReqID > test_request_id
Definition: field.hpp:2150
Control _control
Definition: session.hpp:406
MessageBase * Header() const
Definition: message.hpp:1098
virtual bool put(const unsigned seqnum, const f8String &what)=0
const unsigned short Common_PossDupFlag(43)
A message was received with an out of sequence sequence number.
F8API void stop()
stop the session.
Definition: session.cpp:230
int hypersleep< h_milliseconds >(unsigned amt)
Definition: hypersleep.hpp:105
Field< f8String, Common_SenderCompID > sender_comp_id
Definition: field.hpp:2145
virtual F8API Message * generate_logon(const unsigned heartbeat_interval, const f8String davi=f8String())
Definition: session.cpp:933
struct SessionConfig * _sf
Definition: session.hpp:416
F8API void start()
Start the reader and writer threads.
Definition: connection.cpp:315
virtual bool handle_application(const unsigned seqnum, const Message *&msg)=0
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
A message was received with an invalid sender/target compid id.
void clear(const T sbit)
Definition: f8utils.hpp:1011
virtual F8API int modify_header(MessageBase *msg)
Definition: session.cpp:1003
F8API void purge_thread_codes()
Remove dead threads from the thread code cache.
Definition: logger.cpp:206
Field< f8String, Common_DefaultApplVerID > default_appl_ver_id
Definition: field.hpp:2152
Tickval & now()
Definition: tickval.hpp:133
f8_spin_lock _per_spl
Definition: session.hpp:420
void set_scheduler(int priority)
Definition: session.cpp:1194
#define glout_info
Definition: logger.hpp:601
virtual void stop()
Stop the persister thread.
Definition: persist.hpp:142
const unsigned short Common_MsgSeqNum(34)
time_t secs() const
Definition: tickval.hpp:141
const char Common_MsgByte_HEARTBEAT('0')
Field< Boolean, Common_ResetSeqNumFlag > reset_seqnum_flag
Definition: field.hpp:2160
virtual F8API bool handle_test_request(const unsigned seqnum, const Message *msg)
Definition: session.cpp:771
const char Common_MsgByte_TEST_REQUEST('1')
const target_comp_id & get_targetCompID() const
Definition: session.hpp:105
virtual F8API ~Session()
Dtor.
Definition: session.cpp:152
bool log(const std::string &what, Logger::Level lev, const char *fl=nullptr, unsigned value=0) const
Definition: session.hpp:695
virtual bool connect()
Definition: connection.hpp:567
unsigned _req_next_receive_seq
Definition: session.hpp:414
virtual F8API bool handle_heartbeat(const unsigned seqnum, const Message *msg)
Definition: session.cpp:868
A complete Fix message with header, body and trailer.
Definition: message.hpp:1058
static Message * factory(const F8MetaCntx &ctx, const char *from, bool no_chksum=false, bool permissive_mode=false)
Definition: message.hpp:1238
F8API bool send_process(Message *msg)
Definition: session.cpp:1009
F8API Session(const F8MetaCntx &ctx, const SessionID &sid, Persister *persist=nullptr, Logger *logger=nullptr, Logger *plogger=nullptr)
Definition: session.cpp:105
const f8String Common_MsgType_RESEND_REQUEST("2")
An invalid message was requested or decoded.
virtual F8API Message * generate_reject(const unsigned seqnum, const char *what, const char *msgtype=nullptr)
Definition: session.cpp:888
F8API void make_id()
Definition: session.cpp:73
Message * create_msg(const f8String &msg_type) const
Definition: session.hpp:528
#define slout_info
Definition: session.hpp:883
virtual bool authenticate(SessionID &id, const Message *msg)
Definition: session.hpp:520
int hypersleep< h_seconds >(unsigned amt)
Definition: hypersleep.hpp:83
Poco::Net::SocketAddress get_peer_socket_address() const
Definition: connection.hpp:616
F8API bool heartbeat_service()
Heartbeat generation service thread method.
Definition: session.cpp:808
F8API Session_Schedule * create_session_schedule(const XmlElement *from) const
const F8MetaCntx & _ctx
Definition: session.hpp:411
#define glout_fatal
Definition: logger.hpp:608
virtual F8API Message * generate_business_reject(const unsigned seqnum, const Message *msg, const int reason, const char *what)
Definition: session.cpp:901
virtual F8API Message * generate_test_request(const f8String &testReqID)
Definition: session.cpp:924
const unsigned char default_field_separator(0x1)
default FIX field separator (^A)
Field< int, Common_HeartBtInt > heartbeat_interval
Definition: field.hpp:2162
void set_affinity(int core_id)
Definition: session.cpp:1200
unsigned get_custom_seqnum() const
Definition: message.hpp:1259
F8API bool enforce(const unsigned seqnum, const Message *msg)
Definition: session.cpp:252
Field< f8String, Common_Text > text
Definition: field.hpp:2151
bool get_ignore_logon_sequence_check_flag(const XmlElement *from, const bool def=false) const
unsigned get_hb_interval20pc() const
Definition: connection.hpp:612
F8API void from_string(const f8String &from)
Create a sessionid string.
Definition: session.cpp:87
A class to contain regex matches using RegExp.
Definition: f8utils.hpp:308
const XmlElement * _ses
int send(const char *from, size_t sz)
Definition: connection.hpp:594
Field< int, Common_BusinessRejectReason > business_reject_reason
Definition: field.hpp:2164
std::thread::id thread_id_t
Definition: thread.hpp:58
Field< SeqNum, Common_BeginSeqNo > begin_seq_num
Definition: field.hpp:2138
virtual F8API Message * generate_heartbeat(const f8String &testReqID)
Definition: session.cpp:878
Field< SeqNum, Common_RefSeqNum > ref_seq_num
Definition: field.hpp:2141
Timer< Session > _timer
Definition: session.hpp:424
static RegExp _sid
Definition: session.hpp:48
bool is_legal(unsigned short fnum) const
Definition: message.hpp:691
virtual F8API size_t send_batch(const std::vector< Message * > &msgs, bool destroy=true)
Definition: session.cpp:997
Field< SeqNum, Common_NewSeqNo > new_seq_num
Definition: field.hpp:2140
const char * what() const
Definition: f8exception.hpp:85
Base class for all fix messages.
Definition: message.hpp:381
#define slout_warn
Definition: session.hpp:884
virtual F8API Message * generate_resend_request(const unsigned begin, const unsigned end=0)
Definition: session.cpp:957
const unsigned short Common_ResetSeqNumFlag(141)
F8API Logger * create_logger(const XmlElement *from, const Logtype ltype, const SessionID *sid=nullptr) const
f8_atomic< unsigned > _next_send_seq
Definition: session.hpp:407
virtual F8API Message * generate_sequence_reset(const unsigned newseqnum, const bool gapfillflag=false)
Definition: session.cpp:966
Static metadata context class - one per FIX xml schema.
Definition: message.hpp:210
#define slout_fatal
Definition: session.hpp:886
#define slout_error
Definition: session.hpp:885
F8API int start(Connection *connection, bool wait=true, const unsigned send_seqnum=0, const unsigned recv_seqnum=0, const f8String davi=f8String())
Definition: session.cpp:167
virtual F8API bool handle_outbound_reject(const unsigned seqnum, const Message *msg, const char *errstr)
Definition: session.cpp:782
bool is_shutdown()
Definition: session.hpp:778
virtual F8API void recover_seqnums()
Recover next expected and next to send sequence numbers from persitence layer.
Definition: session.cpp:1124
std::string f8String
Definition: f8types.hpp:47
void atomic_init(States::SessionStates st)
Definition: session.cpp:144
static bool is_established(SessionStates ss)
Definition: session.hpp:175
Field< f8String, Common_RefMsgType > ref_msg_type
Definition: field.hpp:2153
virtual void set_no_increment(bool flag=true)
Definition: message.hpp:1263
virtual std::ostream & print(std::ostream &os) const =0
const char Common_MsgByte_LOGOUT('5')
A message was received with an out of sequence sequence number.
Field< f8String, Common_TargetCompID > target_comp_id
Definition: field.hpp:2146
#define slout_debug
Definition: session.hpp:887
virtual F8API bool retrans_callback(const SequencePair &with, RetransmissionContext &rctx)
Definition: session.cpp:707
virtual bool write(Message *from, bool destroy=true)
Definition: connection.hpp:577
Could not open a logfile.