aboutsummaryrefslogtreecommitdiffstats
path: root/ports/winnt/ntpd/ntp_iocompletionport.c
diff options
context:
space:
mode:
Diffstat (limited to 'ports/winnt/ntpd/ntp_iocompletionport.c')
-rw-r--r--ports/winnt/ntpd/ntp_iocompletionport.c101
1 files changed, 63 insertions, 38 deletions
diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c
index c5db62a8dd5f..9cf095225bc2 100644
--- a/ports/winnt/ntpd/ntp_iocompletionport.c
+++ b/ports/winnt/ntpd/ntp_iocompletionport.c
@@ -708,7 +708,7 @@ QueueSerialWait(
recvbuf_t * buff
)
{
- static const char * const msg =
+ static const char * const msgh =
"QueueSerialWait: cannot wait for COM event";
BOOL rc;
@@ -720,7 +720,7 @@ QueueSerialWait(
buff->fd = lpo->iopad->riofd;
/* keep receive position for continuation of partial lines! */
rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
- return rc || IoResultCheck(GetLastError(), lpo, msg);
+ return rc || IoResultCheck(GetLastError(), lpo, msgh);
}
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
@@ -731,7 +731,7 @@ OnSerialWaitComplete(
IoCtx_t * lpo
)
{
- static const char * const msg =
+ static const char * const msgh =
"OnSerialWaitComplete: wait for COM event failed";
DevCtx_t * dev;
@@ -740,7 +740,7 @@ OnSerialWaitComplete(
u_long covc;
/* Make sure this RIO is not closed. */
- if (NULL == getRioFromIoCtx(lpo, key, msg))
+ if (NULL == getRioFromIoCtx(lpo, key, msgh))
return;
/* start next IO and leave if we hit an error */
@@ -847,7 +847,7 @@ QueueSerialReadCommon(
recvbuf_t * buff
)
{
- static const char * const msg =
+ static const char * const msgh =
"QueueSerialRead: cannot schedule device read";
BOOL rc;
@@ -866,7 +866,7 @@ QueueSerialReadCommon(
(char*)buff->recv_buffer + buff->recv_length,
sizeof(buff->recv_buffer) - buff->recv_length,
NULL, &lpo->ol);
- return rc || IoResultCheck(GetLastError(), lpo, msg);
+ return rc || IoResultCheck(GetLastError(), lpo, msgh);
}
/*
@@ -904,11 +904,11 @@ OnSerialReadComplete(
IoCtx_t * lpo
)
{
- static const char * const msg =
+ static const char * const msgh =
"OnSerialReadComplete: read from device failed";
/* Make sure this RIO is not closed. */
- if (NULL == getRioFromIoCtx(lpo, key, msg))
+ if (NULL == getRioFromIoCtx(lpo, key, msgh))
return;
/* start next IO and leave if we hit an error */
@@ -1035,7 +1035,12 @@ OnSerialReadWorker(
st_new_obuf:
/* Get new receive buffer to store the line. */
- obuf = get_free_recv_buffer_alloc();
+ obuf = get_free_recv_buffer_alloc(TRUE);
+ if (!obuf) {
+ ++packets_dropped; /* maybe atomic? */
+ buff->recv_length = 0;
+ goto st_read_fresh;
+ }
obuf->fd = buff->fd;
obuf->receiver = buff->receiver;
obuf->dstadr = NULL;
@@ -1154,11 +1159,11 @@ OnRawSerialReadComplete(
IoCtx_t * lpo
)
{
- static const char * const msg =
+ static const char * const msgh =
"OnRawSerialReadComplete: read from device failed";
recvbuf_t * buff = lpo->recv_buf;
- RIO_t * rio = getRioFromIoCtx(lpo, key, msg);
+ RIO_t * rio = getRioFromIoCtx(lpo, key, msgh);
/* Make sure this RIO is not closed. */
if (rio == NULL)
return;
@@ -1167,10 +1172,16 @@ OnRawSerialReadComplete(
if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
buff->recv_length = (int)lpo->byteCount;
set_serial_recv_time(buff, lpo);
- iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
- buff = get_free_recv_buffer_alloc();
+ lpo->recv_buf = get_free_recv_buffer_alloc(TRUE);
+ if (lpo->recv_buf) {
+ iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
+ } else {
+ ++packets_dropped; /* maybe atomic? */
+ buff->recv_length = 0;
+ lpo->recv_buf = buff;
+ }
}
- IoCtxStartChecked(lpo, QueueSerialWait, buff);
+ IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
}
@@ -1217,7 +1228,7 @@ async_write(
unsigned int count
)
{
- static const char * const msg =
+ static const char * const msgh =
"async_write: cannot schedule device write";
static const char * const dmsg =
"overlapped IO data buffer";
@@ -1242,7 +1253,7 @@ async_write(
rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count,
NULL, &lpo->ol);
- if (rc || IoResultCheck(GetLastError(), lpo, msg))
+ if (rc || IoResultCheck(GetLastError(), lpo, msgh))
return count; /* normal/success return */
errno = EBADF;
@@ -1264,10 +1275,10 @@ OnSerialWriteComplete(
* error processing, and it returns with a valid RIO, just
* drop the complete context.
*/
- static const char * const msg =
+ static const char * const msgh =
"OnSerialWriteComplete: serial output failed";
- if (NULL != getRioFromIoCtx(lpo, key, msg))
+ if (NULL != getRioFromIoCtx(lpo, key, msgh))
IoCtxRelease(lpo);
}
@@ -1367,6 +1378,7 @@ io_completion_port_add_clock_io(
IoCtx_t * lpo;
HANDLE h;
IoHndPad_T * iopad = NULL;
+ recvbuf_t * rbuf;
/* preset to clear state for error cleanup:*/
rio->ioreg_ctx = NULL;
@@ -1395,8 +1407,7 @@ io_completion_port_add_clock_io(
}
if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) {
- msyslog(LOG_ERR, "%: Failed to allocate IO context",
- msgh);
+ msyslog(LOG_ERR, "%: no IO context: %m", msgh);
goto fail;
}
@@ -1407,7 +1418,11 @@ io_completion_port_add_clock_io(
}
lpo->io.hnd = h;
memset(&lpo->aux, 0, sizeof(lpo->aux));
- return QueueSerialWait(lpo, get_free_recv_buffer_alloc());
+ if (NULL == (rbuf = get_free_recv_buffer_alloc(TRUE))) {
+ msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
+ goto fail;
+ }
+ return QueueSerialWait(lpo, rbuf);
fail:
rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx);
@@ -1470,7 +1485,7 @@ QueueSocketRecv(
recvbuf_t * buff
)
{
- static const char * const msg =
+ static const char * const msgh =
"QueueSocketRecv: cannot schedule socket receive";
WSABUF wsabuf;
@@ -1492,7 +1507,7 @@ QueueSocketRecv(
rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags,
&buff->recv_srcadr.sa, &buff->recv_srcadr_len,
&lpo->ol, NULL);
- return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg);
+ return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh);
}
/* ----------------------------------------------------------------- */
@@ -1502,7 +1517,7 @@ OnSocketRecv(
IoCtx_t * lpo
)
{
- static const char * const msg =
+ static const char * const msgh =
"OnSocketRecv: receive from socket failed";
recvbuf_t * buff = NULL;
@@ -1511,7 +1526,7 @@ OnSocketRecv(
int rc;
/* order is important -- check first, then get endpoint! */
- rc = socketErrorCheck(lpo, msg);
+ rc = socketErrorCheck(lpo, msgh);
ep = getEndptFromIoCtx(lpo, key);
/* Make sure this endpoint is not closed. */
@@ -1528,10 +1543,15 @@ OnSocketRecv(
if (rc == PKT_OK && lpo->byteCount > 0) {
/* keep input buffer, create new one for IO */
buff = lpo->recv_buf;
- lpo->recv_buf = get_free_recv_buffer_alloc();
-
- buff->recv_time = lpo->aux.RecvTime;
- buff->recv_length = (int)lpo->byteCount;
+ lpo->recv_buf = get_free_recv_buffer_alloc(FALSE);
+ if (lpo->recv_buf) {
+ buff->recv_time = lpo->aux.RecvTime;
+ buff->recv_length = (int)lpo->byteCount;
+ } else {
+ lpo->recv_buf = buff;
+ buff = NULL;
+ ++packets_dropped; /* maybe atomic? */
+ }
} /* Note: else we use the current buffer again */
@@ -1547,7 +1567,7 @@ OnSocketRecv(
* then feed it to the input queue. And we can be sure we have
* a packet here, so we can update the stats.
*/
- if (buff != NULL) {
+ if (buff) {
INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr));
DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
(MODE_BROADCAST == get_packet_mode(buff))
@@ -1578,14 +1598,14 @@ OnSocketSend(
)
{
/* this is somewhat easier: */
- static const char * const msg =
+ static const char * const msgh =
"OnSocketSend: send to socket failed";
endpt * ep = NULL;
int rc;
/* order is important -- check first, then get endpoint! */
- rc = socketErrorCheck(lpo, msg);
+ rc = socketErrorCheck(lpo, msgh);
ep = getEndptFromIoCtx(lpo, key);
/* Make sure this endpoint is not closed. */
@@ -1686,13 +1706,14 @@ io_completion_port_add_socket(
/* Assume the endpoint is already registered. Set the socket
* handle into the proper slot, and then start up the IO engine.
*/
- static const char * const msg =
+ static const char * const msgh =
"Can't add socket to i/o completion port";
IoCtx_t * lpo;
size_t n;
ULONG_PTR key;
IoHndPad_T * iopad = NULL;
+ recvbuf_t * rbuf;
key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
@@ -1709,16 +1730,20 @@ io_completion_port_add_socket(
if (NULL == CreateIoCompletionPort((HANDLE)sfd,
hndIOCPLPort, key, 0))
{
- msyslog(LOG_ERR, "%s: %m", msg);
+ msyslog(LOG_ERR, "%s: %m", msgh);
goto fail;
}
for (n = s_SockRecvSched; n > 0; --n) {
if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) {
- msyslog(LOG_ERR, "%s: no read buffer: %m", msg);
+ msyslog(LOG_ERR, "%s: no IO context: %m", msgh);
goto fail;
}
lpo->io.sfd = sfd;
- if (!QueueSocketRecv(lpo, get_free_recv_buffer_alloc()))
+ if (NULL == (rbuf = get_free_recv_buffer_alloc(FALSE))) {
+ msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
+ goto fail;
+ }
+ if (!QueueSocketRecv(lpo, rbuf))
goto fail;
}
return TRUE;
@@ -1765,7 +1790,7 @@ io_completion_port_sendto(
sockaddr_u * dest
)
{
- static const char * const msg =
+ static const char * const msgh =
"sendto: cannot schedule socket send";
static const char * const dmsg =
"overlapped IO data buffer";
@@ -1798,7 +1823,7 @@ io_completion_port_sendto(
rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0,
&dest->sa, SOCKLEN(dest),
&lpo->ol, NULL);
- if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg))
+ if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh))
return (int)len; /* normal/success return */
errno = EBADF;