libceph: move msgr1 protocol specific fields to its own struct

A couple whitespace fixups, no functional changes.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Ilya Dryomov 2020-11-12 16:31:41 +01:00
parent 2f713615dd
commit a56dd9bf47
3 changed files with 257 additions and 247 deletions

View file

@ -264,6 +264,43 @@ struct ceph_msg {
#define BASE_DELAY_INTERVAL (HZ / 4)
#define MAX_DELAY_INTERVAL (15 * HZ)
struct ceph_connection_v1_info {
struct kvec out_kvec[8], /* sending header/footer data */
*out_kvec_cur;
int out_kvec_left; /* kvec's left in out_kvec */
int out_skip; /* skip this many bytes */
int out_kvec_bytes; /* total bytes left */
bool out_more; /* there is more data after the kvecs */
bool out_msg_done;
struct ceph_auth_handshake *auth;
int auth_retry; /* true if we need a newer authorizer */
/* connection negotiation temps */
u8 in_banner[CEPH_BANNER_MAX_LEN];
struct ceph_entity_addr actual_peer_addr;
struct ceph_entity_addr peer_addr_for_me;
struct ceph_msg_connect out_connect;
struct ceph_msg_connect_reply in_reply;
int in_base_pos; /* bytes read */
/* message in temps */
u8 in_tag; /* protocol control byte */
struct ceph_msg_header in_hdr;
__le64 in_temp_ack; /* for reading an ack */
/* message out temps */
struct ceph_msg_header out_hdr;
__le64 out_temp_ack; /* for writing an ack */
struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
stamp */
u32 connect_seq; /* identify the most recent connection
attempt for this session */
u32 peer_global_seq; /* peer's global seq for this connection */
};
/*
* A single connection with another host.
*
@ -281,21 +318,13 @@ struct ceph_connection {
int state; /* CEPH_CON_S_* */
atomic_t sock_state;
struct socket *sock;
struct ceph_entity_addr peer_addr; /* peer address */
struct ceph_entity_addr peer_addr_for_me;
unsigned long flags; /* CEPH_CON_F_* */
const char *error_msg; /* error message, if any */
struct ceph_entity_name peer_name; /* peer name */
struct ceph_entity_addr peer_addr; /* peer address */
u64 peer_features;
u32 connect_seq; /* identify the most recent connection
attempt for this connection, client */
u32 peer_global_seq; /* peer's global seq for this connection */
struct ceph_auth_handshake *auth;
int auth_retry; /* true if we need a newer authorizer */
struct mutex mutex;
@ -306,41 +335,18 @@ struct ceph_connection {
u64 in_seq, in_seq_acked; /* last message received, acked */
/* connection negotiation temps */
char in_banner[CEPH_BANNER_MAX_LEN];
struct ceph_msg_connect out_connect;
struct ceph_msg_connect_reply in_reply;
struct ceph_entity_addr actual_peer_addr;
/* message out temps */
struct ceph_msg_header out_hdr;
struct ceph_msg *in_msg;
struct ceph_msg *out_msg; /* sending message (== tail of
out_sent) */
bool out_msg_done;
struct kvec out_kvec[8], /* sending header/footer data */
*out_kvec_cur;
int out_kvec_left; /* kvec's left in out_kvec */
int out_skip; /* skip this many bytes */
int out_kvec_bytes; /* total bytes left */
int out_more; /* there is more data after the kvecs */
__le64 out_temp_ack; /* for writing an ack */
struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
stamp */
/* message in temps */
struct ceph_msg_header in_hdr;
struct ceph_msg *in_msg;
u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
char in_tag; /* protocol control byte */
int in_base_pos; /* bytes read */
__le64 in_temp_ack; /* for reading an ack */
struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */
struct delayed_work work; /* send|recv work */
unsigned long delay; /* current delay interval */
struct ceph_connection_v1_info v1;
};
extern struct page *ceph_zero_page;

View file

@ -1448,11 +1448,11 @@ static void con_fault_finish(struct ceph_connection *con)
* in case we faulted due to authentication, invalidate our
* current tickets so that we can get new ones.
*/
if (con->auth_retry) {
dout("auth_retry %d, invalidating\n", con->auth_retry);
if (con->v1.auth_retry) {
dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
if (con->ops->invalidate_authorizer)
con->ops->invalidate_authorizer(con);
con->auth_retry = 0;
con->v1.auth_retry = 0;
}
if (con->ops->fault)
@ -1631,7 +1631,7 @@ static void clear_standby(struct ceph_connection *con)
if (con->state == CEPH_CON_S_STANDBY) {
dout("clear_standby %p and ++connect_seq\n", con);
con->state = CEPH_CON_S_PREOPEN;
con->connect_seq++;
con->v1.connect_seq++;
WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
}

View file

@ -110,25 +110,25 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
static void con_out_kvec_reset(struct ceph_connection *con)
{
BUG_ON(con->out_skip);
BUG_ON(con->v1.out_skip);
con->out_kvec_left = 0;
con->out_kvec_bytes = 0;
con->out_kvec_cur = &con->out_kvec[0];
con->v1.out_kvec_left = 0;
con->v1.out_kvec_bytes = 0;
con->v1.out_kvec_cur = &con->v1.out_kvec[0];
}
static void con_out_kvec_add(struct ceph_connection *con,
size_t size, void *data)
{
int index = con->out_kvec_left;
int index = con->v1.out_kvec_left;
BUG_ON(con->out_skip);
BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
BUG_ON(con->v1.out_skip);
BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
con->out_kvec[index].iov_len = size;
con->out_kvec[index].iov_base = data;
con->out_kvec_left++;
con->out_kvec_bytes += size;
con->v1.out_kvec[index].iov_len = size;
con->v1.out_kvec[index].iov_base = data;
con->v1.out_kvec_left++;
con->v1.out_kvec_bytes += size;
}
/*
@ -138,15 +138,14 @@ static void con_out_kvec_add(struct ceph_connection *con,
*/
static int con_out_kvec_skip(struct ceph_connection *con)
{
int off = con->out_kvec_cur - con->out_kvec;
int skip = 0;
if (con->out_kvec_bytes > 0) {
skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
BUG_ON(con->out_kvec_bytes < skip);
BUG_ON(!con->out_kvec_left);
con->out_kvec_bytes -= skip;
con->out_kvec_left--;
if (con->v1.out_kvec_bytes > 0) {
skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
BUG_ON(con->v1.out_kvec_bytes < skip);
BUG_ON(!con->v1.out_kvec_left);
con->v1.out_kvec_bytes -= skip;
con->v1.out_kvec_left--;
}
return skip;
@ -186,8 +185,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
} else {
m->old_footer.flags = m->footer.flags;
}
con->out_more = m->more_to_follow;
con->out_msg_done = true;
con->v1.out_more = m->more_to_follow;
con->v1.out_msg_done = true;
}
/*
@ -199,16 +198,16 @@ static void prepare_write_message(struct ceph_connection *con)
u32 crc;
con_out_kvec_reset(con);
con->out_msg_done = false;
con->v1.out_msg_done = false;
/* Sneak an ack in there first? If we can get it into the same
* TCP packet that's a good thing. */
if (con->in_seq > con->in_seq_acked) {
con->in_seq_acked = con->in_seq;
con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
con_out_kvec_add(con, sizeof (con->out_temp_ack),
&con->out_temp_ack);
con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
&con->v1.out_temp_ack);
}
ceph_con_get_out_msg(con);
@ -223,7 +222,7 @@ static void prepare_write_message(struct ceph_connection *con)
/* tag + hdr + front + middle */
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
if (m->middle)
@ -233,7 +232,7 @@ static void prepare_write_message(struct ceph_connection *con)
/* fill in hdr crc and finalize hdr */
crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
con->out_msg->hdr.crc = cpu_to_le32(crc);
memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
/* fill in front and middle crc, footer */
crc = crc32c(0, m->front.iov_base, m->front.iov_len);
@ -253,7 +252,7 @@ static void prepare_write_message(struct ceph_connection *con)
con->out_msg->footer.data_crc = 0;
if (m->data_length) {
prepare_message_data(con->out_msg, m->data_length);
con->out_more = 1; /* data + footer will follow */
con->v1.out_more = 1; /* data + footer will follow */
} else {
/* no, queue up footer too and be done */
prepare_write_message_footer(con);
@ -275,11 +274,11 @@ static void prepare_write_ack(struct ceph_connection *con)
con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
con_out_kvec_add(con, sizeof (con->out_temp_ack),
&con->out_temp_ack);
con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
&con->v1.out_temp_ack);
con->out_more = 1; /* more will follow.. eventually.. */
con->v1.out_more = 1; /* more will follow.. eventually.. */
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
@ -294,9 +293,9 @@ static void prepare_write_seq(struct ceph_connection *con)
con_out_kvec_reset(con);
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
con_out_kvec_add(con, sizeof (con->out_temp_ack),
&con->out_temp_ack);
con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
&con->v1.out_temp_ack);
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
@ -313,9 +312,9 @@ static void prepare_write_keepalive(struct ceph_connection *con)
ktime_get_real_ts64(&now);
con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
&con->out_temp_keepalive2);
ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
&con->v1.out_temp_keepalive2);
} else {
con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
}
@ -332,19 +331,20 @@ static int get_connect_authorizer(struct ceph_connection *con)
int auth_proto;
if (!con->ops->get_authorizer) {
con->auth = NULL;
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
con->out_connect.authorizer_len = 0;
con->v1.auth = NULL;
con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
con->v1.out_connect.authorizer_len = 0;
return 0;
}
auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
if (IS_ERR(auth))
return PTR_ERR(auth);
con->auth = auth;
con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
con->v1.auth = auth;
con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
con->v1.out_connect.authorizer_len =
cpu_to_le32(auth->authorizer_buf_len);
return 0;
}
@ -357,18 +357,19 @@ static void prepare_write_banner(struct ceph_connection *con)
con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
&con->msgr->my_enc_addr);
con->out_more = 0;
con->v1.out_more = 0;
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
static void __prepare_write_connect(struct ceph_connection *con)
{
con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
if (con->auth)
con_out_kvec_add(con, con->auth->authorizer_buf_len,
con->auth->authorizer_buf);
con_out_kvec_add(con, sizeof(con->v1.out_connect),
&con->v1.out_connect);
if (con->v1.auth)
con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
con->v1.auth->authorizer_buf);
con->out_more = 0;
con->v1.out_more = 0;
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}
@ -393,15 +394,15 @@ static int prepare_write_connect(struct ceph_connection *con)
}
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
con->connect_seq, global_seq, proto);
con->v1.connect_seq, global_seq, proto);
con->out_connect.features =
cpu_to_le64(from_msgr(con->msgr)->supported_features);
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(global_seq);
con->out_connect.protocol_version = cpu_to_le32(proto);
con->out_connect.flags = 0;
con->v1.out_connect.features =
cpu_to_le64(from_msgr(con->msgr)->supported_features);
con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
con->v1.out_connect.protocol_version = cpu_to_le32(proto);
con->v1.out_connect.flags = 0;
ret = get_connect_authorizer(con);
if (ret)
@ -421,35 +422,36 @@ static int write_partial_kvec(struct ceph_connection *con)
{
int ret;
dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
while (con->out_kvec_bytes > 0) {
ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
con->out_kvec_left, con->out_kvec_bytes,
con->out_more);
dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
while (con->v1.out_kvec_bytes > 0) {
ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
con->v1.out_kvec_left,
con->v1.out_kvec_bytes,
con->v1.out_more);
if (ret <= 0)
goto out;
con->out_kvec_bytes -= ret;
if (con->out_kvec_bytes == 0)
con->v1.out_kvec_bytes -= ret;
if (!con->v1.out_kvec_bytes)
break; /* done */
/* account for full iov entries consumed */
while (ret >= con->out_kvec_cur->iov_len) {
BUG_ON(!con->out_kvec_left);
ret -= con->out_kvec_cur->iov_len;
con->out_kvec_cur++;
con->out_kvec_left--;
while (ret >= con->v1.out_kvec_cur->iov_len) {
BUG_ON(!con->v1.out_kvec_left);
ret -= con->v1.out_kvec_cur->iov_len;
con->v1.out_kvec_cur++;
con->v1.out_kvec_left--;
}
/* and for a partially-consumed entry */
if (ret) {
con->out_kvec_cur->iov_len -= ret;
con->out_kvec_cur->iov_base += ret;
con->v1.out_kvec_cur->iov_len -= ret;
con->v1.out_kvec_cur->iov_base += ret;
}
}
con->out_kvec_left = 0;
con->v1.out_kvec_left = 0;
ret = 1;
out:
dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
con->out_kvec_bytes, con->out_kvec_left, ret);
con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
return ret; /* done! */
}
@ -530,17 +532,17 @@ static int write_partial_skip(struct ceph_connection *con)
int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
int ret;
dout("%s %p %d left\n", __func__, con, con->out_skip);
while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_SIZE);
dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
while (con->v1.out_skip > 0) {
size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
if (size == con->out_skip)
if (size == con->v1.out_skip)
more = MSG_MORE;
ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
more);
if (ret <= 0)
goto out;
con->out_skip -= ret;
con->v1.out_skip -= ret;
}
ret = 1;
out:
@ -553,39 +555,39 @@ static int write_partial_skip(struct ceph_connection *con)
static void prepare_read_banner(struct ceph_connection *con)
{
dout("prepare_read_banner %p\n", con);
con->in_base_pos = 0;
con->v1.in_base_pos = 0;
}
static void prepare_read_connect(struct ceph_connection *con)
{
dout("prepare_read_connect %p\n", con);
con->in_base_pos = 0;
con->v1.in_base_pos = 0;
}
static void prepare_read_ack(struct ceph_connection *con)
{
dout("prepare_read_ack %p\n", con);
con->in_base_pos = 0;
con->v1.in_base_pos = 0;
}
static void prepare_read_seq(struct ceph_connection *con)
{
dout("prepare_read_seq %p\n", con);
con->in_base_pos = 0;
con->in_tag = CEPH_MSGR_TAG_SEQ;
con->v1.in_base_pos = 0;
con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
}
static void prepare_read_tag(struct ceph_connection *con)
{
dout("prepare_read_tag %p\n", con);
con->in_base_pos = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
con->v1.in_base_pos = 0;
con->v1.in_tag = CEPH_MSGR_TAG_READY;
}
static void prepare_read_keepalive_ack(struct ceph_connection *con)
{
dout("prepare_read_keepalive_ack %p\n", con);
con->in_base_pos = 0;
con->v1.in_base_pos = 0;
}
/*
@ -595,7 +597,7 @@ static int prepare_read_message(struct ceph_connection *con)
{
dout("prepare_read_message %p\n", con);
BUG_ON(con->in_msg != NULL);
con->in_base_pos = 0;
con->v1.in_base_pos = 0;
con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
return 0;
}
@ -603,13 +605,13 @@ static int prepare_read_message(struct ceph_connection *con)
static int read_partial(struct ceph_connection *con,
int end, int size, void *object)
{
while (con->in_base_pos < end) {
int left = end - con->in_base_pos;
while (con->v1.in_base_pos < end) {
int left = end - con->v1.in_base_pos;
int have = size - left;
int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
con->v1.in_base_pos += ret;
}
return 1;
}
@ -623,28 +625,28 @@ static int read_partial_banner(struct ceph_connection *con)
int end;
int ret;
dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
/* peer's banner */
size = strlen(CEPH_BANNER);
end = size;
ret = read_partial(con, end, size, con->in_banner);
ret = read_partial(con, end, size, con->v1.in_banner);
if (ret <= 0)
goto out;
size = sizeof (con->actual_peer_addr);
size = sizeof(con->v1.actual_peer_addr);
end += size;
ret = read_partial(con, end, size, &con->actual_peer_addr);
ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
if (ret <= 0)
goto out;
ceph_decode_banner_addr(&con->actual_peer_addr);
ceph_decode_banner_addr(&con->v1.actual_peer_addr);
size = sizeof (con->peer_addr_for_me);
size = sizeof(con->v1.peer_addr_for_me);
end += size;
ret = read_partial(con, end, size, &con->peer_addr_for_me);
ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
if (ret <= 0)
goto out;
ceph_decode_banner_addr(&con->peer_addr_for_me);
ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
out:
return ret;
@ -656,34 +658,34 @@ static int read_partial_connect(struct ceph_connection *con)
int end;
int ret;
dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
size = sizeof (con->in_reply);
size = sizeof(con->v1.in_reply);
end = size;
ret = read_partial(con, end, size, &con->in_reply);
ret = read_partial(con, end, size, &con->v1.in_reply);
if (ret <= 0)
goto out;
if (con->auth) {
size = le32_to_cpu(con->in_reply.authorizer_len);
if (size > con->auth->authorizer_reply_buf_len) {
if (con->v1.auth) {
size = le32_to_cpu(con->v1.in_reply.authorizer_len);
if (size > con->v1.auth->authorizer_reply_buf_len) {
pr_err("authorizer reply too big: %d > %zu\n", size,
con->auth->authorizer_reply_buf_len);
con->v1.auth->authorizer_reply_buf_len);
ret = -EINVAL;
goto out;
}
end += size;
ret = read_partial(con, end, size,
con->auth->authorizer_reply_buf);
con->v1.auth->authorizer_reply_buf);
if (ret <= 0)
goto out;
}
dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
con, (int)con->in_reply.tag,
le32_to_cpu(con->in_reply.connect_seq),
le32_to_cpu(con->in_reply.global_seq));
con, con->v1.in_reply.tag,
le32_to_cpu(con->v1.in_reply.connect_seq),
le32_to_cpu(con->v1.in_reply.global_seq));
out:
return ret;
}
@ -693,7 +695,7 @@ static int read_partial_connect(struct ceph_connection *con)
*/
static int verify_hello(struct ceph_connection *con)
{
if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
pr_err("connect to %s got bad banner\n",
ceph_pr_addr(&con->peer_addr));
con->error_msg = "protocol error, bad banner";
@ -716,15 +718,15 @@ static int process_banner(struct ceph_connection *con)
* end may not yet know their ip address, so if it's 0.0.0.0, give
* them the benefit of the doubt.
*/
if (memcmp(&con->peer_addr, &con->actual_peer_addr,
if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
sizeof(con->peer_addr)) != 0 &&
!(ceph_addr_is_blank(&con->actual_peer_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
!(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
pr_warn("wrong peer, want %s/%u, got %s/%u\n",
ceph_pr_addr(&con->peer_addr),
le32_to_cpu(con->peer_addr.nonce),
ceph_pr_addr(&con->actual_peer_addr),
le32_to_cpu(con->actual_peer_addr.nonce));
ceph_pr_addr(&con->v1.actual_peer_addr),
le32_to_cpu(con->v1.actual_peer_addr.nonce));
con->error_msg = "wrong peer at address";
return -1;
}
@ -734,8 +736,8 @@ static int process_banner(struct ceph_connection *con)
*/
if (ceph_addr_is_blank(my_addr)) {
memcpy(&my_addr->in_addr,
&con->peer_addr_for_me.in_addr,
sizeof(con->peer_addr_for_me.in_addr));
&con->v1.peer_addr_for_me.in_addr,
sizeof(con->v1.peer_addr_for_me.in_addr));
ceph_addr_set_port(my_addr, 0);
ceph_encode_my_addr(con->msgr);
dout("process_banner learned my addr is %s\n",
@ -749,13 +751,13 @@ static int process_connect(struct ceph_connection *con)
{
u64 sup_feat = from_msgr(con->msgr)->supported_features;
u64 req_feat = from_msgr(con->msgr)->required_features;
u64 server_feat = le64_to_cpu(con->in_reply.features);
u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
if (con->auth) {
int len = le32_to_cpu(con->in_reply.authorizer_len);
if (con->v1.auth) {
int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
/*
* Any connection that defines ->get_authorizer()
@ -764,9 +766,10 @@ static int process_connect(struct ceph_connection *con)
*
* See get_connect_authorizer().
*/
if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
if (con->v1.in_reply.tag ==
CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
ret = con->ops->add_authorizer_challenge(
con, con->auth->authorizer_reply_buf, len);
con, con->v1.auth->authorizer_reply_buf, len);
if (ret < 0)
return ret;
@ -785,7 +788,7 @@ static int process_connect(struct ceph_connection *con)
}
}
switch (con->in_reply.tag) {
switch (con->v1.in_reply.tag) {
case CEPH_MSGR_TAG_FEATURES:
pr_err("%s%lld %s feature set mismatch,"
" my %llx < server's %llx, missing %llx\n",
@ -800,16 +803,16 @@ static int process_connect(struct ceph_connection *con)
" my %d != server's %d\n",
ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr),
le32_to_cpu(con->out_connect.protocol_version),
le32_to_cpu(con->in_reply.protocol_version));
le32_to_cpu(con->v1.out_connect.protocol_version),
le32_to_cpu(con->v1.in_reply.protocol_version));
con->error_msg = "protocol version mismatch";
return -1;
case CEPH_MSGR_TAG_BADAUTHORIZER:
con->auth_retry++;
con->v1.auth_retry++;
dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
con->auth_retry);
if (con->auth_retry == 2) {
con->v1.auth_retry);
if (con->v1.auth_retry == 2) {
con->error_msg = "connect authorization failure";
return -1;
}
@ -829,7 +832,7 @@ static int process_connect(struct ceph_connection *con)
* dropped messages.
*/
dout("process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_reply.connect_seq));
le32_to_cpu(con->v1.in_reply.connect_seq));
pr_info("%s%lld %s session reset\n",
ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr));
@ -855,9 +858,9 @@ static int process_connect(struct ceph_connection *con)
* again with a larger value.
*/
dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
le32_to_cpu(con->out_connect.connect_seq),
le32_to_cpu(con->in_reply.connect_seq));
con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
le32_to_cpu(con->v1.out_connect.connect_seq),
le32_to_cpu(con->v1.in_reply.connect_seq));
con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
@ -871,10 +874,10 @@ static int process_connect(struct ceph_connection *con)
* again with a larger value.
*/
dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
con->peer_global_seq,
le32_to_cpu(con->in_reply.global_seq));
con->v1.peer_global_seq,
le32_to_cpu(con->v1.in_reply.global_seq));
ceph_get_global_seq(con->msgr,
le32_to_cpu(con->in_reply.global_seq));
le32_to_cpu(con->v1.in_reply.global_seq));
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
@ -896,23 +899,24 @@ static int process_connect(struct ceph_connection *con)
WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
con->state = CEPH_CON_S_OPEN;
con->auth_retry = 0; /* we authenticated; clear flag */
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->v1.auth_retry = 0; /* we authenticated; clear flag */
con->v1.peer_global_seq =
le32_to_cpu(con->v1.in_reply.global_seq);
con->v1.connect_seq++;
con->peer_features = server_feat;
dout("process_connect got READY gseq %d cseq %d (%d)\n",
con->peer_global_seq,
le32_to_cpu(con->in_reply.connect_seq),
con->connect_seq);
WARN_ON(con->connect_seq !=
le32_to_cpu(con->in_reply.connect_seq));
con->v1.peer_global_seq,
le32_to_cpu(con->v1.in_reply.connect_seq),
con->v1.connect_seq);
WARN_ON(con->v1.connect_seq !=
le32_to_cpu(con->v1.in_reply.connect_seq));
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
con->delay = 0; /* reset backoff memory */
if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
prepare_write_seq(con);
prepare_read_seq(con);
} else {
@ -942,10 +946,10 @@ static int process_connect(struct ceph_connection *con)
*/
static int read_partial_ack(struct ceph_connection *con)
{
int size = sizeof (con->in_temp_ack);
int size = sizeof(con->v1.in_temp_ack);
int end = size;
return read_partial(con, end, size, &con->in_temp_ack);
return read_partial(con, end, size, &con->v1.in_temp_ack);
}
/*
@ -953,9 +957,9 @@ static int read_partial_ack(struct ceph_connection *con)
*/
static void process_ack(struct ceph_connection *con)
{
u64 ack = le64_to_cpu(con->in_temp_ack);
u64 ack = le64_to_cpu(con->v1.in_temp_ack);
if (con->in_tag == CEPH_MSGR_TAG_ACK)
if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
ceph_con_discard_sent(con, ack);
else
ceph_con_discard_requeued(con, ack);
@ -1045,39 +1049,39 @@ static int read_partial_message(struct ceph_connection *con)
dout("read_partial_message con %p msg %p\n", con, m);
/* header */
size = sizeof (con->in_hdr);
size = sizeof(con->v1.in_hdr);
end = size;
ret = read_partial(con, end, size, &con->in_hdr);
ret = read_partial(con, end, size, &con->v1.in_hdr);
if (ret <= 0)
return ret;
crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
if (cpu_to_le32(crc) != con->in_hdr.crc) {
crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
pr_err("read_partial_message bad hdr crc %u != expected %u\n",
crc, con->in_hdr.crc);
crc, con->v1.in_hdr.crc);
return -EBADMSG;
}
front_len = le32_to_cpu(con->in_hdr.front_len);
front_len = le32_to_cpu(con->v1.in_hdr.front_len);
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
middle_len = le32_to_cpu(con->in_hdr.middle_len);
middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
data_len = le32_to_cpu(con->v1.in_hdr.data_len);
if (data_len > CEPH_MSG_MAX_DATA_LEN)
return -EIO;
/* verify seq# */
seq = le64_to_cpu(con->in_hdr.seq);
seq = le64_to_cpu(con->v1.in_hdr.seq);
if ((s64)seq - (s64)con->in_seq < 1) {
pr_info("skipping %s%lld %s seq %lld expected %lld\n",
ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr),
seq, con->in_seq + 1);
con->in_base_pos = -front_len - middle_len - data_len -
sizeof_footer(con);
con->in_tag = CEPH_MSGR_TAG_READY;
con->v1.in_base_pos = -front_len - middle_len - data_len -
sizeof_footer(con);
con->v1.in_tag = CEPH_MSGR_TAG_READY;
return 1;
} else if ((s64)seq - (s64)con->in_seq > 1) {
pr_err("read_partial_message bad seq %lld expected %lld\n",
@ -1090,9 +1094,9 @@ static int read_partial_message(struct ceph_connection *con)
if (!con->in_msg) {
int skip = 0;
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
front_len, data_len);
ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip);
ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
if (ret < 0)
return ret;
@ -1100,9 +1104,9 @@ static int read_partial_message(struct ceph_connection *con)
if (skip) {
/* skip this message */
dout("alloc_msg said skip message\n");
con->in_base_pos = -front_len - middle_len - data_len -
sizeof_footer(con);
con->in_tag = CEPH_MSGR_TAG_READY;
con->v1.in_base_pos = -front_len - middle_len -
data_len - sizeof_footer(con);
con->v1.in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++;
return 1;
}
@ -1214,8 +1218,8 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
BUG_ON(!con->sock);
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
con->v1.in_base_pos);
if (con->state == CEPH_CON_S_V1_BANNER) {
ret = read_partial_banner(con);
@ -1253,27 +1257,27 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
WARN_ON(con->state != CEPH_CON_S_OPEN);
if (con->in_base_pos < 0) {
if (con->v1.in_base_pos < 0) {
/*
* skipping + discarding content.
*/
ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
if (ret <= 0)
goto out;
dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
con->in_base_pos += ret;
if (con->in_base_pos)
dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
con->v1.in_base_pos += ret;
if (con->v1.in_base_pos)
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_READY) {
if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
/*
* what's next?
*/
ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
if (ret <= 0)
goto out;
dout("try_read got tag %d\n", (int)con->in_tag);
switch (con->in_tag) {
dout("try_read got tag %d\n", con->v1.in_tag);
switch (con->v1.in_tag) {
case CEPH_MSGR_TAG_MSG:
prepare_read_message(con);
break;
@ -1291,7 +1295,7 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
goto bad_tag;
}
}
if (con->in_tag == CEPH_MSGR_TAG_MSG) {
if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
ret = read_partial_message(con);
if (ret <= 0) {
switch (ret) {
@ -1307,15 +1311,15 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
}
goto out;
}
if (con->in_tag == CEPH_MSGR_TAG_READY)
if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
goto more;
ceph_con_process_message(con);
if (con->state == CEPH_CON_S_OPEN)
prepare_read_tag(con);
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_ACK ||
con->in_tag == CEPH_MSGR_TAG_SEQ) {
if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
/*
* the final handshake seq exchange is semantically
* equivalent to an ACK
@ -1326,7 +1330,7 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
process_ack(con);
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
ret = read_keepalive_ack(con);
if (ret <= 0)
goto out;
@ -1338,7 +1342,7 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
return ret;
bad_tag:
pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
pr_err("try_read bad tag %d\n", con->v1.in_tag);
con->error_msg = "protocol error, garbage tag";
ret = -1;
goto out;
@ -1369,7 +1373,7 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
prepare_read_banner(con);
BUG_ON(con->in_msg);
con->in_tag = CEPH_MSGR_TAG_READY;
con->v1.in_tag = CEPH_MSGR_TAG_READY;
dout("try_write initiating connect on %p new state %d\n",
con, con->state);
ret = ceph_tcp_connect(con);
@ -1380,16 +1384,16 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
}
more:
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
BUG_ON(!con->sock);
/* kvec data queued? */
if (con->out_kvec_left) {
if (con->v1.out_kvec_left) {
ret = write_partial_kvec(con);
if (ret <= 0)
goto out;
}
if (con->out_skip) {
if (con->v1.out_skip) {
ret = write_partial_skip(con);
if (ret <= 0)
goto out;
@ -1397,7 +1401,7 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
/* msg pages? */
if (con->out_msg) {
if (con->out_msg_done) {
if (con->v1.out_msg_done) {
ceph_msg_put(con->out_msg);
con->out_msg = NULL; /* we're done with this one */
goto do_next;
@ -1446,57 +1450,57 @@ void ceph_con_v1_revoke(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
WARN_ON(con->out_skip);
WARN_ON(con->v1.out_skip);
/* footer */
if (con->out_msg_done) {
con->out_skip += con_out_kvec_skip(con);
if (con->v1.out_msg_done) {
con->v1.out_skip += con_out_kvec_skip(con);
} else {
WARN_ON(!msg->data_length);
con->out_skip += sizeof_footer(con);
con->v1.out_skip += sizeof_footer(con);
}
/* data, middle, front */
if (msg->data_length)
con->out_skip += msg->cursor.total_resid;
con->v1.out_skip += msg->cursor.total_resid;
if (msg->middle)
con->out_skip += con_out_kvec_skip(con);
con->out_skip += con_out_kvec_skip(con);
con->v1.out_skip += con_out_kvec_skip(con);
con->v1.out_skip += con_out_kvec_skip(con);
dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
con->out_kvec_bytes, con->out_skip);
con->v1.out_kvec_bytes, con->v1.out_skip);
}
void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
{
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
/* skip rest of message */
con->in_base_pos = con->in_base_pos -
con->v1.in_base_pos = con->v1.in_base_pos -
sizeof(struct ceph_msg_header) -
front_len -
middle_len -
data_len -
sizeof(struct ceph_msg_footer);
con->in_tag = CEPH_MSGR_TAG_READY;
con->v1.in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++;
dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
}
bool ceph_con_v1_opened(struct ceph_connection *con)
{
return con->connect_seq;
return con->v1.connect_seq;
}
void ceph_con_v1_reset_session(struct ceph_connection *con)
{
con->connect_seq = 0;
con->peer_global_seq = 0;
con->v1.connect_seq = 0;
con->v1.peer_global_seq = 0;
}
void ceph_con_v1_reset_protocol(struct ceph_connection *con)
{
con->out_skip = 0;
con->v1.out_skip = 0;
}