Statistics
| Branch: | Tag: | Revision:

root / host / lib / usrp / usrp2 / io_impl.cpp @ 11c83c60

History | View | Annotate | Download (9.9 kB)

1
//
2
// Copyright 2010 Ettus Research LLC
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

    
18
#include "../../transport/vrt_packet_handler.hpp"
19
#include "usrp2_impl.hpp"
20
#include "usrp2_regs.hpp"
21
#include <uhd/utils/thread_priority.hpp>
22
#include <uhd/transport/convert_types.hpp>
23
#include <uhd/transport/alignment_buffer.hpp>
24
#include <boost/format.hpp>
25
#include <boost/asio.hpp> //htonl and ntohl
26
#include <boost/bind.hpp>
27
#include <boost/thread.hpp>
28
#include <iostream>
29

    
30
using namespace uhd;
31
using namespace uhd::usrp;
32
using namespace uhd::transport;
33
namespace asio = boost::asio;
34

    
35
static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET;
36

    
37
/***********************************************************************
38
 * io impl details (internal to this file)
39
 * - pirate crew
40
 * - alignment buffer
41
 * - thread loop
42
 * - vrt packet handler states
43
 **********************************************************************/
44
struct usrp2_impl::io_impl{
45
    typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type;
46

    
47
    io_impl(size_t num_frames, size_t width):
48
        packet_handler_recv_state(width),
49
        recv_pirate_booty(alignment_buffer_type::make(num_frames, width)),
50
        async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/))
51
    {
52
        /* NOP */
53
    }
54

    
55
    ~io_impl(void){
56
        recv_pirate_crew_raiding = false;
57
        recv_pirate_crew.interrupt_all();
58
        recv_pirate_crew.join_all();
59
    }
60

    
61
    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, size_t timeout_ms){
62
        boost::this_thread::disable_interruption di; //disable because the wait can throw
63
        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(timeout_ms));
64
    }
65

    
66
    //state management for the vrt packet handler code
67
    vrt_packet_handler::recv_state packet_handler_recv_state;
68
    vrt_packet_handler::send_state packet_handler_send_state;
69

    
70
    //methods and variables for the pirate crew
71
    void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);
72
    boost::thread_group recv_pirate_crew;
73
    bool recv_pirate_crew_raiding;
74
    alignment_buffer_type::sptr recv_pirate_booty;
75
    bounded_buffer<async_metadata_t>::sptr async_msg_fifo;
76
};
77

    
78
/***********************************************************************
79
 * Receive Pirate Loop
80
 * - while raiding, loot for recv buffers
81
 * - put booty into the alignment buffer
82
 **********************************************************************/
83
void usrp2_impl::io_impl::recv_pirate_loop(
84
    zero_copy_if::sptr zc_if,
85
    usrp2_mboard_impl::sptr mboard,
86
    size_t index
87
){
88
    set_thread_priority_safe();
89
    recv_pirate_crew_raiding = true;
90
    size_t next_packet_seq = 0;
91

    
92
    while(recv_pirate_crew_raiding){
93
        managed_recv_buffer::sptr buff = zc_if->get_recv_buff();
94
        if (not buff.get()) continue; //ignore timeout/error buffers
95

    
96
        try{
97
            //extract the vrt header packet info
98
            vrt::if_packet_info_t if_packet_info;
99
            if_packet_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
100
            const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
101
            vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info);
102

    
103
            //handle a tx async report message
104
            if (if_packet_info.sid == 1 and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
105

    
106
                //fill in the async metadata
107
                async_metadata_t metadata;
108
                metadata.channel = index;
109
                metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf;
110
                metadata.time_spec = time_spec_t(
111
                    time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq()
112
                );
113
                metadata.event_code = vrt_packet_handler::get_context_code<async_metadata_t::event_code_t>(vrt_hdr, if_packet_info);
114

    
115
                //print the famous U, and push the metadata into the message queue
116
                if (metadata.event_code & underflow_flags) std::cerr << "U" << std::flush;
117
                async_msg_fifo->push_with_pop_on_full(metadata);
118
                continue;
119
            }
120

    
121
            //handle the packet count / sequence number
122
            if (if_packet_info.packet_count != next_packet_seq){
123
                //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16;
124
                std::cerr << "O" << std::flush; //report overflow (drops in the kernel)
125
            }
126
            next_packet_seq = (if_packet_info.packet_count+1)%16;
127

    
128
            //extract the timespec and round to the nearest packet
129
            UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf);
130
            time_spec_t time(
131
                time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq()
132
            );
133

    
134
            //push the packet into the buffer with the new time
135
            recv_pirate_booty->push_with_pop_on_full(buff, time, index);
136
        }catch(const std::exception &e){
137
            std::cerr << "Error (usrp2 recv pirate loop): " << e.what() << std::endl;
138
        }
