XRootD
XrdCl::XCpCtx Class Reference

#include <XrdClXCpCtx.hh>


Public Member Functions

 XCpCtx (const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
 
bool AllDone ()
 
void Delete ()
 
std::pair< uint64_t, uint64_t > GetBlock ()
 
XRootDStatus GetChunk (XrdCl::PageInfo &ci)
 
bool GetNextUrl (std::string &url)
 
int64_t GetSize ()
 
XRootDStatus Initialize ()
 
void NotifyIdleSrc ()
 
void NotifyInitExpectant ()
 
void PutChunk (PageInfo *chunk)
 
void RemoveSrc (XCpSrc *src)
 
XCpCtx * Self ()
 
void SetFileSize (int64_t size)
 
XCpSrc * WeakestLink (XCpSrc *exclude)
 

Detailed Description

Definition at line 40 of file XrdClXCpCtx.hh.

Constructor & Destructor Documentation

◆ XCpCtx()

XrdCl::XCpCtx::XCpCtx ( const std::vector< std::string > & urls,
uint64_t blockSize,
uint8_t parallelSrc,
uint64_t chunkSize,
uint64_t parallelChunks,
int64_t fileSize )

Constructor

Parameters
urls: list of replica urls
blockSize: the default block size
parallelSrc: maximum number of parallel sources
chunkSize: the default chunk size
parallelChunks: the default number of parallel chunks per source
fileSize: the file size if specified in the metalink file (-1 indicates that the file size is not known and a stat should be done)

Definition at line 36 of file XrdClXCpCtx.cc.

36 :
37 pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38 pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39 pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40 pDoneCV( 0 ), pRefCount( 1 )
41{
42 SetFileSize( fileSize );
43}

References SetFileSize().


Member Function Documentation

◆ AllDone()

bool XrdCl::XCpCtx::AllDone ( )

Returns true immediately if all chunks have been transferred. Otherwise blocks until NotifyIdleSrc is called or a 1-minute timeout expires, then reports the current state.

Returns
: true if all chunks have been transferred, false otherwise.

Definition at line 177 of file XrdClXCpCtx.cc.

178{
179 XrdSysCondVarHelper lck( pDoneCV );
180
181 if( !pDone )
182 pDoneCV.Wait( 60 );
183
184 return pDone;
185}

References XrdSysCondVar::Wait().
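
The wait-once-with-timeout behaviour of AllDone() can be illustrated with standard C++ primitives. The following is a minimal sketch, not XRootD code: the class DoneFlag and its members are hypothetical stand-ins for pDone and pDoneCV, and std::condition_variable replaces XrdSysCondVar.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the pDone / pDoneCV pair; not XRootD code.
class DoneFlag
{
  public:
    // Wait at most `timeout` for a notification, then report the flag.
    // Like AllDone(), this waits once and does not loop, so it may
    // return false after a wake-up that did not set the flag.
    bool WaitDone( std::chrono::milliseconds timeout )
    {
      assert( timeout.count() >= 0 );
      std::unique_lock<std::mutex> lck( mtx );
      if( !done )
        cv.wait_for( lck, timeout );
      return done;
    }

    // Mark the transfer as finished and wake every waiter.
    void MarkDone()
    {
      std::lock_guard<std::mutex> lck( mtx );
      done = true;
      cv.notify_all();
    }

  private:
    std::mutex              mtx;
    std::condition_variable cv;
    bool                    done = false;
};
```

A caller that needs to wait for completion would therefore call WaitDone() in a loop and treat a false result as "not finished yet" rather than as a failure.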


◆ Delete()

void XrdCl::XCpCtx::Delete ( )
inline

Decrements the reference counter and deletes the instance if the counter reaches 0.

Definition at line 61 of file XrdClXCpCtx.hh.

62 {
63 XrdSysMutexHelper lck( pMtx );
64 --pRefCount;
65 if( !pRefCount )
66 {
67 lck.UnLock();
68 delete this;
69 }
70 }

References XrdSysMutexHelper::UnLock().

Referenced by XrdCl::XCpSrc::Start().


◆ GetBlock()

std::pair< uint64_t, uint64_t > XrdCl::XCpCtx::GetBlock ( )

Get next block that has to be transferred

Returns
: pair of offset and block size

Definition at line 92 of file XrdClXCpCtx.cc.

93{
94 XrdSysMutexHelper lck( pMtx );
95
96 uint64_t blkSize = pBlockSize, offset = pOffset;
97 if( pOffset + blkSize > uint64_t( pFileSize ) )
98 blkSize = pFileSize - pOffset;
99 pOffset += blkSize;
100
101 return std::make_pair( offset, blkSize );
102}
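
To make the block arithmetic concrete, the sketch below lists the (offset, size) pairs that successive calls would produce for a known file size. SplitIntoBlocks is a hypothetical helper written for illustration only; it assumes a non-negative file size and a non-zero block size, and it stops at end of file instead of returning further blocks.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper, not part of XrdCl: lists the (offset, size) pairs
// that repeated GetBlock() calls would hand out for a known file size.
std::vector<std::pair<uint64_t, uint64_t>>
SplitIntoBlocks( uint64_t fileSize, uint64_t blockSize )
{
  assert( blockSize > 0 );
  std::vector<std::pair<uint64_t, uint64_t>> blocks;
  uint64_t offset = 0;
  while( offset < fileSize )
  {
    uint64_t size = blockSize;
    if( offset + size > fileSize )   // last block: clamp to end of file
      size = fileSize - offset;
    blocks.emplace_back( offset, size );
    offset += size;
  }
  return blocks;
}
```

For a 10-byte file and a 4-byte block size this yields (0, 4), (4, 4) and (8, 2). Judging from the listing above, a further GetBlock() call at that point would return a zero-length block at the end-of-file offset.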

◆ GetChunk()

XRootDStatus XrdCl::XCpCtx::GetChunk ( XrdCl::PageInfo & ci)

Gets the next chunk from the sink. Blocks if the sink is empty.

Parameters
ci: the chunk retrieved from sink (output parameter)
Returns
: stError if we failed to transfer the file, stOK otherwise, with one of the following codes:
  • suDone : the whole file has been transferred, we are done
  • suContinue : a chunk has been written into ci, continue calling GetChunk in order to retrieve remaining chunks
  • suRetry : a chunk has not been written into ci, try again.

Definition at line 140 of file XrdClXCpCtx.cc.

141{
142 // if we received all the data we are done here
143 if( pDataReceived == uint64_t( pFileSize ) )
144 {
145 XrdSysCondVarHelper lck( pDoneCV );
146 pDone = true;
147 pDoneCV.Broadcast();
148 return XRootDStatus( stOK, suDone );
149 }
150
151 // if we don't have active sources it means we failed
152 if( GetRunning() == 0 )
153 {
154 XrdSysCondVarHelper lck( pDoneCV );
155 pDone = true;
156 pDoneCV.Broadcast();
157 return XRootDStatus( stError, errNoMoreReplicas );
158 }
159
160 PageInfo *chunk = pSink.Get();
161 if( chunk )
162 {
163 pDataReceived += chunk->GetLength();
164 ci = std::move( *chunk );
165 delete chunk;
166 return XRootDStatus( stOK, suContinue );
167 }
168
169 return XRootDStatus( stOK, suRetry );
170}

References XrdSysCondVar::Broadcast(), XrdCl::errNoMoreReplicas, XrdCl::PageInfo::GetLength(), XrdCl::stError, XrdCl::stOK, XrdCl::suContinue, XrdCl::suDone, and XrdCl::suRetry.
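
A caller is expected to keep calling GetChunk until it reports completion or an error. The sketch below shows one possible shape of such a loop against a mock. Everything in it is hypothetical: Code stands in for the suDone, suContinue, suRetry and stError outcomes, FakeCtx mimics the order of checks in the listing above, and the retry limit is an addition that only keeps the example from spinning forever.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the outcomes of GetChunk(); not XRootD types.
enum class Code { Done, Continue, Retry, Error };

struct FakeCtx
{
  std::deque<std::vector<char>> sink;   // chunks already delivered by sources
  uint64_t received = 0;
  uint64_t fileSize = 0;
  int      running  = 1;                // number of active sources

  // Same order of checks as the listing: done, no sources left, then sink.
  Code GetChunk( std::vector<char> &out )
  {
    if( received == fileSize ) return Code::Done;
    if( running == 0 )         return Code::Error;
    if( sink.empty() )         return Code::Retry;
    out = std::move( sink.front() );
    sink.pop_front();
    received += out.size();
    return Code::Continue;
  }
};

// Drains the context, adding consumed bytes to `total`.
// Returns true on Done, false on Error or after too many empty retries.
bool Drain( FakeCtx &ctx, uint64_t &total, int maxRetries = 3 )
{
  assert( maxRetries >= 0 );
  std::vector<char> chunk;
  int retries = 0;
  for( ;; )
  {
    switch( ctx.GetChunk( chunk ) )
    {
      case Code::Done:     return true;
      case Code::Error:    return false;
      case Code::Retry:    if( ++retries > maxRetries ) return false; break;
      case Code::Continue: total += chunk.size(); retries = 0; break;
    }
  }
}
```

In real use the Continue branch would hand the chunk to the destination instead of only counting bytes.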


◆ GetNextUrl()

bool XrdCl::XCpCtx::GetNextUrl ( std::string & url)

Gets the next URL from the list of file replicas

Parameters
url: the output parameter
Returns
: true if a url has been written to the url parameter, false otherwise

Definition at line 57 of file XrdClXCpCtx.cc.

58{
59 XrdSysMutexHelper lck( pMtx );
60 if( pUrls.empty() ) return false;
61 url = pUrls.front();
62 pUrls.pop();
63 return true;
64}

◆ GetSize()

int64_t XrdCl::XCpCtx::GetSize ( )
inline

Get file size. The call blocks until the file size has been set using SetFileSize, or until no sources remain running.

Definition at line 129 of file XrdClXCpCtx.hh.

130 {
131 XrdSysCondVarHelper lck( pFileSizeCV );
132 while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait();
133 return pFileSize;
134 }

References XrdSysCondVar::Wait().


◆ Initialize()

XRootDStatus XrdCl::XCpCtx::Initialize ( )

Starts one thread per source. Each thread tries to open a file, stats the file if necessary, and then starts reading it. All chunks read go to the sink.

Returns
: an error if we were not able to create any threads

Definition at line 121 of file XrdClXCpCtx.cc.

122{
123 for( uint8_t i = 0; i < pParallelSrc; ++i )
124 {
125 XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
126 pSources.push_back( src );
127 src->Start();
128 }
129
130 if( pSources.empty() )
131 {
132 Log *log = DefaultEnv::GetLog();
133 log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
134 return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." );
135 }
136
137 return XRootDStatus();
138}

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::XCpSrc::Start(), XrdCl::stError, and XrdCl::UtilityMsg.


◆ NotifyIdleSrc()

void XrdCl::XCpCtx::NotifyIdleSrc ( )

Notify idle sources. Used in two cases:

  • if one of the sources failed and an idle source needs to take over
  • or if we are done and all idle sources should be stopped

Definition at line 172 of file XrdClXCpCtx.cc.

173{
174 pDoneCV.Broadcast();
175}

References XrdSysCondVar::Broadcast().


◆ NotifyInitExpectant()

void XrdCl::XCpCtx::NotifyInitExpectant ( )
inline

Notify those who are waiting for initialization. In particular the GetSize() caller will be waiting on the result of initialization.

Definition at line 197 of file XrdClXCpCtx.hh.

198 {
199 pFileSizeCV.Broadcast();
200 }

References XrdSysCondVar::Broadcast().


◆ PutChunk()

void XrdCl::XCpCtx::PutChunk ( PageInfo * chunk)

Put a chunk into the sink

Parameters
chunk: the chunk

Definition at line 87 of file XrdClXCpCtx.cc.

88{
89 pSink.Put( chunk );
90}

◆ RemoveSrc()

void XrdCl::XCpCtx::RemoveSrc ( XCpSrc * src)
inline

Remove given source

Parameters
src: the source to be removed

Definition at line 167 of file XrdClXCpCtx.hh.

168 {
169 XrdSysMutexHelper lck( pMtx );
170 pSources.remove( src );
171 }

Referenced by XrdCl::XCpSrc::Start().


◆ Self()

XCpCtx * XrdCl::XCpCtx::Self ( )
inline

Increments the reference counter.

Returns
: myself.

Definition at line 77 of file XrdClXCpCtx.hh.

78 {
79 XrdSysMutexHelper lck( pMtx );
80 ++pRefCount;
81 return this;
82 }
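
Self() and Delete() together form an intrusive reference count: each holder obtains its pointer through Self() and gives it up through Delete(). The sketch below illustrates the pattern with standard C++ only. RefCounted and its destroyed-flag parameter are hypothetical and exist purely so that the destruction can be observed.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical class illustrating the Self()/Delete() pattern; not XRootD code.
class RefCounted
{
  public:
    // `destroyed` is set to true by the destructor (for illustration only).
    explicit RefCounted( bool *destroyed ) : destroyedFlag( destroyed ) { }

    // Take one more reference and return the object.
    RefCounted* Self()
    {
      std::lock_guard<std::mutex> lck( mtx );
      ++refCount;
      return this;
    }

    // Drop one reference; the last holder destroys the object.
    void Delete()
    {
      std::unique_lock<std::mutex> lck( mtx );
      assert( refCount > 0 );
      --refCount;
      if( refCount == 0 )
      {
        lck.unlock();   // release the mutex before it is destroyed
        delete this;
      }
    }

  private:
    // Private destructor: the object can only go away through Delete().
    ~RefCounted() { *destroyedFlag = true; }

    std::mutex  mtx;
    int         refCount = 1;   // the creator holds the first reference
    bool       *destroyedFlag;
};
```

Unlocking before `delete this` mirrors the lck.UnLock() call in Delete(): the lock guard must not try to release a mutex that has already been destroyed along with the object.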

◆ SetFileSize()

void XrdCl::XCpCtx::SetFileSize ( int64_t size)

Set the file size (GetSize will block until SetFileSize is called). Also calculates the block size.

Parameters
size: file size

Definition at line 104 of file XrdClXCpCtx.cc.

105{
106 XrdSysMutexHelper lck( pMtx );
107 if( pFileSize < 0 && size >= 0 )
108 {
109 XrdSysCondVarHelper lck( pFileSizeCV );
110 pFileSize = size;
111 pFileSizeCV.Broadcast();
112
113 if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
114 pBlockSize = pFileSize / pParallelSrc;
115
116 if( pBlockSize < pChunkSize )
117 pBlockSize = pChunkSize;
118 }
119}

References XrdSysCondVar::Broadcast().

Referenced by XCpCtx().
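
The block size adjustment performed once the file size is known can be reproduced in isolation. The following sketch is illustrative only: AdjustBlockSize is a hypothetical free function that applies the same two clamps as the listing above, and it assumes a non-zero number of parallel sources.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper, not part of XrdCl: applies the two clamps from
// SetFileSize() to a block size and returns the result.
uint64_t AdjustBlockSize( uint64_t blockSize, uint64_t chunkSize,
                          uint64_t fileSize, uint8_t parallelSrc )
{
  assert( parallelSrc > 0 );                 // assumption: avoids division by zero
  uint64_t perSource = fileSize / parallelSrc;
  if( blockSize > perSource )                // no more than an equal share per source
    blockSize = perSource;
  if( blockSize < chunkSize )                // but never smaller than one chunk
    blockSize = chunkSize;
  return blockSize;
}
```

With a 100 MiB file, 4 sources, a 64 MiB block size and an 8 MiB chunk size, the block size shrinks to 25 MiB. With a 16 MiB file the per-source share is 4 MiB, so the second clamp raises the block size back to the 8 MiB chunk size.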


◆ WeakestLink()

XCpSrc * XrdCl::XCpCtx::WeakestLink ( XCpSrc * exclude)

Get the 'weakest' source, that is, the source with the lowest transfer rate among those that still have data.

Parameters
exclude: the source that is excluded from the search
Returns
: the weakest source

Definition at line 66 of file XrdClXCpCtx.cc.

67{
68 uint64_t transferRate = -1; // set transferRate to max uint64 value
69 XCpSrc *ret = 0;
70
71 std::list<XCpSrc*>::iterator itr;
72 for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
73 {
74 XCpSrc *src = *itr;
75 if( src == exclude ) continue;
76 uint64_t tmp = src->TransferRate();
77 if( src->HasData() && tmp < transferRate )
78 {
79 ret = src;
80 transferRate = tmp;
81 }
82 }
83
84 return ret;
85}

References XrdCl::XCpSrc::HasData(), and XrdCl::XCpSrc::TransferRate().
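
The selection rule can be tried out on plain data. In the sketch below, FakeSrc and Weakest are hypothetical stand-ins for XCpSrc and WeakestLink: the transfer rate and the has-data flag are stored as fields instead of being queried from a live source.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <list>

// Hypothetical stand-in for XCpSrc: only the two properties the search uses.
struct FakeSrc
{
  uint64_t rate;      // transfer rate
  bool     hasData;   // whether the source still has data
};

// Returns the source with the lowest rate that still has data, skipping
// `exclude`; returns nullptr if no source qualifies.
const FakeSrc* Weakest( const std::list<FakeSrc> &sources, const FakeSrc *exclude )
{
  uint64_t best = std::numeric_limits<uint64_t>::max();
  const FakeSrc *ret = nullptr;
  for( const FakeSrc &src : sources )
  {
    if( &src == exclude ) continue;
    if( src.hasData && src.rate < best )
    {
      ret  = &src;
      best = src.rate;
    }
  }
  return ret;
}
```

Because the comparison is strict, a source whose rate equals the maximum uint64_t value is never selected, and ties go to the source that appears first in the list.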


The documentation for this class was generated from the following files: