00001 #ifndef _XrdClientPhyConnection
00002 #define _XrdClientPhyConnection
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00034
00035
00036
00038
00039
00040 #include "XrdClient/XrdClientSock.hh"
00041 #include "XrdClient/XrdClientMessage.hh"
00042 #include "XrdClient/XrdClientUnsolMsg.hh"
00043 #include "XrdClient/XrdClientInputBuffer.hh"
00044 #include "XrdClient/XrdClientUrlInfo.hh"
00045 #include "XrdSys/XrdSysPthread.hh"
00046 #include "XrdSys/XrdSysSemWait.hh"
00047
00048 #include <time.h>
00049
// Login state of a physical connection. A value of this type is stored in
// XrdClientPhyConnection::fLogged, written by SetLogged() and returned by
// IsLogged().
// NOTE(review): the meaning of each enumerator is inferred from its name
// only (not logged in / logged in / login in progress) -- confirm against
// the implementation.
00050 enum ELoginState {
00051 kNo = 0,
00052 kYes = 1,
00053 kPending = 2
00054 };
00055
// Kind of remote server found at the other end of a physical connection.
// This is the return type of XrdClientPhyConnection::DoHandShake() and the
// type of the public member XrdClientPhyConnection::fServerType.
// NOTE(review): enumerator meanings below are inferred from the names only
// (error, unknown/none, rootd, generic xrootd, xrootd data server, xrootd
// metadata/redirector server) -- confirm against the handshake code.
00056 enum ERemoteServerType {
00057 kSTError = -1,
00058 kSTNone = 0,
00059 kSTRootd = 1,
00060 kSTBaseXrootd = 2,
00061 kSTDataXrootd = 3,
00062 kSTMetaXrootd = 4
00063 };
00064
00065 class XrdClientSid;
00066 class XrdClientThread;
00067 class XrdSecProtocol;
00068
// Physical connection to a remote server.
//
// The class wraps one XrdClientSock (fSocket, may be null), the URL of the
// server it talks to (fServer), an input message buffer (fMsgQ) and the
// bookkeeping for its reader threads. It derives publicly from
// XrdClientUnsolMsgSender.
//
// Most inline members are thin forwards to fSocket; each of them checks
// fSocket for null first and returns a fallback value (-1 or 0) or does
// nothing when no socket is present.
//
// Only declarations are visible here; the behaviour of the out-of-line
// members is defined in the implementation file and is not described
// beyond what their signatures show.
00069 class XrdClientPhyConnection: public XrdClientUnsolMsgSender {
00070
00071 private:
// NOTE(review): presumably refreshed by Touch() and consulted by
// ExpiredTTL() -- confirm in the implementation.
00072 time_t fLastUseTimestamp;
// Login state; written by SetLogged(), see also IsLogged().
00073 enum ELoginState fLogged;
// Security protocol object; plain pointer accessed through
// GetSecProtocol()/SetSecProtocol(). Ownership is not visible here.
00074 XrdSecProtocol *fSecProtocol;
00075
// Input message buffer; WipeStreamid() forwards to it.
00076 XrdClientInputBuffer
00077 fMsgQ;
00078
00079 int fRequestTimeout;
// NOTE(review): presumably the flag handled by TestAndSetMStreamsGoing().
00080 bool fMStreamsGoing;
// NOTE(review): presumably the mutex behind LockChannel()/UnlockChannel()
// -- confirm in the implementation.
00081 XrdSysRecMutex fRwMutex;
00082
00083
// Taken by GetReaderThreadsCnt() while reading fReaderthreadrunning.
00084 XrdSysRecMutex fMutex;
// Locked/unlocked by ReadLock()/ReadUnLock().
00085 XrdSysRecMutex fMultireadMutex;
00086
00087
// Fixed-size table of reader thread handles (64 slots).
00088 XrdClientThread *fReaderthreadhandler[64];
00089
00090
// Value returned by GetReaderThreadsCnt().
00091 int fReaderthreadrunning;
00092
// Server URL info; its Host, HostAddr, Port and User fields are compared
// by IsAddress(), IsPort() and IsUser().
00093 XrdClientUrlInfo fServer;
00094
// Underlying socket object; may be null (all inline forwards check it).
00095 XrdClientSock *fSocket;
00096
00097 UnsolRespProcResult HandleUnsolicited(XrdClientMessage *m);
00098
00099 XrdSysSemWait fReaderCV;
00100
// Value returned by GetLogConnCnt().
// NOTE(review): presumably adjusted by CountLogConn() -- confirm.
00101 short fLogConnCnt;
00102
00103 XrdClientSid *fSidManager;
00104
00105 public:
// Public data members, directly accessible to callers.
00106 long fServerProto;
00107 ERemoteServerType fServerType;
// Time-to-live value; read by GetTTL(), written by SetTTL() and reset to
// 0 by SaveSocket(). NOTE(review): the "sec" suffix suggests seconds.
00108 long fTTLsec;
00109
00110 XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h, XrdClientSid *sid);
00111 ~XrdClientPhyConnection();
00112
00113 XrdClientMessage *BuildMessage(bool IgnoreTimeouts, bool Enqueue);
00114 bool CheckAutoTerm();
00115
// Connect to RemoteHost; isUnix defaults to 0 (false).
// Note: RemoteHost is taken by value.
00116 bool Connect(XrdClientUrlInfo RemoteHost, bool isUnix = 0);
00117
00118
00126
// Overload taking an additional file descriptor argument.
// NOTE(review): presumably fd is an already-open descriptor to adopt --
// the original comment was lost in this listing; confirm.
00127 bool Connect( XrdClientUrlInfo RemoteHost, bool isUnix , int fd );
00128
00129 void CountLogConn(int d = 1);
00130 void Disconnect();
00131
00132 ERemoteServerType
00133 DoHandShake(ServerInitHandShake &xbody,
00134 int substreamid = 0);
00135
00136 bool ExpiredTTL();
// Returns fLogConnCnt (no locking).
00137 short GetLogConnCnt() const { return fLogConnCnt; }
// Returns fReaderthreadrunning while holding fMutex through a scoped
// XrdSysMutexHelper.
00138 int GetReaderThreadsCnt() { XrdSysMutexHelper l(fMutex); return fReaderthreadrunning; }
00139
00140 long GetTTL() { return fTTLsec; }
00141
00142 XrdSecProtocol *GetSecProtocol() const { return fSecProtocol; }
// Returns fSocket->fSocket, or -1 when there is no socket object.
00143 int GetSocket() { return fSocket ? fSocket->fSocket : -1; }
00144
00145
00146 void ReinitFDTable() { if (fSocket) fSocket->ReinitFDTable(); }
00147
// Sets fTTLsec to 0 (even when fSocket is null), then returns
// fSocket->SaveSocket(), or -1 when there is no socket object.
00148 int SaveSocket() { fTTLsec = 0; return fSocket ? (fSocket->SaveSocket()) : -1; }
00149 void SetInterrupt() { if (fSocket) fSocket->SetInterrupt(); }
00150 void SetSecProtocol(XrdSecProtocol *sp) { fSecProtocol = sp; }
00151
00152 void StartedReader();
00153
// True when addr equals either fServer.Host or fServer.HostAddr.
00154 bool IsAddress(const XrdOucString &addr) {
00155 return ( (fServer.Host == addr) ||
00156 (fServer.HostAddr == addr) );
00157 }
00158
00159 ELoginState IsLogged();
00160
// True when port equals fServer.Port / usr equals fServer.User.
00161 bool IsPort(int port) { return (fServer.Port == port); };
00162 bool IsUser(const XrdOucString &usr) { return (fServer.User == usr); };
00163 bool IsValid();
00164
00165
// Paired with UnlockChannel(); XrdClientPhyConnLocker calls the two from
// its constructor and destructor.
00166 void LockChannel();
00167
00168
// Defaults: substreamid = -1, usedsubstreamid = 0 (null pointer).
// NOTE(review): presumably -1 means "any substream" and usedsubstreamid
// receives the substream actually read from -- confirm.
00169 int ReadRaw(void *buffer, int BufferLength, int substreamid = -1,
00170 int *usedsubstreamid = 0);
00171
00172 XrdClientMessage *ReadMessage(int streamid);
00173 bool ReConnect(XrdClientUrlInfo RemoteHost);
00174 void SetLogged(ELoginState status) { fLogged = status; }
00175 inline void SetTTL(long ttl) { fTTLsec = ttl; }
00176 void StartReader();
00177 void Touch();
00178 void UnlockChannel();
00179 int WriteRaw(const void *buffer, int BufferLength, int substreamid = 0);
00180
// Parallel-stream helpers: forwards to the corresponding
// XrdClientSock ...ParallelSock() calls; the int-returning ones yield -1
// when there is no socket object.
00181 int TryConnectParallelStream(int port, int windowsz, int sockid) { return ( fSocket ? fSocket->TryConnectParallelSock(port, windowsz, sockid) : -1); }
00182 int EstablishPendingParallelStream(int tmpid, int newid) { return ( fSocket ? fSocket->EstablishParallelSock(tmpid, newid) : -1); }
00183 void RemoveParallelStream(int substreamid) { if (fSocket) fSocket->RemoveParallelSock(substreamid); }
00184
00185
00186 bool TestAndSetMStreamsGoing();
00187
// Forwards to fSocket; both return 0 when there is no socket object.
00188 int GetSockIdHint(int reqsperstream) { return ( fSocket ? fSocket->GetSockIdHint(reqsperstream) : 0); }
00189 int GetSockIdCount() {return ( fSocket ? fSocket->GetSockIdCount() : 0); }
00190 void PauseSelectOnSubstream(int substreamid) { if (fSocket) fSocket->PauseSelectOnSubstream(substreamid); }
00191 void RestartSelectOnSubstream(int substreamid) { if (fSocket) fSocket->RestartSelectOnSubstream(substreamid); }
00192
00193
// Virtual forwards to fSocket; no-ops when there is no socket object.
00194 virtual void BanSockDescr(int sockdescr, int sockid) { if (fSocket) fSocket->BanSockDescr(sockdescr, sockid); }
00195 virtual void UnBanSockDescr(int sockdescr) { if (fSocket) fSocket->UnBanSockDescr(sockdescr); }
00196
// Explicit lock/unlock of fMultireadMutex; callers must pair them.
00197 void ReadLock() { fMultireadMutex.Lock(); }
00198 void ReadUnLock() { fMultireadMutex.UnLock(); }
00199
// Forwards to fMsgQ.WipeStreamid(streamid) and returns its result.
00200 int WipeStreamid(int streamid) { return fMsgQ.WipeStreamid(streamid); }
00201 };
00202
00203
00204
00205
00206
00207
00208
00209 class XrdClientPhyConnLocker {
00210 private:
00211 XrdClientPhyConnection *phyconn;
00212
00213 public:
00214 XrdClientPhyConnLocker(XrdClientPhyConnection *phyc) {
00215
00216 phyconn = phyc;
00217 phyconn->LockChannel();
00218 }
00219
00220 ~XrdClientPhyConnLocker(){
00221
00222 phyconn->UnlockChannel();
00223 }
00224
00225 };
00226 #endif