File

src/jobs/jobs.service.ts

Index

Properties
Methods

Constructor

constructor(configService: ConfigService, datasetsService: DatasetsService, jobModel: Model<JobDocument>, mailService: MailService, policiesService: PoliciesService)
Parameters :
Name Type Optional
configService ConfigService No
datasetsService DatasetsService No
jobModel Model<JobDocument> No
mailService MailService No
policiesService PoliciesService No

Methods

Async applyPolicyAndSendEmail
applyPolicyAndSendEmail(jobType: string, policy: Partial<Policy>, emailContext: Record<string, unknown>, to: string, cc: string)
Parameters :
Name Type Optional Default value
jobType string No
policy Partial<Policy> No
emailContext Record<string, unknown> No
to string No
cc string No ""
Returns : any
Async create
create(createJobDto: CreateJobDto)
Parameters :
Name Type Optional
createJobDto CreateJobDto No
Returns : Promise<Job>
Async findAll
findAll(filter: IFilters<JobDocument, FilterQuery<JobDocument>>)
Parameters :
Name Type Optional
filter IFilters<JobDocument, FilterQuery<JobDocument>> No
Returns : Promise<Job[]>
Async findOne
findOne(filter: FilterQuery<JobDocument>)
Parameters :
Name Type Optional
filter FilterQuery<JobDocument> No
Async fullfacet
fullfacet(filters: IFacets<FilterQuery<JobDocument>>)
Parameters :
Name Type Optional
filters IFacets<FilterQuery<JobDocument>> No
Returns : Promise<Record<string, unknown>[]>
Async fullquery
fullquery(filter: IFilters<JobDocument, FilterQuery<JobDocument>>)
Parameters :
Name Type Optional
filter IFilters<JobDocument, FilterQuery<JobDocument>> No
Returns : Promise<Job[]>
Async getPolicy
getPolicy(datasetId: string)
Parameters :
Name Type Optional
datasetId string No
Async markDatasetsAsScheduled
markDatasetsAsScheduled(ids: string[], jobType: string)
Parameters :
Name Type Optional
ids string[] No
jobType string No
Returns : any
Async remove
remove(filter: FilterQuery<JobDocument>)
Parameters :
Name Type Optional
filter FilterQuery<JobDocument> No
Returns : Promise<unknown>
Async sendEmail
sendEmail(to: string, cc: string, emailContext: Record<string, unknown>)
Parameters :
Name Type Optional
to string No
cc string No
emailContext Record<string, unknown> No
Returns : any
Async sendFinishJobEmail
sendFinishJobEmail(context: literal type)
Decorators :
@OnEvent('jobUpdated')
Parameters :
Name Type Optional
context literal type No
Returns : any
Async sendStartJobEmail
sendStartJobEmail(context: literal type)
Decorators :
@OnEvent('jobCreated')
Parameters :
Name Type Optional
context literal type No
Returns : any
Async update
update(filter: FilterQuery<JobDocument>, updateJobDto: UpdateJobDto)
Parameters :
Name Type Optional
filter FilterQuery<JobDocument> No
updateJobDto UpdateJobDto No

Properties

Private domainName
Default value : process.env.HOST
Private smtpMessageFrom
import { Injectable, Logger, NotFoundException } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { OnEvent } from "@nestjs/event-emitter";
import { InjectModel } from "@nestjs/mongoose";
import { readFileSync } from "fs";
import { compile } from "handlebars";
import { FilterQuery, Model, PipelineStage, QueryOptions } from "mongoose";
import { IFacets, IFilters } from "src/common/interfaces/common.interface";
import { MailService } from "src/common/mail.service";
import {
  createFullfacetPipeline,
  createFullqueryFilter,
  parseLimitFilters,
} from "src/common/utils";
import { DatasetsService } from "src/datasets/datasets.service";
import { IDatasetFields } from "src/datasets/interfaces/dataset-filters.interface";
import { DatasetDocument } from "src/datasets/schemas/dataset.schema";
import { PoliciesService } from "src/policies/policies.service";
import { Policy } from "src/policies/schemas/policy.schema";
import { CreateJobDto } from "./dto/create-job.dto";
import { UpdateJobDto } from "./dto/update-job.dto";
import { JobType } from "./job-type.enum";
import { Job, JobDocument } from "./schemas/job.schema";

