diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h index e2f6b79d5517..283db0ea3db4 100644 --- a/include/trace/events/rxrpc.h +++ b/include/trace/events/rxrpc.h @@ -218,7 +218,6 @@ EM(rxrpc_conn_put_call, "PUT call ") \ EM(rxrpc_conn_put_call_input, "PUT inp-call") \ EM(rxrpc_conn_put_conn_input, "PUT inp-conn") \ - EM(rxrpc_conn_put_discard, "PUT discard ") \ EM(rxrpc_conn_put_discard_idle, "PUT disc-idl") \ EM(rxrpc_conn_put_local_dead, "PUT loc-dead") \ EM(rxrpc_conn_put_noreuse, "PUT noreuse ") \ @@ -240,12 +239,11 @@ EM(rxrpc_client_chan_activate, "ChActv") \ EM(rxrpc_client_chan_disconnect, "ChDisc") \ EM(rxrpc_client_chan_pass, "ChPass") \ - EM(rxrpc_client_chan_wait_failed, "ChWtFl") \ EM(rxrpc_client_cleanup, "Clean ") \ EM(rxrpc_client_discard, "Discar") \ - EM(rxrpc_client_duplicate, "Duplic") \ EM(rxrpc_client_exposed, "Expose") \ EM(rxrpc_client_replace, "Replac") \ + EM(rxrpc_client_queue_new_call, "Q-Call") \ EM(rxrpc_client_to_active, "->Actv") \ E_(rxrpc_client_to_idle, "->Idle") @@ -273,6 +271,7 @@ EM(rxrpc_call_put_sendmsg, "PUT sendmsg ") \ EM(rxrpc_call_put_unnotify, "PUT unnotify") \ EM(rxrpc_call_put_userid_exists, "PUT u-exists") \ + EM(rxrpc_call_put_userid, "PUT user-id ") \ EM(rxrpc_call_see_accept, "SEE accept ") \ EM(rxrpc_call_see_activate_client, "SEE act-clnt") \ EM(rxrpc_call_see_connect_failed, "SEE con-fail") \ diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index de84061a5447..007258538bee 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -292,7 +292,6 @@ struct rxrpc_local { struct rb_root client_bundles; /* Client connection bundles by socket params */ spinlock_t client_bundles_lock; /* Lock for client_bundles */ bool kill_all_client_conns; - spinlock_t client_conn_cache_lock; /* Lock for ->*_client_conns */ struct list_head idle_client_conns; struct timer_list client_conn_reap_timer; unsigned long client_conn_flags; @@ -304,7 +303,8 @@ struct rxrpc_local { bool dead; bool service_closed; /* Service socket closed */ struct idr conn_ids; /* List of connection IDs */ - spinlock_t conn_lock; /* Lock for client connection pool */ + struct list_head new_client_calls; /* Newly created client calls need connection */ + spinlock_t client_call_lock; /* Lock for ->new_client_calls */ struct sockaddr_rxrpc srx; /* local address */ }; @@ -385,7 +385,6 @@ enum rxrpc_call_completion { * Bits in the connection flags. */ enum rxrpc_conn_flag { - RXRPC_CONN_HAS_IDR, /* Has a client conn ID assigned */ RXRPC_CONN_IN_SERVICE_CONNS, /* Conn is in peer->service_conns */ RXRPC_CONN_DONT_REUSE, /* Don't reuse this connection */ RXRPC_CONN_PROBING_FOR_UPGRADE, /* Probing for service upgrade */ @@ -413,6 +412,7 @@ enum rxrpc_conn_event { */ enum rxrpc_conn_proto_state { RXRPC_CONN_UNUSED, /* Connection not yet attempted */ + RXRPC_CONN_CLIENT_UNSECURED, /* Client connection needs security init */ RXRPC_CONN_CLIENT, /* Client connection */ RXRPC_CONN_SERVICE_PREALLOC, /* Service connection preallocation */ RXRPC_CONN_SERVICE_UNSECURED, /* Service unsecured connection */ @@ -436,11 +436,9 @@ struct rxrpc_bundle { u32 security_level; /* Security level selected */ u16 service_id; /* Service ID for this connection */ bool try_upgrade; /* True if the bundle is attempting upgrade */ - bool alloc_conn; /* True if someone's getting a conn */ bool exclusive; /* T if conn is exclusive */ bool upgrade; /* T if service ID can be upgraded */ - short alloc_error; /* Error from last conn allocation */ - spinlock_t channel_lock; + unsigned short alloc_error; /* Error from last conn allocation */ struct rb_node local_node; /* Node in local->client_conns */ struct list_head waiting_calls; /* Calls waiting for channels */ unsigned long avail_chans; /* Mask of available channels */ @@ -468,7 +466,7 @@ struct rxrpc_connection { unsigned char act_chans; /* Mask of active channels */ struct rxrpc_channel { unsigned long final_ack_at; /* Time at which to issue final ACK */ - struct rxrpc_call __rcu *call; /* Active call */ + struct rxrpc_call *call; /* Active call */ unsigned int call_debug_id; /* call->debug_id */ u32 call_id; /* ID of current call */ u32 call_counter; /* Call ID counter */ @@ -489,6 +487,7 @@ struct rxrpc_connection { struct list_head link; /* link in master connection list */ struct sk_buff_head rx_queue; /* received conn-level packets */ + struct mutex security_lock; /* Lock for security management */ const struct rxrpc_security *security; /* applied security module */ union { struct { @@ -619,7 +618,7 @@ struct rxrpc_call { struct work_struct destroyer; /* In-process-context destroyer */ rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */ struct list_head link; /* link in master call list */ - struct list_head chan_wait_link; /* Link in conn->bundle->waiting_calls */ + struct list_head wait_link; /* Link in local->new_client_calls */ struct hlist_node error_link; /* link in error distribution list */ struct list_head accept_link; /* Link in rx->acceptq */ struct list_head recvmsg_link; /* Link in rx->recvmsg_q */ @@ -866,6 +865,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *, struct sockaddr_rxrpc *, struct rxrpc_call_params *, gfp_t, unsigned int); +void rxrpc_start_call_timer(struct rxrpc_call *call); void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *, struct sk_buff *); void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *); @@ -905,6 +905,7 @@ static inline void rxrpc_set_call_state(struct rxrpc_call *call, { /* Order write of completion info before write of ->state. */ smp_store_release(&call->_state, state); + wake_up(&call->waitq); } static inline enum rxrpc_call_state __rxrpc_call_state(const struct rxrpc_call *call) @@ -940,10 +941,11 @@ extern unsigned int rxrpc_reap_client_connections; extern unsigned long rxrpc_conn_idle_client_expiry; extern unsigned long rxrpc_conn_idle_client_fast_expiry; -void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local); +void rxrpc_purge_client_connections(struct rxrpc_local *local); struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace); void rxrpc_put_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace); -int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp); +int rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp); +void rxrpc_connect_client_calls(struct rxrpc_local *local); void rxrpc_expose_client_call(struct rxrpc_call *); void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *); void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle); diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index c94161acf3c4..3ded5a24627c 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -150,7 +150,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp, timer_setup(&call->timer, rxrpc_call_timer_expired, 0); INIT_WORK(&call->destroyer, rxrpc_destroy_call); INIT_LIST_HEAD(&call->link); - INIT_LIST_HEAD(&call->chan_wait_link); + INIT_LIST_HEAD(&call->wait_link); INIT_LIST_HEAD(&call->accept_link); INIT_LIST_HEAD(&call->recvmsg_link); INIT_LIST_HEAD(&call->sock_link); @@ -242,7 +242,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx, /* * Initiate the call ack/resend/expiry timer. */ -static void rxrpc_start_call_timer(struct rxrpc_call *call) +void rxrpc_start_call_timer(struct rxrpc_call *call) { unsigned long now = jiffies; unsigned long j = now + MAX_JIFFY_OFFSET; @@ -286,6 +286,39 @@ static void rxrpc_put_call_slot(struct rxrpc_call *call) up(limiter); } +/* + * Start the process of connecting a call. We obtain a peer and a connection + * bundle, but the actual association of a call with a connection is offloaded + * to the I/O thread to simplify locking. + */ +static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp) +{ + struct rxrpc_local *local = call->local; + int ret = 0; + + _enter("{%d,%lx},", call->debug_id, call->user_call_ID); + + call->peer = rxrpc_lookup_peer(local, &call->dest_srx, gfp); + if (!call->peer) + goto error; + + ret = rxrpc_look_up_bundle(call, gfp); + if (ret < 0) + goto error; + + trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call); + rxrpc_get_call(call, rxrpc_call_get_io_thread); + spin_lock(&local->client_call_lock); + list_add_tail(&call->wait_link, &local->new_client_calls); + spin_unlock(&local->client_call_lock); + rxrpc_wake_up_io_thread(local); + return 0; + +error: + __set_bit(RXRPC_CALL_DISCONNECTED, &call->flags); + return ret; +} + /* * Set up a call for the given parameters. * - Called with the socket lock held, which it must release. @@ -369,10 +402,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, if (ret < 0) goto error_attached_to_socket; - rxrpc_see_call(call, rxrpc_call_see_connected); - - rxrpc_start_call_timer(call); - _leave(" = %p [new]", call); return call; @@ -387,22 +416,20 @@ error_dup_user_ID: rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, -EEXIST); trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, rxrpc_call_see_userid_exists); - rxrpc_release_call(rx, call); mutex_unlock(&call->user_mutex); rxrpc_put_call(call, rxrpc_call_put_userid_exists); _leave(" = -EEXIST"); return ERR_PTR(-EEXIST); /* We got an error, but the call is attached to the socket and is in - * need of release. However, we might now race with recvmsg() when - * completing the call queues it. Return 0 from sys_sendmsg() and + * need of release. However, we might now race with recvmsg() when it + * completion notifies the socket. Return 0 from sys_sendmsg() and * leave the error to recvmsg() to deal with. */ error_attached_to_socket: trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), ret, rxrpc_call_see_connect_failed); - set_bit(RXRPC_CALL_DISCONNECTED, &call->flags); - rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret); + rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret); _leave(" = c=%08x [err]", call->debug_id); return call; } @@ -460,7 +487,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, chan = sp->hdr.cid & RXRPC_CHANNELMASK; conn->channels[chan].call_counter = call->call_id; conn->channels[chan].call_id = call->call_id; - rcu_assign_pointer(conn->channels[chan].call, call); + conn->channels[chan].call = call; spin_unlock(&conn->state_lock); spin_lock(&conn->peer->lock); @@ -520,7 +547,7 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call) void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) { struct rxrpc_connection *conn = call->conn; - bool put = false; + bool put = false, putu = false; _enter("{%d,%d}", call->debug_id, refcount_read(&call->ref)); @@ -555,7 +582,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { rb_erase(&call->sock_node, &rx->calls); memset(&call->sock_node, 0xdd, sizeof(call->sock_node)); - rxrpc_put_call(call, rxrpc_call_put_userid_exists); + putu = true; } list_del(&call->sock_link); @@ -563,6 +590,9 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn); + if (putu) + rxrpc_put_call(call, rxrpc_call_put_userid); + _leave(""); } diff --git a/net/rxrpc/call_state.c b/net/rxrpc/call_state.c index 27dc1242b712..6afb54373ebb 100644 --- a/net/rxrpc/call_state.c +++ b/net/rxrpc/call_state.c @@ -65,5 +65,5 @@ void rxrpc_prefail_call(struct rxrpc_call *call, enum rxrpc_call_completion comp call->completion = compl; call->_state = RXRPC_CALL_COMPLETE; trace_rxrpc_call_complete(call); - __set_bit(RXRPC_CALL_RELEASED, &call->flags); + WARN_ON_ONCE(__test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags)); } diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c index ebb43f65ebc5..981ca5b98bcb 100644 --- a/net/rxrpc/conn_client.c +++ b/net/rxrpc/conn_client.c @@ -39,61 +39,19 @@ static void rxrpc_activate_bundle(struct rxrpc_bundle *bundle) atomic_inc(&bundle->active); } -/* - * Get a connection ID and epoch for a client connection from the global pool. - * The connection struct pointer is then recorded in the idr radix tree. The - * epoch doesn't change until the client is rebooted (or, at least, unless the - * module is unloaded). - */ -static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn, - gfp_t gfp) -{ - struct rxrpc_local *local = conn->local; - int id; - - _enter(""); - - idr_preload(gfp); - spin_lock(&local->conn_lock); - - id = idr_alloc_cyclic(&local->conn_ids, conn, - 1, 0x40000000, GFP_NOWAIT); - if (id < 0) - goto error; - - spin_unlock(&local->conn_lock); - idr_preload_end(); - - conn->proto.epoch = local->rxnet->epoch; - conn->proto.cid = id << RXRPC_CIDSHIFT; - set_bit(RXRPC_CONN_HAS_IDR, &conn->flags); - _leave(" [CID %x]", conn->proto.cid); - return 0; - -error: - spin_unlock(&local->conn_lock); - idr_preload_end(); - _leave(" = %d", id); - return id; -} - /* * Release a connection ID for a client connection. */ static void rxrpc_put_client_connection_id(struct rxrpc_local *local, struct rxrpc_connection *conn) { - if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) { - spin_lock(&local->conn_lock); - idr_remove(&local->conn_ids, conn->proto.cid >> RXRPC_CIDSHIFT); - spin_unlock(&local->conn_lock); - } + idr_remove(&local->conn_ids, conn->proto.cid >> RXRPC_CIDSHIFT); } /* * Destroy the client connection ID tree. */ -void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local) +static void rxrpc_destroy_client_conn_ids(struct rxrpc_local *local) { struct rxrpc_connection *conn; int id; @@ -129,7 +87,6 @@ static struct rxrpc_bundle *rxrpc_alloc_bundle(struct rxrpc_call *call, bundle->security_level = call->security_level; refcount_set(&bundle->ref, 1); atomic_set(&bundle->active, 1); - spin_lock_init(&bundle->channel_lock); INIT_LIST_HEAD(&bundle->waiting_calls); trace_rxrpc_bundle(bundle->debug_id, 1, rxrpc_bundle_new); } @@ -169,69 +126,68 @@ void rxrpc_put_bundle(struct rxrpc_bundle *bundle, enum rxrpc_bundle_trace why) } } +/* + * Get rid of outstanding client connection preallocations when a local + * endpoint is destroyed. + */ +void rxrpc_purge_client_connections(struct rxrpc_local *local) +{ + rxrpc_destroy_client_conn_ids(local); +} + /* * Allocate a client connection. */ static struct rxrpc_connection * -rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp) +rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle) { struct rxrpc_connection *conn; - struct rxrpc_net *rxnet = bundle->local->rxnet; - int ret; + struct rxrpc_local *local = bundle->local; + struct rxrpc_net *rxnet = local->rxnet; + int id; _enter(""); - conn = rxrpc_alloc_connection(rxnet, gfp); - if (!conn) { - _leave(" = -ENOMEM"); + conn = rxrpc_alloc_connection(rxnet, GFP_ATOMIC | __GFP_NOWARN); + if (!conn) return ERR_PTR(-ENOMEM); + + id = idr_alloc_cyclic(&local->conn_ids, conn, 1, 0x40000000, + GFP_ATOMIC | __GFP_NOWARN); + if (id < 0) { + kfree(conn); + return ERR_PTR(id); } refcount_set(&conn->ref, 1); - conn->bundle = bundle; - conn->local = bundle->local; - conn->peer = bundle->peer; - conn->key = bundle->key; + conn->proto.cid = id << RXRPC_CIDSHIFT; + conn->proto.epoch = local->rxnet->epoch; + conn->out_clientflag = RXRPC_CLIENT_INITIATED; + conn->bundle = rxrpc_get_bundle(bundle, rxrpc_bundle_get_client_conn); + conn->local = rxrpc_get_local(bundle->local, rxrpc_local_get_client_conn); + conn->peer = rxrpc_get_peer(bundle->peer, rxrpc_peer_get_client_conn); + conn->key = key_get(bundle->key); + conn->security = bundle->security; conn->exclusive = bundle->exclusive; conn->upgrade = bundle->upgrade; conn->orig_service_id = bundle->service_id; conn->security_level = bundle->security_level; - conn->out_clientflag = RXRPC_CLIENT_INITIATED; - conn->state = RXRPC_CONN_CLIENT; + conn->state = RXRPC_CONN_CLIENT_UNSECURED; conn->service_id = conn->orig_service_id; - ret = rxrpc_get_client_connection_id(conn, gfp); - if (ret < 0) - goto error_0; - - ret = rxrpc_init_client_conn_security(conn); - if (ret < 0) - goto error_1; + if (conn->security == &rxrpc_no_security) + conn->state = RXRPC_CONN_CLIENT; atomic_inc(&rxnet->nr_conns); write_lock(&rxnet->conn_lock); list_add_tail(&conn->proc_link, &rxnet->conn_proc_list); write_unlock(&rxnet->conn_lock); - rxrpc_get_bundle(bundle, rxrpc_bundle_get_client_conn); - rxrpc_get_peer(conn->peer, rxrpc_peer_get_client_conn); - rxrpc_get_local(conn->local, rxrpc_local_get_client_conn); - key_get(conn->key); - - trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref), - rxrpc_conn_new_client); + rxrpc_see_connection(conn, rxrpc_conn_new_client); atomic_inc(&rxnet->nr_client_conns); trace_rxrpc_client(conn, -1, rxrpc_client_alloc); - _leave(" = %p", conn); return conn; - -error_1: - rxrpc_put_client_connection_id(bundle->local, conn); -error_0: - kfree(conn); - _leave(" = %d", ret); - return ERR_PTR(ret); } /* @@ -249,7 +205,8 @@ static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn) if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags)) goto dont_reuse; - if (conn->state != RXRPC_CONN_CLIENT || + if ((conn->state != RXRPC_CONN_CLIENT_UNSECURED && + conn->state != RXRPC_CONN_CLIENT) || conn->proto.epoch != rxnet->epoch) goto mark_dont_reuse; @@ -280,7 +237,7 @@ dont_reuse: * Look up the conn bundle that matches the connection parameters, adding it if * it doesn't yet exist. */ -static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp) +int rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t gfp) { static atomic_t rxrpc_bundle_id; struct rxrpc_bundle *bundle, *candidate; @@ -295,7 +252,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t if (test_bit(RXRPC_CALL_EXCLUSIVE, &call->flags)) { call->bundle = rxrpc_alloc_bundle(call, gfp); - return call->bundle; + return call->bundle ? 0 : -ENOMEM; } /* First, see if the bundle is already there. */ @@ -324,7 +281,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t /* It wasn't. We need to add one. */ candidate = rxrpc_alloc_bundle(call, gfp); if (!candidate) - return ERR_PTR(-ENOMEM); + return -ENOMEM; _debug("search 2"); spin_lock(&local->client_bundles_lock); @@ -355,7 +312,7 @@ static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_call *call, gfp_t call->bundle = rxrpc_get_bundle(candidate, rxrpc_bundle_get_client_call); spin_unlock(&local->client_bundles_lock); _leave(" = B=%u [new]", call->bundle->debug_id); - return call->bundle; + return 0; found_bundle_free: rxrpc_free_bundle(candidate); @@ -364,160 +321,77 @@ found_bundle: rxrpc_activate_bundle(bundle); spin_unlock(&local->client_bundles_lock); _leave(" = B=%u [found]", call->bundle->debug_id); - return call->bundle; -} - -/* - * Create or find a client bundle to use for a call. - * - * If we return with a connection, the call will be on its waiting list. It's - * left to the caller to assign a channel and wake up the call. - */ -static struct rxrpc_bundle *rxrpc_prep_call(struct rxrpc_call *call, gfp_t gfp) -{ - struct rxrpc_bundle *bundle; - - _enter("{%d,%lx},", call->debug_id, call->user_call_ID); - - call->peer = rxrpc_lookup_peer(call->local, &call->dest_srx, gfp); - if (!call->peer) - goto error; - - call->tx_last_sent = ktime_get_real(); - call->cong_ssthresh = call->peer->cong_ssthresh; - if (call->cong_cwnd >= call->cong_ssthresh) - call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE; - else - call->cong_mode = RXRPC_CALL_SLOW_START; - - /* Find the client connection bundle. */ - bundle = rxrpc_look_up_bundle(call, gfp); - if (!bundle) - goto error; - - /* Get this call queued. Someone else may activate it whilst we're - * lining up a new connection, but that's fine. - */ - spin_lock(&bundle->channel_lock); - list_add_tail(&call->chan_wait_link, &bundle->waiting_calls); - spin_unlock(&bundle->channel_lock); - - _leave(" = [B=%x]", bundle->debug_id); - return bundle; - -error: - _leave(" = -ENOMEM"); - return ERR_PTR(-ENOMEM); + return 0; } /* * Allocate a new connection and add it into a bundle. */ -static void rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, gfp_t gfp) - __releases(bundle->channel_lock) +static bool rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, + unsigned int slot) { - struct rxrpc_connection *candidate = NULL, *old = NULL; - bool conflict; - int i; + struct rxrpc_connection *conn, *old; + unsigned int shift = slot * RXRPC_MAXCALLS; + unsigned int i; - _enter(""); - - conflict = bundle->alloc_conn; - if (!conflict) - bundle->alloc_conn = true; - spin_unlock(&bundle->channel_lock); - if (conflict) { - _leave(" [conf]"); - return; + old = bundle->conns[slot]; + if (old) { + bundle->conns[slot] = NULL; + trace_rxrpc_client(old, -1, rxrpc_client_replace); + rxrpc_put_connection(old, rxrpc_conn_put_noreuse); } - candidate = rxrpc_alloc_client_connection(bundle, gfp); - - spin_lock(&bundle->channel_lock); - bundle->alloc_conn = false; - - if (IS_ERR(candidate)) { - bundle->alloc_error = PTR_ERR(candidate); - spin_unlock(&bundle->channel_lock); - _leave(" [err %ld]", PTR_ERR(candidate)); - return; + conn = rxrpc_alloc_client_connection(bundle); + if (IS_ERR(conn)) { + bundle->alloc_error = PTR_ERR(conn); + return false; } - bundle->alloc_error = 0; - - for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) { - unsigned int shift = i * RXRPC_MAXCALLS; - int j; - - old = bundle->conns[i]; - if (!rxrpc_may_reuse_conn(old)) { - if (old) - trace_rxrpc_client(old, -1, rxrpc_client_replace); - candidate->bundle_shift = shift; - rxrpc_activate_bundle(bundle); - bundle->conns[i] = candidate; - for (j = 0; j < RXRPC_MAXCALLS; j++) - set_bit(shift + j, &bundle->avail_chans); - candidate = NULL; - break; - } - - old = NULL; - } - - spin_unlock(&bundle->channel_lock); - - if (candidate) { - _debug("discard C=%x", candidate->debug_id); - trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate); - rxrpc_put_connection(candidate, rxrpc_conn_put_discard); - } - - rxrpc_put_connection(old, rxrpc_conn_put_noreuse); - _leave(""); + rxrpc_activate_bundle(bundle); + conn->bundle_shift = shift; + bundle->conns[slot] = conn; + for (i = 0; i < RXRPC_MAXCALLS; i++) + set_bit(shift + i, &bundle->avail_chans); + return true; } /* * Add a connection to a bundle if there are no usable connections or we have * connections waiting for extra capacity. */ -static void rxrpc_maybe_add_conn(struct rxrpc_bundle *bundle, gfp_t gfp) +static bool rxrpc_bundle_has_space(struct rxrpc_bundle *bundle) { - struct rxrpc_call *call; - int i, usable; + int slot = -1, i, usable; _enter(""); - spin_lock(&bundle->channel_lock); + bundle->alloc_error = 0; /* See if there are any usable connections. */ usable = 0; - for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) + for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) { if (rxrpc_may_reuse_conn(bundle->conns[i])) usable++; - - if (!usable && !list_empty(&bundle->waiting_calls)) { - call = list_first_entry(&bundle->waiting_calls, - struct rxrpc_call, chan_wait_link); - if (test_bit(RXRPC_CALL_UPGRADE, &call->flags)) - bundle->try_upgrade = true; + else if (slot == -1) + slot = i; } + if (!usable && bundle->upgrade) + bundle->try_upgrade = true; + if (!usable) goto alloc_conn; if (!bundle->avail_chans && !bundle->try_upgrade && - !list_empty(&bundle->waiting_calls) && usable < ARRAY_SIZE(bundle->conns)) goto alloc_conn; - spin_unlock(&bundle->channel_lock); _leave(""); - return; + return usable; alloc_conn: - return rxrpc_add_conn_to_bundle(bundle, gfp); + return slot >= 0 ? rxrpc_add_conn_to_bundle(bundle, slot) : false; } /* @@ -531,11 +405,13 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn, struct rxrpc_channel *chan = &conn->channels[channel]; struct rxrpc_bundle *bundle = conn->bundle; struct rxrpc_call *call = list_entry(bundle->waiting_calls.next, - struct rxrpc_call, chan_wait_link); + struct rxrpc_call, wait_link); u32 call_id = chan->call_counter + 1; _enter("C=%x,%u", conn->debug_id, channel); + list_del_init(&call->wait_link); + trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate); /* Cancel the final ACK on the previous call if it hasn't been sent yet @@ -545,65 +421,50 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn, clear_bit(conn->bundle_shift + channel, &bundle->avail_chans); rxrpc_see_call(call, rxrpc_call_see_activate_client); - list_del_init(&call->chan_wait_link); call->conn = rxrpc_get_connection(conn, rxrpc_conn_get_activate_call); call->cid = conn->proto.cid | channel; call->call_id = call_id; call->dest_srx.srx_service = conn->service_id; - - trace_rxrpc_connect_call(call); - - rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST); - - /* Paired with the read barrier in rxrpc_connect_call(). This orders - * cid and epoch in the connection wrt to call_id without the need to - * take the channel_lock. - * - * We provisionally assign a callNumber at this point, but we don't - * confirm it until the call is about to be exposed. - * - * TODO: Pair with a barrier in the data_ready handler when that looks - * at the call ID through a connection channel. - */ - smp_wmb(); + call->cong_ssthresh = call->peer->cong_ssthresh; + if (call->cong_cwnd >= call->cong_ssthresh) + call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE; + else + call->cong_mode = RXRPC_CALL_SLOW_START; chan->call_id = call_id; chan->call_debug_id = call->debug_id; - rcu_assign_pointer(chan->call, call); + chan->call = call; + + rxrpc_see_call(call, rxrpc_call_see_connected); + trace_rxrpc_connect_call(call); + call->tx_last_sent = ktime_get_real(); + rxrpc_start_call_timer(call); + rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST); wake_up(&call->waitq); } /* * Remove a connection from the idle list if it's on it. */ -static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn) +static void rxrpc_unidle_conn(struct rxrpc_connection *conn) { - struct rxrpc_local *local = bundle->local; - bool drop_ref; - if (!list_empty(&conn->cache_link)) { - drop_ref = false; - spin_lock(&local->client_conn_cache_lock); - if (!list_empty(&conn->cache_link)) { - list_del_init(&conn->cache_link); - drop_ref = true; - } - spin_unlock(&local->client_conn_cache_lock); - if (drop_ref) - rxrpc_put_connection(conn, rxrpc_conn_put_unidle); + list_del_init(&conn->cache_link); + rxrpc_put_connection(conn, rxrpc_conn_put_unidle); } } /* - * Assign channels and callNumbers to waiting calls with channel_lock - * held by caller. + * Assign channels and callNumbers to waiting calls. */ -static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle) +static void rxrpc_activate_channels(struct rxrpc_bundle *bundle) { struct rxrpc_connection *conn; unsigned long avail, mask; unsigned int channel, slot; + trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans); + if (bundle->try_upgrade) mask = 1; else @@ -623,7 +484,7 @@ static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle) if (bundle->try_upgrade) set_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags); - rxrpc_unidle_conn(bundle, conn); + rxrpc_unidle_conn(conn); channel &= (RXRPC_MAXCALLS - 1); conn->act_chans |= 1 << channel; @@ -632,125 +493,24 @@ static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle) } /* - * Assign channels and callNumbers to waiting calls. + * Connect waiting channels (called from the I/O thread). */ -static void rxrpc_activate_channels(struct rxrpc_bundle *bundle) +void rxrpc_connect_client_calls(struct rxrpc_local *local) { - _enter("B=%x", bundle->debug_id); + struct rxrpc_call *call; - trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans); + while ((call = list_first_entry_or_null(&local->new_client_calls, + struct rxrpc_call, wait_link)) + ) { + struct rxrpc_bundle *bundle = call->bundle; - if (!bundle->avail_chans) - return; + spin_lock(&local->client_call_lock); + list_move_tail(&call->wait_link, &bundle->waiting_calls); + spin_unlock(&local->client_call_lock); - spin_lock(&bundle->channel_lock); - rxrpc_activate_channels_locked(bundle); - spin_unlock(&bundle->channel_lock); - _leave(""); -} - -/* - * Wait for a callNumber and a channel to be granted to a call. - */ -static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle, - struct rxrpc_call *call, gfp_t gfp) -{ - DECLARE_WAITQUEUE(myself, current); - int ret = 0; - - _enter("%d", call->debug_id); - - if (!gfpflags_allow_blocking(gfp)) { - rxrpc_maybe_add_conn(bundle, gfp); - rxrpc_activate_channels(bundle); - ret = bundle->alloc_error ?: -EAGAIN; - goto out; + if (rxrpc_bundle_has_space(bundle)) + rxrpc_activate_channels(bundle); } - - add_wait_queue_exclusive(&call->waitq, &myself); - for (;;) { - rxrpc_maybe_add_conn(bundle, gfp); - rxrpc_activate_channels(bundle); - ret = bundle->alloc_error; - if (ret < 0) - break; - - switch (call->interruptibility) { - case RXRPC_INTERRUPTIBLE: - case RXRPC_PREINTERRUPTIBLE: - set_current_state(TASK_INTERRUPTIBLE); - break; - case RXRPC_UNINTERRUPTIBLE: - default: - set_current_state(TASK_UNINTERRUPTIBLE); - break; - } - if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) - break; - if ((call->interruptibility == RXRPC_INTERRUPTIBLE || - call->interruptibility == RXRPC_PREINTERRUPTIBLE) && - signal_pending(current)) { - ret = -ERESTARTSYS; - break; - } - schedule(); - } - remove_wait_queue(&call->waitq, &myself); - __set_current_state(TASK_RUNNING); - -out: - _leave(" = %d", ret); - return ret; -} - -/* - * find a connection for a call - * - called in process context with IRQs enabled - */ -int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp) -{ - struct rxrpc_bundle *bundle; - int ret = 0; - - _enter("{%d,%lx},", call->debug_id, call->user_call_ID); - - rxrpc_get_call(call, rxrpc_call_get_io_thread); - - bundle = rxrpc_prep_call(call, gfp); - if (IS_ERR(bundle)) { - rxrpc_put_call(call, rxrpc_call_get_io_thread); - ret = PTR_ERR(bundle); - goto out; - } - - if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_CONN) { - ret = rxrpc_wait_for_channel(bundle, call, gfp); - if (ret < 0) - goto wait_failed; - } - -granted_channel: - /* Paired with the write barrier in rxrpc_activate_one_channel(). */ - smp_rmb(); - -out: - _leave(" = %d", ret); - return ret; - -wait_failed: - spin_lock(&bundle->channel_lock); - list_del_init(&call->chan_wait_link); - spin_unlock(&bundle->channel_lock); - - if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) { - ret = 0; - goto granted_channel; - } - - trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed); - rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret); - rxrpc_disconnect_client_call(bundle, call); - goto out; } /* @@ -808,8 +568,6 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call _enter("c=%x", call->debug_id); - spin_lock(&bundle->channel_lock); - /* Calls that have never actually been assigned a channel can simply be * discarded. */ @@ -818,8 +576,8 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call _debug("call is waiting"); ASSERTCMP(call->call_id, ==, 0); ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags)); - list_del_init(&call->chan_wait_link); - goto out; + list_del_init(&call->wait_link); + return; } cid = call->cid; @@ -827,10 +585,8 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call chan = &conn->channels[channel]; trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect); - if (rcu_access_pointer(chan->call) != call) { - spin_unlock(&bundle->channel_lock); - BUG(); - } + if (WARN_ON(chan->call != call)) + return; may_reuse = rxrpc_may_reuse_conn(conn); @@ -851,16 +607,15 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call trace_rxrpc_client(conn, channel, rxrpc_client_to_active); bundle->try_upgrade = false; if (may_reuse) - rxrpc_activate_channels_locked(bundle); + rxrpc_activate_channels(bundle); } - } /* See if we can pass the channel directly to another call. */ if (may_reuse && !list_empty(&bundle->waiting_calls)) { trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass); rxrpc_activate_one_channel(conn, channel); - goto out; + return; } /* Schedule the final ACK to be transmitted in a short while so that it @@ -878,7 +633,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call } /* Deactivate the channel. */ - rcu_assign_pointer(chan->call, NULL); + chan->call = NULL; set_bit(conn->bundle_shift + channel, &conn->bundle->avail_chans); conn->act_chans &= ~(1 << channel); @@ -891,15 +646,10 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call conn->idle_timestamp = jiffies; rxrpc_get_connection(conn, rxrpc_conn_get_idle); - spin_lock(&local->client_conn_cache_lock); list_move_tail(&conn->cache_link, &local->idle_client_conns); - spin_unlock(&local->client_conn_cache_lock); rxrpc_set_client_reap_timer(local); } - -out: - spin_unlock(&bundle->channel_lock); } /* @@ -909,7 +659,6 @@ static void rxrpc_unbundle_conn(struct rxrpc_connection *conn) { struct rxrpc_bundle *bundle = conn->bundle; unsigned int bindex; - bool need_drop = false; int i; _enter("C=%x", conn->debug_id); @@ -917,18 +666,13 @@ static void rxrpc_unbundle_conn(struct rxrpc_connection *conn) if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK) rxrpc_process_delayed_final_acks(conn, true); - spin_lock(&bundle->channel_lock); bindex = conn->bundle_shift / RXRPC_MAXCALLS; if (bundle->conns[bindex] == conn) { _debug("clear slot %u", bindex); bundle->conns[bindex] = NULL; for (i = 0; i < RXRPC_MAXCALLS; i++) clear_bit(conn->bundle_shift + i, &bundle->avail_chans); - need_drop = true; - } - spin_unlock(&bundle->channel_lock); - - if (need_drop) { + rxrpc_put_client_connection_id(bundle->local, conn); rxrpc_deactivate_bundle(bundle); rxrpc_put_connection(conn, rxrpc_conn_put_unbundle); } @@ -990,24 +734,16 @@ void rxrpc_discard_expired_client_conns(struct rxrpc_local *local) _enter(""); - if (list_empty(&local->idle_client_conns)) { - _leave(" [empty]"); - return; - } - /* We keep an estimate of what the number of conns ought to be after * we've discarded some so that we don't overdo the discarding. */ nr_conns = atomic_read(&local->rxnet->nr_client_conns); next: - spin_lock(&local->client_conn_cache_lock); - - if (list_empty(&local->idle_client_conns)) - goto out; - - conn = list_entry(local->idle_client_conns.next, - struct rxrpc_connection, cache_link); + conn = list_first_entry_or_null(&local->idle_client_conns, + struct rxrpc_connection, cache_link); + if (!conn) + return; if (!local->kill_all_client_conns) { /* If the number of connections is over the reap limit, we @@ -1032,8 +768,6 @@ next: trace_rxrpc_client(conn, -1, rxrpc_client_discard); list_del_init(&conn->cache_link); - spin_unlock(&local->client_conn_cache_lock); - rxrpc_unbundle_conn(conn); /* Drop the ->cache_link ref */ rxrpc_put_connection(conn, rxrpc_conn_put_discard_idle); @@ -1053,8 +787,6 @@ not_yet_expired: if (!local->kill_all_client_conns) timer_reduce(&local->client_conn_reap_timer, conn_expires_at); -out: - spin_unlock(&local->client_conn_cache_lock); _leave(""); } @@ -1063,34 +795,19 @@ out: */ void rxrpc_clean_up_local_conns(struct rxrpc_local *local) { - struct rxrpc_connection *conn, *tmp; - LIST_HEAD(graveyard); + struct rxrpc_connection *conn; _enter(""); - spin_lock(&local->client_conn_cache_lock); local->kill_all_client_conns = true; - spin_unlock(&local->client_conn_cache_lock); del_timer_sync(&local->client_conn_reap_timer); - spin_lock(&local->client_conn_cache_lock); - - list_for_each_entry_safe(conn, tmp, &local->idle_client_conns, - cache_link) { - if (conn->local == local) { - atomic_dec(&conn->active); - trace_rxrpc_client(conn, -1, rxrpc_client_discard); - list_move(&conn->cache_link, &graveyard); - } - } - - spin_unlock(&local->client_conn_cache_lock); - - while (!list_empty(&graveyard)) { - conn = list_entry(graveyard.next, - struct rxrpc_connection, cache_link); + while ((conn = list_first_entry_or_null(&local->idle_client_conns, + struct rxrpc_connection, cache_link))) { list_del_init(&conn->cache_link); + atomic_dec(&conn->active); + trace_rxrpc_client(conn, -1, rxrpc_client_discard); rxrpc_unbundle_conn(conn); rxrpc_put_connection(conn, rxrpc_conn_put_local_dead); } diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c index 8d0b9ff0a5e1..44414e724415 100644 --- a/net/rxrpc/conn_event.c +++ b/net/rxrpc/conn_event.c @@ -100,9 +100,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, /* If the last call got moved on whilst we were waiting to run, just * ignore this packet. */ - call_id = READ_ONCE(chan->last_call); - /* Sync with __rxrpc_disconnect_call() */ - smp_rmb(); + call_id = chan->last_call; if (skb && call_id != sp->hdr.callNumber) return; @@ -119,9 +117,12 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, iov[2].iov_base = &ack_info; iov[2].iov_len = sizeof(ack_info); + serial = atomic_inc_return(&conn->serial); + pkt.whdr.epoch = htonl(conn->proto.epoch); pkt.whdr.cid = htonl(conn->proto.cid | channel); pkt.whdr.callNumber = htonl(call_id); + pkt.whdr.serial = htonl(serial); pkt.whdr.seq = 0; pkt.whdr.type = chan->last_type; pkt.whdr.flags = conn->out_clientflag; @@ -158,31 +159,15 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, iov[0].iov_len += sizeof(pkt.ack); len += sizeof(pkt.ack) + 3 + sizeof(ack_info); ioc = 3; - break; - default: - return; - } - - /* Resync with __rxrpc_disconnect_call() and check that the last call - * didn't get advanced whilst we were filling out the packets. - */ - smp_rmb(); - if (READ_ONCE(chan->last_call) != call_id) - return; - - serial = atomic_inc_return(&conn->serial); - pkt.whdr.serial = htonl(serial); - - switch (chan->last_type) { - case RXRPC_PACKET_TYPE_ABORT: - break; - case RXRPC_PACKET_TYPE_ACK: trace_rxrpc_tx_ack(chan->call_debug_id, serial, ntohl(pkt.ack.firstPacket), ntohl(pkt.ack.serial), pkt.ack.reason, 0); break; + + default: + return; } ret = kernel_sendmsg(conn->local->socket, &msg, iov, ioc, len); @@ -207,12 +192,8 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn) _enter("{%d},%x", conn->debug_id, conn->abort_code); - spin_lock(&conn->bundle->channel_lock); - for (i = 0; i < RXRPC_MAXCALLS; i++) { - call = rcu_dereference_protected( - conn->channels[i].call, - lockdep_is_held(&conn->bundle->channel_lock)); + call = conn->channels[i].call; if (call) rxrpc_set_call_completion(call, conn->completion, @@ -220,7 +201,6 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn) conn->error); } - spin_unlock(&conn->bundle->channel_lock); _leave(""); } @@ -316,9 +296,7 @@ again: if (!test_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags)) continue; - smp_rmb(); /* vs rxrpc_disconnect_client_call */ - ack_at = READ_ONCE(chan->final_ack_at); - + ack_at = chan->final_ack_at; if (time_before(j, ack_at) && !force) { if (time_before(ack_at, next_j)) { next_j = ack_at; @@ -446,15 +424,8 @@ void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb) if (conn->state != RXRPC_CONN_SERVICE) break; - spin_lock(&conn->bundle->channel_lock); - for (loop = 0; loop < RXRPC_MAXCALLS; loop++) - rxrpc_call_is_secure( - rcu_dereference_protected( - conn->channels[loop].call, - lockdep_is_held(&conn->bundle->channel_lock))); - - spin_unlock(&conn->bundle->channel_lock); + rxrpc_call_is_secure(conn->channels[loop].call); break; } diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c index 3d8c1dc6a82a..ac85d4644a3c 100644 --- a/net/rxrpc/conn_object.c +++ b/net/rxrpc/conn_object.c @@ -67,6 +67,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet, INIT_WORK(&conn->destructor, rxrpc_clean_up_connection); INIT_LIST_HEAD(&conn->proc_link); INIT_LIST_HEAD(&conn->link); + mutex_init(&conn->security_lock); skb_queue_head_init(&conn->rx_queue); conn->rxnet = rxnet; conn->security = &rxrpc_no_security; @@ -157,7 +158,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn, _enter("%d,%x", conn->debug_id, call->cid); - if (rcu_access_pointer(chan->call) == call) { + if (chan->call == call) { /* Save the result of the call so that we can repeat it if necessary * through the channel, whilst disposing of the actual call record. */ @@ -177,12 +178,9 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn, break; } - /* Sync with rxrpc_conn_retransmit(). */ - smp_wmb(); chan->last_call = chan->call_id; chan->call_id = chan->call_counter; - - rcu_assign_pointer(chan->call, NULL); + chan->call = NULL; } _leave(""); @@ -210,10 +208,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call) if (rxrpc_is_client_call(call)) { rxrpc_disconnect_client_call(call->bundle, call); } else { - spin_lock(&conn->bundle->channel_lock); __rxrpc_disconnect_call(conn, call); - spin_unlock(&conn->bundle->channel_lock); - conn->idle_timestamp = jiffies; if (atomic_dec_and_test(&conn->active)) rxrpc_set_service_reap_timer(conn->rxnet, @@ -316,10 +311,10 @@ static void rxrpc_clean_up_connection(struct work_struct *work) container_of(work, struct rxrpc_connection, destructor); struct rxrpc_net *rxnet = conn->rxnet; - ASSERT(!rcu_access_pointer(conn->channels[0].call) && - !rcu_access_pointer(conn->channels[1].call) && - !rcu_access_pointer(conn->channels[2].call) && - !rcu_access_pointer(conn->channels[3].call)); + ASSERT(!conn->channels[0].call && + !conn->channels[1].call && + !conn->channels[2].call && + !conn->channels[3].call); ASSERT(list_empty(&conn->cache_link)); del_timer_sync(&conn->timer); diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c index 2a55a88b2a5b..f30323de82bd 100644 --- a/net/rxrpc/conn_service.c +++ b/net/rxrpc/conn_service.c @@ -11,7 +11,6 @@ static struct rxrpc_bundle rxrpc_service_dummy_bundle = { .ref = REFCOUNT_INIT(1), .debug_id = UINT_MAX, - .channel_lock = __SPIN_LOCK_UNLOCKED(&rxrpc_service_dummy_bundle.channel_lock), }; /* diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c index a299cc34c140..9e9dfb2fc559 100644 --- a/net/rxrpc/io_thread.c +++ b/net/rxrpc/io_thread.c @@ -369,10 +369,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, return just_discard; } - rcu_read_lock(); - call = rxrpc_try_get_call(rcu_dereference(chan->call), - rxrpc_call_get_input); - rcu_read_unlock(); + call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input); if (sp->hdr.callNumber > chan->call_id) { if (rxrpc_to_client(sp)) { @@ -453,6 +450,9 @@ int rxrpc_io_thread(void *data) continue; } + if (!list_empty(&local->new_client_calls)) + rxrpc_connect_client_calls(local); + /* Process received packets and errors. */ if ((skb = __skb_dequeue(&rx_queue))) { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); @@ -492,7 +492,10 @@ int rxrpc_io_thread(void *data) should_stop = kthread_should_stop(); if (!skb_queue_empty(&local->rx_queue) || !list_empty(&local->call_attend_q) || - !list_empty(&local->conn_attend_q)) { + !list_empty(&local->conn_attend_q) || + !list_empty(&local->new_client_calls) || + test_bit(RXRPC_CLIENT_CONN_REAP_TIMER, + &local->client_conn_flags)) { __set_current_state(TASK_RUNNING); continue; } diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c index 9bc8d08ca12c..b8eaca5d9f22 100644 --- a/net/rxrpc/local_object.c +++ b/net/rxrpc/local_object.c @@ -117,7 +117,6 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net, local->client_bundles = RB_ROOT; spin_lock_init(&local->client_bundles_lock); local->kill_all_client_conns = false; - spin_lock_init(&local->client_conn_cache_lock); INIT_LIST_HEAD(&local->idle_client_conns); timer_setup(&local->client_conn_reap_timer, rxrpc_client_conn_reap_timeout, 0); @@ -133,7 +132,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net, if (tmp == 0) tmp = 1; idr_set_cursor(&local->conn_ids, tmp); - spin_lock_init(&local->conn_lock); + INIT_LIST_HEAD(&local->new_client_calls); + spin_lock_init(&local->client_call_lock); trace_rxrpc_local(local->debug_id, rxrpc_local_new, 1, 1); } @@ -435,7 +435,7 @@ void rxrpc_destroy_local(struct rxrpc_local *local) * local endpoint. */ rxrpc_purge_queue(&local->rx_queue); - rxrpc_destroy_client_conn_ids(local); + rxrpc_purge_client_connections(local); } /* diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c index c39ef94602ed..750158a085cd 100644 --- a/net/rxrpc/proc.c +++ b/net/rxrpc/proc.c @@ -12,6 +12,7 @@ static const char *const rxrpc_conn_states[RXRPC_CONN__NR_STATES] = { [RXRPC_CONN_UNUSED] = "Unused ", + [RXRPC_CONN_CLIENT_UNSECURED] = "ClUnsec ", [RXRPC_CONN_CLIENT] = "Client ", [RXRPC_CONN_SERVICE_PREALLOC] = "SvPrealc", [RXRPC_CONN_SERVICE_UNSECURED] = "SvUnsec ", diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c index dfb01e7b90fb..1bf571a66e02 100644 --- a/net/rxrpc/rxkad.c +++ b/net/rxrpc/rxkad.c @@ -1122,36 +1122,31 @@ static int rxkad_verify_response(struct rxrpc_connection *conn, goto protocol_error_free; } - spin_lock(&conn->bundle->channel_lock); for (i = 0; i < RXRPC_MAXCALLS; i++) { - struct rxrpc_call *call; u32 call_id = ntohl(response->encrypted.call_id[i]); + u32 counter = READ_ONCE(conn->channels[i].call_counter); if (call_id > INT_MAX) { rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO, rxkad_abort_resp_bad_callid); - goto protocol_error_unlock; + goto protocol_error_free; } - if (call_id < conn->channels[i].call_counter) { + if (call_id < counter) { rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO, rxkad_abort_resp_call_ctr); - goto protocol_error_unlock; + goto protocol_error_free; } - if (call_id > conn->channels[i].call_counter) { - call = rcu_dereference_protected( - conn->channels[i].call, - lockdep_is_held(&conn->bundle->channel_lock)); - if (call && !__rxrpc_call_is_complete(call)) { + if (call_id > counter) { + if (conn->channels[i].call) { rxrpc_abort_conn(conn, skb, RXKADSEALEDINCON, -EPROTO, rxkad_abort_resp_call_state); - goto protocol_error_unlock; + goto protocol_error_free; } conn->channels[i].call_counter = call_id; } } - spin_unlock(&conn->bundle->channel_lock); if (ntohl(response->encrypted.inc_nonce) != conn->rxkad.nonce + 1) { rxrpc_abort_conn(conn, skb, RXKADOUTOFSEQUENCE, -EPROTO, @@ -1179,8 +1174,6 @@ static int rxkad_verify_response(struct rxrpc_connection *conn, _leave(" = 0"); return 0; -protocol_error_unlock: - spin_unlock(&conn->bundle->channel_lock); protocol_error_free: kfree(ticket); protocol_error: diff --git a/net/rxrpc/security.c b/net/rxrpc/security.c index 78af14694618..cd66634dffe6 100644 --- a/net/rxrpc/security.c +++ b/net/rxrpc/security.c @@ -97,38 +97,31 @@ found: */ int rxrpc_init_client_conn_security(struct rxrpc_connection *conn) { - const struct rxrpc_security *sec; struct rxrpc_key_token *token; struct key *key = conn->key; - int ret; + int ret = 0; _enter("{%d},{%x}", conn->debug_id, key_serial(key)); - if (!key) - return 0; - - ret = key_validate(key); - if (ret < 0) - return ret; - for (token = key->payload.data[0]; token; token = token->next) { - sec = rxrpc_security_lookup(token->security_index); - if (sec) + if (token->security_index == conn->security->security_index) goto found; } return -EKEYREJECTED; found: - conn->security = sec; - - ret = conn->security->init_connection_security(conn, token); - if (ret < 0) { - conn->security = &rxrpc_no_security; - return ret; + mutex_lock(&conn->security_lock); + if (conn->state == RXRPC_CONN_CLIENT_UNSECURED) { + ret = conn->security->init_connection_security(conn, token); + if (ret == 0) { + spin_lock(&conn->state_lock); + if (conn->state == RXRPC_CONN_CLIENT_UNSECURED) + conn->state = RXRPC_CONN_CLIENT; + spin_unlock(&conn->state_lock); + } } - - _leave(" = 0"); - return 0; + mutex_unlock(&conn->security_lock); + return ret; } /* diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index a5d0005b7ce5..da49fcf1c456 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -38,6 +38,60 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error, return false; } +/* + * Wait for a call to become connected. Interruption here doesn't cause the + * call to be aborted. + */ +static int rxrpc_wait_to_be_connected(struct rxrpc_call *call, long *timeo) +{ + DECLARE_WAITQUEUE(myself, current); + int ret = 0; + + _enter("%d", call->debug_id); + + if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) + return call->error; + + add_wait_queue_exclusive(&call->waitq, &myself); + + for (;;) { + ret = call->error; + if (ret < 0) + break; + + switch (call->interruptibility) { + case RXRPC_INTERRUPTIBLE: + case RXRPC_PREINTERRUPTIBLE: + set_current_state(TASK_INTERRUPTIBLE); + break; + case RXRPC_UNINTERRUPTIBLE: + default: + set_current_state(TASK_UNINTERRUPTIBLE); + break; + } + if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN) { + ret = call->error; + break; + } + if ((call->interruptibility == RXRPC_INTERRUPTIBLE || + call->interruptibility == RXRPC_PREINTERRUPTIBLE) && + signal_pending(current)) { + ret = sock_intr_errno(*timeo); + break; + } + *timeo = schedule_timeout(*timeo); + } + + remove_wait_queue(&call->waitq, &myself); + __set_current_state(TASK_RUNNING); + + if (ret == 0 && rxrpc_call_is_complete(call)) + ret = call->error; + + _leave(" = %d", ret); + return ret; +} + /* * Return true if there's sufficient Tx queue space. */ @@ -239,6 +293,16 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT); + ret = rxrpc_wait_to_be_connected(call, &timeo); + if (ret < 0) + return ret; + + if (call->conn->state == RXRPC_CONN_CLIENT_UNSECURED) { + ret = rxrpc_init_client_conn_security(call->conn); + if (ret < 0) + return ret; + } + /* this should be in poll */ sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);