00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
00026 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
00027
00028 #include "XrdCl/XrdClPostMasterInterfaces.hh"
00029 #include "XrdCl/XrdClXRootDResponses.hh"
00030 #include "XrdCl/XrdClDefaultEnv.hh"
00031 #include "XrdCl/XrdClMessage.hh"
00032 #include "XProtocol/XProtocol.hh"
00033 #include "XrdCl/XrdClLog.hh"
00034 #include "XrdCl/XrdClConstants.hh"
00035
00036 #include "XrdSys/XrdSysPthread.hh"
00037
00038 #include <sys/uio.h>
00039
00040 #include <list>
00041 #include <memory>
00042
00043 #if __cplusplus >= 201103L
00044 #include <atomic>
00045 #endif
00046
00047 namespace XrdCl
00048 {
00049 class PostMaster;
00050 class SIDManager;
00051 class URL;
00052 class LocalFileHandler;
00053
00054
00055
00056
00057 struct RedirectEntry
00058 {
00059 enum Type
00060 {
00061 EntryRedirect,
00062 EntryRedirectOnWait,
00063 EntryRetry,
00064 EntryWait
00065 };
00066
00067 RedirectEntry( const URL &from, const URL &to, Type type ) :
00068 from( from ), to( to ), type( type )
00069 {
00070
00071 }
00072
00073 URL from;
00074 URL to;
00075 Type type;
00076 XRootDStatus status;
00077
00078 std::string ToString( bool prevok = true )
00079 {
00080 const std::string tostr = to.GetLocation();
00081 const std::string fromstr = from.GetLocation();
00082
00083 if( prevok )
00084 {
00085 switch( type )
00086 {
00087 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
00088 + tostr;
00089
00090 case EntryRedirectOnWait: return "Server responded with wait. "
00091 "Falling back to virtual redirector: " + tostr;
00092
00093 case EntryRetry: return "Retrying: " + tostr;
00094
00095 case EntryWait: return "Waited at server request. Resending: "
00096 + tostr;
00097 }
00098 }
00099 return "Failed at: " + fromstr + ", retrying at: " + tostr;
00100 }
00101 };
00102
00103
00105
00106 class XRootDMsgHandler: public IncomingMsgHandler,
00107 public OutgoingMsgHandler
00108 {
00109 friend class HandleRspJob;
00110
00111 public:
00112
00121
00122 XRootDMsgHandler( Message *msg,
00123 ResponseHandler *respHandler,
00124 const URL *url,
00125 SIDManager *sidMgr,
00126 LocalFileHandler *lFileHandler):
00127 pRequest( msg ),
00128 pResponse( 0 ),
00129 pResponseHandler( respHandler ),
00130 pUrl( *url ),
00131 pEffectiveDataServerUrl( 0 ),
00132 pSidMgr( sidMgr ),
00133 pLFileHandler( lFileHandler ),
00134 pExpiration( 0 ),
00135 pRedirectAsAnswer( false ),
00136 pOksofarAsAnswer( false ),
00137 pHosts( 0 ),
00138 pHasLoadBalancer( false ),
00139 pHasSessionId( false ),
00140 pChunkList( 0 ),
00141 pRedirectCounter( 0 ),
00142 pNotAuthorizedCounter( 0 ),
00143
00144 pAsyncOffset( 0 ),
00145 pAsyncReadSize( 0 ),
00146 pAsyncReadBuffer( 0 ),
00147 pAsyncMsgSize( 0 ),
00148
00149 pReadRawStarted( false ),
00150 pReadRawCurrentOffset( 0 ),
00151
00152 pReadVRawMsgOffset( 0 ),
00153 pReadVRawChunkHeaderDone( false ),
00154 pReadVRawChunkHeaderStarted( false ),
00155 pReadVRawSizeError( false ),
00156 pReadVRawChunkIndex( 0 ),
00157 pReadVRawMsgDiscard( false ),
00158
00159 pOtherRawStarted( false ),
00160
00161 pFollowMetalink( false ),
00162
00163 pStateful( false ),
00164
00165 pAggregatedWaitTime( 0 ),
00166
00167 pMsgInFly( false ),
00168
00169 pTimeoutFence( false ),
00170
00171 pDirListStarted( false ),
00172 pDirListWithStat( false ),
00173
00174 pCV( 0 )
00175
00176 {
00177 pPostMaster = DefaultEnv::GetPostMaster();
00178 if( msg->GetSessionId() )
00179 pHasSessionId = true;
00180 memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
00181
00182 Log *log = DefaultEnv::GetLog();
00183 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
00184 pUrl.GetHostId().c_str(), this,
00185 pRequest->GetDescription().c_str() );
00186 }
00187
00188
00190
00191 ~XRootDMsgHandler()
00192 {
00193 DumpRedirectTraceBack();
00194
00195 if( !pHasSessionId )
00196 delete pRequest;
00197 delete pResponse;
00198 std::vector<Message *>::iterator it;
00199 for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
00200 delete *it;
00201
00202 delete pEffectiveDataServerUrl;
00203
00204 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
00205 pResponse = reinterpret_cast<Message*>( 0xDEADBEEF );
00206 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
00207 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
00208 pSidMgr = reinterpret_cast<SIDManager*>( 0xDEADBEEF );
00209 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
00210 pHosts = reinterpret_cast<HostList*>( 0xDEADBEEF );
00211 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
00212 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
00213
00214 Log *log = DefaultEnv::GetLog();
00215 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
00216 pUrl.GetHostId().c_str(), this );
00217 }
00218
00219
00225
00226 virtual uint16_t Examine( Message *msg );
00227
00228
00232
00233 virtual uint16_t GetSid() const;
00234
00235
00239
00240 virtual void Process( Message *msg );
00241
00242
00252
00253 virtual Status ReadMessageBody( Message *msg,
00254 int socket,
00255 uint32_t &bytesRead );
00256
00257
00263
00264 virtual uint8_t OnStreamEvent( StreamEvent event,
00265 uint16_t streamNum,
00266 Status status );
00267
00268
00270
00271 virtual void OnStatusReady( const Message *message,
00272 Status status );
00273
00274
00276
00277 virtual bool IsRaw() const;
00278
00279
00288
00289 Status WriteMessageBody( int socket,
00290 uint32_t &bytesRead );
00291
00292
00297
00298 ChunkList* GetMessageBody( uint32_t *&asyncOffset )
00299 {
00300 asyncOffset = &pAsyncOffset;
00301 return pChunkList;
00302 }
00303
00304
00308
00309 void WaitDone( time_t now );
00310
00311
00313
00314 void SetExpiration( time_t expiration )
00315 {
00316 pExpiration = expiration;
00317 }
00318
00319
00322
00323 void SetRedirectAsAnswer( bool redirectAsAnswer )
00324 {
00325 pRedirectAsAnswer = redirectAsAnswer;
00326 }
00327
00328
00331
00332 void SetOksofarAsAnswer( bool oksofarAsAnswer )
00333 {
00334 pOksofarAsAnswer = oksofarAsAnswer;
00335 }
00336
00337
00339
00340 const Message *GetRequest() const
00341 {
00342 return pRequest;
00343 }
00344
00345
00347
00348 void SetLoadBalancer( const HostInfo &loadBalancer )
00349 {
00350 if( !loadBalancer.url.IsValid() )
00351 return;
00352 pLoadBalancer = loadBalancer;
00353 pHasLoadBalancer = true;
00354 }
00355
00356
00358
00359 void SetHostList( HostList *hostList )
00360 {
00361 delete pHosts;
00362 pHosts = hostList;
00363 }
00364
00365
00367
00368 void SetChunkList( ChunkList *chunkList )
00369 {
00370 pChunkList = chunkList;
00371 if( chunkList )
00372 pChunkStatus.resize( chunkList->size() );
00373 else
00374 pChunkStatus.clear();
00375 }
00376
00377
00379
00380 void SetRedirectCounter( uint16_t redirectCounter )
00381 {
00382 pRedirectCounter = redirectCounter;
00383 }
00384
00385 void SetFollowMetalink( bool followMetalink )
00386 {
00387 pFollowMetalink = followMetalink;
00388 }
00389
00390 void SetStateful( bool stateful )
00391 {
00392 pStateful = stateful;
00393 }
00394
00395
00397
00398 void TakeDownTimeoutFence();
00399
00400 private:
00401
00403
00404 Status ReadRawRead( Message *msg,
00405 int socket,
00406 uint32_t &bytesRead );
00407
00408
00410
00411 Status ReadRawReadV( Message *msg,
00412 int socket,
00413 uint32_t &bytesRead );
00414
00415
00417
00418 Status ReadRawOther( Message *msg,
00419 int socket,
00420 uint32_t &bytesRead );
00421
00422
00425
00426 Status ReadAsync( int socket, uint32_t &btesRead );
00427
00428
00430
00431 void HandleError( Status status, Message *msg = 0 );
00432
00433
00435
00436 Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
00437
00438
00440
00441 void HandleResponse();
00442
00443
00445
00446 XRootDStatus *ProcessStatus();
00447
00448
00451
00452 Status ParseResponse( AnyObject *&response );
00453
00454
00457
00458 Status RewriteRequestRedirect( const URL &newUrl );
00459
00460
00462
00463 Status RewriteRequestWait();
00464
00465
00467
00468 Status PostProcessReadV( VectorReadInfo *vReadInfo );
00469
00470
00472
00473 Status UnPackReadVResponse( Message *msg );
00474
00475
00477
00478 void UpdateTriedCGI(uint32_t errNo=0);
00479
00480
00482
00483 void SwitchOnRefreshFlag();
00484
00485
00488
00489 void HandleRspOrQueue();
00490
00491
00493
00494 void HandleLocalRedirect( URL *url );
00495
00496
00501
00502 bool IsRetriable( Message *request );
00503
00504
00511
00512 bool OmitWait( Message *request, const URL &url );
00513
00514
00520
00521 bool RetriableErrorResponse( const Status &status );
00522
00523
00525
00526 void DumpRedirectTraceBack();
00527
00528
00529
00530
00531 struct ChunkStatus
00532 {
00533 ChunkStatus(): sizeError( false ), done( false ) {}
00534 bool sizeError;
00535 bool done;
00536 };
00537
00538 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
00539
00540 Message *pRequest;
00541 Message *pResponse;
00542 std::vector<Message *> pPartialResps;
00543 ResponseHandler *pResponseHandler;
00544 URL pUrl;
00545 URL *pEffectiveDataServerUrl;
00546 PostMaster *pPostMaster;
00547 SIDManager *pSidMgr;
00548 LocalFileHandler *pLFileHandler;
00549 Status pStatus;
00550 Status pLastError;
00551 time_t pExpiration;
00552 bool pRedirectAsAnswer;
00553 bool pOksofarAsAnswer;
00554 HostList *pHosts;
00555 bool pHasLoadBalancer;
00556 HostInfo pLoadBalancer;
00557 bool pHasSessionId;
00558 std::string pRedirectUrl;
00559 ChunkList *pChunkList;
00560 std::vector<ChunkStatus> pChunkStatus;
00561 uint16_t pRedirectCounter;
00562 uint16_t pNotAuthorizedCounter;
00563
00564 uint32_t pAsyncOffset;
00565 uint32_t pAsyncReadSize;
00566 char* pAsyncReadBuffer;
00567 uint32_t pAsyncMsgSize;
00568
00569 bool pReadRawStarted;
00570 uint32_t pReadRawCurrentOffset;
00571
00572 uint32_t pReadVRawMsgOffset;
00573 bool pReadVRawChunkHeaderDone;
00574 bool pReadVRawChunkHeaderStarted;
00575 bool pReadVRawSizeError;
00576 int32_t pReadVRawChunkIndex;
00577 readahead_list pReadVRawChunkHeader;
00578 bool pReadVRawMsgDiscard;
00579
00580 bool pOtherRawStarted;
00581
00582 bool pFollowMetalink;
00583
00584 bool pStateful;
00585 int pAggregatedWaitTime;
00586
00587 std::unique_ptr<RedirectEntry> pRdirEntry;
00588 RedirectTraceBack pRedirectTraceBack;
00589
00590 bool pMsgInFly;
00591
00592
00593
00594
00595
00596
00597 #if __cplusplus >= 201103L
00598 std::atomic<bool> pTimeoutFence;
00599 #else
00600 bool pTimeoutFence;
00601 #endif
00602
00603
00604
00605
00606
00607
00608 bool pDirListStarted;
00609 bool pDirListWithStat;
00610
00611
00612
00613
00614
00615 XrdSysCondVar pCV;
00616 };
00617 }
00618
00619 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__