/**
 * Service for job documents.
 *
 * Provides CRUD and query access to the job collection and reacts to the
 * "jobCreated" / "jobUpdated" events by updating the lifecycle of the
 * affected datasets and sending notification emails according to the
 * policy of the owning group.
 */
@Injectable()
export class JobsService {
  /** Value of the HOST environment variable; passed to the email template. */
  private domainName = process.env.HOST;
  /**
   * Value of the "smtp.messageFrom" configuration entry. Used as cc address
   * when a finished job contains datasets that are not retrievable.
   */
  private smtpMessageFrom: string | undefined;

  constructor(
    private configService: ConfigService,
    private datasetsService: DatasetsService,
    @InjectModel(Job.name) private jobModel: Model<JobDocument>,
    private mailService: MailService,
    private policiesService: PoliciesService,
  ) {
    this.smtpMessageFrom = this.configService.get<string>("smtp.messageFrom");
  }

  /**
   * Persists a new job.
   *
   * @param createJobDto - Data of the job to create.
   * @returns The saved job document.
   */
  async create(createJobDto: CreateJobDto): Promise<Job> {
    const createdJob = new this.jobModel(createJobDto);
    return createdJob.save();
  }

  /**
   * Finds jobs matching `filter.where` (all jobs when it is not set), using
   * the limit, skip and sort values that `parseLimitFilters` derives from
   * `filter.limits`.
   *
   * @param filter - Where clause and limits of the query.
   * @returns The matching jobs.
   */
  async findAll(
    filter: IFilters<JobDocument, FilterQuery<JobDocument>>,
  ): Promise<Job[]> {
    const whereFilters: FilterQuery<JobDocument> = filter.where ?? {};
    const { limit, skip, sort } = parseLimitFilters(filter.limits);

    return this.jobModel
      .find(whereFilters)
      .limit(limit)
      .skip(skip)
      .sort(sort)
      .exec();
  }

  /**
   * Finds jobs using the query that `createFullqueryFilter` builds from
   * `filter.fields`, with the query options that `parseLimitFilters` derives
   * from `filter.limits`.
   *
   * @param filter - Fields and limits of the query.
   * @returns The matching jobs.
   */
  async fullquery(
    filter: IFilters<JobDocument, FilterQuery<JobDocument>>,
  ): Promise<Job[]> {
    const filterQuery: FilterQuery<JobDocument> =
      createFullqueryFilter<JobDocument>(this.jobModel, "id", filter.fields);
    const modifiers: QueryOptions = parseLimitFilters(filter.limits);

    return await this.jobModel.find(filterQuery, null, modifiers).exec();
  }

  /**
   * Runs the aggregation pipeline that `createFullfacetPipeline` builds from
   * the given fields and facets (both default to empty).
   *
   * @param filters - Fields and facets of the aggregation.
   * @returns The raw aggregation result.
   */
  async fullfacet(
    filters: IFacets<FilterQuery<JobDocument>>,
  ): Promise<Record<string, unknown>[]> {
    const fields = filters.fields ?? {};
    const facets = filters.facets ?? [];

    const pipeline: PipelineStage[] = createFullfacetPipeline<
      JobDocument,
      FilterQuery<JobDocument>
    >(this.jobModel, "id", fields, facets);

    return await this.jobModel.aggregate(pipeline).exec();
  }

  /**
   * Finds a single job.
   *
   * @param filter - Query selecting the job.
   * @returns The job, or `null` when no job matches.
   */
  async findOne(filter: FilterQuery<JobDocument>): Promise<Job | null> {
    return this.jobModel.findOne(filter).exec();
  }

  /**
   * Updates a single job and returns the document as it is after the update
   * (`new: true`).
   *
   * @param filter - Query selecting the job.
   * @param updateJobDto - Values to apply.
   * @returns The updated job, or `null` when no job matches.
   */
  async update(
    filter: FilterQuery<JobDocument>,
    updateJobDto: UpdateJobDto,
  ): Promise<Job | null> {
    return this.jobModel
      .findOneAndUpdate(filter, updateJobDto, { new: true })
      .exec();
  }

