Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:11:49

0001 // Author: Matevz Tadel, September 2010
0002 //
0003 // Server / client classes for serving a list of files.
0004 // Push and pull modes are supported.
0005 // Works OK, needs to be incorporated into Fireworks (when needed).
0006 
#if defined(__CINT__) && !defined(__MAKECINT__)
// Interpreted entry point: when this macro is loaded by the interpreter
// (and not during dictionary generation), the unnamed script block below
// compiles FileUrlService.C and then calls the compiled FileUrlService().
{
   Info("FileUrlService.C", "Has to be run in compiled mode ... doing this for you.");
   gSystem->CompileMacro("FileUrlService.C");
   FileUrlService();
}
0013 #else
0014 
0015 #include "TError.h"
0016 #include "TList.h"
0017 
0018 #include "TMessage.h"
0019 #include "TMonitor.h"
0020 #include "TSocket.h"
0021 #include "TServerSocket.h"
0022 
0023 #include "TThread.h"
0024 
0025 #include <cassert>
0026 #include <memory>
0027 #include <sstream>
0028 #include <stdexcept>
0029 
0030 #include <list>
0031 #include <map>
0032 
0033 
0034 //==============================================================================
0035 // Utilities
0036 //==============================================================================
0037 
0038 namespace
0039 {
0040    TString RecvLine(TSocket* socket)
0041    {
0042       std::vector<char> buf;
0043       buf.reserve(256);
0044 
0045       char c = 255;
0046       while (c != 0)
0047       {
0048          Int_t ret = socket->RecvRaw(&c, 1);
0049          if (ret < 0)
0050          {
0051             std::ostringstream err;
0052             err << "Reading line from socket failed, error code " << ret << ".";
0053             throw std::runtime_error(err.str());
0054          }
0055          if (c == 13)
0056             continue;
0057          if (c == 10)
0058             c = 0;
0059          buf.push_back(c);
0060       }
0061 
0062       return TString(&buf[0]);
0063    }
0064 
0065    void SendLine(TSocket* socket, const TString& msg)
0066    {
0067       // Send msg ... new-line character is added here.
0068 
0069       static const char eol(10);
0070 
0071       socket->SendRaw(msg.Data(), msg.Length());
0072       socket->SendRaw(&eol,       1);
0073    }
0074 }
0075 
0076 
0077 //==============================================================================
0078 // FileUrlRequest
0079 //==============================================================================
0080 
0081 class FileUrlRequest : public TObject
0082 {
0083 public:
0084    enum EMode { kGoodBye=-1, kPullNext, kPullLast, kServerPush };
0085 
0086    EMode   fMode;
0087    Int_t   fFileId;
0088    TString fFileUrl;
0089    Bool_t  fFileOK;
0090 
0091    FileUrlRequest(EMode mode=kPullNext) :
0092       TObject(),
0093       fMode(mode),
0094       fFileId(-1),
0095       fFileOK(kFALSE) {}
0096 
0097    void SetFile(Int_t id, const TString& f)
0098    {
0099       fFileId  = id;
0100       fFileUrl = f;
0101       fFileOK  = kTRUE;
0102    }
0103 
0104    void ResetFile()
0105    {
0106       fFileId  = -1;
0107       fFileUrl = "";
0108       fFileOK  = kFALSE;
0109    }
0110 
0111    ClassDef(FileUrlRequest, 1);
0112 };
0113 
0114 
0115 //==============================================================================
0116 // FileUrlBase
0117 //==============================================================================
0118 
0119 
0120 class FileUrlBase
0121 {
0122 protected:
0123    void      SendMessage   (TSocket& socket, TMessage& msg, const TString& eh);
0124    TMessage* ReceiveMessage(TSocket& socket, UInt_t type, const TString& eh);
0125 
0126    void      SendString    (TSocket& socket, TString& str, const TString& eh);
0127    void      ReceiveString (TSocket& socket, TString& str, const TString& eh);
0128 
0129    void      SendRequest   (TSocket& socket, FileUrlRequest& request, const TString& eh);
0130    void      ReceiveRequest(TSocket& socket, FileUrlRequest& request, const TString& eh);
0131 
0132    void      SendGoodBye   (TSocket& socket, const TString& eh);
0133 
0134    static const UInt_t gkUrlRequestMT; // Message type
0135    static const UInt_t gkStringMT;     // Message type
0136 
0137 public:
0138    FileUrlBase() {}
0139    virtual ~FileUrlBase() {}
0140 
0141    ClassDef(FileUrlBase, 0);
0142 };
0143 
0144 //==============================================================================
0145 
0146 const UInt_t FileUrlBase::gkUrlRequestMT = 666;
0147 const UInt_t FileUrlBase::gkStringMT     = 667;
0148 
0149 //------------------------------------------------------------------------------
0150 
0151 void FileUrlBase::SendMessage(TSocket& socket, TMessage& msg, const TString& eh)
0152 {
0153    Int_t len = socket.Send(msg);
0154    if (len <= 0)
0155    {
0156       std::ostringstream err;
0157       err << eh  << " Failed sending message, ret = "
0158           << len << ".";
0159       throw std::runtime_error(err.str());
0160    }
0161 }
0162 
0163 TMessage* FileUrlBase::ReceiveMessage(TSocket& socket, UInt_t type, const TString& eh)
0164 {
0165    TMessage *msg = 0;
0166    Int_t len = socket.Recv(msg);
0167    if (len <= 0)
0168    {
0169       std::ostringstream err;
0170       err << eh  << " Failed receiving message, ret = "
0171           << len << ".";
0172       throw std::runtime_error(err.str());
0173    }
0174    if (type != 0 && msg->What() != type)
0175    {
0176       std::ostringstream err;
0177       err << eh  << " Wrong message type received. Expected " << type
0178           << ", got " << msg->What() << ".";
0179       throw std::runtime_error(err.str());
0180    }
0181    return msg;
0182 }
0183 
0184 //------------------------------------------------------------------------------
0185 
0186 void FileUrlBase::SendString(TSocket& socket, TString& str, const TString& eh)
0187 {
0188    TMessage msg(gkStringMT);
0189    str.Streamer(msg);
0190    SendMessage(socket, msg, eh);
0191 }
0192 
0193 void FileUrlBase::ReceiveString(TSocket& socket, TString& str, const TString& eh)
0194 {
0195    std::unique_ptr<TMessage> msg(ReceiveMessage(socket, gkStringMT, eh));
0196    str.Streamer(*msg);
0197 }
0198 
0199 //------------------------------------------------------------------------------
0200 
0201 void FileUrlBase::SendRequest(TSocket& socket, FileUrlRequest& request, const TString& eh)
0202 {
0203    TMessage msg(gkUrlRequestMT);
0204    request.Streamer(msg);
0205    SendMessage(socket, msg, eh);
0206 }
0207 
0208 void FileUrlBase::ReceiveRequest(TSocket& socket, FileUrlRequest& request, const TString& eh)
0209 {
0210    std::unique_ptr<TMessage> msg(ReceiveMessage(socket, gkUrlRequestMT, eh));
0211    request.Streamer(*msg);
0212 }
0213 
0214 //------------------------------------------------------------------------------
0215 
0216 void FileUrlBase::SendGoodBye(TSocket& socket, const TString& eh)
0217 {
0218    static FileUrlRequest sByeRequest(FileUrlRequest::kGoodBye);
0219 
0220    SendRequest(socket, sByeRequest, eh);
0221 }
0222 
0223 
0224 //==============================================================================
0225 // FileUrlServer
0226 //==============================================================================
0227 
0228 class FileUrlServer : public FileUrlBase
0229 {
0230 protected:
0231    typedef std::map<Int_t, TString>            mIdToFile_t;
0232    typedef mIdToFile_t::iterator               mIdToFile_i; 
0233 
0234    typedef std::map<TSocket*, FileUrlRequest>  mSockToReq_t;
0235    typedef mSockToReq_t::iterator              mSockToReq_i;
0236 
0237    // ----------------------------------------------------------------
0238 
0239    TString                   fName;
0240    TString                   fUrlPrefix;
0241 
0242    Int_t                     fFileNMax;
0243    Int_t                     fFileLastId;
0244    mIdToFile_t               fFileMap;
0245    mSockToReq_t              fPushClientMap;
0246 
0247    TMonitor                 *fMonitor;
0248    TServerSocket            *fServerSocket;
0249    TThread                  *fServerThread;
0250    TMutex                   *fMutex;
0251 
0252    TString                   fNewFileFile;
0253    TTimer                   *fNewFileFileCheckTimer;
0254 
0255    // ----------------------------------------------------------------
0256 
0257    void AcceptConnection();
0258    void CloseConnection(TSocket *cs);
0259 
0260    void MessageFrom(TSocket* cs);
0261 
0262    // ----------------------------------------------------------------
0263 
0264    static void* RunServer(FileUrlServer* srv);
0265 
0266 public:
0267    FileUrlServer(const char* name, Int_t port);
0268    virtual ~FileUrlServer();
0269 
0270    // --------------------------------
0271 
0272    TString GetUrlPrefix() const            { return fUrlPrefix; }
0273    void    SetUrlPrefix(const TString& up) { fUrlPrefix = up;   }
0274 
0275    Int_t   GetFileNMax() const             { return fFileNMax; }
0276    void    SetFileNMax(Int_t n)            { fFileNMax = n; RemoveExtraFiles(); }
0277 
0278    Int_t   GetFileLastId() const           { return fFileLastId; }
0279 
0280    // --------------------------------
0281 
0282    void AddFile(const TString& file);
0283    void RemoveFile(const TString& file);
0284    void RemoveExtraFiles();
0285 
0286    // --------------------------------
0287 
0288    // Pain: how to determine file is complete and closed?
0289    // Use the LastFile thing from afs
0290    void BeginMonitoringNewFileFile(const TString& file, Int_t sec=1);
0291    void CheckNewFileFile();
0292    void EndMonitoringNewFileFile();
0293 
0294    ClassDef(FileUrlServer, 0);
0295 };
0296 
0297 //==============================================================================
0298 
0299 FileUrlServer::FileUrlServer(const char* name, Int_t port) :
0300    fName(name),
0301    fFileNMax(0),
0302    fFileLastId(0),
0303    fMonitor(0), fServerSocket(0), fServerThread(0), fMutex(0),
0304    fNewFileFileCheckTimer(0)
0305 {
0306    fServerSocket = new TServerSocket(port, kTRUE, 4, 4096);
0307 
0308    if (!fServerSocket->IsValid())
0309    {
0310       std::ostringstream err;
0311       err << "Creation of server socket failed ";
0312       switch (fServerSocket->GetErrorCode())
0313       {
0314          case -1:
0315             err << "in low level socket().";
0316             break;
0317          case -2:
0318             err << "in low level bind().";
0319             break;
0320          case -3:
0321             err << "in low level listen().";
0322             break;
0323          default:
0324             err << "with unknown error " << fServerSocket->GetErrorCode() << ".";
0325             break;
0326       }
0327       throw std::runtime_error(err.str());
0328    }
0329 
0330    fMonitor = new TMonitor(kFALSE);
0331    fMonitor->Add(fServerSocket);
0332 
0333    fMutex = new TMutex(kTRUE);
0334    fServerThread = new TThread((TThread::VoidRtnFunc_t) RunServer);
0335    fServerThread->Run(this);
0336 }
0337 
0338 FileUrlServer::~FileUrlServer()
0339 {
0340    if (fNewFileFileCheckTimer)
0341    {
0342       EndMonitoringNewFileFile();
0343    }
0344 
0345    fServerThread->Kill();
0346    fServerThread->Join();
0347    delete fServerThread;
0348 
0349    {
0350       std::unique_ptr<TList> socks(fMonitor->GetListOfActives());
0351       while ( ! socks->IsEmpty())
0352       {
0353          TObject *obj = socks->First();
0354          socks->RemoveFirst();
0355          delete obj;
0356       }
0357    }
0358    delete fMonitor;
0359 
0360    delete fMutex;
0361 }
0362 
0363 //------------------------------------------------------------------------------
0364 
0365 void FileUrlServer::AddFile(const TString& file)
0366 {
0367    static const TString _eh("FileUrlServer::AddFile");
0368 
0369    TLockGuard _lck(fMutex);
0370 
0371    ++fFileLastId;
0372    fFileMap[fFileLastId] = file;
0373 
0374    for (mSockToReq_i i = fPushClientMap.begin(); i != fPushClientMap.end(); ++i)
0375    {
0376       i->second.SetFile(fFileLastId, fUrlPrefix + file);
0377       SendRequest(*i->first, i->second, _eh);
0378    }
0379 
0380    RemoveExtraFiles();
0381 }
0382 
0383 void FileUrlServer::RemoveFile(const TString& file)
0384 {
0385    TLockGuard _lck(fMutex);
0386 
0387    for (mIdToFile_i i = fFileMap.begin(); i != fFileMap.end(); ++i)
0388    {
0389       if (i->second == file)
0390       {
0391          fFileMap.erase(i);
0392          break;
0393       }
0394    }
0395 }
0396 
0397 void FileUrlServer::RemoveExtraFiles()
0398 {
0399    TLockGuard _lck(fMutex);
0400 
0401    if (fFileNMax > 0)
0402    {
0403       while ((Int_t) fFileMap.size() > fFileNMax)
0404       {
0405          fFileMap.erase(fFileMap.begin());
0406       }
0407    }
0408 }
0409 
0410 //------------------------------------------------------------------------------
0411 
0412 void FileUrlServer::AcceptConnection()
0413 {
0414    static const TString _eh("FileUrlServer::AcceptConnection");
0415 
0416    TLockGuard _lck(fMutex);
0417 
0418    TSocket *cs = fServerSocket->Accept();
0419 
0420    Info(_eh, "Connection from %s:%d.",
0421         cs->GetInetAddress().GetHostName(), fServerSocket->GetLocalPort());
0422 
0423    TString hello;
0424    hello.Form("Hello! I am FileUrlServer serving '%s'.", fName.Data());
0425    SendString(*cs, hello, _eh);
0426 
0427    fMonitor->Add(cs);
0428 }
0429 
0430 void FileUrlServer::CloseConnection(TSocket *cs)
0431 {
0432    fMonitor->Remove(cs);
0433    fPushClientMap.erase(cs);
0434    delete cs;
0435 }
0436 
0437 //------------------------------------------------------------------------------
0438 
0439 void FileUrlServer::MessageFrom(TSocket* cs)
0440 {
0441    static const TString _eh("FileUrlServer::MessageFrom");
0442 
0443    TLockGuard _lck(fMutex);
0444 
0445    try
0446    {
0447       FileUrlRequest req;
0448 
0449       ReceiveRequest(*cs, req, _eh);
0450 
0451       switch (req.fMode)
0452       {
0453          case FileUrlRequest::kGoodBye:
0454          {
0455             CloseConnection(cs);
0456             break;
0457          }
0458          case FileUrlRequest::kPullNext:
0459          {
0460             req.fFileOK = kFALSE;
0461             mIdToFile_i i = fFileMap.upper_bound(req.fFileId);
0462             if (i != fFileMap.end())
0463             {
0464                req.SetFile(i->first, fUrlPrefix + i->second);
0465             }
0466             SendRequest(*cs, req, _eh);
0467             break;
0468          }
0469          case FileUrlRequest::kPullLast:
0470          {
0471             req.fFileOK = kFALSE;
0472             if ( ! fFileMap.empty())
0473             {
0474                mIdToFile_i i = --fFileMap.end();
0475                if (i->first > req.fFileId)
0476                {
0477                   req.SetFile(i->first, fUrlPrefix + i->second);
0478                }
0479             }
0480             SendRequest(*cs, req, _eh);
0481             break;
0482          }
0483          case FileUrlRequest::kServerPush:
0484          {
0485             fPushClientMap[cs] = req;
0486             break;
0487          }
0488       }
0489    }
0490    catch (std::runtime_error& err)
0491    {
0492       Error(_eh, err.what());
0493       CloseConnection(cs);
0494    }
0495 }
0496 
0497 //------------------------------------------------------------------------------
0498 
0499 void* FileUrlServer::RunServer(FileUrlServer* srv)
0500 {
0501    TThread::SetCancelDeferred();
0502 
0503    TList ready;
0504 
0505    while (kTRUE)
0506    {
0507       TThread::SetCancelOn();
0508       srv->fMonitor->Select(&ready, 0, 1000000);
0509       TThread::SetCancelOff();
0510 
0511       while ( ! ready.IsEmpty())
0512       {
0513          TSocket *sock = (TSocket*) ready.First();
0514          ready.RemoveFirst();
0515 
0516          if (sock == srv->fServerSocket)
0517          {
0518             srv->AcceptConnection();
0519          }
0520          else
0521          {
0522             srv->MessageFrom(sock);
0523          }
0524       }
0525    }
0526 
0527    return 0;
0528 }
0529 
0530 //------------------------------------------------------------------------------
0531 
0532 void FileUrlServer::BeginMonitoringNewFileFile(const TString& file, Int_t sec)
0533 {
0534    static const TString _eh("FileUrlServer::BeginMonitoringNewFileFile");
0535 
0536    if (fNewFileFileCheckTimer != 0)
0537    {
0538       Error(_eh, "File monitoring already in progress, end it first.");
0539       return;
0540    }
0541 
0542    fNewFileFile = file;
0543 
0544    fNewFileFileCheckTimer = new TTimer(1000l*sec);
0545    fNewFileFileCheckTimer->Connect("Timeout()", "FileUrlServer", this, "CheckNewFileFile()");
0546 
0547    gTQSender = fNewFileFileCheckTimer;
0548    CheckNewFileFile();
0549    gTQSender = 0;
0550 }
0551 
0552 void FileUrlServer::CheckNewFileFile()
0553 {
0554    // Should be private, but is a slot.
0555 
0556    static const TString _eh("FileUrlServer::CheckNewFileFile");
0557 
0558    if (fNewFileFileCheckTimer == 0 || gTQSender != fNewFileFileCheckTimer)
0559    {
0560       Error(_eh, "Timer is not running or method called directly.");
0561       return;
0562    }
0563 
0564    fNewFileFileCheckTimer->TurnOff();
0565 
0566    FILE *fp = fopen(fNewFileFile, "r");
0567    if (fp == 0)
0568    {
0569       Warning(_eh, "Could not open new-file file for reading.");
0570       return;
0571    }
0572    TString line;
0573    line.Gets(fp, kTRUE);
0574    fclose(fp);
0575    if ( ! line.IsNull())
0576    {
0577       TLockGuard _lck(fMutex);
0578 
0579       if (!fFileMap.empty())
0580       {
0581          mIdToFile_i i = --fFileMap.end();
0582          if (i->second != line)
0583          {
0584             printf("Previous: '%s'\n", i->second.Data());
0585             printf("New:      '%s'\n", line.Data());
0586             AddFile(line);
0587          }
0588       }
0589    }
0590    fNewFileFileCheckTimer->Reset();
0591    fNewFileFileCheckTimer->TurnOn();
0592 }
0593 
0594 void FileUrlServer::EndMonitoringNewFileFile()
0595 {
0596    static const TString _eh("FileUrlServer::EndMonitoringNewFileFile");
0597 
0598    if (fNewFileFileCheckTimer == 0)
0599    {
0600       Error(_eh, "No file monitoring in progress.");
0601       return;
0602    }
0603 
0604    fNewFileFileCheckTimer->TurnOff();
0605    delete fNewFileFileCheckTimer;
0606    fNewFileFileCheckTimer = 0;
0607 }
0608 
0609 
0610 //==============================================================================
0611 // FileUrlClient
0612 //==============================================================================
0613 
0614 class FileUrlClient : public TQObject,
0615                       public FileUrlBase 
0616 {
0617 protected:
0618    TString        fHost;
0619    Int_t          fPort;
0620 
0621    FileUrlRequest fRequest;
0622 
0623    TSocket       *fSocket;     // Non-zero when in server-push mode.
0624    TMonitor      *fMonitor; //
0625 
0626    TSocket*  OpenSocket(const TString& eh);
0627    void      CleanupServerPushMode();
0628 
0629 public:
0630    FileUrlClient(const char* host, Int_t port) :
0631       fHost(host),
0632       fPort(port),
0633       fSocket(0),
0634       fMonitor(0)
0635    {}
0636 
0637    virtual ~FileUrlClient()
0638    {
0639       if (fSocket) EndServerPushMode();
0640    }
0641 
0642    // --------------------------------
0643 
0644    Bool_t  HasCurrentFile() const { return fRequest.fFileOK;  }
0645    TString GetCurrentFile() const { return fRequest.fFileUrl; }
0646 
0647    // --------------------------------
0648 
0649    void GetNextFile(Bool_t loop=kFALSE);
0650    void GetLastFile();
0651 
0652    void BeginServerPushMode();
0653    void ServerPushAction();
0654    void NewFileArrived(); // *SIGNAL*
0655    void EndServerPushMode();
0656 
0657    ClassDef(FileUrlClient, 0);
0658 };
0659 
0660 //==============================================================================
0661 
0662 TSocket* FileUrlClient::OpenSocket(const TString& eh)
0663 {
0664    std::unique_ptr<TSocket> s(new TSocket(fHost, fPort));
0665 
0666    if (!s->IsValid())
0667    {
0668       std::ostringstream err;
0669       err << eh    << " Failed connecting to "
0670           << fHost << ":" << fPort << ".";
0671       throw std::runtime_error(err.str());
0672    }
0673 
0674    TString str;
0675    ReceiveString(*s, str, eh);
0676    Info(eh, TString("socket opened, greeting is: ") + str);
0677 
0678    return s.release();
0679 }
0680 
0681 //------------------------------------------------------------------------------
0682 
0683 void FileUrlClient::GetNextFile(Bool_t loop)
0684 {
0685    static const TString _eh("FileUrlClient::GetNextFile");
0686 
0687    if (fSocket != 0)
0688    {
0689       Error(_eh, "Currently in server-push mode.");
0690       return;
0691    }
0692 
0693    std::unique_ptr<TSocket> s(OpenSocket(_eh));
0694 
0695    fRequest.fMode = FileUrlRequest::kPullNext;
0696 
0697    SendRequest(*s, fRequest, _eh);
0698    ReceiveRequest(*s, fRequest, _eh);
0699 
0700    if (!fRequest.fFileOK && loop)
0701    {
0702       fRequest.ResetFile();
0703       SendRequest(*s, fRequest, _eh);
0704       ReceiveRequest(*s, fRequest, _eh);
0705    }
0706 
0707    Info(_eh, "loop=%s; ok=%d, id=%d, file=%s",
0708         loop ? "true" : "false",
0709         fRequest.fFileOK, fRequest.fFileId, fRequest.fFileUrl.Data());
0710 
0711    SendGoodBye(*s, _eh);
0712 }
0713 
0714 void FileUrlClient::GetLastFile()
0715 {
0716    static const TString _eh("FileUrlClient::GetLastFile");
0717 
0718    if (fSocket != 0)
0719    {
0720       Error(_eh, "Currently in server-push mode.");
0721       return;
0722    }
0723 
0724    std::unique_ptr<TSocket> s(OpenSocket(_eh));
0725 
0726    fRequest.fMode = FileUrlRequest::kPullLast;
0727 
0728    SendRequest(*s, fRequest, _eh);
0729    ReceiveRequest(*s, fRequest, _eh);
0730 
0731    Info(_eh, "ok=%d, id=%d, file=%s",
0732         fRequest.fFileOK, fRequest.fFileId, fRequest.fFileUrl.Data());
0733 
0734    SendGoodBye(*s, _eh);
0735 }
0736 
0737 //------------------------------------------------------------------------------
0738 
0739 void FileUrlClient::BeginServerPushMode()
0740 {
0741    static const TString _eh("FileUrlClient::BeginServerPushMode");
0742 
0743    if (fSocket != 0)
0744    {
0745       Error(_eh, "Already in server-push mode.");
0746       return;
0747    }
0748 
0749    try
0750    {
0751       std::unique_ptr<TSocket> s(OpenSocket(_eh));
0752       fRequest.fMode = FileUrlRequest::kServerPush;
0753       SendRequest(*s, fRequest, _eh);
0754       fSocket = s.release();
0755    }
0756    catch (std::runtime_error& err)
0757    {
0758       Error(_eh, "Sending of request failed, ending server-push mode.");
0759       EndServerPushMode();
0760       throw;
0761    }
0762 
0763    fMonitor = new TMonitor();
0764    fMonitor->Add(fSocket);
0765    fMonitor->Connect("Ready(TSocket*)", "FileUrlClient", this, "ServerPushAction()");
0766 }
0767 
0768 void FileUrlClient::ServerPushAction()
0769 {
0770    static const TString _eh("FileUrlClient::ServerPushAction");
0771 
0772    if (fSocket == 0)
0773    {
0774       Error(_eh, "Should only get called from TMonitor in server-push mode.");
0775       return;
0776    }
0777 
0778    try
0779    {
0780       ReceiveRequest(*fSocket, fRequest, _eh);
0781    }
0782    catch (std::runtime_error& err)
0783    {
0784       Error(_eh, "Receiving of request failed, ending server-push mode.");
0785       CleanupServerPushMode();
0786       return;
0787    }
0788 
0789    Info(_eh, "ok=%d, id=%d, file=%s",
0790         fRequest.fFileOK, fRequest.fFileId, fRequest.fFileUrl.Data());
0791 
0792    NewFileArrived();
0793 }
0794 
0795 void FileUrlClient::NewFileArrived()
0796 {
0797    Emit("NewFileArrived()");
0798 }
0799 
0800 void FileUrlClient::EndServerPushMode()
0801 {
0802    static const TString _eh("FileUrlClient::EndServerPushMode");
0803 
0804    if (fSocket == 0)
0805    {
0806       Error(_eh, "Currently not in server-push mode.");
0807       return;
0808    }
0809 
0810    SendGoodBye(*fSocket, _eh);
0811 
0812    CleanupServerPushMode();
0813 }
0814 
0815 void FileUrlClient::CleanupServerPushMode()
0816 {
0817    delete fMonitor; fMonitor = 0;
0818    delete fSocket;  fSocket  = 0;
0819 }
0820 
0821 
0822 //==============================================================================
0823 // main() substitute
0824 //==============================================================================
0825 
0826 FileUrlServer *g_fu_server = 0;
0827 FileUrlClient *g_fu_client = 0;
0828 
0829 void FileUrlService()
0830 {
0831    printf("FileUrlService loaded ... starting server at port 4444!\n");
0832    g_fu_server = new FileUrlServer("Testos", 4444);
0833    g_fu_server->SetUrlPrefix("http://matevz.web.cern.ch/matevz/tmp/");
0834    g_fu_server->AddFile("EVDISPSM_1284666332001.root");
0835    g_fu_server->AddFile("EVDISPSM_1284666332002.root");
0836    g_fu_server->AddFile("EVDISPSM_1284666332003.root");
0837 
0838    g_fu_client = new FileUrlClient("pcalice14", 4444);
0839    // g_fu_client->GetNextFile();
0840 }
0841 
0842 
0843 //==============================================================================
0844 // Directory watching
0845 //==============================================================================
0846 //
0847 // This doesn't work at all for afs.
0848 // Is OK for local file-systems.
0849 
0850 /*
0851 
0852 #include <sys/inotify.h>
0853 
0854 #include "TSystem.h"
0855 
0856 class DirectoryMonitor
0857 {
0858    Int_t fFd;
0859    Int_t fWd;
0860 
0861    TFileHandler *fFileHandler;
0862 
0863 public:
0864    DirectoryMonitor(const TString& path="/afs/cern.ch/cms/CAF/CMSCOMM/COMM_GLOBAL/EventDisplay/RootFileTempStorageArea")
0865    // DirectoryMonitor(const TString& path="/tmp")
0866    {
0867       fFd = inotify_init();
0868       if (fFd < 0)
0869       {
0870          perror("inotfiy_init");
0871          return;
0872       }
0873 
0874       fWd = inotify_add_watch(fFd, path, IN_CLOSE | IN_MOVED_TO);
0875 
0876       fFileHandler = new TFileHandler(fFd, TFileHandler::kRead);
0877       fFileHandler->Connect("Notified()", "DirectoryMonitor", this, "StuffReady()");
0878 
0879       gSystem->AddFileHandler(fFileHandler);
0880 
0881       printf("fd %d, wd %d\n", fFd, fWd);
0882    }
0883 
0884    virtual ~DirectoryMonitor()
0885    {
0886       gSystem->RemoveFileHandler(fFileHandler);
0887       inotify_rm_watch(fFd, fWd);
0888       close(fFd);
0889    }
0890 
0891    void StuffReady()
0892    {
0893       printf("StuffReady ... do read it!\n");
0894 
0895       char buf[1024];
0896       ssize_t n = read(fFd, buf, 1024);
0897       if (n < 0)
0898       {
0899          perror("StuffReady -- read failed");
0900          return;
0901       }
0902       printf("  Read %zd bytes.\n", n);
0903 
0904       int i = 0;
0905       while (i < n)
0906       {
0907          struct inotify_event& e = *(struct inotify_event*)(&buf[i]);
0908          printf("  wd=%d mask=%x cookie=%u len=%u, name=%s\n",
0909                 e.wd, e.mask, e.cookie, e.len, e.len ? e.name : "<none>");
0910 
0911          i += sizeof(struct inotify_event) + e.len;
0912       }
0913    }
0914 
0915    ClassDef(DirectoryMonitor, 0);
0916 };
0917 
0918 */
0919 
0920 #endif