fix8  version 1.4.0
Open Source C++ FIX Framework
persist.hpp
Go to the documentation of this file.
1 //-------------------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-------------------------------------------------------------------------------------------------
37 #ifndef FIX8_PERSIST_HPP_
38 #define FIX8_PERSIST_HPP_
39 
40 #if defined FIX8_HAVE_BDB
41 # include <db_cxx.h>
42 #endif
43 #if defined HAVE_LIBMEMCACHED
44 # include <libmemcached/memcached.h>
45 #endif
46 #if defined FIX8_HAVE_LIBHIREDIS
47 # include <hiredis/hiredis.h>
48 #endif
49 
50 //-------------------------------------------------------------------------------------------------
51 namespace FIX8 {
52 
53 //-------------------------------------------------------------------------------------------------
/// Base (ABC) Persister class.
/// Declares the storage interface that the persister classes deriving from it in this file
/// implement: put/get of a message by sequence number, put/get of a sender/target sequence
/// number pair, optional string-keyed put/get/del, a ranged get that is given a Session member
/// callback, and purge()/stop() hooks. Copy construction and copy assignment are deleted.
/// NOTE(review): this text is a numbered source listing with some lines elided (e.g. listing
/// lines 70-71). MaxMsgLen, which BDBPersister uses, is not declared on any visible line -
/// presumably it was declared on one of the elided lines; confirm against the real header.
55 class Persister
56 {
57 protected:
    /// Open flag, initialised to false. No visible line of this listing reads or updates it.
58  bool _opened = false;
59 
60 public:
    /// Ctor.
62  Persister() = default;
63 
    /// Dtor. Virtual so derived persisters can be destroyed through a Persister pointer.
65  virtual ~Persister() {}
66 
    // Non-copyable.
67  Persister(const Persister&) = delete;
68  Persister& operator=(const Persister&) = delete;
69 
72 
    /// Pure virtual: persist `what` under sequence number `seqnum`.
    /// NOTE(review): presumably returns true on success - confirm in the implementations.
77  virtual bool put(const unsigned seqnum, const f8String& what) = 0;
78 
    /// String-keyed put. This default implementation stores nothing and returns false.
84  virtual bool put(const f8String& key, const f8String& what) { return false; }
85 
    /// Pure virtual: persist a sender/target sequence number pair.
90  virtual bool put(const unsigned sender_seqnum, const unsigned target_seqnum) = 0;
91 
    /// Pure virtual: fetch the record for `seqnum` into `to`.
96  virtual bool get(const unsigned seqnum, f8String& to) const = 0;
97 
    /// String-keyed get. This default implementation leaves `to` untouched and returns false.
103  virtual bool get(const f8String& key, f8String& to) const { return false; }
104 
    /// String-keyed delete. This default implementation does nothing and returns false.
109  virtual bool del(const f8String& key) { return false; }
110 
    /// Pure virtual: ranged get over sequence numbers `from`..`to`. `callback` is a pointer to a
    /// Session member function taking a Session::SequencePair and a Session::RetransmissionContext;
    /// `session` is the object it is meant to be invoked on.
    /// NOTE(review): the meaning of the unsigned return value is not visible here - presumably
    /// the number of records handed to the callback; confirm in the implementations.
117  virtual unsigned get(const unsigned from, const unsigned to, Session& session,
118  bool (Session::*callback)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const = 0;
119 
    /// Pure virtual: report the last sequence number through `to` and as the return value.
    /// NOTE(review): exact relationship between `to` and the return value is not visible here.
123  virtual unsigned get_last_seqnum(unsigned& to) const = 0;
124 
    /// Pure virtual: fetch the persisted sender/target sequence number pair into the two references.
129  virtual bool get(unsigned& sender_seqnum, unsigned& target_seqnum) const = 0;
130 
    /// Pure virtual: look up a sequence number relative to `requested`, bounded by `last`.
    /// NOTE(review): by its name, the nearest stored sequence number at or above `requested` - confirm.
135  virtual unsigned find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const = 0;
136 
    /// Purge hook. This default implementation does nothing and reports success (true).
139  virtual bool purge() { return true; }
140 
    /// Stop the persister thread. This default implementation is a no-op.
142  virtual void stop() {}
143 };
144 
145 //-------------------------------------------------------------------------------------------------
146 #if defined FIX8_HAVE_BDB
147 
/// Persister built on the Berkeley DB C++ API (DbEnv / Db / Dbt), compiled only when
/// FIX8_HAVE_BDB is defined. Records are queued through write() onto _persist_queue, and
/// operator()() is the entry point of the write thread held in _thread.
/// NOTE(review): the bodies of initialise(), the put/get overrides, operator()() and the
/// destructor are not in this listing, so how the queue is drained cannot be confirmed here.
149 class BDBPersister : public Persister
150 {
    // Thread object bound to this instance (constructed with std::ref(*this) in the ctor).
151  f8_thread<BDBPersister> _thread;
152 
    // Berkeley DB environment, and the database handle allocated with `new` in the ctor.
    // NOTE(review): _db is a raw owning pointer; its release is presumably in the out-of-line dtor.
153  DbEnv _dbEnv;
154  Db *_db;
155  f8String _dbDir, _dbFname;
156  bool _wasCreated;
157 
    /// One queued record: an unsigned key, a payload length, and a fixed payload buffer of
    /// MaxMsgLen chars.
158  struct KeyDataBuffer
159  {
    // Key storage viewable either as an unsigned or as raw chars (the char view is what
    // KeyDataPair hands to Berkeley DB).
160  union Ubuf
161  {
162  unsigned int_;
163  char char_[sizeof(unsigned)];
164  Ubuf() : int_() {}
165  Ubuf(const unsigned val) : int_(val) {}
166  Ubuf(const Ubuf& from) : int_(from.int_) {}
167  }
168  keyBuf_;
169  unsigned dataBufLen_;
170  char dataBuf_[MaxMsgLen];
171 
    // Default: key 0, length 0, zeroed buffer - the state empty() tests for.
172  KeyDataBuffer() : keyBuf_(), dataBufLen_(), dataBuf_() {}
    // Key only, no payload.
173  KeyDataBuffer(const unsigned ival) : keyBuf_(ival), dataBufLen_(), dataBuf_() {}
    // Key plus message payload. At most MaxMsgLen chars of src are copied; a longer src is
    // truncated without any indication. dataBufLen_ is assigned inside the copy() call.
174  KeyDataBuffer(const unsigned ival, const f8String& src) : keyBuf_(ival), dataBuf_()
175  { src.copy(dataBuf_, dataBufLen_ = src.size() > MaxMsgLen ? MaxMsgLen : src.size()); }
    // Sequence number pair record: key 0, payload is `snd` followed by `trg` as two raw unsigned values.
    // NOTE(review): the values are stored through an unsigned* obtained by reinterpret_cast from
    // the char buffer; memcpy would avoid relying on alignment/aliasing of dataBuf_. The rest of
    // dataBuf_ is left uninitialised by this constructor.
176  KeyDataBuffer(const unsigned snd, const unsigned trg) : keyBuf_(), dataBufLen_(2 * sizeof(unsigned))
177  {
178  unsigned *loc(reinterpret_cast<unsigned *>(dataBuf_));
179  *loc++ = snd;
180  *loc = trg;
181  }
    // Copy: only the first dataBufLen_ bytes of the payload are copied, not the whole buffer.
182  KeyDataBuffer(const KeyDataBuffer& from) : keyBuf_(from.keyBuf_), dataBufLen_(from.dataBufLen_)
183  { memcpy(dataBuf_, from.dataBuf_, dataBufLen_); }
184 
    // True when both the payload length and the key are zero (i.e. a default-constructed buffer).
185  bool empty() const { return dataBufLen_ == 0 && keyBuf_.int_ == 0; }
186  };
187 
    /// Pair of Berkeley DB Dbt descriptors that point into a KeyDataBuffer (no copy is made).
    /// Both are flagged DB_DBT_USERMEM; the key's user length is sizeof(unsigned) and the
    /// data's user length is MaxMsgLen.
188  struct KeyDataPair
189  {
190  Dbt _key, _data;
191 
192  KeyDataPair(KeyDataBuffer& buf)
193  : _key(buf.keyBuf_.char_, sizeof(unsigned)), _data(buf.dataBuf_, buf.dataBufLen_)
194  {
195  _key.set_flags(DB_DBT_USERMEM);
196  _key.set_ulen(sizeof(unsigned));
197  _data.set_flags(DB_DBT_USERMEM);
198  _data.set_ulen(MaxMsgLen);
199  }
200  };
201 
    /// Key comparison callback: orders two keys by their unsigned value. `db` is unused.
    /// The Dbt data pointer is viewed as a KeyDataBuffer and only keyBuf_.int_ (its first member)
    /// is read.
    /// NOTE(review): presumably installed as the btree comparator in initialise() - confirm.
202  static int bt_compare_fcn(Db *db, const Dbt *p1, const Dbt *p2)
203  {
204  // Returns: < 0 if a < b; = 0 if a = b; > 0 if a > b
205 
206  const unsigned& a((*reinterpret_cast<KeyDataBuffer *>(p1->get_data())).keyBuf_.int_);
207  const unsigned& b((*reinterpret_cast<KeyDataBuffer *>(p2->get_data())).keyBuf_.int_);
208 
209  return a < b ? -1 : a > b ? 1 : 0;
210  }
211 
    // Queue of pending records.
212  f8_concurrent_queue<KeyDataBuffer> _persist_queue;
213 
    /// Queue a record without blocking; returns whatever try_push() reports.
214  bool write(const KeyDataBuffer& what)
215  {
216  return _persist_queue.try_push(what);
217  }
218 
219  f8_thread_cancellation_token _cancellation_token;
220 
221 public:
    /// Ctor. Binds the thread object to this instance, constructs the environment with flags 0,
    /// allocates the Db handle on that environment and clears _wasCreated.
223  BDBPersister() : _thread(std::ref(*this)), _dbEnv(0), _db(new Db(&_dbEnv, 0)), _wasCreated() {}
    /// Dtor (defined out of line).
225  virtual ~BDBPersister();
226 
    /// Set up the persister from a database directory and file name (defined out of line).
    /// `purge` defaults to false.
232  F8API virtual bool initialise(const f8String& dbDir, const f8String& dbFname, bool purge=false);
233 
    /// Override of Persister::find_nearest_highest_seqnum (defined out of line).
238  F8API virtual unsigned find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const;
239 
    /// Override of Persister::put(seqnum, what) (defined out of line).
244  F8API virtual bool put(const unsigned seqnum, const f8String& what);
245 
    /// Override of Persister::put(sender_seqnum, target_seqnum) (defined out of line).
250  F8API virtual bool put(const unsigned sender_seqnum, const unsigned target_seqnum);
251 
    /// Override of Persister::get(seqnum, to) (defined out of line).
256  F8API virtual bool get(const unsigned seqnum, f8String& to) const;
257 
    /// Override of the ranged Persister::get taking a Session member callback (defined out of line).
    /// NOTE(review): unlike the neighbouring overrides this declaration carries no F8API - confirm
    /// whether that is intentional.
264  virtual unsigned get(const unsigned from, const unsigned to, Session& session,
265  bool (Session::*)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const;
266 
    /// Override of Persister::get_last_seqnum (defined out of line).
270  F8API virtual unsigned get_last_seqnum(unsigned& to) const;
271 
    /// Override of Persister::get(sender_seqnum, target_seqnum) (defined out of line).
276  F8API virtual bool get(unsigned& sender_seqnum, unsigned& target_seqnum) const;
277 
    /// Stop the persister thread: queues a default-constructed (empty) KeyDataBuffer, then joins
    /// the thread.
    /// NOTE(review): presumably the empty buffer is the shutdown sentinel checked by operator()().
    /// The result of write() is ignored, so a failed try_push would leave join() waiting - confirm.
279  void stop() { write(KeyDataBuffer()); _thread.join(); }
280 
    /// Thread entry point (defined out of line).
283  F8API int operator()(); // write thread
284 
    /// Accessor for the thread cancellation token.
285  f8_thread_cancellation_token& cancellation_token() { return _cancellation_token; }
286 };
287 
288 #endif // FIX8_HAVE_BDB
289 
290 //-------------------------------------------------------------------------------------------------
/// Memory based message persister.
/// NOTE(review): the class head (listing line 292) is missing from this listing, as are listing
/// lines 295 and 298-300. The cross-reference section at the end of the page names this class
/// MemoryPersister, defined at persist.hpp:292; its base class is not visible here - presumably
/// `public Persister`, since it redeclares the same virtual functions. Restore from the real header.
293 {
    // Map type keyed by sequence number holding immutable message strings.
    // NOTE(review): no member of this type is declared on a visible line.
294  using Store = std::map<unsigned, const f8String>;
296 
297 public:
    /// Dtor.
301  virtual ~MemoryPersister() {}
302 
    /// put(seqnum, what) - defined out of line.
307  F8API virtual bool put(const unsigned seqnum, const f8String& what);
308 
    /// put(sender_seqnum, target_seqnum) - defined out of line.
313  F8API virtual bool put(const unsigned sender_seqnum, const unsigned target_seqnum);
314 
    /// get(seqnum, to) - defined out of line.
319  F8API virtual bool get(const unsigned seqnum, f8String& to) const;
320 
    /// Ranged get taking a Session member callback - defined out of line.
327  F8API virtual unsigned get(const unsigned from, const unsigned to, Session& session,
328  bool (Session::*)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const;
329 
    /// get_last_seqnum - defined out of line.
333  F8API virtual unsigned get_last_seqnum(unsigned& to) const;
334 
    /// get(sender_seqnum, target_seqnum) - defined out of line.
339  F8API virtual bool get(unsigned& sender_seqnum, unsigned& target_seqnum) const;
340 
    /// find_nearest_highest_seqnum - defined out of line.
345  F8API virtual unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const;
346 };
347 
348 //-------------------------------------------------------------------------------------------------
// When not building with MSVC (_MSC_VER undefined), define O_BINARY as 0.
// NOTE(review): presumably this lets open() flags be OR-ed with O_BINARY unconditionally. The
// guard tests the compiler rather than the macro, so a non-MSVC platform whose headers already
// define O_BINARY would have it redefined to 0 here - consider also checking `defined O_BINARY`.
349 #ifndef _MSC_VER
350 # define O_BINARY 0
351 #endif
352 
353 //-------------------------------------------------------------------------------------------------
354 #pragma pack(push, 1)
355 //-------------------------------------------------------------------------------------------------
/// Offset/size record: an off_t offset and a 32-bit signed size. It is declared inside the
/// surrounding `#pragma pack(push, 1)` region, so it is laid out with 1-byte packing.
/// It is the mapped type of FilePersister's Index alias (std::map<uint32_t, Prec>).
/// NOTE(review): presumably the location and length of one message in the data file - confirm
/// against the FilePersister implementation.
357 struct Prec
358 {
    // Construct from an explicit offset and size.
359  Prec(const off_t offset, const int32_t size) : _offset(offset), _size(size) {}
    // Default: both fields value-initialised (zero).
360  Prec() : _offset(), _size() {}
361  off_t _offset;
362  int32_t _size;
363 
    // Member-wise copy assignment, skipped on self-assignment.
364  Prec& operator=(const Prec& that)
365  {
366  if (this != &that)
367  {
368  _offset = that._offset;
369  _size = that._size;
370  }
371  return *this;
372  }
373 
    // Stream as "offset:<offset> size:<size>".
374  friend std::ostream& operator<<(std::ostream& os, const Prec& what)
375  { return os << "offset:" << what._offset << " size:" << what._size; }
376 };
377 
/// A 32-bit sequence number paired with a Prec (offset/size). Declared inside the surrounding
/// `#pragma pack(push, 1)` region, so it is laid out with 1-byte packing.
/// NOTE(review): the `_prec` member used below is declared on listing line 384, which is missing
/// from this listing (the page's cross-reference section lists it as `Prec _prec`).
/// Presumably this is the on-disk index record layout - confirm against FilePersister.
378 struct IPrec
379 {
    // Construct from a sequence number plus the offset and size forwarded to the Prec member.
380  IPrec(const uint32_t seq, const off_t offset, const int32_t size)
381  : _seq(seq), _prec(offset, size) {}
    // Default: sequence number zero; _prec is default-constructed.
382  IPrec() : _seq() {}
383  uint32_t _seq;
385 
    // Stream as "seq:<seq> " followed by the Prec's own streamed form.
386  friend std::ostream& operator<<(std::ostream& os, const IPrec& what)
387  { return os << "seq:" << what._seq << ' ' << what._prec; }
388 };
389 #pragma pack(pop)
390 
/// Persister whose state visible here is two int handles (_fod, _iod) and a rotation number;
/// all operations are defined out of line.
/// NOTE(review): listing lines 393, 396 and 399 are missing from this listing. `_wasCreated`,
/// initialised in the constructor, is presumably declared on one of them; likewise any file name
/// members and the Index instance. _fod/_iod are presumably the descriptors of the data file and
/// the index file - confirm against the implementation.
391 class FilePersister : public Persister
392 {
    // Both handles start at -1 (set in the ctor).
394  int _fod, _iod;
395  unsigned _rotnum;
397 
    // Map from 32-bit sequence number to its Prec (offset/size) record.
398  using Index = std::map<uint32_t, Prec>;
400 
401 public:
    /// Ctor. `rotnum` defaults to 0; both handles are set to -1 and _wasCreated is cleared.
    /// NOTE(review): not marked explicit, so an unsigned converts implicitly to a FilePersister.
403  FilePersister(unsigned rotnum=0) : _fod(-1), _iod(-1), _rotnum(rotnum), _wasCreated() {}
404 
    /// Dtor (defined out of line).
406  F8API virtual ~FilePersister();
407 
    /// Set up the persister from a directory and file name (defined out of line).
    /// `purge` defaults to false.
413  F8API virtual bool initialise(const f8String& dbDir, const f8String& dbFname, bool purge = false);
414 
    /// Override of Persister::put(seqnum, what) (defined out of line).
419  F8API virtual bool put(const unsigned seqnum, const f8String& what);
420 
    /// Override of Persister::put(sender_seqnum, target_seqnum) (defined out of line).
425  F8API virtual bool put(const unsigned sender_seqnum, const unsigned target_seqnum);
426 
    /// Override of Persister::get(seqnum, to) (defined out of line).
431  F8API virtual bool get(const unsigned seqnum, f8String& to) const;
432 
    /// Override of the ranged Persister::get taking a Session member callback (defined out of line).
439  F8API virtual unsigned get(const unsigned from, const unsigned to, Session& session,
440  bool (Session::*)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const;
441 
    /// Override of Persister::get_last_seqnum (defined out of line).
445  F8API virtual unsigned get_last_seqnum(unsigned& to) const;
446 
    /// Override of Persister::get(sender_seqnum, target_seqnum) (defined out of line).
451  F8API virtual bool get(unsigned& sender_seqnum, unsigned& target_seqnum) const;
452 
    /// Override of Persister::find_nearest_highest_seqnum (defined out of line).
457  F8API virtual unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const;
458 };
459 
460 //-------------------------------------------------------------------------------------------------
461 #if defined HAVE_LIBMEMCACHED
462 class MemcachedPersister : public Persister
464 {
465  memcached_st *_cache = nullptr;
467  f8String _key_base;
468  unsigned _server_count = 0;
469 
470 public:
472  MemcachedPersister() = default;
473 
475  F8API virtual ~MemcachedPersister();
476 
482  F8API virtual bool initialise(const f8String& config_str, const f8String& key_base, bool purge=false);
483 
488  F8API virtual bool put(const unsigned seqnum, const f8String& what);
489 
494  F8API virtual bool put(const unsigned sender_seqnum, const unsigned target_seqnum);
495 
500  F8API virtual bool get(const unsigned seqnum, f8String& to) const;
501 
508  F8API virtual unsigned get(const unsigned from, const unsigned to, Session& session,
509  bool (Session::*)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const;
510 
514  F8API virtual unsigned get_last_seqnum(unsigned& to) const;
515 
520  F8API virtual bool get(unsigned& sender_seqnum, unsigned& target_seqnum) const;
521 
526  F8API virtual unsigned find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const;
527 
531  F8API virtual unsigned find_nearest_seqnum (unsigned requested) const;
532 
537  F8API virtual bool put(const f8String& key, const f8String& what);
538 
543  F8API virtual bool get(const f8String& key, f8String& to) const;
544 
549  bool get_from_cache(const std::string &key, std::string &target) const
550  {
551  uint32_t flags(0);
552  memcached_return_t rc;
553  size_t value_length;
554 
555  char *value(memcached_get(_cache, key.c_str(), key.size(), &value_length, &flags, &rc));
556  if (value)
557  {
558  target.reserve(value_length);
559  target.assign(value, value + value_length);
560  free(value);
561  return true;
562  }
563 
564  return false;
565  }
566 
571  bool put_to_cache(const std::string &key, const std::string &source)
572  {
573  return memcached_success(memcached_set(_cache, key.c_str(), key.size(), source.c_str(), source.size(), 0, 0));
574  }
575 
579  const std::string generate_seq_key(unsigned seqnum) const
580  {
581  std::ostringstream ostr;
582  ostr << _key_base << ':' << seqnum;
583  return ostr.str();
584  }
585 
590  static std::string generate_ctrl_record(unsigned sender_seqnum, unsigned target_seqnum)
591  {
592  std::ostringstream ostr;
593  ostr << sender_seqnum << ':' << target_seqnum;
594  return ostr.str();
595  }
596 
602  static bool extract_ctrl_record(const std::string& source, unsigned &sender_seqnum, unsigned &target_seqnum)
603  {
604  std::istringstream istr(source);
605  istr >> sender_seqnum;
606  istr.ignore();
607  istr >> target_seqnum;
608  return true;
609  }
610 };
611 
612 #endif // HAVE_LIBMEMCACHED
613 
614 //-------------------------------------------------------------------------------------------------
615 #if defined FIX8_HAVE_LIBHIREDIS
/// Persister backed by a hiredis connection (redisContext), compiled only when
/// FIX8_HAVE_LIBHIREDIS is defined. Every operation is defined out of line.
/// NOTE(review): listing lines 617 and 620 are missing from this listing.
616 class HiredisPersister : public Persister
618 {
    // Connection handle, null until set elsewhere (presumably in initialise() - confirm).
619  redisContext *_cache = nullptr;
    // Key prefix supplied to initialise() (presumably - the assignment is not visible here).
621  f8String _key_base;
622 
623 public:
    /// Ctor.
625  HiredisPersister() = default;
626 
    /// Dtor (defined out of line).
628  F8API virtual ~HiredisPersister();
629 
    /// Set up the persister from a host, port, connect timeout and key prefix (defined out of
    /// line). `purge` defaults to false.
    /// NOTE(review): the unit of connect_timeout is not visible here.
637  F8API virtual bool initialise(const f8String& host, unsigned port, unsigned connect_timeout,
638  const f8String& key_base, bool purge=false);
639 
    /// Override of Persister::put(seqnum, what) (defined out of line).
644  F8API virtual bool put(const unsigned seqnum, const f8String& what);
645 
    /// Override of Persister::put(sender_seqnum, target_seqnum) (defined out of line).
650  F8API virtual bool put(const unsigned sender_seqnum, const unsigned target_seqnum);
651 
    /// Override of Persister::get(seqnum, to) (defined out of line).
656  F8API virtual bool get(const unsigned seqnum, f8String& to) const;
657 
    /// Override of the ranged Persister::get taking a Session member callback (defined out of line).
664  F8API virtual unsigned get(const unsigned from, const unsigned to, Session& session,
665  bool (Session::*)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const;
666 
    /// Override of Persister::get_last_seqnum (defined out of line).
670  F8API virtual unsigned get_last_seqnum(unsigned& to) const;
671 
    /// Override of Persister::get(sender_seqnum, target_seqnum) (defined out of line).
676  F8API virtual bool get(unsigned& sender_seqnum, unsigned& target_seqnum) const;
677 
    /// Override of Persister::find_nearest_highest_seqnum (defined out of line).
682  F8API virtual unsigned find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const;
683 
    /// Override of the string-keyed Persister::put (defined out of line).
688  F8API virtual bool put(const f8String& key, const f8String& what);
689 
    /// Override of the string-keyed Persister::get (defined out of line).
694  F8API virtual bool get(const f8String& key, f8String& to) const;
695 
    /// Override of the string-keyed Persister::del (defined out of line).
699  F8API virtual bool del(const f8String& key);
700 };
701 
702 #endif // FIX8_HAVE_LIBHIREDIS
703 
704 //-------------------------------------------------------------------------------------------------
705 
706 } // FIX8
707 
708 #endif // FIX8_PERSIST_HPP_
std::map< uint32_t, Prec > Index
Definition: persist.hpp:398
Prec _prec
Definition: persist.hpp:384
virtual unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const =0
virtual bool purge()
Definition: persist.hpp:139
friend std::ostream & operator<<(std::ostream &os, const IPrec &what)
Definition: persist.hpp:386
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
Definition: persist.cpp:401
FilePersister(unsigned rotnum=0)
Ctor.
Definition: persist.hpp:403
std::map< unsigned, const f8String > Store
Definition: persist.hpp:294
Fix8 Base Session. User sessions are derived from this class.
Definition: session.hpp:394
virtual F8API unsigned get_last_seqnum(unsigned &to) const
Definition: persist.cpp:417
virtual bool del(const f8String &key)
Definition: persist.hpp:109
std::pair< const unsigned, const f8String > SequencePair
Definition: session.hpp:615
File persister.
Definition: persist.hpp:357
virtual F8API bool initialise(const f8String &dbDir, const f8String &dbFname, bool purge=false)
Definition: filepersist.cpp:45
Base (ABC) Persister class.
Definition: persist.hpp:55
virtual F8API bool put(const unsigned seqnum, const f8String &what)
Definition: persist.cpp:371
Provides context to your retrans handler.
Definition: session.hpp:598
virtual F8API bool put(const unsigned seqnum, const f8String &what)
virtual ~Persister()
Dtor.
Definition: persist.hpp:65
int32_t _size
Definition: persist.hpp:362
Persister()=default
Ctor.
uint32_t _seq
Definition: persist.hpp:383
virtual bool put(const f8String &key, const f8String &what)
Definition: persist.hpp:84
Persister & operator=(const Persister &)=delete
Prec(const off_t offset, const int32_t size)
Definition: persist.hpp:359
virtual unsigned get_last_seqnum(unsigned &to) const =0
#define F8API
Definition: f8dll.h:60
IPrec(const uint32_t seq, const off_t offset, const int32_t size)
Definition: persist.hpp:380
virtual bool put(const unsigned seqnum, const f8String &what)=0
#define FIX8_MAX_MSG_LENGTH
Definition: f8config.h:576
virtual void stop()
Stop the persister thread.
Definition: persist.hpp:142
friend std::ostream & operator<<(std::ostream &os, const Prec &what)
Definition: persist.hpp:374
off_t _offset
Definition: persist.hpp:361
tbb::concurrent_bounded_queue< T > f8_concurrent_queue
Definition: mpmc.hpp:48
Memory based message persister.
Definition: persist.hpp:292
virtual ~MemoryPersister()
Dtor.
Definition: persist.hpp:301
Prec & operator=(const Prec &that)
Definition: persist.hpp:364
virtual F8API ~FilePersister()
Dtor.
virtual F8API unsigned find_nearest_highest_seqnum(const unsigned requested, const unsigned last) const
virtual F8API unsigned get_last_seqnum(unsigned &to) const
std::string f8String
Definition: f8types.hpp:47