XRootD
Loading...
Searching...
No Matches
XrdSsiFileReq.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S s i F i l e R e q . c c */
4/* */
5/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <cstdio>
31#include <cstring>
32#include <arpa/inet.h>
33#include <sys/types.h>
34
38#include "XrdSfs/XrdSfsDio.hh"
39#include "XrdSsi/XrdSsiAlert.hh"
45#include "XrdSsi/XrdSsiSfs.hh"
47#include "XrdSsi/XrdSsiStats.hh"
48#include "XrdSsi/XrdSsiTrace.hh"
49#include "XrdSsi/XrdSsiUtils.hh"
50#include "XrdSys/XrdSysError.hh"
51
52/******************************************************************************/
53/* L o c a l M a c r o s */
54/******************************************************************************/
55
56#define DEBUGXQ(x) DEBUG(rID<<sessN<<rspstID[urState]<<reqstID[myState]<<x)
57
58#define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<<dotBuff
59
60/******************************************************************************/
61/* G l o b a l s */
62/******************************************************************************/
63
64namespace XrdSsi
65{
67extern XrdScheduler *Sched;
70};
71
72using namespace XrdSsi;
73
74/******************************************************************************/
75/* S t a t i c L o c a l s */
76/******************************************************************************/
77
78namespace
79{
80const char *rspstID[XrdSsiFileReq::isMax] =
81 {" [new", " [begun", " [bound",
82 " [abort", " [done"
83 };
84
85const char *reqstID[XrdSsiFileReq::rsEnd] =
86 {" wtReq] ", " xqReq] ", " wtRsp] ",
87 " doRsp] ", " odRsp] ", " erRsp] "
88 };
89};
90
91/******************************************************************************/
92/* S t a t i c M e m b e r s */
93/******************************************************************************/
94
95XrdSysMutex XrdSsiFileReq::aqMutex;
96XrdSsiFileReq *XrdSsiFileReq::freeReq = 0;
97int XrdSsiFileReq::freeCnt = 0;
98int XrdSsiFileReq::freeMax = 256;
99
100/******************************************************************************/
101/* A c t i v a t e */
102/******************************************************************************/
103
105{
106 EPNAME("Activate");
107
108// Do some debugging
109//
110 DEBUGXQ((oP ? "oucbuff" : "sfsbuff") <<" rqsz=" <<rSz);
111
112// Do statistics
113//
115 Stats.ReqCount++;
116 Stats.ReqBytes += rSz;
117 if (rSz > Stats.ReqMaxsz) Stats.ReqMaxsz = rSz;
119
120// Set request buffer pointers
121//
122 oucBuff = oP;
123 sfsBref = bR;
124 reqSize = rSz;
125
126// Now schedule ourselves to process this request. The state is new.
127//
128 Sched->Schedule((XrdJob *)this);
129}
130
131/******************************************************************************/
132/* A l e r t */
133/******************************************************************************/
134
136{
137 EPNAME("Alert");
138 XrdSsiAlert *aP;
139 int msgLen;
140
141// Do some debugging
142//
143 aMsg.GetMsg(msgLen);
144 DEBUGXQ(msgLen <<" byte alert presented wtr=" <<respWait);
145
146// Add up statistics
147//
149
150// Lock this object
151//
152 frqMutex.Lock();
153
154// Validate the length and whether this call is allowed
155//
156 if (msgLen <= 0 || haveResp || isEnding)
157 {frqMutex.UnLock();
158 aMsg.RecycleMsg();
159 return;
160 }
161
162// Allocate an alert object and chain it into the pending queue
163//
164 aP = XrdSsiAlert::Alloc(aMsg);
165
166// Alerts must be sent in the order they are presented. So, check if we need
167// to chain this and try to send the first in the chain. This only really
168// matters if we can send the alert now because the client is waiting.
169//
170 if (respWait)
171 {if (alrtPend)
172 {alrtLast->next = aP;
173 alrtLast = aP;
174 aP = alrtPend;
175 alrtPend = alrtPend->next;
176 }
177 WakeUp(aP);
178 } else {
179 if (alrtLast) alrtLast->next = aP;
180 else alrtPend = aP;
181 alrtLast = aP;
182 }
183
184// All done
185//
186 frqMutex.UnLock();
187}
188
189/******************************************************************************/
190/* A l l o c */
191/******************************************************************************/
192
195 XrdSsiFileSess *fP,
196 const char *sID,
197 const char *cID,
198 unsigned int rnum)
199{
200 XrdSsiFileReq *nP;
201
202// Check if we can grab this from our queue
203//
204 aqMutex.Lock();
205 if ((nP = freeReq))
206 {freeCnt--;
207 freeReq = nP->nextReq;
208 aqMutex.UnLock();
209 nP->Init(cID);
210 } else {
211 aqMutex.UnLock();
212 nP = new XrdSsiFileReq(cID);
213 }
214
215// Initialize for processing
216//
217 if (nP)
218 {nP->sessN = sID;
219 nP->fileR = rP;
220 nP->fileP = fP;
221 nP->cbInfo = eiP;
222 nP->reqID = rnum;
223 snprintf(nP->rID, sizeof(nP->rID), "%u:", rnum);
224 }
225
226// Return the pointer
227//
228 return nP;
229}
230
231/******************************************************************************/
232/* Private: B i n d D o n e */
233/******************************************************************************/
234
235// This is called with frqMutex locked!
236
237void XrdSsiFileReq::BindDone()
238{
239 EPNAME("BindDone");
240
241// Do some debugging
242//
243 DEBUGXQ("Bind called; for request " <<reqID);
244
245// Collect statistics
246//
248
249// Processing depends on the current state. Only listed states are valid.
250// When the state is done, a finished event occuured between the time the
251// request was handed off to the service but before the service bound it.
252//
253 switch(urState)
254 {case isBegun: urState = isBound;
255 case isBound: return;
256 break;
257 case isDone: if (!schedDone)
258 {schedDone = true;
259 Sched->Schedule((XrdJob *)this);
260 }
261 return;
262 break;
263 default: break;
264 }
265
266// If we get here then we have an invalid state. Report it but otherwise we
267// can't really do anything else. This means some memory may be lost.
268//
269 Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
270}
271
272/******************************************************************************/
273/* D i s p o s e */
274/******************************************************************************/
275
276void XrdSsiFileReq::Dispose()
277{
278 EPNAME("Dispose");
279
280// Do some debugging
281//
282 DEBUGXQ("Recycling request...");
283
284// Collect statistics
285//
287
288// Simply recycle the object
289//
290 Recycle();
291}
292
293/******************************************************************************/
294/* D o I t */
295/******************************************************************************/
296
298{
299 EPNAME("DoIt");
300 bool cancel;
301
302// Processing is determined by the responder's state. Only listed states are
303// valid. Others should never occur in this context.
304//
305 frqMutex.Lock();
306 switch(urState)
307 {case isNew: myState = xqReq; urState = isBegun;
308 DEBUGXQ("Calling service processor");
309 frqMutex.UnLock();
312 (XrdSsiFileResource &)*fileR);
313 return;
314 break;
315 case isAbort: DEBUGXQ("Skipped calling service processor");
316 frqMutex.UnLock();
318 Recycle();
319 return;
320 break;
321 case isDone: cancel = (myState != odRsp);
322 DEBUGXQ("Calling Finished(" <<cancel <<')');
323 if (respWait) WakeUp();
324 if (finWait) finWait->Post();
325 frqMutex.UnLock();
327 if (cancel) Stats.Bump(Stats.ReqCancels);
328 Finished(cancel); // This object may be deleted!
329 return;
330 break;
331 default: break;
332 }
333
334// If we get here then we have an invalid state. Report it but otherwise we
335// can't really do anything else. This means some memory may be lost.
336//
337 frqMutex.UnLock();
338 Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
339}
340
341/******************************************************************************/
342/* D o n e */
343/******************************************************************************/
344
345// Gets invoked only after query() waitresp signal was sent
346
347void XrdSsiFileReq::Done(int &retc, XrdOucErrInfo *eiP, const char *name)
348{
349 EPNAME("Done");
350 XrdSsiMutexMon mHelper(frqMutex);
351
352// We may need to delete the errinfo object if this callback was async. Note
353// that the following test is valid even if the file object has been deleted.
354//
355 if (eiP != fileP->errInfo()) delete eiP;
356
357// Check if we should finalize this request. This will be the case if the
358// complete response was sent.
359//
360 if (myState == odRsp)
361 {DEBUGXQ("resp sent; no additional data remains");
362 Finalize();
363 return;
364 }
365
366// Do some debugging
367//
368 DEBUGXQ("wtrsp sent; resp " <<(haveResp ? "here" : "pend"));
369
370// We are invoked when sync() waitresp has been sent, check if a response was
371// posted while this was going on. If so, make sure to send a wakeup. Note
372// that the respWait flag is at this moment false as this is called in the
373// sync response path for fctl() and the response may have been posted.
374//
375 if (!haveResp) respWait = true;
376 else WakeUp();
377}
378
379/******************************************************************************/
380/* Private: E m s g */
381/******************************************************************************/
382
383int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
384 int ecode, // The error code
385 const char *op) // Operation being performed
386{
387 char buffer[2048];
388
389// Count errors
390//
392
393// Get correct error code
394//
395 if (ecode < 0) ecode = -ecode;
396
397// Format the error message
398//
399 XrdOucERoute::Format(buffer, sizeof(buffer), ecode, op, sessN);
400
401// Put the message in the log
402//
403 Log.Emsg(pfx, tident, buffer);
404
405// Place the error message in the error object and return
406//
407 if (cbInfo) cbInfo->setErrInfo(ecode, buffer);
408 return SFS_ERROR;
409}
410
411/******************************************************************************/
412
413int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
414 XrdSsiErrInfo &eObj, // The error description
415 const char *op) // Operation being performed
416{
417 const char *eMsg;
418 char buffer[2048];
419 int eNum;
420
421// Count errors
422//
424
425// Get correct error code and message
426//
427 eMsg = eObj.Get(eNum).c_str();
428 if (eNum <= 0) eNum = EFAULT;
429 if (!eMsg || !(*eMsg)) eMsg = "reason unknown";
430
431// Format the error message
432//
433 snprintf(buffer, sizeof(buffer), "Unable to %s %s; %s", op, sessN, eMsg);
434
435// Put the message in the log
436//
437 Log.Emsg(pfx, tident, buffer);
438
439// Place the error message in the error object and return
440//
441 if (cbInfo) cbInfo->setErrInfo(eNum, buffer);
442 return SFS_ERROR;
443}
444
445/******************************************************************************/
446/* F i n a l i z e */
447/******************************************************************************/
448
450{
451 EPNAME("Finalize");
452 XrdSsiMutexMon mHelper(frqMutex);
453 bool cancel = (myState != odRsp);
454
455// Release any unsent alerts (prevent any new alerts from being accepted)
456//
457 isEnding = true;
458 if (alrtSent || alrtPend)
459 {XrdSsiAlert *dP, *aP = alrtSent;
460 if (aP) aP->next = alrtPend;
461 else aP = alrtPend;
462 mHelper.UnLock();
463 while((dP = aP)) {aP = aP->next; dP->Recycle();}
464 mHelper.Lock(frqMutex);
465 }
466
467// Processing is determined by the responder's state
468//
469 switch(urState)
470 // Request is being scheduled, so we can simply abort it.
471 //
472 {case isNew: urState = isAbort;
473 cbInfo = 0;
474 sessN = "???";
476 DEBUGXQ("Aborting request processing");
477 return;
478 break;
479
480 // Request already handed off but not yet bound. Defer until bound.
481 // We need to wait until this occurs to sequence Unprovision().
482 //
483 case isBegun: urState = isDone;
484 {XrdSysSemaphore wt4fin(0);
485 finWait = &wt4fin;
486 mHelper.UnLock();
487 wt4fin.Wait();
488 }
489 sessN = "n/a";
490 return;
491
492 // Request is bound so we can finish right off.
493 //
494 case isBound: urState = isDone;
495 if (strBuff) {strBuff->Recycle(); strBuff = 0;}
496 DEBUGXQ("Calling Finished(" <<cancel <<')');
497 if (respWait) WakeUp();
498 mHelper.UnLock();
500 if (cancel) Stats.Bump(Stats.ReqCancels);
501 Finished(cancel); // This object may be deleted!
502 sessN = "n/a";
503 return;
504 break;
505
506 // The following two cases may happen but it's safe to ignore them.
507 //
508 case isAbort:
509 case isDone: sessN = "bad";
510 return;
511 break;
512 default: break;
513 }
514
515// If we get here then we have an invalid state. Report it but otherwise we
516// can't really do anything else. This means some memory may be lost.
517//
518 Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
519}
520
521/******************************************************************************/
522/* G e t R e q u e s t */
523/******************************************************************************/
524
526{
527 EPNAME("GetRequest");
528
529// Do some debugging
530//
531 DEBUGXQ("sz=" <<reqSize);
533
534// The request may come from a ouc buffer or an sfs buffer
535//
536 rLen = reqSize;
537 if (oucBuff) return oucBuff->Data();
538 return XrdSfsXio::Buffer(sfsBref);
539}
540
541/******************************************************************************/
542/* Private: I n i t */
543/******************************************************************************/
544
545void XrdSsiFileReq::Init(const char *cID)
546{
547 tident = (cID ? strdup(cID) : strdup("???"));
548 finWait = 0;
549 nextReq = 0;
550 cbInfo = 0;
551 respCB = 0;
552 respCBarg = 0;
553 alrtSent = 0;
554 alrtPend = 0;
555 alrtLast = 0;
556 sessN = "???";
557 oucBuff = 0;
558 sfsBref = 0;
559 strBuff = 0;
560 reqSize = 0;
561 respBuf = 0;
562 respOff = 0;
563 fileSz = 0; // Also does respLen = 0;
564 myState = wtReq;
565 urState = isNew;
566 *rID = 0;
567 schedDone = false;
568 haveResp = false;
569 respWait = false;
570 strmEOF = false;
571 isEnding = false;
573 XrdSsiRRAgent::SetMutex(this, &frqMutex);
574}
575
576/******************************************************************************/
577/* Protected: P r o c e s s R e s p o n s e */
578/******************************************************************************/
579
580// This is called via the responder with the responder and request locks held.
581
583 const XrdSsiRespInfo &Resp)
584{
585 EPNAME("ProcessResponse");
586
587// Do some debugging
588//
589 DEBUGXQ("Response presented wtr=" <<respWait);
590
591// Make sure we are still in execute state
592//
593 if (urState != isBegun && urState != isBound) return false;
594 myState = doRsp;
595 respOff = 0;
596
597// Handle the response
598//
599 switch(Resp.rType)
601 DEBUGXQ("Resp data sz="<<Resp.blen);
602 respLen = Resp.blen;
604 break;
606 DEBUGXQ("Resp err rc="<<Resp.eNum<<" msg="<<Resp.eMsg);
607 respLen = 0;
609 break;
611 DEBUGXQ("Resp file fd="<<Resp.fdnum<<" sz="<<Resp.fsize);
612 fileSz = Resp.fsize;
613 respOff = 0;
615 break;
617 DEBUGXQ("Resp strm");
618 respLen = 0;
620 break;
621 default:
622 DEBUGXQ("Resp invalid!!!!");
623 return false;
625 break;
626 }
627
628// If the client is waiting for the response, wake up the client to get it.
629//
630 haveResp = true;
631 if (respWait) WakeUp();
632 return true;
633}
634
635/******************************************************************************/
636/* R e a d */
637/******************************************************************************/
638
640 char *buff, // Out
641 XrdSfsXferSize blen) // In
642/*
643 Function: Read `blen' bytes at `offset' into 'buff' and return the actual
644 number of bytes read.
645
646 Input: buff - Address of the buffer in which to place the data.
647 blen - The size of the buffer. This is the maximum number
648 of bytes that will be returned.
649
650 Output: Returns the number of bytes read upon success and SFS_ERROR o/w.
651*/
652{
653 static const char *epname = "read";
654 XrdSfsXferSize nbytes;
655 XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
656
657// A read should never be issued unless a response has been set
658//
659 if (myState != doRsp)
660 {done = true;
661 return (myState == odRsp ? 0 : Emsg(epname, ENOMSG, "read"));
662 }
663
664// Fan out based on the kind of response we have
665//
666 switch(Resp->rType)
668 if (respLen <= 0) {done = true; myState = odRsp; return 0;}
669 if (blen >= respLen)
670 {memcpy(buff, Resp->buff+respOff, respLen);
671 blen = respLen; myState = odRsp; done = true;
672 } else {
673 memcpy(buff, Resp->buff+respOff, blen);
674 respLen -= blen; respOff += blen;
675 }
676 return blen;
677 break;
679 cbInfo->setErrInfo(Resp->eNum, Resp->eMsg);
680 myState = odRsp; done = true;
681 return SFS_ERROR;
682 break;
684 if (fileSz <= 0) {done = true; myState = odRsp; return 0;}
685 nbytes = pread(Resp->fdnum, buff, blen, respOff);
686 if (nbytes <= 0)
687 {done = true;
688 if (!nbytes) {myState = odRsp; return 0;}
689 myState = erRsp;
690 return Emsg(epname, errno, "read");
691 }
692 respOff += nbytes; fileSz -= nbytes;
693 return nbytes;
694 break;
696 nbytes = (Resp->strmP->Type() == XrdSsiStream::isActive ?
697 readStrmA(Resp->strmP, buff, blen)
698 : readStrmP(Resp->strmP, buff, blen));
699 done = strmEOF && strBuff == 0;
700 return nbytes;
701 break;
702 default: break;
703 };
704
705// We should never get here
706//
707 myState = erRsp;
708 done = true;
709 return Emsg(epname, EFAULT, "read");
710}
711
712/******************************************************************************/
713/* Private: r e a d S t r m A */
714/******************************************************************************/
715
716XrdSfsXferSize XrdSsiFileReq::readStrmA(XrdSsiStream *strmP,
717 char *buff, XrdSfsXferSize blen)
718{
719 static const char *epname = "readStrmA";
720 XrdSsiErrInfo eObj;
721 XrdSfsXferSize xlen = 0;
722
723
724// Copy out data from the stream to fill the buffer
725//
726do{if (strBuff)
727 {if (respLen > blen)
728 {memcpy(buff, strBuff->data+respOff, blen);
729 respLen -= blen; respOff += blen;
730 return xlen+blen;
731 }
732 memcpy(buff, strBuff->data+respOff, respLen);
733 xlen += respLen;
734 strBuff->Recycle(); strBuff = 0;
735 blen -= respLen; buff += respLen;
736 }
737
738 if (!strmEOF && blen)
739 {respLen = blen; respOff = 0;
740 strBuff = strmP->GetBuff(eObj, respLen, strmEOF);
741 }
742 } while(strBuff);
743
744// Check if we have data to return
745//
746 if (strmEOF) {myState = odRsp; return xlen;}
747 else if (!blen) return xlen;
748
749// Report the error
750//
751 myState = erRsp; strmEOF = true;
752 return Emsg(epname, eObj, "read stream");
753}
754
755/******************************************************************************/
756/* Private: r e a d S t r m P */
757/******************************************************************************/
758
759XrdSfsXferSize XrdSsiFileReq::readStrmP(XrdSsiStream *strmP,
760 char *buff, XrdSfsXferSize blen)
761{
762 static const char *epname = "readStrmP";
763 XrdSsiErrInfo eObj;
764 XrdSfsXferSize xlen = 0;
765 int dlen = 0;
766
767// Copy out data from the stream to fill the buffer
768//
769 while(!strmEOF && (dlen = strmP->SetBuff(eObj, buff, blen, strmEOF)) > 0)
770 {xlen += dlen;
771 if (dlen == blen) return xlen;
772 if (dlen > blen) {eObj.Set(0,EOVERFLOW); break;}
773 buff += dlen; blen -= dlen;
774 }
775
776// Check if we ended with an zero length read
777//
778 if (strmEOF || !dlen) {myState = odRsp; strmEOF = true; return xlen;}
779
780// Return an error
781//
782 myState = erRsp; strmEOF = true;
783 return Emsg(epname, eObj, "read stream");
784}
785
786/******************************************************************************/
787/* Private: R e c y c l e */
788/******************************************************************************/
789
790void XrdSsiFileReq::Recycle()
791{
792
793// If we have an oucbuffer then we need to recycle it, otherwise if we have
794// and sfs buffer, put it on the deferred release queue.
795//
796 if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
797 else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
798 reqSize = 0;
799
800// Add to queue unless we have too many of these. If we add it back to the
801// queue; make sure it's a cleaned up object!
802//
803 aqMutex.Lock();
804 if (tident) {free(tident); tident = 0;}
805 if (freeCnt >= freeMax) {aqMutex.UnLock(); delete this;}
806 else {XrdSsiRRAgent::CleanUp(*this);
807 nextReq = freeReq;
808 freeReq = this;
809 freeCnt++;
810 aqMutex.UnLock();
811 }
812}
813
814/******************************************************************************/
815/* R e l R e q u e s t B u f f e r */
816/******************************************************************************/
817
819{
820 EPNAME("RelReqBuff");
821 XrdSsiMutexMon mHelper(frqMutex);
822
823// Do some debugging
824//
825 DEBUGXQ("called");
827
828// Release buffers
829//
830 if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
831 else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
832 reqSize = 0;
833}
834
835/******************************************************************************/
836/* S e n d */
837/******************************************************************************/
838
840{
841 static const char *epname = "send";
842 XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
843 XrdOucSFVec sfVec[2];
844 int rc;
845
846// A send should never be issued unless a response has been set. Return a
847// continuation which will cause Read() to be called to return the error.
848//
849 if (myState != doRsp) return 1;
850
851// Fan out based on the kind of response we have
852//
853 switch(Resp->rType)
855 if (blen > 0)
856 {sfVec[1].buffer = (char *)Resp->buff+respOff;
857 sfVec[1].fdnum = -1;
858 if (blen > respLen)
859 {blen = respLen; myState = odRsp;
860 } else {
861 respLen -= blen; respOff += blen;
862 }
863 } else blen = 0;
864 break;
866 return 1; // Causes error to be returned via Read()
867 break;
869 if (fileSz > 0)
870 {sfVec[1].offset = respOff; sfVec[1].fdnum = Resp->fdnum;
871 if (blen > fileSz)
872 {blen = fileSz; myState = odRsp;}
873 respOff += blen; fileSz -= blen;
874 } else blen = 0;
875 break;
877 if (Resp->strmP->Type() == XrdSsiStream::isPassive) return 1;
878 return sendStrmA(Resp->strmP, sfDio, blen);
879 break;
880 default: myState = erRsp;
881 return Emsg(epname, EFAULT, "send");
882 break;
883 };
884
885// Send off the data
886//
887 if (!blen) {sfVec[1].buffer = rID; myState = odRsp;}
888 sfVec[1].sendsz = blen;
889 rc = sfDio->SendFile(sfVec, 2);
890
891// If send succeeded, indicate the action to be taken
892//
893 if (!rc) return myState != odRsp;
894
895// The send failed, diagnose the problem
896//
897 rc = (rc < 0 ? EIO : EFAULT);
898 myState = erRsp;
899 return Emsg(epname, rc, "send");
900}
901
902/******************************************************************************/
903/* Private: s e n d S t r m A */
904/******************************************************************************/
905
906int XrdSsiFileReq::sendStrmA(XrdSsiStream *strmP,
907 XrdSfsDio *sfDio, XrdSfsXferSize blen)
908{
909 static const char *epname = "sendStrmA";
910 XrdSsiErrInfo eObj;
911 XrdOucSFVec sfVec[2];
912 int rc;
913
914// Check if we need a buffer
915//
916 if (!strBuff)
917 {respLen = blen;
918 if (strmEOF || !(strBuff = strmP->GetBuff(eObj, respLen, strmEOF)))
919 {myState = odRsp; strmEOF = true;
920 if (!strmEOF) Emsg(epname, eObj, "read stream");
921 return 1;
922 }
923 respOff = 0;
924 }
925
926// Complete the sendfile vector
927//
928 sfVec[1].buffer = strBuff->data+respOff;
929 sfVec[1].fdnum = -1;
930 if (respLen > blen)
931 {sfVec[1].sendsz = blen;
932 respLen -= blen; respOff += blen;
933 } else {
934 sfVec[1].sendsz = respLen;
935 respLen = 0;
936 }
937
938// Send off the data
939//
940 rc = sfDio->SendFile(sfVec, 2);
941
942// Release any completed buffer
943//
944 if (strBuff && !respLen) {strBuff->Recycle(); strBuff = 0;}
945
946// If send succeeded, indicate the action to be taken
947//
948 if (!rc) return myState != odRsp;
949
950// The send failed, diagnose the problem
951//
952 rc = (rc < 0 ? EIO : EFAULT);
953 myState = erRsp; strmEOF = true;
954 return Emsg(epname, rc, "send");
955}
956
957/******************************************************************************/
958/* W a n t R e s p o n s e */
959/******************************************************************************/
960
962{
963 EPNAME("WantResp");
964 XrdSsiMutexMon frqMon;
965 const XrdSsiRespInfo *rspP;
966
967// Check if we have a previous alert that was sent (we need to recycle it). We
968// don't need a lock for this as it's fully serialized via serial fsctl calls.
969//
970 if (alrtSent) {alrtSent->Recycle(); alrtSent = 0;}
971
972// Serialize the remainder of this code
973//
974 frqMon.Lock(frqMutex);
975 rspP = XrdSsiRRAgent::RespP(this);
976
977// If we have a pending alert then we need to send it now. Suppress the callback
978// as we will recycle the alert on the next call (there should be one).
979//
980 if (alrtPend)
981 {char hexBuff[16], binBuff[8], dotBuff[4];
982 alrtSent = alrtPend;
983 if (!(alrtPend = alrtPend->next)) alrtLast = 0;
984 int n = alrtSent->SetInfo(eInfo, binBuff, sizeof(binBuff));
985 eInfo.setErrCB((XrdOucEICB *)0);
986 DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
987 <<(alrtPend ? "" : "no ") <<"more pending");
988 return true;
989 }
990
991// Check if a response is here (well, ProcessResponse was called)
992//
993// if (rspP->rType)
994 if (haveResp)
995 {respCBarg = 0;
996 if (fileP->AttnInfo(eInfo, rspP, reqID))
997 { eInfo.setErrCB((XrdOucEICB *)this); myState = odRsp;}
998 else eInfo.setErrCB((XrdOucEICB *)0);
999 return true;
1000 }
1001
1002// Defer this and record the callback arguments. We defer setting respWait
1003// to true until we know the deferral request has been sent (i.e. when Done()
1004// is called). This forces ProcessResponse() to not prematurely wakeup the
1005// client. This is necessitated by the fact that we must release the request
1006// lock upon return; allowing a response to come in while the deferral request
1007// is still in transit.
1008//
1009 respCB = eInfo.getErrCB(respCBarg);
1010 respWait = false;
1011 return false;
1012}
1013
1014/******************************************************************************/
1015/* Private: W a k e U p */
1016/******************************************************************************/
1017
1018void XrdSsiFileReq::WakeUp(XrdSsiAlert *aP) // Called with frqMutex locked!
1019{
1020 EPNAME("WakeUp");
1021 XrdOucErrInfo *wuInfo =
1022 new XrdOucErrInfo(tident,(XrdOucEICB *)0,respCBarg);
1023 const XrdSsiRespInfo *rspP = XrdSsiRRAgent::RespP(this);
1024 int respCode = SFS_DATAVEC;
1025
1026// Do some debugging
1027//
1028 DEBUGXQ("respCBarg=" <<Xrd::hex <<respCBarg <<Xrd::dec);
1029
1030// Setup the wakeup data. This may be for an alert or for an actual response.
1031// If this is an alert or the complete response, then make sure we get a
1032// callback to do the finalization. Otherwise, we don't need a callback
1033// and the callback handler will simply delete the error object for us.
1034//
1035 if (aP)
1036 {char hexBuff[16], binBuff[8], dotBuff[4];
1037 int n = aP->SetInfo(*wuInfo, binBuff, sizeof(binBuff));
1038 wuInfo->setErrCB((XrdOucEICB *)aP, respCBarg);
1039 DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
1040 <<(alrtPend ? "" : "no ") <<"more pending");
1041 } else {
1042 if (fileP->AttnInfo(*wuInfo, rspP, reqID))
1043 {wuInfo->setErrCB((XrdOucEICB *)this, respCBarg); myState = odRsp;}
1044 }
1045
1046// Tell the client to issue a read now or handle the alert or full response.
1047//
1048 respWait = false;
1049 respCB->Done(respCode, wuInfo, sessN);
1051}
#define EPNAME(x)
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
#define eMsg(x)
#define SFS_DATAVEC
#define SFS_ERROR
int XrdSfsXferSize
class XrdBuffer * XrdSfsXioHandle
Definition XrdSfsXio.hh:46
#define DEBUGXQ(x)
#define DUMPIT(x, y)
char * Data() const
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
static int Format(char *buff, int blen, int ecode, const char *etxt1, const char *etxt2=0)
XrdOucEICB * getErrCB()
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
XrdSysMutex statsMutex
void Bump(int &val)
void Schedule(XrdJob *jp)
virtual int SendFile(int fildes)=0
static void Reclaim(XrdSfsXioHandle theHand)
Definition XrdSfsXio.cc:70
static char * Buffer(XrdSfsXioHandle theHand, int *buffsz=0)
Definition XrdSfsXio.cc:61
void Recycle()
static XrdSsiAlert * Alloc(XrdSsiRespInfoMsg &aMsg)
int SetInfo(XrdOucErrInfo &eInfo, char *aMsg, int aLen)
XrdSsiAlert * next
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
const std::string & Get(int &eNum) const
void Alert(XrdSsiRespInfoMsg &aMsg)
Send or receive a server generated alert.
bool WantResponse(XrdOucErrInfo &eInfo)
XrdSfsXferSize Read(bool &done, char *buffer, XrdSfsXferSize blen)
char * GetRequest(int &rLen)
void Finished(XrdSsiRequest &rqstR, const XrdSsiRespInfo &rInfo, bool cancel=false)
void RelRequestBuffer()
bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &resp)
XrdSsiFileReq(const char *cID=0)
int Send(XrdSfsDio *sfDio, XrdSfsXferSize size)
static XrdSsiFileReq * Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP, XrdSsiFileSess *fP, const char *sn, const char *id, unsigned int rnum)
void Activate(XrdOucBuffer *oP, XrdSfsXioHandle bR, int rSz)
void Done(int &Result, XrdOucErrInfo *cbInfo, const char *path=0)
bool AttnInfo(XrdOucErrInfo &eInfo, const XrdSsiRespInfo *respP, unsigned int reqID)
XrdOucErrInfo * errInfo()
void Lock(XrdSsiMutex *mutex)
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static void onServer(XrdSsiRequest *rP)
static XrdSsiRespInfo * RespP(XrdSsiRequest *rP)
static void CleanUp(XrdSsiRequest &reqR)
char * GetMsg(int &mlen)
virtual void RecycleMsg(bool sent=true)=0
virtual void ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef)=0
Process a request; client-side or server-side.
long long ReqMaxsz
long long ReqBytes
virtual void Recycle()=0
virtual bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen)
virtual Buffer * GetBuff(XrdSsiErrInfo &eRef, int &dlen, bool &last)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSsiStats Stats
XrdSsiService * Service
XrdScheduler * Sched
XrdSysError Log
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.