139
    }
140
}
141

    
142
/***********************************************************************
143
 * Helper Functions
144
 **********************************************************************/
145
void usrp2_impl::io_init(void){
146
    //send a small data packet so the usrp2 knows the udp source port
147
    BOOST_FOREACH(zero_copy_if::sptr data_transport, _data_transports){
148
        managed_send_buffer::sptr send_buff = data_transport->get_send_buff();
149
        static const boost::uint32_t data = htonl(USRP2_INVALID_VRT_HEADER);
150
        std::memcpy(send_buff->cast<void*>(), &data, sizeof(data));
151
        send_buff->commit(sizeof(data));
152
        //drain the recv buffers (may have junk)
153
        while (data_transport->get_recv_buff().get());
154
    }
155

    
156
    //the number of recv frames is the number for the first transport
157
    //the assumption is that all data transports should be identical
158
    size_t num_frames = _data_transports.front()->get_num_recv_frames();
159

    
160
    //create new io impl
161
    _io_impl = UHD_PIMPL_MAKE(io_impl, (num_frames, _data_transports.size()));
162

    
163
    //create a new pirate thread for each zc if (yarr!!)
164
    for (size_t i = 0; i < _data_transports.size(); i++){
165
        _io_impl->recv_pirate_crew.create_thread(boost::bind(
166
            &usrp2_impl::io_impl::recv_pirate_loop,
167
            _io_impl.get(), _data_transports.at(i),
168
            _mboards.at(i), i
169
        ));
170
    }
171

    
172
    std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl;
173
    std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl;
174
    std::cout << "Recv pirate num frames: " << num_frames << std::endl;
175
}
176

    
177
/***********************************************************************
178
 * Async Data
179
 **********************************************************************/
180
bool usrp2_impl::recv_async_msg(
181
    async_metadata_t &async_metadata, size_t timeout_ms
182
){
183
    boost::this_thread::disable_interruption di; //disable because the wait can throw
184
    return _io_impl->async_msg_fifo->pop_with_timed_wait(
185
        async_metadata, boost::posix_time::milliseconds(timeout_ms)
186
    );
187
}
188

    
189
/***********************************************************************
190
 * Send Data
191
 **********************************************************************/
192
bool get_send_buffs(
193
    const std::vector<udp_zero_copy::sptr> &trans,
194
    vrt_packet_handler::managed_send_buffs_t &buffs
195
){
196
    UHD_ASSERT_THROW(trans.size() == buffs.size());
197
    for (size_t i = 0; i < buffs.size(); i++){
198
        buffs[i] = trans[i]->get_send_buff();
199
    }
200
    return true;
201
}
202

    
203
size_t usrp2_impl::send(
204
    const std::vector<const void *> &buffs, size_t num_samps,
205
    const tx_metadata_t &metadata, const io_type_t &io_type,
206
    send_mode_t send_mode
207
){
208
    return vrt_packet_handler::send(
209
        _io_impl->packet_handler_send_state,       //last state of the send handler
210
        buffs, num_samps,                          //buffer to fill
211
        metadata, send_mode,                       //samples metadata
212
        io_type, _io_helper.get_tx_otw_type(),     //input and output types to convert
213
        _mboards.front()->get_master_clock_freq(), //master clock tick rate
214
        uhd::transport::vrt::if_hdr_pack_be,
215
        boost::bind(&get_send_buffs, _data_transports, _1),
216
        get_max_send_samps_per_packet()
217
    );
218
}
219

    
220
/***********************************************************************
221
 * Receive Data
222
 **********************************************************************/
223
size_t usrp2_impl::recv(
224
    const std::vector<void *> &buffs, size_t num_samps,
225
    rx_metadata_t &metadata, const io_type_t &io_type,
226
    recv_mode_t recv_mode, size_t timeout_ms
227
){
228
    return vrt_packet_handler::recv(
229
        _io_impl->packet_handler_recv_state,       //last state of the recv handler
230
        buffs, num_samps,                          //buffer to fill
231
        metadata, recv_mode,                       //samples metadata
232
        io_type, _io_helper.get_rx_otw_type(),     //input and output types to convert
233
        _mboards.front()->get_master_clock_freq(), //master clock tick rate
234
        uhd::transport::vrt::if_hdr_unpack_be,
235
        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout_ms)
236
    );
237
}