00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "kmessageio.h"
00025 #include <qsocket.h>
00026 #include <kdebug.h>
00027 #include <kprocess.h>
00028 #include <qfile.h>
00029
00030
00031
00032 KMessageIO::KMessageIO (QObject *parent, const char *name)
00033 : QObject (parent, name), m_id (0)
00034 {}
00035
00036 KMessageIO::~KMessageIO ()
00037 {}
00038
00039 void KMessageIO::setId (Q_UINT32 id)
00040 {
00041 m_id = id;
00042 }
00043
00044 Q_UINT32 KMessageIO::id ()
00045 {
00046 return m_id;
00047 }
00048
00049
00050
00051 KMessageSocket::KMessageSocket (QString host, Q_UINT16 port, QObject *parent,
00052 const char *name)
00053 : KMessageIO (parent, name)
00054 {
00055 mSocket = new QSocket ();
00056 mSocket->connectToHost (host, port);
00057 initSocket ();
00058 }
00059
00060 KMessageSocket::KMessageSocket (QHostAddress host, Q_UINT16 port, QObject
00061 *parent, const char *name)
00062 : KMessageIO (parent, name)
00063 {
00064 mSocket = new QSocket ();
00065 mSocket->connectToHost (host.toString(), port);
00066 initSocket ();
00067 }
00068
00069 KMessageSocket::KMessageSocket (QSocket *socket, QObject *parent, const char
00070 *name)
00071 : KMessageIO (parent, name)
00072 {
00073 mSocket = socket;
00074 initSocket ();
00075 }
00076
00077 KMessageSocket::KMessageSocket (int socketFD, QObject *parent, const char
00078 *name)
00079 : KMessageIO (parent, name)
00080 {
00081 mSocket = new QSocket ();
00082 mSocket->setSocket (socketFD);
00083 initSocket ();
00084 }
00085
00086 KMessageSocket::~KMessageSocket ()
00087 {
00088 delete mSocket;
00089 }
00090
00091 bool KMessageSocket::isConnected () const
00092 {
00093 return mSocket->state() == QSocket::Connection;
00094 }
00095
00096 void KMessageSocket::send (const QByteArray &msg)
00097 {
00098 QDataStream str (mSocket);
00099 str << Q_UINT8 ('M');
00100 str.writeBytes (msg.data(), msg.size());
00101 }
00102
00103 void KMessageSocket::processNewData ()
00104 {
00105 if (isRecursive)
00106 return;
00107 isRecursive = true;
00108
00109 QDataStream str (mSocket);
00110 while (mSocket->bytesAvailable() > 0)
00111 {
00112 if (mAwaitingHeader)
00113 {
00114
00115 if (mSocket->bytesAvailable() < 5)
00116 {
00117 isRecursive = false;
00118 return;
00119 }
00120
00121
00122
00123
00124 Q_UINT8 v;
00125 str >> v;
00126 if (v != 'M')
00127 {
00128 kdWarning(11001) << k_funcinfo << ": Received unexpected data, magic number wrong!" << endl;
00129 continue;
00130 }
00131
00132 str >> mNextBlockLength;
00133 mAwaitingHeader = false;
00134 }
00135 else
00136 {
00137
00138 if (mSocket->bytesAvailable() < (Q_ULONG) mNextBlockLength)
00139 {
00140 isRecursive = false;
00141 return;
00142 }
00143
00144 QByteArray msg (mNextBlockLength);
00145 str.readRawBytes (msg.data(), mNextBlockLength);
00146
00147
00148 emit received (msg);
00149
00150
00151 mAwaitingHeader = true;
00152 }
00153 }
00154
00155 isRecursive = false;
00156 }
00157
00158 void KMessageSocket::initSocket ()
00159 {
00160 connect (mSocket, SIGNAL (error(int)), SIGNAL (connectionBroken()));
00161 connect (mSocket, SIGNAL (connectionClosed()), SIGNAL (connectionBroken()));
00162 connect (mSocket, SIGNAL (readyRead()), SLOT (processNewData()));
00163 mAwaitingHeader = true;
00164 mNextBlockLength = 0;
00165 isRecursive = false;
00166 }
00167
00168 Q_UINT16 KMessageSocket::peerPort () const
00169 {
00170 return mSocket->peerPort();
00171 }
00172
00173 QString KMessageSocket::peerName () const
00174 {
00175 return mSocket->peerName();
00176 }
00177
00178
00179
00180 KMessageDirect::KMessageDirect (KMessageDirect *partner, QObject *parent,
00181 const char *name)
00182 : KMessageIO (parent, name), mPartner (0)
00183 {
00184
00185 if (!partner)
00186 return;
00187
00188
00189 if (partner && partner->mPartner)
00190 {
00191 kdWarning(11001) << k_funcinfo << ": Object is already connected!" << endl;
00192 return;
00193 }
00194
00195
00196 mPartner = partner;
00197
00198
00199 partner->mPartner = this;
00200 }
00201
00202 KMessageDirect::~KMessageDirect ()
00203 {
00204 if (mPartner)
00205 {
00206 mPartner->mPartner = 0;
00207 emit mPartner->connectionBroken();
00208 }
00209 }
00210
00211 bool KMessageDirect::isConnected () const
00212 {
00213 return mPartner != 0;
00214 }
00215
00216 void KMessageDirect::send (const QByteArray &msg)
00217 {
00218 if (mPartner)
00219 emit mPartner->received (msg);
00220 else
00221 kdError(11001) << k_funcinfo << ": Not yet connected!" << endl;
00222 }
00223
00224
00225
00226
00227 KMessageProcess::~KMessageProcess()
00228 {
00229 kdDebug(11001) << "@@@KMessageProcess::Delete process" << endl;
00230 if (mProcess)
00231 {
00232 mProcess->kill();
00233 delete mProcess;
00234 mProcess=0;
00235
00236 mQueue.setAutoDelete(true);
00237 mQueue.clear();
00238
00239 }
00240 }
00241 KMessageProcess::KMessageProcess(QObject *parent, QString file) : KMessageIO(parent,0)
00242 {
00243
00244 kdDebug(11001) << "@@@KMessageProcess::Start process" << endl;
00245 mProcessName=file;
00246 mProcess=new KProcess;
00247 int id=0;
00248 *mProcess << mProcessName << QString("%1").arg(id);
00249 kdDebug(11001) << "@@@KMessageProcess::Init:Id= " << id << endl;
00250 kdDebug(11001) << "@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;
00251 connect(mProcess, SIGNAL(receivedStdout(KProcess *, char *, int )),
00252 this, SLOT(slotReceivedStdout(KProcess *, char * , int )));
00253 connect(mProcess, SIGNAL(receivedStderr(KProcess *, char *, int )),
00254 this, SLOT(slotReceivedStderr(KProcess *, char * , int )));
00255 connect(mProcess, SIGNAL(processExited(KProcess *)),
00256 this, SLOT(slotProcessExited(KProcess *)));
00257 connect(mProcess, SIGNAL(wroteStdin(KProcess *)),
00258 this, SLOT(slotWroteStdin(KProcess *)));
00259 mProcess->start(KProcess::NotifyOnExit,KProcess::All);
00260 mSendBuffer=0;
00261 mReceiveCount=0;
00262 mReceiveBuffer.resize(1024);
00263 }
00264 bool KMessageProcess::isConnected() const
00265 {
00266 kdDebug(11001) << "@@@KMessageProcess::Is conencted" << endl;
00267 if (!mProcess) return false;
00268 return mProcess->isRunning();
00269 }
00270 void KMessageProcess::send(const QByteArray &msg)
00271 {
00272 kdDebug(11001) << "@@@KMessageProcess:: SEND("<<msg.size()<<") to process" << endl;
00273 unsigned int size=msg.size()+2*sizeof(long);
00274
00275 char *tmpbuffer=new char[size];
00276 long *p1=(long *)tmpbuffer;
00277 long *p2=p1+1;
00278 kdDebug(11001) << "p1="<<p1 << "p2="<< p2 << endl;
00279 memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
00280 *p1=0x4242aeae;
00281 *p2=size;
00282
00283 QByteArray *buffer=new QByteArray();
00284 buffer->assign(tmpbuffer,size);
00285
00286 mQueue.enqueue(buffer);
00287 writeToProcess();
00288 }
00289 void KMessageProcess::writeToProcess()
00290 {
00291
00292 if (mSendBuffer || mQueue.isEmpty()) return ;
00293 mSendBuffer=mQueue.dequeue();
00294 if (!mSendBuffer) return ;
00295
00296
00297
00298
00299
00300 mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());
00301
00302 }
00303 void KMessageProcess::slotWroteStdin(KProcess * )
00304 {
00305 kdDebug(11001) << k_funcinfo << endl;
00306 if (mSendBuffer)
00307 {
00308 delete mSendBuffer;
00309 mSendBuffer=0;
00310 }
00311 writeToProcess();
00312 }
00313
00314 void KMessageProcess::slotReceivedStderr(KProcess * proc, char *buffer, int buflen)
00315 {
00316 int pid=0;
00317 int len;
00318 char *p;
00319 char *pos;
00320
00321
00322 if (!buffer || buflen==0) return ;
00323 if (proc) pid=proc->pid();
00324
00325
00326 pos=buffer;
00327 do
00328 {
00329 p=(char *)memchr(pos,'\n',buflen);
00330 if (!p) len=buflen;
00331 else len=p-pos;
00332
00333 QByteArray a;
00334 a.setRawData(pos,len);
00335 QString s(a);
00336 kdDebug(11001) << "PID" <<pid<< ":" << s << endl;
00337 a.resetRawData(pos,len);
00338 if (p) pos=p+1;
00339 buflen-=len+1;
00340 }while(buflen>0);
00341 }
00342
00343
00344 void KMessageProcess::slotReceivedStdout(KProcess * , char *buffer, int buflen)
00345 {
00346 kdDebug(11001) << "$$$$$$ " << k_funcinfo << ": Received " << buflen << " bytes over inter process communication" << endl;
00347
00348
00349 while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00350 memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen);
00351 mReceiveCount+=buflen;
00352
00353
00354 while (mReceiveCount>2*sizeof(long))
00355 {
00356 long *p1=(long *)mReceiveBuffer.data();
00357 long *p2=p1+1;
00358 unsigned int len;
00359 if (*p1!=0x4242aeae)
00360 {
00361 kdDebug(11001) << k_funcinfo << ": Cookie error...transmission failure...serious problem..." << endl;
00362
00363 }
00364 len=(int)(*p2);
00365 if (len<2*sizeof(long))
00366 {
00367 kdDebug(11001) << k_funcinfo << ": Message size error" << endl;
00368 break;
00369 }
00370 if (len<=mReceiveCount)
00371 {
00372 kdDebug(11001) << k_funcinfo << ": Got message with len " << len << endl;
00373
00374 QByteArray msg;
00375
00376 msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00377 emit received(msg);
00378
00379
00380 if (len<mReceiveCount)
00381 {
00382 memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
00383 }
00384 mReceiveCount-=len;
00385 }
00386 else break;
00387 }
00388 }
00389
00390 void KMessageProcess::slotProcessExited(KProcess * )
00391 {
00392 kdDebug(11001) << "Process exited (slot)" << endl;
00393 emit connectionBroken();
00394 delete mProcess;
00395 mProcess=0;
00396 }
00397
00398
00399
00400 KMessageFilePipe::KMessageFilePipe(QObject *parent,QFile *readfile,QFile *writefile) : KMessageIO(parent,0)
00401 {
00402 mReadFile=readfile;
00403 mWriteFile=writefile;
00404 mReceiveCount=0;
00405 mReceiveBuffer.resize(1024);
00406 }
00407
00408 KMessageFilePipe::~KMessageFilePipe()
00409 {
00410 }
00411
00412 bool KMessageFilePipe::isConnected () const
00413 {
00414 return (mReadFile!=0)&&(mWriteFile!=0);
00415 }
00416
00417 void KMessageFilePipe::send(const QByteArray &msg)
00418 {
00419 unsigned int size=msg.size()+2*sizeof(long);
00420
00421 char *tmpbuffer=new char[size];
00422 long *p1=(long *)tmpbuffer;
00423 long *p2=p1+1;
00424 memcpy(tmpbuffer+2*sizeof(long),msg.data(),msg.size());
00425 *p1=0x4242aeae;
00426 *p2=size;
00427
00428 QByteArray buffer;
00429 buffer.assign(tmpbuffer,size);
00430 mWriteFile->writeBlock(buffer);
00431 mWriteFile->flush();
00432
00433
00434
00435
00436
00437 }
00438
00439 void KMessageFilePipe::exec()
00440 {
00441
00442
00443
00444
00445 int ch=mReadFile->getch();
00446
00447 while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
00448 mReceiveBuffer[mReceiveCount]=(char)ch;
00449 mReceiveCount++;
00450
00451
00452 if (mReceiveCount>=2*sizeof(long))
00453 {
00454 long *p1=(long *)mReceiveBuffer.data();
00455 long *p2=p1+1;
00456 unsigned int len;
00457 if (*p1!=0x4242aeae)
00458 {
00459 fprintf(stderr,"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
00460
00461 }
00462 len=(int)(*p2);
00463 if (len==mReceiveCount)
00464 {
00465
00466
00467 QByteArray msg;
00468
00469 msg.duplicate(mReceiveBuffer.data()+2*sizeof(long),len-2*sizeof(long));
00470 emit received(msg);
00471
00472 mReceiveCount=0;
00473 }
00474 }
00475
00476
00477 return ;
00478
00479
00480 }
00481
00482 #include "kmessageio.moc"