Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions src/python/WMCore/Storage/RucioFileCatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@
import re

from builtins import str, range
from urllib.parse import urlparse

import logging

# Maps the URL scheme of a storage PFN or prefix (root://, davs://, file://)
# to the name of the stage-out command used for it. get_default_cmd() falls
# back to 'gfal2' for any scheme that is not listed here.
STAGEOUT_PROTOCOL_MAP = {
'root': 'xrdcp',
'davs': 'gfal2',
'file': 'cp'
}

class RucioFileCatalog(dict):
"""
Expand All @@ -37,7 +45,7 @@ def addMapping(self, protocol, match, result,
"""
Add an lfn to pfn mapping to this instance
:param protocol: name of protocol, for example XRootD
:param match: regular expression string to perform path matching
:param match: regular expression string to perform path matching
:param result: result of the path matching
:param chain: name of chained protocol
:param mapping_type: type of path matching
Expand All @@ -52,11 +60,14 @@ def addMapping(self, protocol, match, result,

def _doMatch(self, protocol, path, style, caller):
"""
Generalised way of building up the mappings.
Generalised way of building up the mappings.
:param protocol: the name of a protocol, for example XRootD
:path: a LFN path, for example /store/abc/xyz.root
:style: type of conversion. lfn-to-pfn is to convert LFN to PFN and pfn-to-lfn is for PFN to LFN
:caller is the method from there this method was called. It's used for resolving chained rules. When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example. In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume. The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied.
:caller: is the method where this method was called. It's used for resolving chained rules.
When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example.
In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume.
The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied.
"""
for mapping in self[style]:
if mapping['protocol'] != protocol:
Expand Down Expand Up @@ -136,7 +147,7 @@ def storageJsonPath(currentSite, currentSubsite, storageSite):
# return path override if it is defined and exists
siteConfigPathOverride = os.getenv('WMAGENT_RUCIO_CATALOG_OVERRIDE', None)
if siteConfigPathOverride and os.path.exists(siteConfigPathOverride):
return siteConfigPathOverride
return siteConfigPathOverride

# get site config
siteConfigPath = os.getenv('SITECONFIG_PATH', None)
Expand Down Expand Up @@ -182,7 +193,7 @@ def readRFC(filename, storageSite, volume, protocol):
except Exception as ex:
msg = "Error reading storage description file: %s\n" % filename
msg += str(ex)
raise RuntimeError(msg)
raise RuntimeError(msg) from ex
# now loop over elements, select the one matched with inputs (storageSite, volume, protocol) and fill lfn-to-pfn
for jsElement in jsElements:
# check to see if the storageSite and volume match "site" and "volume" in storage.json
Expand Down Expand Up @@ -239,9 +250,67 @@ def rseName(currentSite, currentSubsite, storageSite, volume):
except Exception as ex:
msg = "RucioFileCatalog.py:rseName() Error reading storage.json: %s\n" % storageJsonName
msg += str(ex)
raise RuntimeError(msg)
raise RuntimeError(msg) from ex
for jsElement in jsElements:
if jsElement['site'] == storageSite and jsElement['volume'] == volume:
rse = jsElement['rse']
break
return rse

def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolName):
    """
    Return the default stage-out command for a protocol declared in storage.json, for example:
    https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L17

    The command is looked up in STAGEOUT_PROTOCOL_MAP using the URL scheme
    (root://, davs://, file://, ...) of the PFN of the protocol's first rule or,
    when that yields no scheme, of the protocol's 'prefix'. Any scheme that is
    not in the map falls back to 'gfal2' and a warning is logged.

    :param currentSite: the site where jobs are executing
    :param currentSubsite: the sub site if jobs are running here
    :param storageSite: the site for storage
    :param volume: the volume name, for example:
        https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L3
    :param protocolName: the 'protocol' in site-local-config.xml under stageOut
    :return: the command name, or None when storage.json has no entry for the
        storage site/volume or the first matching entry lacks the protocol
    :raises RuntimeError: when storage.json can not be opened or parsed
    """
    storageJsonName = storageJsonPath(currentSite, currentSubsite, storageSite)
    try:
        with open(storageJsonName, encoding="utf-8") as jsonFile:
            jsElements = json.load(jsonFile)
    except Exception as ex:
        msg = f"Failed to open storage.json: {storageJsonName}\nError: {ex}"
        raise RuntimeError(msg) from ex

    for entry in jsElements:
        if entry.get('site') != storageSite or entry.get('volume') != volume:
            continue

        # "or []" keeps an explicit JSON null from breaking the iteration
        for proto in entry.get("protocols") or []:
            if proto.get("protocol") != protocolName:
                continue

            url_scheme = ''

            # First try the rules. They are just different matching patterns of
            # the same protocol, so the PFN of the first rule is enough.
            rules = proto.get("rules", [])
            if rules:
                # "or ''" guards against a null pfn, for which urlparse would
                # return a bytes scheme
                pfn = rules[0].get("pfn") or ""
                url_scheme = urlparse(pfn).scheme

            # No rules, or no scheme in the rule PFN: try the 'prefix'
            prefix = proto.get("prefix")
            if not url_scheme and prefix:
                url_scheme = urlparse(prefix).scheme

            # Map scheme to command
            if url_scheme in STAGEOUT_PROTOCOL_MAP:
                return STAGEOUT_PROTOCOL_MAP[url_scheme]

            # Unknown or missing scheme: report what was inspected, use the default
            if rules:
                logging.warning(
                    "RucioFileCatalog.get_default_cmd: Can not get the command from rules of protocol %s "
                    "of %s site and %s volume. Default command gfal2 is used. Rule: %s",
                    protocolName, storageSite, volume, rules[0])
            else:
                logging.warning(
                    "RucioFileCatalog.get_default_cmd: Can not get the command from prefix of protocol %s "
                    "of %s site and %s volume. Default command gfal2 is used. Prefix: %s",
                    protocolName, storageSite, volume, prefix)
            return 'gfal2'

        # no matched protocol
        logging.error(
            "RucioFileCatalog.get_default_cmd: No matched %s protocol for %s volume of %s storage site "
            "in the storage json %s found",
            protocolName, volume, storageSite, storageJsonName)
        return None

    # no matched storage site or volume
    logging.error(
        "RucioFileCatalog.get_default_cmd: No matched %s storage site or %s volume "
        "in the storage json %s found",
        storageSite, volume, storageJsonName)
    return None
