[Develope]/Network

Asynchronous IO

하늘을닮은호수M 2005. 2. 21. 13:28
반응형

윈도우에서는 iocp라는 것을 통해서 AsynchronousIO를 구현할 수 있었다.
(사실은 나도 자세히는 모른다.)

그러던 중 스트리밍 서버 프로젝트를 진행하면서 Asynchronous IO를 구현할 필요를 느꼈는데,

이를 구현한 간단한 프로그램을 ACE라는 라이브러리 안의 예제 파일에서 구할 수 있었다.

아래 예제는 위에서 얻은 프로그램을 RT Signal을 사용하여 동작할 수 있도록 약간 수정한 것이다.

간단히 설명하자면, 한 쪽에서는 파일에 일정 스트링을 쓰려고 하고, 한 쪽에서는 파일에 쓰여진 스트링을 읽으려고 한다. 이 두 동작을 같이 동작시키면, 쓰려는 쪽에서 동작이 완료되면 RT Signal(SIGRTMIN + 1)을 날려 동작완료를 알린다. 이 때 읽으려는 쪽에서는 동작이 완료가 되지 않았기 때문에 "AIO read in progress"라는 메시지를 출력하고, 그 후에 읽기 동작이 완료가 되면 RT Signal(SIGRTMIN)을 날려 동작완료를 알린다.

// modified_aiocb.cpp,v 0.1 2004/02/18

// ============================================================================
//
// = LIBRARY
// proactor
//
// = FILENAME
// modified_aiocb.cpp
//
// = DESCRIPTION
// modified_aiocb.cpp which use RT Signal.
// Original Source : ACE's test_aiocb_ace.cpp
//
// = COMPILE and RUN
// % g++ -g -o modified_aiocb modified_aiocb.cpp -lrt
// % ./modified_aiocb
//
// = AUTHOR
// sunsson <sunsson@varovision.com>
//
// ============================================================================

#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <iostream>

using std::cerr;
using std::cout;
using std::endl;

// C-linkage pointer type for a three-argument signal handler; this file
// casts mySigHandler to it before storing it in sigaction::sa_sigaction.
extern "C" typedef void (*SIGNAL_C_FUNC) (int, siginfo_t *, void *);

// Forward declaration of the signal handler defined further down in this
// file; it receives the signal number, the signal details and a context
// pointer.
void *mySigHandler (int sig, struct siginfo *info, void *pContext);

// Small driver that writes a string to a file and reads it back using POSIX
// asynchronous I/O, with completion reported through signals.
//
// The object owns the file descriptor, both control blocks and both buffers.
// Copying it would make two objects release the same resources, so the copy
// operations are declared private and intentionally left undefined.
class Test_Aio
{
public:
  Test_Aio (void);
  // Default constructor.

  int init (void);
  // Opens the output file and prepares the buffers.
  // Returns 0 on success, -1 on failure.

  int do_aio (void);
  // Runs the asynchronous write and read and waits for both.
  // Returns 0 on success, -1 on failure.

  int setup_signal_handler (int signal_number) const;
  // Installs mySigHandler as the SA_SIGINFO handler for <signal_number>.

  ~Test_Aio (void);
  // Destructor.

private:
  Test_Aio (const Test_Aio &);
  Test_Aio &operator= (const Test_Aio &);
  // Not implemented: the object is non-copyable (see class comment).

  int out_fd_;
  // Output file descriptor.

  sigset_t RT_completion_signals_;
  // Set holding the two completion signals (SIGRTMIN and SIGRTMIN + 1).

  struct aiocb *aiocb_write_;
  // Control block for writing to the file.

  struct aiocb *aiocb_read_;
  // Control block for reading back from the file.

  char *buffer_write_;
  // The buffer to be written to out_fd_.

  char *buffer_read_;
  // The buffer that receives the data read back from the file.
};

// Constructor: allocates zero-initialised control blocks, records the two
// completion signals and installs the handler for each of them.
Test_Aio::Test_Aio (void)
  : out_fd_ (-1),                 // "no file open yet"; checked by the destructor
    aiocb_write_ (new aiocb ()),  // () value-initialises, i.e. zero-fills, the struct
    aiocb_read_ (new aiocb ()),
    buffer_write_ (0),
    buffer_read_ (0)
{
  // Remember which signals announce completion.  The set is only recorded
  // here; the signals are deliberately NOT blocked (no sigprocmask /
  // pthread_sigmask call), so they are delivered to the handler.
  sigemptyset (&this->RT_completion_signals_);
  sigaddset (&this->RT_completion_signals_, SIGRTMIN);
  sigaddset (&this->RT_completion_signals_, SIGRTMIN + 1);

  // A constructor cannot return a status; setup_signal_handler () prints its
  // own diagnostic when installation fails.
  this->setup_signal_handler (SIGRTMIN);
  this->setup_signal_handler (SIGRTMIN + 1);
}

