[funini.com] -> [kei@sodan] -> Kernel Reading

root/net/rxrpc/ar-recvmsg.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. rxrpc_remove_user_ID
  2. rxrpc_recvmsg
  3. rxrpc_kernel_data_delivered
  4. rxrpc_kernel_is_data_last
  5. rxrpc_kernel_get_abort_code
  6. rxrpc_kernel_get_error_number

/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/net.h>
#include <linux/skbuff.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * removal a call's user ID from the socket tree to make the user ID available
 * again and so that it won't be seen again in association with that call
 */
void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
        _debug("RELEASE CALL %d", call->debug_id);

        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
                write_lock_bh(&rx->call_lock);
                rb_erase(&call->sock_node, &call->socket->calls);
                clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
                write_unlock_bh(&rx->call_lock);
        }

        read_lock_bh(&call->state_lock);
        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
            !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
                rxrpc_queue_call(call);
        read_unlock_bh(&call->state_lock);
}

/*
 * receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock,
                  struct msghdr *msg, size_t len, int flags)
{
        struct rxrpc_skb_priv *sp;
        struct rxrpc_call *call = NULL, *continue_call = NULL;
        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
        struct sk_buff *skb;
        long timeo;
        int copy, ret, ullen, offset, copied = 0;
        u32 abort_code;

        DEFINE_WAIT(wait);

        _enter(",,,%zu,%d", len, flags);

        if (flags & (MSG_OOB | MSG_TRUNC))
                return -EOPNOTSUPP;

        ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);

        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
        msg->msg_flags |= MSG_MORE;

        lock_sock(&rx->sk);

        for (;;) {
                /* return immediately if a client socket has no outstanding
                 * calls */
                if (RB_EMPTY_ROOT(&rx->calls)) {
                        if (copied)
                                goto out;
                        if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
                                release_sock(&rx->sk);
                                if (continue_call)
                                        rxrpc_put_call(continue_call);
                                return -ENODATA;
                        }
                }

                /* get the next message on the Rx queue */
                skb = skb_peek(&rx->sk.sk_receive_queue);
                if (!skb) {
                        /* nothing remains on the queue */
                        if (copied &&
                            (msg->msg_flags & MSG_PEEK || timeo == 0))
                                goto out;

                        /* wait for a message to turn up */
                        release_sock(&rx->sk);
                        prepare_to_wait_exclusive(rx->sk.sk_sleep, &wait,
                                                  TASK_INTERRUPTIBLE);
                        ret = sock_error(&rx->sk);
                        if (ret)
                                goto wait_error;

                        if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
                                if (signal_pending(current))
                                        goto wait_interrupted;
                                timeo = schedule_timeout(timeo);
                        }
                        finish_wait(rx->sk.sk_sleep, &wait);
                        lock_sock(&rx->sk);
                        continue;
                }

        peek_next_packet:
                sp = rxrpc_skb(skb);
                call = sp->call;
                ASSERT(call != NULL);

                _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);

                /* make sure we wait for the state to be updated in this call */
                spin_lock_bh(&call->lock);
                spin_unlock_bh(&call->lock);

                if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
                        _debug("packet from released call");
                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
                                BUG();
                        rxrpc_free_skb(skb);
                        continue;
                }

                /* determine whether to continue last data receive */
                if (continue_call) {
                        _debug("maybe cont");
                        if (call != continue_call ||
                            skb->mark != RXRPC_SKB_MARK_DATA) {
                                release_sock(&rx->sk);
                                rxrpc_put_call(continue_call);
                                _leave(" = %d [noncont]", copied);
                                return copied;
                        }
                }

                rxrpc_get_call(call);

                /* copy the peer address and timestamp */
                if (!continue_call) {
                        if (msg->msg_name && msg->msg_namelen > 0)
                                memcpy(msg->msg_name,
                                       &call->conn->trans->peer->srx,
                                       sizeof(call->conn->trans->peer->srx));
                        sock_recv_timestamp(msg, &rx->sk, skb);
                }

                /* receive the message */
                if (skb->mark != RXRPC_SKB_MARK_DATA)
                        goto receive_non_data_message;

                _debug("recvmsg DATA #%u { %d, %d }",
                       ntohl(sp->hdr.seq), skb->len, sp->offset);

                if (!continue_call) {
                        /* only set the control data once per recvmsg() */
                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
                                       ullen, &call->user_call_ID);
                        if (ret < 0)
                                goto copy_error;
                        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
                }

                ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
                ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
                call->rx_data_recv = ntohl(sp->hdr.seq);

                ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);

                offset = sp->offset;
                copy = skb->len - offset;
                if (copy > len - copied)
                        copy = len - copied;

                if (skb->ip_summed == CHECKSUM_UNNECESSARY) {
                        ret = skb_copy_datagram_iovec(skb, offset,
                                                      msg->msg_iov, copy);
                } else {
                        ret = skb_copy_and_csum_datagram_iovec(skb, offset,
                                                               msg->msg_iov);
                        if (ret == -EINVAL)
                                goto csum_copy_error;
                }

                if (ret < 0)
                        goto copy_error;

                /* handle piecemeal consumption of data packets */
                _debug("copied %d+%d", copy, copied);

                offset += copy;
                copied += copy;

                if (!(flags & MSG_PEEK))
                        sp->offset = offset;

                if (sp->offset < skb->len) {
                        _debug("buffer full");
                        ASSERTCMP(copied, ==, len);
                        break;
                }

                /* we transferred the whole data packet */
                if (sp->hdr.flags & RXRPC_LAST_PACKET) {
                        _debug("last");
                        if (call->conn->out_clientflag) {
                                 /* last byte of reply received */
                                ret = copied;
                                goto terminal_message;
                        }

                        /* last bit of request received */
                        if (!(flags & MSG_PEEK)) {
                                _debug("eat packet");
                                if (skb_dequeue(&rx->sk.sk_receive_queue) !=
                                    skb)
                                        BUG();
                                rxrpc_free_skb(skb);
                        }
                        msg->msg_flags &= ~MSG_MORE;
                        break;
                }

                /* move on to the next data message */
                _debug("next");
                if (!continue_call)
                        continue_call = sp->call;
                else
                        rxrpc_put_call(call);
                call = NULL;

                if (flags & MSG_PEEK) {
                        _debug("peek next");
                        skb = skb->next;
                        if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
                                break;
                        goto peek_next_packet;
                }

                _debug("eat packet");
                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
                        BUG();
                rxrpc_free_skb(skb);
        }

        /* end of non-terminal data packet reception for the moment */
        _debug("end rcv data");
