#!/usr/bin/python
#
#    Copyright (C) 2012 Pablo Fernandez Fernandez
#
#    Apel Parser Fake is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    Apel Parser Fake is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with Apel Parser Fake.  If not, see <http://www.gnu.org/licenses/>.

# This script takes ARC accounting records and converts them to the
# blahp.log-<date> format. Hopefully this will make APEL understand its records.
#
# Output (in /var/log/cream/accounting/blahp-log-20110415) should be one line
# per job, with this format:
#
# "timestamp=2012-11-01 03:50:15" "userDN=/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=atlpilo1/CN=614260/CN=Robot: ATLAS Pilot1" "userFQAN=/atlas/Role=pilot/Capability=NULL" "userFQAN=/atlas/Role=NULL/Capability=NULL" "userFQAN=/atlas/lcg1/Role=NULL/Capability=NULL" "userFQAN=/atlas/usatlas/Role=NULL/Capability=NULL" "ceID=cream01.lcg.cscs.ch:8443/cream-pbs-atlas" "jobID=CREAM513648178" "lrmsID=10789640.lrms02.lcg.cscs.ch" "localUser=4800" "clientID=cre01_513648178"
#
# Input (in /var/spool/nordugrid/usagerecords/archive/*) has XML format.
# If we remove the XML declaration, a record looks like this (only the fields
# parsed below are guaranteed; the other element names are approximate):
#
# <JobUsageRecord xmlns="http://schema.ogf.org/urf/2003/09/urf"
#                 xmlns:vo="http://www.sgas.se/namespaces/2009/05/ur/vo">
#   <RecordIdentity ... />
#   <JobIdentity>
#     <GlobalJobId>gsiftp://arc01.lcg.cscs.ch:2811/jobs/zZoLDmP5uignZOuIepQ9oyOmABFKDmABFKDmRlKKDmABFKDmjcsN9n</GlobalJobId>
#     <LocalJobId>10558922.lrms02.lcg.cscs.ch</LocalJobId>
#   </JobIdentity>
#   <UserIdentity>
#     <LocalUserId>atlasprd</LocalUserId>
#     <GlobalUserName>/C=SI/O=SiGNET/O=IJS/OU=F9/CN=Andrej Filipcic</GlobalUserName>
#     <vo:VO>
#       <vo:Name>atlas</vo:Name>
#       <vo:Issuer>/DC=ch/DC=cern/OU=computers/CN=voms.cern.ch</vo:Issuer>
#       <vo:Attribute>
#         <vo:Group>atlas</vo:Group>
#         <vo:Role>production</vo:Role>
#       </vo:Attribute>
#       <vo:Attribute>
#         <vo:Group>atlas</vo:Group>
#       </vo:Attribute>
#       <vo:Attribute>
#         <vo:Group>atlas/lcg1</vo:Group>
#       </vo:Attribute>
#     </vo:VO>
#   </UserIdentity>
#   <JobName>mc12_8TeV.146854.AlpgenJimmy_Auto_AUET3CTEQ6L1_ZtautauNp4Excl_Mll10to60.simul.e1551_s1499_tid00993911._002085.job</JobName>
#   <Status>completed</Status>
#   <MachineName>arc01.lcg.cscs.ch</MachineName>
#   <Queue>atlas</Queue>
#   <Host>wn35.lcg.cscs.ch</Host>
#   <NodeCount>1</NodeCount>
#   <Processors>1</Processors>
#   <SubmitTime>2012-10-18T02:32:19Z</SubmitTime>
#   <StartTime>2012-10-18T17:44:19Z</StartTime>
#   <EndTime>2012-10-18T17:49:08Z</EndTime>
#   <WallDuration>PT289S</WallDuration>
#   <CpuDuration>PT244S</CpuDuration>
#   ... (user/system CPU times PT244S/PT0S, two zero-valued counters,
#        RuntimeEnvironment APPS/HEP/ATLAS-17.2.2.6-X86_64-SLC5-GCC43-OPT,
#        and the logger name ARC1-URLogger)
# </JobUsageRecord>

import libxml2
import re, datetime, os, socket
from pwd import getpwnam
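# A quick illustration of the two text conversions performed in parse_file()
# below, with assumed example values (this block is documentation only and is
# not executed by the script):
#
#   ur:StartTime '2012-10-18T17:44:19Z'
#     -> datetime.datetime(*map(int, re.split('[^\d]', timezulu)[:-1]))
#     -> datetime.datetime(2012, 10, 18, 17, 44, 19)
#
#   vo:Attribute with Group=atlas, Role=production (no Capability)
#     -> "/atlas/Role=production/Capability=NULL"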
# Pre-defined stuff, feel free to change it.
hostname = socket.gethostname().split('.')[0]
dir_base = '/var/spool/nordugrid/usagerecords'      # where input and output dirs are
dir_archive = dir_base + '/archive'
dir_archive_processed = dir_base + '/archive_apel'
dir_blahp_accounting = dir_base + '/blahp'
# Info from APEL (check both the accounting db, "select * from SpecRecords;",
# and the BDII GlueCEUniqueID string)
queueinfo = socket.gethostname() + ":2811/nordugrid-torque-arc"


def parse_file(file):
    doc = libxml2.parseDoc(open(file, 'r').read())
    context = doc.xpathNewContext()
    context.xpathRegisterNs('ur', 'http://schema.ogf.org/urf/2003/09/urf')

    result = {}
    timezulu = context.xpathEval("//ur:StartTime")[0].content
    result['timestamp'] = datetime.datetime(*map(int, re.split('[^\d]', timezulu)[:-1]))

    # Jobs may not have an lrmsID if they failed to submit internally
    jobids = context.xpathEval("//ur:LocalJobId")
    if len(jobids) == 0:
        return {}
    result['lrmsID'] = jobids[0].content

    result['userDN'] = context.xpathEval("//ur:GlobalUserName")[0].content
    username = context.xpathEval("//ur:LocalUserId")[0].content
    result['localUser'] = getpwnam(username).pw_uid

    #jobname = context.xpathEval("//ur:JobName")[0].content
    jobname = result['lrmsID'].split('.')[0]    # we take the pbsid instead of the NG jobname

    # From the jobname we extract (invent!) the jobID and clientID
    result['jobID'] = "ARC" + jobname
    result['clientID'] = hostname + '_' + jobname

    # And the CE string (ceID)
    result['ceID'] = queueinfo

    # We are still missing the userFQANs, but they are in a different namespace
    result['fqan_array'] = []
    context.xpathRegisterNs('vo', 'http://www.sgas.se/namespaces/2009/05/ur/vo')
    vo = context.xpathEval("//vo:VO/vo:Attribute")
    for fqan in vo:
        # Elements like this:
        # <vo:Attribute><vo:Group>atlas</vo:Group><vo:Role>production</vo:Role></vo:Attribute>
        element = fqan.children
        values = {}
        while element is not None:
            values[element.name] = element.content
            element = element.next
        # Form the TXT syntax
        txt = ""
        if "Group" in values:
            txt += "/%s" % values["Group"]
        else:
            txt += "%s" % values["User"]                    # not tested yet
        if "Role" in values:
            txt += "/Role=%s" % values["Role"]
        else:
            txt += "/Role=NULL"
        if "Capability" in values:
            txt += "/Capability=%s" % values["Capability"]  # not tested yet
        else:
            txt += "/Capability=NULL"
        # And append the result to the array
        result['fqan_array'].append(txt)

    return result


# BEGIN
# We create the output dirs (in case they don't exist)
if not os.access(dir_archive_processed, os.W_OK):
    os.mkdir(dir_archive_processed, 0755)   # to move the archive files into (and avoid repetition)
if not os.access(dir_blahp_accounting, os.W_OK):
    os.mkdir(dir_blahp_accounting, 0755)    # to generate the blahp records

for root, subFolders, files in os.walk(dir_archive):
    for file in files:
        #print os.path.join(root, file)
        line = parse_file(os.path.join(root, file))
        if len(line) == 0:
            continue

        # To test for just one day:
        #if line['timestamp'].strftime('%Y%m%d') != '20121005':
        #    continue

        # Open the target file (append mode creates it if needed)
        outputfile = dir_blahp_accounting + '/blahp.log-' + line['timestamp'].strftime('%Y%m%d')
        fd = open(outputfile, 'a')

        # Print the full line
        fd.write('"timestamp=%s" "userDN=%s" ' % (line['timestamp'], line['userDN']))
        for i in line['fqan_array']:
            fd.write('"userFQAN=%s" ' % i)
        fd.write('"ceID=%s" "jobID=%s" "lrmsID=%s" "localUser=%i" "clientID=%s"\n'
                 % (line['ceID'], line['jobID'], line['lrmsID'],
                    line['localUser'], line['clientID']))
        fd.close()

        # And finally move the file out of the archive (note: "root", not
        # "dir_archive", so files found in subdirectories are moved too). The
        # un-processed ones will remain (they stay there for one month by
        # default, cleaned up by ARC).
        os.rename(os.path.join(root, file), os.path.join(dir_archive_processed, file))
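# Deployment sketch (an assumption, not part of the script): run it from cron
# shortly before APEL processes the blahp directory, e.g. with a hypothetical
# /etc/cron.d/apel-parser-fake entry such as:
#
#   30 3 * * * root /usr/local/bin/apel_parser_fake.py
#
# so that each day's blahp.log-YYYYMMDD file is complete before APEL reads it.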