ThreadedSocketAcceptor.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "ThreadedSocketAcceptor.h"
00028 #include "Settings.h"
00029 #include "Utility.h"
00030
00031 namespace FIX
00032 {
00033 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00034 Application& application,
00035 MessageStoreFactory& factory,
00036 const SessionSettings& settings ) throw( ConfigError )
00037 : Acceptor( application, factory, settings )
00038 { socket_init(); }
00039
00040 ThreadedSocketAcceptor::ThreadedSocketAcceptor(
00041 Application& application,
00042 MessageStoreFactory& factory,
00043 const SessionSettings& settings,
00044 LogFactory& logFactory ) throw( ConfigError )
00045 : Acceptor( application, factory, settings, logFactory )
00046 {
00047 socket_init();
00048 }
00049
00050 ThreadedSocketAcceptor::~ThreadedSocketAcceptor()
00051 {
00052 socket_term();
00053 }
00054
00055 void ThreadedSocketAcceptor::onConfigure( const SessionSettings& s )
00056 throw ( ConfigError )
00057 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00058
00059 std::set<SessionID> sessions = s.getSessions();
00060 std::set<SessionID>::iterator i;
00061 for( i = sessions.begin(); i != sessions.end(); ++i )
00062 {
00063 const Dictionary& settings = s.get( *i );
00064 settings.getLong( SOCKET_ACCEPT_PORT );
00065 if( settings.has(SOCKET_REUSE_ADDRESS) )
00066 settings.getBool( SOCKET_REUSE_ADDRESS );
00067 if( settings.has(SOCKET_NODELAY) )
00068 settings.getBool( SOCKET_NODELAY );
00069 }
00070
00071 QF_STACK_POP
00072 }
00073
00074 void ThreadedSocketAcceptor::onInitialize( const SessionSettings& s )
00075 throw ( RuntimeError )
00076 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00077
00078 short port = 0;
00079 std::set<int> ports;
00080
00081 std::set<SessionID> sessions = s.getSessions();
00082 std::set<SessionID>::iterator i = sessions.begin();
00083 for( ; i != sessions.end(); ++i )
00084 {
00085 Dictionary settings = s.get( *i );
00086 port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00087
00088 m_portToSessions[port].insert( *i );
00089
00090 if( ports.find(port) != ports.end() )
00091 continue;
00092 ports.insert( port );
00093
00094 const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
00095 s.get().getBool( SOCKET_REUSE_ADDRESS ) : true;
00096
00097 const bool noDelay = settings.has( SOCKET_NODELAY ) ?
00098 s.get().getBool( SOCKET_NODELAY ) : false;
00099
00100 const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
00101 s.get().getLong( SOCKET_SEND_BUFFER_SIZE ) : 0;
00102
00103 const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
00104 s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
00105
00106 int socket = socket_createAcceptor( port, reuseAddress );
00107 if( socket < 0 )
00108 {
00109 SocketException e;
00110 socket_close( socket );
00111 throw RuntimeError( "Unable to create, bind, or listen to port "
00112 + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
00113 }
00114 if( noDelay )
00115 socket_setsockopt( socket, TCP_NODELAY );
00116 if( sendBufSize )
00117 socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
00118 if( rcvBufSize )
00119 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
00120
00121 m_socketToPort[socket] = port;
00122 m_sockets.insert( socket );
00123 }
00124
00125 QF_STACK_POP
00126 }
00127
00128 void ThreadedSocketAcceptor::onStart()
00129 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00130
00131 Sockets::iterator i;
00132 for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00133 {
00134 Locker l( m_mutex );
00135 int port = m_socketToPort[*i];
00136 AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00137 thread_id thread;
00138 thread_spawn( &socketAcceptorThread, info, thread );
00139 addThread( *i, thread );
00140 }
00141
00142 QF_STACK_POP
00143 }
00144
00145 bool ThreadedSocketAcceptor::onPoll( double timeout )
00146 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00147
00148 return false;
00149
00150 QF_STACK_POP
00151 }
00152
00153 void ThreadedSocketAcceptor::onStop()
00154 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00155
00156 SocketToThread threads;
00157 SocketToThread::iterator i;
00158
00159 {
00160 Locker l(m_mutex);
00161
00162 time_t start = 0;
00163 time_t now = 0;
00164
00165 ::time( &start );
00166 while ( isLoggedOn() )
00167 {
00168 if( ::time(&now) -5 >= start )
00169 break;
00170 }
00171
00172 threads = m_threads;
00173 m_threads.clear();
00174 }
00175
00176 for ( i = threads.begin(); i != threads.end(); ++i )
00177 socket_close( i->first );
00178 for ( i = threads.begin(); i != threads.end(); ++i )
00179 thread_join( i->second );
00180
00181 QF_STACK_POP
00182 }
00183
00184 void ThreadedSocketAcceptor::addThread( int s, thread_id t )
00185 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00186
00187 Locker l(m_mutex);
00188
00189 m_threads[ s ] = t;
00190
00191 QF_STACK_POP
00192 }
00193
00194 void ThreadedSocketAcceptor::removeThread( int s )
00195 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00196
00197 Locker l(m_mutex);
00198 SocketToThread::iterator i = m_threads.find( s );
00199 if ( i != m_threads.end() )
00200 {
00201 thread_detach( i->second );
00202 m_threads.erase( i );
00203 }
00204
00205 QF_STACK_POP
00206 }
00207
00208 THREAD_PROC ThreadedSocketAcceptor::socketAcceptorThread( void* p )
00209 { QF_STACK_TRY
00210 QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00211
00212 AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00213
00214 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00215 int s = info->m_socket;
00216 int port = info->m_port;
00217 delete info;
00218
00219 int noDelay = 0;
00220 int sendBufSize = 0;
00221 int rcvBufSize = 0;
00222 socket_getsockopt( s, TCP_NODELAY, noDelay );
00223 socket_getsockopt( s, SO_SNDBUF, sendBufSize );
00224 socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
00225
00226 int socket = 0;
00227 while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00228 {
00229 if( noDelay )
00230 socket_setsockopt( socket, TCP_NODELAY );
00231 if( sendBufSize )
00232 socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
00233 if( rcvBufSize )
00234 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
00235
00236 Sessions sessions = pAcceptor->m_portToSessions[port];
00237
00238 ThreadedSocketConnection * pConnection =
00239 new ThreadedSocketConnection
00240 ( socket, sessions, pAcceptor->getApplication(), pAcceptor->getLog() );
00241
00242 ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00243
00244 {
00245 Locker l( pAcceptor->m_mutex );
00246
00247 std::stringstream stream;
00248 stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00249
00250 if( pAcceptor->getLog() )
00251 pAcceptor->getLog()->onEvent( stream.str() );
00252
00253 thread_id thread;
00254 if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00255 delete info;
00256 pAcceptor->addThread( socket, thread );
00257 }
00258 }
00259
00260 if( !pAcceptor->isStopped() )
00261 pAcceptor->removeThread( s );
00262
00263 return 0;
00264
00265 QF_STACK_POP
00266 QF_STACK_CATCH
00267 }
00268
00269 THREAD_PROC ThreadedSocketAcceptor::socketConnectionThread( void* p )
00270 { QF_STACK_TRY
00271 QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00272
00273 ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00274
00275 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00276 ThreadedSocketConnection* pConnection = info->m_pConnection;
00277 delete info;
00278
00279 int socket = pConnection->getSocket();
00280
00281 while ( pConnection->read() ) {}
00282 delete pConnection;
00283 if( !pAcceptor->isStopped() )
00284 pAcceptor->removeThread( socket );
00285 return 0;
00286
00287 QF_STACK_POP
00288 QF_STACK_CATCH
00289 }
00290 }