kmessageio.cpp

00001 /*
00002     This file is part of the KDE games library
00003     Copyright (C) 2001 Burkhard Lehner (Burkhard.Lehner@gmx.de)
00004 
00005     This library is free software; you can redistribute it and/or
00006     modify it under the terms of the GNU Library General Public
00007     License version 2 as published by the Free Software Foundation.
00008 
00009     This library is distributed in the hope that it will be useful,
00010     but WITHOUT ANY WARRANTY; without even the implied warranty of
00011     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012     Library General Public License for more details.
00013 
00014     You should have received a copy of the GNU Library General Public License
00015     along with this library; see the file COPYING.LIB.  If not, write to
00016     the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
00017     Boston, MA 02110-1301, USA.
00018 */
00019 
00020 /*
00021      KMessageIO class and subclasses KMessageSocket and KMessageDirect
00022 */
00023 
00024 #include "kmessageio.h"
00025 #include <qsocket.h>
00026 #include <kdebug.h>
00027 #include <kprocess.h>
00028 #include <qfile.h>
00029 
00030 // ----------------------- KMessageIO -------------------------
00031 
00032 KMessageIO::KMessageIO (QObject *parent, const char *name)
00033   : QObject (parent, name), m_id (0)
00034 {}
00035 
00036 KMessageIO::~KMessageIO ()
00037 {}
00038 
00039 void KMessageIO::setId (Q_UINT32 id)
00040 {
00041   m_id = id;
00042 }
00043 
00044 Q_UINT32 KMessageIO::id ()
00045 {
00046   return m_id;
00047 }
00048 
00049 // ----------------------KMessageSocket -----------------------
00050 
00051 KMessageSocket::KMessageSocket (QString host, Q_UINT16 port, QObject *parent, 
00052 const char *name)
00053   : KMessageIO (parent, name)
00054 {
00055   mSocket = new QSocket ();
00056   mSocket->connectToHost (host, port);
00057   initSocket ();
00058 }
00059 
00060 KMessageSocket::KMessageSocket (QHostAddress host, Q_UINT16 port, QObject 
00061 *parent, const char *name)
00062   : KMessageIO (parent, name)
00063 {
00064   mSocket = new QSocket ();
00065   mSocket->connectToHost (host.toString(), port);
00066   initSocket ();
00067 }
00068 
00069 KMessageSocket::KMessageSocket (QSocket *socket, QObject *parent, const char 
00070 *name)
00071   : KMessageIO (parent, name)
00072 {
00073   mSocket = socket;
00074   initSocket ();
00075 }
00076 
00077 KMessageSocket::KMessageSocket (int socketFD, QObject *parent, const char 
00078 *name)
00079   : KMessageIO (parent, name)
00080 {
00081   mSocket = new QSocket ();
00082   mSocket->setSocket (socketFD);
00083   initSocket ();
00084 }
00085 
00086 KMessageSocket::~KMessageSocket ()
00087 {
00088   delete mSocket;
00089 }
00090 
00091 bool KMessageSocket::isConnected () const
00092 {
00093   return mSocket->state() == QSocket::Connection;
00094 }
00095 
00096 void KMessageSocket::send (const QByteArray &msg)
00097 {
00098   QDataStream str (mSocket);
00099   str << Q_UINT8 ('M');  // magic number for begin of message
00100   str.writeBytes (msg.data(), msg.size());  // writes the length (as Q_UINT32) and the data
00101 }
00102 
00103 void KMessageSocket::processNewData ()
00104 {
00105   if (isRecursive)
00106     return;
00107   isRecursive = true;
00108 
00109   QDataStream str (mSocket);
00110   while (mSocket->bytesAvailable() > 0)
00111   {
00112     if (mAwaitingHeader)
00113     {
00114       // Header = magic number + packet length = 5 bytes
00115       if (mSocket->bytesAvailable() < 5)
00116       {
00117         isRecursive = false;
00118         return;
00119       }
00120 
00121       // Read the magic number first. If something unexpected is found,
00122       // start over again, ignoring the data that was read up to then.
00123 
00124       Q_UINT8 v;
00125       str >> v;
00126       if (v != 'M')
00127       {
00128         kdWarning(11001) << k_funcinfo << ": Received unexpected data, magic number wrong!" << endl;
00129         continue;
00130       }
00131 
00132       str >> mNextBlockLength;
00133       mAwaitingHeader = false;
00134     }
00135     else
00136     {
00137       // Data not completely read => wait for more
00138       if (mSocket->bytesAvailable() < (Q_ULONG) mNextBlockLength)
00139       {
00140         isRecursive = false;
00141         return;
00142       }
00143 
00144       QByteArray msg (mNextBlockLength);
00145       str.readRawBytes (msg.data(), mNextBlockLength);
00146 
00147       // send the received message
00148       emit received (msg);
00149 
00150       // Waiting for the header of the next message
00151       mAwaitingHeader = true;
00152     }
00153   }
00154 
00155   isRecursive = false;
00156 }
00157 
00158 void KMessageSocket::initSocket ()
00159 {
00160   connect (mSocket, SIGNAL (error(int)), SIGNAL (connectionBroken()));
00161   connect (mSocket, SIGNAL (connectionClosed()), SIGNAL (connectionBroken()));
00162   connect (mSocket, SIGNAL (readyRead()), SLOT (processNewData()));
00163   mAwaitingHeader = true;
00164   mNextBlockLength = 0;
00165   isRecursive = false;
00166 }
00167 
00168 Q_UINT16 KMessageSocket::peerPort () const
00169 {
00170   return mSocket->peerPort();
00171 }
00172 
00173 QString KMessageSocket::peerName () const
00174 {
00175   return mSocket->peerName();
00176 }
00177 
00178 // ----------------------KMessageDirect -----------------------
00179 
00180 KMessageDirect::KMessageDirect (KMessageDirect *partner, QObject *parent, 
00181 const char *name)
00182   : KMessageIO (parent, name), mPartner (0)
00183 {
00184   // 0 as first parameter leaves the object unconnected
00185   if (!partner)
00186     return;
00187 
00188   // Check if the other object is already connected
00189   if (partner && partner->mPartner)
00190   {
00191     kdWarning(11001) << k_funcinfo << ": Object is already connected!" << endl;
00192     return;
00193   }
00194 
00195   // Connect from us to that object
00196   mPartner = partner;
00197 
00198   // Connect the other object to us
00199   partner->mPartner = this;
00200 }
00201 
00202 KMessageDirect::~KMessageDirect ()
00203 {
00204   if (mPartner)
00205   {
00206     mPartner->mPartner = 0;
00207     emit mPartner->connectionBroken();
00208   }
00209 }
00210 
00211 bool KMessageDirect::isConnected () const
00212 {
00213   return mPartner != 0;
00214 }
00215 
00216 void KMessageDirect::send (const QByteArray &msg)
00217 {
00218   if (mPartner)
00219     emit mPartner->received (msg);
00220   else
00221     kdError(11001) << k_funcinfo << ": Not yet connected!" << endl;
00222 }
00223 
00224 
00225 // ----------------------- KMessageProcess ---------------------------
00226 
00227 KMessageProcess::~KMessageProcess()
00228 {
00229   kdDebug(11001) << "@@@KMessageProcess::Delete process" << endl;
00230   if (mProcess)
00231   {
00232     mProcess->kill();
00233     delete mProcess;
00234     mProcess=0;
00235     // Remove not send buffers
00236     mQueue.setAutoDelete(true);
00237     mQueue.clear();
00238     // Maybe todo: delete mSendBuffer
00239   }
00240 }
00241 KMessageProcess::KMessageProcess(QObject *parent, QString file) : KMessageIO(parent,0)
00242 {
00243   // Start process
00244   kdDebug(11001) << "@@@KMessageProcess::Start process" << endl;
00245   mProcessName=file;
00246   mProcess=new KProcess;
00247   int id=0;
00248   *mProcess << mProcessName << QString("%1").arg(id);
00249   kdDebug(11001) << "@@@KMessageProcess::Init:Id= " << id << endl;
00250   kdDebug(11001) << "@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;
00251   connect(mProcess, SIGNAL(receivedStdout(KProcess *, char *, int )),
00252                         this, SLOT(slotReceivedStdout(KProcess *, char * , int )));
00253   connect(mProcess, SIGNAL(receivedStderr(KProcess *, char *, int )),
00254                         this, SLOT(slotReceivedStderr(KProcess *, char * , int )));
00255   connect(mProcess, SIGNAL(processExited(KProcess *)),
00256                         this, SLOT(slotProcessExited(KProcess *)));
00257   connect(mProcess, SIGNAL(wroteStdin(KProcess *)),
00258                         this, SLOT(slotWroteStdin(KProcess *)));
00259   mProcess->start(KProcess::NotifyOnExit,KProcess::All);
00260   mSendBuffer=0;
00261   mReceiveCount=0;
00262   mReceiveBuffer.resize(1024);
00263 }
00264 bool KMessageProcess::isConnected() const
00265 {
00266   kdDebug(11001) << "@@@KMessageProcess::Is conencted" << endl;
00267   if (!mProcess) return false;
00268   return mProcess->isRunning();
00269 }
00270 void KMessageProcess::send(const QByteArray &msg)
00271 {
00272   kdDebug(11001) << "@@@KMessageProcess:: SEND("<<msg.size()<<") to process" << endl;
00273   unsigned int size=msg.size()+2*sizeof(long);
00274 
00275   char *tmpbuffer=new char[size];
00276   long *p1=(long *)tmpbuffer;
00277   long *p2=p1+1;
00278   kdDebug(11001)  << "p1="<<p1 << "p2="<< p2 << endl;
00279   memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
00280   *p1=0x4242aeae;
00281   *p2=size;
00282   
00283   QByteArray *buffer=new QByteArray();
00284   buffer->assign(tmpbuffer,size);
00285   // buffer->duplicate(msg);
00286   mQueue.enqueue(buffer);
00287   writeToProcess(); 
00288 }
00289 void KMessageProcess::writeToProcess()
00290 {
00291   // Previous send ok and item in queue
00292   if (mSendBuffer || mQueue.isEmpty()) return ;
00293   mSendBuffer=mQueue.dequeue();
00294   if (!mSendBuffer) return ;
00295 
00296   // write it out to the process
00297   //  kdDebug(11001) << " @@@@@@ writeToProcess::SEND to process " << mSendBuffer->size() << " BYTE " << endl;
00298   //  char *p=mSendBuffer->data();
00299   //  for (int i=0;i<16;i++) printf("%02x ",(unsigned char)(*(p+i)));printf("\n");
00300   mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());
00301 
00302 }
00303 void KMessageProcess::slotWroteStdin(KProcess * )
00304 {
00305   kdDebug(11001) << k_funcinfo << endl;
00306   if (mSendBuffer)
00307   {
00308     delete mSendBuffer;
00309     mSendBuffer=0;
00310   }
00311   writeToProcess();
00312 }
00313 
00314 void KMessageProcess::slotReceivedStderr(KProcess * proc, char *buffer, int buflen)
00315 {
00316   int pid=0;
00317   int len;
00318   char *p;
00319   char *pos;
00320 //  kdDebug(11001)<<"############# Got stderr " << buflen << " bytes" << endl;
00321 
00322   if (!buffer || buflen==0) return ; 
00323   if (proc) pid=proc->pid();
00324 
00325 
00326   pos=buffer;
00327   do
00328   {
00329     p=(char *)memchr(pos,'\n',buflen);
00330     if (!p) len=buflen;
00331     else len=p-pos;
00332 
00333     QByteArray a;
00334     a.setRawData(pos,len);
00335     QString s(a);
00336     kdDebug(11001) << "PID" <<pid<< ":" << s << endl;
00337     a.resetRawData(pos,len);
00338     if (p) pos=p+1;
00339     buflen-=len+1;
00340   }while(buflen>0);
00341 }
00342 
00343 
00344 void KMessageProcess::slotReceivedStdout(KProcess * , char *buffer, int buflen)
00345 {
00346   kdDebug(11001) << "$$$$$$ " << k_funcinfo << ": Received " << buflen << " bytes over inter process communication" << endl;
00347 
00348   // TODO Make a plausibility check on buflen to avoid memory overflow
00349   while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00350   memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen);
00351   mReceiveCount+=buflen;
00352 
00353   // Possbile message
00354   while (mReceiveCount>2*sizeof(long))
00355   {
00356     long *p1=(long *)mReceiveBuffer.data();
00357     long *p2=p1+1;
00358     unsigned int len;
00359     if (*p1!=0x4242aeae)
00360     {
00361       kdDebug(11001) << k_funcinfo << ": Cookie error...transmission failure...serious problem..." << endl;
00362 //      for (int i=0;i<mReceiveCount;i++) fprintf(stderr,"%02x ",mReceiveBuffer[i]);fprintf(stderr,"\n");
00363     }
00364     len=(int)(*p2);
00365     if (len<2*sizeof(long))
00366     {
00367       kdDebug(11001) << k_funcinfo << ": Message size error" << endl;
00368       break;
00369     }
00370     if (len<=mReceiveCount)
00371     {
00372       kdDebug(11001) << k_funcinfo << ": Got message with len " << len << endl;
00373 
00374       QByteArray msg;
00375     //  msg.setRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00376       msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00377       emit received(msg);
00378      // msg.resetRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00379       // Shift buffer
00380       if (len<mReceiveCount)
00381       {
00382         memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
00383       }
00384       mReceiveCount-=len;
00385     }
00386     else break;
00387   }
00388 }
00389 
00390 void KMessageProcess::slotProcessExited(KProcess * /*p*/)
00391 {
00392   kdDebug(11001) << "Process exited (slot)" << endl;
00393   emit connectionBroken();
00394   delete mProcess;
00395   mProcess=0;
00396 }
00397 
00398 
00399 // ----------------------- KMessageFilePipe ---------------------------
00400 KMessageFilePipe::KMessageFilePipe(QObject *parent,QFile *readfile,QFile *writefile) : KMessageIO(parent,0)
00401 {
00402   mReadFile=readfile;
00403   mWriteFile=writefile;
00404   mReceiveCount=0;
00405   mReceiveBuffer.resize(1024);
00406 }
00407 
00408 KMessageFilePipe::~KMessageFilePipe()
00409 {
00410 }
00411 
00412 bool KMessageFilePipe::isConnected () const
00413 {
00414   return (mReadFile!=0)&&(mWriteFile!=0);
00415 }
00416 
00417 void KMessageFilePipe::send(const QByteArray &msg)
00418 {
00419   unsigned int size=msg.size()+2*sizeof(long);
00420 
00421   char *tmpbuffer=new char[size];
00422   long *p1=(long *)tmpbuffer;
00423   long *p2=p1+1;
00424   memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
00425   *p1=0x4242aeae;
00426   *p2=size;
00427   
00428   QByteArray buffer;
00429   buffer.assign(tmpbuffer,size);
00430   mWriteFile->writeBlock(buffer);
00431   mWriteFile->flush();
00432   /*
00433   fprintf(stderr,"+++ KMessageFilePipe:: SEND(%d to parent) realsize=%d\n",msg.size(),buffer.size());
00434   for (int i=0;i<buffer.size();i++) fprintf(stderr,"%02x ",buffer[i]);fprintf(stderr,"\n");
00435   fflush(stderr);
00436   */
00437 }
00438 
00439 void KMessageFilePipe::exec()
00440 {
00441 
00442   // According to BL: Blocking read is ok
00443   // while(mReadFile->atEnd()) { usleep(100); }
00444 
00445    int ch=mReadFile->getch();
00446 
00447    while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00448    mReceiveBuffer[mReceiveCount]=(char)ch;
00449    mReceiveCount++;
00450 
00451    // Change for message 
00452    if (mReceiveCount>=2*sizeof(long))
00453    {
00454      long *p1=(long *)mReceiveBuffer.data();
00455      long *p2=p1+1;
00456      unsigned int len;
00457      if (*p1!=0x4242aeae)
00458      {
00459        fprintf(stderr,"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
00460 //       for (int i=0;i<16;i++) fprintf(stderr,"%02x ",mReceiveBuffer[i]);fprintf(stderr,"\n");
00461      }
00462      len=(int)(*p2);
00463      if (len==mReceiveCount)
00464      {
00465        //fprintf(stderr,"KMessageFilePipe::exec:: Got Message with len %d\n",len);
00466 
00467        QByteArray msg;
00468        //msg.setRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00469        msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00470        emit received(msg);
00471        //msg.resetRawData(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00472        mReceiveCount=0;
00473      }
00474    }
00475    
00476 
00477    return ;
00478 
00479   
00480 }
00481 
00482 #include "kmessageio.moc"
KDE Home | KDE Accessibility Home | Description of Access Keys