Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions elasticsearch.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ set(
IRODS_PLUGIN_POLICY_COMPILE_DEFINITIONS
RODS_SERVER
ENABLE_RE
IRODS_ENABLE_SYSLOG
SPDLOG_FMT_EXTERNAL
SPDLOG_NO_TLS
)

set(
Expand All @@ -38,6 +41,7 @@ target_include_directories(
${IRODS_INCLUDE_DIRS}
${IRODS_EXTERNALS_FULLPATH_BOOST}/include
${IRODS_EXTERNALS_FULLPATH_FMT}/include
${IRODS_EXTERNALS_FULLPATH_SPDLOG}/include
)

target_link_libraries(
Expand Down
8 changes: 5 additions & 3 deletions indexing.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ set(
IRODS_PLUGIN_POLICY_COMPILE_DEFINITIONS
RODS_SERVER
ENABLE_RE
)
IRODS_ENABLE_SYSLOG
SPDLOG_FMT_EXTERNAL
SPDLOG_NO_TLS
)

set(
IRODS_PLUGIN_POLICY_LINK_LIBRARIES
Expand All @@ -34,8 +37,7 @@ target_include_directories(
${IRODS_INCLUDE_DIRS}
${IRODS_EXTERNALS_FULLPATH_BOOST}/include
${IRODS_EXTERNALS_FULLPATH_FMT}/include
${IRODS_EXTERNALS_FULLPATH_JANSSON}/include
${IRODS_EXTERNALS_FULLPATH_FMT}/include
${IRODS_EXTERNALS_FULLPATH_SPDLOG}/include
${CMAKE_CURRENT_SOURCE_DIR}/include
)

Expand Down
80 changes: 74 additions & 6 deletions libirods_rule_engine_plugin-elasticsearch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,29 @@
#include "utilities.hpp"

#include <irods/MD5Strategy.hpp>
#include <irods/dataObjInpOut.h>
#include <irods/irods_hasher_factory.hpp>
#include <irods/irods_log.hpp>
#include <irods/irods_re_plugin.hpp>
#include <irods/irods_re_ruleexistshelper.hpp>
#include <irods/irods_rs_comm_query.hpp>
#include <irods/rodsErrorTable.h>
#include <irods/rsModAVUMetadata.hpp>
#include <irods/library_features.h>
#include <irods/irods_at_scope_exit.hpp>

#define IRODS_QUERY_ENABLE_SERVER_SIDE_API
#include <irods/irods_query.hpp>

#define IRODS_IO_TRANSPORT_ENABLE_SERVER_SIDE_API
#include <irods/dstream.hpp>
#include <irods/transport/default_transport.hpp>
#ifdef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature test macro name is just a placeholder until irods/irods#7530 is resolved.

This PR isn't blocked by that work. Once the irods/irods issue is resolved, these preprocessor macros can be updated to match the real feature test macro.

# define IRODS_IO_TRANSPORT_ENABLE_SERVER_SIDE_API
# include <irods/dstream.hpp>
# include <irods/transport/default_transport.hpp>
#else
# include <irods/rs_replica_open.hpp>
# include <irods/rs_replica_close.hpp>
# include <irods/rsDataObjRead.hpp>
#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES

#define IRODS_FILESYSTEM_ENABLE_SERVER_SIDE_API
#include <irods/filesystem.hpp>
Expand Down Expand Up @@ -340,16 +350,58 @@ namespace
{
try {
const std::string object_id = get_object_index_id(_rei, _object_path);
std::vector<char> buffer(config->read_size);
irods::experimental::io::server::basic_transport<char> xport(*_rei->rsComm);
irods::experimental::io::idstream in{xport, _object_path};

std::vector<char> buffer(config->read_size);
int chunk_counter{0};
bool need_final_perform{false};
std::stringstream ss;

#ifdef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES
irods::experimental::io::server::basic_transport<char> xport(*_rei->rsComm);
irods::experimental::io::idstream in{xport, _object_path}; // TODO Add admin mode option when available.

while (in) {
in.read(buffer.data(), buffer.size());
#else
DataObjInp obj_input{};
irods::at_scope_exit free_obj_input{[&obj_input] { clearKeyVal(&obj_input.condInput); }};

obj_input.dataSize = -1;
obj_input.openFlags = O_RDONLY;

_object_path.copy(obj_input.objPath, sizeof(obj_input.objPath) - 1);

// Add the admin keyword if the user invoking this is a rodsadmin.
// This primarily applies when the rule is invoked via irule by a rodsadmin and
// the rodsadmin does not have read permissions on the data object.
if (irods::is_privileged_client(*_rei->rsComm)) {
addKeyVal(&obj_input.condInput, ADMIN_KW, "");
}

char* json_output{};
irods::at_scope_exit free_json_output{[&json_output] { std::free(json_output); }};

// We use rs_replica_open to ease policy management for admins.
// If we chose to use rsDataObjOpen, that means the admin has to worry about
// the PEP changing after dstream grows support for admin mode.
const auto fd = rs_replica_open(_rei->rsComm, &obj_input, &json_output);
if (fd < 3) {
THROW(fd, fmt::format("Could not open data object [{}] for reading", _object_path));
}

OpenedDataObjInp read_input{};
read_input.l1descInx = fd;
read_input.len = buffer.size();

BytesBuf bbuf_output{};
bbuf_output.len = read_input.len;
bbuf_output.buf = buffer.data();

int bytes_read = 0;

while (true) {
bytes_read = rsDataObjRead(_rei->rsComm, &read_input, &bbuf_output);
#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES

// The indexing instruction.
// clang-format off
Expand All @@ -369,7 +421,13 @@ namespace
// clang-format off
ss << json{
{"absolutePath", _object_path},
#ifdef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES
{"data", std::string_view(buffer.data(), in.gcount())}
#else
// Take the max to avoid passing an integer that's less than zero to the
// the string_view constructor.
{"data", std::string_view(buffer.data(), std::max(0, bytes_read))}
Comment on lines +427 to +433
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a statement explaining how bytes_read can result in an integer less than zero.

#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES
}.dump(indent, indent_char, ensure_ascii, json::error_handler_t::ignore) << '\n';
// clang-format on

Expand All @@ -391,6 +449,16 @@ namespace

ss.str("");
}

#ifndef IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES
if (0 == bytes_read) {
break;
}
else if (bytes_read < 0) {
rodsLog(LOG_ERROR, "%s: Read error on data object [%s]", __func__, _object_path.c_str());
break;
}
#endif // IRODS_HAS_FEATURE_ADMIN_MODE_FOR_DSTREAM_LIBRARIES
}

if (chunk_counter > 0) {
Expand Down
Loading