Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions src/python/WMCore/Storage/RucioFileCatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import re

from builtins import str, range

from urllib.parse import urlparse

class RucioFileCatalog(dict):
"""
Expand All @@ -37,7 +37,7 @@ def addMapping(self, protocol, match, result,
"""
Add an lfn to pfn mapping to this instance
:param protocol: name of protocol, for example XRootD
:param match: regular expression string to perform path matching
:param match: regular expression string to perform path matching
:param result: result of the path matching
:param chain: name of chained protocol
:param mapping_type: type of path matching
Expand All @@ -52,11 +52,14 @@ def addMapping(self, protocol, match, result,

def _doMatch(self, protocol, path, style, caller):
"""
Generalised way of building up the mappings.
Generalised way of building up the mappings.
:param protocol: the name of a protocol, for example XRootD
:path: a LFN path, for example /store/abc/xyz.root
:style: type of conversion. lfn-to-pfn is to convert LFN to PFN and pfn-to-lfn is for PFN to LFN
:caller is the method from there this method was called. It's used for resolving chained rules. When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example. In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume. The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied.
:caller: is the method from there this method was called. It's used for resolving chained rules.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean "where" instead of "there"?

When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example.
In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume.
The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied.
"""
for mapping in self[style]:
if mapping['protocol'] != protocol:
Expand Down Expand Up @@ -136,7 +139,7 @@ def storageJsonPath(currentSite, currentSubsite, storageSite):
# return path override if it is defined and exists
siteConfigPathOverride = os.getenv('WMAGENT_RUCIO_CATALOG_OVERRIDE', None)
if siteConfigPathOverride and os.path.exists(siteConfigPathOverride):
return siteConfigPathOverride
return siteConfigPathOverride

# get site config
siteConfigPath = os.getenv('SITECONFIG_PATH', None)
Expand Down Expand Up @@ -182,7 +185,7 @@ def readRFC(filename, storageSite, volume, protocol):
except Exception as ex:
msg = "Error reading storage description file: %s\n" % filename
msg += str(ex)
raise RuntimeError(msg)
raise RuntimeError(msg) from ex
# now loop over elements, select the one matched with inputs (storageSite, volume, protocol) and fill lfn-to-pfn
for jsElement in jsElements:
# check to see if the storageSite and volume match the "site" and "volume" in storage.json
Expand Down Expand Up @@ -239,9 +242,61 @@ def rseName(currentSite, currentSubsite, storageSite, volume):
except Exception as ex:
msg = "RucioFileCatalog.py:rseName() Error reading storage.json: %s\n" % storageJsonName
msg += str(ex)
raise RuntimeError(msg)
raise RuntimeError(msg) from ex
for jsElement in jsElements:
if jsElement['site'] == storageSite and jsElement['volume'] == volume:
rse = jsElement['rse']
break
return rse

def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolName):
"""
Return default command for a protocol, for example:
https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L17
:currentSite is the site where jobs are executing
:currentSubsite is the sub site if jobs are running here
:storageSite is the site for storage
:volume is the volume name, for example:
https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L3
:protocolName is the 'protocol' in site-local-config.xml under stageOut
"""

storageJsonName = storageJsonPath(currentSite, currentSubsite, storageSite)
try:
with open(storageJsonName, encoding="utf-8") as jsonFile:
jsElements = json.load(jsonFile)
except Exception as ex:
msg = "RucioFileCatalog.py:getDefaultCmd() Error reading storage.json: %s\n" % storageJsonName
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please update this error message to keep it consistent with the actual method name?
Alternatively, I would suggest rephrasing it to something like the following (the module and line number are already present in the logs):

        msg = f"Failed to open storage.json: {storageJsonName}\n. Error: {str(ex)}"

msg += str(ex)
raise RuntimeError(msg) from ex

url_scheme = ''

for entry in jsElements:
if entry.get('site') != storageSite or entry.get('volume') != volume:
continue

for proto in entry.get("protocols", []):
if proto.get("protocol") != protocolName:
continue

# First try rules
rules = proto.get("rules", [])
if rules:
pfn = rules[0].get("pfn", "")
url_scheme = urlparse(pfn).scheme

# If no rules try 'prefix'
if not url_scheme and "prefix" in proto:
url_scheme = urlparse(proto["prefix"]).scheme

# Map scheme to command
return {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest moving this map to the top of this module, as a global variable, such that it is easier to identify this map in the code. Perhaps:

STAGEOUT_PROTOCOL_MAP = {'root': 'xrdcp',
                'davs': 'gfal2',
                'file': 'cp'
            }

'root': 'xrdcp',
'davs': 'gfal2',
'file': 'cp'
}.get(url_scheme, 'gfal2')

break # matching site+volume found and processed
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest adding a log record here to say that the json was processed but no matches were found.
Note that log syntax in this module seems to be different, so we could probably use something like:

logging.log(logging.WARNING, message)


return None #no matched protocol so command is None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this line: I would add a logging.ERROR record to report the lack of information (site/volume etc).

Copy link
Copy Markdown
Collaborator Author

@nhduongvn nhduongvn Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amaltaro, I addressed all your comments

14 changes: 7 additions & 7 deletions src/python/WMCore/Storage/SiteLocalConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from builtins import next, str, object

from WMCore.Algorithms.ParseXMLFile import xmlFileToNode
from WMCore.Storage.RucioFileCatalog import rseName
from WMCore.Storage.RucioFileCatalog import rseName, get_default_cmd


def loadSiteLocalConfig():
Expand Down Expand Up @@ -73,7 +73,6 @@ class SiteConfigError(Exception):
Exception class placeholder

"""
pass


class SiteLocalConfig(object):
Expand Down Expand Up @@ -138,7 +137,7 @@ def read(self):
except Exception as ex:
msg = "Unable to read SiteConfigFile: %s\n" % self.siteConfigFile
msg += str(ex)
raise SiteConfigError(msg)
raise SiteConfigError(msg) from ex

nodeResult = nodeReader(node)

Expand Down Expand Up @@ -300,14 +299,15 @@ def processStageOut():

localReport = {}
localReport['storageSite'] = aStorageSite
localReport['command'] = subnode.attrs.get('command', None)
# use default command='gfal2' when 'command' is not specified
if localReport['command'] is None:
localReport['command'] = 'gfal2'
#Do not support 'command' from site-local-config.xml anymore
#get command based on rule PFN or prefix of the protocol for example root://, davs:// file://
if aProtocol is None: localReport['command'] = None
else: localReport['command'] = get_default_cmd(report["siteName"], subSiteName, aStorageSite, aVolume, aProtocol)
localReport['option'] = subnode.attrs.get('option', None)
localReport['volume'] = aVolume
localReport['protocol'] = aProtocol
localReport['phedex-node'] = rseName(report["siteName"], subSiteName, aStorageSite, aVolume)

report['stageOuts'].append(localReport)


Expand Down
2 changes: 1 addition & 1 deletion test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def testFNALSiteLocalConfig(self):
"Error: Protocol is not correct."
assert mySiteConfig.stageOuts[0]["option"] == "-p", \
"Error: option is not correct."
# assert False
#assert False
return

def testLoadingConfigFromOverridenEnvVarriable(self):
Expand Down