/*
* Seven Kingdoms: Ancient Adversaries
*
* Copyright 1997,1998 Enlight Software Ltd.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
//Filename : OREMOTE2.CPP
//Description : Object Remote - part 2
#include
#include
#include
#define DEBUG_LOG_LOCAL 1
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
//---------- Define static functions ----------//
// static int validate_queue(char* queueBuf, int queuedSize);
//-------- Begin of function Remote::new_msg ---------//
//
// Allocate a RemoteMsg with the specified data size.
//
// msgId - id. of the message.
// dataSize - the data size of the RemoteMsg needed (size of RemoteMsg::data_buf)
//
// Data structure:
//
// size of RemoteMsg + RemoteMsg body
// ^ ^
// | Allocate starts here | return variable here to the client function
//
RemoteMsg* Remote::new_msg(int msgId, int dataSize)
{
    // Block layout:  <short msgSize> <DWORD id> <dataSize bytes>
    //                ^ block start   ^ pointer handed back to the caller
    //
    // msgSize covers the id and the data, but not the size prefix itself.
    // It is written through a short below, so it has to fit in one.
    int msgSize = sizeof(DWORD) + dataSize;
    int reqSize = sizeof(short) + msgSize;

    //--- small requests reuse common_msg_buf, larger ones get their own block ---//
    char* blockPtr;

    if( reqSize > COMMON_MSG_BUF_SIZE )
        blockPtr = (char *)mem_add( reqSize );
    else
        blockPtr = common_msg_buf;

    //--- write the size prefix, then the message id right behind it ---//
    *(short *)blockPtr = msgSize;

    RemoteMsg* remoteMsgPtr = (RemoteMsg*)(blockPtr + sizeof(short));
    remoteMsgPtr->id = (DWORD) msgId;

    return remoteMsgPtr;
}
//--------- End of function Remote::new_msg ---------//
//-------- Begin of function Remote::free_msg ---------//
//
// Free a RemoteMsg previously allocated by new_msg().
//
// Data structure:
//
// size of RemoteMsg + RemoteMsg body
// ^ ^
// | Allocate starts here | return variable here to the client function
//
void Remote::free_msg(RemoteMsg* remoteMsgPtr)
{
    // The block really starts at the short size prefix that sits
    // immediately in front of the RemoteMsg.
    char* blockStart = (char *)remoteMsgPtr - sizeof(short);

    // A message that lives in common_msg_buf was not obtained from mem_add(),
    // so there is nothing to release for it.
    if( blockStart == common_msg_buf )
        return;

    mem_del( blockStart );
}
//--------- End of function Remote::free_msg ---------//
//-------- Begin of function Remote::send_msg ---------//
//
// remoteMsgPtr - the pointer to RemoteMsg to be sent.
// [DPID] receiverId - send to whom, 0 for everybody
// (default : 0)
//
// size of RemoteMsg + RemoteMsg body
// ^ ^
// | Allocate starts here | return variable here to the client function
//
void Remote::send_msg(RemoteMsg* remoteMsgPtr, int receiverId)
{
    if( handle_vga_lock )
        vga_front.temp_unlock();

    // What goes out is <short size> followed by the RemoteMsg itself;
    // the size prefix sits directly in front of remoteMsgPtr.
    char* packetPtr = (char*)remoteMsgPtr - sizeof(short);
    short bodySize = *(short *)packetPtr;
    int packetSize = bodySize + sizeof(short);

    ec_remote.send( ec_remote.get_ec_player_id(receiverId), packetPtr, packetSize );
    packet_send_count++;

    if( handle_vga_lock )
        vga_front.temp_restore_lock();
}
//--------- End of function Remote::send_msg ---------//
//-------- Begin of function Remote::send_free_msg ---------//
//
// remoteMsgPtr - the pointer to RemoteMsg to be sent.
// [DPID] receiverId - send to whom, 0 for everybody
// (default : 0)
//
// size of RemoteMsg + RemoteMsg body
// ^ ^
// | Allocate starts here | return variable here to the client function
//
void Remote::send_free_msg(RemoteMsg* remoteMsgPtr, int receiverId)
{
    if( handle_vga_lock )
        vga_front.temp_unlock();

    //--- send <short size> + RemoteMsg, starting at the size prefix ---//
    char* packetPtr = (char*)remoteMsgPtr - sizeof(short);
    short bodySize = *(short *)packetPtr;
    int packetSize = bodySize + sizeof(short);

    ec_remote.send( ec_remote.get_ec_player_id(receiverId), packetPtr, packetSize );
    packet_send_count++;

    //--- release the block unless the message lives in common_msg_buf ---//
    int ownsBlock = ( packetPtr != common_msg_buf );

    if( ownsBlock )
        mem_del( packetPtr );

    if( handle_vga_lock )
        vga_front.temp_restore_lock();
}
//--------- End of function Remote::send_free_msg ---------//
//-------- Begin of function Remote::new_send_queue_msg ---------//
//
// Add the message into the sending queue.
//
// msgId - id. of the message.
// dataSize - the data size of the RemoteMsg needed (size of RemoteMsg::data_buf)
//
// Queue data structure:
//
// <1st msg size> + <1st msg body> + <2nd msg size> + ...
//
// return : the pointer RemoteMsg::data_buf of the allocated message
//
char* Remote::new_send_queue_msg(int msgId, int dataSize)
{
char *dataPtr = send_queue[0].reserve(sizeof(short) + sizeof(DWORD) + dataSize);
// ----- put the size of message ------//
*(short *)dataPtr = sizeof(DWORD) + dataSize;
dataPtr += sizeof(short);
// ----- put the message id --------//
((RemoteMsg *)dataPtr)->id = msgId;
return ((RemoteMsg *)dataPtr)->data_buf;
}
//--------- End of function Remote::new_send_queue_msg ---------//
//-------- Begin of function Remote::send_queue_now ---------//
//
// Send out all send_queued message.
//
// [DPID] receiverId - send to whom, 0 for everybody
// (default: 0)
//
// return : 1 - sent successfully, the message has been received
// by the receivers.
// 0 - not successful.
//
int Remote::send_queue_now(int receiverId)
{
    // An empty queue is reported as a successful send.
    if( send_queue[0].length() == 0 )
        return 1;

    if( !send_queue[0].validate_queue() )
        err.run( "Queue Corrupted, Remote::send_queue_now()" );

    if( handle_vga_lock )
        vga_front.temp_unlock();

    //--- send the whole queue buffer as a single packet ---//
    // A result of zero or less from send() is reported as failure.
    int sendFlag =
        ( ec_remote.send( ec_remote.get_ec_player_id(receiverId),
                          send_queue[0].queue_buf,
                          send_queue[0].length() ) <= 0 ) ? 0 : 1;

    // Counted whether or not the send succeeded.
    packet_send_count++;

    if( handle_vga_lock )
        vga_front.temp_restore_lock();

    return sendFlag;
}
//--------- End of function Remote::send_queue_now ---------//
//-------- Begin of function Remote::append_send_to_receive ---------//
//
// Append the current send queue (send_queue[0]) to the receive queue
// that has the same frame count.
//
void Remote::append_send_to_receive()
{
    if( send_queue[0].length() == 0 )
        return;

    //--- look for the receive slot whose frame count matches send_queue[0] ---//
    int slot = 0;

    while( slot < RECEIVE_QUEUE_BACKUP && send_frame_count[0] != receive_frame_count[slot] )
        ++slot;

    if( slot < RECEIVE_QUEUE_BACKUP )
    {
        receive_queue[slot].append_queue(send_queue[0]);

        if( !receive_queue[slot].validate_queue() )
            err.run( "Queue Corrupted, Remote::append_send_to_receive()-3" );
    }

    err_when( slot >= RECEIVE_QUEUE_BACKUP );       // no slot carries this frame count
}
//--------- End of function Remote::append_send_to_receive ---------//
//-------- Begin of function Remote::poll_msg ---------//
int Remote::poll_msg()
{
if ( !is_enable() || poll_msg_flag <= 0)
return 0;
//--------------------------------------------//
int receivedFlag=0;
DWORD msgListSize;
int loopCount=0;
if( handle_vga_lock )
vga_front.temp_unlock();
ec_remote.yield();
char *recvBuf;
// DPID from, to;
char from;
DWORD recvLen;
// while( (recvBuf = mp_ptr->receive(&from, &to, &recvLen)) != NULL)
while( (recvBuf = ec_remote.receive(&from, (long unsigned int *)&recvLen)) != NULL)
{
err_when(++loopCount > 1000 );
msgListSize = recvLen;
//-------- received successfully ----------//
/*
if( !power.enable_flag ) // power.enable_flag determines whether the multiplayer game has started or not
{
int validateLen = receive_queue[0].length();
memcpy( receive_queue[0].reserve(msgListSize), recvBuf, msgListSize );
if( !receive_queue[0].validate_queue(validateLen) )
err.run( "Queue Corrupted, Remote::poll_msg()-99" );
receivedFlag=1;
}
else
*/
err_when( msgListSize < sizeof(short) + sizeof(DWORD) );
RemoteMsg *rMsg = (RemoteMsg *)(recvBuf + sizeof(short) );
int queueToCopy = -1;
//--- if message id !=MSG_QUEUE_HEADER, this packet is sent by send_free_msg(), not by send_queue_now() ---//
if( rMsg->id != MSG_QUEUE_HEADER )
{
// DEBUG_LOG("begin MSG_xxxx received");
// DEBUG_LOG(rMsg->id);
// DEBUG_LOG(sys.frame_count);
// DEBUG_LOG("end MSG_xxxx received");
queueToCopy = 0;
}
else
{
// ------- find which receive queue to hold the message -----//
DWORD senderFrameCount = *(DWORD *)rMsg->data_buf;
for(int n = 0; n < RECEIVE_QUEUE_BACKUP; ++n )
{
if( senderFrameCount == receive_frame_count[n] )
{
queueToCopy = n;
break;
}
}
// DEBUG_LOG("begin MSG_QUEUE_HEADER received");
// DEBUG_LOG(senderFrameCount);
// DEBUG_LOG(sys.frame_count);
// DEBUG_LOG(n);
// DEBUG_LOG("end MSG_QUEUE_HEADER received");
}
if( queueToCopy >= 0 && queueToCopy < RECEIVE_QUEUE_BACKUP )
{
RemoteQueue &rq = receive_queue[queueToCopy];
int validateLen = rq.length();
memcpy( rq.reserve(msgListSize), recvBuf, msgListSize );
if( !rq.validate_queue(validateLen) )
err.run( "Queue Corrupted, Remote::poll_msg()-2" );
}
else
{
// discard the frame in non-debug mode
DEBUG_LOG("message is discard" );
if( rMsg->id != MSG_QUEUE_HEADER )
{
DEBUG_LOG(rMsg->id);
}
else
{
DWORD senderFrameCount = *(DWORD *)rMsg->data_buf;
DEBUG_LOG("MSG_QUEUE_HEADER message");
DEBUG_LOG(senderFrameCount);
}
}
ec_remote.de_recv_queue();
receivedFlag = 1;
packet_receive_count++;
}
if( handle_vga_lock )
vga_front.temp_restore_lock();
return receivedFlag;
}
//--------- End of function Remote::poll_msg ---------//
//-------- Begin of function Remote::process_receive_queue ---------//
//
// Process messages in the receive queue.
//
void Remote::process_receive_queue()
{
if( !process_queue_flag ) // avoid sys.yield()
return;
int processFlag;
int loopCount=0;
disable_poll_msg();
RemoteQueue &rq = receive_queue[0];
RemoteQueueTraverse rqt(rq);
if( !rq.validate_queue() )
err.run( "Queue corrupted, Remote::process_receive_queue()" );
//---- if not started yet, process the message without handling the message sequence ----//
if( !power.enable_flag )
{
for( rqt.traverse_set_start(); !rqt.traverse_finish(); rqt.traverse_next() )
{
err_when( ++loopCount > 1000 );
RemoteMsg* remoteMsgPtr = rqt.get_remote_msg();
remoteMsgPtr->process_msg();
}
}
else //--------- process in the order of nation recno --------//
{
LOG_BEGIN;
for( int nationRecno=1 ; nationRecno<=nation_array.size() ; ++nationRecno )
{
if( nation_array.is_deleted(nationRecno) || nation_array[nationRecno]->nation_type==NATION_AI )
continue;
nation_processing = nationRecno;
processFlag=0;
//-------- scan through the message queue --------//
loopCount = 0;
for( rqt.traverse_set_start(); !rqt.traverse_finish(); rqt.traverse_next() )
{
err_when( ++loopCount > 1000 );
RemoteMsg* remoteMsgPtr = rqt.get_remote_msg();
//--- check if this message indicates the start of a new message queue ---//
if( remoteMsgPtr->id == MSG_QUEUE_HEADER )
{
short msgNation = *(short *)(&remoteMsgPtr->data_buf[sizeof(DWORD)]);
//--- if this is the message queue of the nation we should process now ----//
if(msgNation==nationRecno ) // struct of data_buf: frameCount + nationRecno
{
processFlag = 1;
}
else //--- if this is a message queue that should be processed now ---//
{
if( processFlag ) //--- if we were previously processing the right queue, now the processing is complete by this point ---//
break; // message on next nation is met, stop this nation
}
}
else if( remoteMsgPtr->id == MSG_QUEUE_TRAILER )
{
short msgNation = *(short *)remoteMsgPtr->data_buf;
//--- if this is the message queue of the nation we should process now ----//
if(msgNation==nationRecno ) // struct of data_buf: nationRecno
break; // end of that nation
}
else
{
//--- if this is the message queue of the nation we should process now ----//
if( processFlag )
{
#if defined(DEBUG) && defined(ENABLE_LOG)
String logStr("begin process remote message id:");
logStr += (long) remoteMsgPtr->id;
logStr += " of nation ";
logStr += nation_processing;
LOG_MSG(logStr);
#endif
remoteMsgPtr->process_msg();
LOG_MSG("end process remote message");
LOG_MSG(m.get_random_seed());
}
}
}
nation_processing = 0;
}
LOG_END;
}
//---------- clear receive_queue[0] and shift from next queue ------//
rq.clear();
int n;
for( n = 1; n < RECEIVE_QUEUE_BACKUP; ++n)
{
receive_queue[n-1].swap(receive_queue[n]);
receive_frame_count[n-1] = receive_frame_count[n];
}
receive_frame_count[n-1]++;
enable_poll_msg();
}
//--------- End of function Remote::process_receive_queue ---------//
//-------- Begin of function Remote::process_specific_msg ---------//
//
// Pre-process a specific message type.
//
void Remote::process_specific_msg(DWORD msgId)
{
    if( !process_queue_flag )       // avoid sys.yield()
        return;

    int loopCount = 0;

    disable_poll_msg();

    RemoteQueue &rq = receive_queue[0];
    RemoteQueueTraverse rqt(rq);

    if( !rq.validate_queue() )
        err.run( "Queue corrupted, Remote::process_specific_msg()" );

    //--- run every message in receive_queue[0] whose id equals msgId ---//

    rqt.traverse_set_start();

    while( !rqt.traverse_finish() )
    {
        err_when( ++loopCount > 1000 );

        RemoteMsg* msgPtr = rqt.get_remote_msg();

        if( msgPtr->id == msgId )
        {
            msgPtr->process_msg();

            // NOTE(review): id is zeroed after processing, presumably so a later
            // pass over the queue does not handle the message again - confirm.
            msgPtr->id = 0;
        }

        rqt.traverse_next();
    }

    enable_poll_msg();
}
//--------- End of function Remote::process_specific_msg ---------//
/*
//-------- Begin of function Remote::validate_queue ---------//
//
// Pre-process a specific message type.
//
static int validate_queue(char* queueBuf, int queuedSize)
{
char* queuePtr = queueBuf;
RemoteMsg* remoteMsgPtr;
int msgSize, processedSize=0;
int loopCount=0;
while( processedSize < queuedSize )
{
msgSize = *((short*)queuePtr);
remoteMsgPtr = (RemoteMsg*) (queuePtr + sizeof(short));
if( remoteMsgPtr->id )
{
if( remoteMsgPtr->idid>LAST_REMOTE_MSG_ID )
return 0;
}
queuePtr += sizeof(short) + msgSize;
processedSize += sizeof(short) + msgSize;
err_when( loopCount++ > 10000 ); // deadloop
}
return 1;
}
//--------- End of function Remote::validate_queue ---------//
*/
//-------- Begin of function Remote::copy_send_to_backup ---------//
//
// Copy the whole send queue to the backup queue.
//
void Remote::copy_send_to_backup()
{
    //------ drop the oldest backup, then shift the others one slot back ------//

    int slot = SEND_QUEUE_BACKUP - 1;

    send_queue[slot].clear();

    while( slot > 1 )
    {
        // Swapping moves the emptied queue one slot towards the front each time.
        send_queue[slot].swap(send_queue[slot-1]);
        send_frame_count[slot] = send_frame_count[slot-1];
        --slot;
    }

    //------ send_queue[1] is empty now; copy the current queue into it ------//

    send_queue[1] = send_queue[0];
    send_frame_count[1] = send_frame_count[0];
}
//--------- End of function Remote::copy_send_to_backup ---------//
//-------- Begin of function Remote::send_backup_now ---------//
//
// Send out the backup queue now.
//
// [DPID] receiverId - send to whom, 0 for everybody
// (default: 0)
//
// requestFrameCount - need to send the backup buffer of the requested frame count
//
// return : 1 - sent successfully, the message has been received
// by the receivers.
// 0 - not successful.
//
int Remote::send_backup_now(int receiverId, DWORD requestFrameCount)
{
    // NOTE(review): this displays an empty string; the label text may have
    // been lost from the literal - confirm against the original source.
    font_san.disp( ZOOM_X2-100, 4, "", ZOOM_X2);

    //------ look for the backup queue of the requested frame -----//

    for( int n = 1; n < SEND_QUEUE_BACKUP; ++n )
    {
        if( requestFrameCount != send_frame_count[n] )
            continue;

        //---------- an empty backup counts as sent -------------//

        if( send_queue[n].length() <= 0 )
            return 1;

        if( handle_vga_lock )
            vga_front.temp_unlock();

        // Success only when send() returns a positive value.
        int retFlag = ec_remote.send(ec_remote.get_ec_player_id(receiverId), send_queue[n].queue_buf, send_queue[n].length()) > 0;
        packet_send_count++;

        if( handle_vga_lock )
            vga_front.temp_restore_lock();

        return retFlag;
    }

    //------ no backup carries that frame count ------//

    // A request older than the last backup slot's frame count is a fatal error.
    if( requestFrameCount < send_frame_count[SEND_QUEUE_BACKUP-1] )
        err.run( "requestFrameCount:%d < backup2_frame_count:%d", requestFrameCount, send_frame_count[SEND_QUEUE_BACKUP-1] );

    return 0;
}
//--------- End of function Remote::send_backup_now ---------//
//-------- Begin of function Remote::init_send_queue ---------//
//
// Initialize the header of the sending queue.
//
// frameCount - frame count of this queue
// nationRecno - nation recno of the sender
//
void Remote::init_send_queue(DWORD frameCount, short nationRecno)
{
    send_queue[0].clear();

    // The queue starts with one header entry:
    //   <short size> <DWORD MSG_QUEUE_HEADER> <DWORD frame count> <short nationRecno>
    // where size counts everything after the size prefix.
    int headerSize = sizeof(DWORD)*2 + sizeof(short);
    char *writePtr = send_queue[0].reserve(sizeof(short) + headerSize);

    *(short *)writePtr = headerSize;
    writePtr += sizeof(short);

    *(DWORD *)writePtr = MSG_QUEUE_HEADER;
    writePtr += sizeof(DWORD);

    // The frame count written into the header is also recorded as the
    // frame count of send_queue[0].
    send_frame_count[0] = next_send_frame(nationRecno, frameCount + process_frame_delay);
    *(DWORD *)writePtr = send_frame_count[0];
    writePtr += sizeof(DWORD);

    *(short *)writePtr = nationRecno;
}
//--------- End of function Remote::init_send_queue ----------//
//-------- Begin of function Remote::init_receive_queue ---------//
//
// Initialize the header of the receiving queue,
// call this only when power is just enabled
//
// frameCount - frame count of this queue
// (no nation parameter: entries are written for every nation recno
// from 1 to MAX_NATION)
//
void Remote::init_receive_queue(DWORD frameCount)
{
    int n;

    //--- empty every receive queue; queue n is assigned frame frameCount + n ---//

    for( n = 0; n < RECEIVE_QUEUE_BACKUP; ++n )
    {
        receive_queue[n].clear();
        receive_frame_count[n] = n+frameCount;
    }

    // Entry sizes, not counting the leading <short size> prefix.
    int headerSize  = sizeof(DWORD)*2 + sizeof(short);     // id + frame count + nation
    int nationOnlySize = sizeof(DWORD) + sizeof(short);    // id + nation

    //--- pre-fill the first process_frame_delay queues ---//

    for( n = 0; n < process_frame_delay; ++n )
    {
        // nations are not created yet, so MSG_QUEUE_HEADER / MSG_NEXT_FRAME /
        // MSG_QUEUE_TRAILER are written for every nation recno, even though
        // the nation may never exist

        for( short nationRecno = 1; nationRecno <= MAX_NATION; ++nationRecno )
        {
            char *entryPtr;

            // <short size>, MSG_QUEUE_HEADER, <DWORD frame count>, <short nationRecno>
            entryPtr = receive_queue[n].reserve(sizeof(short) + headerSize);
            *(short *)entryPtr = headerSize;
            *(DWORD *)(entryPtr + sizeof(short)) = MSG_QUEUE_HEADER;
            *(DWORD *)(entryPtr + sizeof(short) + sizeof(DWORD)) = frameCount + n;
            *(short *)(entryPtr + sizeof(short) + sizeof(DWORD)*2) = nationRecno;

            // <short size>, MSG_NEXT_FRAME, <short nationRecno>
            entryPtr = receive_queue[n].reserve(sizeof(short) + nationOnlySize);
            *(short *)entryPtr = nationOnlySize;
            *(DWORD *)(entryPtr + sizeof(short)) = MSG_NEXT_FRAME;
            *(short *)(entryPtr + sizeof(short) + sizeof(DWORD)) = nationRecno;

            // <short size>, MSG_QUEUE_TRAILER, <short nationRecno>
            entryPtr = receive_queue[n].reserve(sizeof(short) + nationOnlySize);
            *(short *)entryPtr = nationOnlySize;
            *(DWORD *)(entryPtr + sizeof(short)) = MSG_QUEUE_TRAILER;
            *(short *)(entryPtr + sizeof(short) + sizeof(DWORD)) = nationRecno;
        }
    }
}
//--------- End of function Remote::init_receive_queue ----------//
//--------- Begin of function Remote::init_start_mp ----------//
//
// Empty every send and receive queue and set all their frame counts to zero.
//
void Remote::init_start_mp()
{
    //--- send queues (current queue and its backups) ---//

    for( int sendIdx = 0; sendIdx < SEND_QUEUE_BACKUP; ++sendIdx )
    {
        send_queue[sendIdx].clear();
        send_frame_count[sendIdx] = 0;
    }

    //--- receive queues ---//

    for( int recvIdx = 0; recvIdx < RECEIVE_QUEUE_BACKUP; ++recvIdx )
    {
        receive_queue[recvIdx].clear();
        receive_frame_count[recvIdx] = 0;
    }
}
//--------- End of function Remote::init_start_mp ----------//
//--------- Begin of function Remote::enable_process_queue --------//
//
// Set process_queue_flag. While it is zero, process_receive_queue() and
// process_specific_msg() return without doing anything.
//
void Remote::enable_process_queue()
{
    process_queue_flag = 1;
}
//--------- End of function Remote::enable_process_queue --------//
//--------- Begin of function Remote::disable_process_queue --------//
//
// Clear process_queue_flag, so that process_receive_queue() and
// process_specific_msg() return without doing anything.
//
void Remote::disable_process_queue()
{
    process_queue_flag = 0;
}
//--------- End of function Remote::disable_process_queue --------//
//-------- Begin of function Remote::enable_poll_msg ---------//
//
// Set poll_msg_flag. poll_msg() returns 0 without receiving anything
// while the flag is zero or negative.
//
void Remote::enable_poll_msg()
{
    poll_msg_flag = 1;
}
//-------- End of function Remote::enable_poll_msg ---------//
//-------- Begin of function Remote::disable_poll_msg ---------//
//
// Clear poll_msg_flag, so that poll_msg() returns 0 without receiving anything.
//
void Remote::disable_poll_msg()
{
    poll_msg_flag = 0;
}
//-------- End of function Remote::disable_poll_msg ---------//