Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 57 additions & 53 deletions src/python/WMCore/MicroService/MSRuleCleaner/MSRuleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

# futures
from __future__ import division, print_function
from future.utils import viewvalues

import json
import re
Expand All @@ -24,6 +23,8 @@
from threading import current_thread
from pprint import pformat

from future.utils import viewitems

# WMCore modules
from WMCore.MicroService.DataStructs.DefaultStructs import RULECLEANER_REPORT
from WMCore.MicroService.MSRuleCleaner.MSRuleCleanerWflow import MSRuleCleanerWflow
Expand All @@ -37,6 +38,7 @@
from WMCore.MicroService.Tools.Common import findParent
from Utils.Pipeline import Pipeline, Functor
from Utils.CertTools import ckey, cert
from Utils.IteratorTools import getChunk


class MSRuleCleanerResolveParentError(WMException):
Expand Down Expand Up @@ -104,6 +106,9 @@ def __init__(self, msConfig, logger=None):
# 2 days of expiration time
self.alertExpiration = self.msConfig.get("alertExpireSecs", 2 * 24 * 60 * 60)

# chunk size for requests processing
self.chunkSize = self.msConfig.get("requestChunkSize", 100)

# Building all the Pipelines:
pName = 'plineMSTrCont'
self.plineMSTrCont = Pipeline(name=pName,
Expand Down Expand Up @@ -207,59 +212,52 @@ def execute(self, reqStatus):
self.logger.info("MSRuleCleaner is running in mode: %s.", self.mode)

# Build the list of workflows to work on:
#requestRecords = {}
try:
requestRecords = {}
self.getGlobalLocks()
# in this loop we'll only allocate single wflow object, process it and collect metrics
# therefore, the memory allocation will be flat regardless of number of records.
cleanNumRequests = 0
totalNumRequests = 0
for status in reqStatus:
requestRecords.update(self.getRequestRecords(status))
reqIds = self.getRequestRecords(status, detail=False)
ids = reqIds.keys()
for chunk in getChunk(ids, self.chunkSize):
cnum, tnum = self.processRequestsChunk(chunk)
cleanNumRequests += cnum
totalNumRequests += tnum

# Report the counters:
for pline in self.cleanuplines:
msg = "Workflows cleaned by pipeline: %s: %d"
self.logger.info(msg, pline.name, self.wfCounters['cleaned'][pline.name])
normalArchivedNumRequests = self.wfCounters['archived']['normalArchived']
forceArchivedNumRequests = self.wfCounters['archived']['forceArchived']
self.logger.info("Workflows normally archived: %d", self.wfCounters['archived']['normalArchived'])
self.logger.info("Workflows force archived: %d", self.wfCounters['archived']['forceArchived'])

self.updateAllMetrics(summary, totalNumRequests, cleanNumRequests, normalArchivedNumRequests, forceArchivedNumRequests)

except Exception as err: # general error
msg = "Unknown exception while fetching requests from ReqMgr2. Error: %s", str(err)
self.logger.exception(msg)
self.updateReportDict(summary, "error", msg)

# Call _execute() and feed the relevant pipeline with the objects popped from requestRecords
try:
self.getGlobalLocks()
totalNumRequests, cleanNumRequests, normalArchivedNumRequests, forceArchivedNumRequests = self._execute(requestRecords)
msg = "\nNumber of processed workflows: %s."
msg += "\nNumber of properly cleaned workflows: %s."
msg += "\nNumber of normally archived workflows: %s."
msg += "\nNumber of force archived workflows: %s."
self.logger.info(msg,
totalNumRequests,
cleanNumRequests,
normalArchivedNumRequests,
forceArchivedNumRequests)
self.updateReportDict(summary, "total_num_requests", totalNumRequests)
self.updateReportDict(summary, "clean_num_requests", cleanNumRequests)
self.updateReportDict(summary, "normal_archived_num_requests", normalArchivedNumRequests)
self.updateReportDict(summary, "force_archived_num_requests", forceArchivedNumRequests)
except Exception as ex:
msg = "Unknown exception while running MSRuleCleaner thread Error: {}".format(str(ex))
self.logger.exception(msg)
self.updateReportDict(summary, "error", msg)

return summary

def _execute(self, reqRecords):
def processRequestsChunk(self, chunk):
"""
Executes the MSRuleCleaner pipelines based on the workflow status
:param reqList: A list of RequestRecords to work on
:return: a tuple with:
number of properly cleaned requests
number of processed workflows
number of archived workflows
number of forced archived workflows
Helper function to process requests chunk
:param chunk: list of request's ids
:return: tuple of clean and total number of requests
"""
# NOTE: The Input Cleanup, the Block Level Cleanup and the Archival
# Pipelines are executed sequentially in the above order.
# This way we assure ourselves that we archive only workflows
# that have accomplished the needed cleanup

cleanNumRequests = 0
totalNumRequests = 0

# Call the workflow dispatcher:
for req in viewvalues(reqRecords):
result = self.reqmgr2.getRequestByNames(chunk)
requests = {}
if result:
requests = result[0]
for req in viewitems(requests):
wflow = MSRuleCleanerWflow(req)
self._dispatchWflow(wflow)
msg = "\n----------------------------------------------------------"
Expand All @@ -269,16 +267,22 @@ def _execute(self, reqRecords):
totalNumRequests += 1
if self._checkClean(wflow):
cleanNumRequests += 1

# Report the counters:
for pline in self.cleanuplines:
msg = "Workflows cleaned by pipeline: %s: %d"
self.logger.info(msg, pline.name, self.wfCounters['cleaned'][pline.name])
normalArchivedNumRequests = self.wfCounters['archived']['normalArchived']
forceArchivedNumRequests = self.wfCounters['archived']['forceArchived']
self.logger.info("Workflows normally archived: %d", self.wfCounters['archived']['normalArchived'])
self.logger.info("Workflows force archived: %d", self.wfCounters['archived']['forceArchived'])
return totalNumRequests, cleanNumRequests, normalArchivedNumRequests, forceArchivedNumRequests
return cleanNumRequests, totalNumRequests

def updateAllMetrics(self, summary, totalNumRequests, cleanNumRequests, normalArchivedNumRequests, forceArchivedNumRequests):
    """
    Log the final workflow counters and record them in the summary report.

    :param summary: the report dictionary updated in place via updateReportDict
    :param totalNumRequests: number of workflows processed in this cycle
    :param cleanNumRequests: number of workflows properly cleaned
    :param normalArchivedNumRequests: number of workflows normally archived
    :param forceArchivedNumRequests: number of workflows force archived
    :return: None (the summary dictionary is mutated in place)
    """
    msg = "\nNumber of processed workflows: %s."
    msg += "\nNumber of properly cleaned workflows: %s."
    msg += "\nNumber of normally archived workflows: %s."
    msg += "\nNumber of force archived workflows: %s."
    self.logger.info(msg, totalNumRequests, cleanNumRequests,
                     normalArchivedNumRequests, forceArchivedNumRequests)
    # Record every counter in the summary report with one pass over a
    # (metric name, value) table instead of four separate calls.
    metrics = (("total_num_requests", totalNumRequests),
               ("clean_num_requests", cleanNumRequests),
               ("normal_archived_num_requests", normalArchivedNumRequests),
               ("force_archived_num_requests", forceArchivedNumRequests))
    for metricName, metricValue in metrics:
        self.updateReportDict(summary, metricName, metricValue)

def _dispatchWflow(self, wflow):
"""
Expand Down Expand Up @@ -776,14 +780,14 @@ def cleanRucioRules(self, wflow):
wflow['CleanupStatus'][currPline] = all(delResults)
return wflow

def getRequestRecords(self, reqStatus):
def getRequestRecords(self, reqStatus, detail=True):
"""
Queries ReqMgr2 for requests in a given status.
:param reqStatus: The status for the requests to be fetched from ReqMgr2
:return requests: A dictionary with all the workflows in the given status
"""
self.logger.info("Fetching requests in status: %s", reqStatus)
result = self.reqmgr2.getRequestByStatus([reqStatus], detail=True)
result = self.reqmgr2.getRequestByStatus([reqStatus], detail=detail)
if not result:
requests = {}
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# system modules
import os
import unittest
import tracemalloc

# WMCore modules
from WMCore.MicroService.MSRuleCleaner.MSRuleCleaner import MSRuleCleaner, MSRuleCleanerArchivalSkip
Expand Down Expand Up @@ -392,9 +393,23 @@ def testPipelineArchiveStepChain(self):
with self.assertRaises(MSRuleCleanerArchivalSkip):
self.msRuleCleaner.plineArchive.run(wflow)

def testRunning(self):
    """
    Run a full _execute cycle over the preloaded request records and
    verify the returned workflow counters.
    """
    # Expected tuple: (total, cleaned, normalArchived, forceArchived)
    expectedCounters = (3, 2, 0, 0)
    counters = self.msRuleCleaner._execute(self.reqRecords)
    self.assertEqual(counters, expectedCounters)
def testMemoryLeak(self):
    """
    Unit test for measuring potential memory leak in MSRuleCleanerWflow.

    Constructs a single MSRuleCleanerWflow object to establish a memory
    baseline, then constructs 1000 of them in a loop and checks that the
    memory still held afterwards does not grow with the iteration count.
    """
    # Baseline: traced memory retained after a single construction.
    tracemalloc.start()
    MSRuleCleanerWflow(self.taskChainReq)
    singleCurrent, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    # Now construct the object many times; if nothing leaks, the retained
    # memory should stay near the single-construction baseline rather than
    # scaling with the number of iterations.
    tracemalloc.start()
    for _ in range(1000):
        MSRuleCleanerWflow(self.taskChainReq)
    loopCurrent, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    # NOTE: the original checks used assertTrue(abs(peak - current), 100),
    # which treats 100 as the assertion *message* and passes for any
    # non-zero delta, i.e. it never actually tested anything. Use a real
    # bounded comparison instead. The 10x factor is a generous heuristic
    # bound — well below the 1000x growth a per-iteration leak would show.
    self.assertLess(loopCurrent, max(singleCurrent, 1) * 10)

def testCheckClean(self):
# NOTE: All of the bellow checks are well visualized at:
Expand Down