Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

Acceptor.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "Acceptor.h"
00028 #include "Utility.h"
00029 #include "Session.h"
00030 #include "SessionFactory.h"
00031 #include "HttpServer.h"
00032 #include <algorithm>
00033 #include <fstream>
00034 
00035 namespace FIX
00036 {
00037 Acceptor::Acceptor( Application& application,
00038                     MessageStoreFactory& messageStoreFactory,
00039                     const SessionSettings& settings )
00040 throw( ConfigError )
00041   : m_threadid( 0 ),
00042   m_application( application ),
00043   m_messageStoreFactory( messageStoreFactory ),
00044   m_settings( settings ),
00045   m_pLogFactory( 0 ),
00046   m_pLog( 0 ),
00047   m_firstPoll( true ),
00048   m_stop( true )
00049 {
00050   initialize();
00051 }
00052 
00053 Acceptor::Acceptor( Application& application,
00054                     MessageStoreFactory& messageStoreFactory,
00055                     const SessionSettings& settings,
00056                     LogFactory& logFactory )
00057 throw( ConfigError )
00058 : m_threadid( 0 ),
00059   m_application( application ),
00060   m_messageStoreFactory( messageStoreFactory ),
00061   m_settings( settings ),
00062   m_pLogFactory( &logFactory ),
00063   m_pLog( logFactory.create() ),
00064   m_firstPoll( true ),
00065   m_stop( true )
00066 {
00067   initialize();
00068 }
00069 
00070 void Acceptor::initialize() throw ( ConfigError )
00071 { QF_STACK_PUSH( Acceptor::initialize )
00072 
00073   std::set < SessionID > sessions = m_settings.getSessions();
00074   std::set < SessionID > ::iterator i;
00075 
00076   if ( !sessions.size() )
00077     throw ConfigError( "No sessions defined" );
00078 
00079   SessionFactory factory( m_application, m_messageStoreFactory,
00080                           m_pLogFactory );
00081 
00082   for ( i = sessions.begin(); i != sessions.end(); ++i )
00083   {
00084     if ( m_settings.get( *i ).getString( CONNECTION_TYPE ) == "acceptor" )
00085     {
00086       m_sessionIDs.insert( *i );
00087       m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
00088     }
00089   }
00090 
00091   if ( !m_sessions.size() )
00092     throw ConfigError( "No sessions defined for acceptor" );
00093 
00094   QF_STACK_POP
00095 }
00096 
00097 Acceptor::~Acceptor()
00098 { QF_STACK_IGNORE_BEGIN
00099 
00100   Sessions::iterator i;
00101   for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
00102     delete i->second;
00103 
00104   if( m_pLogFactory && m_pLog )
00105     m_pLogFactory->destroy( m_pLog );
00106   
00107   QF_STACK_IGNORE_END
00108 }
00109 
00110 Session* Acceptor::getSession
00111 ( const std::string& msg, Responder& responder )
00112 { QF_STACK_PUSH( Acceptor::getSession )
00113 
00114   Message message;
00115   if ( !message.setStringHeader( msg ) )
00116     return 0;
00117 
00118   BeginString beginString;
00119   SenderCompID clSenderCompID;
00120   TargetCompID clTargetCompID;
00121   MsgType msgType;
00122   try
00123   {
00124     message.getHeader().getField( beginString );
00125     message.getHeader().getField( clSenderCompID );
00126     message.getHeader().getField( clTargetCompID );
00127     message.getHeader().getField( msgType );
00128     if ( msgType != "A" ) return 0;
00129 
00130     SenderCompID senderCompID( clTargetCompID );
00131     TargetCompID targetCompID( clSenderCompID );
00132     SessionID sessionID( beginString, senderCompID, targetCompID );
00133 
00134     Sessions::iterator i = m_sessions.find( sessionID );
00135     if ( i != m_sessions.end() )
00136     {
00137       i->second->setResponder( &responder );
00138       return i->second;
00139     }
00140   }
00141   catch ( FieldNotFound& ) {}
00142   return 0;
00143 
00144   QF_STACK_POP
00145 }
00146 
00147 Session* Acceptor::getSession( const SessionID& sessionID ) const
00148 { QF_STACK_PUSH(Initiator::getSession)
00149 
00150   Sessions::const_iterator i = m_sessions.find( sessionID );
00151   if( i != m_sessions.end() )
00152     return i->second;
00153   else
00154     return 0;
00155 
00156   QF_STACK_POP
00157 }
00158 
00159 const Dictionary* const Acceptor::getSessionSettings( const SessionID& sessionID ) const
00160 { QF_STACK_PUSH(Initiator::getSessionSettings)
00161 
00162   try
00163   {
00164     return &m_settings.get( sessionID );
00165   }
00166   catch( ConfigError& )
00167   {
00168     return 0;
00169   }
00170 
00171   QF_STACK_POP
00172 }
00173 
00174 void Acceptor::start() throw ( ConfigError, RuntimeError )
00175 { QF_STACK_PUSH( Acceptor::start )
00176 
00177   m_stop = false;
00178   onConfigure( m_settings );
00179   onInitialize( m_settings );
00180 
00181   HttpServer::startGlobal( m_settings );
00182 
00183   if( !thread_spawn( &startThread, this, m_threadid ) )
00184     throw RuntimeError("Unable to spawn thread");
00185 
00186   QF_STACK_POP
00187 }
00188 
00189 void Acceptor::block() throw ( ConfigError, RuntimeError )
00190 { QF_STACK_PUSH( Acceptor::start )
00191 
00192   m_stop = false;
00193   onConfigure( m_settings );
00194   onInitialize( m_settings );
00195 
00196   startThread(this);
00197 
00198   QF_STACK_POP
00199 }
00200 
00201 bool Acceptor::poll( double timeout ) throw ( ConfigError, RuntimeError )
00202 { QF_STACK_PUSH( Acceptor::poll )
00203 
00204   if( m_firstPoll )
00205   {
00206     onConfigure( m_settings );
00207     onInitialize( m_settings );
00208     m_firstPoll = false;
00209   }
00210 
00211   return onPoll( timeout );
00212 
00213   QF_STACK_POP
00214 }
00215 
00216 void Acceptor::stop( bool force )
00217 { QF_STACK_PUSH( Acceptor::stop )
00218 
00219   if( isStopped() ) return;
00220 
00221   HttpServer::stopGlobal();
00222 
00223   std::vector<Session*> enabledSessions;
00224 
00225   Sessions sessions = m_sessions;
00226   Sessions::iterator i = sessions.begin();
00227   for ( ; i != sessions.end(); ++i )
00228   {
00229     Session* pSession = Session::lookupSession(i->first);
00230     if( pSession->isEnabled() )
00231     {
00232       enabledSessions.push_back( pSession );
00233       pSession->logout();
00234       Session::unregisterSession( pSession->getSessionID() );
00235     }
00236   }
00237 
00238   if( !force )
00239   {
00240     for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
00241       process_sleep( 1 );
00242   }
00243 
00244   m_stop = true;
00245   onStop();
00246   if( m_threadid )
00247     thread_join( m_threadid );
00248   m_threadid = 0;
00249 
00250   std::vector<Session*>::iterator session = enabledSessions.begin();
00251   for( ; session != enabledSessions.end(); ++session )
00252     (*session)->logon();
00253 
00254   QF_STACK_POP
00255 }
00256 
00257 bool Acceptor::isLoggedOn()
00258 { QF_STACK_PUSH(Acceptor::isLoggedOn)
00259 
00260   Sessions sessions = m_sessions;
00261   Sessions::iterator i = sessions.begin();
00262   for ( ; i != sessions.end(); ++i )
00263   {
00264     if( i->second->isLoggedOn() )
00265       return true;
00266   }
00267   return false;
00268 
00269   QF_STACK_POP
00270 }
00271 
00272 THREAD_PROC Acceptor::startThread( void* p )
00273 { QF_STACK_TRY
00274   QF_STACK_PUSH( Acceptor::startThread )
00275 
00276   Acceptor * pAcceptor = static_cast < Acceptor* > ( p );
00277   pAcceptor->onStart();
00278   return 0;
00279 
00280   QF_STACK_POP
00281   QF_STACK_CATCH
00282 }
00283 }

Generated on Mon Apr 5 20:59:50 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001