XRootD
XrdSys::IOEvents::Poller Class Reference abstract

#include <XrdSysIOEvents.hh>


Classes

struct  PipeData
 

Public Types

enum  CreateOpts { optTOM }
 

Public Member Functions

 Poller (int cFD, int rFD)
 
virtual ~Poller ()
 Destructor. Stop() is effectively called when this object is deleted.
 
void Stop ()
 

Static Public Member Functions

static Poller * Create (int &eNum, const char **eTxt=0, int crOpts=0)
 

Protected Member Functions

virtual void Begin (XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
 
void CbkTMO ()
 
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 
 CPP_ATOMIC_TYPE (bool) wakePend
 
virtual void Exclude (Channel *cP, bool &isLocked, bool dover=1)=0
 
int GetFault (Channel *cP)
 
int GetPollEnt (Channel *cP)
 
int GetRequest ()
 
virtual bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
 
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
 
void LockChannel (Channel *cP)
 
virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
 
int Poll2Enum (short events)
 
int SendCmd (PipeData &cmd)
 
void SetPollEnt (Channel *cP, int ptEnt)
 
virtual void Shutdown ()=0
 
bool TmoAdd (Channel *cP, int tmoSet)
 
void TmoDel (Channel *cP)
 
int TmoGet ()
 
void UnLockChannel (Channel *cP)
 

Protected Attributes

Channel * attBase
 
bool chDead
 
int cmdFD
 
int pipeBlen
 
char * pipeBuff
 
struct pollfd pipePoll
 
pthread_t pollTid
 
PipeData reqBuff
 
int reqFD
 
Channel * tmoBase
 
unsigned char tmoMask
 

Static Protected Attributes

static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
 
static pid_t parentPID = getpid()
 

Friends

class BootStrap
 
class Channel
 

Detailed Description

Defines the poller object interface. A poller fields events and dispatches event callbacks. Obtain an actual poller instance by calling the Create() method; because the class is abstract, you cannot create an instance with new or an in-place declaration. Any number of these objects may be created. Each creation spawns a polling thread.

Definition at line 371 of file XrdSysIOEvents.hh.

Member Enumeration Documentation

◆ CreateOpts

Options for Create(). Create() builds a specialized instance of a poller object, initializes it, and starts the polling process. You must call Create() to obtain a specialized poller.

Parameters
eNum  Place where errno is placed upon failure.
eTxt  Place where a pointer to the description of the failing operation is to be set. If null, no description is returned.
crOpts  Poller options (see static const optxxx): optTOM - Timeout resumption after a timeout event must be manually re-enabled. By default, event timeouts are automatically re-enabled after successful callbacks.
Returns
!0  Poller successfully created and started. eNum contains zero. eTxt, if not null, points to an empty string. The returned value is a pointer to the Poller object.
0  Poller could not be created. eNum contains the associated errno value. eTxt, if not null, describes the failing operation.
Enumerator
optTOM 

Definition at line 398 of file XrdSysIOEvents.hh.

Constructor & Destructor Documentation

◆ Poller()

XrdSys::IOEvents::Poller::Poller ( int cFD,
int rFD )

Constructor

Parameters
cFD  The file descriptor used to send commands to the poll thread.
rFD  The file descriptor used to receive commands in the poll thread.

Definition at line 571 of file XrdSysIOEvents.cc.

572{
573
574// Now initialize local class members
575//
576 attBase = 0;
577 tmoBase = 0;
578 cmdFD = cFD;
579 reqFD = rFD;
580 wakePend = false;
581 pipeBuff = 0;
582 pipeBlen = 0;
583 pipePoll.fd = rFD;
584 pipePoll.events = POLLIN | POLLRDNORM;
585 tmoMask = 255;
586}

◆ ~Poller()

virtual XrdSys::IOEvents::Poller::~Poller ( )
inline virtual

Destructor. Stop() is effectively called when this object is deleted.

Definition at line 430 of file XrdSysIOEvents.hh.

430{}

Member Function Documentation

◆ Begin()

virtual void XrdSys::IOEvents::Poller::Begin ( XrdSysSemaphore * syncp,
int & rc,
const char ** eTxt )
protected pure virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by XrdSys::IOEvents::BootStrap::Start().


◆ CbkTMO()

void XrdSys::IOEvents::Poller::CbkTMO ( )
protected

Definition at line 614 of file XrdSysIOEvents.cc.

615{
616 Channel *cP;
617
618// Process each element in the timeout queue, calling the callback function
619// if the timeout has passed. As this method can be called with a lock on the
620// channel mutex, we need to drop it prior to calling the callback.
621//
622 toMutex.Lock();
623 while((cP = tmoBase) && cP->deadLine <= time(0))
624 {int dlType = cP->dlType;
625 toMutex.UnLock();
626 CbkXeq(cP, dlType, 0, 0);
627 toMutex.Lock();
628 }
629 toMutex.UnLock();
630}
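
The key point in the listing above is that the timeout-queue lock is released before each callback and re-acquired afterwards. A minimal standalone sketch of that pattern follows. The names `Entry`, `TimeoutQueue`, and `drain` are hypothetical and not part of XRootD, and unlike the real code (where the callback path removes the channel from the queue) the sketch pops the entry itself.

```cpp
#include <cassert>
#include <ctime>
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical queue entry: a deadline plus the action to run when it passes.
struct Entry { time_t deadline; std::function<void()> onTimeout; };

// Hypothetical stand-in for the poller's timeout queue.
struct TimeoutQueue
{
    std::mutex        mtx;
    std::deque<Entry> q;   // kept sorted, earliest deadline first

    // Fire every entry whose deadline is <= now; returns how many fired.
    int drain(time_t now)
    {
        int fired = 0;
        std::unique_lock<std::mutex> lock(mtx);
        while (!q.empty() && q.front().deadline <= now)
        {
            Entry e = q.front();
            q.pop_front();
            lock.unlock();      // never hold the queue lock inside a callback
            e.onTimeout();
            ++fired;
            lock.lock();        // re-acquire before inspecting the queue again
        }
        return fired;
    }
};
```

Because the lock is dropped around `onTimeout()`, a callback may itself lock the queue (for example to re-arm a timeout) without deadlocking.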

◆ CbkXeq()

bool XrdSys::IOEvents::Poller::CbkXeq ( Channel * cP,
int events,
int eNum,
const char * eTxt )
protected

Definition at line 636 of file XrdSysIOEvents.cc.

638{
639 XrdSysMutexHelper cbkMHelp(cP->chMutex);
640 char oldEvents;
641 bool cbok, retval, isRead, isWrite, isLocked = true;
642
643// Perform any required tracing
644//
645 if (TRACING)
646 {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
647 (cP->chPoller == &pollInit ? "init" :
648 (cP->chPoller == &pollWait ? "wait" : "err")));
649 DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
650 <<" chev=" <<static_cast<int>(cP->chEvents)
651 <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
652 <<" callback " <<(cP->chCB ? "present" : "missing")
653 <<" poller=" <<cbtype);
654 }
655
656// Remove this from the timeout queue if there and reset the deadlines based
657// on the event we are reflecting. This separates read and write deadlines
658//
659 if (cP->inTOQ)
660 {TmoDel(cP);
661 cP->dlType |= (events & CallBack::ValidEvents) << 4;
662 isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
663 if (isRead) cP->rdDL = maxTime;
664 isWrite = events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
665 if (isWrite) cP->wrDL = maxTime;
666 } else {
667 cP->dlType &= CallBack::ValidEvents;
668 isRead = isWrite = false;
669 }
670
671// Verify that there is a callback here and the channel is ready. If not,
672// disable this channel for the events being reflected unless the event is a
673// fatal error. In this case we need to abandon the channel since error events
674// may continue to be generated as we can't always disable them.
675//
676 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
677 {if (eNum)
678 {cP->chPoller = &pollErr1; cP->chFault = eNum;
679 cP->inPSet = 0;
680 return false;
681 }
682 oldEvents = cP->chEvents;
683 cP->chEvents = 0;
684 retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
685 TRACE_MOD(CbkXeq,cP->chFD,0);
686 if (!isLocked) cP->chMutex.Lock();
687 cP->chEvents = oldEvents;
688 return true;
689 }
690
691// Resolve the problem where we get an error event but the channel wants them
692// presented as a read or write event. If neither is possible then defer the
693// error until the channel is enabled again.
694//
695 if (eNum)
696 {if (cP->chEvents & Channel::errorEvents)
697 {cP->chPoller = &pollErr1; cP->chFault = eNum;
698 cP->chStat = Channel::isCBMode;
699 chDead = false;
700 cbkMHelp.UnLock();
701 cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
702 if (chDead) return true;
703 cbkMHelp.Lock(&(cP->chMutex));
704 cP->inPSet = 0;
705 return false;
706 }
707 if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
708 else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
709 else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
710 return false;
711 }
712 }
713
714// Indicate that we are in callback mode then drop the channel lock and effect
715// the callback. This allows the callback to freely manage locks.
716//
717 cP->chStat = Channel::isCBMode;
718 chDead = false;
719 cbkMHelp.UnLock();
720 IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
721 cbok = cP->chCB->Event(cP,cP->chCBA, events);
722 IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
723
724// If channel destroyed by the callback, bail really fast. Otherwise, regain
725// the channel lock.
726//
727 if (chDead) return true;
728 cbkMHelp.Lock(&(cP->chMutex));
729
730// If the channel is being destroyed; then another thread must have done so.
731// Tell it the callback has finished and just return.
732//
733 if (cP->chStat != Channel::isCBMode)
734 {if (cP->chStat == Channel::isDead)
735 ((XrdSysSemaphore *)cP->chCBA)->Post();
736 return true;
737 }
738 cP->chStat = Channel::isClear;
739
740// Handle enable or disable here. If we keep the channel enabled then reset
741// the timeout if it hasn't been handled via a call from the callback.
742//
743 if (!cbok) Detach(cP,isLocked,false);
744 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
745 TmoAdd(cP, 0);
746
747// All done. While the mutex should not have been unlocked, we relock it if
748// it has to keep the mutex helper from croaking.
749//
750 if (!isLocked) cP->chMutex.Lock();
751 return true;
752}

References BOOLNAME, DO_TRACE, XrdSys::IOEvents::Channel::errorEvents, XrdSys::IOEvents::CallBack::Event(), XrdSys::IOEvents::CallBack::Fatal(), IF_TRACE, XrdSysMutex::Lock(), XrdSysMutexHelper::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollInit, XrdSys::IOEvents::pollWait, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REVENTS, TRACE_MOD, TRACING, XrdSysMutexHelper::UnLock(), XrdSys::IOEvents::CallBack::ValidEvents, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.


◆ CPP_ATOMIC_TYPE()

XrdSys::IOEvents::Poller::CPP_ATOMIC_TYPE ( bool )
protected

◆ Create()

XrdSys::IOEvents::Poller * XrdSys::IOEvents::Poller::Create ( int & eNum,
const char ** eTxt = 0,
int crOpts = 0 )
static

Definition at line 758 of file XrdSysIOEvents.cc.

761{
762 int fildes[2];
763 struct pollArg pArg;
764 pthread_t tid;
765
766// Create a pipe used to break the poll wait loop
767//
768 if (XrdSysFD_Pipe(fildes))
769 {eNum = errno;
770 if (eTxt) *eTxt = "creating poll pipe";
771 return 0;
772 }
773
774// Create an actual implementation of a poller
775//
776 if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
777 {close(fildes[0]);
778 close(fildes[1]);
779 return 0;
780 }
781
782// Now start a thread to handle this poller object
783//
784 if ((eNum = XrdSysThread::Run(&tid, XrdSys::IOEvents::BootStrap::Start,
785 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
786 {if (eTxt) *eTxt = "creating poller thread"; return 0;}
787
788// Now wait for the thread to finish initializing before we allow use
789// Note that the bootstrap takes ownership of the semaphore and will delete it
790// once the thread posting the semaphore actually ends. This is to avoid
791// semaphore bugs present in certain (e.g. Linux) kernels.
792//
793 pArg.pollSync->Wait();
794
795// Check if all went well
796//
797 if (pArg.retCode)
798 {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
799 eNum = pArg.retCode;
800 delete pArg.pollP;
801 return 0;
802 }
803
804// Set creation options in the new poller
805//
806 if (crOpts & optTOM)
807 pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);
808
809// All done
810//
811 eNum = 0;
812 if (eTxt) *eTxt = "";
813 return pArg.pollP;
814}

References close, XrdSys::IOEvents::pollArg::pollP, XrdSys::IOEvents::pollArg::pollSync, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::pollArg::retCode, XrdSys::IOEvents::pollArg::retMsg, XrdSysThread::Run(), XrdSys::IOEvents::BootStrap::Start(), tmoMask, XrdSysSemaphore::Wait(), XrdSys::IOEvents::CallBack::WriteTimeOut, and XRDSYSTHREAD_BIND.


◆ Exclude()

virtual void XrdSys::IOEvents::Poller::Exclude ( Channel * cP,
bool & isLocked,
bool dover = 1 )
protected pure virtual

Remove a channel from the poll set. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

◆ GetFault()

int XrdSys::IOEvents::Poller::GetFault ( Channel * cP)
inline protected

Definition at line 437 of file XrdSysIOEvents.hh.

437{return cP->chFault;}

Referenced by XrdSys::IOEvents::PollerErr1::Include(), and XrdSys::IOEvents::PollerErr1::Modify().


◆ GetPollEnt()

int XrdSys::IOEvents::Poller::GetPollEnt ( Channel * cP)
inline protected

Definition at line 438 of file XrdSysIOEvents.hh.

438{return cP->pollEnt;}

◆ GetRequest()

int XrdSys::IOEvents::Poller::GetRequest ( )
protected

Definition at line 866 of file XrdSysIOEvents.cc.

867{
868 ssize_t rlen;
869 int rc;
870
871// See if we are to resume a read or start a fresh one
872//
873 if (!pipeBlen)
874 {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
875
876// Wait for the next request. Some OS's (like Linux) don't support non-blocking
877// pipes. So, we must front the read with a poll.
878//
879 do {rc = poll(&pipePoll, 1, 0);}
880 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
881 if (rc < 1) return 0;
882
883// Now we can put up a read without a delay. Normally a full command will be
884// present. Under some heavy conditions, this may not be the case.
885//
886 do {rlen = read(reqFD, pipeBuff, pipeBlen);}
887 while(rlen < 0 && errno == EINTR);
888 if (rlen <= 0)
889 {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
890 return 0;
891 }
892
893// Check if all the data has arrived. If not all the data is present, defer
894// this request until more data arrives.
895//
896 if (!(pipeBlen -= rlen)) return 1;
897 pipeBuff += rlen;
898 return 0;
899}

References read, and XrdSysE2T().
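
The resume logic above (a saved buffer position plus a count of bytes still missing) can be tried in isolation. The following is a minimal sketch under stated assumptions: `Cmd` is a hypothetical fixed-size record standing in for PipeData, and `CmdReader` is a hypothetical holder for the state that the poller keeps in pipeBuff and pipeBlen. It is not the XRootD implementation.

```cpp
#include <cassert>
#include <cerrno>
#include <cstring>
#include <poll.h>
#include <unistd.h>

// Hypothetical fixed-size command record standing in for PipeData.
struct Cmd { char body[16]; };

// Hypothetical reader that remembers its position between calls.
struct CmdReader
{
    int   fd;
    Cmd   buf;
    char *next = nullptr;   // where the next byte lands
    int   left = 0;         // bytes still missing; 0 means "start a fresh read"

    explicit CmdReader(int rfd) : fd(rfd) {}

    // Returns 1 when a complete Cmd is in buf, otherwise 0.
    int get()
    {
        if (!left) { next = reinterpret_cast<char *>(&buf); left = sizeof(buf); }

        // Zero-timeout poll so that the read below cannot block.
        struct pollfd pfd;
        pfd.fd = fd; pfd.events = POLLIN; pfd.revents = 0;
        int rc;
        do { rc = poll(&pfd, 1, 0); }
        while (rc < 0 && (errno == EAGAIN || errno == EINTR));
        if (rc < 1) return 0;

        ssize_t n;
        do { n = read(fd, next, left); } while (n < 0 && errno == EINTR);
        if (n <= 0) return 0;

        left -= static_cast<int>(n);
        if (!left) return 1;   // the whole record has arrived
        next += n;             // remember where to resume next time
        return 0;
    }
};
```

A caller simply invokes `get()` each time the descriptor becomes readable and acts on `buf` only when the call returns 1.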


◆ Include()

virtual bool XrdSys::IOEvents::Poller::Include ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected pure virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by Init().


◆ Init()

bool XrdSys::IOEvents::Poller::Init ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLockd )
protected

Definition at line 905 of file XrdSysIOEvents.cc.

907{
908// The channel must be locked upon entry!
909//
910 bool retval;
911
912
913// If we are already in progress then simply update the shadow events and
914// resuppress all current events.
915//
916 if (cP->chPoller == &pollWait)
917 {cP->reMod = cP->chEvents;
918 cP->chEvents = 0;
919 IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
920 return true;
921 }
922
923// Trace this entry
924//
925 IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
926
927// If no events are enabled at this point, just return
928//
929 if (!(cP->chEvents)) return true;
930
931// Refuse to enable a channel without a callback function
932//
933 if (!(cP->chCB))
934 {eNum = EDESTADDRREQ;
935 if (eTxt) *eTxt = "enabling without a callback";
936 return false;
937 }
938
939// So, now we can include the channel in the poll set. We will include it
940// with no events enabled to prevent callbacks prior to completion here.
941//
942 cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
943 retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
944 IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
945 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
946
947// Determine what future poller to use. If we can use the regular poller then
948// set the correct event mask for the channel. Note that we could have lost
949// control but the correct events will be reflected in the "reMod" member.
950//
951 if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
952 else {cP->chPoller = cP->chPollXQ;
953 cP->inPSet = 1;
954 if (cP->reMod)
955 {cP->chEvents = cP->reMod;
956 retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
957 TRACE_MOD(Init,cP->chFD,int(cP->reMod));
958 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
959 } else {
960 TRACE_NOD(Init,cP->chFD,0);
961 }
962 }
963
964// All done
965//
966 cP->reMod = 0;
967 return retval;
968}

References BOOLNAME, IF_TRACE, Include(), XrdSysMutex::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollWait, TRACE_LOK, TRACE_MOD, and TRACE_NOD.

Referenced by XrdSys::IOEvents::PollerInit::Modify(), and XrdSys::IOEvents::PollerWait::Modify().


◆ LockChannel()

void XrdSys::IOEvents::Poller::LockChannel ( Channel * cP)
inline protected

Definition at line 441 of file XrdSysIOEvents.hh.

441{cP->chMutex.Lock();}

References XrdSysMutex::Lock().


◆ Modify()

virtual bool XrdSys::IOEvents::Poller::Modify ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected pure virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by CbkXeq(), and Init().


◆ Poll2Enum()

int XrdSys::IOEvents::Poller::Poll2Enum ( short events)
protected

Definition at line 974 of file XrdSysIOEvents.cc.

975{
976 if (events & POLLERR) return EPIPE;
977
978 if (events & POLLHUP) return ECONNRESET;
979
980 if (events & POLLNVAL) return EBADF;
981
982 return EOPNOTSUPP;
983}
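
The order of the checks above decides the result when several error bits are set at once. A minimal standalone sketch of the same mapping makes this easy to verify; the free function name `pollToErrno` is hypothetical and not part of XRootD.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>

// Hypothetical standalone equivalent of Poller::Poll2Enum(): translate the
// error bits of a pollfd::revents value into an errno code.
int pollToErrno(short events)
{
    if (events & POLLERR)  return EPIPE;       // checked first, so it wins
    if (events & POLLHUP)  return ECONNRESET;
    if (events & POLLNVAL) return EBADF;
    return EOPNOTSUPP;                         // no recognized error bit set
}
```

For example, a value with both POLLERR and POLLHUP set maps to EPIPE, while a value carrying no error bit at all maps to EOPNOTSUPP.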

◆ SendCmd()

int XrdSys::IOEvents::Poller::SendCmd ( PipeData & cmd)
protected

Definition at line 989 of file XrdSysIOEvents.cc.

990{
991 int wlen;
992
993// Pipe writes are atomic so we don't need locks. Some commands require
994// confirmation. We handle that here based on the command. Note that pipes
995// guarantee that all of the data will be written or we will block.
996//
997 if (cmd.req >= PipeData::Post)
998 {XrdSysSemaphore mySem(0);
999 cmd.theSem = &mySem;
1000 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1001 while (wlen < 0 && errno == EINTR);
1002 if (wlen > 0) mySem.Wait();
1003 } else {
1004 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1005 while (wlen < 0 && errno == EINTR);
1006 }
1007
1008// All done
1009//
1010 return (wlen >= 0 ? 0 : errno);
1011}

References XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::PipeData::theSem, XrdSysSemaphore::Wait(), and write.
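
Both branches above share one write pattern: retry while the call is interrupted, then report 0 on success or errno on failure. A minimal standalone sketch of that pattern follows. The helper name `sendRecord` is hypothetical, and the semaphore-based confirmation used for some commands is deliberately left out.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <unistd.h>

// Hypothetical helper: write one fixed-size record to fd, retrying when the
// call is interrupted by a signal. Returns 0 on success, otherwise errno.
int sendRecord(int fd, const void *rec, size_t len)
{
    ssize_t wlen;
    do { wlen = write(fd, rec, len); }
    while (wlen < 0 && errno == EINTR);
    return (wlen >= 0 ? 0 : errno);
}
```

The sketch relies on the same assumption stated in the source comment: the record is small enough that the pipe write either completes in full or blocks.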


◆ SetPollEnt()

void XrdSys::IOEvents::Poller::SetPollEnt ( Channel * cP,
int ptEnt )
protected

Definition at line 1017 of file XrdSysIOEvents.cc.

1018{
1019 cP->pollEnt = pe;
1020}

◆ Shutdown()

virtual void XrdSys::IOEvents::Poller::Shutdown ( )
protected pure virtual

Shut down the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and the x-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

◆ Stop()

void XrdSys::IOEvents::Poller::Stop ( )

Stop a poller object. Active callbacks are completed and pending callbacks are discarded, after which the poller event thread exits. Subsequently, each associated channel is disabled and removed from the poller object. If the channel is enabled for a StopEvent, the stop callback is invoked. However, any attempt to use the channel methods that require an active poller will return an error.

Since a stopped poller cannot be restarted, the only thing left is to delete it. This also applies to all the associated channels since they no longer have an active poller.

Definition at line 1026 of file XrdSysIOEvents.cc.

1027{
1028 PipeData cmdbuff;
1029 CallBack *theCB;
1030 Channel *cP;
1031 void *cbArg;
1032 int doCB;
1033
1034// Initialize the PipeData structure
1035//
1036 memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1037 cmdbuff.req = PipeData::Stop;
1038
1039// Lock all of this
1040//
1041 adMutex.Lock();
1042
1043// If we are already shutdown then we are done
1044//
1045 if (cmdFD == -1) {adMutex.UnLock(); return;}
1046
1047// First we must stop the poller thread in an orderly fashion.
1048//
1049 adMutex.UnLock();
1050 SendCmd(cmdbuff);
1051 adMutex.Lock();
1052
1053// Close the pipe communication mechanism
1054//
1055 close(cmdFD); cmdFD = -1;
1056 close(reqFD); reqFD = -1;
1057
1058// Run through cleaning up the channels. While there should not be any other
1059// operations happening on this poller, we take the conservative approach.
1060//
1061 while((cP = attBase))
1062 {REMOVE(attBase, attList, cP);
1063 adMutex.UnLock();
1064 cP->chMutex.Lock();
1065 doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1066 if (cP->inTOQ) TmoDel(cP);
1067 cP->Reset(&pollErr1, cP->chFD, EIDRM);
1068 cP->chPollXQ = &pollErr1;
1069 if (doCB)
1070 {cP->chStat = Channel::isClear;
1071 theCB = cP->chCB; cbArg = cP->chCBA;
1072 cP->chMutex.UnLock();
1073 theCB->Stop(cP, cbArg);
1074 } else cP->chMutex.UnLock();
1075 adMutex.Lock();
1076 }
1077
1078// Now invoke the poller specific shutdown
1079//
1080 Shutdown();
1081 adMutex.UnLock();
1082}

References close, XrdSysMutex::Lock(), XrdSys::IOEvents::pollErr1, REMOVE, XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::CallBack::Stop(), XrdSys::IOEvents::Channel::stopEvent, and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::~PollE(), XrdSys::IOEvents::PollKQ::~PollKQ(), XrdSys::IOEvents::PollPoll::~PollPoll(), XrdSys::IOEvents::PollPort::~PollPort(), and XrdCl::PollerBuiltIn::Stop().


◆ TmoAdd()

bool XrdSys::IOEvents::Poller::TmoAdd ( Channel * cP,
int tmoSet )
protected

Definition at line 1088 of file XrdSysIOEvents.cc.

1089{
1090 XrdSysMutexHelper mHelper(toMutex);
1091 time_t tNow;
1092 Channel *ncP;
1093 bool setRTO, setWTO;
1094
1095// Do some tracing
1096//
1097 IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1098 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1099
1100// Remove element from timeout queue if it is there
1101//
1102 if (cP->inTOQ)
1103 {REMOVE(tmoBase, tmoList, cP);
1104 cP->inTOQ = 0;
1105 }
1106
1107// Determine which timeouts need to be reset
1108//
1109 tmoSet|= cP->dlType >> 4;
1112
1113// Reset the required deadlines
1114//
1115 tNow = time(0);
1116 if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1117 cP->rdDL = cP->chRTO + tNow;
1118 if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1119 cP->wrDL = cP->chWTO + tNow;
1120
1121// Calculate the closest enabled deadline
1122//
1123 if (cP->rdDL < cP->wrDL)
1124 {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1125 } else {
1126 cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1127 if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1128 }
1129 IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1130 <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1131
1132// If no timeout really applies, we are done
1133//
1134 if (cP->deadLine == maxTime) return false;
1135
1136// Add the channel to the timeout queue in correct deadline position.
1137//
1138 if ((ncP = tmoBase))
1139 {do {if (cP->deadLine < ncP->deadLine) break;
1140 ncP = ncP->tmoList.next;
1141 } while(ncP != tmoBase);
1142 INSERT(tmoList, ncP, cP);
1143 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1144 } else tmoBase = cP;
1145 cP->inTOQ = 1;
1146
1147// Indicate to the caller whether or not a wakeup is required
1148//
1149 return (tmoBase == cP);
1150}

References BOOLNAME, IF_TRACE, INSERT, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REMOVE, REVENTS, STATUSOF, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.
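
The final part of the listing inserts the channel in deadline order and reports whether it became the earliest entry, which tells the caller that the poller needs a wakeup. A minimal sketch of that rule, using a plain std::list of deadlines instead of the intrusive channel list, is shown here. The function name `addDeadline` is hypothetical.

```cpp
#include <cassert>
#include <ctime>
#include <list>

// Hypothetical helper: insert dl into a list kept in ascending order.
// Returns true when dl became the new earliest deadline, which is the
// case in which the poller would have to be woken up.
bool addDeadline(std::list<time_t> &q, time_t dl)
{
    std::list<time_t>::iterator it = q.begin();
    while (it != q.end() && *it <= dl) ++it;   // equal deadlines keep arrival order
    bool newHead = (it == q.begin());
    q.insert(it, dl);
    return newHead;
}
```

As in the listing, a deadline equal to the current head is queued behind it, so no wakeup is signalled in that case.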

◆ TmoDel()

void XrdSys::IOEvents::Poller::TmoDel ( Channel * cP)
protected

Definition at line 1156 of file XrdSysIOEvents.cc.

1157{
1158
1159// Do some tracing
1160//
1161 IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1162 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1163
1164// Get the timeout queue lock and remove the channel from the queue
1165//
1166 toMutex.Lock();
1167 REMOVE(tmoBase, tmoList, cP);
1168 cP->inTOQ = 0;
1169 toMutex.UnLock();
1170}

References BOOLNAME, IF_TRACE, REMOVE, and STATUSOF.

◆ TmoGet()

int XrdSys::IOEvents::Poller::TmoGet ( )
protected

Definition at line 1176 of file XrdSysIOEvents.cc.

1177{
1178 int wtval;
1179
1180// Lock the timeout queue
1181//
1182 toMutex.Lock();
1183
1184// Calculate wait time. If the deadline passed, invoke the timeout callback.
1185// we will need to drop the timeout lock as we don't have the channel lock.
1186//
1187 do {if (!tmoBase) {wtval = -1; break;}
1188 wtval = (tmoBase->deadLine - time(0)) * 1000;
1189 if (wtval > 0) break;
1190 toMutex.UnLock();
1191 CbkTMO();
1192 toMutex.Lock();
1193 } while(1);
1194
1195// Return the value
1196//
1197 CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1198 toMutex.UnLock();
1199 return wtval;
1200}

References CPP_ATOMIC_STORE.

Referenced by XrdSys::IOEvents::PollPort::BegTO().
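
The return value follows the poll() timeout convention: -1 means no deadline is pending, and a positive value is the number of milliseconds until the earliest one. A minimal sketch of the arithmetic, assuming a sorted list of deadlines, is given here. The name `waitMillis` is hypothetical, and where the real code invokes CbkTMO() for an expired entry the sketch merely drops it.

```cpp
#include <cassert>
#include <ctime>
#include <list>

// Hypothetical helper: q holds deadlines in ascending order. Expired entries
// are dropped (the real code fires their timeout callbacks instead) and the
// remaining wait is returned in milliseconds, or -1 when nothing is queued.
int waitMillis(std::list<time_t> &q, time_t now)
{
    while (!q.empty())
    {
        long long ms = static_cast<long long>(q.front() - now) * 1000;
        if (ms > 0) return static_cast<int>(ms);
        q.pop_front();   // stand-in for CbkTMO()
    }
    return -1;
}
```

With deadlines 5, 10, and 12 and a current time of 10, the first two entries are treated as expired and the result is 2000.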


◆ UnLockChannel()

void XrdSys::IOEvents::Poller::UnLockChannel ( Channel * cP)
inline protected

Definition at line 448 of file XrdSysIOEvents.hh.

448{cP->chMutex.UnLock();}

References XrdSysMutex::UnLock().


Friends And Related Symbol Documentation

◆ BootStrap

friend class BootStrap
friend

Definition at line 373 of file XrdSysIOEvents.hh.

◆ Channel

friend class Channel
friend

Definition at line 374 of file XrdSysIOEvents.hh.

Member Data Documentation

◆ attBase

Channel* XrdSys::IOEvents::Poller::attBase
protected

Definition at line 488 of file XrdSysIOEvents.hh.

◆ chDead

bool XrdSys::IOEvents::Poller::chDead
protected

Definition at line 511 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::Channel::Delete().

◆ cmdFD

int XrdSys::IOEvents::Poller::cmdFD
protected

Definition at line 494 of file XrdSysIOEvents.hh.

◆ maxTime

time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static protected

Definition at line 513 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::Channel::Enable().

◆ parentPID

pid_t XrdSys::IOEvents::Poller::parentPID = getpid()
static protected

Definition at line 515 of file XrdSysIOEvents.hh.

◆ pipeBlen

int XrdSys::IOEvents::Poller::pipeBlen
protected

Definition at line 508 of file XrdSysIOEvents.hh.

◆ pipeBuff

char* XrdSys::IOEvents::Poller::pipeBuff
protected

Definition at line 507 of file XrdSysIOEvents.hh.

◆ pipePoll

struct pollfd XrdSys::IOEvents::Poller::pipePoll
protected

Definition at line 493 of file XrdSysIOEvents.hh.

◆ pollTid

pthread_t XrdSys::IOEvents::Poller::pollTid
protected

◆ reqBuff

PipeData XrdSys::IOEvents::Poller::reqBuff
protected

Definition at line 506 of file XrdSysIOEvents.hh.

◆ reqFD

int XrdSys::IOEvents::Poller::reqFD
protected

Definition at line 495 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::PollKQ::PollKQ().

◆ tmoBase

Channel* XrdSys::IOEvents::Poller::tmoBase
protected

Definition at line 489 of file XrdSysIOEvents.hh.

◆ tmoMask

unsigned char XrdSys::IOEvents::Poller::tmoMask
protected

Definition at line 509 of file XrdSysIOEvents.hh.

Referenced by Create().


The documentation for this class was generated from the following files: