diff --git a/c/TEController.cpp b/c/TEController.cpp index f9872b0..df6b3db 100644 --- a/c/TEController.cpp +++ b/c/TEController.cpp @@ -10,6 +10,7 @@ #include "TEController.h" #include "teprob.h" +#include TEController* TEController::instance = 0; diff --git a/c/TEPlant.cpp b/c/TEPlant.cpp index 5ea82a5..744c38c 100644 --- a/c/TEPlant.cpp +++ b/c/TEPlant.cpp @@ -10,7 +10,7 @@ #include // std::cout, std::ostream, std::ios #include - +#include #include "teprob.h" #include "TEPlant.h" diff --git a/c/TETypes.h b/c/TETypes.h index c3b8fc3..a4c28dc 100644 --- a/c/TETypes.h +++ b/c/TETypes.h @@ -14,6 +14,9 @@ #define __TEETYPES_H__ #include +#include + +using namespace std; typedef std::pair pq_pair; typedef std::pair sp_pair; diff --git a/c/rtclient/rtclient.cpp b/c/rtclient/rtclient.cpp index ccbb5fc..e7b91df 100644 --- a/c/rtclient/rtclient.cpp +++ b/c/rtclient/rtclient.cpp @@ -15,193 +15,300 @@ #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #define XMV_SHMEM_NAME ("xmv_shmem") #define IDV_SHMEM_NAME ("idv_shmem") #define SIM_SHMEM_NAME ("sim_shmem") #define SP_SHMEM_NAME ("sp_shmem") +#include +#include #include #include #include -#include +#include #include -#include "TEPlant.h" -#include "TEController.h" +#include "../TEPlant.h" +#include "../TEController.h" typedef struct { int index; double value; } xmv_pair; typedef struct { int index; int value; } idv_pair; +using namespace std::literals::chrono_literals; + int main(int argc, char* argv[]) { - // program option variables - bool set_idv = false, unset_idv = false, set_xmv = false; - unsigned idv_index = 0, xmv_index = 0; - double xmv_value = 0.0; - bool print_all = false; - - bool apply_sp_override = false; - double prod_rate_sp, reactor_pressure_sp, reactor_level_sp, reactor_temp_sp, - pctg_sp, sep_level_sp, stripper_level_sp; - - - // program options - namespace po = boost::program_options; - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "print the help message") - ("setidv,i", po::value(&idv_index), "id (1-based) of the disturbance to enable") - ("unsetidv,u", po::value(&idv_index), "id (1-based) of the disturbance to disable") - ("xmv-index,x", po::value(&xmv_index), "index (1-based) of the new xmv/setpoint") - ("xmv-value,v", po::value(&xmv_value), "value for the new xmv/setpoint") - ("print-all-vars,p", po::bool_switch(&print_all)->default_value(false), "list the measured values") - - // set point overrides - ("sp-prod-rate", po::value(&prod_rate_sp), "enables change to setpoint") - ("sp-reactor-pressure", po::value(&reactor_pressure_sp), "enables change to setpoint") - ("sp-reactor-level", po::value(&reactor_level_sp), "enables change to setpoint") - ("sp-reactor-temp", po::value(&reactor_temp_sp), "enables change to setpoint") - ("sp-pctg", po::value(&pctg_sp), "enables change to setpoint") - ("sp-separator-level", po::value(&sep_level_sp), "enables change to setpoint") - ("sp-stripper-level", po::value(&stripper_level_sp), "enables change to setpoint") - ; - - po::variables_map vm; - try { - po::store(po::parse_command_line(argc, argv, desc), vm); - if (vm.count("help")) - { - std::cout << desc << std::endl; - return 0; - } - - po::notify(vm); - - if (vm.count("setidv")) { set_idv = true; } - if (vm.count("unsetidv")) { unset_idv = true; } - - if (vm.count("sp-prod-rate")) { apply_sp_override = true; } - else if (vm.count("sp-reactor-pressure")) { apply_sp_override = true; } - else if (vm.count("sp-reactor-level")) { apply_sp_override = true; } - else if (vm.count("sp-reactor-temp")) { apply_sp_override = true; } - else if (vm.count("sp-pctg")) { apply_sp_override = true; } - else if (vm.count("sp-separator-level")) { apply_sp_override = true; } - else if (vm.count("sp-stripper-level")) { apply_sp_override = true; } - } - catch (po::error& e) { - std::cerr << "ERROR: " << e.what() << std::endl << std::endl; - std::cerr << desc << std::endl; - return 0; - } - - using namespace boost::interprocess; - try - { - if (vm.count("x")) - { - shared_memory_object xmv_shm(open_only, XMV_SHMEM_NAME, read_write); - mapped_region reg_xmv(xmv_shm, read_write); - - xmv_pair xmv_update; - xmv_update.index = xmv_index - 1; - xmv_update.value = xmv_value; - std::cout << "setting xmv " << xmv_index << " to " << xmv_value << std::endl; - xmv_pair *mem = static_cast(reg_xmv.get_address()); - *mem = xmv_update; - } - - if (set_idv || unset_idv) - { - shared_memory_object idv_shm(open_only, IDV_SHMEM_NAME, read_write); - mapped_region reg_idv(idv_shm, read_write); - - std::cout << "setting idv " << idv_index << std::endl; - idv_pair *mem = static_cast(reg_idv.get_address()); - idv_pair idv_update; - idv_update.index = idv_index; - idv_update.value = set_idv ? 1 : 0; - *mem = idv_update; - } - - if (print_all) - { - shared_memory_object xmeas_shm(open_only, SIM_SHMEM_NAME, read_write); - mapped_region reg_proc_vars(xmeas_shm, read_write); - double *mem = static_cast(reg_proc_vars.get_address()); - unsigned sz_shm = reg_proc_vars.get_size() / sizeof(double); - std::cout << "size: " << sz_shm << std::endl; - for (unsigned ii = 0; ii < sz_shm; ii++) - { - std::cout << std::setprecision(10) << ii << ": " << mem[ii] << "\n"; - } - } - - if (apply_sp_override) - { - shared_memory_object sp_shm(open_only, SP_SHMEM_NAME, read_write); - mapped_region reg_sp(sp_shm, read_write); - sp_override_pair *mem = static_cast(reg_sp.get_address()); - - if (vm.count("sp-prod-rate")) - { - mem->first = true; - mem->second.first = TEController::PROD_RATE; - mem->second.second = prod_rate_sp; - std::cout << "setting prod rate to " << prod_rate_sp << std::endl; - } - else if (vm.count("sp-reactor-pressure")) - { - mem->first = true; - mem->second.first = TEController::REACTOR_PRESS; - mem->second.second = reactor_pressure_sp; - std::cout << "setting reactor pressure to " << reactor_pressure_sp << std::endl; - } - else if (vm.count("sp-reactor-level")) - { - mem->first = true; - mem->second.first = TEController::REACTOR_LEVEL; - mem->second.second = reactor_level_sp; - std::cout << "setting reactor level to " << reactor_level_sp << std::endl; - } - else if (vm.count("sp-reactor-temp")) - { - mem->first = true; - mem->second.first = TEController::REACTOR_TEMP; - mem->second.second = reactor_temp_sp; - std::cout << "setting reactor temp to " << reactor_temp_sp << std::endl; - } - else if (vm.count("sp-pctg")) - { - mem->first = true; - mem->second.first = TEController::PCTG; - mem->second.second = pctg_sp; - std::cout << "setting PCT G to " << pctg_sp << std::endl; - } - else if (vm.count("sp-separator-level")) - { - mem->first = true; - mem->second.first = TEController::SEP_LEVEL; - mem->second.second = sep_level_sp; - std::cout << "setting separator level to " << sep_level_sp << std::endl; - } - else if (vm.count("sp-stripper-level")) - { - mem->first = true; - mem->second.first = TEController::STRIP_LEVEL; - mem->second.second = stripper_level_sp; - std::cout << "setting stripper level to " << stripper_level_sp << std::endl; - } - } - - } - catch (interprocess_exception & ex) - { - std::cout << ex.what() << std::endl; - return 1; - } - - return 0; -} + // program option variables + bool set_idv = false, unset_idv = false, set_xmv = false; + unsigned idv_index = 0, xmv_index = 0; + double xmv_value = 0.0; + bool print_all = false; + bool stream = false; + bool stdout = false; + + bool apply_sp_override = false; + double prod_rate_sp, reactor_pressure_sp, reactor_level_sp, reactor_temp_sp, + pctg_sp, sep_level_sp, stripper_level_sp; + + + // program options + namespace po = boost::program_options; + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "print the help message") + ("setidv,i", po::value(&idv_index), "id (1-based) of the disturbance to enable") + ("unsetidv,u", po::value(&idv_index), "id (1-based) of the disturbance to disable") + ("xmv-index,x", po::value(&xmv_index), "index (1-based) of the new xmv/setpoint") + ("xmv-value,v", po::value(&xmv_value), "value for the new xmv/setpoint") + ("print-all-vars,p", po::bool_switch(&print_all)->default_value(false), "list the measured values") + ("stream-kinesis,k", po::bool_switch(&stream)->default_value(false), "stream measured values to kinesis stream") + ("stream-stdout,s", po::bool_switch(&stdout)->default_value(false), "stream measured values to stdout") + // set point overrides + ("sp-prod-rate", po::value(&prod_rate_sp), "enables change to setpoint") + ("sp-reactor-pressure", po::value(&reactor_pressure_sp), "enables change to setpoint") + ("sp-reactor-level", po::value(&reactor_level_sp), "enables change to setpoint") + ("sp-reactor-temp", po::value(&reactor_temp_sp), "enables change to setpoint") + ("sp-pctg", po::value(&pctg_sp), "enables change to setpoint") + ("sp-separator-level", po::value(&sep_level_sp), "enables change to setpoint") + ("sp-stripper-level", po::value(&stripper_level_sp), "enables change to setpoint") + ; + + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + if (vm.count("help")) + { + std::cout << desc << std::endl; + return 0; + } + + po::notify(vm); + + if (vm.count("setidv")) { set_idv = true; } + if (vm.count("unsetidv")) { unset_idv = true; } + + if (vm.count("sp-prod-rate")) { apply_sp_override = true;} + else if (vm.count("sp-reactor-pressure")) { apply_sp_override = true;} + else if (vm.count("sp-reactor-level")) { apply_sp_override = true;} + else if (vm.count("sp-reactor-temp")) { apply_sp_override = true;} + else if (vm.count("sp-pctg")) { apply_sp_override = true;} + else if (vm.count("sp-separator-level")) { apply_sp_override = true;} + else if (vm.count("sp-stripper-level")) { apply_sp_override = true;} + } + catch (po::error& e) { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return 0; + } + + using namespace boost::interprocess; + try + { + if (vm.count("x")) + { + shared_memory_object xmv_shm(open_only, XMV_SHMEM_NAME, read_write); + mapped_region reg_xmv(xmv_shm, read_write); + + xmv_pair xmv_update; + xmv_update.index = xmv_index - 1; + xmv_update.value = xmv_value; + std::cout << "setting xmv " << xmv_index << " to " << xmv_value << std::endl; + xmv_pair *mem = static_cast(reg_xmv.get_address()); + *mem = xmv_update; + } + + if (set_idv || unset_idv) + { + shared_memory_object idv_shm(open_only, IDV_SHMEM_NAME, read_write); + mapped_region reg_idv(idv_shm, read_write); + + std::cout << "setting idv " << idv_index << std::endl; + idv_pair *mem = static_cast(reg_idv.get_address()); + idv_pair idv_update; + idv_update.index = idv_index; + idv_update.value = set_idv ? 1 : 0; + *mem = idv_update; + } + + if (print_all) + { + shared_memory_object xmeas_shm(open_only, SIM_SHMEM_NAME, read_write); + mapped_region reg_proc_vars(xmeas_shm, read_write); + double *mem = static_cast(reg_proc_vars.get_address()); + unsigned sz_shm = reg_proc_vars.get_size() / sizeof(double); + std::cout << "size: " << sz_shm << std::endl; + for (unsigned ii = 0; ii < sz_shm; ii++) + { + std::cout << std::setprecision(10) << ii << ": " << mem[ii] << "\n"; + } + } + if (stream) + { + Aws::SDKOptions options; + Aws::InitAPI(options); + const Aws::String streamName("tep-ingest"); + Aws::Client::ClientConfiguration clientConfig; + // set your region + // clientConfig.region = Aws::Region::EU_CENTRAL_1; + Aws::Kinesis::KinesisClient kinesisClient(clientConfig); + Aws::Kinesis::Model::PutRecordsRequest putRecordsRequest; + putRecordsRequest.SetStreamName(streamName); + + + while (true) + { + shared_memory_object xmeas_shm(open_only, SIM_SHMEM_NAME, read_write); + mapped_region reg_proc_vars(xmeas_shm, read_write); + double *mem = static_cast(reg_proc_vars.get_address()); + unsigned sz_shm = reg_proc_vars.get_size() / sizeof(double); + //std::cout << "size: " << sz_shm << std::endl; + + //envelope + Aws::Vector putRecordsRequestEntryList; + //one entry to envelope + Aws::Kinesis::Model::PutRecordsRequestEntry putRecordsRequestEntry; + //data object (for string json object) + Aws::StringStream pk; + pk << "1"; + putRecordsRequestEntry.SetPartitionKey(pk.str()); + Aws::StringStream data; + data << "{"; + + for (unsigned ii = 1; ii <= sz_shm; ii++) + { + if (ii == sz_shm){ + data << std::setprecision(10) << "\"xmeas_" << ii << "\"" << ": " << mem[ii]; + } + else { + data << std::setprecision(10) << "\"xmeas_" << ii << "\"" << ": " << mem[ii] << ","; + } + } + //sleep(1); + std::this_thread::sleep_for(20ms); + data << "}\n"; + + // data to bytes + Aws::Utils::ByteBuffer bytes((unsigned char*)data.str().c_str(), data.str().length()); + putRecordsRequestEntry.SetData(bytes); + putRecordsRequestEntryList.emplace_back(putRecordsRequestEntry); + + // push to kinesis + putRecordsRequest.SetRecords(putRecordsRequestEntryList); + Aws::Kinesis::Model::PutRecordsOutcome putRecordsResult = kinesisClient.PutRecords(putRecordsRequest); + + //print to stdout + std::cout << data.str().c_str(); + } + } + + if (stdout) + { + while (true) + { + shared_memory_object xmeas_shm(open_only, SIM_SHMEM_NAME, read_write); + mapped_region reg_proc_vars(xmeas_shm, read_write); + double *mem = static_cast(reg_proc_vars.get_address()); + unsigned sz_shm = reg_proc_vars.get_size() / sizeof(double); + Aws::StringStream data; + data << "{"; + + for (unsigned ii = 1; ii <= sz_shm; ii++) + { + if (ii == sz_shm){ + data << std::setprecision(10) << "\"xmeas_" << ii << "\"" << ": " << mem[ii]; + } + else { + data << std::setprecision(10) << "\"xmeas_" << ii << "\"" << ": " << mem[ii] << ","; + } + } + //sleep for 250ms + std::this_thread::sleep_for(10ms); + data << "}\n"; + //print to stdout + std::cout << data.str().c_str(); + } + } + + if (apply_sp_override) + { + shared_memory_object sp_shm(open_only, SP_SHMEM_NAME, read_write); + mapped_region reg_sp(sp_shm, read_write); + sp_override_pair *mem = static_cast(reg_sp.get_address()); + + if (vm.count("sp-prod-rate")) + { + mem->first = true; + mem->second.first = TEController::PROD_RATE; + mem->second.second = prod_rate_sp; + std::cout << "setting prod rate to " << prod_rate_sp << std::endl; + } + else if (vm.count("sp-reactor-pressure")) + { + mem->first = true; + mem->second.first = TEController::REACTOR_PRESS; + mem->second.second = reactor_pressure_sp; + std::cout << "setting reactor pressure to " << reactor_pressure_sp << std::endl; + } + else if (vm.count("sp-reactor-level")) + { + mem->first = true; + mem->second.first = TEController::REACTOR_LEVEL; + mem->second.second = reactor_level_sp; + std::cout << "setting reactor level to " << reactor_level_sp << std::endl; + } + else if (vm.count("sp-reactor-temp")) + { + mem->first = true; + mem->second.first = TEController::REACTOR_TEMP; + mem->second.second = reactor_temp_sp; + std::cout << "setting reactor temp to " << reactor_temp_sp << std::endl; + } + else if (vm.count("sp-pctg")) + { + mem->first = true; + mem->second.first = TEController::PCTG; + mem->second.second = pctg_sp; + std::cout << "setting PCT G to " << pctg_sp << std::endl; + } + else if (vm.count("sp-separator-level")) + { + mem->first = true; + mem->second.first = TEController::SEP_LEVEL; + mem->second.second = sep_level_sp; + std::cout << "setting separator level to " << sep_level_sp << std::endl; + } + else if (vm.count("sp-stripper-level")) + { + mem->first = true; + mem->second.first = TEController::STRIP_LEVEL; + mem->second.second = stripper_level_sp; + std::cout << "setting stripper level to " << stripper_level_sp << std::endl; + } + } + + } + catch (interprocess_exception & ex) + { + std::cout << ex.what() << std::endl; + return 1; + } + + return 0; +} diff --git a/c/tesim_main.cpp b/c/tesim_main.cpp index 93d6ee5..e51f0c3 100644 --- a/c/tesim_main.cpp +++ b/c/tesim_main.cpp @@ -29,6 +29,8 @@ #endif #include #include +#include +#include #include #include #include @@ -206,7 +208,7 @@ int main(int argc, char* argv[]) std::cout << "Simulation time : " << simtime << std::endl; std::cout << "Tplant: " << tplant << " hrs (" << tplant*3600 << " secs)" << std::endl; - std::cout << "Tctlr: " << tctlr << " hrs (" << tplant*3600 << " secs)" << std::endl; + std::cout << "Tctlr: " << tctlr << " hrs (" << tctlr*3600 << " secs)" << std::endl; std::cout << "Ksave: " << ksave << std::endl; std::cout << "log file prefix: " << log_file_prefix << std::endl; std::cout << "Append: " << append_flag << std::endl;