aboutsummaryrefslogtreecommitdiff
path: root/net/rxrpc/ar-input.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc/ar-input.c')
-rw-r--r--net/rxrpc/ar-input.c60
1 files changed, 33 insertions, 27 deletions
diff --git a/net/rxrpc/ar-input.c b/net/rxrpc/ar-input.c
index 323c3454561..ceb5d619a1d 100644
--- a/net/rxrpc/ar-input.c
+++ b/net/rxrpc/ar-input.c
@@ -42,6 +42,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
bool force, bool terminal)
{
struct rxrpc_skb_priv *sp;
+ struct rxrpc_sock *rx = call->socket;
struct sock *sk;
int skb_len, ret;
@@ -64,7 +65,7 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
return 0;
}
- sk = &call->socket->sk;
+ sk = &rx->sk;
if (!force) {
/* cast skb->rcvbuf to unsigned... It's pointless, but
@@ -89,25 +90,30 @@ int rxrpc_queue_rcv_skb(struct rxrpc_call *call, struct sk_buff *skb,
skb->sk = sk;
atomic_add(skb->truesize, &sk->sk_rmem_alloc);
- /* Cache the SKB length before we tack it onto the receive
- * queue. Once it is added it no longer belongs to us and
- * may be freed by other threads of control pulling packets
- * from the queue.
- */
- skb_len = skb->len;
-
- _net("post skb %p", skb);
- __skb_queue_tail(&sk->sk_receive_queue, skb);
- spin_unlock_bh(&sk->sk_receive_queue.lock);
-
- if (!sock_flag(sk, SOCK_DEAD))
- sk->sk_data_ready(sk, skb_len);
-
if (terminal) {
_debug("<<<< TERMINAL MESSAGE >>>>");
set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags);
}
+ /* allow interception by a kernel service */
+ if (rx->interceptor) {
+ rx->interceptor(sk, call->user_call_ID, skb);
+ spin_unlock_bh(&sk->sk_receive_queue.lock);
+ } else {
+
+ /* Cache the SKB length before we tack it onto the
+ * receive queue. Once it is added it no longer
+ * belongs to us and may be freed by other threads of
+ * control pulling packets from the queue */
+ skb_len = skb->len;
+
+ _net("post skb %p", skb);
+ __skb_queue_tail(&sk->sk_receive_queue, skb);
+ spin_unlock_bh(&sk->sk_receive_queue.lock);
+
+ if (!sock_flag(sk, SOCK_DEAD))
+ sk->sk_data_ready(sk, skb_len);
+ }
skb = NULL;
} else {
spin_unlock_bh(&sk->sk_receive_queue.lock);
@@ -232,7 +238,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events))
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
read_unlock(&call->state_lock);
}
@@ -267,7 +273,7 @@ enqueue_packet:
atomic_inc(&call->ackr_not_idle);
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD)
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
read_unlock(&call->state_lock);
_leave(" = 0 [queued]");
return 0;
@@ -360,7 +366,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
call->state = RXRPC_CALL_REMOTELY_ABORTED;
call->abort_code = abort_code;
set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
}
goto free_packet_unlock;
@@ -375,7 +381,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
case RXRPC_CALL_CLIENT_SEND_REQUEST:
call->state = RXRPC_CALL_SERVER_BUSY;
set_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
case RXRPC_CALL_SERVER_BUSY:
goto free_packet_unlock;
default:
@@ -419,7 +425,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
read_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD) {
skb_queue_tail(&call->rx_queue, skb);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
skb = NULL;
}
read_unlock_bh(&call->state_lock);
@@ -434,7 +440,7 @@ protocol_error_locked:
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_PROTOCOL_ERROR;
set_bit(RXRPC_CALL_ABORT, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
}
free_packet_unlock:
write_unlock_bh(&call->state_lock);
@@ -506,7 +512,7 @@ protocol_error:
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_PROTOCOL_ERROR;
set_bit(RXRPC_CALL_ABORT, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
}
write_unlock_bh(&call->state_lock);
_leave("");
@@ -542,7 +548,7 @@ static void rxrpc_post_packet_to_call(struct rxrpc_connection *conn,
switch (call->state) {
case RXRPC_CALL_LOCALLY_ABORTED:
if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_NETWORK_ERROR:
case RXRPC_CALL_DEAD:
@@ -591,7 +597,7 @@ dead_call:
sp->hdr.seq == __constant_cpu_to_be32(1)) {
_debug("incoming call");
skb_queue_tail(&conn->trans->local->accept_queue, skb);
- schedule_work(&conn->trans->local->acceptor);
+ rxrpc_queue_work(&conn->trans->local->acceptor);
goto done;
}
@@ -630,7 +636,7 @@ found_completed_call:
_debug("final ack again");
rxrpc_get_call(call);
set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
free_unlock:
read_unlock(&call->state_lock);
@@ -651,7 +657,7 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
atomic_inc(&conn->usage);
skb_queue_tail(&conn->rx_queue, skb);
- schedule_work(&conn->processor);
+ rxrpc_queue_conn(conn);
}
/*
@@ -767,7 +773,7 @@ cant_route_call:
if (sp->hdr.seq == __constant_cpu_to_be32(1)) {
_debug("first packet");
skb_queue_tail(&local->accept_queue, skb);
- schedule_work(&local->acceptor);
+ rxrpc_queue_work(&local->acceptor);
rxrpc_put_local(local);
_leave(" [incoming]");
return;