SUNRPC: Move TCP receive state variables into private data structure

Move the TCP receive state variables from the generic rpc_xprt structure to
a private structure maintained inside net/sunrpc/xprtsock.c.

Also rename a function/variable pair to refer to RPC fragment headers
instead of record markers, to be consistent with types defined in
sunrpc/*.h.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
This commit is contained in:
Chuck Lever 2006-12-05 16:35:19 -05:00 committed by Trond Myklebust
parent ee0ac0c227
commit 51971139b2
2 changed files with 85 additions and 76 deletions

View File

@ -157,15 +157,6 @@ struct rpc_xprt {
unsigned char shutdown : 1, /* being shut down */ unsigned char shutdown : 1, /* being shut down */
resvport : 1; /* use a reserved port */ resvport : 1; /* use a reserved port */
/*
* State of TCP reply receive stuff
*/
__be32 tcp_recm, /* Fragment header */
tcp_xid; /* Current XID */
u32 tcp_reclen, /* fragment length */
tcp_offset; /* fragment offset */
unsigned long tcp_copied, /* copied to request */
tcp_flags;
/* /*
* Connection of transports * Connection of transports
*/ */

View File

@ -133,6 +133,18 @@ struct sock_xprt {
*/ */
struct socket * sock; struct socket * sock;
struct sock * inet; struct sock * inet;
/*
* State of TCP reply receive
*/
__be32 tcp_fraghdr,
tcp_xid;
u32 tcp_offset,
tcp_reclen;
unsigned long tcp_copied,
tcp_flags;
}; };
static void xs_format_peer_addresses(struct rpc_xprt *xprt) static void xs_format_peer_addresses(struct rpc_xprt *xprt)
@ -628,73 +640,73 @@ static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{ {
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
size_t len, used; size_t len, used;
char *p; char *p;
p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
used = xs_tcp_copy_data(desc, p, len); used = xs_tcp_copy_data(desc, p, len);
xprt->tcp_offset += used; transport->tcp_offset += used;
if (used != len) if (used != len)
return; return;
xprt->tcp_reclen = ntohl(xprt->tcp_recm); transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
xprt->tcp_flags |= XPRT_LAST_FRAG; transport->tcp_flags |= XPRT_LAST_FRAG;
else else
xprt->tcp_flags &= ~XPRT_LAST_FRAG; transport->tcp_flags &= ~XPRT_LAST_FRAG;
xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
xprt->tcp_flags &= ~XPRT_COPY_RECM; transport->tcp_flags &= ~XPRT_COPY_RECM;
xprt->tcp_offset = 0; transport->tcp_offset = 0;
/* Sanity check of the record length */ /* Sanity check of the record length */
if (unlikely(xprt->tcp_reclen < 4)) { if (unlikely(transport->tcp_reclen < 4)) {
dprintk("RPC: invalid TCP record fragment length\n"); dprintk("RPC: invalid TCP record fragment length\n");
xprt_disconnect(xprt); xprt_disconnect(xprt);
return; return;
} }
dprintk("RPC: reading TCP record fragment of length %d\n", dprintk("RPC: reading TCP record fragment of length %d\n",
xprt->tcp_reclen); transport->tcp_reclen);
} }
static void xs_tcp_check_recm(struct rpc_xprt *xprt) static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{ {
dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", if (transport->tcp_offset == transport->tcp_reclen) {
xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); transport->tcp_flags |= XPRT_COPY_RECM;
if (xprt->tcp_offset == xprt->tcp_reclen) { transport->tcp_offset = 0;
xprt->tcp_flags |= XPRT_COPY_RECM; if (transport->tcp_flags & XPRT_LAST_FRAG) {
xprt->tcp_offset = 0; transport->tcp_flags &= ~XPRT_COPY_DATA;
if (xprt->tcp_flags & XPRT_LAST_FRAG) { transport->tcp_flags |= XPRT_COPY_XID;
xprt->tcp_flags &= ~XPRT_COPY_DATA; transport->tcp_copied = 0;
xprt->tcp_flags |= XPRT_COPY_XID;
xprt->tcp_copied = 0;
} }
} }
} }
static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) static inline void xs_tcp_read_xid(struct sock_xprt *transport, skb_reader_t *desc)
{ {
size_t len, used; size_t len, used;
char *p; char *p;
len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; len = sizeof(transport->tcp_xid) - transport->tcp_offset;
dprintk("RPC: reading XID (%Zu bytes)\n", len); dprintk("RPC: reading XID (%Zu bytes)\n", len);
p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
used = xs_tcp_copy_data(desc, p, len); used = xs_tcp_copy_data(desc, p, len);
xprt->tcp_offset += used; transport->tcp_offset += used;
if (used != len) if (used != len)
return; return;
xprt->tcp_flags &= ~XPRT_COPY_XID; transport->tcp_flags &= ~XPRT_COPY_XID;
xprt->tcp_flags |= XPRT_COPY_DATA; transport->tcp_flags |= XPRT_COPY_DATA;
xprt->tcp_copied = 4; transport->tcp_copied = 4;
dprintk("RPC: reading reply for XID %08x\n", dprintk("RPC: reading reply for XID %08x\n",
ntohl(xprt->tcp_xid)); ntohl(transport->tcp_xid));
xs_tcp_check_recm(xprt); xs_tcp_check_fraghdr(transport);
} }
static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{ {
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct rpc_rqst *req; struct rpc_rqst *req;
struct xdr_buf *rcvbuf; struct xdr_buf *rcvbuf;
size_t len; size_t len;
@ -702,34 +714,34 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc
/* Find and lock the request corresponding to this xid */ /* Find and lock the request corresponding to this xid */
spin_lock(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
req = xprt_lookup_rqst(xprt, xprt->tcp_xid); req = xprt_lookup_rqst(xprt, transport->tcp_xid);
if (!req) { if (!req) {
xprt->tcp_flags &= ~XPRT_COPY_DATA; transport->tcp_flags &= ~XPRT_COPY_DATA;
dprintk("RPC: XID %08x request not found!\n", dprintk("RPC: XID %08x request not found!\n",
ntohl(xprt->tcp_xid)); ntohl(transport->tcp_xid));
spin_unlock(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
return; return;
} }
rcvbuf = &req->rq_private_buf; rcvbuf = &req->rq_private_buf;
len = desc->count; len = desc->count;
if (len > xprt->tcp_reclen - xprt->tcp_offset) { if (len > transport->tcp_reclen - transport->tcp_offset) {
skb_reader_t my_desc; skb_reader_t my_desc;
len = xprt->tcp_reclen - xprt->tcp_offset; len = transport->tcp_reclen - transport->tcp_offset;
memcpy(&my_desc, desc, sizeof(my_desc)); memcpy(&my_desc, desc, sizeof(my_desc));
my_desc.count = len; my_desc.count = len;
r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
&my_desc, xs_tcp_copy_data); &my_desc, xs_tcp_copy_data);
desc->count -= r; desc->count -= r;
desc->offset += r; desc->offset += r;
} else } else
r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
desc, xs_tcp_copy_data); desc, xs_tcp_copy_data);
if (r > 0) { if (r > 0) {
xprt->tcp_copied += r; transport->tcp_copied += r;
xprt->tcp_offset += r; transport->tcp_offset += r;
} }
if (r != len) { if (r != len) {
/* Error when copying to the receive buffer, /* Error when copying to the receive buffer,
@ -741,77 +753,79 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc
* Any remaining data from this record will * Any remaining data from this record will
* be discarded. * be discarded.
*/ */
xprt->tcp_flags &= ~XPRT_COPY_DATA; transport->tcp_flags &= ~XPRT_COPY_DATA;
dprintk("RPC: XID %08x truncated request\n", dprintk("RPC: XID %08x truncated request\n",
ntohl(xprt->tcp_xid)); ntohl(transport->tcp_xid));
dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); xprt, transport->tcp_copied, transport->tcp_offset,
transport->tcp_reclen);
goto out; goto out;
} }
dprintk("RPC: XID %08x read %Zd bytes\n", dprintk("RPC: XID %08x read %Zd bytes\n",
ntohl(xprt->tcp_xid), r); ntohl(transport->tcp_xid), r);
dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); xprt, transport->tcp_copied, transport->tcp_offset,
transport->tcp_reclen);
if (xprt->tcp_copied == req->rq_private_buf.buflen) if (transport->tcp_copied == req->rq_private_buf.buflen)
xprt->tcp_flags &= ~XPRT_COPY_DATA; transport->tcp_flags &= ~XPRT_COPY_DATA;
else if (xprt->tcp_offset == xprt->tcp_reclen) { else if (transport->tcp_offset == transport->tcp_reclen) {
if (xprt->tcp_flags & XPRT_LAST_FRAG) if (transport->tcp_flags & XPRT_LAST_FRAG)
xprt->tcp_flags &= ~XPRT_COPY_DATA; transport->tcp_flags &= ~XPRT_COPY_DATA;
} }
out: out:
if (!(xprt->tcp_flags & XPRT_COPY_DATA)) if (!(transport->tcp_flags & XPRT_COPY_DATA))
xprt_complete_rqst(req->rq_task, xprt->tcp_copied); xprt_complete_rqst(req->rq_task, transport->tcp_copied);
spin_unlock(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
xs_tcp_check_recm(xprt); xs_tcp_check_fraghdr(transport);
} }
static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) static inline void xs_tcp_read_discard(struct sock_xprt *transport, skb_reader_t *desc)
{ {
size_t len; size_t len;
len = xprt->tcp_reclen - xprt->tcp_offset; len = transport->tcp_reclen - transport->tcp_offset;
if (len > desc->count) if (len > desc->count)
len = desc->count; len = desc->count;
desc->count -= len; desc->count -= len;
desc->offset += len; desc->offset += len;
xprt->tcp_offset += len; transport->tcp_offset += len;
dprintk("RPC: discarded %Zu bytes\n", len); dprintk("RPC: discarded %Zu bytes\n", len);
xs_tcp_check_recm(xprt); xs_tcp_check_fraghdr(transport);
} }
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{ {
struct rpc_xprt *xprt = rd_desc->arg.data; struct rpc_xprt *xprt = rd_desc->arg.data;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
skb_reader_t desc = { skb_reader_t desc = {
.skb = skb, .skb = skb,
.offset = offset, .offset = offset,
.count = len, .count = len,
.csum = 0
}; };
dprintk("RPC: xs_tcp_data_recv started\n"); dprintk("RPC: xs_tcp_data_recv started\n");
do { do {
/* Read in a new fragment marker if necessary */ /* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */ /* Can we ever really expect to get completely empty fragments? */
if (xprt->tcp_flags & XPRT_COPY_RECM) { if (transport->tcp_flags & XPRT_COPY_RECM) {
xs_tcp_read_fraghdr(xprt, &desc); xs_tcp_read_fraghdr(xprt, &desc);
continue; continue;
} }
/* Read in the xid if necessary */ /* Read in the xid if necessary */
if (xprt->tcp_flags & XPRT_COPY_XID) { if (transport->tcp_flags & XPRT_COPY_XID) {
xs_tcp_read_xid(xprt, &desc); xs_tcp_read_xid(transport, &desc);
continue; continue;
} }
/* Read in the request data */ /* Read in the request data */
if (xprt->tcp_flags & XPRT_COPY_DATA) { if (transport->tcp_flags & XPRT_COPY_DATA) {
xs_tcp_read_request(xprt, &desc); xs_tcp_read_request(xprt, &desc);
continue; continue;
} }
/* Skip over any trailing bytes on short reads */ /* Skip over any trailing bytes on short reads */
xs_tcp_read_discard(xprt, &desc); xs_tcp_read_discard(transport, &desc);
} while (desc.count); } while (desc.count);
dprintk("RPC: xs_tcp_data_recv done\n"); dprintk("RPC: xs_tcp_data_recv done\n");
return len - desc.count; return len - desc.count;
@ -865,11 +879,15 @@ static void xs_tcp_state_change(struct sock *sk)
case TCP_ESTABLISHED: case TCP_ESTABLISHED:
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
if (!xprt_test_and_set_connected(xprt)) { if (!xprt_test_and_set_connected(xprt)) {
struct sock_xprt *transport = container_of(xprt,
struct sock_xprt, xprt);
/* Reset TCP record info */ /* Reset TCP record info */
xprt->tcp_offset = 0; transport->tcp_offset = 0;
xprt->tcp_reclen = 0; transport->tcp_reclen = 0;
xprt->tcp_copied = 0; transport->tcp_copied = 0;
xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; transport->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
xprt_wake_pending_tasks(xprt, 0); xprt_wake_pending_tasks(xprt, 0);
} }