  /**
   * Removes a single job.
   *
   * @param filter - Query selecting the job.
   * @returns Whatever the underlying model call resolves to.
   */
  async remove(filter: FilterQuery<JobDocument>): Promise<unknown> {
    // NOTE(review): newer Mongoose releases drop findOneAndRemove in favour of
    // findOneAndDelete - confirm against the installed Mongoose version.
    return this.jobModel.findOneAndRemove(filter).exec();
  }

  /**
   * Handler of the "jobCreated" event: marks the datasets of the job as
   * scheduled and sends the submission notification.
   *
   * @param context - Event payload; `instance` is the created job.
   */
  @OnEvent("jobCreated")
  async sendStartJobEmail(context: { instance: Job }): Promise<void> {
    const job = context.instance;
    const ids: string[] = job.datasetList.map(
      (dataset) => dataset.pid as string,
    );
    const to: string = job.emailJobInitiator;
    const jobType: string = job.type;
    await this.markDatasetsAsScheduled(ids, jobType);

    const filter: IFilters<DatasetDocument, IDatasetFields> = {
      where: {
        pid: {
          $in: ids,
        },
      },
    };

    // Dataset details are only listed for archive and retrieve jobs.
    const jobData = ["archive", "retrieve"].includes(jobType)
      ? (await this.datasetsService.findAll(filter)).map((dataset) => ({
          pid: dataset.pid,
          ownerGroup: dataset.ownerGroup,
          sourceFolder: dataset.sourceFolder,
          size: dataset.size,
          archivable: dataset.datasetlifecycle?.archivable,
          retrievable: dataset.datasetlifecycle?.retrievable,
        }))
      : [];

    const emailContext = {
      domainName: this.domainName,
      subject: `Your ${jobType} job submitted successfully`,
      jobSubmissionNotification: {
        jobId: job.id,
        jobType,
        jobData,
      },
    };

    // The policy is looked up through the first dataset of the job only.
    const policy = await this.getPolicy(ids[0]);
    await this.applyPolicyAndSendEmail(jobType, policy, emailContext, to);
  }

  /**
   * Handler of the "jobUpdated" event: sends a notification for every
   * updated job that has just reached a "finish" status.
   *
   * All jobs are processed concurrently and awaited. A failure while
   * notifying one job is logged and does not prevent the notifications of
   * the other jobs.
   *
   * @param context - Event payload; `hookState.oldData` holds the jobs as
   *   they were before the update (several entries for a bulk update).
   */
  @OnEvent("jobUpdated")
  async sendFinishJobEmail(context: {
    instance: Job;
    hookState: { oldData: Job[] };
  }): Promise<void> {
    // Array.forEach does not await async callbacks, which would leave every
    // rejection unhandled; map + Promise.all keeps the work observable.
    await Promise.all(
      context.hookState.oldData.map(async (oldData) => {
        try {
          await this.notifyJobFinished(oldData);
        } catch (error) {
          const reason = error instanceof Error ? error.message : String(error);
          Logger.error(
            "Failed to send finish notification for job " +
              oldData.id +
              ": " +
              reason,
            "JobsService",
          );
        }
      }),
    );
  }

  /**
   * Sends the "job finished" notification for one job, provided its status
   * message changed compared to `oldData` and now contains "finish".
   */
  private async notifyJobFinished(oldData: Job): Promise<void> {
    const currentData = await this.findOne({ id: oldData.id });
    // Only act when the status message changed and denotes a finished job.
    if (
      !currentData ||
      currentData.jobStatusMessage === oldData.jobStatusMessage ||
      !currentData.jobStatusMessage.includes("finish")
    ) {
      return;
    }

    const ids = currentData.datasetList.map(
      (dataset) => dataset.pid as string,
    );
    const to = currentData.emailJobInitiator;
    const {
      type: jobType,
      id: jobId,
      jobStatusMessage,
      jobResultObject,
    } = currentData;
    // Every finish status other than "finishedSuccessful" counts as failure.
    const failure = !jobStatusMessage.includes("finishedSuccessful");
    const filter = {
      where: {
        pid: {
          $in: ids,
        },
      },
    };

    const datasets = (await this.datasetsService.findAll(filter)).map(
      (dataset) => ({
        pid: dataset.pid,
        ownerGroup: dataset.ownerGroup,
        sourceFolder: dataset.sourceFolder,
        size: dataset.size,
        archiveStatusMessage: dataset.datasetlifecycle?.archiveStatusMessage,
        retrieveStatusMessage: dataset.datasetlifecycle?.retrieveStatusMessage,
        archiveReturnMessage: dataset.datasetlifecycle?.archiveReturnMessage,
        retrieveReturnMessage: dataset.datasetlifecycle?.retrieveReturnMessage,
        retrievable: dataset.datasetlifecycle?.retrievable,
      }),
    );

    // Split result into good (retrievable) and bad (not retrievable).
    const good = datasets.filter((dataset) => dataset.retrievable);
    const bad = datasets.filter((dataset) => !dataset.retrievable);

    // Add the configured sender as cc when at least one dataset is bad.
    const cc =
      bad.length > 0 && this.smtpMessageFrom ? this.smtpMessageFrom : "";
    // "2020-01-02T03:04:05.678Z" -> "2020-01-02 03:04:05"
    const creationTime = currentData.creationTime
      .toISOString()
      .replace(/T/, " ")
      .replace(/\..+/, "");
    const additionalMsg =
      jobType === JobType.Retrieve && good.length > 0
        ? "You can now use the command 'datasetRetriever' to move the retrieved datasets to their final destination."
        : "";

    const emailContext = {
      domainName: this.domainName,
      subject: ` Your ${jobType} job from ${creationTime} is finished ${
        failure ? "with failure" : "successfully"
      }`,
      jobFinishedNotification: {
        jobId,
        jobType,
        failure,
        creationTime,
        jobStatusMessage,
        jobResultObject: jobResultObject,
        datasets: {
          good,
          bad,
        },
        additionalMsg,
      },
    };

    // The policy is looked up through the first dataset of the job only.
    const policy = await this.getPolicy(ids[0]);
    await this.applyPolicyAndSendEmail(jobType, policy, emailContext, to, cc);
  }

  /**
   * Updates the lifecycle of the given datasets to reflect that a job was
   * scheduled for them.
   *
   * Archive jobs set `archivable` and `retrievable` to false and the archive
   * status message; retrieve jobs only set the retrieve status message.
   * Other job types leave the datasets untouched.
   *
   * @param ids - Pids of the datasets to update.
   * @param jobType - Type of the scheduled job.
   */
  async markDatasetsAsScheduled(
    ids: string[],
    jobType: string,
  ): Promise<void> {
    const statusMessage = {
      retrieve: "scheduledForRetrieval",
      archive: "scheduledForArchiving",
    };
    const filter = {
      pid: {
        $in: ids,
      },
    };

    switch (jobType) {
      case JobType.Archive: {
        const values = {
          $set: {
            "datasetlifecycle.archivable": false,
            "datasetlifecycle.retrievable": false,
            [`datasetlifecycle.${jobType}StatusMessage`]:
              statusMessage[jobType],
          },
        };
        await this.datasetsService.updateAll(filter, values);
        break;
      }
      case JobType.Retrieve: {
        const values = {
          $set: {
            [`datasetlifecycle.${jobType}StatusMessage`]:
              statusMessage[jobType],
          },
        };
        await this.datasetsService.updateAll(filter, values);
        break;
      }
      default:
        break;
    }
  }

