#defile _SILENCE_NONFLOATING_COMPLEX_DEPRECATION_WARNING #include "wavetable.hpp" #include <atomic> #include <boost/algorithm/string.hpp> #include <boost/filesystem.hpp> #include <boost/format.hpp> #include <boost/lexical_cast.hpp> #include <boost/process.hpp> #include <boost/program_options.hpp> #include <chrono> #include <cmath> #include <complex> #include <condition_variable> #include <csignal> #include <fstream> #include <functional> #include <iostream> #include <mutex> #include <numeric> #include <queue> #include <regex> #include <thread> #include <uhd/convert.hpp> #include <uhd/exception.hpp> #include <uhd/types/tune_request.hpp> #include <uhd/usrp/multi_usrp.hpp> #include <uhd/utils/safe_main.hpp> #include <uhd/utils/static.hpp> #include <uhd/utils/thread.hpp> #include <ctime> #include <iomanip> namespace po = boost::program_options; using namespace std::chrono_literals; std::mutex recv_mutex; static bool overflow_message = true; std::atomic<bool> flag_external = false; std::atomic<bool> flag_identical = false; static bool stop_signal_called = false; void sig_int_handler(int) { stop_signal_called = true; } template <typename samp_type> void preload_samples_from_file(const std::string& file, std::vector<samp_type>& samples, double samples_rate) { std::ifstream infile(file.c_str(), std::ifstream::binary); if (!infile.is_open()) { throw std::runtime_error("Failed to open the file."); } infile.seekg(0, std::ios::end); size_t file_size = infile.tellg(); infile.seekg(0, std::ios::beg); size_t num_samples = file_size / sizeof(samp_type); samples.resize(num_samples); infile.read(reinterpret_cast<char*>(samples.data()), file_size); infile.close(); if(infile.gcount() != file_size){ throw std::runtime_error("Error reading the full file. 
Expected " + std::to_string(file_size) + " bytes, but read: " + std::to_string(infile.gcount()) + " bytes."); } double file_duration = num_samples / sample_rate; // Log information to show samples being loaded into the vector std::cout << "" << std::endl; std::cout << "********************************************" << std::endl; std::cout << "* PRELOADING *" << std::endl; std::cout << "********************************************" << std::endl; std::cout << "Preloaded " << num_samples << " samples into the vector." << std::endl; std::cout << "Memory address of the first sample: " << static_cast<const void*>(samples.data()) << std::endl; std::cout << "Memory address of the last sample: " << static_cast<const void*>(samples.data() + num_samples - 1) << std::endl; std::cout << "Preloaded " << num_samples << "samples into the vector, corresponding to " << file_duration << " seconds of data." << std::endl; } // Function to send samples for the specified TX duration template <typename samp_type> void send_from_file( uhd::tx_streamer::sptr tx_stream, const std::vector<samp_type>& samples, size_t samps_per_buff, unsigned long long num_requested_samples, uhd::usrp::multi_usrp::sptr usrp, double time_requested = 0.0, double tx_rate = 0.0) { const auto start_time = std::chrono::steady_clock::now(); const auto stop_time = start_time + (1s * time_requested); uhd::tx_metadata_t md; md.start_of_burst = true; md.end_of_burst = false; size_t total_samples = samples.size(); size_t sample_index = 0; std::vector<samp_type> buff(samps_per_buff); std::vector<samp_type> buffs(tx_stream->get_num_channels(), &buff.front()); size_t samples_needed_for_tx_duration = static_cast<size_t>(time_requested * tx_rate); size_t samples_to_transmit = std::min(total_samples, samples_needed_for_tx_duration); std::cout << "" << std::endl; std::cout << "********************************************" << std::endl; std::cout << "* TRANSMISSION *" << std::endl; std::cout << 
"********************************************" << std::endl; std::cout << "Starting transmission of " << total_samples << " samples for " << time_requested << " seconds..." << std::endl; while (sample_index < samples_to_transmit && !stop_signal_called and flag_identical and (num_requested_samples != sample_indx or num_requested_samples == 0) and (time_requested == 0.0 or std::chrono::steady_clock::now() <= stop_time)) { size_t remaining_samples = samples_to_transmit - sample_index; size_t num_tx_samps = std::min(samps_per_buff, remaining_samples); std::copy(samples.begin() + sample_index, samples.begin() + sample_index + num_tx_samps, buff.begin()); sample_index += num_tx_samps; md.end_of_burst = (sample_index == samples_to_transmit); size_t samples_sent = tx_stream->send(buffs, num_tx_samps, md); if (samples_sent != num_tx_samps) { UHD_LOG_ERROR("TX-STREAM", "The tx_stream timed out sending " << num_tx_samps << " samples (" << samples_sent << " sent."); return; } md.start_of_burst = false; // After the first burst } const auto actual_stop_time = std::chrono::steady_clock::now(); const double acutal_duration_seconds = std::chrono::duration<float>(actual_stop_time - start_time).count(); if(stop_signal_called == true){ //std::cout << "Tx only " << actual_duration_seconds << "seconds of the total " << total_samples / tx_rate << "seconds of data." << std::endl; size_t total_data = total_samples/tx_rate; std::cout << boost::format("Tx only %f seconds of the total %d seconds of data. ") % actual_duration_seconds % total_data << std::endl; } else{ if (samples_to_transmit < samples_needed_for_tx_duration) { std::cout << "Tx only " << samples_to_transmit / tx_rate << " seconds, file contains less data than " << time_requested << " seconds." << std::endl; //std::cout << boost::format("Tx only %f seconds, file contains less data than %d seconds. 
") % actual_duration_seconds % time_requested << std::endl; } else { //std::cout << "Tx only " << actual_duration_seconds << " seconds of the total " << total_samples / tx_rate << " seconds of data." << std::endl; std::cout << "Tx only " << samples_to_transmit / tx_rate << " seconds of the total " << total_samples / tx_rate << " seconds of data." << std::endl; //size_t total_data = total_samples / tx_rate; //std::cout << boost::format("Tx only %f seconds of the total %d seconds of data. ") % actual_duration_seconds % total_data << std::endl; } } std::cout << "Transmission complete." << std::endl; buff.clear(); samples.clear(); } template <typename samp_type> void send_from_file_preloaded( uhd::tx_streamer::sptr tx_stream, const std::vector<samp_type>& samples, std::vector<samp_type> samples_vector, size_t samps_per_buff, unsigned long long num_requested_samples, uhd::usrp::multi_usrp::sptr usrp, double time_requested = 0.0, double tx_rate = 0.0) { const auto start_time = std::chrono::steady_clock::now(); const auto stop_time = start_time + (1s * time_requested); uhd::tx_metadata_t md; md.start_of_burst = true; md.end_of_burst = false; size_t total_samples = samples.size(); size_t sample_index = 0; std::vector<samp_type> buff(samps_per_buff); std::vector<samp_type> buffs(tx_stream->get_num_channels(), &buff.front()); size_t samples_needed_for_tx_duration = static_cast<size_t>(time_requested * tx_rate); size_t samples_to_transmit = std::min(total_samples, samples_needed_for_tx_duration); std::cout << "" << std::endl; std::cout << "********************************************" << std::endl; std::cout << "* TRANSMISSION *" << std::endl; std::cout << "********************************************" << std::endl; std::cout << "Starting transmission of " << total_samples << " samples for " << time_requested << " seconds..." 
<< std::endl; while (sample_index < samples_to_transmit && !stop_signal_called and flag_identical and (num_requested_samples != sample_indx or num_requested_samples == 0) and (time_requested == 0.0 or std::chrono::steady_clock::now() <= stop_time)) { size_t remaining_samples = samples_to_transmit - sample_index; size_t num_tx_samps = std::min(samps_per_buff, remaining_samples); std::copy(samples.begin() + sample_index, samples.begin() + sample_index + num_tx_samps, buff.begin()); sample_index += num_tx_samps; md.end_of_burst = (sample_index == samples_to_transmit); size_t samples_sent = tx_stream->send(buffs, num_tx_samps, md); if (samples_sent != num_tx_samps) { UHD_LOG_ERROR("TX-STREAM", "The tx_stream timed out sending " << num_tx_samps << " samples (" << samples_sent << " sent."); return; } md.start_of_burst = false; // After the first burst } const auto actual_stop_time = std::chrono::steady_clock::now(); const double acutal_duration_seconds = std::chrono::duration<float>(actual_stop_time - start_time).count(); if(stop_signal_called == true){ //std::cout << "Tx only " << actual_duration_seconds << "seconds of the total " << total_samples / tx_rate << "seconds of data." << std::endl; size_t total_data = total_samples/tx_rate; std::cout << boost::format("Tx only %f seconds of the total %d seconds of data. ") % actual_duration_seconds % total_data << std::endl; } else{ if (samples_to_transmit < samples_needed_for_tx_duration) { std::cout << "Tx only " << samples_to_transmit / tx_rate << " seconds, file contains less data than " << time_requested << " seconds." << std::endl; //std::cout << boost::format("Tx only %f seconds, file contains less data than %d seconds. ") % actual_duration_seconds % time_requested << std::endl; } else { //std::cout << "Tx only " << actual_duration_seconds << " seconds of the total " << total_samples / tx_rate << " seconds of data." 
<< std::endl; std::cout << "Tx only " << samples_to_transmit / tx_rate << " seconds of the total " << total_samples / tx_rate << " seconds of data." << std::endl; //size_t total_data = total_samples / tx_rate; //std::cout << boost::format("Tx only %f seconds of the total %d seconds of data. ") % actual_duration_seconds % total_data << std::endl; } } std::cout << "Transmission complete." << std::endl; buff.clear(); samples.clear(); } template <typename samp_type> void recv_to_file(uhd::usrp::multi_usrp::sptr usrp, std::vector<std::complex<double>>& d_samples, std::vector<std::complex<double>>& d_samples, const std::string& cpu_format, const std::string& wire_format, const std::vector<size_t>& channel_nums, const size_t total_num_channels, const std::string& file, size_t samps_per_buff, // 500,000 samples per buffer unsigned long long num_requested_samples, double& bw, double time_requested = 0.0, bool stats = false, bool null = false, bool enable_size_map = false, bool continue_on_bad_packet = false, const std::string& thread_prefix = "") { unsigned long long num_total_samps = 0; // Number of buffers and setting samples per buffer const size_t num_buffers = 10; // We want 10 buffers size_t circ_buffer_size = samps_per_buff; // Create a receive streamer uhd::stream_args_t stream_args(cpu_format, wire_format); stream_args.channels = channel_nums; uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args); uhd::rx_metadata_t md; // Allocate 10 buffers for each channel std::vector<std::vector<samp_type*>> buffs(rx_stream->get_num_channels(), std::vector<samp_type*>(num_buffers)); try { // Allocate memory for each buffer for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++) { for (size_t buf_idx = 0; buf_idx < num_buffers; buf_idx++) { buffs[ch][buf_idx] = new samp_type[samps_per_buff]; } } } catch (std::bad_alloc& exc) { std::cerr << "Memory allocation failed. Try reducing the buffer size or free up memory." 
<< std::endl; std::exit(EXIT_FAILURE); } std::vector<std::ofstream> outfiles(rx_stream->get_num_channels()); for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++) { if (!null) { std::string filename = rx_stream->get_num_channels() == 1 ? file : "ch" + std::to_string(channel_nums[ch]) + "_" + file; outfiles[ch].open(filename.c_str(), std::ofstream::binary); } } // Initialize circular buffer tracking indices std::vector<size_t> buffer_write_index(rx_stream->get_num_channels(), 0); std::vector<size_t> buffer_cycle_index(rx_stream->get_num_channels(), 0); // Atomic flag to track buffer availability std::vector<std::vector<std::atomic<bool>>> buffer_available(rx_stream->get_num_channels(), std::vector<std::atomic<bool>>(num_buffers, true)); std::mutex write_mutex; std::condition_variable write_cond; std::queue<size_t> buffer_queue; // Separate writing thread function auto write_thread_func = [&](size_t ch) { while (!stop_signal_called) { std::unique_lock<std::mutex> lock(write_mutex); write_cond.wait(lock, [&] { return !buffer_queue.empty() || stop_signal_called; }); if (!buffer_queue.empty()) { size_t buffer_idx = buffer_queue.front(); buffer_queue.pop(); lock.unlock(); // Write the buffer to file if (outfiles[ch].is_open()) { outfiles[ch].write((const char*)buffs[ch][buffer_idx], samps_per_buff * sizeof(samp_type)); } // Mark buffer as available again buffer_available[ch][buffer_idx].store(true); } } }; // Launch the writing thread for each channel std::vector<std::thread> write_threads; for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++) { write_threads.emplace_back(write_thread_func, ch); } // Setup streaming uhd::stream_cmd_t stream_cmd((num_requested_samples == 0) ? 
uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS : uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE); stream_cmd.num_samps = size_t(num_requested_samples); stream_cmd.stream_now = rx_stream->get_num_channels() == 1; stream_cmd.time_spec = usrp->get_time_now() + uhd::time_spec_t(0.05); rx_stream->issue_stream_cmd(stream_cmd); const auto start_time = std::chrono::steady_clock::now(); const auto stop_time = start_time + std::chrono::seconds(static_cast<int>(time_requested)); unsigned int retry_count = 0; while (!stop_signal_called && (num_requested_samples != num_total_samps || num_requested_samples == 0) && (time_requested == 0.0 || std::chrono::steady_clock::now() <= stop_time)) { // Find the next available buffer size_t ch = 0; // Assuming single-channel for simplicity, modify if using multiple channels size_t buffer_idx = buffer_cycle_index[ch]; // Check if the next buffer is available if (!buffer_available[ch][buffer_idx].load()) { // Critical overflow condition std::cerr << thread_prefix << "Critical Overflow! Write thread too slow." << std::endl; std::exit(EXIT_FAILURE); // Terminate the program } // Receive data into the available buffer size_t num_rx_samps = rx_stream->recv(buffs[ch][buffer_idx], samps_per_buff, md, 3.0, enable_size_map); // Handle potential errors during receiving if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_TIMEOUT) { std::cerr << thread_prefix << "Timeout while streaming" << std::endl; if (++retry_count > MAX_RETRIES) { std::cerr << "Maximum retries reached. Exiting..." << std::endl; break; } continue; } if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) { // Normal overflow, allow program to continue std::cerr << "Overflow! Dropping samples." 
<< std::endl; continue; } if (md.error_code != uhd::rx_metadata_t::ERROR_CODE_NONE) { std::cerr << "Receiver error: " << md.strerror() << std::endl; throw std::runtime_error(thread_prefix + "Receiver error: " + md.strerror()); } retry_count = 0; // Reset retry count after a successful operation num_total_samps += num_rx_samps; // Mark the buffer as not available for the receiver buffer_available[ch][buffer_idx].store(false); // Signal that this buffer is ready to be written { std::unique_lock<std::mutex> lock(write_mutex); buffer_queue.push(buffer_idx); } write_cond.notify_all(); // Update buffer cycle index for next write buffer_cycle_index[ch] = (buffer_cycle_index[ch] + 1) % num_buffers; } // Stop streaming stream_cmd.stream_mode = uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS; rx_stream->issue_stream_cmd(stream_cmd); // Signal writing threads to stop stop_signal_called = true; write_cond.notify_all(); // Join all writing threads for (auto& thread : write_threads) { if (thread.joinable()) { thread.join(); } } // Cleanup: Close files and free memory for (size_t ch = 0; ch < outfiles.size(); ch++) { if (outfiles[ch].is_open()) { outfiles[ch].close(); } for (size_t buf_idx = 0; buf_idx < num_buffers; buf_idx++) { delete[] buffs[ch][buf_idx]; } } // Print stats if required if (stats) { const double actual_duration_seconds = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time).count(); std::cout << boost::format("%sReceived %d samples in %f seconds") % thread_prefix % num_total_samps % actual_duration_seconds << std::endl; } } while(!stop_signal_called){ if(!flag_identical){ std::thread recv_thread([&](){ #define recv_to_file_args(format) \ (usrp, \ format, \ wirefmt, \ chans_in_thread, \ channel_list.size(), \ multithread ? 
"ch" + std::to_string(chans_in_thread[0]) + "_" + file : file, \ spb, \ total_num_samps, \ rates[i], \ total_time, \ stats, \ null, \ enable_size_map, \ continue_on_bad_packet, \ th_prefix) for (size_t i = 0; i < channel_list.size(); i++) { std::string th_prefix = ""; if (multithread) { chans_in_thread.clear(); chans_in_thread.push_back(channel_list[i]); th_prefix = "Thread " + std::to_string(i) + ":\n"; } else { chans_in_thread = channel_list; } threads.push_back(std::thread([=, &rates]() { // recv to file if (wirefmt == "s16") { if (type == "double") recv_to_file<double> recv_to_file_args("f64"); else if (type == "float") recv_to_file<float> recv_to_file_args("f32"); else if (type == "short") recv_to_file<short> recv_to_file_args("s16"); else throw std::runtime_error("Unknown type " + type); } else { if (type == "double") recv_to_file<std::complex<double>> recv_to_file_args("fc64"); else if (type == "float") recv_to_file<std::complex<float>> recv_to_file_args("fc32"); else if (type == "short") recv_to_file<std::complex<short>> recv_to_file_args("sc16"); else throw std::runtime_error("Unknown type " + type); } })); if (!multithread) { break; } } if (total_time == 0) { if (total_num_samps > 0) { total_time = std::ceil(total_num_samps / usrp->get_rx_rate()); } } // Wait a bit extra for the first updates from each thread std::this_thread::sleep_for(500ms); const auto end_time = std::chrono::steady_clock::now() + (total_time - 1) * 1s; while (threads.size() > 0 && (std::chrono::steady_clock::now() < end_time || total_time == 0) && !stop_signal_called) { std::this_thread::sleep_for(1s); // Remove any threads that are finished for (size_t i = 0; i < threads.size(); i++) { if (!threads[i].joinable()) { // Thread is not joinable, i.e. it has finished and 'joined' already // Remove the thread from the list. 
threads.erase(threads.begin() + i); // Clear last bandwidth value after thread is finished rates[i] = 0; } } // Report the bandwidth of remaining threads if (bw_summary && threads.size() > 0) { const std::lock_guard<std::mutex> lock(recv_mutex); std::cout << "\t" << (std::accumulate(std::begin(rates), std::end(rates), 0) / 1e6 / threads.size()) << " Msps" << std::endl; } } // join any remaining threads for (size_t i = 0; i < threads.size(); i++) { if (threads[i].joinable()) { threads[i].join(); } } }); recv_thread.join(); flag_identical = true; } else{ if(txfile != rxfile){ std::thread tx_thread_external_file([&](){ if (tx_total_time == 0) { if (total_num_samps > 0) { total_time = std::ceil(total_num_samps / tx_usrp->get_rx_rate()); } } // Wait a bit extra for the first updates from each thread std::this_thread::sleep_for(500ms); const auto end_time = std::chrono::steady_clock::now() + (tx_total_time - 1) * 1s; while (threads.size() > 0 && (std::chrono::steady_clock::now() < end_time || tx_total_time == 0) && !stop_signal_called) { std::this_thread::sleep_for(1s); // send from file do { if (type == "double") preload_samples_from_file<std::complex<double>>(file, d_samples, tx_rate); send_from_file<std::complex<double>>(tx_stream, d_samples, spb, total_num_samps, tx_usrp, tx_total_time, tx_rate); else if (type == "float") preload_samples_from_file<std::complex<float>>(file, f_samples, tx_rate); send_from_file<std::complex<float>>(tx_stream, f_samples, spb, total_num_samps, tx_usrp, tx_total_time, tx_rate); else if (type == "short") preload_samples_from_file<std::complex<short>>(file, s_samples, tx_rate); send_from_file<std::complex<short>>(tx_stream, s_samples, spb, total_num_samps, tx_usrp, tx_total_time, tx_rate); else throw std::runtime_error("Unknown type " + type); flag_identical = true; if (repeat and delay > 0.0) { std::this_thread::sleep_for(std::chrono::milliseconds(int64_t(delay * 1000))); } } while (repeat and not flag_identical); }); 
tx_thread_identical.join(); flag_identical = false; } } } // finished std::cout << std::endl << "Done!" << std::endl << std::endl; return EXIT_SUCCESS; }
Write, run, and share C++ code online using OneCompiler's C++ online compiler for free. It is one of the most robust, feature-rich online compilers for the C++ language, running on version 17 (C++17). Getting started with OneCompiler's C++ compiler is simple and fast: the editor shows sample boilerplate code when you choose C++ as the language,
so you can start coding right away!
OneCompiler's C++ online compiler supports stdin, and users can give inputs to programs using the STDIN textbox under the I/O tab. The following sample program takes a name as input and prints a greeting with that name.
#include <iostream>
#include <string>
using namespace std;
// Sample program: prompts for a name on stdin and echoes a greeting.
int main()
{
    std::string user_name;

    // Prompt first, then read the whole line so names with spaces are kept.
    std::cout << "Enter name:";
    std::getline(std::cin, user_name);

    std::cout << "Hello " << user_name;
    return 0;
}
C++ is a widely used middle-level programming language.
Whenever you want to perform a set of operations based on a condition, if-else is used.
if(conditional-expression) {
//code
}
else {
//code
}
You can also use if-else for nested ifs and for an if-else-if ladder when multiple conditions are to be checked on a single variable.
Switch is an alternative to If-Else-If ladder.
switch(conditional-expression){
case value1:
// code
break; // optional
case value2:
// code
break; // optional
......
default:
code to be executed when all the above cases are not matched;
}
For loop is used to iterate a set of statements based on a condition.
for(Initialization; Condition; Increment/decrement){
//code
}
While is also used to iterate a set of statements based on a condition. Usually while is preferred when the number of iterations is not known in advance.
while (condition) {
// code
}
Do-while is also used to iterate a set of statements based on a condition. It is mostly used when you need to execute the statements at least once.
do {
// code
} while (condition);
A function is a sub-routine which contains a set of statements. Usually functions are written when multiple calls are required to the same set of statements, which increases reusability and modularity. A function runs only when it is called.
return_type function_name(parameters);
function_name (parameters)
return_type function_name(parameters) {
// code
}