XRootD
Loading...
Searching...
No Matches
XrdClXRootDTransport.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClSocket.hh"
29#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClTls.hh"
35#include "XrdNet/XrdNetAddr.hh"
36#include "XrdNet/XrdNetUtils.hh"
39#include "XrdOuc/XrdOucUtils.hh"
40#include "XrdOuc/XrdOucCRC.hh"
42#include "XrdSys/XrdSysTimer.hh"
47#include "XrdSys/XrdSysE2T.hh"
48#include "XrdCl/XrdClTls.hh"
49#include "XrdCl/XrdClSocket.hh"
51#include "XrdVersion.hh"
52
53#include <arpa/inet.h>
54#include <sys/types.h>
55#include <unistd.h>
56#include <dlfcn.h>
57#include <sstream>
58#include <iomanip>
59#include <set>
60#include <limits>
61
62#include <atomic>
63
65
66namespace XrdCl
67{
69 {
71
72 static void UnloadHandler()
73 {
74 UnloadHandler( "root" );
75 UnloadHandler( "xroot" );
76 }
77
78 static void UnloadHandler( const std::string &trProt )
79 {
81 TransportHandler *trHandler = trManager->GetHandler( trProt );
82 trHandler->WaitBeforeExit();
83 }
84
85 void Register( const std::string &protocol )
86 {
87 XrdSysRWLockHelper scope( lock, false ); // obtain write lock
88 std::pair< std::set<std::string>::iterator, bool > ret = protocols.insert( protocol );
89 // if that's the first time we are using the protocol, the sec lib
90 // was just loaded so now's the time to register the atexit handler
91 if( ret.second )
92 {
93 atexit( UnloadHandler );
94 }
95 }
96
99 std::set<std::string> protocols;
100 };
101
102 //----------------------------------------------------------------------------
104 //----------------------------------------------------------------------------
106 {
107 //--------------------------------------------------------------------------
108 // Define the stream status for the link negotiation purposes
109 //--------------------------------------------------------------------------
122
123 //--------------------------------------------------------------------------
124 // Constructor
125 //--------------------------------------------------------------------------
129
131 uint8_t pathId;
132 };
133
//----------------------------------------------------------------------------
// Chooses, among the connected data sub-streams, the one with the fewest
// selections that have not yet been matched by MsgReceived(). Slot i of the
// internal counter vector belongs to sub-stream i + 1; sub-stream 0 (the
// control stream) is never counted.
//----------------------------------------------------------------------------
struct StreamSelector
{
  //------------------------------------------------------------------------
  // @param size : number of streams, including the control stream
  //------------------------------------------------------------------------
  StreamSelector( uint16_t size )
  {
    AdjustQueues( size );
  }

  //------------------------------------------------------------------------
  // Resize the per-stream counters, keeping existing values
  //
  // @param size : number of streams, including the control stream
  //------------------------------------------------------------------------
  void AdjustQueues( uint16_t size )
  {
    //----------------------------------------------------------------------
    // Subtract one because we shouldn't take into account the control
    // stream. A size of 0 must not be decremented: the result would wrap
    // around to an enormous element count.
    //----------------------------------------------------------------------
    const size_t dataStreams = ( size > 0 ) ? static_cast<size_t>( size ) - 1 : 0;
    strmqueues.resize( dataStreams, 0 );
  }

  //------------------------------------------------------------------------
  // @param connected : bitarray stating if given sub-stream is connected
  //                    (index i describes sub-stream i + 1)
  //
  // @return          : substream number, or 0 (the control stream) when
  //                    no connected sub-stream is available
  //------------------------------------------------------------------------
  uint16_t Select( const std::vector<bool> &connected )
  {
    // Only indices that exist in both vectors may be chosen
    const size_t limit = connected.size() < strmqueues.size() ?
                         connected.size() : strmqueues.size();

    bool   found  = false;
    size_t best   = 0;
    size_t minval = std::numeric_limits<size_t>::max();

    for( size_t i = 0; i < limit; ++i )
    {
      if( !connected[i] ) continue;

      // strict '<' keeps the lowest-numbered stream on ties
      if( !found || strmqueues[i] < minval )
      {
        found  = true;
        best   = i;
        minval = strmqueues[i];
      }
    }

    // Nothing eligible: do not touch any counter (the vector may even be
    // empty) and hand back the control stream
    if( !found )
      return 0;

    ++strmqueues[best];
    return static_cast<uint16_t>( best + 1 );
  }

  //--------------------------------------------------------------------------
  // Update queue for given substream
  //
  // @param substrm : substream number as returned by Select; 0 and values
  //                  beyond the known streams are ignored
  //--------------------------------------------------------------------------
  void MsgReceived( uint16_t substrm )
  {
    if( substrm == 0 || substrm > strmqueues.size() )
      return;

    // Never go below zero: a wrapped counter would look maximally loaded
    // and the stream would stop being selected
    size_t &pending = strmqueues[substrm - 1];
    if( pending > 0 )
      --pending;
  }

  private:

    std::vector<size_t> strmqueues;
};
194
//----------------------------------------------------------------------------
// Hands out the stored bind preferences one after another, starting over
// from the first entry once the last one has been returned
//----------------------------------------------------------------------------
struct BindPrefSelector
{
  //------------------------------------------------------------------------
  // @param bindprefs : list of preferences; its contents are moved into
  //                    the selector
  //------------------------------------------------------------------------
  BindPrefSelector( std::vector<std::string> && bindprefs ) :
    bindprefs( std::move( bindprefs ) ), next( 0 )
  {
  }

  //------------------------------------------------------------------------
  // @return : the next preference in round-robin order, or an empty
  //           string if the selector holds no preferences
  //------------------------------------------------------------------------
  inline const std::string& Get()
  {
    // Indexing an empty vector is undefined behaviour, so hand back a
    // long-lived empty string instead
    if( bindprefs.empty() )
    {
      static const std::string none;
      return none;
    }

    const std::string &current = bindprefs[next];
    ++next;
    if( next >= bindprefs.size() )
      next = 0;
    return current;
  }

  private:
    std::vector<std::string> bindprefs;
    size_t                   next;
};
215
216 //----------------------------------------------------------------------------
218 //----------------------------------------------------------------------------
220 {
221 //--------------------------------------------------------------------------
222 // Constructor
223 //--------------------------------------------------------------------------
224 XRootDChannelInfo( const URL &url ):
225 serverFlags(0),
227 firstLogIn(true),
228 authBuffer(0),
229 authProtocol(0),
230 authParams(0),
231 authEnv(0),
232 finstcnt(0),
233 openFiles(0),
234 waitBarrier(0),
235 protection(0),
236 protRespBody(0),
237 protRespSize(0),
238 encrypted(false),
239 istpc(false)
240 {
242 memset( sessionId, 0, 16 );
243 memset( oldSessionId, 0, 16 );
244 }
245
246 //--------------------------------------------------------------------------
247 // Destructor
248 //--------------------------------------------------------------------------
250 {
251 delete [] authBuffer;
252 }
253
254 typedef std::vector<XRootDStreamInfo> StreamInfoVector;
255
256 //--------------------------------------------------------------------------
257 // Data
258 //--------------------------------------------------------------------------
259 uint32_t serverFlags;
261 uint8_t sessionId[16];
262 uint8_t oldSessionId[16];
264 std::shared_ptr<SIDManager> sidManager;
270 std::string streamName;
271 std::string authProtocolName;
272 std::set<uint16_t> sentOpens;
273 std::set<uint16_t> sentCloses;
274 std::atomic<uint32_t> finstcnt; // file instance count
275 uint32_t openFiles;
279 unsigned int protRespSize;
280 std::unique_ptr<StreamSelector> strmSelector;
282 bool istpc;
283 std::unique_ptr<BindPrefSelector> bindSelector;
284 std::string logintoken;
286 };
287
288 //----------------------------------------------------------------------------
289 // Constructor
290 //----------------------------------------------------------------------------
292 pSecUnloadHandler( new PluginUnloadHandler() )
293 {
294 }
295
296 //----------------------------------------------------------------------------
297 // Destructor
298 //----------------------------------------------------------------------------
300 {
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
302 }
303
304 //----------------------------------------------------------------------------
305 // Read message header from socket
306 //----------------------------------------------------------------------------
308 {
309 //--------------------------------------------------------------------------
310 // A new message - allocate the space needed for the header
311 //--------------------------------------------------------------------------
312 if( message.GetCursor() == 0 && message.GetSize() < 8 )
313 message.Allocate( 8 );
314
315 //--------------------------------------------------------------------------
316 // Read the message header
317 //--------------------------------------------------------------------------
318 if( message.GetCursor() < 8 )
319 {
320 size_t leftToBeRead = 8 - message.GetCursor();
321 while( leftToBeRead )
322 {
323 int bytesRead = 0;
324 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325 leftToBeRead, bytesRead );
326 if( !status.IsOK() || status.code == suRetry )
327 return status;
328
329 leftToBeRead -= bytesRead;
330 message.AdvanceCursor( bytesRead );
331 }
332 UnMarshallHeader( message );
333
334 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335 Log *log = DefaultEnv::GetLog();
336 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337 "body", &message, bodySize );
338
339 return XRootDStatus( stOK, suDone );
340 }
342 }
343
344 //----------------------------------------------------------------------------
345 // Read message body from socket
346 //----------------------------------------------------------------------------
348 {
349 //--------------------------------------------------------------------------
350 // Retrieve the body
351 //--------------------------------------------------------------------------
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
355 bodySize = rsphdr->dlen;
356
357 if( message.GetSize() < bodySize + 8 )
358 message.ReAllocate( bodySize + 8 );
359
360 leftToBeRead = bodySize-(message.GetCursor()-8);
361 while( leftToBeRead )
362 {
363 int bytesRead = 0;
364 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365
366 if( !status.IsOK() || status.code == suRetry )
367 return status;
368
369 leftToBeRead -= bytesRead;
370 message.AdvanceCursor( bytesRead );
371 }
372
373 return XRootDStatus( stOK, suDone );
374 }
375
376 //----------------------------------------------------------------------------
377 // Read more of the message body from socket
378 //----------------------------------------------------------------------------
380 {
382 if( rsphdr->status != kXR_status )
384
385 //--------------------------------------------------------------------------
386 // In case of non kXR_status responses we read all the response, including
387 // data. For kXR_status responses we first read only the remainder of the
388 // header. The header must then be unmarshalled, and then a second call to
389 // GetMore (repeated for suRetry as needed) will read the data.
390 //--------------------------------------------------------------------------
391
392 uint32_t bodySize = rsphdr->dlen;
393 if( bodySize+8 < sizeof( ServerResponseStatus ) )
395 "kXR_status: invalid message size." );
396
398 bodySize += rspst->bdy.dlen;
399
400 if( message.GetSize() < bodySize + 8 )
401 message.ReAllocate( bodySize + 8 );
402
403 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404 while( leftToBeRead )
405 {
406 int bytesRead = 0;
407 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408
409 if( !status.IsOK() || status.code == suRetry )
410 return status;
411
412 leftToBeRead -= bytesRead;
413 message.AdvanceCursor( bytesRead );
414 }
415
416 // Unmarshal the message body
417 Log *log = DefaultEnv::GetLog();
419 if( !st.IsOK() && st.code == errDataError )
420 {
421 log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422 st.GetErrorMessage().c_str() );
423 return st;
424 }
425
426 if( !st.IsOK() )
427 {
428 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429 &message );
430 return st;
431 }
432
433 return XRootDStatus( stOK, suDone );
434 }
435
436 //----------------------------------------------------------------------------
437 // Initialize channel
438 //----------------------------------------------------------------------------
440 AnyObject &channelData )
441 {
442 XRootDChannelInfo *info = new XRootDChannelInfo( url );
443 XrdSysMutexHelper scopedLock( info->mutex );
444 channelData.Set( info );
445
446 Env *env = DefaultEnv::GetEnv();
447 int streams = DefaultSubStreamsPerChannel;
448 env->GetInt( "SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->stream.resize( streams );
451 info->strmSelector.reset( new StreamSelector( streams ) );
452 info->encrypted = url.IsSecure();
453 info->istpc = url.IsTPC();
454 info->logintoken = url.GetLoginToken();
455 }
456
457 //----------------------------------------------------------------------------
458 // Finalize channel
459 //----------------------------------------------------------------------------
463
464 //----------------------------------------------------------------------------
465 // HandShake
466 //----------------------------------------------------------------------------
468 AnyObject &channelData )
469 {
470 XRootDChannelInfo *info = 0;
471 channelData.Get( info );
472
473 if (!info)
475
476 XrdSysMutexHelper scopedLock( info->mutex );
477
478 if( info->stream.size() <= handShakeData->subStreamId )
479 {
480 Log *log = DefaultEnv::GetLog();
482 "[%s] Internal error: not enough substreams",
483 handShakeData->streamName.c_str() );
485 }
486
487 if( handShakeData->subStreamId == 0 )
488 {
489 info->streamName = handShakeData->streamName;
490 return HandShakeMain( handShakeData, channelData );
491 }
492 return HandShakeParallel( handShakeData, channelData );
493 }
494
495 //----------------------------------------------------------------------------
496 // Hand shake the main stream
497 //----------------------------------------------------------------------------
498 XRootDStatus XRootDTransport::HandShakeMain( HandShakeData *handShakeData,
499 AnyObject &channelData )
500 {
501 XRootDChannelInfo *info = 0;
502 channelData.Get( info );
503
504 if (!info) {
506 "[%s] Internal error: no channel info",
507 handShakeData->streamName.c_str());
509 }
510
511 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
512
513 //--------------------------------------------------------------------------
514 // First step - we need to create an initial handshake and send it out
515 //--------------------------------------------------------------------------
516 if( sInfo.status == XRootDStreamInfo::Disconnected ||
517 sInfo.status == XRootDStreamInfo::Broken )
518 {
519 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
521 sInfo.status = XRootDStreamInfo::HandShakeSent;
522 return XRootDStatus( stOK, suContinue );
523 }
524
525 //--------------------------------------------------------------------------
526 // Second step - we got the reply message to the initial handshake
527 //--------------------------------------------------------------------------
528 if( sInfo.status == XRootDStreamInfo::HandShakeSent )
529 {
530 XRootDStatus st = ProcessServerHS( handShakeData, info );
531 if( st.IsOK() )
533 else
534 sInfo.status = XRootDStreamInfo::Broken;
535 return st;
536 }
537
538 //--------------------------------------------------------------------------
539 // Third step - we got the response to the protocol request, we need
540 // to process it and send out a login request
541 //--------------------------------------------------------------------------
542 if( sInfo.status == XRootDStreamInfo::HandShakeReceived )
543 {
544 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
545
546 if( !st.IsOK() )
547 {
548 sInfo.status = XRootDStreamInfo::Broken;
549 return st;
550 }
551
552 if( st.code == suRetry )
553 {
554 handShakeData->out = GenerateProtocol( handShakeData, info,
557 return XRootDStatus( stOK, suRetry );
558 }
559
560 handShakeData->out = GenerateLogIn( handShakeData, info );
561 sInfo.status = XRootDStreamInfo::LoginSent;
562 return XRootDStatus( stOK, suContinue );
563 }
564
565 //--------------------------------------------------------------------------
566 // Fourth step - handle the log in response and proceed with the
567 // authentication if required by the server
568 //--------------------------------------------------------------------------
569 if( sInfo.status == XRootDStreamInfo::LoginSent )
570 {
571 XRootDStatus st = ProcessLogInResp( handShakeData, info );
572
573 if( !st.IsOK() )
574 {
575 sInfo.status = XRootDStreamInfo::Broken;
576 return st;
577 }
578
579 if( st.IsOK() && st.code == suDone )
580 {
581 //----------------------------------------------------------------------
582 // If it's not our first log in we need to end the previous session
583 // to make sure that the server noticed our disconnection and closed
584 // all the writable handles that we owned
585 //----------------------------------------------------------------------
586 if( !info->firstLogIn )
587 {
588 handShakeData->out = GenerateEndSession( handShakeData, info );
590 return XRootDStatus( stOK, suContinue );
591 }
592
593 sInfo.status = XRootDStreamInfo::Connected;
594 info->firstLogIn = false;
595 return st;
596 }
597
598 st = DoAuthentication( handShakeData, info );
599 if( !st.IsOK() )
600 sInfo.status = XRootDStreamInfo::Broken;
601 else
602 sInfo.status = XRootDStreamInfo::AuthSent;
603 return st;
604 }
605
606 //--------------------------------------------------------------------------
607 // Fifth step and later - proceed with the authentication
608 //--------------------------------------------------------------------------
609 if( sInfo.status == XRootDStreamInfo::AuthSent )
610 {
611 XRootDStatus st = DoAuthentication( handShakeData, info );
612
613 if( !st.IsOK() )
614 {
615 sInfo.status = XRootDStreamInfo::Broken;
616 return st;
617 }
618
619 if( st.IsOK() && st.code == suDone )
620 {
621 //----------------------------------------------------------------------
622 // If it's not our first log in we need to end the previous session
623 //----------------------------------------------------------------------
624 if( !info->firstLogIn )
625 {
626 handShakeData->out = GenerateEndSession( handShakeData, info );
628 return XRootDStatus( stOK, suContinue );
629 }
630
631 sInfo.status = XRootDStreamInfo::Connected;
632 info->firstLogIn = false;
633 return st;
634 }
635
636 return st;
637 }
638
639 //--------------------------------------------------------------------------
640 // The last step - kXR_endsess returned
641 //--------------------------------------------------------------------------
642 if( sInfo.status == XRootDStreamInfo::EndSessionSent )
643 {
644 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
645
646 if( st.IsOK() && st.code == suDone )
647 {
648 sInfo.status = XRootDStreamInfo::Connected;
649 }
650 else if( !st.IsOK() )
651 {
652 sInfo.status = XRootDStreamInfo::Broken;
653 }
654
655 return st;
656 }
657
658 return XRootDStatus( stOK, suDone );
659 }
660
661 //----------------------------------------------------------------------------
662 // Hand shake parallel stream
663 //----------------------------------------------------------------------------
664 XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
665 AnyObject &channelData )
666 {
667 XRootDChannelInfo *info = 0;
668 channelData.Get( info );
669
670 if (!info) {
672 "[%s] Internal error: no channel info",
673 handShakeData->streamName.c_str());
674 return XRootDStatus(stFatal, errInternal);
675 }
676
677 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
678
679 //--------------------------------------------------------------------------
680 // First step - we need to create an initial handshake and send it out
681 //--------------------------------------------------------------------------
682 if( sInfo.status == XRootDStreamInfo::Disconnected ||
683 sInfo.status == XRootDStreamInfo::Broken )
684 {
685 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
687 sInfo.status = XRootDStreamInfo::HandShakeSent;
688 return XRootDStatus( stOK, suContinue );
689 }
690
691 //--------------------------------------------------------------------------
692 // Second step - we got the reply message to the initial handshake,
693 // if successful we need to send bind
694 //--------------------------------------------------------------------------
695 if( sInfo.status == XRootDStreamInfo::HandShakeSent )
696 {
697 XRootDStatus st = ProcessServerHS( handShakeData, info );
698 if( st.IsOK() )
700 else
701 sInfo.status = XRootDStreamInfo::Broken;
702 return st;
703 }
704
705 //--------------------------------------------------------------------------
706 // Second step bis - we got the response to the protocol request, we need
707 // to process it and send out a bind request
708 //--------------------------------------------------------------------------
709 if( sInfo.status == XRootDStreamInfo::HandShakeReceived )
710 {
711 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
712
713 if( !st.IsOK() )
714 {
715 sInfo.status = XRootDStreamInfo::Broken;
716 return st;
717 }
718
719 handShakeData->out = GenerateBind( handShakeData, info );
720 sInfo.status = XRootDStreamInfo::BindSent;
721 return XRootDStatus( stOK, suContinue );
722 }
723
724 //--------------------------------------------------------------------------
725 // Third step - we got the response to the kXR_bind
726 //--------------------------------------------------------------------------
727 if( sInfo.status == XRootDStreamInfo::BindSent )
728 {
729 XRootDStatus st = ProcessBindResp( handShakeData, info );
730
731 if( !st.IsOK() )
732 {
733 sInfo.status = XRootDStreamInfo::Broken;
734 return st;
735 }
736 sInfo.status = XRootDStreamInfo::Connected;
737 return XRootDStatus();
738 }
739 return XRootDStatus();
740 }
741
742 //------------------------------------------------------------------------
743 // @return true if handshake has been done and stream is connected,
744 // false otherwise
745 //------------------------------------------------------------------------
747 AnyObject &channelData )
748 {
749 XRootDChannelInfo *info = 0;
750 channelData.Get( info );
751
752 if (!info) {
754 "[%s] Internal error: no channel info",
755 handShakeData->streamName.c_str());
756 return false;
757 }
758
759 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
760 return ( sInfo.status == XRootDStreamInfo::Connected );
761 }
762
763 //----------------------------------------------------------------------------
764 // Check if the stream should be disconnected
765 //----------------------------------------------------------------------------
766 bool XRootDTransport::IsStreamTTLElapsed( time_t inactiveTime,
767 AnyObject &channelData )
768 {
769 XRootDChannelInfo *info = 0;
770 channelData.Get( info );
771
772 Env *env = DefaultEnv::GetEnv();
773 Log *log = DefaultEnv::GetLog();
774
775 if (!info) {
777 "Internal error: no channel info, behaving as if TTL has elapsed");
778 return true;
779 }
780
781 //--------------------------------------------------------------------------
782 // Check the TTL settings for the current server
783 //--------------------------------------------------------------------------
784 int ttl;
785 if( info->serverFlags & kXR_isServer )
786 {
788 env->GetInt( "DataServerTTL", ttl );
789 }
790 else
791 {
793 env->GetInt( "LoadBalancerTTL", ttl );
794 }
795
796 //--------------------------------------------------------------------------
797 // See whether we can give a go-ahead for the disconnection
798 //--------------------------------------------------------------------------
799 XrdSysMutexHelper scopedLock( info->mutex );
800 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
801 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
802 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
803 info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
804 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
805
806 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
807 return false;
808
809 if( !allocatedSIDs && inactiveTime > ttl )
810 return true;
811
812 return false;
813 }
814
815 //----------------------------------------------------------------------------
816 // Check the stream is broken - ie. TCP connection got broken and
817 // went undetected by the TCP stack
818 //----------------------------------------------------------------------------
820 AnyObject &channelData )
821 {
822 XRootDChannelInfo *info = 0;
823 channelData.Get( info );
824 Env *env = DefaultEnv::GetEnv();
825 Log *log = DefaultEnv::GetLog();
826
827 if (!info) {
829 "Internal error: no channel info, behaving as if stream is broken");
830 return true;
831 }
832
833 int streamTimeout = DefaultStreamTimeout;
834 env->GetInt( "StreamTimeout", streamTimeout );
835
836 XrdSysMutexHelper scopedLock( info->mutex );
837
838 const time_t now = time(0);
839 const bool anySID =
840 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
841
842 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
843 "stream timeout: %d, any SID: %d, wait barrier: %s",
844 info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
845 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
846
847 if( inactiveTime < streamTimeout )
848 return Status();
849
850 if( now < info->waitBarrier )
851 return Status();
852
853 if( !anySID )
854 return Status();
855
857 }
858
859 //----------------------------------------------------------------------------
860 // Multiplex
861 //----------------------------------------------------------------------------
863 {
864 return PathID( 0, 0 );
865 }
866
867 //----------------------------------------------------------------------------
868 // Multiplex
869 //----------------------------------------------------------------------------
871 AnyObject &channelData,
872 PathID *hint )
873 {
874 XRootDChannelInfo *info = 0;
875 channelData.Get( info );
876
877 if (!info) {
879 "Internal error: no channel info, cannot multiplex");
880 return PathID(0,0);
881 }
882
883 XrdSysMutexHelper scopedLock( info->mutex );
884
885 //--------------------------------------------------------------------------
886 // If we're not connected to a data server or we don't know that yet
887 // we stream through 0
888 //--------------------------------------------------------------------------
889 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
890 return PathID( 0, 0 );
891
892 //--------------------------------------------------------------------------
893 // Select the streams
894 //--------------------------------------------------------------------------
895 Log *log = DefaultEnv::GetLog();
896 uint16_t upStream = 0;
897 uint16_t downStream = 0;
898
899 if( hint )
900 {
901 upStream = hint->up;
902 downStream = hint->down;
903 }
904 else
905 {
906 upStream = 0;
907 std::vector<bool> connected;
908 connected.reserve( info->stream.size() - 1 );
909 size_t nbConnected = 0;
910 for( size_t i = 1; i < info->stream.size(); ++i )
911 if( info->stream[i].status == XRootDStreamInfo::Connected )
912 {
913 connected.push_back( true );
914 ++nbConnected;
915 }
916 else
917 connected.push_back( false );
918
919 if( nbConnected == 0 )
920 downStream = 0;
921 else
922 downStream = info->strmSelector->Select( connected );
923 }
924
925 if( upStream >= info->stream.size() )
926 {
928 "[%s] Up link stream %d does not exist, using 0",
929 info->streamName.c_str(), upStream );
930 upStream = 0;
931 }
932
933 if( downStream >= info->stream.size() )
934 {
936 "[%s] Down link stream %d does not exist, using 0",
937 info->streamName.c_str(), downStream );
938 downStream = 0;
939 }
940
941 //--------------------------------------------------------------------------
942 // Modify the message
943 //--------------------------------------------------------------------------
944 UnMarshallRequest( msg );
946 switch( hdr->requestid )
947 {
948 //------------------------------------------------------------------------
949 // Read - we update the path id to tell the server where we want to
950 // get the response, but we still send the request through stream 0
951 // We need to allocate space for read_args if we don't have it
952 // included yet
953 //------------------------------------------------------------------------
954 case kXR_read:
955 {
956 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
957 {
958 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
959 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
960 memset( newBuf, 0, 8 );
962 req->dlen += 8;
963 }
964 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
965 args->pathid = info->stream[downStream].pathId;
966 break;
967 }
968
969
970 //------------------------------------------------------------------------
971 // PgRead - we update the path id to tell the server where we want to
972 // get the response, but we still send the request through stream 0
973 // We need to allocate space for ClientPgReadReqArgs if we don't have it
974 // included yet
975 //------------------------------------------------------------------------
976 case kXR_pgread:
977 {
978 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
979 {
980 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
981 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
982 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
984 req->dlen += sizeof( ClientPgReadReqArgs );
985 }
986 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
987 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
988 args->pathid = info->stream[downStream].pathId;
989 break;
990 }
991
992 //------------------------------------------------------------------------
993 // ReadV - the situation is identical to read but we don't need any
994 // additional structures to specify the return path
995 //------------------------------------------------------------------------
996 case kXR_readv:
997 {
999 req->pathid = info->stream[downStream].pathId;
1000 break;
1001 }
1002
1003 //------------------------------------------------------------------------
1004 // Write - multiplexing writes doesn't work properly in the server
1005 //------------------------------------------------------------------------
1006 case kXR_write:
1007 {
1008// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
1009// req->pathid = info->stream[downStream].pathId;
1010 break;
1011 }
1012
1013 //------------------------------------------------------------------------
1014 // WriteV - multiplexing writes doesn't work properly in the server
1015 //------------------------------------------------------------------------
1016 case kXR_writev:
1017 {
1018// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1019// req->pathid = info->stream[downStream].pathId;
1020 break;
1021 }
1022
1023 //------------------------------------------------------------------------
1024 // PgWrite - multiplexing writes doesn't work properly in the server
1025 //------------------------------------------------------------------------
1026 case kXR_pgwrite:
1027 {
1028// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1029// req->pathid = info->stream[downStream].pathId;
1030 break;
1031 }
1032 };
1033 MarshallRequest( msg );
1034 return PathID( upStream, downStream );
1035 }
1036
1037 //----------------------------------------------------------------------------
1038 // Return a number of substreams per stream that should be created
1039 // This depends on the environment and whether we are connected to
1040 // a data server or not
1041 //----------------------------------------------------------------------------
1043 {
// Returns how many sub-streams this channel should use, based on the
// XRootDChannelInfo stored in channelData.
//  - 1 when the channel info is missing (after logging an error), when the
//    connection is flagged `istpc`, or when the server flags lack kXR_isServer.
//  - Otherwise the current size of info->stream, raised from 1 to 2 when
//    stream 0 is to be encrypted while data may travel unencrypted.
// Side effect: info->stream and the stream selector queues are grown when the
// returned count exceeds the current number of entries.
// NOTE(review): `env` is used below but its declaration is not visible in this
// listing.
1044 XRootDChannelInfo *info = 0;
1045 channelData.Get( info );
1046
1047 if (!info) {
1048 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1049 return 1;
1050 }
1051
// Everything below reads or modifies the channel info, so hold its mutex.
1052 XrdSysMutexHelper scopedLock( info->mutex );
1053
1054 //--------------------------------------------------------------------------
1055 // If the connection has been opened in order to orchestrate a TPC or
1056 // the remote server is a Manager or Metamanager we will need only one
1057 // (control) stream.
1058 //--------------------------------------------------------------------------
1059 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1060
1061 //--------------------------------------------------------------------------
1062 // Number of streams requested by user
1063 //--------------------------------------------------------------------------
1064 uint16_t ret = info->stream.size();
1065
// "TlsNoData" overrides the compiled-in default when it is set in the
// environment object.
1067 int nodata = DefaultTlsNoData;
1068 env->GetInt( "TlsNoData", nodata );
1069
1070 // Does the server require the stream 0 to be encrypted?
1071 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1072 ( info->serverFlags & kXR_tlsLogin ) ||
1073 ( info->serverFlags & kXR_tlsSess );
1074 // Does the server NOT require the data streams to be encrypted?
1075 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1076 // Does the user require the stream 0 to be encrypted?
1077 bool usrTlsStrm0 = info->encrypted;
1078 // Does the user NOT require the data streams to be encrypted?
1079 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1080
1081 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1082 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1083 {
1084 //------------------------------------------------------------------------
1085 // The server or user asked us to encrypt stream 0, but to send the data
1086 // (read/write) using a plain TCP connection
1087 //------------------------------------------------------------------------
// A single stream cannot be both encrypted and plain, so a second one is
// requested in that case.
1088 if( ret == 1 ) ++ret;
1089 }
1090
1091 if( ret > info->stream.size() )
1092 {
1093 info->stream.resize( ret );
1094 info->strmSelector->AdjustQueues( ret );
1095 }
1096
1097 return ret;
1098 }
1099
1100 //----------------------------------------------------------------------------
1101 // Marshall
1102 //----------------------------------------------------------------------------
1104 {
// Converts the multi-byte fields of the client request located at `msg` to
// network byte order, in place. The request-specific fields are handled in the
// switch; the common header fields (requestid, dlen) are converted last
// because the switch itself reads them (and readv/writev read dlen) in host
// order. Request types without a case only get their header converted.
// Always returns a default-constructed XRootDStatus.
1105 ClientRequest *req = (ClientRequest*)msg;
1106 switch( req->header.requestid )
1107 {
1108 //------------------------------------------------------------------------
1109 // kXR_protocol
1110 //------------------------------------------------------------------------
1111 case kXR_protocol:
1112 req->protocol.clientpv = htonl( req->protocol.clientpv );
1113 break;
1114
1115 //------------------------------------------------------------------------
1116 // kXR_login
1117 //------------------------------------------------------------------------
1118 case kXR_login:
1119 req->login.pid = htonl( req->login.pid );
1120 break;
1121
1122 //------------------------------------------------------------------------
1123 // kXR_locate
1124 //------------------------------------------------------------------------
1125 case kXR_locate:
1126 req->locate.options = htons( req->locate.options );
1127 break;
1128
1129 //------------------------------------------------------------------------
1130 // kXR_query
1131 //------------------------------------------------------------------------
1132 case kXR_query:
1133 req->query.infotype = htons( req->query.infotype );
1134 break;
1135
1136 //------------------------------------------------------------------------
1137 // kXR_truncate
1138 //------------------------------------------------------------------------
1139 case kXR_truncate:
1140 req->truncate.offset = htonll( req->truncate.offset );
1141 break;
1142
1143 //------------------------------------------------------------------------
1144 // kXR_mkdir
1145 //------------------------------------------------------------------------
1146 case kXR_mkdir:
1147 req->mkdir.mode = htons( req->mkdir.mode );
1148 break;
1149
1150 //------------------------------------------------------------------------
1151 // kXR_chmod
1152 //------------------------------------------------------------------------
1153 case kXR_chmod:
1154 req->chmod.mode = htons( req->chmod.mode );
1155 break;
1156
1157 //------------------------------------------------------------------------
1158 // kXR_open
1159 //------------------------------------------------------------------------
1160 case kXR_open:
1161 req->open.mode = htons( req->open.mode );
1162 req->open.options = htons( req->open.options );
1163 break;
1164
1165 //------------------------------------------------------------------------
1166 // kXR_read
1167 //------------------------------------------------------------------------
1168 case kXR_read:
1169 req->read.offset = htonll( req->read.offset );
1170 req->read.rlen = htonl( req->read.rlen );
1171 break;
1172
1173 //------------------------------------------------------------------------
1174 // kXR_write
1175 //------------------------------------------------------------------------
1176 case kXR_write:
1177 req->write.offset = htonll( req->write.offset );
1178 break;
1179
1180 //------------------------------------------------------------------------
1181 // kXR_mv
1182 //------------------------------------------------------------------------
1183 case kXR_mv:
1184 req->mv.arg1len = htons( req->mv.arg1len );
1185 break;
1186
1187 //------------------------------------------------------------------------
1188 // kXR_readv
1189 //------------------------------------------------------------------------
1190 case kXR_readv:
1191 {
// The chunk descriptors are taken to start 24 bytes into the message and to
// occupy 16 bytes each (presumably the request header size and
// sizeof(readahead_list) - TODO confirm). dlen is still in host order here.
1192 uint16_t numChunks = (req->readv.dlen)/16;
1193 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1194 for( size_t i = 0; i < numChunks; ++i )
1195 {
1196 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1197 dataChunk[i].offset = htonll( dataChunk[i].offset );
1198 }
1199 break;
1200 }
1201
1202 //------------------------------------------------------------------------
1203 // kXR_writev
1204 //------------------------------------------------------------------------
1205 case kXR_writev:
1206 {
// Same layout assumption as for kXR_readv: 16-byte entries starting at
// offset 24.
1207 uint16_t numChunks = (req->writev.dlen)/16;
1208 XrdProto::write_list *wrtList =
1209 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1210 for( size_t i = 0; i < numChunks; ++i )
1211 {
1212 wrtList[i].wlen = htonl( wrtList[i].wlen );
1213 wrtList[i].offset = htonll( wrtList[i].offset );
1214 }
1215
1216 break;
1217 }
1218
1219 case kXR_pgread:
1220 {
1221 req->pgread.offset = htonll( req->pgread.offset );
1222 req->pgread.rlen = htonl( req->pgread.rlen );
1223 break;
1224 }
1225
1226 case kXR_pgwrite:
1227 {
1228 req->pgwrite.offset = htonll( req->pgwrite.offset );
1229 break;
1230 }
1231
1232 //------------------------------------------------------------------------
1233 // kXR_prepare
1234 //------------------------------------------------------------------------
1235 case kXR_prepare:
1236 {
1237 req->prepare.optionX = htons( req->prepare.optionX );
1238 req->prepare.port = htons( req->prepare.port );
1239 break;
1240 }
1241
1242 case kXR_chkpoint:
1243 {
// With the kXR_ckpXeq opcode the data at msg + 24 is itself treated as a
// request and marshalled recursively.
1244 if( req->chkpoint.opcode == kXR_ckpXeq )
1245 MarshallRequest( msg + 24 );
1246 break;
1247 }
1248 };
1249
// The header goes last: the switch above needed requestid in host order.
1250 req->header.requestid = htons( req->header.requestid );
1251 req->header.dlen = htonl( req->header.dlen );
1252 return XRootDStatus();
1253 }
1254
1255 //----------------------------------------------------------------------------
1256 // Unmarshall the request - sometimes the requests need to be rewritten,
1257 // so we need to unmarshall them
1258 //----------------------------------------------------------------------------
1260 {
// Brings a previously marshalled request back to host byte order by running
// it through MarshallRequest() a second time. Returns stOK/suAlreadyDone
// without touching the message when it is not flagged as marshalled; otherwise
// returns the status of MarshallRequest() and clears the marshalled flag.
1261 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1262 // We rely on the marshaling process to be symmetric!
1263 // First we unmarshall the request ID and the length because
1264 // MarshallRequest() relies on these, and then we need to unmarshall these
1265 // two again, because they get marshalled in MarshallRequest().
1266 // All this is pretty damn ugly and should be rewritten.
1267 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
// Step 1: header back to host order so MarshallRequest() can dispatch on it.
1268 req->header.requestid = htons( req->header.requestid );
1269 req->header.dlen = htonl( req->header.dlen );
// Step 2: swap the request-specific fields; this also swaps the header again.
1270 XRootDStatus st = MarshallRequest( msg );
// Step 3: undo the header swap performed at the end of MarshallRequest().
1271 req->header.requestid = htons( req->header.requestid );
1272 req->header.dlen = htonl( req->header.dlen );
1273 msg->SetIsMarshalled( false );
1274 return st;
1275 }
1276
1277 //----------------------------------------------------------------------------
1278 // Unmarshall the body of the incoming message
1279 //----------------------------------------------------------------------------
1281 {
// Converts the status-specific fields of a server response body to host byte
// order, in place. Which field is converted depends on hdr.status; for kXR_ok
// only responses to kXR_protocol requests (per reqType) are converted.
// Every branch first checks that hdr.dlen covers the field(s) it is about to
// touch and returns stError/errInvalidMessage otherwise. Statuses without a
// branch are left untouched. Returns a default XRootDStatus on success.
// NOTE(review): `m` is used throughout but its declaration is not visible in
// this listing (presumably the message buffer viewed as a ServerResponse).
1283
1284 //--------------------------------------------------------------------------
1285 // kXR_ok
1286 //--------------------------------------------------------------------------
1287 if( m->hdr.status == kXR_ok )
1288 {
1289 switch( reqType )
1290 {
1291 //----------------------------------------------------------------------
1292 // kXR_protocol
1293 //----------------------------------------------------------------------
1294 case kXR_protocol:
// Two 32-bit fields (pval, flags) are converted, hence the 8-byte minimum.
1295 if( m->hdr.dlen < 8 )
1296 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1297 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1298 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1299 break;
1300 }
1301 }
1302 //--------------------------------------------------------------------------
1303 // kXR_error
1304 //--------------------------------------------------------------------------
1305 else if( m->hdr.status == kXR_error )
1306 {
1307 if( m->hdr.dlen < 4 )
1308 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1309 m->body.error.errnum = ntohl( m->body.error.errnum );
1310 }
1311
1312 //--------------------------------------------------------------------------
1313 // kXR_wait
1314 //--------------------------------------------------------------------------
1315 else if( m->hdr.status == kXR_wait )
1316 {
1317 if( m->hdr.dlen < 4 )
1318 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
// NOTE(review): this and the branches below call htonl() although the data is
// being converted network-to-host; ntohl() would state the intent. The two
// are presumably the same byte swap on supported platforms.
1319 m->body.wait.seconds = htonl( m->body.wait.seconds );
1320 }
1321
1322 //--------------------------------------------------------------------------
1323 // kXR_redirect
1324 //--------------------------------------------------------------------------
1325 else if( m->hdr.status == kXR_redirect )
1326 {
1327 if( m->hdr.dlen < 4 )
1328 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1329 m->body.redirect.port = htonl( m->body.redirect.port );
1330 }
1331
1332 //--------------------------------------------------------------------------
1333 // kXR_waitresp
1334 //--------------------------------------------------------------------------
1335 else if( m->hdr.status == kXR_waitresp )
1336 {
1337 if( m->hdr.dlen < 4 )
1338 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1339 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1340 }
1341
1342 //--------------------------------------------------------------------------
1343 // kXR_attn
1344 //--------------------------------------------------------------------------
1345 else if( m->hdr.status == kXR_attn )
1346 {
1347 if( m->hdr.dlen < 4 )
1348 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1349 m->body.attn.actnum = htonl( m->body.attn.actnum );
1350 }
1351
1352 return XRootDStatus();
1353 }
1354
1355 //------------------------------------------------------------------------
1357 //------------------------------------------------------------------------
1359 {
// Validates and unmarshals the body of a kXR_status response held in `msg`:
//  1. computes a crc32c over the bytes that follow the crc32c field,
//  2. checks that the message is large enough for the status structure plus
//     the pgread/pgwrite specific body selected by reqType,
//  3. converts crc32c, dlen and (for pgread/pgwrite) the offset to host order,
//  4. compares the checksum, the stream id and the request id.
// Returns errInvalidMessage for a short message, errDataError for any failed
// integrity check and a default XRootDStatus otherwise.
// NOTE(review): `rspst`, `pgrdbdy` and `pgwrtbdy` are used below but their
// declarations are not visible in this listing.
1360 //--------------------------------------------------------------------------
1361 // Calculate the crc32c before the unmarshaling the body!
1362 //--------------------------------------------------------------------------
// The checksummed range starts right after the crc32c field; the leading 8 is
// presumably the size of the response header - TODO confirm.
// NOTE(review): `length` is derived from hdr.dlen before the size check
// further down; a dlen smaller than sizeof(crc32c) would wrap this unsigned
// subtraction. Confirm that callers guarantee a minimum dlen.
1364 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1365 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1366 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1367
// Minimum message size: the status structure plus the request specific part.
1368 size_t stlen = sizeof( ServerResponseStatus );
1369 switch( reqType )
1370 {
1371 case kXR_pgread:
1372 {
1373 stlen += sizeof( ServerResponseBody_pgRead );
1374 break;
1375 }
1376
1377 case kXR_pgwrite:
1378 {
1379 stlen += sizeof( ServerResponseBody_pgWrite );
1380 break;
1381 }
1382 }
1383
1384 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1385 "kXR_status: invalid message size." );
1386
1387 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1388 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1389
1390 switch( reqType )
1391 {
1392 case kXR_pgread:
1393 {
1395 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1396 break;
1397 }
1398
1399 case kXR_pgwrite:
1400 {
1402 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1403 break;
1404 }
1405 }
1406
1407 //--------------------------------------------------------------------------
1408 // Do the integrity checks
1409 //--------------------------------------------------------------------------
1410 if( crcval != rspst->bdy.crc32c )
1411 {
1412 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1413 "corrupted (crc32c integrity check failed)." );
1414 }
1415
// The stream id is carried both in the header and in the status body; the two
// copies must agree.
1416 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1417 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1418 {
1419 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1420 "(stream ID mismatch)." );
1421 }
1422
1423
1424
// The body stores the request id relative to kXR_1stRequest.
1425 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1426 {
1427 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1428 "(request ID mismatch)." );
1429 }
1430
1431 return XRootDStatus();
1432 }
1433
1435 {
// Unmarshals the additional data that may follow a kXR_status response. Only
// kXR_pgwrite responses carry such data here: a correction segment
// (ServerResponseBody_pgWrCSE) followed by a list of 64-bit page offsets.
// The segment's crc32c is verified before its fields and the offsets are
// converted to host byte order.
// NOTE(review): `rsp` and `cse` are used below but their declarations, the
// `return` belonging to the size check and the final return statement are not
// visible in this listing.
1437 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1438
1439 switch( reqType )
1440 {
1441 case kXR_pgwrite:
1442 {
1443 //--------------------------------------------------------------------------
1444 // If there's no additional data there's nothing to unmarshal
1445 //--------------------------------------------------------------------------
1446 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1447 //--------------------------------------------------------------------------
1448 // If there's not enough data to form correction-segment report an error
1449 //--------------------------------------------------------------------------
1450 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1452 "kXR_status: invalid message size." );
1453
1454 //--------------------------------------------------------------------------
1455 // Calculate the crc32c for the additional data
1456 //--------------------------------------------------------------------------
// The checksum covers everything after the leading 32-bit CRC field of the
// additional data, hence the sizeof( uint32_t ) adjustments to both the
// start offset and the length.
1458 cse->cseCRC = ntohl( cse->cseCRC );
1459 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1460 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1461 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1462
1463 //--------------------------------------------------------------------------
1464 // Do the integrity checks
1465 //--------------------------------------------------------------------------
1466 if( crcval != cse->cseCRC )
1467 {
1468 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1469 "corrupted (crc32c integrity check failed)." );
1470 }
1471
1472 cse->dlFirst = ntohs( cse->dlFirst );
1473 cse->dlLast = ntohs( cse->dlLast );
1474
// Whatever follows the correction segment is treated as an array of 64-bit
// offsets.
1475 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1476 sizeof( kXR_int64 );
1477 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1478 sizeof( ServerResponseBody_pgWrCSE ) );
1479
1480 for( size_t i = 0; i < pgcnt; ++i )
1481 pgoffs[i] = ntohll( pgoffs[i] );
1482
1483 return XRootDStatus();
1484 break;
1485 }
1486
1487 default:
1488 break;
1489 }
1490
1492 }
1493
1494 //----------------------------------------------------------------------------
1495 // Unmarshall the header of the incoming message
1496 //----------------------------------------------------------------------------
1498 {
// Converts the status (16-bit) and dlen (32-bit) fields of the response header
// to host byte order, in place.
// NOTE(review): the declaration of `header` is not visible in this listing.
1500 header->status = ntohs( header->status );
1501 header->dlen = ntohl( header->dlen );
1502 }
1503
1504 //----------------------------------------------------------------------------
1505 // Log server error response
1506 //----------------------------------------------------------------------------
1508 {
1509 Log *log = DefaultEnv::GetLog();
1510 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1511 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1512 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1513 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1514 rsp->body.error.errnum, errmsg );
1515 delete [] errmsg;
1516 }
1517
1518 //------------------------------------------------------------------------
1519 // Number of currently connected data streams
1520 //------------------------------------------------------------------------
1522 {
// Returns the number of sub-streams whose status is Connected, not counting
// sub-stream 0 (the loop starts at index 1). Returns 0, after logging an
// error, when channelData holds no XRootDChannelInfo.
1523 XRootDChannelInfo *info = 0;
1524 channelData.Get( info );
1525
1526 if (!info) {
1527 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1528 return 0;
1529 }
1530
// The stream table is read under the channel info mutex.
1531 XrdSysMutexHelper scopedLock( info->mutex );
1532
1533 uint16_t nbConnected = 0;
1534 for( size_t i = 1; i < info->stream.size(); ++i )
1535 if( info->stream[i].status == XRootDStreamInfo::Connected )
1536 ++nbConnected;
1537
1538 return nbConnected;
1539 }
1540
1541 //----------------------------------------------------------------------------
1542 // The stream has been disconnected, do the cleanups
1543 //----------------------------------------------------------------------------
1545 uint16_t subStreamId )
1546 {
// Cleans up the channel state after sub-stream `subStreamId` has been
// disconnected. Logs an error and returns when channelData holds no
// XRootDChannelInfo. For sub-stream 0 the per-connection bookkeeping is reset
// as well: timed-out SIDs are released, the sets of tracked opens and closes
// are emptied, and the open-file counter and the wait barrier are zeroed.
1547 XRootDChannelInfo *info = 0;
1548 channelData.Get( info );
1549
1550 if (!info) {
1551 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1552 return;
1553 }
1554
1555 XrdSysMutexHelper scopedLock( info->mutex );
1556
1557 CleanUpProtection( info );
1558
// NOTE(review): only emptiness is checked before indexing with subStreamId;
// confirm that subStreamId is always below info->stream.size(). The statement
// that uses `sInfo` is not visible in this listing.
1559 if( !info->stream.empty() )
1560 {
1561 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1563 }
1564
1565 if( subStreamId == 0 )
1566 {
1567 info->sidManager->ReleaseAllTimedOut();
1568 info->sentOpens.clear();
1569 info->sentCloses.clear();
1570 info->openFiles = 0;
1571 info->waitBarrier = 0;
1572 }
1573 }
1574
1575 //------------------------------------------------------------------------
1576 // Query the channel
1577 //------------------------------------------------------------------------
1579 AnyObject &result,
1580 AnyObject &channelData )
1581 {
// Answers a query about the channel by storing the requested value in
// `result` and returning a default Status. The values come from the
// XRootDChannelInfo held in channelData, read under its mutex: the literal
// protocol name "XRootD", a copy of the authentication protocol name, the
// server flags, the protocol version and the `encrypted` flag.
// NOTE(review): the `case` labels, the statement executed when `info` is null
// and the final return are not visible in this listing.
1582 XRootDChannelInfo *info = 0;
1583 channelData.Get( info );
1584
1585 if (!info)
1587
1588 XrdSysMutexHelper scopedLock( info->mutex );
1589
1590 switch( query )
1591 {
1592 //------------------------------------------------------------------------
1593 // Protocol name
1594 //------------------------------------------------------------------------
1596 result.Set( (const char*)"XRootD", false );
1597 return Status();
1598
1599 //------------------------------------------------------------------------
1600 // Authentication
1601 //------------------------------------------------------------------------
// A freshly allocated copy is handed to `result`, as for the values below.
1603 result.Set( new std::string( info->authProtocolName ), false );
1604 return Status();
1605
1606 //------------------------------------------------------------------------
1607 // Server flags
1608 //------------------------------------------------------------------------
1610 result.Set( new int( info->serverFlags ), false );
1611 return Status();
1612
1613 //------------------------------------------------------------------------
1614 // Protocol version
1615 //------------------------------------------------------------------------
1617 result.Set( new int( info->protocolVersion ), false );
1618 return Status();
1619
1621 result.Set( new bool( info->encrypted ), false );
1622 return Status();
1623 };
1625 }
1626
1627 //----------------------------------------------------------------------------
1628 // Check whether the transport can hijack the message
1629 //----------------------------------------------------------------------------
1631 uint16_t subStream,
1632 AnyObject &channelData )
1633 {
// Inspects an incoming response and tells the caller what to do with it.
// Under the channel info mutex it
//  - notifies the stream selector that a message arrived on `subStream`,
//  - returns NoAction for kXR_attn,
//  - for a stream id flagged as timed out: logs the fact, releases the SID
//    (unless the status is kXR_waitresp) and returns RequestClose when the
//    response is a kXR_ok reply to a tracked open, DigestMsg otherwise,
//  - raises info->waitBarrier according to kXR_wait / kXR_waitresp,
//  - keeps info->openFiles and info->finstcnt in step with the replies to
//    tracked open and close requests.
// NOTE(review): `info` is dereferenced without a null check, unlike the
// functions above that log "no channel info". The declaration of `rsp` is not
// visible in this listing.
1634 XRootDChannelInfo *info = 0;
1635 channelData.Get( info );
1636 XrdSysMutexHelper scopedLock( info->mutex );
1637 Log *log = DefaultEnv::GetLog();
1638
1639 //--------------------------------------------------------------------------
1640 // Update the substream queues
1641 //--------------------------------------------------------------------------
1642 info->strmSelector->MsgReceived( subStream );
1643
1644 //--------------------------------------------------------------------------
1645 // Check whether this message is a response to a request that has
1646 // timed out, and if so, drop it
1647 //--------------------------------------------------------------------------
1649 if( rsp->hdr.status == kXR_attn )
1650 {
1651 return NoAction;
1652 }
1653
1654 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1655 {
1656 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1657 "response that we're no longer interested in (timed out)",
1658 &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1659 //------------------------------------------------------------------------
1660 // If it is kXR_waitresp there will be another one,
1661 // so we don't release the sid yet
1662 //------------------------------------------------------------------------
1663 if( rsp->hdr.status != kXR_waitresp )
1664 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1665 //------------------------------------------------------------------------
1666 // If it is a successful response to an open request
1667 // that timed out, we need to send a close
1668 //------------------------------------------------------------------------
// The two stream id bytes are copied into a uint16_t, which is the key type
// of the sentOpens / sentCloses sets.
1669 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1670 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1671 if( sidIt != info->sentOpens.end() )
1672 {
1673 info->sentOpens.erase( sidIt );
1674 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1675 }
1676 return DigestMsg;
1677 }
1678
1679 //--------------------------------------------------------------------------
1680 // If we have a wait or waitresp
1681 //--------------------------------------------------------------------------
// `seconds` stays 0 for every other status, in which case the barrier below
// is at most raised to the current time.
1682 uint32_t seconds = 0;
1683 if( rsp->hdr.status == kXR_wait )
1684 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1685 // to re-send the request
1686 else if( rsp->hdr.status == kXR_waitresp )
1687 {
1688 seconds = ntohl( rsp->body.waitresp.seconds );
1689
1690 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1691 "setting up wait barrier.",
1692 info->streamName.c_str(),
1693 seconds );
1694 }
1695
// The barrier only ever moves forward.
1696 time_t barrier = time(0) + seconds;
1697 if( info->waitBarrier < barrier )
1698 info->waitBarrier = barrier;
1699
1700 //--------------------------------------------------------------------------
1701 // If we got a response to an open request, we may need to bump the counter
1702 // of open files
1703 //--------------------------------------------------------------------------
1704 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1705 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1706 if( sidIt != info->sentOpens.end() )
1707 {
// kXR_waitresp means the final answer is still to come, so the SID stays
// tracked.
1708 if( rsp->hdr.status == kXR_waitresp )
1709 return NoAction;
1710 info->sentOpens.erase( sidIt );
1711 if( rsp->hdr.status == kXR_ok )
1712 {
1713 ++info->openFiles;
1714 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1715 }
1716 return NoAction;
1717 }
1718
1719 //--------------------------------------------------------------------------
1720 // If we got a response to a close, we may need to decrement the counter of
1721 // open files
1722 //--------------------------------------------------------------------------
1723 sidIt = info->sentCloses.find( sid );
1724 if( sidIt != info->sentCloses.end() )
1725 {
1726 if( rsp->hdr.status == kXR_waitresp )
1727 return NoAction;
1728 info->sentCloses.erase( sidIt );
// The counter is decremented for any final reply to a close, whatever its
// status.
1729 --info->openFiles;
1730 return NoAction;
1731 }
1732 return NoAction;
1733 }
1734
1735 //----------------------------------------------------------------------------
1736 // Notify the transport about a message having been sent
1737 //----------------------------------------------------------------------------
1739 uint16_t subStream,
1740 uint32_t bytesSent,
1741 AnyObject &channelData )
1742 {
// Records the stream id of every kXR_open and kXR_close request that has been
// sent, in info->sentOpens and info->sentCloses respectively. Other request
// types are ignored; `subStream` and `bytesSent` are not used in this body.
// NOTE(review): `info` is dereferenced without a null check, unlike the
// functions above that log "no channel info".
1743 // Called when a message has been sent. For messages that return on a
1744 // different pathid (and hence may use a different poller) it is possible
1745 // that the server has already replied and the reply will trigger
1746 // MessageReceived() before this method has been called. However for open
1747 // and close this is never the case and this method is used for tracking
1748 // only those.
1749 XRootDChannelInfo *info = 0;
1750 channelData.Get( info );
1751 XrdSysMutexHelper scopedLock( info->mutex );
1752 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
// The request id is converted with ntohs, i.e. the sent message is read as
// being in network byte order.
1753 uint16_t reqid = ntohs( req->header.requestid );
1754
1755
1756 //--------------------------------------------------------------------------
1757 // We need to track opens to know if we can close streams due to idleness
1758 //--------------------------------------------------------------------------
1759 uint16_t sid;
1760 memcpy( &sid, req->header.streamid, 2 );
1761
1762 if( reqid == kXR_open )
1763 info->sentOpens.insert( sid );
1764 else if( reqid == kXR_close )
1765 info->sentCloses.insert( sid );
1766 }
1767
1768
1769 //----------------------------------------------------------------------------
1770 // Get signature for given message
1771 //----------------------------------------------------------------------------
1773 {
// Convenience wrapper: extracts the XRootDChannelInfo from channelData and
// forwards to the GetSignature() variant that takes the info pointer. A
// missing info object is passed on as a null pointer.
1774 XRootDChannelInfo *info = 0;
1775 channelData.Get( info );
1776 return GetSignature( toSign, sign, info );
1777 }
1778
1779 //------------------------------------------------------------------------
1781 //------------------------------------------------------------------------
1783 Message *&sign,
1784 XRootDChannelInfo *info )
1785 {
// Produces the security (signature) message for `toSign`, if one is needed.
//  - stError/errInvalidOp once pSecUnloadHandler reports `unloaded`.
//  - stError/errInternal when `info` is null, or when Secure() fails (its
//    negated return code is passed along in the status).
//  - A default Status with `sign` left untouched when the channel has no
//    protection object or the request does not need to be secured.
//  - Otherwise `sign` is set to a newly allocated Message that takes the
//    buffer returned by Secure() via Grab().
1786 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1787 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1788
1789 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1790 if( !info ) return Status( stError, errInternal );
1791 if( info->protection )
1792 {
1793 SecurityRequest *newreq = 0;
1794 // check if we have to secure the request in the first place
1795 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1796 // secure (sign/encrypt) the request
1797 int rc = info->protection->Secure( newreq, *thereq, 0 );
1798 // there was an error
1799 if( rc < 0 )
1800 return Status( stError, errInternal, -rc );
1801
// A non-negative rc is used as the size of the buffer placed in `newreq`.
1802 sign = new Message();
1803 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1804 }
1805
1806 return Status();
1807 }
1808
1809 //------------------------------------------------------------------------
1811 //------------------------------------------------------------------------
1813 {
1814 XRootDChannelInfo *info = 0;
1815 channelData.Get( info );
1816 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1817 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1818 }
1819
1820 //----------------------------------------------------------------------------
1821 // Wait before exit
1822 //----------------------------------------------------------------------------
1824 {
// Marks the security library handler as unloaded. The flag is set while
// holding pSecUnloadHandler->lock for writing; GetSignature() checks the same
// flag under that lock and refuses to sign once it is set.
1825 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1826 pSecUnloadHandler->unloaded = true;
1827 }
1828
1829 //----------------------------------------------------------------------------
1830 // @return : true if encryption should be turned on, false otherwise
1831 //----------------------------------------------------------------------------
1833 AnyObject &channelData )
1834 {
// Decides whether the sub-stream described by handShakeData has to switch to
// TLS at this point of the hand-shake. Whenever the answer derived from the
// server flags is "yes", info->encrypted is set to true as a side effect.
//  - If "NoTlsOK" is set, the current value of info->encrypted is returned
//    and the server flags are not consulted.
//  - kXR_gotoTLS: switch right away.
//  - Sub-stream 0: switch before login (kXR_tlsLogin while LoginSent) or once
//    the session is established (kXR_tlsSess).
//  - Other sub-streams: switch before bind (kXR_tlsData while BindSent).
// NOTE(review): `info` is dereferenced without a null check. The declaration
// of `env` and one line of the kXR_tlsSess condition are not visible in this
// listing.
1835 XRootDChannelInfo *info = 0;
1836 channelData.Get( info );
1837
1839 int notlsok = DefaultNoTlsOK;
1840 env->GetInt( "NoTlsOK", notlsok );
1841
1842
1843 if( notlsok )
1844 return info->encrypted;
1845
1846 // Did the server instructed us to switch to TLS right away?
1847 if( info->serverFlags & kXR_gotoTLS )
1848 {
1849 info->encrypted = true;
1850 return true ;
1851 }
1852
1853 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1854
1855 //--------------------------------------------------------------------------
1856 // The control stream (sub-stream 0) might need to switch to TLS before
1857 // login or after login
1858 //--------------------------------------------------------------------------
1859 if( handShakeData->subStreamId == 0 )
1860 {
1861 //------------------------------------------------------------------------
1862 // We are about to login and the server asked to start encrypting
1863 // before login
1864 //------------------------------------------------------------------------
1865 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1866 ( info->serverFlags & kXR_tlsLogin ) )
1867 {
1868 info->encrypted = true;
1869 return true;
1870 }
1871
1872 //--------------------------------------------------------------------
1873 // The hand-shake is done and the server requested to encrypt the session
1874 //--------------------------------------------------------------------
1875 if( (sInfo.status == XRootDStreamInfo::Connected ||
1876 //--------------------------------------------------------------------
1877 // we really need to turn on TLS before we sent kXR_endsess and we
1878 // are about to do so (1st enable encryption, then send kXR_endsess)
1879 //--------------------------------------------------------------------
1881 ( info->serverFlags & kXR_tlsSess ) )
1882 {
1883 info->encrypted = true;
1884 return true;
1885 }
1886 }
1887 //--------------------------------------------------------------------------
1888 // A data stream (sub-stream > 0) if need be will be switched to TLS before
1889 // bind.
1890 //--------------------------------------------------------------------------
1891 else
1892 {
1893 //------------------------------------------------------------------------
1894 // We are about to bind a data stream and the server asked to start
1895 // encrypting before bind
1896 //------------------------------------------------------------------------
1897 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1898 ( info->serverFlags & kXR_tlsData ) )
1899 {
1900 info->encrypted = true;
1901 return true;
1902 }
1903 }
1904
1905 return false;
1906 }
1907
1908 //------------------------------------------------------------------------
1909 // Get bind preference for the next data stream
1910 //------------------------------------------------------------------------
1912 AnyObject &channelData )
1913 {
// Returns the URL to use for the next data stream: the next entry handed out
// by the channel's bind selector, or `url` unchanged when channelData holds no
// channel info or the info has no bind selector.
1914 XRootDChannelInfo *info = 0;
1915 channelData.Get( info );
1916
1917 if(!info || !info->bindSelector)
1918 return url;
1919
1920 return URL( info->bindSelector->Get() );
1921 }
1922
1923 //----------------------------------------------------------------------------
1924 // Generate the message to be sent as an initial handshake
1925 // (handshake+kXR_protocol)
1926 //----------------------------------------------------------------------------
// Allocates a message of 20 + sizeof(ClientProtocolRequest) bytes, zeroes it,
// sets the `fourth` and `fifth` hand-shake fields and lets InitProtocolReq
// fill in the protocol request. Returns the newly allocated Message
// (presumably owned by the caller - confirm).
// NOTE(review): listing numbers 1932, 1941 and 1945 are absent, so the call
// that consumes the log arguments below and the declarations of `init` and
// `proto` are not visible in this listing.
1927 Message *XRootDTransport::GenerateInitialHSProtocol( HandShakeData *hsData,
1928 XRootDChannelInfo *info,
1929 kXR_char expect )
1930 {
1931 Log *log = DefaultEnv::GetLog();
1933 "[%s] Sending out the initial hand shake + kXR_protocol",
1934 hsData->streamName.c_str() );
1935
1936 Message *msg = new Message();
1937
// 20 bytes in front of the protocol request; presumably the size of the
// initial hand-shake structure - confirm against its declaration.
1938 msg->Allocate( 20+sizeof(ClientProtocolRequest) );
1939 msg->Zero();
1940
// Both values are converted to network byte order; all other hand-shake
// fields keep the zero written by Zero() above.
1942 init->fourth = htonl(4);
1943 init->fifth = htonl(2012);
1944
1946 InitProtocolReq( proto, info, expect );
1947
1948 return msg;
1949 }
1950
1951 //------------------------------------------------------------------------
1952 // Generate the protocol message
1953 //------------------------------------------------------------------------
1954 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1955 XRootDChannelInfo *info,
1956 kXR_char expect )
1957 {
1958 Log *log = DefaultEnv::GetLog();
1959 log->Debug( XRootDTransportMsg,
1960 "[%s] Sending out the kXR_protocol",
1961 hsData->streamName.c_str() );
1962
1963 Message *msg = new Message();
1964 msg->Allocate( sizeof(ClientProtocolRequest) );
1965 msg->Zero();
1966
1967 ClientProtocolRequest *proto = (ClientProtocolRequest *)msg->GetBuffer();
1968 InitProtocolReq( proto, info, expect );
1969
1970 return msg;
1971 }
1972
1973 //------------------------------------------------------------------------
1974 // Initialize protocol request
1975 //------------------------------------------------------------------------
// Fills in a kXR_protocol request: request id and client protocol version
// (both in network byte order) and the `expect` field.
// NOTE(review): this listing is missing several numbered lines (1982-1983,
// 1988, 1992, 1996, 1999 and 2008). The declaration of `env` and the
// statements governed by the three `if`s below are therefore not visible;
// do not read the `if`s as controlling the line that happens to follow them
// here.
1976 void XRootDTransport::InitProtocolReq( ClientProtocolRequest *request,
1977 XRootDChannelInfo *info,
1978 kXR_char expect )
1979 {
1980 request->requestid = htons(kXR_protocol);
1981 request->clientpv = htonl(kXR_PROTOCOLVERSION);
1984
// Start from the compiled-in defaults; the GetInt calls below are given the
// variables by name ("NoTlsOK", "TlsNoData") to override them.
1985 int notlsok = DefaultNoTlsOK;
1986 int tlsnodata = DefaultTlsNoData;
1987
1989
1990 env->GetInt( "NoTlsOK", notlsok );
1991
1993 env->GetInt( "TlsNoData", tlsnodata );
1994
// NOTE(review): body of this condition is on the absent line 1996.
1995 if (info->encrypted || InitTLS())
1997
// NOTE(review): body of this condition is on the absent line 1999.
1998 if (info->encrypted && !(notlsok || tlsnodata))
2000
2001 request->expect = expect;
2002
2003 //--------------------------------------------------------------------------
2004 // If we are in the course of establishing a connection in the context of
2005 // TPC update the expect! (this will be never followed by a bind)
2006 //--------------------------------------------------------------------------
// NOTE(review): the update itself is on the absent line 2008.
2007 if( info->istpc )
2009 }
2010
2011 //----------------------------------------------------------------------------
2012 // Process the server initial handshake response
2013 //----------------------------------------------------------------------------
// Validates the server's initial hand-shake response held in hsData->in and
// records the protocol version and server flags in `info`.
// Returns a fatal errHandShakeFailed status when the response header status
// is not kXR_ok, otherwise (stOK, suContinue).
2014 XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
2015 XRootDChannelInfo *info )
2016 {
2017 Log *log = DefaultEnv::GetLog();
2018
2019 Message *msg = hsData->in;
// The response header is read from the start of the buffer and the
// hand-shake body from offset 4 of the same buffer.
2020 ServerResponseHeader *respHdr = (ServerResponseHeader *)msg->GetBuffer();
2021 ServerInitHandShake *hs = (ServerInitHandShake *)msg->GetBuffer(4);
2022
2023 if( respHdr->status != kXR_ok )
2024 {
2025 log->Error( XRootDTransportMsg, "[%s] Invalid hand shake response",
2026 hsData->streamName.c_str() );
2027
2028 return XRootDStatus( stFatal, errHandShakeFailed, 0, "Invalid hand shake response." );
2029 }
2030
// Both fields arrive in network byte order.
2031 info->protocolVersion = ntohl(hs->protover);
// NOTE(review): listing numbers 2033-2034 are absent, so the two values this
// conditional expression chooses between are not visible here.
2032 info->serverFlags = ntohl(hs->msgval) == kXR_DataServer ?
2035
2036 log->Debug( XRootDTransportMsg,
2037 "[%s] Got the server hand shake response (%s, protocol "
2038 "version %x)",
2039 hsData->streamName.c_str(),
2040 ServerFlagsToStr( info->serverFlags ).c_str(),
2041 info->protocolVersion );
2042
2043 return XRootDStatus( stOK, suContinue );
2044 }
2045
2046 //----------------------------------------------------------------------------
2047 // Process the protocol response
2048 //----------------------------------------------------------------------------
// Handles the server's answer to kXR_protocol.
// Returns:
//  - the UnMarshallBody status if unmarshalling fails,
//  - a fatal errHandShakeFailed status if the response status is not kXR_ok,
//  - a fatal errTlsError status if encryption was requested but the server
//    cannot provide it (unless the NoTlsOK fallback applies),
//  - the ProcessProtocolBody status if parsing the response tail fails,
//  - (stOK, suRetry) when the protocol request has to be repeated with
//    encryption switched on,
//  - (stOK, suContinue) otherwise.
// NOTE(review): listing number 2069 is absent; the declaration of `env` used
// below is presumably on that line.
2049 XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
2050 XRootDChannelInfo *info )
2051 {
2052 Log *log = DefaultEnv::GetLog();
2053
2054 XRootDStatus st = UnMarshallBody( hsData->in, kXR_protocol );
2055 if( !st.IsOK() )
2056 return st;
2057
2058 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2059
2060
2061 if( rsp->hdr.status != kXR_ok )
2062 {
2063 log->Error( XRootDTransportMsg, "[%s] kXR_protocol request failed",
2064 hsData->streamName.c_str() );
2065
2066 return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_protocol request failed" );
2067 }
2068
2070 int notlsok = DefaultNoTlsOK;
2071 env->GetInt( "NoTlsOK", notlsok );
2072
2073 if( rsp->body.protocol.pval < kXR_PROTTLSVERSION && info->encrypted )
2074 {
2075 //------------------------------------------------------------------------
2076 // User requested an encrypted connection but the server is too old to
2077 // support it!
2078 //------------------------------------------------------------------------
2079 if( !notlsok ) return XRootDStatus( stFatal, errTlsError, ENOTSUP, "TLS not supported" );
2080
2081 //------------------------------------------------------------------------
2082 // We are falling back to unencrypted data transmission, as configured
2083 // in XRD_NOTLSOK environment variable
2084 //------------------------------------------------------------------------
2085 log->Info( XRootDTransportMsg,
2086 "[%s] Falling back to unencrypted transmission, server does "
2087 "not support TLS encryption.",
2088 hsData->streamName.c_str() );
2089 info->encrypted = false;
2090 }
2091
// Flags from the protocol response replace the ones recorded earlier only for
// servers reporting protocol value 0x297 or newer.
2092 if( rsp->body.protocol.pval >= 0x297 )
2093 info->serverFlags = rsp->body.protocol.flags;
2094
// More than 8 bytes of payload means there is data after pval and flags
// (4 bytes each, going by the "8 /*pval & flags*/" note in
// ProcessProtocolBody).
// NOTE(review): a fresh protRespBody is allocated on every pass through here;
// whether a previous one (e.g. after the suRetry path below) is released first
// cannot be seen from this function - confirm.
2095 if( rsp->hdr.dlen > 8 )
2096 {
2097 info->protRespBody = new ServerResponseBody_Protocol();
2098 info->protRespBody->flags = rsp->body.protocol.flags;
2099 info->protRespBody->pval = rsp->body.protocol.pval;
2100
2101 char* bodybuff = reinterpret_cast<char*>( &rsp->body.protocol.secreq );
2102 size_t bodysize = rsp->hdr.dlen - 8;
2103 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2104 if( !st.IsOK() )
2105 return st;
2106 }
2107
2108 log->Debug( XRootDTransportMsg,
2109 "[%s] kXR_protocol successful (%s, protocol version %x)",
2110 hsData->streamName.c_str(),
2111 ServerFlagsToStr( info->serverFlags ).c_str(),
2112 info->protocolVersion );
2113
2114 if( !( info->serverFlags & kXR_haveTLS ) && info->encrypted )
2115 {
2116 //------------------------------------------------------------------------
2117 // User requested an encrypted connection but the server was not configured
2118 // to support encryption!
2119 //------------------------------------------------------------------------
2120 return XRootDStatus( stFatal, errTlsError, ECONNREFUSED,
2121 "Server was not configured to support encryption." );
2122 }
2123
2124 //--------------------------------------------------------------------------
2125 // Now see if we have to enforce encryption in case the server does not
2126 // support PgRead/PgWrite
2127 //--------------------------------------------------------------------------
2128 int tlsOnNoPgrw = DefaultWantTlsOnNoPgrw;
2129 env->GetInt( "WantTlsOnNoPgrw", tlsOnNoPgrw );
2130 if( !( info->serverFlags & kXR_suppgrw ) && tlsOnNoPgrw )
2131 {
2132 //------------------------------------------------------------------------
2133 // If user requested encryption just make sure it is not switched off for
2134 // data
2135 //------------------------------------------------------------------------
2136 if( info->encrypted )
2137 {
2138 log->Debug( XRootDTransportMsg,
2139 "[%s] Server does not support PgRead/PgWrite and"
2140 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2141 hsData->streamName.c_str() );
// Writes the default back into the environment object, i.e. the change is not
// limited to this channel.
2142 env->PutInt( "TlsNoData", DefaultTlsNoData );
2143 }
2144 //------------------------------------------------------------------------
2145 // Otherwise, if server is not enforcing data encryption, we will need to
2146 // redo the protocol request with kXR_wantTLS set.
2147 //------------------------------------------------------------------------
2148 else if( !( info->serverFlags & kXR_tlsData ) &&
2149 ( info->serverFlags & kXR_haveTLS ) )
2150 {
2151 info->encrypted = true;
2152 return XRootDStatus( stOK, suRetry );
2153 }
2154 }
2155
2156 return XRootDStatus( stOK, suContinue );
2157 }
2158
2159 XRootDStatus XRootDTransport::ProcessProtocolBody( char *bodybuff,
2160 size_t bodysize,
2161 XRootDChannelInfo *info )
2162 {
2163 //--------------------------------------------------------------------------
2164 // Parse bind preferences
2165 //--------------------------------------------------------------------------
2166 XrdProto::bifReqs *bifreq = reinterpret_cast<XrdProto::bifReqs*>( bodybuff );
2167 if( bodysize >= sizeof( XrdProto::bifReqs ) && bifreq->theTag == 'B' )
2168 {
2169 bodybuff += sizeof( XrdProto::bifReqs );
2170 bodysize -= sizeof( XrdProto::bifReqs );
2171
2172 if( bodysize < bifreq->bifILen )
2173 return XRootDStatus( stError, errDataError, 0, "Received incomplete "
2174 "protocol response." );
2175 std::string bindprefs_str( bodybuff, bifreq->bifILen );
2176 std::vector<std::string> bindprefs;
2177 Utils::splitString( bindprefs, bindprefs_str, "," );
2178 info->bindSelector.reset( new BindPrefSelector( std::move( bindprefs ) ) );
2179 bodybuff += bifreq->bifILen;
2180 bodysize -= bifreq->bifILen;
2181 }
2182 //--------------------------------------------------------------------------
2183 // Parse security requirements
2184 //--------------------------------------------------------------------------
2185 XrdProto::secReqs *secreq = reinterpret_cast<XrdProto::secReqs*>( bodybuff );
2186 if( bodysize >= 6 /*XrdProto::secReqs*/ && secreq->theTag == 'S' )
2187 {
2188 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2189 info->protRespSize = bodysize + 8 /*pval & flags*/;
2190 }
2191
2192 return XRootDStatus();
2193 }
2194
2195 //----------------------------------------------------------------------------
2196 // Generate the bind message
2197 //----------------------------------------------------------------------------
2198 Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2199 XRootDChannelInfo *info )
2200 {
2201 Log *log = DefaultEnv::GetLog();
2202
2203 log->Debug( XRootDTransportMsg,
2204 "[%s] Sending out the bind request",
2205 hsData->streamName.c_str() );
2206
2207
2208 Message *msg = new Message( sizeof( ClientBindRequest ) );
2209 ClientBindRequest *bindReq = (ClientBindRequest *)msg->GetBuffer();
2210
2211 bindReq->requestid = kXR_bind;
2212 memcpy( bindReq->sessid, info->sessionId, 16 );
2213 bindReq->dlen = 0;
2214 MarshallRequest( msg );
2215 return msg;
2216 }
2217
2218 //----------------------------------------------------------------------------
2219 // Generate the bind message
2220 //----------------------------------------------------------------------------
2221 XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2222 XRootDChannelInfo *info )
2223 {
2224 Log *log = DefaultEnv::GetLog();
2225
2226 XRootDStatus st = UnMarshallBody( hsData->in, kXR_bind );
2227 if( !st.IsOK() )
2228 return st;
2229
2230 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2231
2232 if( rsp->hdr.status != kXR_ok )
2233 {
2234 log->Error( XRootDTransportMsg, "[%s] kXR_bind request failed",
2235 hsData->streamName.c_str() );
2236 return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_bind request failed" );
2237 }
2238
2239 info->stream[hsData->subStreamId].pathId = rsp->body.bind.substreamid;
2240 log->Debug( XRootDTransportMsg, "[%s] kXR_bind successful",
2241 hsData->streamName.c_str() );
2242
2243 return XRootDStatus();
2244 }
2245
2246 //----------------------------------------------------------------------------
2247 // Generate the login message
2248 //----------------------------------------------------------------------------
// Builds the kXR_login request: a ClientLoginRequest followed by a CGI string
// describing the client (country code, time zone, application name,
// monitoring info, host name, release and, when present, the login token).
// The message is marshalled before it is returned.
// NOTE(review): listing numbers 2293 and 2306 are absent. The initial
// assignment to loginReq->ability (it is only OR-ed into below) and the
// declaration of `stacks` are presumably on those lines.
2249 Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2250 XRootDChannelInfo *info )
2251 {
2252 Log *log = DefaultEnv::GetLog();
2253 Env *env = DefaultEnv::GetEnv();
2254
2255 //--------------------------------------------------------------------------
2256 // Compute the login cgi
2257 //--------------------------------------------------------------------------
2258 int timeZone = XrdSysTimer::TimeZone();
// hostName is released with free() once the CGI string has been built.
2259 char *hostName = XrdNetUtils::MyHostName();
2260 std::string countryCode = Utils::FQDNToCC( hostName );
// NOTE(review): the buffer is sized 1024 + token length, yet both snprintf
// calls below are limited to 1024 bytes, so a long login token is cut off
// even though the buffer has room for it - confirm whether that is intended.
2261 char *cgiBuffer = new char[1024 + info->logintoken.size()];
2262 std::string appName;
2263 std::string monInfo;
2264 env->GetString( "AppName", appName );
2265 env->GetString( "MonInfo", monInfo );
2266 if( info->logintoken.empty() )
2267 {
2268 snprintf( cgiBuffer, 1024,
2269 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2270 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2271 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2272 }
2273 else
2274 {
2275 snprintf( cgiBuffer, 1024,
2276 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2277 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2278 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2279 }
// The length is held in 16 bits; with the 1024-byte limit above it cannot
// exceed 1023.
2280 uint16_t cgiLen = strlen( cgiBuffer );
2281 free( hostName );
2282
2283 //--------------------------------------------------------------------------
2284 // Generate the message
2285 //--------------------------------------------------------------------------
2286 Message *msg = new Message( sizeof(ClientLoginRequest) + cgiLen );
2287 ClientLoginRequest *loginReq = (ClientLoginRequest *)msg->GetBuffer();
2288
2289 loginReq->requestid = kXR_login;
2290 loginReq->pid = ::getpid();
2291 loginReq->capver[0] = (kXR_char) kXR_asyncap | (kXR_char) kXR_ver005;
2292 loginReq->dlen = cgiLen;
2294#ifdef WITH_XRDEC
2295 loginReq->ability2 = kXR_ecredir;
2296#endif
2297
// MultiProtocol defaults to off when the environment does not provide it.
2298 int multiProtocol = 0;
2299 env->GetInt( "MultiProtocol", multiProtocol );
2300 if(multiProtocol)
2301 loginReq->ability |= kXR_multipr;
2302
2303 //--------------------------------------------------------------------------
2304 // Check the IP stacks
2305 //--------------------------------------------------------------------------
// The three flags below are only used for the debug message at the end; the
// information sent to the server goes into loginReq->ability.
2307 bool dualStack = false;
2308 bool privateIPv6 = false;
2309 bool privateIPv4 = false;
2310
2311 if( (stacks & XrdNetUtils::hasIP64) == XrdNetUtils::hasIP64 )
2312 {
2313 dualStack = true;
2314 loginReq->ability |= kXR_hasipv64;
2315 }
2316
// An IPv6 stack without the hasPub6 bit is reported as private-only.
2317 if( (stacks & XrdNetUtils::hasIPv6) && !(stacks & XrdNetUtils::hasPub6) )
2318 {
2319 privateIPv6 = true;
2320 loginReq->ability |= kXR_onlyprv6;
2321 }
2322
// Same for IPv4 and the hasPub4 bit.
2323 if( (stacks & XrdNetUtils::hasIPv4) && !(stacks & XrdNetUtils::hasPub4) )
2324 {
2325 privateIPv4 = true;
2326 loginReq->ability |= kXR_onlyprv4;
2327 }
2328
2329 // The following code snippet tries to overcome the problem that this host
2330 // may still be dual-stacked but we don't know it because one of the
2331 // interfaces was not registered in DNS.
2332 //
// I.e. if we only know about one IP family but the server address is of the
// other family, we evidently can use both.
2333 if( !dualStack && hsData->serverAddr )
2334 {if ( ( ( stacks & XrdNetUtils::hasIPv4 )
2335 && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv6))
2336 || ( ( stacks & XrdNetUtils::hasIPv6 )
2337 && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv4)))
2338 {dualStack = true;
2339 loginReq->ability |= kXR_hasipv64;
2340 }
2341 }
2342
2343 //--------------------------------------------------------------------------
2344 // Check the username
2345 //--------------------------------------------------------------------------
// Preference order: user name from the URL, then the name looked up for the
// effective uid, then the literal "_anon_".
2346 std::string buffer( 8, 0 );
2347 if( hsData->url->GetUserName().length() )
2348 buffer = hsData->url->GetUserName();
2349 else
2350 {
2351 char *name = new char[1024];
2352 if( !XrdOucUtils::UserName( geteuid(), name, 1024 ) )
2353 buffer = name;
2354 else
2355 buffer = "_anon_";
2356 delete [] name;
2357 }
// Truncate or zero-pad to exactly 8 bytes before copying into the request.
// NOTE(review): a name of 8 or more characters leaves no terminating zero in
// these 8 bytes, yet loginReq->username is printed with %s below - confirm
// the field is followed by a zero byte.
2358 buffer.resize( 8, 0 );
2359 std::copy( buffer.begin(), buffer.end(), (char*)loginReq->username );
2360
// The CGI string is placed at offset 24 of the message; presumably that is
// sizeof(ClientLoginRequest) - confirm.
2361 msg->Append( cgiBuffer, cgiLen, 24 );
2362
2363 log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_login request, "
2364 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2365 "private IPv6: %s", hsData->streamName.c_str(),
2366 loginReq->username, cgiBuffer, dualStack ? "true" : "false",
2367 privateIPv4 ? "true" : "false",
2368 privateIPv6 ? "true" : "false" );
2369
2370 delete [] cgiBuffer;
2371 MarshallRequest( msg );
2372 return msg;
2373 }
2374
2375 //----------------------------------------------------------------------------
2376 // Process the protocol response
2377 //----------------------------------------------------------------------------
2378 XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2379 XRootDChannelInfo *info )
2380 {
2381 Log *log = DefaultEnv::GetLog();
2382
2383 XRootDStatus st = UnMarshallBody( hsData->in, kXR_login );
2384 if( !st.IsOK() )
2385 return st;
2386
2387 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2388
2389 if( rsp->hdr.status != kXR_ok )
2390 {
2391 log->Error( XRootDTransportMsg, "[%s] Got invalid login response",
2392 hsData->streamName.c_str() );
2393 return XRootDStatus( stFatal, errLoginFailed, 0, "Got invalid login response." );
2394 }
2395
2396 if( !info->firstLogIn )
2397 memcpy( info->oldSessionId, info->sessionId, 16 );
2398
2399 if( rsp->hdr.dlen == 0 && info->protocolVersion <= 0x289 )
2400 {
2401 //--------------------------------------------------------------------------
2402 // This if statement is there only to support dCache inaccurate
2403 // implementation of XRoot protocol, that in some cases returns
2404 // an empty login response for protocol version <= 2.8.9.
2405 //--------------------------------------------------------------------------
2406 memset( info->sessionId, 0, 16 );
2407 log->Warning( XRootDTransportMsg,
2408 "[%s] Logged in, accepting empty login response.",
2409 hsData->streamName.c_str() );
2410 return XRootDStatus();
2411 }
2412
2413 if( rsp->hdr.dlen < 16 )
2414 return XRootDStatus( stError, errDataError, 0, "Login response too short." );
2415
2416 memcpy( info->sessionId, rsp->body.login.sessid, 16 );
2417
2418 std::string sessId = Utils::Char2Hex( rsp->body.login.sessid, 16 );
2419
2420 log->Debug( XRootDTransportMsg, "[%s] Logged in, session: %s",
2421 hsData->streamName.c_str(), sessId.c_str() );
2422
2423 //--------------------------------------------------------------------------
2424 // We have an authentication info to process
2425 //--------------------------------------------------------------------------
2426 if( rsp->hdr.dlen > 16 )
2427 {
2428 size_t len = rsp->hdr.dlen-16;
2429 info->authBuffer = new char[len+1];
2430 info->authBuffer[len] = 0;
2431 memcpy( info->authBuffer, rsp->body.login.sec, len );
2432 log->Debug( XRootDTransportMsg, "[%s] Authentication is required: %s",
2433 hsData->streamName.c_str(), info->authBuffer );
2434
2435 return XRootDStatus( stOK, suContinue );
2436 }
2437
2438 return XRootDStatus();
2439 }
2440
2441 //----------------------------------------------------------------------------
2442 // Do the authentication
2443 //----------------------------------------------------------------------------
// Drives one step of the authentication exchange and, unless it finishes or
// fails, leaves the next kXR_auth request in hsData->out.
//
// First call (stream status is LoginSent): sets up the authentication
// environment and parameters from the login response and asks GetCredentials
// for initial credentials.
// Later calls: react to the server response in hsData->in
//   kXR_authmore - ask the current protocol for more credentials,
//   kXR_ok       - authentication done; optionally load request protection,
//   kXR_error    - log the server's message and try another protocol,
//   other        - fail with errAuthFailed.
//
// Returns (stOK, suContinue) when a request was generated, a default status
// when authentication succeeded, an error status otherwise.
// NOTE(review): listing numbers 2591 and 2659 are absent; each follows a
// "Clear the SSL error queue" comment, so the statement doing that is
// presumably what is missing.
2444 XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2445 XRootDChannelInfo *info )
2446 {
2447 //--------------------------------------------------------------------------
2448 // Prepare
2449 //--------------------------------------------------------------------------
2450 Log *log = DefaultEnv::GetLog();
2451 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2452 XrdSecCredentials *credentials = 0;
2453 std::string protocolName;
2454
2455 //--------------------------------------------------------------------------
2456 // We're doing this for the first time
2457 //--------------------------------------------------------------------------
2458 if( sInfo.status == XRootDStreamInfo::LoginSent )
2459 {
2460 log->Debug( XRootDTransportMsg, "[%s] Sending authentication data",
2461 hsData->streamName.c_str() );
2462
2463 //------------------------------------------------------------------------
2464 // Set up the authentication environment
2465 //------------------------------------------------------------------------
2466 info->authEnv = new XrdOucEnv();
2467 info->authEnv->Put( "sockname", hsData->clientName.c_str() );
2468 info->authEnv->Put( "username", hsData->url->GetUserName().c_str() );
2469 info->authEnv->Put( "password", hsData->url->GetPassword().c_str() );
2470
// Forward every URL parameter whose name starts with "xrd." or "xrdcl." to
// the authentication environment.
2471 const URL::ParamsMap &urlParams = hsData->url->GetParams();
2472 URL::ParamsMap::const_iterator it;
2473 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2474 {
2475 if( it->first.compare( 0, 4, "xrd." ) == 0 ||
2476 it->first.compare( 0, 6, "xrdcl." ) == 0 )
2477 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2478 }
2479
2480 //------------------------------------------------------------------------
2481 // Initialize some other structs
2482 //------------------------------------------------------------------------
// The copy is made with malloc; presumably XrdSecParameters takes ownership
// of it and releases it with free() - confirm.
// NOTE(review): authBuffLen + 1 bytes are allocated but only authBuffLen are
// written, so the last byte is left uninitialised; confirm nothing treats
// the buffer as a zero-terminated string.
2483 size_t authBuffLen = strlen( info->authBuffer );
2484 char *pars = (char *)malloc( authBuffLen + 1 );
2485 memcpy( pars, info->authBuffer, authBuffLen );
2486 info->authParams = new XrdSecParameters( pars, authBuffLen );
2487 sInfo.status = XRootDStreamInfo::AuthSent;
2488 delete [] info->authBuffer;
2489 info->authBuffer = 0;
2490
2491 //------------------------------------------------------------------------
2492 // Find a protocol that gives us valid credentials
2493 //------------------------------------------------------------------------
2494 XRootDStatus st = GetCredentials( credentials, hsData, info );
2495 if( !st.IsOK() )
2496 {
2497 CleanUpAuthentication( info );
2498 return st;
2499 }
2500 protocolName = info->authProtocol->Entity.prot;
2501 }
2502
2503 //--------------------------------------------------------------------------
2504 // We've been here already
2505 //--------------------------------------------------------------------------
2506 else
2507 {
2508 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2509 protocolName = info->authProtocol->Entity.prot;
2510
2511 //------------------------------------------------------------------------
2512 // We're required to send out more authentication data
2513 //------------------------------------------------------------------------
2514 if( rsp->hdr.status == kXR_authmore )
2515 {
2516 log->Debug( XRootDTransportMsg,
2517 "[%s] Sending more authentication data for %s",
2518 hsData->streamName.c_str(), protocolName.c_str() );
2519
// Hand a malloc'ed copy of the server's token to the protocol handler; the
// XrdSecParameters wrapper is deleted right after the call.
2520 uint32_t len = rsp->hdr.dlen;
2521 char *secTokenData = (char*)malloc( len );
2522 memcpy( secTokenData, rsp->body.authmore.data, len );
2523 XrdSecParameters *secToken = new XrdSecParameters( secTokenData, len );
2524 XrdOucErrInfo ei( "", info->authEnv);
2525 credentials = info->authProtocol->getCredentials( secToken, &ei );
2526 delete secToken;
2527
2528 //----------------------------------------------------------------------
2529 // The protocol handler refuses to give us the data
2530 //----------------------------------------------------------------------
2531 if( !credentials )
2532 {
2533 log->Error( XRootDTransportMsg,
2534 "[%s] Auth protocol handler for %s refuses to give "
2535 "us more credentials %s",
2536 hsData->streamName.c_str(), protocolName.c_str(),
2537 ei.getErrText() );
2538 CleanUpAuthentication( info );
2539 return XRootDStatus( stFatal, errAuthFailed, 0, ei.getErrText() );
2540 }
2541 }
2542
2543 //------------------------------------------------------------------------
2544 // We have succeeded
2545 //------------------------------------------------------------------------
2546 else if( rsp->hdr.status == kXR_ok )
2547 {
2548 info->authProtocolName = info->authProtocol->Entity.prot;
2549
2550 //----------------------------------------------------------------------
2551 // Do we need protection?
2552 //----------------------------------------------------------------------
// protRespBody is only set when the kXR_protocol response carried data
// beyond pval and flags (see ProcessProtocolResp).
2553 if( info->protRespBody )
2554 {
2555 int rc = XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2556 if( rc > 0 )
2557 {
2558 log->Debug( XRootDTransportMsg,
2559 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2560 }
2561 else if( rc == 0 )
2562 {
2563 log->Debug( XRootDTransportMsg,
2564 "[%s] XrdSecProtect: no protection needed.",
2565 hsData->streamName.c_str() );
2566 }
2567 else
2568 {
// A negative rc is treated as a negated errno value.
2569 log->Debug( XRootDTransportMsg,
2570 "[%s] Failed to load XrdSecProtect: %s",
2571 hsData->streamName.c_str(), XrdSysE2T( -rc ) );
2572 CleanUpAuthentication( info );
2573
2574 return XRootDStatus( stError, errAuthFailed, -rc, XrdSysE2T( -rc ) );
2575 }
2576 }
2577
// Without a protection object the authentication state is released right
// away; with one it is kept (CleanUpProtection releases it later) and the
// protocol name is registered with the unload handler.
2578 if( !info->protection )
2579 CleanUpAuthentication( info );
2580 else
2581 pSecUnloadHandler->Register( info->authProtocolName );
2582
2583 log->Debug( XRootDTransportMsg,
2584 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2585 protocolName.c_str() );
2586
2587 //--------------------------------------------------------------------
2588 // Clear the SSL error queue of the calling thread, as there might be
2589 // some leftover from the authentication!
2590 //--------------------------------------------------------------------
2592
2593 return XRootDStatus();
2594 }
2595 //------------------------------------------------------------------------
2596 // Failure
2597 //------------------------------------------------------------------------
2598 else if( rsp->hdr.status == kXR_error )
2599 {
// The error body is taken to be 4 bytes followed by the message text; the
// text is copied and zero-terminated for logging.
// NOTE(review): dlen-3 / dlen-4 wrap around if dlen < 4; confirm the length
// has been validated before this point.
2600 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
2601 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
2602 log->Error( XRootDTransportMsg,
2603 "[%s] Authentication with %s failed: %s",
2604 hsData->streamName.c_str(), protocolName.c_str(),
2605 errmsg );
2606 delete [] errmsg;
2607
// Drop the protocol that just failed before asking for the next one.
2608 info->authProtocol->Delete();
2609 info->authProtocol = 0;
2610
2611 //----------------------------------------------------------------------
2612 // Find another protocol that gives us valid credentials
2613 //----------------------------------------------------------------------
2614 XRootDStatus st = GetCredentials( credentials, hsData, info );
2615 if( !st.IsOK() )
2616 {
2617 CleanUpAuthentication( info );
2618 return st;
2619 }
2620 protocolName = info->authProtocol->Entity.prot;
2621 }
2622 //------------------------------------------------------------------------
2623 // God knows what
2624 //------------------------------------------------------------------------
2625 else
2626 {
2627 info->authProtocolName = info->authProtocol->Entity.prot;
2628 CleanUpAuthentication( info );
2629
2630 log->Error( XRootDTransportMsg,
2631 "[%s] Authentication with %s failed: unexpected answer",
2632 hsData->streamName.c_str(), protocolName.c_str() );
2633 return XRootDStatus( stFatal, errAuthFailed, 0, "Authentication failed: unexpected answer." );
2634 }
2635 }
2636
2637 //--------------------------------------------------------------------------
2638 // Generate the client request
2639 //--------------------------------------------------------------------------
// Every path that reaches this point has obtained non-null credentials.
2640 Message *msg = new Message( sizeof(ClientAuthRequest)+credentials->size );
2641 msg->Zero();
2642 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
2643 char *reqBuffer = msg->GetBuffer(sizeof(ClientAuthRequest));
2644
2645 req->header.requestid = kXR_auth;
2646 req->auth.dlen = credentials->size;
// At most 4 characters of the protocol name are copied; the message was
// zeroed above, so shorter names end up zero-padded.
2647 memcpy( req->auth.credtype, protocolName.c_str(),
2648 protocolName.length() > 4 ? 4 : protocolName.length() );
2649
2650 memcpy( reqBuffer, credentials->buffer, credentials->size );
2651 hsData->out = msg;
2652 MarshallRequest( msg );
2653 delete credentials;
2654
2655 //------------------------------------------------------------------------
2656 // Clear the SSL error queue of the calling thread, as there might be
2657 // some leftover from the authentication!
2658 //------------------------------------------------------------------------
2660
2661 return XRootDStatus( stOK, suContinue );
2662 }
2663
2664 //------------------------------------------------------------------------
2665 // Get the initial credentials using one of the protocols
2666 //------------------------------------------------------------------------
2667 XRootDStatus XRootDTransport::GetCredentials( XrdSecCredentials *&credentials,
2668 HandShakeData *hsData,
2669 XRootDChannelInfo *info )
2670 {
2671 //--------------------------------------------------------------------------
2672 // Set up the auth handler
2673 //--------------------------------------------------------------------------
2674 Log *log = DefaultEnv::GetLog();
2675 XrdOucErrInfo ei( "", info->authEnv);
2676 XrdSecGetProt_t authHandler = GetAuthHandler();
2677 if( !authHandler )
2678 return XRootDStatus( stFatal, errAuthFailed, 0, "Could not load authentication handler." );
2679
2680 //--------------------------------------------------------------------------
2681 // Retrieve secuid and secgid, if available. These will override the fsuid
2682 // and fsgid of the current thread reading the credentials to prevent
2683 // security holes in case this process is running with elevated permissions.
2684 //--------------------------------------------------------------------------
2685 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secuid") : 0;
2686 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secgid") : 0;
2687
2688 int secuid = -1;
2689 int secgid = -1;
2690
2691 if(secuidc) secuid = atoi(secuidc);
2692 if(secgidc) secgid = atoi(secgidc);
2693
2694#ifdef __linux__
2695 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2696 if(!uidSetter.IsOk()) {
2697 log->Error( XRootDTransportMsg, "[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2698 hsData->streamName.c_str(), secuid, secgid );
2699 return XRootDStatus( stFatal, errAuthFailed, 0, "Error while setting (fsuid, fsgid)." );
2700 }
2701#else
2702 if(secuid >= 0 || secgid >= 0) {
2703 log->Error( XRootDTransportMsg, "[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2704 hsData->streamName.c_str() );
2705 return XRootDStatus( stFatal, errAuthFailed, 0, "xrdcl.secuid and xrdcl.secgid"
2706 " only supported on Linux" );
2707 }
2708#endif
2709
2710 //--------------------------------------------------------------------------
2711 // Loop over the possible protocols to find one that gives us valid
2712 // credentials
2713 //--------------------------------------------------------------------------
2714 XrdNetAddr &srvAddrInfo = *const_cast<XrdNetAddr *>(hsData->serverAddr);
2715 srvAddrInfo.SetTLS( info->encrypted );
2716 while(1)
2717 {
2718 //------------------------------------------------------------------------
2719 // Get the protocol
2720 //------------------------------------------------------------------------
2721 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2722 srvAddrInfo,
2723 *info->authParams,
2724 &ei );
2725 if( !info->authProtocol )
2726 {
2727 log->Error( XRootDTransportMsg, "[%s] No protocols left to try",
2728 hsData->streamName.c_str() );
2729 return XRootDStatus( stFatal, errAuthFailed, 0, "No protocols left to try" );
2730 }
2731
2732 std::string protocolName = info->authProtocol->Entity.prot;
2733 log->Debug( XRootDTransportMsg, "[%s] Trying to authenticate using %s",
2734 hsData->streamName.c_str(), protocolName.c_str() );
2735
2736 //------------------------------------------------------------------------
2737 // Get the credentials from the current protocol
2738 //------------------------------------------------------------------------
2739 credentials = info->authProtocol->getCredentials( 0, &ei );
2740 if( !credentials )
2741 {
2742 log->Debug( XRootDTransportMsg,
2743 "[%s] Cannot get credentials for protocol %s: %s",
2744 hsData->streamName.c_str(), protocolName.c_str(),
2745 ei.getErrText() );
2746 info->authProtocol->Delete();
2747 continue;
2748 }
2749 return XRootDStatus( stOK, suContinue );
2750 }
2751 }
2752
2753 //------------------------------------------------------------------------
2754 // Clean up the data structures created for the authentication process.
//
// Calls Delete() on info->authProtocol when it is set, deletes
// info->authParams and info->authEnv, and resets all three pointers to 0.
// Always returns a default-constructed Status.
2755 //------------------------------------------------------------------------
2756 Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2757 {
// The protocol object is released through its own Delete() method rather
// than with operator delete
2758 if( info->authProtocol )
2759 info->authProtocol->Delete();
2760 delete info->authParams;
2761 delete info->authEnv;
// Reset the pointers so that nothing is left pointing at released objects
2762 info->authProtocol = 0;
2763 info->authParams = 0;
2764 info->authEnv = 0;
// NOTE(review): the embedded numbering jumps from 2764 to 2766 here, so one
// line of the original file appears to be absent from this listing; check the
// repository before relying on the statement list above being complete
2766 return Status();
2767 }
2768
2769 //------------------------------------------------------------------------
2770 // Clean up the data structures created for the protection purposes
2771 //------------------------------------------------------------------------
2772 Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2773 {
2774 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
2775 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
2776
2777 if( info->protection )
2778 {
2779 info->protection->Delete();
2780 info->protection = 0;
2781
2782 CleanUpAuthentication( info );
2783 }
2784
2785 if( info->protRespBody )
2786 {
2787 delete info->protRespBody;
2788 info->protRespBody = 0;
2789 info->protRespSize = 0;
2790 }
2791
2792 return Status();
2793 }
2794
2795 //----------------------------------------------------------------------------
2796 // Get the authentication function handle
2797 //----------------------------------------------------------------------------
2798 XrdSecGetProt_t XRootDTransport::GetAuthHandler()
2799 {
2800 Log *log = DefaultEnv::GetLog();
2801 char errorBuff[1024];
2802
2803 // the static constructor is invoked only once and it is guaranteed that this
2804 // is thread safe
2805 static std::atomic<XrdSecGetProt_t> authHandler( XrdSecLoadSecFactory( errorBuff, 1024 ) );
2806 auto ret = authHandler.load( std::memory_order_relaxed );
2807 if( ret ) return ret;
2808
2809 // if we are here it means we failed to load the security library for the
2810 // first time and we hope the environment changed
2811
2812 // obtain a lock
2813 static XrdSysMutex mtx;
2814 XrdSysMutexHelper lck( mtx );
2815 // check if in the meanwhile some else didn't load the library
2816 ret = authHandler.load( std::memory_order_relaxed );
2817 if( ret ) return ret;
2818
2819 // load the library
2820 ret = XrdSecLoadSecFactory( errorBuff, 1024 );
2821 authHandler.store( ret, std::memory_order_relaxed );
2822 // if we failed report an error
2823 if( !ret )
2824 {
2825 log->Error( XRootDTransportMsg,
2826 "Unable to get the security framework: %s", errorBuff );
2827 return 0;
2828 }
2829 return ret;
2830 }
2831
2832 //----------------------------------------------------------------------------
2833 // Generate the end session message
2834 //----------------------------------------------------------------------------
2835 Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2836 XRootDChannelInfo *info )
2837 {
2838 Log *log = DefaultEnv::GetLog();
2839
2840 //--------------------------------------------------------------------------
2841 // Generate the message
2842 //--------------------------------------------------------------------------
2843 Message *msg = new Message( sizeof(ClientEndsessRequest) );
2844 ClientEndsessRequest *endsessReq = (ClientEndsessRequest *)msg->GetBuffer();
2845
2846 endsessReq->requestid = kXR_endsess;
2847 memcpy( endsessReq->sessid, info->oldSessionId, 16 );
2848 std::string sessId = Utils::Char2Hex( endsessReq->sessid, 16 );
2849
2850 log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_endsess for session:"
2851 " %s", hsData->streamName.c_str(), sessId.c_str() );
2852
2853 MarshallRequest( msg );
2854
2855 Message *sign = 0;
2856 GetSignature( msg, sign, info );
2857 if( sign )
2858 {
2859 //------------------------------------------------------------------------
2860 // Now place both the signature and the request in a single buffer
2861 //------------------------------------------------------------------------
2862 uint32_t size = sign->GetSize();
2863 sign->ReAllocate( size + msg->GetSize() );
2864 char* buffer = sign->GetBuffer( size );
2865 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2866 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2867 }
2868
2869 return msg;
2870 }
2871
2872 //----------------------------------------------------------------------------
2873 // Process the protocol response
2874 //----------------------------------------------------------------------------
2875 Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2876 XRootDChannelInfo *info )
2877 {
2878 Log *log = DefaultEnv::GetLog();
2879
2880 Status st = UnMarshallBody( hsData->in, kXR_endsess );
2881 if( !st.IsOK() )
2882 return st;
2883
2884 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2885
2886 // If we're good, we're good!
2887 if( rsp->hdr.status == kXR_ok )
2888 return Status();
2889
2890 // we ignore not found errors as such an error means the connection
2891 // has been already terminated
2892 if( rsp->hdr.status == kXR_error && rsp->body.error.errnum == kXR_NotFound )
2893 return Status();
2894
2895 // other errors
2896 if( rsp->hdr.status == kXR_error )
2897 {
2898 std::string errorMsg( rsp->body.error.errmsg, rsp->hdr.dlen - 4 );
2899 log->Error( XRootDTransportMsg, "[%s] Got error response to "
2900 "kXR_endsess: %s", hsData->streamName.c_str(),
2901 errorMsg.c_str() );
2902 return Status( stFatal, errHandShakeFailed );
2903 }
2904
2905 // Wait Response.
2906 if( rsp->hdr.status == kXR_wait )
2907 {
2908 std::string msg( rsp->body.wait.infomsg, rsp->hdr.dlen - 4 );
2909 log->Info( XRootDTransportMsg, "[%s] Got wait response to "
2910 "kXR_endsess: %s", hsData->streamName.c_str(),
2911 msg.c_str() );
2912 hsData->out = GenerateEndSession( hsData, info );
2913 return Status( stOK, suRetry );
2914 }
2915
2916 // Any other response is protocol violation
2917 return Status( stError, errDataError );
2918 }
2919
2920 //----------------------------------------------------------------------------
2921 // Get a string representation of the server flags
2922 //----------------------------------------------------------------------------
2923 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2924 {
2925 std::string repr = "type: ";
2926 if( flags & kXR_isManager )
2927 repr += "manager ";
2928
2929 else if( flags & kXR_isServer )
2930 repr += "server ";
2931
2932 repr += "[";
2933
2934 if( flags & kXR_attrMeta )
2935 repr += "meta ";
2936
2937 else if( flags & kXR_attrCache )
2938 repr += "cache ";
2939
2940 else if( flags & kXR_attrProxy )
2941 repr += "proxy ";
2942
2943 else if( flags & kXR_attrSuper )
2944 repr += "super ";
2945
2946 else
2947 repr += " ";
2948
2949 repr.erase( repr.length()-1, 1 );
2950
2951 repr += "]";
2952 return repr;
2953 }
2954}
2955
2956namespace
2957{
2958 // Extract file name from a request
2959 //----------------------------------------------------------------------------
2960 char *GetDataAsString( char *msg )
2961 {
2963 char *fn = new char[req->dlen+1];
2964 memcpy( fn, msg + 24, req->dlen );
2965 fn[req->dlen] = 0;
2966 return fn;
2967 }
2968}
2969
2970namespace XrdCl
2971{
2972 //----------------------------------------------------------------------------
2973 // Get the description of a message
2974 //----------------------------------------------------------------------------
2975 void XRootDTransport::GenerateDescription( char *msg, std::ostringstream &o )
2976 {
2977 Log *log = DefaultEnv::GetLog();
2978 if( log->GetLevel() < Log::ErrorMsg )
2979 return;
2980
2981 ClientRequestHdr *req = (ClientRequestHdr *)msg;
2982 switch( req->requestid )
2983 {
2984 //------------------------------------------------------------------------
2985 // kXR_open
2986 //------------------------------------------------------------------------
2987 case kXR_open:
2988 {
2989 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2990 o << "kXR_open (";
2991 char *fn = GetDataAsString( msg );
2992 o << "file: " << fn << ", ";
2993 delete [] fn;
2994 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2995 o << std::setbase(10);
2996 o << "flags: ";
2997 if( sreq->options == 0 )
2998 o << "none";
2999 else
3000 {
3001 if( sreq->options & kXR_compress )
3002 o << "kXR_compress ";
3003 if( sreq->options & kXR_delete )
3004 o << "kXR_delete ";
3005 if( sreq->options & kXR_force )
3006 o << "kXR_force ";
3007 if( sreq->options & kXR_mkpath )
3008 o << "kXR_mkpath ";
3009 if( sreq->options & kXR_new )
3010 o << "kXR_new ";
3011 if( sreq->options & kXR_nowait )
3012 o << "kXR_nowait ";
3013 if( sreq->options & kXR_open_apnd )
3014 o << "kXR_open_apnd ";
3015 if( sreq->options & kXR_open_read )
3016 o << "kXR_open_read ";
3017 if( sreq->options & kXR_open_updt )
3018 o << "kXR_open_updt ";
3019 if( sreq->options & kXR_open_wrto )
3020 o << "kXR_open_wrto ";
3021 if( sreq->options & kXR_posc )
3022 o << "kXR_posc ";
3023 if( sreq->options & kXR_prefname )
3024 o << "kXR_prefname ";
3025 if( sreq->options & kXR_refresh )
3026 o << "kXR_refresh ";
3027 if( sreq->options & kXR_4dirlist )
3028 o << "kXR_4dirlist ";
3029 if( sreq->options & kXR_replica )
3030 o << "kXR_replica ";
3031 if( sreq->options & kXR_seqio )
3032 o << "kXR_seqio ";
3033 if( sreq->options & kXR_async )
3034 o << "kXR_async ";
3035 if( sreq->options & kXR_retstat )
3036 o << "kXR_retstat ";
3037 }
3038 o << ")";
3039 break;
3040 }
3041
3042 //------------------------------------------------------------------------
3043 // kXR_close
3044 //------------------------------------------------------------------------
3045 case kXR_close:
3046 {
3048 o << "kXR_close (";
3049 o << "handle: " << FileHandleToStr( sreq->fhandle );
3050 o << ")";
3051 break;
3052 }
3053
3054 //------------------------------------------------------------------------
3055 // kXR_stat
3056 //------------------------------------------------------------------------
3057 case kXR_stat:
3058 {
3059 ClientStatRequest *sreq = (ClientStatRequest *)msg;
3060 o << "kXR_stat (";
3061 if( sreq->dlen )
3062 {
3063 char *fn = GetDataAsString( msg );;
3064 o << "path: " << fn << ", ";
3065 delete [] fn;
3066 }
3067 else
3068 {
3069 o << "handle: " << FileHandleToStr( sreq->fhandle );
3070 o << ", ";
3071 }
3072 o << "flags: ";
3073 if( sreq->options == 0 )
3074 o << "none";
3075 else
3076 {
3077 if( sreq->options & kXR_vfs )
3078 o << "kXR_vfs";
3079 }
3080 o << ")";
3081 break;
3082 }
3083
3084 //------------------------------------------------------------------------
3085 // kXR_read
3086 //------------------------------------------------------------------------
3087 case kXR_read:
3088 {
3089 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3090 o << "kXR_read (";
3091 o << "handle: " << FileHandleToStr( sreq->fhandle );
3092 o << std::setbase(10);
3093 o << ", ";
3094 o << "offset: " << sreq->offset << ", ";
3095 o << "size: " << sreq->rlen << ")";
3096 break;
3097 }
3098
3099 //------------------------------------------------------------------------
3100 // kXR_pgread
3101 //------------------------------------------------------------------------
3102 case kXR_pgread:
3103 {
3105 o << "kXR_pgread (";
3106 o << "handle: " << FileHandleToStr( sreq->fhandle );
3107 o << std::setbase(10);
3108 o << ", ";
3109 o << "offset: " << sreq->offset << ", ";
3110 o << "size: " << sreq->rlen << ")";
3111 break;
3112 }
3113
3114 //------------------------------------------------------------------------
3115 // kXR_write
3116 //------------------------------------------------------------------------
3117 case kXR_write:
3118 {
3120 o << "kXR_write (";
3121 o << "handle: " << FileHandleToStr( sreq->fhandle );
3122 o << std::setbase(10);
3123 o << ", ";
3124 o << "offset: " << sreq->offset << ", ";
3125 o << "size: " << sreq->dlen << ")";
3126 break;
3127 }
3128
3129 //------------------------------------------------------------------------
3130 // kXR_pgwrite
3131 //------------------------------------------------------------------------
3132 case kXR_pgwrite:
3133 {
3135 o << "kXR_pgwrite (";
3136 o << "handle: " << FileHandleToStr( sreq->fhandle );
3137 o << std::setbase(10);
3138 o << ", ";
3139 o << "offset: " << sreq->offset << ", ";
3140 o << "size: " << sreq->dlen << ")";
3141 break;
3142 }
3143
3144 //------------------------------------------------------------------------
3145 // kXR_fattr
3146 //------------------------------------------------------------------------
3147 case kXR_fattr:
3148 {
3150 int nattr = sreq->numattr;
3151 int options = sreq->options;
3152 o << "kXR_fattr";
3153 switch (sreq->subcode) {
3154 case kXR_fattrGet:
3155 o << "Get";
3156 break;
3157 case kXR_fattrSet:
3158 o << "Set";
3159 break;
3160 case kXR_fattrList:
3161 o << "List";
3162 break;
3163 case kXR_fattrDel:
3164 o << "Delete";
3165 break;
3166 default:
3167 o << " unknown subcode: " << sreq->subcode;
3168 break;
3169 }
3170 o << " (handle: " << FileHandleToStr( sreq->fhandle );
3171 o << std::setbase(10);
3172 if (nattr)
3173 o << ", numattr: " << nattr;
3174 if (options) {
3175 o << ", options: ";
3176 if (options & 0x01)
3177 o << "new";
3178 if (options & 0x10)
3179 o << "list values";
3180 }
3181 o << ", total size: " << req->dlen << ")";
3182 break;
3183 }
3184
3185 //------------------------------------------------------------------------
3186 // kXR_sync
3187 //------------------------------------------------------------------------
3188 case kXR_sync:
3189 {
3190 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3191 o << "kXR_sync (";
3192 o << "handle: " << FileHandleToStr( sreq->fhandle );
3193 o << ")";
3194 break;
3195 }
3196
3197 //------------------------------------------------------------------------
3198 // kXR_truncate
3199 //------------------------------------------------------------------------
3200 case kXR_truncate:
3201 {
3203 o << "kXR_truncate (";
3204 if( !sreq->dlen )
3205 o << "handle: " << FileHandleToStr( sreq->fhandle );
3206 else
3207 {
3208 char *fn = GetDataAsString( msg );
3209 o << "file: " << fn;
3210 delete [] fn;
3211 }
3212 o << std::setbase(10);
3213 o << ", ";
3214 o << "offset: " << sreq->offset;
3215 o << ")";
3216 break;
3217 }
3218
3219 //------------------------------------------------------------------------
3220 // kXR_readv
3221 //------------------------------------------------------------------------
3222 case kXR_readv:
3223 {
3224 unsigned char *fhandle = 0;
3225 o << "kXR_readv (";
3226
3227 o << "handle: ";
3228 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3229 fhandle = dataChunk[0].fhandle;
3230 if( fhandle )
3231 o << FileHandleToStr( fhandle );
3232 else
3233 o << "unknown";
3234 o << ", ";
3235 o << std::setbase(10);
3236 o << "chunks: [";
3237 uint64_t size = 0;
3238 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3239 {
3240 size += dataChunk[i].rlen;
3241 o << "(offset: " << dataChunk[i].offset;
3242 o << ", size: " << dataChunk[i].rlen << "); ";
3243 }
3244 o << "], ";
3245 o << "total size: " << size << ")";
3246 break;
3247 }
3248
3249 //------------------------------------------------------------------------
3250 // kXR_writev
3251 //------------------------------------------------------------------------
3252 case kXR_writev:
3253 {
3254 unsigned char *fhandle = 0;
3255 o << "kXR_writev (";
3256
3257 XrdProto::write_list *wrtList =
3258 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3259 uint64_t size = 0;
3260 uint32_t numChunks = 0;
3261 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3262 {
3263 fhandle = wrtList[i].fhandle;
3264 size += wrtList[i].wlen;
3265 ++numChunks;
3266 }
3267 o << "handle: ";
3268 if( fhandle )
3269 o << FileHandleToStr( fhandle );
3270 else
3271 o << "unknown";
3272 o << ", ";
3273 o << std::setbase(10);
3274 o << "chunks: " << numChunks << ", ";
3275 o << "total size: " << size << ")";
3276 break;
3277 }
3278
3279 //------------------------------------------------------------------------
3280 // kXR_locate
3281 //------------------------------------------------------------------------
3282 case kXR_locate:
3283 {
3285 char *fn = GetDataAsString( msg );;
3286 o << "kXR_locate (";
3287 o << "path: " << fn << ", ";
3288 delete [] fn;
3289 o << "flags: ";
3290 if( sreq->options == 0 )
3291 o << "none";
3292 else
3293 {
3294 if( sreq->options & kXR_refresh )
3295 o << "kXR_refresh ";
3296 if( sreq->options & kXR_prefname )
3297 o << "kXR_prefname ";
3298 if( sreq->options & kXR_nowait )
3299 o << "kXR_nowait ";
3300 if( sreq->options & kXR_force )
3301 o << "kXR_force ";
3302 if( sreq->options & kXR_compress )
3303 o << "kXR_compress ";
3304 }
3305 o << ")";
3306 break;
3307 }
3308
3309 //------------------------------------------------------------------------
3310 // kXR_mv
3311 //------------------------------------------------------------------------
3312 case kXR_mv:
3313 {
3314 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3315 o << "kXR_mv (";
3316 o << "source: ";
3317 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3318 o << ", ";
3319 o << "destination: ";
3320 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3321 o << ")";
3322 break;
3323 }
3324
3325 //------------------------------------------------------------------------
3326 // kXR_query
3327 //------------------------------------------------------------------------
3328 case kXR_query:
3329 {
3331 o << "kXR_query (";
3332 o << "code: ";
3333 switch( sreq->infotype )
3334 {
3335 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3336 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3337 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3338 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3339 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3340 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3341 case kXR_QPrep: o << "kXR_QPrep"; break;
3342 case kXR_Qspace: o << "kXR_Qspace"; break;
3343 case kXR_QStats: o << "kXR_QStats"; break;
3344 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3345 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3346 default: o << sreq->infotype; break;
3347 }
3348 o << ", ";
3349
3350 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3351 {
3352 o << "handle: " << FileHandleToStr( sreq->fhandle );
3353 o << ", ";
3354 }
3355
3356 o << "arg length: " << sreq->dlen << ")";
3357 break;
3358 }
3359
3360 //------------------------------------------------------------------------
3361 // kXR_rm
3362 //------------------------------------------------------------------------
3363 case kXR_rm:
3364 {
3365 o << "kXR_rm (";
3366 char *fn = GetDataAsString( msg );;
3367 o << "path: " << fn << ")";
3368 delete [] fn;
3369 break;
3370 }
3371
3372 //------------------------------------------------------------------------
3373 // kXR_mkdir
3374 //------------------------------------------------------------------------
3375 case kXR_mkdir:
3376 {
3378 o << "kXR_mkdir (";
3379 char *fn = GetDataAsString( msg );
3380 o << "path: " << fn << ", ";
3381 delete [] fn;
3382 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3383 o << std::setbase(10);
3384 o << "flags: ";
3385 if( sreq->options[0] == 0 )
3386 o << "none";
3387 else
3388 {
3389 if( sreq->options[0] & kXR_mkdirpath )
3390 o << "kXR_mkdirpath";
3391 }
3392 o << ")";
3393 break;
3394 }
3395
3396 //------------------------------------------------------------------------
3397 // kXR_rmdir
3398 //------------------------------------------------------------------------
3399 case kXR_rmdir:
3400 {
3401 o << "kXR_rmdir (";
3402 char *fn = GetDataAsString( msg );
3403 o << "path: " << fn << ")";
3404 delete [] fn;
3405 break;
3406 }
3407
3408 //------------------------------------------------------------------------
3409 // kXR_chmod
3410 //------------------------------------------------------------------------
3411 case kXR_chmod:
3412 {
3414 o << "kXR_chmod (";
3415 char *fn = GetDataAsString( msg );
3416 o << "path: " << fn << ", ";
3417 delete [] fn;
3418 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3419 break;
3420 }
3421
3422 //------------------------------------------------------------------------
3423 // kXR_ping
3424 //------------------------------------------------------------------------
3425 case kXR_ping:
3426 {
3427 o << "kXR_ping ()";
3428 break;
3429 }
3430
3431 //------------------------------------------------------------------------
3432 // kXR_protocol
3433 //------------------------------------------------------------------------
3434 case kXR_protocol:
3435 {
3437 o << "kXR_protocol (";
3438 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3439 break;
3440 }
3441
3442 //------------------------------------------------------------------------
3443 // kXR_dirlist
3444 //------------------------------------------------------------------------
3445 case kXR_dirlist:
3446 {
3447 o << "kXR_dirlist (";
3448 char *fn = GetDataAsString( msg );;
3449 o << "path: " << fn << ")";
3450 delete [] fn;
3451 break;
3452 }
3453
3454 //------------------------------------------------------------------------
3455 // kXR_set
3456 //------------------------------------------------------------------------
3457 case kXR_set:
3458 {
3459 o << "kXR_set (";
3460 char *fn = GetDataAsString( msg );;
3461 o << "data: " << fn << ")";
3462 delete [] fn;
3463 break;
3464 }
3465
3466 //------------------------------------------------------------------------
3467 // kXR_prepare
3468 //------------------------------------------------------------------------
3469 case kXR_prepare:
3470 {
3472 o << "kXR_prepare (";
3473 o << "flags: ";
3474
3475 if( sreq->options == 0 )
3476 o << "none";
3477 else
3478 {
3479 if( sreq->options & kXR_stage )
3480 o << "kXR_stage ";
3481 if( sreq->options & kXR_wmode )
3482 o << "kXR_wmode ";
3483 if( sreq->options & kXR_coloc )
3484 o << "kXR_coloc ";
3485 if( sreq->options & kXR_fresh )
3486 o << "kXR_fresh ";
3487 }
3488
3489 o << ", priority: " << (int) sreq->prty << ", ";
3490
3491 char *fn = GetDataAsString( msg );
3492 char *cursor;
3493 for( cursor = fn; *cursor; ++cursor )
3494 if( *cursor == '\n' ) *cursor = ' ';
3495
3496 o << "paths: " << fn << ")";
3497 delete [] fn;
3498 break;
3499 }
3500
3501 case kXR_chkpoint:
3502 {
3504 o << "kXR_chkpoint (";
3505 o << "opcode: ";
3506 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3507 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3508 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3509 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3510 else if( sreq->opcode == kXR_ckpXeq )
3511 {
3512 o << "kXR_ckpXeq) ";
3513 // In this case our request body will be one of kXR_pgwrite,
3514 // kXR_truncate, kXR_write, or kXR_writev request.
3515 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3516 }
3517
3518 break;
3519 }
3520
3521 //------------------------------------------------------------------------
3522 // Default
3523 //------------------------------------------------------------------------
3524 default:
3525 {
3526 o << "kXR_unknown (length: " << req->dlen << ")";
3527 break;
3528 }
3529 };
3530 }
3531
3532 //----------------------------------------------------------------------------
3533 // Get a string representation of file handle
3534 //----------------------------------------------------------------------------
3535 std::string XRootDTransport::FileHandleToStr( const unsigned char handle[4] )
3536 {
3537 std::ostringstream o;
3538 o << "0x";
3539 for( uint8_t i = 0; i < 4; ++i )
3540 {
3541 o << std::setbase(16) << std::setfill('0') << std::setw(2);
3542 o << (int)handle[i];
3543 }
3544 return o.str();
3545 }
3546}
static const int kXR_ckpRollback
Definition XProtocol.hh:215
@ kXR_NotFound
kXR_int16 arg1len
Definition XProtocol.hh:430
#define kXR_isManager
struct ClientTruncateRequest truncate
Definition XProtocol.hh:875
union ServerResponse::@0 body
@ kXR_ecredir
Definition XProtocol.hh:371
#define kXR_tlsLogin
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_unt16 requestid
Definition XProtocol.hh:394
ServerResponseStatus status
kXR_char fhandle[4]
Definition XProtocol.hh:782
#define kXR_gotoTLS
#define kXR_attrMeta
struct ClientPgReadRequest pgread
Definition XProtocol.hh:861
kXR_char fhandle[4]
Definition XProtocol.hh:807
#define kXR_haveTLS
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char fhandle[4]
Definition XProtocol.hh:771
struct ClientMkdirRequest mkdir
Definition XProtocol.hh:858
kXR_int32 dlen
Definition XProtocol.hh:431
struct ClientAuthRequest auth
Definition XProtocol.hh:847
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 options
Definition XProtocol.hh:481
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:862
#define kXR_attrSuper
struct ClientReadVRequest readv
Definition XProtocol.hh:868
kXR_char pathid
Definition XProtocol.hh:653
kXR_char credtype[4]
Definition XProtocol.hh:170
kXR_char username[8]
Definition XProtocol.hh:396
@ kXR_open_wrto
Definition XProtocol.hh:469
@ kXR_compress
Definition XProtocol.hh:452
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_prefname
Definition XProtocol.hh:461
@ kXR_nowait
Definition XProtocol.hh:467
@ kXR_open_read
Definition XProtocol.hh:456
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_mkpath
Definition XProtocol.hh:460
@ kXR_seqio
Definition XProtocol.hh:468
@ kXR_replica
Definition XProtocol.hh:465
@ kXR_posc
Definition XProtocol.hh:466
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_force
Definition XProtocol.hh:454
@ kXR_4dirlist
Definition XProtocol.hh:464
@ kXR_open_apnd
Definition XProtocol.hh:462
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientOpenRequest open
Definition XProtocol.hh:860
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_authmore
Definition XProtocol.hh:902
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_char fhandle[4]
Definition XProtocol.hh:509
kXR_char fhandle[4]
Definition XProtocol.hh:645
kXR_char fhandle[4]
Definition XProtocol.hh:659
struct ClientWriteVRequest writev
Definition XProtocol.hh:877
kXR_char fhandle[4]
Definition XProtocol.hh:229
struct ClientLoginRequest login
Definition XProtocol.hh:857
kXR_unt16 requestid
Definition XProtocol.hh:157
kXR_char fhandle[4]
Definition XProtocol.hh:633
kXR_char sessid[16]
Definition XProtocol.hh:181
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_bind
Definition XProtocol.hh:136
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_login
Definition XProtocol.hh:119
@ kXR_auth
Definition XProtocol.hh:112
@ kXR_endsess
Definition XProtocol.hh:135
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_1stRequest
Definition XProtocol.hh:111
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
struct ClientChmodRequest chmod
Definition XProtocol.hh:850
#define kXR_isServer
#define kXR_attrCache
struct ClientQueryRequest query
Definition XProtocol.hh:866
struct ClientReadRequest read
Definition XProtocol.hh:867
struct ClientMvRequest mv
Definition XProtocol.hh:859
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_unt16 requestid
Definition XProtocol.hh:180
kXR_char sessid[16]
Definition XProtocol.hh:259
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
struct ServerResponseHeader hdr
@ kXR_asyncap
Definition XProtocol.hh:378
#define kXR_attrProxy
kXR_char options[1]
Definition XProtocol.hh:416
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
static const int kXR_ckpCommit
Definition XProtocol.hh:213
kXR_int64 offset
Definition XProtocol.hh:661
@ kXR_vfs
Definition XProtocol.hh:763
struct ClientPrepareRequest prepare
Definition XProtocol.hh:864
@ kXR_mkdirpath
Definition XProtocol.hh:410
@ kXR_wmode
Definition XProtocol.hh:591
@ kXR_fresh
Definition XProtocol.hh:593
@ kXR_coloc
Definition XProtocol.hh:592
@ kXR_stage
Definition XProtocol.hh:590
static const int kXR_ckpQuery
Definition XProtocol.hh:214
#define kXR_tlsSess
#define kXR_DataServer
struct ClientWriteRequest write
Definition XProtocol.hh:876
#define kXR_PROTTLSVERSION
Definition XProtocol.hh:72
kXR_char capver[1]
Definition XProtocol.hh:399
struct ClientProtocolRequest protocol
Definition XProtocol.hh:865
@ kXR_QPrep
Definition XProtocol.hh:616
@ kXR_Qopaqug
Definition XProtocol.hh:625
@ kXR_Qconfig
Definition XProtocol.hh:621
@ kXR_Qopaquf
Definition XProtocol.hh:624
@ kXR_Qckscan
Definition XProtocol.hh:620
@ kXR_Qxattr
Definition XProtocol.hh:618
@ kXR_Qspace
Definition XProtocol.hh:619
@ kXR_Qvisa
Definition XProtocol.hh:622
@ kXR_QStats
Definition XProtocol.hh:615
@ kXR_Qcksum
Definition XProtocol.hh:617
@ kXR_Qopaque
Definition XProtocol.hh:623
struct ClientLocateRequest locate
Definition XProtocol.hh:856
@ kXR_ver005
Definition XProtocol.hh:389
#define kXR_tlsData
@ kXR_readrdok
Definition XProtocol.hh:360
@ kXR_fullurl
Definition XProtocol.hh:358
@ kXR_onlyprv4
Definition XProtocol.hh:362
@ kXR_lclfile
Definition XProtocol.hh:364
@ kXR_multipr
Definition XProtocol.hh:359
@ kXR_redirflags
Definition XProtocol.hh:365
@ kXR_hasipv64
Definition XProtocol.hh:361
@ kXR_onlyprv6
Definition XProtocol.hh:363
ServerResponseHeader hdr
static const int kXR_ckpBegin
Definition XProtocol.hh:212
long long kXR_int64
Definition XPtypes.hh:98
unsigned char kXR_char
Definition XPtypes.hh:65
XrdVERSIONINFOREF(XrdCl)
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
void Zero()
Zero.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
@ ErrorMsg
report errors
Definition XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Definition XrdClTls.cc:422
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:512
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool IsSecure() const
Does the protocol indicate encryption.
Definition XrdClURL.cc:482
bool IsTPC() const
Is the URL used in TPC context.
Definition XrdClURL.cc:490
std::string GetLoginToken() const
Get the login token if present in the opaque info.
Definition XrdClURL.cc:367
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return a number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
void SetTLS(bool val)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
static int TimeZone()
const uint16_t suRetry
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint64_t XRootDMsg
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suDone
const uint16_t suContinue
bool InitTLS()
Definition XrdClTls.cc:96
const int DefaultTlsNoData
const int DefaultNoTlsOK
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
XrdSysError Log
Definition XrdConfig.cc:113
kXR_char fhandle[4]
Definition XProtocol.hh:832
struct ServerResponseBifs_Protocol bifReqs
kXR_char fhandle[4]
Definition XProtocol.hh:288
BindPrefSelector(std::vector< std::string > &&bindprefs)
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects the least loaded stream for read operations over multiple streams.
void AdjustQueues(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
Information holder for xrootd channels.
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
std::vector< XRootDStreamInfo > StreamInfoVector
std::unique_ptr< BindPrefSelector > bindSelector
std::atomic< uint32_t > finstcnt
ServerResponseBody_Protocol * protRespBody
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
Generic structure to pass security information back and forth.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.