// Destructor: releases everything the object owns, pairing each release
// with the way the resource was obtained.
Test_Aio::~Test_Aio (void)
{
  if (this->out_fd_ >= 0)
    close (this->out_fd_);

  delete this->aiocb_write_;
  delete this->aiocb_read_;

  // buffer_write_ comes from strdup (), i.e. malloc (), so it needs free ().
  free (this->buffer_write_);

  // buffer_read_ comes from new char[], so it needs delete [].
  delete [] this->buffer_read_;
}

// Installs mySigHandler as the SA_SIGINFO handler for <signal_number>.
//
// Returns 0 on success.  On failure prints the sigaction () error to stderr
// and returns -1.
int
Test_Aio::setup_signal_handler (int signal_number) const
{
  struct sigaction reaction;
  memset (&reaction, 0, sizeof reaction);  // No field is left indeterminate.
  sigemptyset (&reaction.sa_mask);         // Nothing else to mask.
  reaction.sa_flags = SA_SIGINFO;          // Ask for the three-argument handler form.
  reaction.sa_sigaction = SIGNAL_C_FUNC (mySigHandler);

  if (sigaction (signal_number, &reaction, 0) == -1)
    {
      perror ("Proactor couldnt do sigaction for the RT SIGNAL");
      return -1;
    }

  return 0;
}


// Opens (creating or truncating) "test_aio.log" and prepares the buffers:
// the write buffer holds the greeting text, the read buffer is a zero-filled
// array with room for that text plus a terminating NUL.
//
// Returns 0 on success, -1 if the file cannot be opened or the write buffer
// cannot be allocated.
int
Test_Aio::init (void)
{
  // Open the output file.  open () reports failure with -1; 0 is a valid
  // descriptor.
  this->out_fd_ = open ("test_aio.log", O_RDWR | O_CREAT | O_TRUNC, 0666);
  if (this->out_fd_ < 0)
    {
      std::cout << "Error : Opening file" << std::endl;
      return -1;
    }

  // Init the buffers.
  this->buffer_write_ = strdup ("Welcome to the world of AIO... AIO Rules !!!");
  if (this->buffer_write_ == 0)
    {
      perror ("strdup");
      return -1;
    }
  std::cout << "The WRITE buffer is : " << this->buffer_write_ << std::endl;

  // The trailing () zero-fills the array, so the read buffer is always a
  // valid C string even before any data arrives.
  this->buffer_read_ = new char [strlen (this->buffer_write_) + 1] ();
  return 0;
}

// Polls one submitted request.  Returns 1 while it is still running, 0 once
// it has finished successfully and -1 once it has finished with an error.
// On success the transferred byte count is stored in *transferred when that
// pointer is non-null.  <label> ("write" / "read") only feeds the progress
// message.
static int
reap_aio_request (struct aiocb *request, const char *label, ssize_t *transferred)
{
  const int status = aio_error (request);
  if (status == EINPROGRESS)
    {
      std::cout << "AIO " << label << " in progress" << std::endl;
      return 1;
    }

  // The request is finished: collect its final result.
  const ssize_t result = aio_return (request);
  if (result == -1)
    {
      // aio_error () returned the request's error number; hand it to
      // perror () through errno so the message names the real cause.
      if (status > 0)
        errno = status;
      perror ("aio_return");
      return -1;
    }

  if (transferred != 0)
    *transferred = result;
  return 0;
}

