00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "Session.h"
00028 #include "Values.h"
00029 #include <algorithm>
00030 #include <iostream>
00031
00032 namespace FIX
00033 {
00034 Session::Sessions Session::s_sessions;
00035 Session::SessionIDs Session::s_sessionIDs;
00036 Session::Sessions Session::s_registered;
00037 Mutex Session::s_mutex;
00038
00039 #define LOGEX( method ) try { method; } catch( std::exception& e ) \
00040 { m_state.onEvent( e.what() ); }
00041
00042 Session::Session( Application& application,
00043 MessageStoreFactory& messageStoreFactory,
00044 const SessionID& sessionID,
00045 const DataDictionaryProvider& dataDictionaryProvider,
00046 const TimeRange& sessionTime,
00047 int heartBtInt, LogFactory* pLogFactory )
00048 : m_application( application ),
00049 m_sessionID( sessionID ),
00050 m_sessionTime( sessionTime ),
00051 m_logonTime( sessionTime ),
00052 m_senderDefaultApplVerID(ApplVerID_FIX50),
00053 m_targetDefaultApplVerID(ApplVerID_FIX50),
00054 m_sendRedundantResendRequests( false ),
00055 m_checkCompId( true ),
00056 m_checkLatency( true ),
00057 m_maxLatency( 120 ),
00058 m_resetOnLogon( false ),
00059 m_resetOnLogout( false ),
00060 m_resetOnDisconnect( false ),
00061 m_refreshOnLogon( false ),
00062 m_millisecondsInTimeStamp( true ),
00063 m_persistMessages( true ),
00064 m_dataDictionaryProvider( dataDictionaryProvider ),
00065 m_messageStoreFactory( messageStoreFactory ),
00066 m_pLogFactory( pLogFactory ),
00067 m_pResponder( 0 )
00068 {
00069 m_state.heartBtInt( heartBtInt );
00070 m_state.initiate( heartBtInt != 0 );
00071 m_state.store( m_messageStoreFactory.create( m_sessionID ) );
00072 if ( m_pLogFactory )
00073 m_state.log( m_pLogFactory->create( m_sessionID ) );
00074
00075 if( !checkSessionTime(UtcTimeStamp()) )
00076 reset();
00077
00078 addSession( *this );
00079 m_application.onCreate( m_sessionID );
00080 m_state.onEvent( "Created session" );
00081 }
00082
00083 Session::~Session()
00084 { QF_STACK_IGNORE_BEGIN
00085 removeSession( *this );
00086 m_messageStoreFactory.destroy( m_state.store() );
00087 if ( m_pLogFactory && m_state.log() )
00088 m_pLogFactory->destroy( m_state.log() );
00089 QF_STACK_IGNORE_END
00090 }
00091
00092 void Session::insertSendingTime( Header& header )
00093 { QF_STACK_PUSH(Session::insertSendingTime)
00094
00095 UtcTimeStamp now;
00096 bool showMilliseconds = false;
00097 if( m_sessionID.getBeginString() == BeginString_FIXT11 )
00098 showMilliseconds = true;
00099 else
00100 showMilliseconds = m_sessionID.getBeginString() >= BeginString_FIX42;
00101
00102 header.setField( SendingTime(now, showMilliseconds && m_millisecondsInTimeStamp) );
00103
00104 QF_STACK_POP
00105 }
00106
00107 void Session::insertOrigSendingTime( Header& header, const UtcTimeStamp& when )
00108 { QF_STACK_PUSH(Session::insertSendingTime)
00109
00110 bool showMilliseconds = false;
00111 if( m_sessionID.getBeginString() == BeginString_FIXT11 )
00112 showMilliseconds = true;
00113 else
00114 showMilliseconds = m_sessionID.getBeginString() >= BeginString_FIX42;
00115
00116 header.setField( OrigSendingTime(when, showMilliseconds && m_millisecondsInTimeStamp) );
00117
00118 QF_STACK_POP
00119 }
00120
00121 void Session::fill( Header& header )
00122 { QF_STACK_PUSH(Session::fill)
00123
00124 UtcTimeStamp now;
00125 m_state.lastSentTime( now );
00126 header.setField( m_sessionID.getBeginString() );
00127 header.setField( m_sessionID.getSenderCompID() );
00128 header.setField( m_sessionID.getTargetCompID() );
00129 header.setField( MsgSeqNum( getExpectedSenderNum() ) );
00130 insertSendingTime( header );
00131
00132 QF_STACK_POP
00133 }
00134
00135 void Session::next()
00136 {
00137 next( UtcTimeStamp() );
00138 }
00139
00140 void Session::next( const UtcTimeStamp& timeStamp )
00141 { QF_STACK_PUSH(Session::next)
00142
00143 try
00144 {
00145 if ( !checkSessionTime(timeStamp) )
00146 { reset(); return; }
00147
00148 if( !isEnabled() || !isLogonTime(timeStamp) )
00149 {
00150 if( isLoggedOn() )
00151 {
00152 if( !m_state.sentLogout() )
00153 {
00154 m_state.onEvent( "Initiated logout request" );
00155 generateLogout( m_state.logoutReason() );
00156 }
00157 }
00158 else
00159 return;
00160 }
00161
00162 if ( !m_state.receivedLogon() )
00163 {
00164 if ( m_state.shouldSendLogon() && isLogonTime(timeStamp) )
00165 {
00166 generateLogon();
00167 m_state.onEvent( "Initiated logon request" );
00168 }
00169 else if ( m_state.alreadySentLogon() && m_state.logonTimedOut() )
00170 {
00171 m_state.onEvent( "Timed out waiting for logon response" );
00172 disconnect();
00173 }
00174 return ;
00175 }
00176
00177 if ( m_state.heartBtInt() == 0 ) return ;
00178
00179 if ( m_state.logoutTimedOut() )
00180 {
00181 m_state.onEvent( "Timed out waiting for logout response" );
00182 disconnect();
00183 }
00184
00185 if ( m_state.withinHeartBeat() ) return ;
00186
00187 if ( m_state.timedOut() )
00188 {
00189 m_state.onEvent( "Timed out waiting for heartbeat" );
00190 disconnect();
00191 }
00192 else
00193 {
00194 if ( m_state.needTestRequest() )
00195 {
00196 generateTestRequest( "TEST" );
00197 m_state.testRequest( m_state.testRequest() + 1 );
00198 m_state.onEvent( "Sent test request TEST" );
00199 }
00200 else if ( m_state.needHeartbeat() )
00201 {
00202 generateHeartbeat();
00203 }
00204 }
00205 }
00206 catch ( FIX::IOException& e )
00207 {
00208 m_state.onEvent( e.what() );
00209 disconnect();
00210 }
00211
00212 QF_STACK_POP
00213 }
00214
00215 void Session::nextLogon( const Message& logon, const UtcTimeStamp& timeStamp )
00216 { QF_STACK_PUSH(Session::nextLogon)
00217
00218 SenderCompID senderCompID;
00219 TargetCompID targetCompID;
00220 logon.getHeader().getField( senderCompID );
00221 logon.getHeader().getField( targetCompID );
00222
00223 if( m_refreshOnLogon )
00224 refresh();
00225
00226 if( !isLogonTime(timeStamp) )
00227 {
00228 m_state.onEvent( "Received logon outside of valid logon time" );
00229 disconnect();
00230 return;
00231 }
00232
00233 ResetSeqNumFlag resetSeqNumFlag(false);
00234 if( logon.isSetField(resetSeqNumFlag) )
00235 logon.getField( resetSeqNumFlag );
00236 m_state.receivedReset( resetSeqNumFlag );
00237
00238 if( m_state.receivedReset() )
00239 {
00240 m_state.onEvent( "Logon contains ResetSeqNumFlag=Y, reseting sequence numbers to 1" );
00241 if( !m_state.sentReset() ) m_state.reset();
00242 }
00243
00244 if( m_state.shouldSendLogon() && !m_state.receivedReset() )
00245 {
00246 m_state.onEvent( "Received logon response before sending request" );
00247 disconnect();
00248 return;
00249 }
00250
00251 if( !m_state.initiate() && m_resetOnLogon )
00252 m_state.reset();
00253
00254 if( !verify( logon, false, true ) )
00255 return;
00256 m_state.receivedLogon( true );
00257
00258 if ( !m_state.initiate()
00259 || (m_state.receivedReset() && !m_state.sentReset()) )
00260 {
00261 if( logon.isSetField(m_state.heartBtInt()) )
00262 logon.getField( m_state.heartBtInt() );
00263 m_state.onEvent( "Received logon request" );
00264 generateLogon( logon );
00265 m_state.onEvent( "Responding to logon request" );
00266 }
00267 else
00268 m_state.onEvent( "Received logon response" );
00269
00270 m_state.sentReset( false );
00271 m_state.receivedReset( false );
00272
00273 MsgSeqNum msgSeqNum;
00274 logon.getHeader().getField( msgSeqNum );
00275 if ( isTargetTooHigh( msgSeqNum ) && !resetSeqNumFlag )
00276 {
00277 doTargetTooHigh( logon );
00278 }
00279 else
00280 {
00281 m_state.incrNextTargetMsgSeqNum();
00282 nextQueued( timeStamp );
00283 }
00284
00285 if ( isLoggedOn() )
00286 m_application.onLogon( m_sessionID );
00287
00288 QF_STACK_POP
00289 }
00290
00291 void Session::nextHeartbeat( const Message& heartbeat, const UtcTimeStamp& timeStamp )
00292 { QF_STACK_PUSH(Session::nextHeartbeat)
00293
00294 if ( !verify( heartbeat ) ) return ;
00295 m_state.incrNextTargetMsgSeqNum();
00296 nextQueued( timeStamp );
00297
00298 QF_STACK_POP
00299 }
00300
00301 void Session::nextTestRequest( const Message& testRequest, const UtcTimeStamp& timeStamp )
00302 { QF_STACK_PUSH(Session::nextTestRequest)
00303
00304 if ( !verify( testRequest ) ) return ;
00305 generateHeartbeat( testRequest );
00306 m_state.incrNextTargetMsgSeqNum();
00307 nextQueued( timeStamp );
00308
00309 QF_STACK_POP
00310 }
00311
00312 void Session::nextLogout( const Message& logout, const UtcTimeStamp& timeStamp )
00313 { QF_STACK_PUSH(Session::nextLogout)
00314
00315 if ( !verify( logout, false, false ) ) return ;
00316 if ( !m_state.sentLogout() )
00317 {
00318 m_state.onEvent( "Received logout request" );
00319 generateLogout();
00320 m_state.onEvent( "Sending logout response" );
00321 }
00322 else
00323 m_state.onEvent( "Received logout response" );
00324
00325 m_state.incrNextTargetMsgSeqNum();
00326 if ( m_resetOnLogout ) m_state.reset();
00327 disconnect();
00328
00329 QF_STACK_POP
00330 }
00331
00332 void Session::nextReject( const Message& reject, const UtcTimeStamp& timeStamp )
00333 { QF_STACK_PUSH(Session::nextReject)
00334
00335 if ( !verify( reject, false, true ) ) return ;
00336 m_state.incrNextTargetMsgSeqNum();
00337 nextQueued( timeStamp );
00338
00339 QF_STACK_POP
00340 }
00341
00342 void Session::nextSequenceReset( const Message& sequenceReset, const UtcTimeStamp& timeStamp )
00343 { QF_STACK_PUSH(Session::nextSequenceReset)
00344
00345 bool isGapFill = false;
00346 GapFillFlag gapFillFlag;
00347 if ( sequenceReset.isSetField( gapFillFlag ) )
00348 {
00349 sequenceReset.getField( gapFillFlag );
00350 isGapFill = gapFillFlag;
00351 }
00352
00353 if ( !verify( sequenceReset, isGapFill, isGapFill ) ) return ;
00354
00355 NewSeqNo newSeqNo;
00356 if ( sequenceReset.isSetField( newSeqNo ) )
00357 {
00358 sequenceReset.getField( newSeqNo );
00359
00360 m_state.onEvent( "Received SequenceReset FROM: "
00361 + IntConvertor::convert( getExpectedTargetNum() ) +
00362 " TO: " + IntConvertor::convert( newSeqNo ) );
00363
00364 if ( newSeqNo > getExpectedTargetNum() )
00365 m_state.setNextTargetMsgSeqNum( MsgSeqNum( newSeqNo ) );
00366 else if ( newSeqNo < getExpectedTargetNum() )
00367 generateReject( sequenceReset, SessionRejectReason_VALUE_IS_INCORRECT );
00368 }
00369
00370 QF_STACK_POP
00371 }
00372
00373 void Session::nextResendRequest( const Message& resendRequest, const UtcTimeStamp& timeStamp )
00374 { QF_STACK_PUSH(Session::nextResendRequest)
00375
00376 if ( !verify( resendRequest, false, false ) ) return ;
00377
00378 Locker l( m_mutex );
00379
00380 BeginSeqNo beginSeqNo;
00381 EndSeqNo endSeqNo;
00382 resendRequest.getField( beginSeqNo );
00383 resendRequest.getField( endSeqNo );
00384
00385 m_state.onEvent( "Received ResendRequest FROM: "
00386 + IntConvertor::convert( beginSeqNo ) +
00387 " TO: " + IntConvertor::convert( endSeqNo ) );
00388
00389 std::string beginString = m_sessionID.getBeginString();
00390 if ( (beginString >= FIX::BeginString_FIX42 && endSeqNo == 0) ||
00391 (beginString <= FIX::BeginString_FIX42 && endSeqNo == 999999) ||
00392 (endSeqNo >= getExpectedSenderNum()) )
00393 { endSeqNo = getExpectedSenderNum() - 1; }
00394
00395 if ( !m_persistMessages )
00396 {
00397 endSeqNo = EndSeqNo(endSeqNo + 1);
00398 int next = m_state.getNextSenderMsgSeqNum();
00399 if( endSeqNo > next )
00400 endSeqNo = EndSeqNo(next);
00401 generateSequenceReset( beginSeqNo, endSeqNo );
00402 return;
00403 }
00404
00405 std::vector < std::string > messages;
00406 m_state.get( beginSeqNo, endSeqNo, messages );
00407
00408 std::vector < std::string > ::iterator i;
00409 MsgSeqNum msgSeqNum(0);
00410 MsgType msgType;
00411 int begin = 0;
00412 int current = beginSeqNo;
00413 std::string messageString;
00414 Message msg;
00415
00416 for ( i = messages.begin(); i != messages.end(); ++i )
00417 {
00418 const DataDictionary& sessionDD =
00419 m_dataDictionaryProvider.getSessionDataDictionary(m_sessionID.getBeginString());
00420
00421 if( m_sessionID.isFIXT() )
00422 {
00423 msg.setStringHeader(*i);
00424 ApplVerID applVerID;
00425 if( msg.getHeader().isSetField(applVerID) )
00426 msg.getHeader().getField(applVerID);
00427 else
00428 applVerID = m_senderDefaultApplVerID;
00429
00430 const DataDictionary& applicationDD =
00431 m_dataDictionaryProvider.getApplicationDataDictionary(applVerID);
00432 msg = Message( *i, sessionDD, applicationDD );
00433 }
00434 else
00435 {
00436 msg = Message( *i, sessionDD );
00437 }
00438
00439
00440 msg.getHeader().getField( msgSeqNum );
00441 msg.getHeader().getField( msgType );
00442
00443 if( (current != msgSeqNum) && !begin )
00444 begin = current;
00445
00446 if ( Message::isAdminMsgType( msgType ) )
00447 {
00448 if ( !begin ) begin = msgSeqNum;
00449 }
00450 else
00451 {
00452 if ( resend( msg ) )
00453 {
00454 if ( begin ) generateSequenceReset( begin, msgSeqNum );
00455 send( msg.toString(messageString) );
00456 m_state.onEvent( "Resending Message: "
00457 + IntConvertor::convert( msgSeqNum ) );
00458 begin = 0;
00459 }
00460 else
00461 { if ( !begin ) begin = msgSeqNum; }
00462 }
00463 current = msgSeqNum + 1;
00464 }
00465 if ( begin )
00466 {
00467 generateSequenceReset( begin, msgSeqNum + 1 );
00468 }
00469
00470 if ( endSeqNo > msgSeqNum )
00471 {
00472 endSeqNo = EndSeqNo(endSeqNo + 1);
00473 int next = m_state.getNextSenderMsgSeqNum();
00474 if( endSeqNo > next )
00475 endSeqNo = EndSeqNo(next);
00476 generateSequenceReset( beginSeqNo, endSeqNo );
00477 }
00478
00479 resendRequest.getHeader().getField( msgSeqNum );
00480 if( !isTargetTooHigh(msgSeqNum) && !isTargetTooLow(msgSeqNum) )
00481 m_state.incrNextTargetMsgSeqNum();
00482
00483 QF_STACK_POP
00484 }
00485
00486 bool Session::send( Message& message )
00487 { QF_STACK_PUSH(Session::send)
00488
00489 message.getHeader().removeField( FIELD::PossDupFlag );
00490 message.getHeader().removeField( FIELD::OrigSendingTime );
00491 return sendRaw( message );
00492
00493 QF_STACK_POP
00494 }
00495
00496 bool Session::sendRaw( Message& message, int num )
00497 { QF_STACK_PUSH(Session::sendRaw)
00498
00499 Locker l( m_mutex );
00500
00501 try
00502 {
00503 Header& header = message.getHeader();
00504
00505 MsgType msgType;
00506 if( header.isSetField(msgType) )
00507 header.getField( msgType );
00508
00509 fill( header );
00510 std::string messageString;
00511
00512 if ( num )
00513 header.setField( MsgSeqNum( num ) );
00514
00515 if ( Message::isAdminMsgType( msgType ) )
00516 {
00517 m_application.toAdmin( message, m_sessionID );
00518
00519 if( msgType == "A" && !m_state.receivedReset() )
00520 {
00521 ResetSeqNumFlag resetSeqNumFlag( false );
00522 if( message.isSetField(resetSeqNumFlag) )
00523 message.getField( resetSeqNumFlag );
00524 if( resetSeqNumFlag )
00525 {
00526 m_state.reset();
00527 message.getHeader().setField( MsgSeqNum(getExpectedSenderNum()) );
00528 }
00529 m_state.sentReset( resetSeqNumFlag );
00530 }
00531
00532 message.toString( messageString );
00533
00534 if( !num )
00535 persist( message, messageString );
00536
00537 if (
00538 msgType == "A" || msgType == "5"
00539 || msgType == "2" || msgType == "4"
00540 || isLoggedOn() )
00541 {
00542 send( messageString );
00543 }
00544 }
00545 else
00546 {
00547
00548 if( !isLoggedOn() && shouldSendReset() )
00549 return false;
00550
00551 try
00552 {
00553 m_application.toApp( message, m_sessionID );
00554 message.toString( messageString );
00555
00556 if( !num )
00557 persist( message, messageString );
00558
00559 if ( isLoggedOn() )
00560 send( messageString );
00561 }
00562 catch ( DoNotSend& ) { return false; }
00563 }
00564
00565 return true;
00566 }
00567 catch ( IOException& e )
00568 {
00569 m_state.onEvent( e.what() );
00570 return false;
00571 }
00572
00573 QF_STACK_POP
00574 }
00575
00576 bool Session::send( const std::string& string )
00577 { QF_STACK_PUSH(Session::send)
00578
00579 if ( !m_pResponder ) return false;
00580 m_state.onOutgoing( string );
00581 return m_pResponder->send( string );
00582
00583 QF_STACK_POP
00584 }
00585
00586 void Session::disconnect()
00587 { QF_STACK_PUSH(Session::disconnect)
00588
00589 Locker l(m_mutex);
00590
00591 if ( m_pResponder )
00592 {
00593 m_state.onEvent( "Disconnecting" );
00594
00595 m_pResponder->disconnect();
00596 m_pResponder = 0;
00597 }
00598
00599 if ( m_state.receivedLogon() || m_state.sentLogon() )
00600 {
00601 m_state.receivedLogon( false );
00602 m_state.sentLogon( false );
00603 m_application.onLogout( m_sessionID );
00604 }
00605
00606 m_state.sentLogout( false );
00607 m_state.receivedReset( false );
00608 m_state.sentReset( false );
00609 m_state.clearQueue();
00610 m_state.logoutReason();
00611 if ( m_resetOnDisconnect )
00612 m_state.reset();
00613
00614 m_state.resendRange( 0, 0 );
00615
00616 QF_STACK_POP
00617 }
00618
00619 bool Session::resend( Message& message )
00620 { QF_STACK_PUSH(Session::resend)
00621
00622 SendingTime sendingTime;
00623 MsgSeqNum msgSeqNum;
00624 Header& header = message.getHeader();
00625 header.getField( sendingTime );
00626 header.getField( msgSeqNum );
00627 insertOrigSendingTime( header, sendingTime );
00628 header.setField( PossDupFlag( true ) );
00629 insertSendingTime( header );
00630
00631 try
00632 {
00633 m_application.toApp( message, m_sessionID );
00634 return true;
00635 }
00636 catch ( DoNotSend& )
00637 { return false; }
00638
00639 QF_STACK_POP
00640 }
00641
00642 void Session::persist( const Message& message, const std::string& messageString )
00643 throw ( IOException )
00644 {
00645 MsgSeqNum msgSeqNum;
00646 message.getHeader().getField( msgSeqNum );
00647 if( m_persistMessages )
00648 m_state.set( msgSeqNum, messageString );
00649 m_state.incrNextSenderMsgSeqNum();
00650 }
00651
00652 void Session::generateLogon()
00653 { QF_STACK_PUSH(Session::generateLogon)
00654
00655 Message logon;
00656 logon.getHeader().setField( MsgType( "A" ) );
00657 logon.setField( EncryptMethod( 0 ) );
00658 logon.setField( m_state.heartBtInt() );
00659 if( m_sessionID.isFIXT() )
00660 logon.setField( DefaultApplVerID(m_senderDefaultApplVerID) );
00661 if( m_refreshOnLogon )
00662 refresh();
00663 if( m_resetOnLogon )
00664 m_state.reset();
00665 if( shouldSendReset() )
00666 logon.setField( ResetSeqNumFlag(true) );
00667
00668 fill( logon.getHeader() );
00669 UtcTimeStamp now;
00670 m_state.lastReceivedTime( now );
00671 m_state.testRequest( 0 );
00672 m_state.sentLogon( true );
00673 sendRaw( logon );
00674
00675 QF_STACK_POP
00676 }
00677
00678 void Session::generateLogon( const Message& aLogon )
00679 { QF_STACK_PUSH(Session::generateLogon)
00680
00681 Message logon;
00682 EncryptMethod encryptMethod;
00683 HeartBtInt heartBtInt;
00684 logon.setField( EncryptMethod( 0 ) );
00685 if( m_sessionID.isFIXT() )
00686 logon.setField( DefaultApplVerID(m_senderDefaultApplVerID) );
00687 if( m_state.receivedReset() )
00688 logon.setField( ResetSeqNumFlag(true) );
00689 aLogon.getField( heartBtInt );
00690 logon.getHeader().setField( MsgType( "A" ) );
00691 logon.setField( heartBtInt );
00692 fill( logon.getHeader() );
00693 sendRaw( logon );
00694 m_state.sentLogon( true );
00695
00696 QF_STACK_POP
00697 }
00698
00699 void Session::generateResendRequest( const BeginString& beginString, const MsgSeqNum& msgSeqNum )
00700 { QF_STACK_PUSH(Session::generateResendRequest)
00701
00702 Message resendRequest;
00703 BeginSeqNo beginSeqNo( ( int ) getExpectedTargetNum() );
00704 EndSeqNo endSeqNo( msgSeqNum - 1 );
00705 if ( beginString >= FIX::BeginString_FIX42 )
00706 endSeqNo = 0;
00707 else if( beginString <= FIX::BeginString_FIX41 )
00708 endSeqNo = 999999;
00709 resendRequest.getHeader().setField( MsgType( "2" ) );
00710 resendRequest.setField( beginSeqNo );
00711 resendRequest.setField( endSeqNo );
00712 fill( resendRequest.getHeader() );
00713 sendRaw( resendRequest );
00714
00715 m_state.onEvent( "Sent ResendRequest FROM: "
00716 + IntConvertor::convert( beginSeqNo ) +
00717 " TO: " + IntConvertor::convert( endSeqNo ) );
00718
00719 m_state.resendRange( beginSeqNo, msgSeqNum - 1 );
00720
00721 QF_STACK_POP
00722 }
00723
00724 void Session::generateSequenceReset
00725 ( int beginSeqNo, int endSeqNo )
00726 { QF_STACK_PUSH(Session::generateSequenceReset)
00727
00728 Message sequenceReset;
00729 NewSeqNo newSeqNo( endSeqNo );
00730 sequenceReset.getHeader().setField( MsgType( "4" ) );
00731 sequenceReset.getHeader().setField( PossDupFlag( true ) );
00732 sequenceReset.setField( newSeqNo );
00733 fill( sequenceReset.getHeader() );
00734
00735 SendingTime sendingTime;
00736 sequenceReset.getHeader().getField( sendingTime );
00737 insertOrigSendingTime( sequenceReset.getHeader(), sendingTime );
00738 sequenceReset.getHeader().setField( MsgSeqNum( beginSeqNo ) );
00739 sequenceReset.setField( GapFillFlag( true ) );
00740 sendRaw( sequenceReset, beginSeqNo );
00741 m_state.onEvent( "Sent SequenceReset TO: "
00742 + IntConvertor::convert( newSeqNo ) );
00743
00744 QF_STACK_POP
00745 }
00746
00747 void Session::generateHeartbeat()
00748 { QF_STACK_PUSH(Session::generateHeartbeat)
00749
00750 Message heartbeat;
00751 heartbeat.getHeader().setField( MsgType( "0" ) );
00752 fill( heartbeat.getHeader() );
00753 sendRaw( heartbeat );
00754
00755 QF_STACK_POP
00756 }
00757
00758 void Session::generateHeartbeat( const Message& testRequest )
00759 { QF_STACK_PUSH(Session::generateHeartbeat)
00760
00761 Message heartbeat;
00762 heartbeat.getHeader().setField( MsgType( "0" ) );
00763 fill( heartbeat.getHeader() );
00764 try
00765 {
00766 TestReqID testReqID;
00767 testRequest.getField( testReqID );
00768 heartbeat.setField( testReqID );
00769 }
00770 catch ( FieldNotFound& ) {}
00771
00772 sendRaw( heartbeat );
00773
00774 QF_STACK_POP
00775 }
00776
00777 void Session::generateTestRequest( const std::string& id )
00778 { QF_STACK_PUSH(Session::generateTestRequest)
00779
00780 Message testRequest;
00781 testRequest.getHeader().setField( MsgType( "1" ) );
00782 fill( testRequest.getHeader() );
00783 TestReqID testReqID( id );
00784 testRequest.setField( testReqID );
00785
00786 sendRaw( testRequest );
00787
00788 QF_STACK_POP
00789 }
00790
00791 void Session::generateReject( const Message& message, int err, int field )
00792 { QF_STACK_PUSH(Session::generateReject)
00793
00794 std::string beginString = m_sessionID.getBeginString();
00795
00796 Message reject;
00797 reject.getHeader().setField( MsgType( "3" ) );
00798 reject.reverseRoute( message.getHeader() );
00799 fill( reject.getHeader() );
00800
00801 MsgSeqNum msgSeqNum;
00802 MsgType msgType;
00803
00804 message.getHeader().getField( msgType );
00805 if( message.getHeader().isSetField( msgSeqNum ) )
00806 {
00807 message.getHeader().getField( msgSeqNum );
00808 if( msgSeqNum.getString() != "" )
00809 reject.setField( RefSeqNum( msgSeqNum ) );
00810 }
00811
00812 if ( beginString >= FIX::BeginString_FIX42 )
00813 {
00814 if( msgType.getString() != "" )
00815 reject.setField( RefMsgType( msgType ) );
00816 if ( (beginString == FIX::BeginString_FIX42
00817 && err <= SessionRejectReason_INVALID_MSGTYPE)
00818 || beginString > FIX::BeginString_FIX42 )
00819 {
00820 reject.setField( SessionRejectReason( err ) );
00821 }
00822 }
00823 if ( msgType != MsgType_Logon && msgType != MsgType_SequenceReset
00824 && msgSeqNum == getExpectedTargetNum() )
00825 { m_state.incrNextTargetMsgSeqNum(); }
00826
00827 const char* reason = 0;
00828 switch ( err )
00829 {
00830 case SessionRejectReason_INVALID_TAG_NUMBER:
00831 reason = SessionRejectReason_INVALID_TAG_NUMBER_TEXT;
00832 break;
00833 case SessionRejectReason_REQUIRED_TAG_MISSING:
00834 reason = SessionRejectReason_REQUIRED_TAG_MISSING_TEXT;
00835 break;
00836 case SessionRejectReason_TAG_NOT_DEFINED_FOR_THIS_MESSAGE_TYPE:
00837 reason = SessionRejectReason_TAG_NOT_DEFINED_FOR_THIS_MESSAGE_TYPE_TEXT;
00838 break;
00839 case SessionRejectReason_TAG_SPECIFIED_WITHOUT_A_VALUE:
00840 reason = SessionRejectReason_TAG_SPECIFIED_WITHOUT_A_VALUE_TEXT;
00841 break;
00842 case SessionRejectReason_VALUE_IS_INCORRECT:
00843 reason = SessionRejectReason_VALUE_IS_INCORRECT_TEXT;
00844 break;
00845 case SessionRejectReason_INCORRECT_DATA_FORMAT_FOR_VALUE:
00846 reason = SessionRejectReason_INCORRECT_DATA_FORMAT_FOR_VALUE_TEXT;
00847 break;
00848 case SessionRejectReason_COMPID_PROBLEM:
00849 reason = SessionRejectReason_COMPID_PROBLEM_TEXT;
00850 break;
00851 case SessionRejectReason_SENDINGTIME_ACCURACY_PROBLEM:
00852 reason = SessionRejectReason_SENDINGTIME_ACCURACY_PROBLEM_TEXT;
00853 break;
00854 case SessionRejectReason_INVALID_MSGTYPE:
00855 reason = SessionRejectReason_INVALID_MSGTYPE_TEXT;
00856 break;
00857 case SessionRejectReason_TAG_APPEARS_MORE_THAN_ONCE:
00858 reason = SessionRejectReason_TAG_APPEARS_MORE_THAN_ONCE_TEXT;
00859 break;
00860 case SessionRejectReason_TAG_SPECIFIED_OUT_OF_REQUIRED_ORDER:
00861 reason = SessionRejectReason_TAG_SPECIFIED_OUT_OF_REQUIRED_ORDER_TEXT;
00862 break;
00863 case SessionRejectReason_INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP:
00864 reason = SessionRejectReason_INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP_TEXT;
00865 };
00866
00867 if ( reason && ( field || err == SessionRejectReason_INVALID_TAG_NUMBER ) )
00868 {
00869 populateRejectReason( reject, field, reason );
00870 m_state.onEvent( "Message " + msgSeqNum.getString() + " Rejected: "
00871 + reason + ":" + IntConvertor::convert( field ) );
00872 }
00873 else if ( reason )
00874 {
00875 populateRejectReason( reject, reason );
00876 m_state.onEvent( "Message " + msgSeqNum.getString()
00877 + " Rejected: " + reason );
00878 }
00879 else
00880 m_state.onEvent( "Message " + msgSeqNum.getString() + " Rejected" );
00881
00882 if ( !m_state.receivedLogon() )
00883 throw std::runtime_error( "Tried to send a reject while not logged on" );
00884
00885 sendRaw( reject );
00886
00887 QF_STACK_POP
00888 }
00889
00890 void Session::generateReject( const Message& message, const std::string& str )
00891 { QF_STACK_PUSH(Session::generateReject)
00892
00893 std::string beginString = m_sessionID.getBeginString();
00894
00895 Message reject;
00896 reject.getHeader().setField( MsgType( "3" ) );
00897 reject.reverseRoute( message.getHeader() );
00898 fill( reject.getHeader() );
00899
00900 MsgType msgType;
00901 MsgSeqNum msgSeqNum;
00902
00903 message.getHeader().getField( msgType );
00904 message.getHeader().getField( msgSeqNum );
00905 if ( beginString >= FIX::BeginString_FIX42 )
00906 reject.setField( RefMsgType( msgType ) );
00907 reject.setField( RefSeqNum( msgSeqNum ) );
00908
00909 if ( msgType != MsgType_Logon && msgType != MsgType_SequenceReset )
00910 m_state.incrNextTargetMsgSeqNum();
00911
00912 reject.setField( Text( str ) );
00913 sendRaw( reject );
00914 m_state.onEvent( "Message " + msgSeqNum.getString()
00915 + " Rejected: " + str );
00916
00917 QF_STACK_POP
00918 }
00919
00920 void Session::generateBusinessReject( const Message& message, int err, int field )
00921 { QF_STACK_PUSH(Session::generateBusinessReject)
00922
00923 Message reject;
00924 reject.getHeader().setField( MsgType( MsgType_BusinessMessageReject ) );
00925 fill( reject.getHeader() );
00926 MsgType msgType;
00927 MsgSeqNum msgSeqNum;
00928 message.getHeader().getField( msgType );
00929 message.getHeader().getField( msgSeqNum );
00930 reject.setField( RefMsgType( msgType ) );
00931 reject.setField( RefSeqNum( msgSeqNum ) );
00932 reject.setField( BusinessRejectReason( err ) );
00933 m_state.incrNextTargetMsgSeqNum();
00934
00935 const char* reason = 0;
00936 switch ( err )
00937 {
00938 case BusinessRejectReason_OTHER:
00939 reason = BusinessRejectReason_OTHER_TEXT;
00940 break;
00941 case BusinessRejectReason_UNKNOWN_ID:
00942 reason = BusinessRejectReason_UNKNOWN_ID_TEXT;
00943 break;
00944 case BusinessRejectReason_UNKNOWN_SECURITY:
00945 reason = BusinessRejectReason_UNKNOWN_SECURITY_TEXT;
00946 break;
00947 case BusinessRejectReason_UNKNOWN_MESSAGE_TYPE:
00948 reason = BusinessRejectReason_UNSUPPORTED_MESSAGE_TYPE_TEXT;
00949 break;
00950 case BusinessRejectReason_APPLICATION_NOT_AVAILABLE:
00951 reason = BusinessRejectReason_APPLICATION_NOT_AVAILABLE_TEXT;
00952 break;
00953 case BusinessRejectReason_CONDITIONALLY_REQUIRED_FIELD_MISSING:
00954 reason = BusinessRejectReason_CONDITIONALLY_REQUIRED_FIELD_MISSING_TEXT;
00955 break;
00956 case BusinessRejectReason_NOT_AUTHORIZED:
00957 reason = BusinessRejectReason_NOT_AUTHORIZED_TEXT;
00958 break;
00959 case BusinessRejectReason_DELIVERTO_FIRM_NOT_AVAILABLE_AT_THIS_TIME:
00960 reason = BusinessRejectReason_DELIVERTO_FIRM_NOT_AVAILABLE_AT_THIS_TIME_TEXT;
00961 break;
00962 };
00963
00964 if ( reason && field )
00965 {
00966 populateRejectReason( reject, field, reason );
00967 m_state.onEvent( "Message " + msgSeqNum.getString() + " Rejected: "
00968 + reason + ":" + IntConvertor::convert( field ) );
00969 }
00970 else if ( reason )
00971 {
00972 populateRejectReason( reject, reason );
00973 m_state.onEvent( "Message " + msgSeqNum.getString()
00974 + " Rejected: " + reason );
00975 }
00976 else
00977 m_state.onEvent( "Message " + msgSeqNum.getString() + " Rejected" );
00978
00979 sendRaw( reject );
00980
00981 QF_STACK_POP
00982 }
00983
00984 void Session::generateLogout( const std::string& text )
00985 { QF_STACK_PUSH(Session::generateLogout)
00986
00987 Message logout;
00988 logout.getHeader().setField( MsgType( MsgType_Logout ) );
00989 fill( logout.getHeader() );
00990 if ( text.length() )
00991 logout.setField( Text( text ) );
00992 sendRaw( logout );
00993 m_state.sentLogout( true );
00994
00995 QF_STACK_POP
00996 }
00997
00998 void Session::populateRejectReason( Message& reject, int field,
00999 const std::string& text )
01000 { QF_STACK_PUSH(Session::populateRejectReason)
01001
01002 MsgType msgType;
01003 reject.getHeader().getField( msgType );
01004
01005 if ( msgType == MsgType_Reject
01006 && m_sessionID.getBeginString() >= FIX::BeginString_FIX42 )
01007 {
01008 reject.setField( RefTagID( field ) );
01009 reject.setField( Text( text ) );
01010 }
01011 else
01012 {
01013 std::stringstream stream;
01014 stream << text << " (" << field << ")";
01015 reject.setField( Text( stream.str() ) );
01016 }
01017
01018 QF_STACK_POP
01019 }
01020
01021 void Session::populateRejectReason( Message& reject, const std::string& text )
01022 { QF_STACK_PUSH(Session::populateRejectReason)
01023 reject.setField( Text( text ) );
01024 QF_STACK_POP
01025 }
01026
01027 bool Session::verify( const Message& msg, bool checkTooHigh,
01028 bool checkTooLow )
01029 { QF_STACK_PUSH(Session::verify)
01030
01031 const MsgType* pMsgType = 0;
01032 const MsgSeqNum* pMsgSeqNum = 0;
01033
01034 try
01035 {
01036 const Header& header = msg.getHeader();
01037
01038 pMsgType = FIELD_GET_PTR( header, MsgType );
01039 const SenderCompID& senderCompID = FIELD_GET_REF( header, SenderCompID );
01040 const TargetCompID& targetCompID = FIELD_GET_REF( header, TargetCompID );
01041 const SendingTime& sendingTime = FIELD_GET_REF( header, SendingTime );
01042
01043 if( checkTooHigh || checkTooLow )
01044 pMsgSeqNum = FIELD_GET_PTR( header, MsgSeqNum );
01045
01046 if ( !validLogonState( *pMsgType ) )
01047 throw std::logic_error( "Logon state is not valid for message" );
01048
01049 if ( !isGoodTime( sendingTime ) )
01050 {
01051 doBadTime( msg );
01052 return false;
01053 }
01054 if ( !isCorrectCompID( senderCompID, targetCompID ) )
01055 {
01056 doBadCompID( msg );
01057 return false;
01058 }
01059
01060 if ( checkTooHigh && isTargetTooHigh( *pMsgSeqNum ) )
01061 {
01062 doTargetTooHigh( msg );
01063 return false;
01064 }
01065 else if ( checkTooLow && isTargetTooLow( *pMsgSeqNum ) )
01066 {
01067 doTargetTooLow( msg );
01068 return false;
01069 }
01070
01071 if ( (checkTooHigh || checkTooLow) && m_state.resendRequested() )
01072 {
01073 SessionState::ResendRange range = m_state.resendRange();
01074
01075 if ( *pMsgSeqNum >= range.second )
01076 {
01077 m_state.onEvent ("ResendRequest for messages FROM: " +
01078 IntConvertor::convert (range.first) + " TO: " +
01079 IntConvertor::convert (range.second) +
01080 " has been satisfied.");
01081 m_state.resendRange (0, 0);
01082 }
01083 }
01084 }
01085 catch ( std::exception& e )
01086 {
01087 m_state.onEvent( e.what() );
01088 disconnect();
01089 return false;
01090 }
01091
01092 UtcTimeStamp now;
01093 m_state.lastReceivedTime( now );
01094 m_state.testRequest( 0 );
01095
01096 fromCallback( pMsgType ? *pMsgType : MsgType(), msg, m_sessionID );
01097 return true;
01098
01099 QF_STACK_POP
01100 }
01101
01102 bool Session::shouldSendReset()
01103 { QF_STACK_PUSH(Session::shouldSendReset)
01104
01105 std::string beginString = m_sessionID.getBeginString();
01106 return beginString >= FIX::BeginString_FIX41
01107 && ( m_resetOnLogon ||
01108 m_resetOnLogout ||
01109 m_resetOnDisconnect )
01110 && ( getExpectedSenderNum() == 1 )
01111 && ( getExpectedTargetNum() == 1 );
01112
01113 QF_STACK_POP
01114 }
01115
01116 bool Session::validLogonState( const MsgType& msgType )
01117 { QF_STACK_PUSH(Session::validLogonState)
01118
01119 if ( (msgType == MsgType_Logon && m_state.sentReset())
01120 || (m_state.receivedReset()) )
01121 return true;
01122 if ( (msgType == MsgType_Logon && !m_state.receivedLogon())
01123 || (msgType != MsgType_Logon && m_state.receivedLogon()) )
01124 return true;
01125 if ( msgType == MsgType_Logout && m_state.sentLogon() )
01126 return true;
01127 if ( msgType != MsgType_Logout && m_state.sentLogout() )
01128 return true;
01129 if ( msgType == MsgType_SequenceReset )
01130 return true;
01131 if ( msgType == MsgType_Reject )
01132 return true;
01133
01134 return false;
01135
01136 QF_STACK_POP
01137 }
01138
01139 void Session::fromCallback( const MsgType& msgType, const Message& msg,
01140 const SessionID& sessionID )
01141 { QF_STACK_PUSH(Session::fromCallback)
01142
01143 if ( Message::isAdminMsgType( msgType ) )
01144 m_application.fromAdmin( msg, m_sessionID );
01145 else
01146 m_application.fromApp( msg, m_sessionID );
01147
01148 QF_STACK_POP
01149 }
01150
01151 void Session::doBadTime( const Message& msg )
01152 { QF_STACK_PUSH(Session::doBadTime)
01153
01154 generateReject( msg, SessionRejectReason_SENDINGTIME_ACCURACY_PROBLEM );
01155 generateLogout();
01156
01157 QF_STACK_POP
01158 }
01159
01160 void Session::doBadCompID( const Message& msg )
01161 { QF_STACK_PUSH(Session::doBadCompID)
01162
01163 generateReject( msg, SessionRejectReason_COMPID_PROBLEM );
01164 generateLogout();
01165
01166 QF_STACK_POP
01167 }
01168
01169 bool Session::doPossDup( const Message& msg )
01170 { QF_STACK_PUSH(Session::doPossDup)
01171
01172 const Header & header = msg.getHeader();
01173 OrigSendingTime origSendingTime;
01174 SendingTime sendingTime;
01175 MsgType msgType;
01176
01177 header.getField( msgType );
01178 header.getField( sendingTime );
01179
01180 if ( msgType != MsgType_SequenceReset )
01181 {
01182 if ( !header.isSetField( origSendingTime ) )
01183 {
01184 generateReject( msg, SessionRejectReason_REQUIRED_TAG_MISSING, origSendingTime.getField() );
01185 return false;
01186 }
01187 header.getField( origSendingTime );
01188
01189 if ( origSendingTime > sendingTime )
01190 {
01191 generateReject( msg, SessionRejectReason_SENDINGTIME_ACCURACY_PROBLEM );
01192 generateLogout();
01193 return false;
01194 }
01195 }
01196 return true;
01197
01198 QF_STACK_POP
01199 }
01200
01201 bool Session::doTargetTooLow( const Message& msg )
01202 { QF_STACK_PUSH(Session::doTargetTooLow)
01203
01204 const Header & header = msg.getHeader();
01205 PossDupFlag possDupFlag(false);
01206 MsgSeqNum msgSeqNum;
01207 if( header.isSetField(possDupFlag) )
01208 header.getField( possDupFlag );
01209 header.getField( msgSeqNum );
01210
01211 if ( !possDupFlag )
01212 {
01213 std::stringstream stream;
01214 stream << "MsgSeqNum too low, expecting " << getExpectedTargetNum()
01215 << " but received " << msgSeqNum;
01216 generateLogout( stream.str() );
01217 throw std::logic_error( stream.str() );
01218 }
01219
01220 return doPossDup( msg );
01221
01222 QF_STACK_POP
01223 }
01224
01225 void Session::doTargetTooHigh( const Message& msg )
01226 { QF_STACK_PUSH(Session::doTargetTooHigh)
01227
01228 const Header & header = msg.getHeader();
01229 BeginString beginString;
01230 MsgSeqNum msgSeqNum;
01231 header.getField( beginString );
01232 header.getField( msgSeqNum );
01233
01234 m_state.onEvent( "MsgSeqNum too high, expecting "
01235 + IntConvertor::convert( getExpectedTargetNum() )
01236 + " but received "
01237 + IntConvertor::convert( msgSeqNum ) );
01238
01239 m_state.queue( msgSeqNum, msg );
01240
01241 if( m_state.resendRequested() )
01242 {
01243 SessionState::ResendRange range = m_state.resendRange();
01244
01245 if( !m_sendRedundantResendRequests && msgSeqNum >= range.first )
01246 {
01247 m_state.onEvent ("Already sent ResendRequest FROM: " +
01248 IntConvertor::convert (range.first) + " TO: " +
01249 IntConvertor::convert (range.second) +
01250 ". Not sending another.");
01251 return;
01252 }
01253 }
01254
01255 generateResendRequest( beginString, msgSeqNum );
01256
01257 QF_STACK_POP
01258 }
01259
01260 void Session::nextQueued( const UtcTimeStamp& timeStamp )
01261 { QF_STACK_PUSH(Session::nextQueued)
01262 while ( nextQueued( getExpectedTargetNum(), timeStamp ) ) {}
01263 QF_STACK_POP
01264 }
01265
01266 bool Session::nextQueued( int num, const UtcTimeStamp& timeStamp )
01267 { QF_STACK_PUSH(Session::nextQueued)
01268
01269 Message msg;
01270 MsgType msgType;
01271
01272 if( m_state.retrieve( num, msg ) )
01273 {
01274 m_state.onEvent( "Processing QUEUED message: "
01275 + IntConvertor::convert( num ) );
01276 msg.getHeader().getField( msgType );
01277 if( msgType == MsgType_Logon
01278 || msgType == MsgType_ResendRequest )
01279 {
01280 m_state.incrNextTargetMsgSeqNum();
01281 }
01282 else
01283 {
01284 next( msg, timeStamp, true );
01285 }
01286 return true;
01287 }
01288 return false;
01289
01290 QF_STACK_POP
01291 }
01292
01293 void Session::next( const std::string& msg, const UtcTimeStamp& timeStamp, bool queued )
01294 { QF_STACK_PUSH(Session::next)
01295
01296 try
01297 {
01298 m_state.onIncoming( msg );
01299 const DataDictionary& sessionDD =
01300 m_dataDictionaryProvider.getSessionDataDictionary(m_sessionID.getBeginString());
01301 if( m_sessionID.isFIXT() )
01302 {
01303 const DataDictionary& applicationDD =
01304 m_dataDictionaryProvider.getApplicationDataDictionary(m_senderDefaultApplVerID);
01305 next( Message( msg, sessionDD, applicationDD ), timeStamp, queued );
01306 }
01307 else
01308 {
01309 next( Message( msg, sessionDD ), timeStamp, queued );
01310 }
01311 }
01312 catch( InvalidMessage& e )
01313 {
01314 m_state.onEvent( e.what() );
01315
01316 try
01317 {
01318 if( identifyType(msg) == MsgType_Logon )
01319 {
01320 m_state.onEvent( "Logon message is not valid" );
01321 disconnect();
01322 }
01323 } catch( MessageParseError& ) {}
01324 throw e;
01325 }
01326
01327 QF_STACK_POP
01328 }
01329
01330 void Session::next( const Message& message, const UtcTimeStamp& timeStamp, bool queued )
01331 { QF_STACK_PUSH(Session::next)
01332
01333 const Header& header = message.getHeader();
01334
01335 try
01336 {
01337 if ( !checkSessionTime(timeStamp) )
01338 { reset(); return; }
01339
01340 const MsgType& msgType = FIELD_GET_REF( header, MsgType );
01341 const BeginString& beginString = FIELD_GET_REF( header, BeginString );
01342 FIELD_GET_REF( header, SenderCompID );
01343 FIELD_GET_REF( header, TargetCompID );
01344
01345 if ( beginString != m_sessionID.getBeginString() )
01346 throw UnsupportedVersion();
01347
01348 if( msgType == MsgType_Logon )
01349 {
01350 if( m_sessionID.isFIXT() )
01351 {
01352 const DefaultApplVerID& applVerID = FIELD_GET_REF( message, DefaultApplVerID );
01353 setTargetDefaultApplVerID(applVerID);
01354 }
01355 else
01356 {
01357 setTargetDefaultApplVerID(Message::toApplVerID(beginString));
01358 }
01359 }
01360
01361 const DataDictionary& sessionDataDictionary =
01362 m_dataDictionaryProvider.getSessionDataDictionary(m_sessionID.getBeginString());
01363
01364 if( m_sessionID.isFIXT() && message.isApp() )
01365 {
01366 ApplVerID applVerID = m_targetDefaultApplVerID;
01367 if( header.isSetField(FIELD::ApplVerID) )
01368 header.getField(applVerID);
01369 const DataDictionary& applicationDataDictionary =
01370 m_dataDictionaryProvider.getApplicationDataDictionary(applVerID);
01371 DataDictionary::validate( message, &sessionDataDictionary, &applicationDataDictionary );
01372 }
01373 else
01374 {
01375 sessionDataDictionary.validate( message );
01376 }
01377
01378 if ( msgType == MsgType_Logon )
01379 nextLogon( message, timeStamp );
01380 else if ( msgType == MsgType_Heartbeat )
01381 nextHeartbeat( message, timeStamp );
01382 else if ( msgType == MsgType_TestRequest )
01383 nextTestRequest( message, timeStamp );
01384 else if ( msgType == MsgType_SequenceReset )
01385 nextSequenceReset( message, timeStamp );
01386 else if ( msgType == MsgType_Logout )
01387 nextLogout( message, timeStamp );
01388 else if ( msgType == MsgType_ResendRequest )
01389 nextResendRequest( message,timeStamp );
01390 else if ( msgType == MsgType_Reject )
01391 nextReject( message, timeStamp );
01392 else
01393 {
01394 if ( !verify( message ) ) return ;
01395 m_state.incrNextTargetMsgSeqNum();
01396 }
01397 }
01398 catch ( MessageParseError& e )
01399 { m_state.onEvent( e.what() ); }
01400 catch ( RequiredTagMissing & e )
01401 { LOGEX( generateReject( message, SessionRejectReason_REQUIRED_TAG_MISSING, e.field ) ); }
01402 catch ( FieldNotFound & e )
01403 {
01404 if( header.getField(FIELD::BeginString) >= FIX::BeginString_FIX42 && message.isApp() )
01405 {
01406 LOGEX( generateBusinessReject( message, BusinessRejectReason_CONDITIONALLY_REQUIRED_FIELD_MISSING, e.field ) );
01407 }
01408 else
01409 {
01410 LOGEX( generateReject( message, SessionRejectReason_REQUIRED_TAG_MISSING, e.field ) );
01411 if ( header.getField(FIELD::MsgType) == MsgType_Logon )
01412 {
01413 m_state.onEvent( "Required field missing from logon" );
01414 disconnect();
01415 }
01416 }
01417 }
01418 catch ( InvalidTagNumber & e )
01419 { LOGEX( generateReject( message, SessionRejectReason_INVALID_TAG_NUMBER, e.field ) ); }
01420 catch ( NoTagValue & e )
01421 { LOGEX( generateReject( message, SessionRejectReason_TAG_SPECIFIED_WITHOUT_A_VALUE, e.field ) ); }
01422 catch ( TagNotDefinedForMessage & e )
01423 { LOGEX( generateReject( message, SessionRejectReason_TAG_NOT_DEFINED_FOR_THIS_MESSAGE_TYPE, e.field ) ); }
01424 catch ( InvalidMessageType& )
01425 { LOGEX( generateReject( message, SessionRejectReason_INVALID_MSGTYPE ) ); }
01426 catch ( UnsupportedMessageType& )
01427 {
01428 if ( header.getField(FIELD::BeginString) >= FIX::BeginString_FIX42 )
01429 { LOGEX( generateBusinessReject( message, BusinessRejectReason_UNKNOWN_MESSAGE_TYPE ) ); }
01430 else
01431 { LOGEX( generateReject( message, "Unsupported message type" ) ); }
01432 }
01433 catch ( TagOutOfOrder & e )
01434 { LOGEX( generateReject( message, SessionRejectReason_TAG_SPECIFIED_OUT_OF_REQUIRED_ORDER, e.field ) ); }
01435 catch ( IncorrectDataFormat & e )
01436 { LOGEX( generateReject( message, SessionRejectReason_INCORRECT_DATA_FORMAT_FOR_VALUE, e.field ) ); }
01437 catch ( IncorrectTagValue & e )
01438 { LOGEX( generateReject( message, SessionRejectReason_VALUE_IS_INCORRECT, e.field ) ); }
01439 catch ( RepeatedTag & e )
01440 { LOGEX( generateReject( message, SessionRejectReason_TAG_APPEARS_MORE_THAN_ONCE, e.field ) ); }
01441 catch ( RepeatingGroupCountMismatch & e )
01442 { LOGEX( generateReject( message, SessionRejectReason_INCORRECT_NUMINGROUP_COUNT_FOR_REPEATING_GROUP, e.field ) ); }
01443 catch ( InvalidMessage& e )
01444 { m_state.onEvent( e.what() ); }
01445 catch ( RejectLogon& e )
01446 {
01447 m_state.onEvent( e.what() );
01448 generateLogout( e.what() );
01449 disconnect();
01450 }
01451 catch ( UnsupportedVersion& )
01452 {
01453 if ( header.getField(FIELD::MsgType) == MsgType_Logout )
01454 nextLogout( message, timeStamp );
01455 else
01456 {
01457 generateLogout( "Incorrect BeginString" );
01458 m_state.incrNextTargetMsgSeqNum();
01459 }
01460 }
01461 catch ( IOException& e )
01462 {
01463 m_state.onEvent( e.what() );
01464 disconnect();
01465 }
01466
01467 if( !queued )
01468 nextQueued( timeStamp );
01469
01470 if( isLoggedOn() )
01471 next();
01472
01473 QF_STACK_POP
01474 }
01475
01476 bool Session::sendToTarget( Message& message, const std::string& qualifier )
01477 throw( SessionNotFound )
01478 { QF_STACK_PUSH(Session::sendToTarget)
01479
01480 try
01481 {
01482 SessionID sessionID = message.getSessionID( qualifier );
01483 return sendToTarget( message, sessionID );
01484 }
01485 catch ( FieldNotFound& ) { throw SessionNotFound(); }
01486
01487 QF_STACK_POP
01488 }
01489
01490 bool Session::sendToTarget( Message& message, const SessionID& sessionID )
01491 throw( SessionNotFound )
01492 { QF_STACK_PUSH(Session::sendToTarget)
01493
01494 message.setSessionID( sessionID );
01495 Session* pSession = lookupSession( sessionID );
01496 if ( !pSession ) throw SessionNotFound();
01497 return pSession->send( message );
01498
01499 QF_STACK_POP
01500 }
01501
01502 bool Session::sendToTarget
01503 ( Message& message,
01504 const SenderCompID& senderCompID,
01505 const TargetCompID& targetCompID,
01506 const std::string& qualifier )
01507 throw( SessionNotFound )
01508 { QF_STACK_PUSH(Session::sendToTarget)
01509
01510 message.getHeader().setField( senderCompID );
01511 message.getHeader().setField( targetCompID );
01512 return sendToTarget( message, qualifier );
01513
01514 QF_STACK_POP
01515 }
01516
01517 bool Session::sendToTarget
01518 ( Message& message, const std::string& sender, const std::string& target,
01519 const std::string& qualifier )
01520 throw( SessionNotFound )
01521 { QF_STACK_PUSH(Session::sendToTarget)
01522
01523 return sendToTarget( message, SenderCompID( sender ),
01524 TargetCompID( target ), qualifier );
01525
01526 QF_STACK_POP
01527 }
01528
01529 std::set<SessionID> Session::getSessions()
01530 {
01531 return s_sessionIDs;
01532 }
01533
01534 bool Session::doesSessionExist( const SessionID& sessionID )
01535 { QF_STACK_PUSH(Session::doesSessionExist)
01536
01537 Locker locker( s_mutex );
01538 return s_sessions.end() != s_sessions.find( sessionID );
01539
01540 QF_STACK_POP
01541 }
01542
01543 Session* Session::lookupSession( const SessionID& sessionID )
01544 { QF_STACK_PUSH(Session::lookupSession)
01545
01546 Locker locker( s_mutex );
01547 Sessions::iterator find = s_sessions.find( sessionID );
01548 if ( find != s_sessions.end() )
01549 return find->second;
01550 else
01551 return 0;
01552
01553 QF_STACK_POP
01554 }
01555
01556 Session* Session::lookupSession( const std::string& string, bool reverse )
01557 { QF_STACK_PUSH(Session::lookupSession)
01558
01559 Message message;
01560 if ( !message.setStringHeader( string ) )
01561 return 0;
01562
01563 try
01564 {
01565 const Header& header = message.getHeader();
01566 const BeginString& beginString = FIELD_GET_REF( header, BeginString );
01567 const SenderCompID& senderCompID = FIELD_GET_REF( header, SenderCompID );
01568 const TargetCompID& targetCompID = FIELD_GET_REF( header, TargetCompID );
01569
01570 if ( reverse )
01571 {
01572 return lookupSession( SessionID( beginString, SenderCompID( targetCompID ),
01573 TargetCompID( senderCompID ) ) );
01574 }
01575
01576 return lookupSession( SessionID( beginString, senderCompID,
01577 targetCompID ) );
01578 }
01579 catch ( FieldNotFound& ) { return 0; }
01580
01581 QF_STACK_POP
01582 }
01583
01584 bool Session::isSessionRegistered( const SessionID& sessionID )
01585 { QF_STACK_PUSH(Session::isSessionRegistered)
01586
01587 Locker locker( s_mutex );
01588 return s_registered.end() != s_registered.find( sessionID );
01589
01590 QF_STACK_POP
01591 }
01592
01593 Session* Session::registerSession( const SessionID& sessionID )
01594 { QF_STACK_PUSH(Session::registerSession)
01595
01596 Locker locker( s_mutex );
01597 Session* pSession = lookupSession( sessionID );
01598 if ( pSession == 0 ) return 0;
01599 if ( isSessionRegistered( sessionID ) ) return 0;
01600 s_registered[ sessionID ] = pSession;
01601 return pSession;
01602
01603 QF_STACK_POP
01604 }
01605
01606 void Session::unregisterSession( const SessionID& sessionID )
01607 { QF_STACK_PUSH(Session::unregisterSession)
01608 Locker locker( s_mutex );
01609 s_registered.erase( sessionID );
01610 QF_STACK_POP
01611 }
01612
01613 int Session::numSessions()
01614 { QF_STACK_PUSH(Session::numSessions)
01615 Locker locker( s_mutex );
01616 return s_sessions.size();
01617 QF_STACK_POP
01618 }
01619
01620 bool Session::addSession( Session& s )
01621 { QF_STACK_PUSH(Session::addSession)
01622
01623 Locker locker( s_mutex );
01624 Sessions::iterator it = s_sessions.find( s.m_sessionID );
01625 if ( it == s_sessions.end() )
01626 {
01627 s_sessions[ s.m_sessionID ] = &s;
01628 s_sessionIDs.insert( s.m_sessionID );
01629 return true;
01630 }
01631 else
01632 return false;
01633
01634 QF_STACK_POP
01635 }
01636
01637 void Session::removeSession( Session& s )
01638 { QF_STACK_PUSH(Session::removeSession)
01639
01640 Locker locker( s_mutex );
01641 s_sessions.erase( s.m_sessionID );
01642 s_sessionIDs.erase( s.m_sessionID );
01643 s_registered.erase( s.m_sessionID );
01644
01645 QF_STACK_POP
01646 }
01647 }