out:
        release_sock(&rx->sk);
        if (call)
                rxrpc_put_call(call);
        if (continue_call)
                rxrpc_put_call(continue_call);
        _leave(" = %d [data]", copied);
        return copied;

        /* handle non-DATA messages such as aborts, incoming connections and
         * final ACKs */
receive_non_data_message:
        _debug("non-data");

        if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
                _debug("RECV NEW CALL");
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
                if (ret < 0)
                        goto copy_error;
                if (!(flags & MSG_PEEK)) {
                        if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
                                BUG();
                        rxrpc_free_skb(skb);
                }
                goto out;
        }

        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
                       ullen, &call->user_call_ID);
        if (ret < 0)
                goto copy_error;
        ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));

        switch (skb->mark) {
        case RXRPC_SKB_MARK_DATA:
                BUG();
        case RXRPC_SKB_MARK_FINAL_ACK:
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
                break;
        case RXRPC_SKB_MARK_BUSY:
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
                break;
        case RXRPC_SKB_MARK_REMOTE_ABORT:
                abort_code = call->abort_code;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
                break;
        case RXRPC_SKB_MARK_NET_ERROR:
                _debug("RECV NET ERROR %d", sp->error);
                abort_code = sp->error;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
                break;
        case RXRPC_SKB_MARK_LOCAL_ERROR:
                _debug("RECV LOCAL ERROR %d", sp->error);
                abort_code = sp->error;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
                               &abort_code);
                break;
        default:
                BUG();
                break;
        }

        if (ret < 0)
                goto copy_error;

terminal_message:
        _debug("terminal");
        msg->msg_flags &= ~MSG_MORE;
        msg->msg_flags |= MSG_EOR;

        if (!(flags & MSG_PEEK)) {
                _net("free terminal skb %p", skb);
                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
                        BUG();
                rxrpc_free_skb(skb);
                rxrpc_remove_user_ID(rx, call);
        }

        release_sock(&rx->sk);
        rxrpc_put_call(call);
        if (continue_call)
                rxrpc_put_call(continue_call);
        _leave(" = %d", ret);
        return ret;

copy_error:
        _debug("copy error");
        release_sock(&rx->sk);
        rxrpc_put_call(call);
        if (continue_call)
                rxrpc_put_call(continue_call);
        _leave(" = %d", ret);
        return ret;

csum_copy_error:
        _debug("csum error");
        release_sock(&rx->sk);
        if (continue_call)
                rxrpc_put_call(continue_call);
        rxrpc_kill_skb(skb);
        skb_kill_datagram(&rx->sk, skb, flags);
        rxrpc_put_call(call);
        return -EAGAIN;

wait_interrupted:
        ret = sock_intr_errno(timeo);
wait_error:
        finish_wait(rx->sk.sk_sleep, &wait);
        if (continue_call)
                rxrpc_put_call(continue_call);
        if (copied)
                copied = ret;
        _leave(" = %d [waitfail %d]", copied, ret);
        return copied;

}

/**
 * rxrpc_kernel_data_delivered - Record delivery of data message
 * @skb: Message holding data
 *
 * Record the delivery of a data message.  This permits RxRPC to keep its
 * tracking correct.  The socket buffer will be deleted.
 */
void rxrpc_kernel_data_delivered(struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        struct rxrpc_call *call = sp->call;

        ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
        ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
        call->rx_data_recv = ntohl(sp->hdr.seq);

        ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
        rxrpc_free_skb(skb);
}

EXPORT_SYMBOL(rxrpc_kernel_data_delivered);

/**
 * rxrpc_kernel_is_data_last - Determine if data message is last one
 * @skb: Message holding data
 *
 * Determine if data message is last one for the parent call.
 */
bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);

        return sp->hdr.flags & RXRPC_LAST_PACKET;
}

EXPORT_SYMBOL(rxrpc_kernel_is_data_last);

/**
 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
 * @skb: Message indicating an abort
 *
 * Get the abort code from an RxRPC abort message.
 */
u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

        ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);

        return sp->call->abort_code;
}

EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);

/**
 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
 * @skb: Message indicating an error
 *
 * Get the error number from an RxRPC error message.
 */
int rxrpc_kernel_get_error_number(struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

        return sp->error;
}

EXPORT_SYMBOL(rxrpc_kernel_get_error_number);

/* [<][>][^][v][top][bottom][index][help] */

[funini.com] -> [kei@sodan] -> Kernel Reading