fix8  version 1.4.0
Open Source C++ FIX Framework
redispersist.cpp
Go to the documentation of this file.
1 //-----------------------------------------------------------------------------------------
2 /*
3 
4 Fix8 is released under the GNU LESSER GENERAL PUBLIC LICENSE Version 3.
5 
6 Fix8 Open Source FIX Engine.
7 Copyright (C) 2010-16 David L. Dight <fix@fix8.org>
8 
9 Fix8 is free software: you can redistribute it and / or modify it under the terms of the
10 GNU Lesser General Public License as published by the Free Software Foundation, either
11 version 3 of the License, or (at your option) any later version.
12 
13 Fix8 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
14 even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 
16 You should have received a copy of the GNU Lesser General Public License along with Fix8.
17 If not, see <http://www.gnu.org/licenses/>.
18 
19 BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE PROGRAM, TO
20 THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE
21 COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY
22 KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO
24 THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE,
25 YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
26 
27 IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT
28 HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED
29 ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
30 CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT
31 NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR
32 THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH
33 HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
34 
35 */
36 //-----------------------------------------------------------------------------------------
37 #include "precomp.hpp"
38 #include <fix8/f8includes.hpp>
39 
40 #if defined FIX8_HAVE_LIBHIREDIS
41 //-------------------------------------------------------------------------------------------------
42 using namespace FIX8;
43 using namespace std;
44 
45 //-------------------------------------------------------------------------------------------------
46 bool HiredisPersister::initialise(const f8String& host, unsigned port, unsigned connect_timeout,
47  const f8String& key_base, bool purge)
48 {
49  if (_cache)
50  return true;
51  _key_base = key_base;
52  const timeval timeout { connect_timeout, 0 }; // seconds
53  if (!(_cache = redisConnectWithTimeout(host.c_str(), port, timeout)) || _cache->err)
54  {
55  if (_cache->err)
56  {
57  glout_error << "redis error connect: " << _cache->errstr << " for " << _key_base;
58  }
59  return false;
60  }
61 
62  if (purge)
63  {
64  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZREMRANGEBYRANK %s 0 -1", _key_base.c_str())));
65  if (reply->type == REDIS_REPLY_ERROR)
66  {
67  glout_error << "redis error purge (ZREMRANGEBYRANK): " << reply->str << " for " << _key_base;
68  }
69  freeReplyObject(reply);
70  }
71 
72  return true;
73 }
74 
75 //-------------------------------------------------------------------------------------------------
76 HiredisPersister::~HiredisPersister()
77 {
78  if (_cache) // redisFree doesn't check for nullptr
79  {
80  redisFree(_cache);
81  _cache = 0;
82  }
83 }
84 
85 //-------------------------------------------------------------------------------------------------
86 unsigned HiredisPersister::get_last_seqnum(unsigned& sequence) const
87 {
88  unsigned result(0);
89  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZRANGE %s -1 -1 WITHSCORES", _key_base.c_str())));
90  if (reply->type == REDIS_REPLY_ERROR)
91  {
92  glout_error << "redis error ZRANGE: " << reply->str << " for " << _key_base;
93  }
94  else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) // we expect two records
95  result = fast_atoi<unsigned>((*(reply->element + 1))->str); // 3nd element is score (seqnum)
96  else
97  {
98  glout_error << "redis error ZRANGE: unexpected type: " << reply->type << " for " << _key_base;
99  }
100 
101  freeReplyObject(reply);
102  return sequence = result;
103 }
104 
105 //-------------------------------------------------------------------------------------------------
106 unsigned HiredisPersister::get(const unsigned from, const unsigned to, Session& session,
107  bool (Session::*callback)(const Session::SequencePair& with, Session::RetransmissionContext& rctx)) const
108 {
109  unsigned last_seq(0);
110  get_last_seqnum(last_seq);
111  unsigned recs_sent(0), startSeqNum(from);
112  const unsigned finish(to == 0 ? last_seq : to);
113  Session::RetransmissionContext rctx(from, to, session.get_next_send_seq());
114 
115  if (!startSeqNum || from > finish)
116  {
117  glout_warn << "No records found";
118  rctx._no_more_records = true;
119  (session.*callback)(Session::SequencePair(0, ""), rctx);
120  return 0;
121  }
122 
123  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZRANGEBYSCORE %s %u %u WITHSCORES", _key_base.c_str(), startSeqNum, finish)));
124  if (reply->type == REDIS_REPLY_ERROR)
125  {
126  glout_error << "redis error ZRANGEBYSCORE: " << reply->str << " for " << _key_base;
127  }
128  else if (reply->type == REDIS_REPLY_ARRAY)
129  {
130  //cerr << "last_seq=" << last_seq << " reply->elements=" << reply->elements << endl;
131 
132  for (unsigned ii(0); ii < reply->elements; ii += 2)
133  {
134  Session::SequencePair txresult(fast_atoi<unsigned>((*(reply->element + ii + 1))->str),
135  f8String((*(reply->element + ii))->str, (*(reply->element + ii))->len));
136  ++recs_sent;
137  if (!(session.*callback)(txresult, rctx))
138  break;
139  }
140  }
141  else
142  {
143  glout_error << "redis error ZRANGEBYSCORE: unexpected type: " << reply->type << " for " << _key_base;
144  }
145 
146  freeReplyObject(reply);
147 
148  rctx._no_more_records = true;
149  (session.*callback)(Session::SequencePair(0, ""), rctx);
150 
151  return recs_sent;
152 }
153 
154 //-------------------------------------------------------------------------------------------------
155 bool HiredisPersister::put(const unsigned sender_seqnum, const unsigned target_seqnum)
156 {
157  if (!_cache)
158  return false;
159  bool result(true);
160  // we need to remove the exiting control record and readd it
161  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZREMRANGEBYSCORE %s 0 0", _key_base.c_str())));
162  freeReplyObject(reply);
163  reply = static_cast<redisReply*>(redisCommand(_cache, "ZADD %s 0 %u:%u", _key_base.c_str(), sender_seqnum, target_seqnum));
164  if (reply->type == REDIS_REPLY_ERROR)
165  {
166  glout_error << "redis error ZADD: " << reply->str << " for " << _key_base;
167  result = false;
168  }
169 
170  freeReplyObject(reply);
171  return result;
172 }
173 
174 //-------------------------------------------------------------------------------------------------
175 bool HiredisPersister::put(const unsigned seqnum, const f8String& what)
176 {
177  if (!_cache || !seqnum)
178  return false;
179 
180  bool result(true);
181  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZADD %s %u %b", _key_base.c_str(),
182  seqnum, what.data(), what.size())));
183  if (reply->type == REDIS_REPLY_ERROR)
184  {
185  glout_error << "redis error ZADD: " << reply->str << " for " << _key_base;
186  result = false;
187  }
188 
189  freeReplyObject(reply);
190  return result;
191 }
192 
193 //-------------------------------------------------------------------------------------------------
194 bool HiredisPersister::put(const f8String& key, const f8String& what)
195 {
196  if (!_cache)
197  return false;
198 
199  bool result(true);
200  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "SET %s%s %b", _key_base.c_str(),
201  key.c_str(), what.data(), what.size())));
202  if (reply->type == REDIS_REPLY_ERROR)
203  {
204  glout_error << "redis error SET: " << reply->str << " for " << _key_base << ':' << key;
205  result = false;
206  }
207 
208  freeReplyObject(reply);
209  return result;
210 }
211 
212 //-------------------------------------------------------------------------------------------------
213 bool HiredisPersister::get(unsigned& sender_seqnum, unsigned& target_seqnum) const
214 {
215  if (!_cache)
216  return false;
217 
218  bool result(false);
219  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZRANGEBYSCORE %s 0 0", _key_base.c_str())));
220  if (reply->type == REDIS_REPLY_ERROR)
221  {
222  glout_error << "redis error ZRANGEBYSCORE: " << reply->str << " for " << _key_base;
223  }
224  else if (reply->type == REDIS_REPLY_ARRAY)
225  {
226  if (reply->elements == 1)
227  {
228  istringstream istr((*reply->element)->str);
229  istr >> sender_seqnum;
230  istr.ignore();
231  istr >> target_seqnum;
232  result = true;
233  }
234  }
235  else
236  {
237  glout_error << "redis error ZRANGEBYSCORE: unexpected type: " << reply->type << " for " << _key_base;
238  }
239 
240  freeReplyObject(reply);
241  return result;
242 }
243 
244 //-------------------------------------------------------------------------------------------------
245 bool HiredisPersister::get(const unsigned seqnum, f8String& to) const
246 {
247  if (!_cache || !seqnum)
248  return false;
249 
250  bool result(false);
251  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "ZRANGEBYSCORE %s %u %u", _key_base.c_str(), seqnum, seqnum)));
252  if (reply->type == REDIS_REPLY_ERROR)
253  {
254  glout_error << "redis error ZRANGEBYSCORE: " << reply->str << " for " << _key_base;
255  }
256  else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 1) // we expect one record
257  {
258  to.assign((*reply->element)->str, (*reply->element)->len);
259  result = true;
260  }
261  else
262  {
263  glout_error << "redis error ZRANGEBYSCORE: unexpected type: " << reply->type << " for " << _key_base;
264  }
265 
266  freeReplyObject(reply);
267  return result;
268 }
269 
270 //-------------------------------------------------------------------------------------------------
271 bool HiredisPersister::get(const f8String& key, f8String& to) const
272 {
273  if (!_cache)
274  return false;
275 
276  bool result(false);
277  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "GET %s%s", _key_base.c_str(), key.c_str())));
278  if (reply->type == REDIS_REPLY_ERROR)
279  {
280  glout_error << "redis error GET: " << reply->str << " for " << _key_base << ':' << key;
281  }
282  else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 1) // we expect one record
283  {
284  to.assign((*reply->element)->str, (*reply->element)->len);
285  result = true;
286  }
287  else
288  {
289  glout_error << "redis error GET: unexpected type: " << reply->type << " for " << _key_base << ':' << key;
290  }
291 
292  freeReplyObject(reply);
293  return result;
294 }
295 
296 //-------------------------------------------------------------------------------------------------
297 bool HiredisPersister::del(const f8String& key)
298 {
299  if (!_cache)
300  return false;
301 
302  bool result(false);
303  redisReply *reply(static_cast<redisReply*>(redisCommand(_cache, "DEL %s%s", _key_base.c_str(), key.c_str())));
304  if (reply->type == REDIS_REPLY_ERROR)
305  {
306  glout_error << "redis error DEL: " << reply->str << " for " << _key_base << ':' << key;
307  }
308  else if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 1) // we expect one record
309  {
310  result = true;
311  }
312  else
313  {
314  glout_error << "redis error DEL: unexpected type: " << reply->type << " for " << _key_base << ':' << key;
315  }
316 
317  freeReplyObject(reply);
318  return result;
319 }
320 
321 //---------------------------------------------------------------------------------------------------
322 unsigned HiredisPersister::find_nearest_highest_seqnum (const unsigned requested, const unsigned last) const
323 {
324  if (last)
325  {
326  string target;
327  for (unsigned startseqnum(requested); startseqnum <= last; ++startseqnum)
328  if (get(startseqnum, target))
329  return startseqnum;
330  }
331 
332  return 0;
333 }
334 
335 #endif // FIX8_HAVE_LIBHIREDIS
336 
Fix8 Base Session. User sessions are derived from this class.
Definition: session.hpp:394
std::pair< const unsigned, const f8String > SequencePair
Definition: session.hpp:615
Provides context to your retrans handler.
Definition: session.hpp:598
#define glout_error
Definition: logger.hpp:606
unsigned get_next_send_seq() const
Definition: session.hpp:750
#define glout_warn
Definition: logger.hpp:604
std::string f8String
Definition: f8types.hpp:47