Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/python/WMComponent/JobSubmitter/JobSubmitterPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from WMCore.Services.ReqMgrAux.ReqMgrAux import ReqMgrAux

from WMComponent.JobSubmitter.JobSubmitAPI import availableScheddSlots

from WMCore.BossAir.Plugins.SimpleCondorPlugin import CondorScheddUnavailable

def jobSubmitCondition(jobStats):
for jobInfo in jobStats:
Expand Down Expand Up @@ -754,7 +754,25 @@ def submitJobs(self, jobsToSubmit):
myThread.transaction.begin()

# Run the actual underlying submit code using bossAir
successList, failList = self.bossAir.submit(jobs=jobList)
try:
successList, failList = self.bossAir.submit(jobs=jobList)

except CondorScheddUnavailable as ex:
msg = "Condor Schedd is unavailable: %s" % str(ex)
logging.error(msg)
myThread.logdbClient.post("JobSubmitter_submitWork", msg, "error")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogDB usage is annoying in the sense that documents are not automatically cleaned up. In other words, if we create an error record, the agent/component would keep this error in the LogDB (and WMStats) until someone decides to delete it.

I would be in favor of not adding LogDB here and rely on other ways to monitor agent job submission.

# dont raise WMException, just return
logging.warning("JobSubmitter didn't submit any jobs due to condor schedd being unavailable.")
# TODO: verify if we should rollback the transaction or not?
myThread.transaction.rollback()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please investigate further if self.bossAir.submit(jobs=jobList) is actually persisting anything in the relational database? I think it relies on the lines below for persisting data in the database. If that is true, then there is no need to add rollback logic in here.

return

except Exception as ex:
msg = "Error submitting jobs: %s" % str(ex)
logging.error(msg)
myThread.logdbClient.post("JobSubmitter_submitWork", msg, "error")
raise WMException(msg) from ex

logging.info("Jobs that succeeded/failed submission: %d/%d.", len(successList), len(failList))

# Propagate states in the WMBS database
Expand Down
1 change: 1 addition & 0 deletions src/python/WMComponent/JobUpdater/JobUpdaterPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def synchronizeJobPriority(self):
# if there are jobs in wmbs executing state, update their prio in condor
if self.executingJobsDAO.execute(workflow) > 0:
logging.info("Updating condor jobs priority for request: %s", workflow)
# TODO: verify if we should wrap this in a try/except for the CondorScheddException as well?
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but I would rather address this in a new/separate issue such that we only print a friendly error message, instead of:

2025-03-01 12:42:46,832:139956067981056:ERROR:SimpleCondorPlugin:Unable to edit jobs matching constraint
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/WMCore/BossAir/Plugins/SimpleCondorPlugin.py", line 486, in updateJobInformation
    schedd.edit(constraint, 'JobPrio', classad.Literal(newPriority))
  File "/usr/local/lib/python3.8/site-packages/htcondor/_lock.py", line 70, in wrapper
    rv = func(*args, **kwargs)
htcondor.HTCondorIOError: Unable to edit jobs matching constraint

self.bossAir.updateJobInformation(workflow,
requestPriority=priorityCache[workflow])
workflowsToUpdateWMBS[workflow] = priorityCache[workflow]
Expand Down
50 changes: 42 additions & 8 deletions src/python/WMCore/BossAir/Plugins/SimpleCondorPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from WMCore.WMInit import getWMBASE
from WMCore.Lexicon import getIterMatchObjectOnRegexp, WMEXCEPTION_REGEXP, CONDOR_LOG_FILTER_REGEXP
from WMCore.Services.TagCollector.TagCollector import TagCollector
from WMCore.WMException import WMException

def activityToType(jobActivity):
"""
Expand All @@ -42,6 +43,22 @@ def activityToType(jobActivity):
return activityMap.get(jobActivity, "unknown")


class CondorScheddUnavailable(WMException):
    """
    Exception raised when we fail to create a condor schedd object,
    e.g. because the schedd daemon is not reachable.
    """

    def __init__(self, msg):
        """
        :param msg: human-readable description of the schedd failure
        """
        # Delegate to WMException so the standard WMCore error metadata is set
        super().__init__(msg)


class SimpleCondorPlugin(BasePlugin):
"""
_SimpleCondorPlugin_
Expand Down Expand Up @@ -144,6 +161,21 @@ def __init__(self, config):
self.useCMSToken = getattr(config.JobSubmitter, 'useOauthToken', False)

return

