libzypp 17.35.12
provide.cc
Go to the documentation of this file.
1#include "private/provide_p.h"
5#include <zypp-core/zyppng/io/IODevice>
6#include <zypp-core/Url.h>
7#include <zypp-core/base/DtorReset>
9#include <zypp-media/MediaException>
10#include <zypp-media/FileCheckException>
11#include <zypp-media/CDTools>
12
13// required to generate uuids
14#include <glib.h>
15
16
17L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_PROVIDER_DEBUG)
18
19namespace zyppng {
20
22 : BasePrivate(pub)
23 , _workDir( std::move(workDir) )
24 , _workerPath( constants::DEFAULT_PROVIDE_WORKER_PATH.data() )
25 {
26 if ( _workDir.empty() ) {
27 _workDir = zypp::Pathname(".").realpath();
28 } else {
29 _workDir = _workDir.realpath();
30 }
31
32 MIL << "Provider workdir is: " << _workDir << std::endl;
33
34 _scheduleTrigger->setSingleShot(true);
36 }
37
39 {
40 if ( provideDebugEnabled () ) {
41 std::string_view reasonStr;
42 switch( reason ) {
43 case ProvideStart:
44 reasonStr = "ProvideStart";
45 break;
46 case QueueIdle:
47 reasonStr = "QueueIdle";
48 break;
49 case EnqueueItem:
50 reasonStr = "EnqueueItem";
51 break;
52 case EnqueueReq:
53 reasonStr = "EnqueueReq";
54 break;
55 case FinishReq:
56 reasonStr = "FinishReq";
57 break;
58 case RestartAttach:
59 reasonStr = "RestartAttach";
60 break;
61 }
62 DBG << "Triggering the schedule timer (" << reasonStr << ")" << std::endl;
63 }
64
65 // we use a single shot timer that instantly times out when the event loop is entered the next time
66 // this way we compress many schedule requests that happen during a eventloop run into one
67 _scheduleTrigger->start(0);
68 }
69
71 {
72 if ( !_isRunning ) {
73 MIL << "Provider is not started, NOT scheduling" << std::endl;
74 return;
75 }
76
77 if ( _isScheduling ) {
78 DBG_PRV << "Scheduling triggered during scheduling, returning immediately." << std::endl;
79 return;
80 }
81
82 const int cpuLimit =
83#ifdef _SC_NPROCESSORS_ONLN
84 sysconf(_SC_NPROCESSORS_ONLN) * 2;
85#else
86 DEFAULT_CPU_WORKERS;
87#endif
88
89 // helper lambda to find the worker that is idle for the longest time
90 constexpr auto findLaziestWorker = []( const auto &workerQueues, const auto &idleNames ) {
91 auto candidate = workerQueues.end();
92 ProvideQueue::TimePoint candidateIdleSince = ProvideQueue::TimePoint::max();
93
94 //find the worker thats idle the longest
95 for ( const auto &name : idleNames ) {
96 auto thisElem = workerQueues.find(name);
97 if ( thisElem == workerQueues.end() )
98 continue;
99
100 const auto idleS = thisElem->second->idleSince();
101 if ( idleS
102 && ( candidate == workerQueues.end() || *idleS < candidateIdleSince ) ) {
103 candidateIdleSince = *idleS;
104 candidate = thisElem;
105 }
106 }
107
108 if ( candidate != workerQueues.end() )
109 MIL_PRV << "Found idle worker:" << candidate->first << " idle since: " << candidateIdleSince.time_since_epoch().count() << std::endl;
110
111 return candidate;
112 };
113
114 // clean up old media
115
116 for ( auto iMedia = _attachedMediaInfos.begin(); iMedia != _attachedMediaInfos.end(); ) {
117 if ( (*iMedia)->refCount() > 1 ) {
118 MIL_PRV << "Not releasing media " << (*iMedia)->_name << " refcount is not zero" << std::endl;
119 ++iMedia;
120 continue;
121 }
122 if ( (*iMedia)->_workerType == ProvideQueue::Config::Downloading ) {
123 // we keep the information around for an hour so we do not constantly download the media files for no reason
124 if ( (*iMedia)->_idleSince && std::chrono::steady_clock::now() - (*iMedia)->_idleSince.value() >= std::chrono::hours(1) ) {
125 MIL << "Detaching medium " << (*iMedia)->_name << " for baseUrl " << (*iMedia)->_attachedUrl << std::endl;
126 iMedia = _attachedMediaInfos.erase(iMedia);
127 continue;
128 } else {
129 MIL_PRV << "Not releasing media " << (*iMedia)->_name << " downloading worker and not timed out yet." << std::endl;
130 }
131 } else {
132 // mounting handlers, we need to send a request to the workers
133 auto bQueue = (*iMedia)->_backingQueue.lock();
134 if ( bQueue ) {
135 zypp::Url url = (*iMedia)->_attachedUrl;
136 url.setScheme( url.getScheme() + std::string( constants::ATTACHED_MEDIA_SUFFIX) );
137 url.setAuthority( (*iMedia)->_name );
138 const auto &req = ProvideRequest::createDetach( url );
139 if ( req ) {
140 MIL << "Detaching medium " << (*iMedia)->_name << " for baseUrl " << (*iMedia)->_attachedUrl << std::endl;
141 bQueue->enqueue ( *req );
142 iMedia = _attachedMediaInfos.erase(iMedia);
143 continue;
144 } else {
145 ERR << "Could not send detach request, creating the request failed" << std::endl;
146 }
147 } else {
148 ERR << "Could not send detach request since no backing queue was defined" << std::endl;
149 }
150 }
151 ++iMedia;
152 }
153
154 zypp::DtorReset schedFlag( _isScheduling, false );
155 _isScheduling = true;
156
157 const auto schedStart = std::chrono::steady_clock::now();
158 MIL_PRV << "Start scheduling" << std::endl;
159
160 zypp::OnScopeExit deferExitMessage( [&](){
161 const auto dur = std::chrono::steady_clock::now() - schedStart;
162 MIL_PRV << "Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>( dur ).count () << std::endl;
163 });
164
165 // bump inactive items
166 for ( auto it = _items.begin (); it != _items.end(); ) {
167 // was maybe released during scheduling
168 if ( !(*it) )
169 it = _items.erase(it);
170 else {
171 auto &item = *it;
172 if ( item->state() == ProvideItem::Uninitialized ) {
173 item->initialize();
174 }
175 it++;
176 }
177 }
178
179 // we are scheduling now, everything that triggered the timer until now we can forget about
180 _scheduleTrigger->stop();
181
182 for( auto queueIter = _queues.begin(); queueIter != _queues.end(); queueIter ++ ) {
183
184 const auto &scheme = queueIter->_schemeName;
185 auto &queue = queueIter->_requests;
186
187 if ( !queue.size() )
188 continue;
189
190 const auto &configOpt = schemeConfig ( scheme );
191
192 MIL_PRV << "Start scheduling for scheme:" << scheme << " queue size is: " << queue.size() << std::endl;
193
194 if ( !configOpt ) {
195 // FAIL all requests in this queue
196 ERR << "Scheme: " << scheme << " failed to return a valid configuration." << std::endl;
197
198 while( queue.size() ) {
199 auto item = std::move( queue.front() );
200 queue.pop_front();
201 if ( item->owner() )
202 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to query scheme config.")) );
203 }
204
205 continue;
206 }
207
208 // the scheme config that defines how we schedule requests on this set of queues
209 const auto &config = configOpt.get();
210 const auto isSingleInstance = ( (config.cfg_flags() & ProvideQueue::Config::SingleInstance) == ProvideQueue::Config::SingleInstance );
211 if ( config.worker_type() == ProvideQueue::Config::Downloading && !isSingleInstance ) {
212
213 for( auto i = queue.begin (); i != queue.end(); ) {
214
215 // this is the only place where we remove elements from the queue when the scheduling flag is active
216 // other code just nulls out requests in the queue if during scheduling items need to be removed
217 while ( i != queue.end() && !(*i) ) {
218 i = queue.erase(i);
219 }
220
221 if ( i == queue.end() )
222 break;
223
224 ProvideRequestRef item = *i;
225
226 // Downloading queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
227 // If we hit this code path, its a bug
228 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
229 i = queue.erase(i);
230 if ( item->owner() )
231 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Downloading Queues do not support ProvideMessage::Code::Attach requests") ) );
232 continue;
233 }
234
235 MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
236
237 // how many workers for this type do already exist
238 int existingTypeWorkers = 0;
239
240 // how many currently active connections are there
241 int existingConnections = 0;
242
243 // all currently available possible queues for the request
244 std::vector< std::pair<zypp::Url, ProvideQueue*> > possibleHostWorkers;
245
246 // currently idle workers
247 std::vector<std::string> idleWorkers;
248
249 // all mirrors without a existing worker
250 std::vector<zypp::Url> mirrsWithoutWorker;
251 for ( const auto &url : item->urls() ) {
252
253 if ( effectiveScheme( url.getScheme() ) != scheme ) {
254 MIL << "Mirror URL " << url << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
255 continue;
256 }
257
258 if( item->owner()->canRedirectTo( item, url ) )
259 mirrsWithoutWorker.push_back( url );
260 else {
261 MIL_PRV << "URL was rejected" << url << std::endl;
262 }
263 }
264
265 // at this point the list contains all useable mirrors, if this list is empty the request needs to fail
266 if( mirrsWithoutWorker.size() == 0 ) {
267 MIL << "Request has NO usable URLs" << std::endl;
268 if ( item->owner() )
269 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
270 i = queue.erase(i);
271 continue;
272 }
273
274
275 for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
276 if ( ProvideQueue::Config::Downloading != workerQueue->workerConfig().worker_type() )
277 continue;
278
279 existingTypeWorkers ++;
280 existingConnections += workerQueue->activeRequests();
281
282 if ( workerQueue->isIdle() )
283 idleWorkers.push_back (queueName);
284
285 if ( !zypp::str::startsWith( queueName, scheme ) )
286 continue;
287
288 for ( auto i = mirrsWithoutWorker.begin (); i != mirrsWithoutWorker.end(); ) {
289 const auto &u = *i;
290 if ( u.getHost() == workerQueue->hostname() ) {
291 if ( workerQueue->requestCount() < constants::DEFAULT_ACTIVE_CONN_PER_HOST )
292 possibleHostWorkers.push_back( {u, workerQueue.get()} );
293 i = mirrsWithoutWorker.erase( i );
294 // we can not stop after removing the first hit, since there could be multiple mirrors with the same hostname
295 } else {
296 ++i;
297 }
298 }
299 }
300
301 if( provideDebugEnabled() ) {
302 MIL << "Current stats: " << std::endl;
303 MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
304 MIL << "Existing active connections: " << existingConnections << std::endl;
305 MIL << "Possible host workers: "<< possibleHostWorkers.size() << std::endl;
306 MIL << "Mirrors without worker: " << mirrsWithoutWorker.size() << std::endl;
307 }
308
309 // need to wait for requests to finish in order to schedule more requests
310 if ( existingConnections >= constants::DEFAULT_ACTIVE_CONN ) {
311 MIL_PRV << "Reached maximum nr of connections, break" << std::endl;
312 break;
313 }
314
315 // if no workers are running, take the first mirror and start a worker for it
316 // if < nr of workers are running, use a mirror we do not have a conn yet to
317 if ( existingTypeWorkers < constants::DEFAULT_MAX_DYNAMIC_WORKERS
318 && mirrsWithoutWorker.size() ) {
319
320 MIL_PRV << "Free worker slots and available mirror URLs, starting a new worker" << std::endl;
321
322 //@TODO out of the available mirrors use the best one based on statistics ( if available )
323 bool found = false;
324 for( const auto &url : mirrsWithoutWorker ) {
325
326 // mark this URL as used now, in case the queue can not be started we won't try it anymore
327 if ( !item->owner()->safeRedirectTo ( item, url ) )
328 continue;
329
330 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
331 if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
332 break;
333 } else {
334
335 MIL_PRV << "Started worker for " << url.getHost() << " enqueing request" << std::endl;
336
337 item->setActiveUrl(url);
338 found = true;
339
340 std::string str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
341 _workerQueues[str] = q;
342 q->enqueue( item );
343 break;
344 }
345 }
346
347 if( found ) {
348 i = queue.erase(i);
349 continue;
350 }
351 }
352
353 // if we cannot start a new worker, find the best queue where we can push the item into
354 if ( possibleHostWorkers.size() ) {
355
356 MIL_PRV << "No free worker slots, looking for the best existing worker" << std::endl;
357 bool found = false;
358 while( possibleHostWorkers.size () ) {
359 std::vector< std::pair<zypp::Url, ProvideQueue *> >::iterator candidate = possibleHostWorkers.begin();
360 for ( auto i = candidate+1; i != possibleHostWorkers.end(); i++ ) {
361 if ( i->second->activeRequests () < candidate->second->activeRequests () )
362 candidate = i;
363 }
364
365 if ( !item->owner()->safeRedirectTo( item, candidate->first ) ) {
366 possibleHostWorkers.erase( candidate );
367 continue;
368 }
369
370 MIL_PRV << "Using existing worker " << candidate->first.getHost() << " to download request" << std::endl;
371
372 found = true;
373 item->setActiveUrl( candidate->first );
374 candidate->second->enqueue( item );
375 break;
376 }
377
378 if( found ) {
379 i = queue.erase(i);
380 continue;
381 }
382 }
383
384 // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
385 // a new worker
386 if ( idleWorkers.size() && mirrsWithoutWorker.size() ) {
387
388 MIL_PRV << "No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
389
390 auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
391 if ( candidate != _workerQueues.end() ) {
392
393 // for now we decomission the worker and start a new one, should we instead introduce a "reset" message
394 // that repurposes the worker to another hostname/workdir config?
395 _workerQueues.erase(candidate);
396
397 //@TODO out of the available mirrors use the best one based on statistics ( if available )
398 bool found = false;
399 for( const auto &url : mirrsWithoutWorker ) {
400
401 if ( !item->owner()->safeRedirectTo ( item, url ) )
402 continue;
403
404 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
405 if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
406 break;
407 } else {
408
409 MIL_PRV << "Replaced worker for " << url.getHost() << ", enqueing request" << std::endl;
410
411 item->setActiveUrl(url);
412 found = true;
413
414 auto str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
415 _workerQueues[str] = q;
416 q->enqueue( item );
417 }
418 }
419
420 if( found ) {
421 i = queue.erase(i);
422 continue;
423 }
424 }
425 }
426
427 // if we reach here we skip over the item and try to schedule it again later
428 MIL_PRV << "End of line, deferring request for next try." << std::endl;
429 i++;
430
431 }
432 } else if ( config.worker_type() == ProvideQueue::Config::CPUBound && !isSingleInstance ) {
433
434 for( auto i = queue.begin (); i != queue.end(); ) {
435
436 // this is the only place where we remove elements from the queue when the scheduling flag is active
437 // other code just nulls out requests in the queue if during scheduling items need to be removed
438 while ( i != queue.end() && !(*i) ) {
439 i = queue.erase(i);
440 }
441
442 if ( i == queue.end() )
443 break;
444
445 // make a real reference so it does not dissapear when we remove it from the queue
446 ProvideRequestRef item = *i;
447
448 // CPU bound queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
449 // If we hit this code path, its a bug
450 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
451 i = queue.erase(i);
452 if ( item->owner () )
453 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("CPU bound Queues do not support ProvideAttachSpecRef requests") ) );
454 continue;
455 }
456
457 MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
458
459 // how many workers for this type do already exist
460 int existingTypeWorkers = 0;
461 int existingSchemeWorkers = 0;
462
463 // all currently available possible queues for the request
464 std::vector< ProvideQueue* > possibleWorkers;
465
466 // currently idle workers
467 std::vector<std::string> idleWorkers;
468
469 // the URL we are going to use this time
470 zypp::Url url;
471
472 //CPU bound queues do not spawn per mirrors, we use the first compatible URL
473 for ( const auto &tmpurl : item->urls() ) {
474 if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
475 MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
476 continue;
477 }
478 url = tmpurl;
479 break;
480 }
481
482 // at this point if the URL is empty the request needs to fail
483 if( !url.isValid() ) {
484 MIL << "Request has NO usable URLs" << std::endl;
485 if ( item->owner() )
486 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
487 i = queue.erase(i);
488 continue;
489 }
490
491 for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
492
493 if ( ProvideQueue::Config::CPUBound != workerQueue->workerConfig().worker_type() )
494 continue;
495
496 const bool thisScheme = zypp::str::startsWith( queueName, scheme );
497
498 existingTypeWorkers ++;
499 if ( thisScheme ) {
500 existingSchemeWorkers++;
501 if ( workerQueue->canScheduleMore() )
502 possibleWorkers.push_back(workerQueue.get());
503 }
504
505 if ( workerQueue->isIdle() )
506 idleWorkers.push_back(queueName);
507 }
508
509 if( provideDebugEnabled() ) {
510 MIL << "Current stats: " << std::endl;
511 MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
512 MIL << "Possible CPU workers: "<< possibleWorkers.size() << std::endl;
513 }
514
515 // first we use existing idle workers of the current type
516 if ( possibleWorkers.size() ) {
517 bool found = false;
518 for ( auto &w : possibleWorkers ) {
519 if ( w->isIdle() ) {
520 MIL_PRV << "Using existing idle worker to provide request" << std::endl;
521 // this is not really required because we are not doing redirect checks
522 item->owner()->redirectTo ( item, url );
523 item->setActiveUrl( url );
524 w->enqueue( item );
525 i = queue.erase(i);
526 found = true;
527 break;
528 }
529 }
530 if ( found )
531 continue;
532 }
533
534 // we first start as many workers as we need before queueing more request to existing ones
535 if ( existingTypeWorkers < cpuLimit ) {
536
537 MIL_PRV << "Free CPU slots, starting a new worker" << std::endl;
538
539 // this is not really required because we are not doing redirect checks
540 item->owner()->redirectTo ( item, url );
541
542 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
543 if ( q->startup( scheme, _workDir / scheme ) ) {
544
545 item->setActiveUrl(url);
546
547 auto str = zypp::str::Format("%1%#%2%") % scheme % existingSchemeWorkers;
548 _workerQueues[str] = q;
549 q->enqueue( item );
550 i = queue.erase(i);
551 continue;
552 } else {
553 // CPU bound requests can not recover from this error
554 i = queue.erase(i);
555 if ( item->owner() )
556 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
557 continue;
558 }
559 }
560
561 // we can not start more workers, all we can do now is fill up queues of existing ones
562 if ( possibleWorkers.size() ) {
563 MIL_PRV << "No free CPU slots, looking for the best existing worker" << std::endl;
564
565 if( possibleWorkers.size () ) {
566 std::vector<ProvideQueue *>::iterator candidate = possibleWorkers.begin();
567 for ( auto i = candidate+1; i != possibleWorkers.end(); i++ ) {
568 if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
569 candidate = i;
570 }
571
572 // this is not really required because we are not doing redirect checks
573 item->owner()->redirectTo ( item, url );
574
575 MIL_PRV << "Using existing worker to provide request" << std::endl;
576 item->setActiveUrl( url );
577 (*candidate)->enqueue( item );
578 i = queue.erase(i);
579 continue;
580 }
581 }
582
583 // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
584 // a new worker
585 if ( idleWorkers.size() ) {
586
587 MIL_PRV << "No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
588
589 auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
590 if ( candidate != _workerQueues.end() ) {
591
592 _workerQueues.erase(candidate);
593
594 // this is not really required because we are not doing redirect checks
595 item->owner()->redirectTo ( item, url );
596
597 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
598 if ( q->startup( scheme, _workDir / scheme ) ) {
599
600 MIL_PRV << "Replaced worker, enqueing request" << std::endl;
601
602 item->setActiveUrl(url);
603
604 auto str = zypp::str::Format("%1%#%2%") % scheme % ( existingSchemeWorkers + 1 );
605 _workerQueues[str] = q;
606 q->enqueue( item );
607 i = queue.erase(i);
608 continue;
609 } else {
610 // CPU bound requests can not recover from this error
611 i = queue.erase(i);
612 if ( item->owner() )
613 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
614 continue;
615 }
616 }
617 } else {
618 MIL_PRV << "No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
619 break;
620 }
621
622 // if we reach here we skip over the item and try to schedule it again later
623 MIL_PRV << "End of line, deferring request for next try." << std::endl;
624 i++;
625 }
626
627 } else {
628 // either SingleInstance worker or Mounting/VolatileMounting
629
630 for( auto i = queue.begin (); i != queue.end(); ) {
631
632 // this is the only place where we remove elements from the queue when the scheduling flag is active
633 // other code just nulls out requests in the queue if during scheduling items need to be removed
634 while ( i != queue.end() && !(*i) ) {
635 i = queue.erase(i);
636 }
637
638 if ( i == queue.end() )
639 break;
640
641 // make a real reference so it does not dissapear when we remove it from the queue
642 ProvideRequestRef item = *i;
643 MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
644
645 zypp::Url url;
646
647 //mounting queues do not spawn per mirrors, we use the first compatible URL
648 for ( const auto &tmpurl : item->urls() ) {
649 if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
650 MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
651 continue;
652 }
653 url = tmpurl;
654 break;
655 }
656
657 // at this point if the URL is empty the request needs to fail
658 if( !url.isValid() ) {
659 MIL << "Request has NO usable URLs" << std::endl;
660 if ( item->owner() )
661 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
662 i = queue.erase(i);
663 continue;
664 }
665
666
667 ProvideQueue *qToUse = nullptr;
668 if ( !_workerQueues.count(scheme) ) {
669 ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
670 if ( !q->startup( scheme, _workDir / scheme ) ) {
671 ERR << "Worker startup failed!" << std::endl;
672 // mounting/single instance requests can not recover from this error
673 i = queue.erase(i);
674
675 if ( item->owner() )
676 item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
677 continue;
678 }
679
680 MIL_PRV << "Started worker, enqueing request" << std::endl;
681 qToUse = q.get();
682 _workerQueues[scheme] = q;
683 } else {
684 MIL_PRV << "Found worker, enqueing request" << std::endl;
685 qToUse = _workerQueues.at(scheme).get();
686 }
687
688 // this is not really required because we are not doing redirect checks
689 item->owner()->redirectTo ( item, url );
690
691 item->setActiveUrl(url);
692 qToUse->enqueue( item );
693 i = queue.erase(i);
694 }
695 }
696 }
697 }
698
699 std::list<ProvideItemRef> &ProvidePrivate::items()
700 {
701 return _items;
702 }
703
708
709 std::vector<zypp::Url> ProvidePrivate::sanitizeUrls(const std::vector<zypp::Url> &urls)
710 {
711 std::vector<zypp::Url> usableMirrs;
712 std::optional<ProvideQueue::Config> scheme;
713
714 for ( auto mirrIt = urls.begin() ; mirrIt != urls.end(); mirrIt++ ) {
715 const auto &s = schemeConfig( effectiveScheme( mirrIt->getScheme() ) );
716 if ( !s ) {
717 WAR << "URL: " << *mirrIt << " is not supported, ignoring!" << std::endl;
718 continue;
719 }
720 if ( !scheme ) {
721 scheme = *s;
722 usableMirrs.push_back ( *mirrIt );
723 } else {
724 if ( scheme->worker_type () == s->worker_type () ) {
725 usableMirrs.push_back( *mirrIt );
726 } else {
727 WAR << "URL: " << *mirrIt << " has different worker type than the primary URL: "<< usableMirrs.front() <<", ignoring!" << std::endl;
728 }
729 }
730 }
731
732 if ( !scheme || usableMirrs.empty() ) {
733 return {};
734 }
735
736 return usableMirrs;
737 }
738
739 std::vector<AttachedMediaInfo_Ptr> &ProvidePrivate::attachedMediaInfos()
740 {
741 return _attachedMediaInfos;
742 }
743
745 {
746 if ( auto i = _schemeConfigs.find( scheme ); i != _schemeConfigs.end() ) {
748 } else {
749 // we do not have the queue config yet, we need to start a worker to get one
750 ProvideQueue q( *this );
751 if ( !q.startup( scheme, _workDir / scheme ) ) {
752 return expected<ProvideQueue::Config>::error(ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to start worker to read scheme config.")));
753 }
754 auto newItem = _schemeConfigs.insert( std::make_pair( scheme, q.workerConfig() ));
755 return expected<ProvideQueue::Config>::success(newItem.first->second);
756 }
757 }
758
759 std::optional<zypp::ManagedFile> ProvidePrivate::addToFileCache( const zypp::filesystem::Pathname &downloadedFile )
760 {
761 const auto &key = downloadedFile.asString();
762
763 if ( !zypp::PathInfo(downloadedFile).isExist() ) {
764 _fileCache.erase ( key );
765 return {};
766 }
767
768 auto i = _fileCache.insert( { key, FileCacheItem() } );
769 if ( !i.second ) {
770 // file did already exist in the cache, return the shared data
771 i.first->second._deathTimer.reset();
772 return i.first->second._file;
773 }
774
775 i.first->second._file = zypp::ManagedFile( downloadedFile, zypp::filesystem::unlink );
776 return i.first->second._file;
777 }
778
779 bool ProvidePrivate::isInCache ( const zypp::Pathname &downloadedFile ) const
780 {
781 const auto &key = downloadedFile.asString();
782 return (_fileCache.count(key) > 0);
783 }
784
785 void ProvidePrivate::queueItem ( ProvideItemRef item )
786 {
787 _items.push_back( item );
789 }
790
792 {
793 auto elem = std::find_if( _items.begin(), _items.end(), [item]( const auto &i){ return i.get() == item; } );
794 if ( elem != _items.end() ) {
795 if ( _isScheduling ) {
796 (*elem).reset();
797 } else {
798 _items.erase(elem);
799 }
800 }
801 }
802
803 std::string ProvidePrivate::nextMediaId() const
804 {
805 zypp::AutoDispose rawStr( g_uuid_string_random (), g_free );
806 return zypp::str::asString ( rawStr.value() );
807 }
808
809 AttachedMediaInfo_Ptr ProvidePrivate::addMedium( AttachedMediaInfo_Ptr &&medium )
810 {
811 assert( medium );
812 if ( !medium )
813 return nullptr;
814
815 MIL_PRV << "Registered new media attachment with ID: " << medium->name() << " with mountPoint: (" << medium->_localMountPoint.value_or(zypp::Pathname()) << ")" << std::endl;
816 _attachedMediaInfos.push_back( std::move(medium) );
817
818 return _attachedMediaInfos.back();
819 }
820
821 bool ProvidePrivate::queueRequest ( ProvideRequestRef req )
822 {
823 const auto &schemeName = effectiveScheme( req->url().getScheme() );
824 auto existingQ = std::find_if( _queues.begin (), _queues.end(), [&schemeName]( const auto &qItem) {
825 return (qItem._schemeName == schemeName);
826 });
827 if ( existingQ != _queues.end() ) {
828 existingQ->_requests.push_back(req);
829 } else {
830 _queues.push_back( ProvidePrivate::QueueItem{ schemeName, {req} } );
831 }
832
834 return true;
835 }
836
837 bool ProvidePrivate::dequeueRequest(ProvideRequestRef req , std::exception_ptr error)
838 {
839 auto queue = req->currentQueue ();
840 if ( queue ) {
841 queue->cancel( req.get(), error );
842 return true;
843 } else {
844 // Request not started yet, search request queues
845 for ( auto &q : _queues ) {
846 auto elem = std::find( q._requests.begin(), q._requests.end(), req );
847 if ( elem != q._requests.end() ) {
848 q._requests.erase(elem);
849
850 if ( req->owner() )
851 req->owner()->finishReq( nullptr, req, error );
852 return true;
853 }
854 }
855 }
856 return false;
857 }
858
860 {
861 return _workerPath;
862 }
863
864 const std::string ProvidePrivate::queueName( ProvideQueue &q ) const
865 {
866 for ( const auto &v : _workerQueues ) {
867 if ( v.second.get() == &q )
868 return v.first;
869 }
870 return {};
871 }
872
874 {
875 return _isRunning;
876 }
877
878 std::string ProvidePrivate::effectiveScheme(const std::string &scheme) const
879 {
880 const std::string &ss = zypp::str::stripSuffix( scheme, constants::ATTACHED_MEDIA_SUFFIX );
881 if ( auto it = _workerAlias.find ( ss ); it != _workerAlias.end () ) {
882 return it->second;
883 }
884 return ss;
885 }
886
888 {
889 DBG_PRV << "Pulse timeout" << std::endl;
890
891 auto now = std::chrono::steady_clock::now();
892
893 if ( _log ) _log->pulse();
894
895 // release old cache files
896 for ( auto i = _fileCache.begin (); i != _fileCache.end(); ) {
897 auto &cacheItem = i->second;
898 if ( cacheItem._file.unique() ) {
899 if ( cacheItem._deathTimer ) {
900 if ( now - *cacheItem._deathTimer < std::chrono::seconds(20) ) {
901 MIL << "Releasing file " << *i->second._file << " from cache, death timeout." << std::endl;
902 i = _fileCache.erase(i);
903 continue;
904 }
905 } else {
906 // start the death timeout
907 cacheItem._deathTimer = std::chrono::steady_clock::now();
908 }
909 }
910
911 ++i;
912 }
913 }
914
916 {
917 if ( !_items.empty() )
918 return;
919 for ( auto &[k,q] : _workerQueues ) {
920 if ( !q->empty() )
921 return;
922 }
923
924 // all queues are empty
925 _sigIdle.emit();
926 }
927
929 {
930 if ( item.state() == ProvideItem::Finished ) {
931 auto itemRef = item.shared_this<ProvideItem>();
932 auto i = std::find( _items.begin(), _items.end(), itemRef );
933 if ( i == _items.end() ) {
934 ERR << "State of unknown Item changed, ignoring" << std::endl;
935 return;
936 }
937 if ( _isScheduling )
938 i->reset();
939 else
940 _items.erase(i);
941 }
942 if ( _items.empty() )
943 onQueueIdle();
944 }
945
947 {
948 //@TODO is it required to handle overflow?
949 return ++_nextRequestId;
950 }
951
952 ProvideMediaHandle::ProvideMediaHandle(Provide &parent, AttachedMediaInfo_Ptr mediaInfoRef )
953 : _parent( parent.weak_this<Provide>() )
954 , _mediaRef( std::move(mediaInfoRef) )
955 {}
956
957 std::shared_ptr<Provide> ProvideMediaHandle::parent() const
958 {
959 return _parent.lock();
960 }
961
963 {
964 return ( _mediaRef.get() != nullptr );
965 }
966
967 std::string ProvideMediaHandle::handle() const
968 {
969 if ( !_mediaRef )
970 return {};
971 return _mediaRef->_name;
972 }
973
975 {
976 static zypp::Url invalidHandle;
977 if ( !_mediaRef )
978 return invalidHandle;
979 return _mediaRef->_attachedUrl;
980 }
981
982 const std::optional<zypp::Pathname> &ProvideMediaHandle::localPath() const
983 {
984 static std::optional<zypp::Pathname> invalidHandle;
985 if ( !_mediaRef )
986 return invalidHandle;
987 return _mediaRef->_localMountPoint;
988 }
989
990 AttachedMediaInfo_constPtr ProvideMediaHandle::mediaInfo() const
991 {
992 return _mediaRef;
993 }
994
995
996 Provide::Provide( const zypp::Pathname &workDir ) : Base( *new ProvidePrivate( zypp::Pathname(workDir), *this ) )
997 {
998 Z_D();
1000 }
1001
1002 ProvideRef Provide::create( const zypp::filesystem::Pathname &workDir )
1003 {
1004 return ProvideRef( new Provide(workDir) );
1005 }
1006
1007 expected<Provide::LazyMediaHandle> Provide::prepareMedia(const std::vector<zypp::Url> &urls, const ProvideMediaSpec &request)
1008 {
1009 Z_D();
1010 // sanitize the mirrors to contain only URLs that have same worker types
1011 std::vector<zypp::Url> usableMirrs = d->sanitizeUrls( urls );
1012 if ( usableMirrs.empty() ) {
1014 }
1015 return expected<Provide::LazyMediaHandle>::success( shared_this<Provide>(), std::move(usableMirrs), request );
1016 }
1017
1019 {
1020 return prepareMedia( std::vector<zypp::Url>{url}, request );
1021 }
1022
1024 {
1025 using namespace zyppng::operators;
1026 if ( lazyHandle.attached() )
1027 return makeReadyResult( expected<MediaHandle>::success( *lazyHandle.handle() ) );
1028
1029 MIL << "Attaching lazy medium with label: [" << lazyHandle.spec().label() << "]" << std::endl;
1030
1031 return attachMedia( lazyHandle.urls(), lazyHandle.spec () )
1032 | and_then([lazyHandle]( MediaHandle handle ) {
1033 lazyHandle._sharedData->_mediaHandle = handle;
1034 return expected<MediaHandle>::success( std::move(handle) );
1035 });
1036 }
1037
1039 {
1040 return attachMedia ( std::vector<zypp::Url>{url}, request );
1041 }
1042
1043 AsyncOpRef<expected<Provide::MediaHandle>> Provide::attachMedia( const std::vector<zypp::Url> &urls, const ProvideMediaSpec &request )
1044 {
1045 Z_D();
1046
1047 // sanitize the mirrors to contain only URLs that have same worker types
1048 std::vector<zypp::Url> usableMirrs = d->sanitizeUrls( urls );
1049 if ( usableMirrs.empty() ) {
1051 }
1052
1053 // first check if there is a already attached medium we can use as well
1054 auto &attachedMedia = d->attachedMediaInfos ();
1055 for ( auto &medium : attachedMedia ) {
1056 if ( medium->isSameMedium ( usableMirrs, request ) ) {
1058 }
1059 }
1060
1061 auto op = AttachMediaItem::create( usableMirrs, request, *d_func() );
1062 d->queueItem (op);
1063 return op->promise();
1064 }
1065
1066 AsyncOpRef< expected<ProvideRes> > Provide::provide( const std::vector<zypp::Url> &urls, const ProvideFileSpec &request )
1067 {
1068 Z_D();
1069 auto op = ProvideFileItem::create( urls, request, *d );
1070 d->queueItem (op);
1071 return op->promise();
1072 }
1073
1075 {
1076 return provide( std::vector<zypp::Url>{ url }, request );
1077 }
1078
1079 AsyncOpRef< expected<ProvideRes> > Provide::provide( const MediaHandle &attachHandle, const zypp::Pathname &fileName, const ProvideFileSpec &request )
1080 {
1081 Z_D();
1082 const auto i = std::find( d->_attachedMediaInfos.begin(), d->_attachedMediaInfos.end(), attachHandle.mediaInfo() );
1083 if ( i == d->_attachedMediaInfos.end() ) {
1085 }
1086
1087 // for downloading items we need to make the baseUrl part of the request URL
1088 zypp::Url url = (*i)->_attachedUrl;
1089
1090 // real mount devices use a ID to reference a attached medium, for those we do not need to send the baseUrl as well since its already
1091 // part of the mount point, so if we mount host:/path/to/repo to the ID 1234 and look for the file /path/to/repo/file1 the request URL will look like: nfs-media://1234/file1
1092 if ( (*i)->_workerType == ProvideQueue::Config::SimpleMount || (*i)->_workerType == ProvideQueue::Config::VolatileMount ) {
1093 url = zypp::Url();
1094 // work around the zypp::Url requirements for certain Url schemes by attaching a suffix, that way we are always able to have a authority
1095 url.setScheme( (*i)->_attachedUrl.getScheme() + std::string(constants::ATTACHED_MEDIA_SUFFIX) );
1096 url.setAuthority( (*i)->_name );
1097 url.setPathName("/");
1098 }
1099
1100 url.appendPathName( fileName );
1101 auto op = ProvideFileItem::create( {url}, request, *d );
1102 op->setMediaRef( MediaHandle( *this, (*i) ));
1103 d->queueItem (op);
1104
1105 return op->promise();
1106 }
1107
1109 {
1110 using namespace zyppng::operators;
1111 return attachMediaIfNeeded ( attachHandle )
1112 | and_then([weakMe = weak_this<Provide>(), fName = fileName, req = request ]( MediaHandle handle ){
1113 auto me = weakMe.lock();
1114 if ( !me )
1115 return makeReadyResult(expected<ProvideRes>::error(ZYPP_EXCPT_PTR(zypp::Exception("Provide was released during a operation"))));
1116 return me->provide( handle, fName, req);
1117 });
1118 }
1119
1121 {
1122 using namespace zyppng::operators;
1123
1124 zypp::Url url("chksum:///");
1125 url.setPathName( p );
1126 auto fut = provide( url, zyppng::ProvideFileSpec().setCustomHeaderValue( "chksumType", algorithm ) )
1127 | and_then( [algorithm]( zyppng::ProvideRes &&chksumRes ) {
1128 if ( chksumRes.headers().contains(algorithm) ) {
1129 try {
1130 return expected<zypp::CheckSum>::success( zypp::CheckSum( algorithm, chksumRes.headers().value(algorithm).asString() ) );
1131 } catch ( ... ) {
1133 }
1134 }
1135 return expected<zypp::CheckSum>::error( ZYPP_EXCPT_PTR( zypp::FileCheckException("Invalid/Empty checksum returned from worker") ) );
1136 } );
1137 return fut;
1138 }
1139
1141 {
1142 using namespace zyppng::operators;
1143
1144 zypp::Url url("copy:///");
1145 url.setPathName( source );
1146 auto fut = provide( url, ProvideFileSpec().setDestFilenameHint( target ))
1147 | and_then( [&]( ProvideRes &&copyRes ) {
1148 return expected<zypp::ManagedFile>::success( copyRes.asManagedFile() );
1149 } );
1150 return fut;
1151 }
1152
1154 {
1155 using namespace zyppng::operators;
1156
1157 auto fName = source.file();
1158 return copyFile( fName, target )
1159 | [ resSave = std::move(source) ] ( auto &&result ) {
1160 // callback lambda to keep the ProvideRes reference around until the op is finished,
1161 // if the op fails the callback will be cleaned up and so the reference
1162 return result;
1163 };
1164 }
1165
1167 {
1168 Z_D();
1169 d->_isRunning = true;
1170 d->_pulseTimer->start( 5000 );
1171 d->schedule( ProvidePrivate::ProvideStart );
1172 if ( d->_log ) d->_log->provideStart();
1173 }
1174
1176 {
1177 d_func()->_workerPath = path;
1178 }
1179
1180 bool Provide::ejectDevice(const std::string &queueRef, const std::string &device)
1181 {
1182 if ( !queueRef.empty() ) {
1183 return zypp::media::CDTools::openTray(device);
1184 }
1185 return false;
1186 }
1187
1188 void Provide::setStatusTracker( ProvideStatusRef tracker )
1189 {
1190 d_func()->_log = tracker;
1191 }
1192
1194 {
1195 return d_func()->_workDir;
1196 }
1197
1199 {
1200 Z_D();
1201 return d->_credManagerOptions;
1202 }
1203
1205 {
1206 d_func()->_credManagerOptions = opt;
1207 }
1208
1210 {
1211 return d_func()->_sigIdle;
1212 }
1213
1214 SignalProxy<Provide::MediaChangeAction ( const std::string &queueRef, const std::string &, const int32_t, const std::vector<std::string> &, const std::optional<std::string> &)> Provide::sigMediaChangeRequested()
1215 {
1216 return d_func()->_sigMediaChange;
1217 }
1218
1219 SignalProxy< std::optional<zypp::media::AuthData> ( const zypp::Url &reqUrl, const std::string &triedUsername, const std::map<std::string, std::string> &extraValues ) > Provide::sigAuthRequired()
1220 {
1221 return d_func()->_sigAuthRequired;
1222 }
1223
1225
1226 ProvideStatus::ProvideStatus( ProvideRef parent )
1227 : _provider( parent )
1228 { }
1229
1231 {
1232 _stats = Stats();
1233 _stats._startTime = std::chrono::steady_clock::now();
1234 _stats._lastPulseTime = std::chrono::steady_clock::now();
1235 }
1236
1238 {
1239 const auto &sTime = item.startTime();
1240 const auto &fTime = item.finishedTime();
1241 if ( sTime > sTime.min() && fTime >= sTime ) {
1242 auto duration = std::chrono::duration_cast<std::chrono::seconds>( item.finishedTime() - item.startTime() );
1243 if ( duration.count() )
1244 MIL << "Item finished after " << duration.count() << " seconds, with " << zypp::ByteCount( item.currentStats()->_bytesProvided.operator zypp::ByteCount::SizeType() / duration.count() ) << "/s" << std::endl;
1245 MIL << "Item finished after " << (item.finishedTime() - item.startTime()).count() << " ns" << std::endl;
1246 }
1247 pulse( );
1248 }
1249
1251 {
1252 MIL << "Item failed" << std::endl;
1253 }
1254
1256 {
1257 return _stats;
1258 }
1259
1261 {
1262 auto prov = _provider.lock();
1263 if ( !prov )
1264 return;
1265
1266 const auto lastFinishedBytes = _stats._finishedBytes;
1267 const auto lastPartialBytes = _stats._partialBytes;
1268 _stats._expectedBytes = _stats._finishedBytes; // finished bytes are expected too!
1269 zypp::ByteCount tmpPartialBytes (0); // bytes that are finished in staging, but not commited to cache yet
1270
1271 for ( const auto &i : prov->d_func()->items() ) {
1272
1273 if ( !i // maybe released during scheduling
1274 || i->state() == ProvideItem::Cancelling )
1275 continue;
1276
1277 if ( i->state() == ProvideItem::Uninitialized
1278 || i->state() == ProvideItem::Pending ) {
1279 _stats._expectedBytes += i->bytesExpected();
1280 continue;
1281 }
1282
1283 i->pulse();
1284
1285 const auto & stats = i->currentStats();
1286 const auto & prevStats = i->previousStats();
1287 if ( !stats || !prevStats ) {
1288 ERR << "Bug! Stats should be initialized by now" << std::endl;
1289 continue;
1290 }
1291
1292 if ( i->state() == ProvideItem::Downloading
1293 || i->state() == ProvideItem::Processing
1294 || i->state() == ProvideItem::Finalizing ) {
1295 _stats._expectedBytes += stats->_bytesExpected;
1296 tmpPartialBytes += stats->_bytesProvided;
1297 } else if ( i->state() == ProvideItem::Finished ) {
1298 _stats._finishedBytes += stats->_bytesProvided; // remember those bytes are finished in stats directly
1299 _stats._expectedBytes += stats->_bytesProvided;
1300 }
1301 }
1302
1303 const auto now = std::chrono::steady_clock::now();
1304 const auto sinceLast = std::chrono::duration_cast<std::chrono::seconds>( now - _stats._lastPulseTime );
1305 const auto lastFinB = lastPartialBytes + lastFinishedBytes;
1306 const auto currFinB = tmpPartialBytes + _stats._finishedBytes;
1307
1308 const auto diff = currFinB - lastFinB;
1309 _stats._lastPulseTime = now;
1310 _stats._partialBytes = tmpPartialBytes;
1311
1312 if ( sinceLast >= std::chrono::seconds(1) )
1313 _stats._perSecondSinceLastPulse = ( diff / ( sinceLast.count() ) );
1314
1315 auto sinceStart = std::chrono::duration_cast<std::chrono::seconds>( _stats._lastPulseTime - _stats._startTime );
1316 if ( sinceStart.count() ) {
1317 const size_t diff = _stats._finishedBytes + _stats._partialBytes;
1318 _stats._perSecond = zypp::ByteCount( diff / sinceStart.count() );
1319 }
1320 }
1321}
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition AutoDispose.h:95
reference value() const
Reference to the Tp object.
Store and operate with byte count.
Definition ByteCount.h:32
Unit::ValueType SizeType
Definition ByteCount.h:38
Assign a vaiable a certain value when going out of scope.
Definition dtorreset.h:50
Base class for Exception.
Definition Exception.h:147
Url manipulation class.
Definition Url.h:92
std::string getScheme() const
Returns the scheme name of the URL.
Definition Url.cc:537
void setAuthority(const std::string &authority)
Set the authority component in the URL.
Definition Url.cc:702
void setPathName(const std::string &path, EEncoding eflag=zypp::url::E_DECODED)
Set the path name.
Definition Url.cc:768
bool isValid() const
Verifies the Url.
Definition Url.cc:493
void appendPathName(const Pathname &path_r, EEncoding eflag_r=zypp::url::E_DECODED)
Extend the path name.
Definition Url.cc:790
void setScheme(const std::string &scheme)
Set the scheme name in the URL.
Definition Url.cc:672
Wrapper class for stat/lstat.
Definition PathInfo.h:222
const std::string & asString() const
String representation.
Definition Pathname.h:93
bool empty() const
Test for an empty path.
Definition Pathname.h:116
static bool openTray(const std::string &device_r)
Definition cdtools.cc:33
Just inherits Exception to separate media exceptions.
static AttachMediaItemRef create(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request, ProvidePrivate &parent)
std::shared_ptr< T > shared_this() const
Definition base.h:113
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition base.h:142
std::weak_ptr< T > weak_this() const
Definition base.h:123
const std::vector< zypp::Url > & urls() const
const ProvideMediaSpec & spec() const
std::optional< MediaHandle > handle() const
static ProvideFileItemRef create(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request, ProvidePrivate &parent)
virtual std::chrono::steady_clock::time_point startTime() const
virtual std::chrono::steady_clock::time_point finishedTime() const
State state() const
const std::optional< ItemStats > & currentStats() const
AttachedMediaInfo_Ptr _mediaRef
Definition provide.h:67
const zypp::Url & baseUrl() const
Definition provide.cc:974
zyppng::AttachedMediaInfo_constPtr mediaInfo() const
Definition provide.cc:990
ProvideWeakRef _parent
Definition provide.h:66
const std::optional< zypp::Pathname > & localPath() const
Definition provide.cc:982
std::string handle() const
Definition provide.cc:967
std::shared_ptr< Provide > parent() const
Definition provide.cc:957
const std::string & label() const
std::unordered_map< std::string, ProvideQueue::Config > _schemeConfigs
Definition provide_p.h:134
const std::string queueName(ProvideQueue &q) const
Definition provide.cc:864
void doSchedule(Timer &)
Definition provide.cc:70
void onItemStateChanged(ProvideItem &item)
Definition provide.cc:928
std::string effectiveScheme(const std::string &scheme) const
Definition provide.cc:878
std::list< ProvideItemRef > _items
Definition provide_p.h:121
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition provide.cc:759
bool dequeueRequest(ProvideRequestRef req, std::exception_ptr error)
Definition provide.cc:837
ProvideStatusRef _log
Definition provide_p.h:145
zypp::media::CredManagerOptions _credManagerOptions
Definition provide_p.h:143
ProvidePrivate(zypp::Pathname &&workDir, Provide &pub)
Definition provide.cc:21
std::string nextMediaId() const
Definition provide.cc:803
void queueItem(ProvideItemRef item)
Definition provide.cc:785
zypp::media::CredManagerOptions & credManagerOptions()
Definition provide.cc:704
Timer::Ptr _scheduleTrigger
Definition provide_p.h:118
zypp::Pathname _workerPath
Definition provide_p.h:142
bool isRunning() const
Definition provide.cc:873
bool queueRequest(ProvideRequestRef req)
Definition provide.cc:821
std::unordered_map< std::string, ProvideQueueRef > _workerQueues
Definition provide_p.h:133
std::vector< AttachedMediaInfo_Ptr > _attachedMediaInfos
Definition provide_p.h:131
std::unordered_map< std::string, FileCacheItem > _fileCache
Definition provide_p.h:140
std::vector< zypp::Url > sanitizeUrls(const std::vector< zypp::Url > &urls)
Definition provide.cc:709
void onPulseTimeout(Timer &)
Definition provide.cc:887
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition provide.cc:779
std::vector< AttachedMediaInfo_Ptr > & attachedMediaInfos()
Definition provide.cc:739
std::deque< QueueItem > _queues
Definition provide_p.h:128
zypp::Pathname _workDir
Definition provide_p.h:119
void schedule(ScheduleReason reason)
Definition provide.cc:38
AttachedMediaInfo_Ptr addMedium(AttachedMediaInfo_Ptr &&medium)
Definition provide.cc:809
std::unordered_map< std::string, std::string > _workerAlias
Definition provide_p.h:103
void dequeueItem(ProvideItem *item)
Definition provide.cc:791
expected< ProvideQueue::Config > schemeConfig(const std::string &scheme)
Definition provide.cc:744
uint32_t nextRequestId()
Definition provide.cc:946
const zypp::Pathname & workerPath() const
Definition provide.cc:859
std::list< ProvideItemRef > & items()
Definition provide.cc:699
Signal< void()> _sigIdle
Definition provide_p.h:146
std::chrono::time_point< std::chrono::steady_clock > TimePoint
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
const Config & workerConfig() const
void enqueue(ProvideRequestRef request)
static expected< ProvideRequestRef > createDetach(const zypp::Url &url)
A ProvideRes object is a reference counted ownership of a resource in the cache provided by a Provide...
Definition provideres.h:36
virtual void provideStart()
Definition provide.cc:1230
const Stats & stats() const
Definition provide.cc:1255
virtual void pulse()
Definition provide.cc:1260
ProvideWeakRef _provider
Definition provide.h:108
ProvideStatus(ProvideRef parent)
Definition provide.cc:1226
virtual void itemFailed(ProvideItem &item)
Definition provide.cc:1250
virtual void itemDone(ProvideItem &item)
Definition provide.cc:1237
AsyncOpRef< expected< ProvideRes > > provide(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request)
Definition provide.cc:1066
const zypp::media::CredManagerOptions & credManangerOptions() const
Definition provide.cc:1198
SignalProxy< std::optional< zypp::media::AuthData >(const zypp::Url &reqUrl, const std::string &triedUsername, const std::map< std::string, std::string > &extraValues) > sigAuthRequired()
Definition provide.cc:1219
static ProvideRef create(const zypp::Pathname &workDir="")
Definition provide.cc:1002
AsyncOpRef< expected< MediaHandle > > attachMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
Definition provide.cc:1043
void setWorkerPath(const zypp::Pathname &path)
Definition provide.cc:1175
SignalProxy< void()> sigIdle()
Definition provide.cc:1209
void setCredManagerOptions(const zypp::media::CredManagerOptions &opt)
Definition provide.cc:1204
Provide(const zypp::Pathname &workDir)
Definition provide.cc:996
AsyncOpRef< expected< zypp::CheckSum > > checksumForFile(const zypp::Pathname &p, const std::string &algorithm)
Definition provide.cc:1120
void setStatusTracker(ProvideStatusRef tracker)
Definition provide.cc:1188
AsyncOpRef< expected< MediaHandle > > attachMediaIfNeeded(LazyMediaHandle lazyHandle)
Definition provide.cc:1023
bool ejectDevice(const std::string &queueRef, const std::string &device)
Definition provide.cc:1180
expected< LazyMediaHandle > prepareMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
Definition provide.cc:1007
AsyncOpRef< expected< zypp::ManagedFile > > copyFile(const zypp::Pathname &source, const zypp::Pathname &target)
Definition provide.cc:1140
SignalProxy< MediaChangeAction(const std::string &queueRef, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)> sigMediaChangeRequested()
Definition provide.cc:1214
std::optional< Action > MediaChangeAction
Definition provide.h:173
const zypp::Pathname & providerWorkdir() const
Definition provide.cc:1193
ProvideMediaHandle MediaHandle
Definition provide.h:120
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition timer.cc:120
static expected success(ConsParams &&...params)
Definition expected.h:115
Definition Arch.h:364
String related utilities and Regular expression matching.
int unlink(const Pathname &path)
Like 'unlink'.
Definition PathInfo.cc:705
const std::string & asString(const std::string &t)
Global asString() that works with std::string too.
Definition String.h:139
bool startsWith(const C_Str &str_r, const C_Str &prefix_r)
alias for hasPrefix
Definition String.h:1084
std::string stripSuffix(const C_Str &str_r, const C_Str &suffix_r)
Strip a suffix_r from str_r and return the resulting string.
Definition String.h:1047
Easy-to use interface to the ZYPP dependency resolver.
AutoDispose< const Pathname > ManagedFile
A Pathname plus associated cleanup code to be executed when path is no longer needed.
Definition ManagedFile.h:27
constexpr auto DEFAULT_ACTIVE_CONN_PER_HOST
Definition provide_p.h:34
constexpr auto DEFAULT_ACTIVE_CONN
Definition provide_p.h:35
constexpr auto DEFAULT_MAX_DYNAMIC_WORKERS
Definition provide_p.h:36
constexpr std::string_view ATTACHED_MEDIA_SUFFIX
Definition provide_p.h:33
std::conditional_t< isAsync, AsyncOpRef< T >, T > makeReadyResult(T &&result)
Definition asyncop.h:297
std::shared_ptr< AsyncOp< T > > AsyncOpRef
Definition asyncop.h:255
ResultType and_then(const expected< T, E > &exp, Function &&f)
Definition expected.h:423
bool provideDebugEnabled()
#define DBG_PRV
#define MIL_PRV
Convenient building of std::string with boost::format.
Definition String.h:253
zypp::ByteCount _partialBytes
Definition provide.h:85
zypp::ByteCount _perSecondSinceLastPulse
Definition provide.h:86
zypp::ByteCount _perSecond
Definition provide.h:87
zypp::ByteCount _expectedBytes
Definition provide.h:84
std::chrono::steady_clock::time_point _startTime
Definition provide.h:79
zypp::ByteCount _finishedBytes
Definition provide.h:83
std::chrono::steady_clock::time_point _lastPulseTime
Definition provide.h:80
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition Exception.h:428
#define ZYPP_FWD_CURRENT_EXCPT()
Drops a logline and returns the current Exception as a std::exception_ptr.
Definition Exception.h:436
#define DBG
Definition Logger.h:97
#define MIL
Definition Logger.h:98
#define ERR
Definition Logger.h:100
#define WAR
Definition Logger.h:99
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition Logger.h:115
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105