14 changes: 7 additions & 7 deletions src/python/WMCore/Storage/SiteLocalConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from builtins import next, str, object

from WMCore.Algorithms.ParseXMLFile import xmlFileToNode
from WMCore.Storage.RucioFileCatalog import rseName
from WMCore.Storage.RucioFileCatalog import rseName, get_default_cmd


def loadSiteLocalConfig():
Expand Down Expand Up @@ -73,7 +73,6 @@ class SiteConfigError(Exception):
Exception class placeholder

"""
pass


class SiteLocalConfig(object):
Expand Down Expand Up @@ -138,7 +137,7 @@ def read(self):
except Exception as ex:
msg = "Unable to read SiteConfigFile: %s\n" % self.siteConfigFile
msg += str(ex)
raise SiteConfigError(msg)
raise SiteConfigError(msg) from ex

nodeResult = nodeReader(node)

Expand Down Expand Up @@ -300,14 +299,15 @@ def processStageOut():

localReport = {}
localReport['storageSite'] = aStorageSite
localReport['command'] = subnode.attrs.get('command', None)
# use default command='gfal2' when 'command' is not specified
if localReport['command'] is None:
localReport['command'] = 'gfal2'
#Do not support 'command' from site-local-config.xml anymore
#get command based on rule PFN or prefix of the protocol for example root://, davs:// file://
if aProtocol is None: localReport['command'] = None
else: localReport['command'] = get_default_cmd(report["siteName"], subSiteName, aStorageSite, aVolume, aProtocol)
localReport['option'] = subnode.attrs.get('option', None)
localReport['volume'] = aVolume
localReport['protocol'] = aProtocol
localReport['phedex-node'] = rseName(report["siteName"], subSiteName, aStorageSite, aVolume)

report['stageOuts'].append(localReport)


Expand Down
57 changes: 55 additions & 2 deletions test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def testFNALSiteLocalConfig(self):
fnalConfigFileName = os.path.join(getTestBase(),
"WMCore_t/Storage_t",
"T1_US_FNAL_SiteLocalConfig.xml")

mySiteConfig = SiteLocalConfig(fnalConfigFileName)

assert mySiteConfig.siteName == "T1_US_FNAL", "Error: Wrong site name."
Expand Down Expand Up @@ -66,14 +67,66 @@ def testFNALSiteLocalConfig(self):

assert len(goldenProxies) == 0, \
"Error: Missing proxy servers."


#test second stage out method with command extraction from rules
assert mySiteConfig.stageOuts[0]["command"] == "xrdcp", \
"Error: Wrong stage out command."
assert mySiteConfig.stageOuts[0]["protocol"] == "XRootD", \
"Error: Protocol is not correct."
assert mySiteConfig.stageOuts[0]["option"] == "-p", \
"Error: option is not correct."
# assert False

#assert False
return

def testGetDefaultStageOutCmd(self):
    """
    _testGetDefaultStageOutCmd_

    Verify that the default stage out command is returned correctly for given protocol.
    """
    storageTestDir = os.path.join(getTestBase(), "WMCore_t/Storage_t")
    # SITECONFIG_PATH is process-wide state: remember the current value so it
    # can be restored and does not leak into the tests that run afterwards.
    savedSiteConfigPath = os.environ.get('SITECONFIG_PATH')
    try:
        os.environ['SITECONFIG_PATH'] = os.path.join(storageTestDir, 'T1_DE_KIT')
        configFileName = os.path.join(storageTestDir,
                                      "T1_DE_KIT/JobConfig/site-local-config-testStageOut-T1_DE_KIT.xml")
        mySiteConfig = SiteLocalConfig(configFileName)

        # (expected command, expected protocol) for each stage out method, in order
        expectedStageOuts = [
            # first stage out method: command extraction from prefix
            ("gfal2", "WebDAV"),
            # second stage out method: command extraction from rules
            ("gfal2", "WebDAV"),
            # third stage out method: no command found, fall to default gfal2
            ("gfal2", "xrootd-module"),
            # fourth stage out method with
            # "prefix": "root://172.26.19.197:1094//root://cmsxrootd-test.gridka.de:1094/"
            ("xrdcp", "XRootDHoreKaGridKa"),
        ]
        for index, (command, protocol) in enumerate(expectedStageOuts):
            assert mySiteConfig.stageOuts[index]["command"] == command, \
                "Error: Wrong stage out command."
            assert mySiteConfig.stageOuts[index]["protocol"] == protocol, \
                "Error: Protocol is not correct."

        # NOTE(review): this path lies outside the test tree; presumably the
        # test needs CVMFS to be mounted on the test host - confirm.
        os.environ['SITECONFIG_PATH'] = '/cvmfs/cms.cern.ch/SITECONF/T1_IT_CNAF'
        configFileName = os.path.join(storageTestDir, "T1_IT_CNAF_SiteLocalConfig.xml")
        mySiteConfig = SiteLocalConfig(configFileName)

        # first stage out method: command 'cp' with protocol 'file'
        assert mySiteConfig.stageOuts[0]["command"] == "cp", \
            "Error: Wrong stage out command."
        assert mySiteConfig.stageOuts[0]["protocol"] == "file", \
            "Error: Protocol is not correct."
    finally:
        if savedSiteConfigPath is None:
            os.environ.pop('SITECONFIG_PATH', None)
        else:
            os.environ['SITECONFIG_PATH'] = savedSiteConfigPath

def testLoadingConfigFromOverridenEnvVarriable(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
</fallback-stage-out>

<stage-out>
<method volume="KIT_dCache" protocol="WebDAV"/>
<method volume="KIT_dCache" protocol="WebDAV"/>
<method volume="KIT_MSS" protocol="WebDAV"/>
<method volume="KIT_dCache" protocol="xrootd-module"/>
<method volume="KIT_dCache" protocol="XRootDHoreKaGridKa"/>
<method site="T2_DE_DESY" volume="DESY_dCache" protocol="WebDAV"/>
</stage-out>

<calib-data>
Expand Down
50 changes: 50 additions & 0 deletions test/python/WMCore_t/Storage_t/T1_IT_CNAF_SiteLocalConfig.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<site-local-config>
<site name="T1_IT_CNAF">
<event-data>
<catalog url="trivialcatalog_file:/cvmfs/cms.cern.ch/SITECONF/local/PhEDEx/storage.xml?protocol=file"/>
<catalog url="trivialcatalog_file:/cvmfs/cms.cern.ch/SITECONF/local/PhEDEx/storage.xml?protocol=xrootd"/>
</event-data>
<data-access>
<catalog volume="CNAF_GPFS" protocol="file"/>
<catalog volume="Eurasian_Federation" protocol="XRootD"/>
</data-access>
<source-config>
<statistics-destination name="cms-udpmon-collector.cern.ch:9331" />
</source-config>
<local-stage-out>
<catalog url="trivialcatalog_file:/cvmfs/cms.cern.ch/SITECONF/local/PhEDEx/storage.xml?protocol=davs"/>
<se-name value="storm-fe-cms.cr.cnaf.infn.it"/>
<phedex-node value="T1_IT_CNAF_Disk"/>
<command value="gfal2"/>
<option value="-v "/>
</local-stage-out>
<fallback-stage-out>
<se-name value="t2-srm-02.lnl.infn.it"/>
<phedex-node value="T2_IT_Legnaro"/>
<lfn-prefix value="davs://t2-xrdcms.lnl.infn.it:2880/pnfs/lnl.infn.it/data/cms"/>
<command value="gfal2"/>
</fallback-stage-out>
<stage-out>
<method volume="CNAF_GPFS" protocol="file"/>
<method volume="CNAF_GPFS" protocol="WebDAV" option="-v"/>
<method site="T2_IT_Legnaro" volume="Legnaro_dCache" protocol="WebDAV"/>
</stage-out>
<calib-data>
<frontier-connect>
<load balance="proxies"/>
<proxy url="http://squid-lhc-01.cr.cnaf.infn.it:3128"/>
<proxy url="http://squid-lhc-02.cr.cnaf.infn.it:3128"/>
<proxy url="http://squid-lhc-03.cr.cnaf.infn.it:3128"/>
<proxy url="http://squid-lhc-04.cr.cnaf.infn.it:3128"/>
<proxy url="http://squid-lhc-05.cr.cnaf.infn.it:3128"/>
<proxy url="http://squid-lhc-06.cr.cnaf.infn.it:3128"/>
<backupproxy url="http://cmsbpfrontier.cern.ch:3128"/>
<backupproxy url="http://cmsbproxy.fnal.gov:3128"/>
<server url="http://cmsfrontier.cern.ch:8000/FrontierInt"/>
<server url="http://cmsfrontier1.cern.ch:8000/FrontierInt"/>
<server url="http://cmsfrontier2.cern.ch:8000/FrontierInt"/>
<server url="http://cmsfrontier3.cern.ch:8000/FrontierInt"/>
</frontier-connect>
</calib-data>
</site>
</site-local-config>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
<phedex-node value="T1_US_FNAL_Disk"/>
</local-stage-out>
<stage-out>
<method volume="FNAL_dCache_EOS" protocol="XRootD" command="xrdcp" option="-p"/>
<method volume="FNAL_dCache_EOS" protocol="XRootD" option="-p"/>
<method volume="FNAL_dCache_EOS" protocol="xrootd"/>
<method volume="FNAL_dCache_EOS" protocol="SRMv2"/>
<method volume="FNAL_dCache_EOS" protocol="WebDAV"/>
</stage-out>
Expand Down