def getScheddObject(self):
    """
    Return a Schedd object for the current condor schedd.

    :raises CondorScheddUnavailable: if the htcondor bindings fail to
        create the Schedd object (e.g. the schedd daemon is unavailable)
    :return: an htcondor.Schedd instance
    """
    try:
        schedd = htcondor.Schedd()
    except RuntimeError as ex:
        msg = "Failed to create a condor schedd object: %s" % str(ex)
        # Chain the original RuntimeError so the root cause is preserved
        # in the traceback for callers catching CondorScheddUnavailable.
        raise CondorScheddUnavailable(msg) from ex

    return schedd


def submit(self, jobs, info=None):
"""
Expand All @@ -158,7 +190,7 @@ def submit(self, jobs, info=None):
# Then was have nothing to do
return successfulJobs, failedJobs

schedd = htcondor.Schedd()
schedd = self.getScheddObject()

# Submit the jobs
for jobsReady in grouper(jobs, self.jobsPerSubmit):
Expand Down Expand Up @@ -208,7 +240,7 @@ def track(self, jobs):
# get info about all active and recent jobs
logging.debug("SimpleCondorPlugin is going to track %s jobs", len(jobs))

schedd = htcondor.Schedd()
schedd = self.getScheddObject()

logging.debug("Start: Retrieving classAds using Condor Python query")
try:
Expand Down Expand Up @@ -369,7 +401,8 @@ def updateSiteInformation(self, jobs, siteName, excludeSite):
Parameters: excludeSite = False when moving to Normal
excludeSite = True when moving to Down, Draining or Aborted
"""
sd = htcondor.Schedd()

sd = self.getScheddObject()
jobIdToKill = []
jobtokill = []
origSiteLists = set()
Expand Down Expand Up @@ -436,8 +469,8 @@ def kill(self, jobs, raiseEx=False):
Kill can happen for schedd running on localhost... TBC.
"""
logging.info("Killing %i jobs from the queue", len(jobs))

schedd = htcondor.Schedd()
schedd = self.getScheddObject()
gridIds = [job['gridid'] for job in jobs]
try:
schedd.act(htcondor.JobAction.Remove, gridIds)
Expand All @@ -455,8 +488,8 @@ def killWorkflowJobs(self, workflow):
Kill all the jobs belonging to a specific workflow.
"""
logging.info("Going to remove all the jobs for workflow %s", workflow)

schedd = htcondor.Schedd()
schedd = self.getScheddObject()

try:
schedd.act(htcondor.JobAction.Remove, "WMAgent_RequestName == %s" % classad.quote(str(workflow)))
Expand All @@ -478,7 +511,8 @@ def updateJobInformation(self, workflow, **kwargs):
Since the default priority is very high, we only need to adjust new priorities
for processing/production task types (which have a task priority of 0)
"""
schedd = htcondor.Schedd()

schedd = self.getScheddObject()

if 'requestPriority' in kwargs:
newPriority = int(kwargs['requestPriority'])
Expand Down
17 changes: 16 additions & 1 deletion src/python/WMCore/BossAir/StatusPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from WMCore.WMExceptions import WM_JOB_ERROR_CODES
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread
from WMCore.BossAir.BossAirAPI import BossAirAPI
from WMCore.BossAir.Plugins.SimpleCondorPlugin import CondorScheddUnavailable

class StatusPollerException(WMException):
"""
Expand Down Expand Up @@ -68,6 +69,14 @@ def algorithm(self, parameters=None):
try:
logging.info("Running job status poller algorithm...")
self.checkStatus()

except CondorScheddUnavailable as ex:
msg = "Condor Schedd is unavailable: %s" % str(ex)
logging.error(msg)
if getattr(myThread, 'transaction', None):
myThread.transaction.rollbackForError()
logging.info("JobStatusLite failed to run, will retry in next cycle")

except WMException as ex:
if getattr(myThread, 'transaction', None):
myThread.transaction.rollbackForError()
Expand All @@ -90,8 +99,14 @@ def checkStatus(self):
and then check for jobs that have timed out.
"""

try:
runningJobs = self.bossAir.track()
except Exception as ex:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code needs to be rolled back. Otherwise, it defeats the new CondorScheddUnavailable catch implemented in the algorithm above.

msg = "Error in BossAir track call: %s" % str(ex)
logging.error(msg)
runningJobs = []
raise WMException(msg)from ex

runningJobs = self.bossAir.track()

if len(runningJobs) < 1:
# Then we have no jobs
Expand Down