// Queues an asynchronous write of the write buffer and an asynchronous read
// into the read buffer (both at file offset 0), waits with aio_suspend ()
// until neither request is outstanding, then prints the read buffer.
// Completion of the read is signalled with SIGRTMIN, completion of the
// write with SIGRTMIN + 1.
//
// Returns 0 when both requests succeeded, -1 otherwise.
//
// NOTE(review): both requests are queued back to back, so the read may be
// served before the written data is in the file and then yields fewer bytes
// (possibly none) -- confirm this is acceptable for the demo.
int
Test_Aio::do_aio (void)
{
  if (this->buffer_write_ == 0 || this->buffer_read_ == 0)
    {
      std::cerr << "Error : buffers are not initialised" << std::endl;
      return -1;
    }

  const size_t length = strlen (this->buffer_write_);

  // = Read from that file.
  // Start from a zeroed control block so no field is left indeterminate.
  memset (this->aiocb_read_, 0, sizeof (*this->aiocb_read_));
  this->aiocb_read_->aio_fildes = this->out_fd_;
  this->aiocb_read_->aio_offset = 0;
  this->aiocb_read_->aio_buf = this->buffer_read_;
  this->aiocb_read_->aio_nbytes = length;
  this->aiocb_read_->aio_reqprio = 0;
  this->aiocb_read_->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
  this->aiocb_read_->aio_sigevent.sigev_signo = SIGRTMIN;
  // The handler receives this pointer; it must identify the READ request.
  this->aiocb_read_->aio_sigevent.sigev_value.sival_ptr = this->aiocb_read_;

  // = Write to the file.
  memset (this->aiocb_write_, 0, sizeof (*this->aiocb_write_));
  this->aiocb_write_->aio_fildes = this->out_fd_;
  this->aiocb_write_->aio_offset = 0;
  this->aiocb_write_->aio_buf = this->buffer_write_;
  this->aiocb_write_->aio_nbytes = length;
  this->aiocb_write_->aio_reqprio = 0;
  this->aiocb_write_->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
  this->aiocb_write_->aio_sigevent.sigev_signo = SIGRTMIN + 1;
  this->aiocb_write_->aio_sigevent.sigev_value.sival_ptr = this->aiocb_write_;

  // Fire off the aio write.  Nothing is outstanding yet, so bailing out
  // here is safe.
  if (aio_write (this->aiocb_write_) != 0)
    {
      perror ("aio_write");
      return -1;
    }

  // Fire off the aio read.  If it does not get queued, still wait for the
  // write to finish, but never poll the unqueued control block and report
  // failure at the end.
  bool failed = false;
  struct aiocb *pending[2];
  pending[0] = this->aiocb_write_;
  pending[1] = this->aiocb_read_;
  if (aio_read (this->aiocb_read_) < 0)
    {
      perror ("aio_read");
      pending[1] = 0;
      failed = true;
    }

  // Suspend until every request still listed in <pending> is done.  A
  // failed request is remembered instead of returned immediately so that no
  // request is left in flight when this function returns.
  ssize_t bytes_read = 0;
  while (pending[0] != 0 || pending[1] != 0)
    {
      const int suspend_result = aio_suspend (pending, 2, 0);
      const int suspend_errno = errno;  // Saved before the stream call below.
      std::cerr << "Return value :" << suspend_result << std::endl;

      // The completion signals interrupt aio_suspend (), so EINTR is
      // expected; any other error would turn this loop into a busy spin.
      if (suspend_result == -1 && suspend_errno != EINTR)
        {
          errno = suspend_errno;
          perror ("aio_suspend");
          return -1;
        }

      if (pending[0] != 0)
        {
          const int state = reap_aio_request (pending[0], "write", 0);
          if (state != 1)
            {
              failed = failed || (state == -1);
              pending[0] = 0;  // Entries set to null are ignored by aio_suspend ().
            }
        }

      if (pending[1] != 0)
        {
          const int state = reap_aio_request (pending[1], "read", &bytes_read);
          if (state != 1)
            {
              failed = failed || (state == -1);
              pending[1] = 0;
            }
        }
    }

  if (failed)
    return -1;

  // Terminate exactly after the bytes that were read; bytes_read can be at
  // most <length>, and the buffer holds length + 1 characters.
  this->buffer_read_[bytes_read] = '\0';

  std::cout << "Both the AIO operations done." << std::endl;
  std::cout << "The READ buffer is : " << this->buffer_read_ << std::endl;

  return 0;
}

// Signal handler for the two AIO completion signals: SIGRTMIN announces the
// read, SIGRTMIN + 1 the write.  Prints which signal arrived and, when the
// signal carries a control-block pointer, whether that request failed.
//
// sig      - signal number (unused; the number is taken from <info>).
// info     - signal details; accessed as siginfo_t.
// pContext - context pointer (unused).
//
// Always returns a null pointer.
//
// NOTE(review): printf () is not on the POSIX async-signal-safe list;
// acceptable for a demo, but confirm before reusing this handler elsewhere.
void *
mySigHandler(int sig, struct siginfo* info, void * pContext)
{
  (void) sig;
  (void) pContext;

  if (info == 0)
    return 0;

  // The fields are read through siginfo_t: on C libraries where
  // "struct siginfo" is not a complete type the members cannot be reached
  // through <info> directly.
  const siginfo_t *details = reinterpret_cast<const siginfo_t *> (info);

  if (details->si_signo == SIGRTMIN)
    {
      printf ("mySigHandler : Got signal for aio read\n");
      printf ("\tUser RTS signal's PID : %d\n", static_cast<int> (details->si_pid));
      printf ("\tUser RTS Signal's Number : %d\n", details->si_signo);
    }
  else if (details->si_signo == SIGRTMIN + 1)
    {
      printf ("mySigHandler : Got signal for aio write\n");
      printf ("\tUser RTS signal's PID : %d\n", static_cast<int> (details->si_pid));
      printf ("\tUser RTS Signal's Number : %d\n", details->si_signo);
    }

  // Only inspect the request here.  Its final result is collected with
  // aio_return () by the code that waits for the request, and that call
  // must happen just once per request; aio_error () may be repeated.
  struct aiocb *request = static_cast<struct aiocb *> (details->si_value.sival_ptr);
  if (request != 0)
    {
      const int status = aio_error (request);
      if (status != 0 && status != EINPROGRESS)
        printf ("mySigHandler: aio request failed (error %d)\n", status);
    }

  return 0;
}


// Entry point: prepares the test object, runs the asynchronous write/read
// round trip and reports the outcome on stdout.
// Returns 0 on success and -1 when either step fails.
int
main (int argc, char **argv)
{
  (void) argc;  // The program takes no command-line arguments.
  (void) argv;

  Test_Aio test_aio;

  // do_aio () is only attempted when init () succeeded.
  if (test_aio.init () != 0 || test_aio.do_aio () != 0)
    {
      printf ("AIOCB test failed:\n"
              "POSIX_AIOCB_PROACTOR may not work in this platform\n");
      return -1;
    }

  printf ("AIOCB test successful:\n");
  return 0;
}

반응형