  /**
   * Looks up the policy of the owner group of the given dataset.
   *
   * Never rejects: when the dataset or its policy cannot be found, or the
   * lookup throws, the problem is logged and a default policy with both
   * notifications enabled and no extra recipients is returned.
   *
   * @param datasetId - Pid of the dataset.
   * @returns The stored policy or the default policy.
   */
  async getPolicy(datasetId: string): Promise<Partial<Policy>> {
    try {
      const dataset = await this.datasetsService.findOne({
        where: { pid: datasetId },
      });
      if (!dataset) {
        throw new NotFoundException(
          "Could not find dataset with pid " + datasetId,
          "JobsService",
        );
      }
      const policy = await this.policiesService.findOne({
        ownerGroup: dataset.ownerGroup,
      });

      if (policy) {
        return policy;
      }
    } catch (error) {
      const message = "Error when looking for Policy of pgroup " + error;
      Logger.error("Dataset ID: " + datasetId, "JobsService");
      Logger.error(message);
    }

    Logger.log(
      "No policy found for dataset with id: " + datasetId,
      "JobsService",
    );
    Logger.log("Returning default policy instead", "JobsService");
    // this should not happen anymore, but kept as additional safety belt
    const defaultPolicy: Partial<Policy> = {
      archiveEmailNotification: true,
      retrieveEmailNotification: true,
      archiveEmailsToBeNotified: [],
      retrieveEmailsToBeNotified: [],
    };
    return defaultPolicy;
  }

  /**
   * Sends the notification email if the policy allows it.
   *
   * For archive and retrieve jobs the email is sent when the matching
   * notification flag of the policy is set, or when the context describes a
   * failed job; the matching policy recipients are added to `to`. All other
   * job types (e.g. reset jobs) always send to `to` only.
   *
   * @param jobType - Type of the job the email is about.
   * @param policy - Policy of the owner group.
   * @param emailContext - Template context; a truthy `failure` at the top
   *   level or inside `jobFinishedNotification` marks a failed job.
   * @param to - Primary recipient.
   * @param cc - Optional cc recipient.
   */
  async applyPolicyAndSendEmail(
    jobType: string,
    policy: Partial<Policy>,
    emailContext: Record<string, unknown>,
    to: string,
    cc = "",
  ): Promise<void> {
    const failure = this.isFailureContext(emailContext);

    let notificationEnabled = true;
    let policyRecipients: string[] = [];
    switch (jobType) {
      case JobType.Archive:
        notificationEnabled = Boolean(policy.archiveEmailNotification);
        policyRecipients = policy.archiveEmailsToBeNotified ?? [];
        break;
      case JobType.Retrieve:
        notificationEnabled = Boolean(policy.retrieveEmailNotification);
        policyRecipients = policy.retrieveEmailsToBeNotified ?? [];
        break;
      default:
        // For other jobs like reset job: always notify the initiator.
        break;
    }

    // Always notify on failure.
    if (!notificationEnabled && !failure) {
      return;
    }

    // Skip empty entries so the list has no leading or trailing comma.
    const recipients = [to, ...policyRecipients]
      .filter((address) => Boolean(address))
      .join(",");
    await this.sendEmail(recipients, cc, emailContext);
  }

  /**
   * Tells whether an email context describes a failed job. The flag is
   * accepted at the top level and inside `jobFinishedNotification`, which is
   * where the finish notification stores it.
   */
  private isFailureContext(emailContext: Record<string, unknown>): boolean {
    const notification = emailContext.jobFinishedNotification;
    const nestedFailure =
      typeof notification === "object" && notification !== null
        ? (notification as Record<string, unknown>).failure
        : undefined;
    return Boolean(emailContext.failure) || Boolean(nestedFailure);
  }

  /**
   * Renders the job email template with the given context and sends it.
   *
   * The template is read synchronously on every call from a path relative
   * to the working directory of the process.
   *
   * @param to - Comma separated recipients.
   * @param cc - Comma separated cc recipients (may be empty).
   * @param emailContext - Template context; `subject` is used as subject.
   */
  async sendEmail(
    to: string,
    cc: string,
    emailContext: Record<string, unknown>,
  ): Promise<void> {
    const htmlTemplate = readFileSync(
      "src/common/email-templates/job-template.html",
      "utf-8",
    );
    const emailTemplate = compile(htmlTemplate);
    const email = emailTemplate(emailContext);
    const subject = emailContext.subject as string;
    await this.mailService.sendMail(to, cc, subject, null, email);
  }
}

results matching ""

    No results matching ""