adding parallel run lock for jobs

This commit is contained in:
Patrik J. Braun 2019-12-31 13:38:23 +01:00
parent 5a0222024b
commit 1acbc67638
18 changed files with 118 additions and 61 deletions

View File

@ -51,21 +51,21 @@ export class AdminMWs {
} }
public static startJob(soloRun: boolean) { public static async startJob(req: Request, res: Response, next: NextFunction) {
return async (req: Request, res: Response, next: NextFunction) => { try {
try { const id = req.params.id;
const id = req.params.id; const JobConfig: any = req.body.config;
const JobConfig: any = req.body.config; const soloRun: boolean = req.body.soloRun;
await ObjectManagers.getInstance().JobManager.run(id, JobConfig, soloRun); const allowParallelRun: boolean = req.body.allowParallelRun;
req.resultPipe = 'ok'; await ObjectManagers.getInstance().JobManager.run(id, JobConfig, soloRun, allowParallelRun);
return next(); req.resultPipe = 'ok';
} catch (err) { return next();
if (err instanceof Error) { } catch (err) {
return next(new ErrorDTO(ErrorCodes.JOB_ERROR, 'Job error: ' + err.toString(), err)); if (err instanceof Error) {
} return next(new ErrorDTO(ErrorCodes.JOB_ERROR, 'Job error: ' + err.toString(), err));
return next(new ErrorDTO(ErrorCodes.JOB_ERROR, 'Job error: ' + JSON.stringify(err, null, ' '), err));
} }
}; return next(new ErrorDTO(ErrorCodes.JOB_ERROR, 'Job error: ' + JSON.stringify(err, null, ' '), err));
}
} }
public static stopJob(req: Request, res: Response, next: NextFunction) { public static stopJob(req: Request, res: Response, next: NextFunction) {

View File

@ -4,7 +4,7 @@ import {JobDTO} from '../../../../common/entities/job/JobDTO';
export interface IJobManager { export interface IJobManager {
run(jobId: string, config: any, soloRun: boolean): Promise<void>; run(jobId: string, config: any, soloRun: boolean, allowParallelRun: boolean): Promise<void>;
stop(jobId: string): void; stop(jobId: string): void;

View File

@ -22,15 +22,28 @@ export class JobManager implements IJobManager, IJobListener {
this.runSchedules(); this.runSchedules();
} }
protected get JobRunning(): boolean {
return JobRepository.Instance.getAvailableJobs().findIndex(j => j.InProgress === true) !== -1;
}
protected get JobNoParallelRunning(): boolean {
return JobRepository.Instance.getAvailableJobs()
.findIndex(j => j.InProgress === true && j.allowParallelRun) !== -1;
}
getProgresses(): { [id: string]: JobProgressDTO } { getProgresses(): { [id: string]: JobProgressDTO } {
return this.progressManager.Progresses; return this.progressManager.Progresses;
} }
async run<T>(jobName: string, config: T, soloRun: boolean): Promise<void> { async run<T>(jobName: string, config: T, soloRun: boolean, allowParallelRun: boolean): Promise<void> {
if ((allowParallelRun === false && this.JobRunning === true) || this.JobNoParallelRunning === true) {
throw new Error('Can\'t start this job while an other is running');
}
const t = this.findJob(jobName); const t = this.findJob(jobName);
if (t) { if (t) {
t.JobListener = this; t.JobListener = this;
await t.start(config, soloRun); await t.start(config, soloRun, allowParallelRun);
} else { } else {
Logger.warn(LOG_TAG, 'cannot find job to start:' + jobName); Logger.warn(LOG_TAG, 'cannot find job to start:' + jobName);
} }
@ -49,7 +62,6 @@ export class JobManager implements IJobManager, IJobListener {
this.progressManager.onJobProgressUpdate(progress.toDTO()); this.progressManager.onJobProgressUpdate(progress.toDTO());
}; };
onJobFinished = async (job: IJob<any>, state: JobProgressStates, soloRun: boolean): Promise<void> => { onJobFinished = async (job: IJob<any>, state: JobProgressStates, soloRun: boolean): Promise<void> => {
// if it was not finished peacefully or was a soloRun, do not start the next one // if it was not finished peacefully or was a soloRun, do not start the next one
if (state !== JobProgressStates.finished || soloRun === true) { if (state !== JobProgressStates.finished || soloRun === true) {
@ -61,7 +73,7 @@ export class JobManager implements IJobManager, IJobListener {
(<AfterJobTrigger>s.trigger).afterScheduleName === sch.name); (<AfterJobTrigger>s.trigger).afterScheduleName === sch.name);
for (let i = 0; i < children.length; ++i) { for (let i = 0; i < children.length; ++i) {
try { try {
await this.run(children[i].jobName, children[i].config, false); await this.run(children[i].jobName, children[i].config, false, children[i].allowParallelRun);
} catch (e) { } catch (e) {
NotificationManager.warning('Job running error:' + children[i].name, e.toString()); NotificationManager.warning('Job running error:' + children[i].name, e.toString());
} }
@ -96,7 +108,7 @@ export class JobManager implements IJobManager, IJobListener {
const timer: NodeJS.Timeout = setTimeout(async () => { const timer: NodeJS.Timeout = setTimeout(async () => {
this.timers = this.timers.filter(t => t.timer !== timer); this.timers = this.timers.filter(t => t.timer !== timer);
await this.run(schedule.jobName, schedule.config, false); await this.run(schedule.jobName, schedule.config, false, schedule.allowParallelRun);
this.runSchedule(schedule); this.runSchedule(schedule);
}, nextDate.getTime() - Date.now()); }, nextDate.getTime() - Date.now());
this.timers.push({schedule: schedule, timer: timer}); this.timers.push({schedule: schedule, timer: timer});

View File

@ -33,6 +33,7 @@ export class JobProgressManager {
} }
onJobProgressUpdate(progress: JobProgressDTO) { onJobProgressUpdate(progress: JobProgressDTO) {
this.db.progresses[progress.HashName] = {progress: progress, timestamp: Date.now()}; this.db.progresses[progress.HashName] = {progress: progress, timestamp: Date.now()};
this.delayedSave(); this.delayedSave();

View File

@ -7,8 +7,10 @@ export interface IJob<T> extends JobDTO {
Supported: boolean; Supported: boolean;
Progress: JobProgress; Progress: JobProgress;
JobListener: IJobListener; JobListener: IJobListener;
InProgress: boolean;
allowParallelRun: boolean;
start(config: T, soloRun?: boolean): Promise<void>; start(config: T, soloRun: boolean, allowParallelRun: boolean): Promise<void>;
cancel(): void; cancel(): void;

View File

@ -11,6 +11,7 @@ declare const global: any;
const LOG_TAG = '[JOB]'; const LOG_TAG = '[JOB]';
export abstract class Job<T = void> implements IJob<T> { export abstract class Job<T = void> implements IJob<T> {
public allowParallelRun: boolean = null;
protected progress: JobProgress = null; protected progress: JobProgress = null;
protected config: T; protected config: T;
protected prResolve: () => void; protected prResolve: () => void;
@ -33,15 +34,16 @@ export abstract class Job<T = void> implements IJob<T> {
return this.progress; return this.progress;
} }
protected get InProgress(): boolean { public get InProgress(): boolean {
return this.Progress !== null && (this.Progress.State === JobProgressStates.running || return this.Progress !== null && (this.Progress.State === JobProgressStates.running ||
this.Progress.State === JobProgressStates.cancelling); this.Progress.State === JobProgressStates.cancelling);
} }
public start(config: T, soloRun = false): Promise<void> { public start(config: T, soloRun = false, allowParallelRun = false): Promise<void> {
if (this.InProgress === false && this.Supported === true) { if (this.InProgress === false && this.Supported === true) {
Logger.info(LOG_TAG, 'Running job ' + (soloRun === true ? 'solo' : '') + ': ' + this.Name); Logger.info(LOG_TAG, 'Running job ' + (soloRun === true ? 'solo' : '') + ': ' + this.Name);
this.soloRun = soloRun; this.soloRun = soloRun;
this.allowParallelRun = allowParallelRun;
this.config = config; this.config = config;
this.progress = new JobProgress(this.Name, JobDTO.getHashName(this.Name, this.config)); this.progress = new JobProgress(this.Name, JobDTO.getHashName(this.Name, this.config));
this.progress.OnChange = this.jobListener.onProgressUpdate; this.progress.OnChange = this.jobListener.onProgressUpdate;

View File

@ -29,14 +29,14 @@ export class ThumbnailGenerationJob extends FileJob<{ sizes: number[], indexedOn
return true; return true;
} }
start(config: { sizes: number[], indexedOnly: boolean }, soloRun = false): Promise<void> { start(config: { sizes: number[], indexedOnly: boolean }, soloRun = false, allowParallelRun = false): Promise<void> {
for (let i = 0; i < config.sizes.length; ++i) { for (let i = 0; i < config.sizes.length; ++i) {
if (Config.Client.Media.Thumbnail.thumbnailSizes.indexOf(config.sizes[i]) === -1) { if (Config.Client.Media.Thumbnail.thumbnailSizes.indexOf(config.sizes[i]) === -1) {
throw new Error('unknown thumbnails size: ' + config.sizes[i] + '. Add it to the possible thumbnail sizes.'); throw new Error('unknown thumbnails size: ' + config.sizes[i] + '. Add it to the possible thumbnail sizes.');
} }
} }
return super.start(config, soloRun); return super.start(config, soloRun, allowParallelRun);
} }
protected async filterMediaFiles(files: FileDTO[]): Promise<FileDTO[]> { protected async filterMediaFiles(files: FileDTO[]): Promise<FileDTO[]> {

View File

@ -46,13 +46,7 @@ export class AdminRouter {
app.post('/api/admin/jobs/scheduled/:id/start', app.post('/api/admin/jobs/scheduled/:id/start',
AuthenticationMWs.authenticate, AuthenticationMWs.authenticate,
AuthenticationMWs.authorise(UserRoles.Admin), AuthenticationMWs.authorise(UserRoles.Admin),
AdminMWs.startJob(false), AdminMWs.startJob,
RenderingMWs.renderResult
);
app.post('/api/admin/jobs/scheduled/:id/soloStart',
AuthenticationMWs.authenticate,
AuthenticationMWs.authorise(UserRoles.Admin),
AdminMWs.startJob(true),
RenderingMWs.renderResult RenderingMWs.renderResult
); );
app.post('/api/admin/jobs/scheduled/:id/stop', app.post('/api/admin/jobs/scheduled/:id/stop',

View File

@ -75,6 +75,7 @@ export class PrivateConfigDefaultsClass extends PublicConfigClass implements IPr
{ {
name: DefaultsJobs[DefaultsJobs.Indexing], name: DefaultsJobs[DefaultsJobs.Indexing],
jobName: DefaultsJobs[DefaultsJobs.Indexing], jobName: DefaultsJobs[DefaultsJobs.Indexing],
allowParallelRun: false,
config: {}, config: {},
trigger: {type: JobTriggerType.never} trigger: {type: JobTriggerType.never}
}, },
@ -82,24 +83,27 @@ export class PrivateConfigDefaultsClass extends PublicConfigClass implements IPr
name: DefaultsJobs[DefaultsJobs['Thumbnail Generation']], name: DefaultsJobs[DefaultsJobs['Thumbnail Generation']],
jobName: DefaultsJobs[DefaultsJobs['Thumbnail Generation']], jobName: DefaultsJobs[DefaultsJobs['Thumbnail Generation']],
config: {sizes: [240]}, config: {sizes: [240]},
allowParallelRun: false,
trigger: { trigger: {
type: JobTriggerType.after, type: JobTriggerType.after,
afterScheduleName: DefaultsJobs[DefaultsJobs.Indexing] afterScheduleName: DefaultsJobs[DefaultsJobs.Indexing]
} }
}, },
/* { /* {
name: DefaultsJobs[DefaultsJobs['Photo Converting']], name: DefaultsJobs[DefaultsJobs['Photo Converting']],
jobName: DefaultsJobs[DefaultsJobs['Photo Converting']], jobName: DefaultsJobs[DefaultsJobs['Photo Converting']],
config: {}, config: {},
trigger: { parallelRunEnabled:false,
type: JobTriggerType.after, trigger: {
afterScheduleName: DefaultsJobs[DefaultsJobs['Thumbnail Generation']] type: JobTriggerType.after,
} afterScheduleName: DefaultsJobs[DefaultsJobs['Thumbnail Generation']]
},*/ }
},*/
{ {
name: DefaultsJobs[DefaultsJobs['Video Converting']], name: DefaultsJobs[DefaultsJobs['Video Converting']],
jobName: DefaultsJobs[DefaultsJobs['Video Converting']], jobName: DefaultsJobs[DefaultsJobs['Video Converting']],
config: {}, config: {},
allowParallelRun: false,
trigger: { trigger: {
type: JobTriggerType.after, type: JobTriggerType.after,
afterScheduleName: DefaultsJobs[DefaultsJobs['Thumbnail Generation']] afterScheduleName: DefaultsJobs[DefaultsJobs['Thumbnail Generation']]
@ -109,6 +113,7 @@ export class PrivateConfigDefaultsClass extends PublicConfigClass implements IPr
name: DefaultsJobs[DefaultsJobs['Temp Folder Cleaning']], name: DefaultsJobs[DefaultsJobs['Temp Folder Cleaning']],
jobName: DefaultsJobs[DefaultsJobs['Temp Folder Cleaning']], jobName: DefaultsJobs[DefaultsJobs['Temp Folder Cleaning']],
config: {}, config: {},
allowParallelRun: false,
trigger: { trigger: {
type: JobTriggerType.after, type: JobTriggerType.after,
afterScheduleName: DefaultsJobs[DefaultsJobs['Video Converting']] afterScheduleName: DefaultsJobs[DefaultsJobs['Video Converting']]

View File

@ -30,6 +30,7 @@ export interface JobScheduleDTO {
name: string; name: string;
jobName: string; jobName: string;
config: any; config: any;
allowParallelRun: boolean;
trigger: NeverJobTrigger | ScheduledJobTrigger | PeriodicJobTrigger | AfterJobTrigger; trigger: NeverJobTrigger | ScheduledJobTrigger | PeriodicJobTrigger | AfterJobTrigger;
} }

View File

@ -114,6 +114,7 @@
<app-settings-job-button #indexingButton <app-settings-job-button #indexingButton
[soloRun]="true" [soloRun]="true"
(error)="error=$event" (error)="error=$event"
[allowParallelRun]="false"
[jobName]="indexingJobName"></app-settings-job-button> [jobName]="indexingJobName"></app-settings-job-button>
@ -121,6 +122,7 @@
danger="true" danger="true"
[soloRun]="true" [soloRun]="true"
(error)="error=$event" (error)="error=$event"
[allowParallelRun]="false"
[disabled]="indexingButton.Running" [disabled]="indexingButton.Running"
[jobName]="resetJobName"></app-settings-job-button> [jobName]="resetJobName"></app-settings-job-button>

View File

@ -18,6 +18,7 @@ export class JobButtonComponent {
@Input() shortName = false; @Input() shortName = false;
@Input() disabled = false; @Input() disabled = false;
@Input() soloRun = false; @Input() soloRun = false;
@Input() allowParallelRun = false;
@Input() danger = false; @Input() danger = false;
JobProgressStates = JobProgressStates; JobProgressStates = JobProgressStates;
@Output() error = new EventEmitter<string>(); @Output() error = new EventEmitter<string>();
@ -40,7 +41,7 @@ export class JobButtonComponent {
public async start() { public async start() {
this.error.emit(''); this.error.emit('');
try { try {
await this.jobsService.start(this.jobName, this.config, this.soloRun); await this.jobsService.start(this.jobName, this.config, this.soloRun, this.allowParallelRun);
this.notification.success(this.i18n('Job started') + ': ' + this.backendTextService.getJobName(this.jobName)); this.notification.success(this.i18n('Job started') + ': ' + this.backendTextService.getJobName(this.jobName));
return true; return true;
} catch (err) { } catch (err) {
@ -57,7 +58,7 @@ export class JobButtonComponent {
this.error.emit(''); this.error.emit('');
try { try {
await this.jobsService.stop(this.jobName); await this.jobsService.stop(this.jobName);
this.notification.info(this.i18n('Job stopped') + ': ' + this.backendTextService.getJobName(this.jobName)); this.notification.info(this.i18n('Stopping job') + ': ' + this.backendTextService.getJobName(this.jobName));
return true; return true;
} catch (err) { } catch (err) {
console.log(err); console.log(err);

View File

@ -24,16 +24,19 @@
*ngSwitchCase="JobTriggerType.scheduled">{{schedule.trigger.time | date:"medium"}}</ng-container> *ngSwitchCase="JobTriggerType.scheduled">{{schedule.trigger.time | date:"medium"}}</ng-container>
<ng-container *ngSwitchCase="JobTriggerType.never" i18n>never</ng-container> <ng-container *ngSwitchCase="JobTriggerType.never" i18n>never</ng-container>
<ng-container *ngSwitchCase="JobTriggerType.after"> <ng-container *ngSwitchCase="JobTriggerType.after">
<ng-container i18n>after</ng-container>: <ng-container i18n>after</ng-container>
:
{{schedule.trigger.afterScheduleName}} {{schedule.trigger.afterScheduleName}}
</ng-container> </ng-container>
</ng-container> </ng-container>
</div> </div>
<div> <div>
<button class="btn btn-danger job-control-button ml-0" (click)="remove(schedule)"><span class="oi oi-trash"></span> <button class="btn btn-danger job-control-button ml-0" (click)="remove(schedule)"><span
class="oi oi-trash"></span>
</button> </button>
<app-settings-job-button class="job-control-button ml-md-2 mt-2 mt-md-0" <app-settings-job-button class="job-control-button ml-md-2 mt-2 mt-md-0"
(error)="error=$event" (error)="error=$event"
[allowParallelRun]="schedule.allowParallelRun"
[jobName]="schedule.jobName" [config]="schedule.config" [jobName]="schedule.jobName" [config]="schedule.config"
[shortName]="true"></app-settings-job-button> [shortName]="true"></app-settings-job-button>
</div> </div>
@ -51,10 +54,11 @@
{{backendTextService.getJobName(schedule.jobName)}} {{backendTextService.getJobName(schedule.jobName)}}
</div> </div>
<div class="col-md-6"> <div class="col-md-6">
<app-settings-job-button class="float-right" <app-settings-job-button class="float-right"
[jobName]="schedule.jobName" [jobName]="schedule.jobName"
(error)="error=$event" [allowParallelRun]="schedule.allowParallelRun"
[config]="schedule.config"></app-settings-job-button> (error)="error=$event"
[config]="schedule.config"></app-settings-job-button>
</div> </div>
</div> </div>
@ -121,6 +125,26 @@
[(timestamp)]="schedule.trigger.atTime"></app-timestamp-timepicker> [(timestamp)]="schedule.trigger.atTime"></app-timestamp-timepicker>
</div> </div>
</div> </div>
<div class="form-group row">
<label class="col-md-2 control-label" [for]="'allowParallelRun'+i" i18n>Allow parallel run</label>
<div class="col-md-10">
<bSwitch
class="switch"
[name]="'allowParallelRun'+'_'+i"
[id]="'allowParallelRun'+'_'+i"
[switch-on-color]="'primary'"
[switch-inverse]="true"
[switch-off-text]="text.Disabled"
[switch-on-text]="text.Enabled"
[switch-handle-width]="100"
[switch-label-width]="20"
[(ngModel)]="schedule.allowParallelRun">
</bSwitch>
<small class="form-text text-muted"
i18n>Enables the job to start even if an other job is already running.
</small>
</div>
</div>
</div> </div>
@ -137,7 +161,6 @@
<ng-container [ngSwitch]="configEntry.type"> <ng-container [ngSwitch]="configEntry.type">
<ng-container *ngSwitchCase="'boolean'"> <ng-container *ngSwitchCase="'boolean'">
<bSwitch <bSwitch
id="enableThreading"
class="switch" class="switch"
[name]="configEntry.id+'_'+i" [name]="configEntry.id+'_'+i"
[id]="configEntry.id+'_'+i" [id]="configEntry.id+'_'+i"

View File

@ -43,7 +43,8 @@ export class JobsSettingsComponent extends SettingsComponent<ServerConfig.JobCon
jobName: '', jobName: '',
trigger: { trigger: {
type: JobTriggerType.never type: JobTriggerType.never
} },
allowParallelRun: false
}; };
constructor(_authService: AuthenticationService, constructor(_authService: AuthenticationService,
@ -118,7 +119,8 @@ export class JobsSettingsComponent extends SettingsComponent<ServerConfig.JobCon
config: <any>{}, config: <any>{},
trigger: { trigger: {
type: JobTriggerType.never type: JobTriggerType.never
} },
allowParallelRun: false
}; };
const job = this._settingsService.availableJobs.value.find(t => t.Name === jobName); const job = this._settingsService.availableJobs.value.find(t => t.Name === jobName);

View File

@ -103,6 +103,7 @@
<app-settings-job-button class="mt-2 mt-md-0 float-left" <app-settings-job-button class="mt-2 mt-md-0 float-left"
[soloRun]="true" [soloRun]="true"
(error)="error=$event" (error)="error=$event"
[allowParallelRun]="false"
[jobName]="jobName"></app-settings-job-button> [jobName]="jobName"></app-settings-job-button>
<ng-container *ngIf="Progress != null"> <ng-container *ngIf="Progress != null">

View File

@ -41,14 +41,23 @@ export class ScheduledJobsService {
return await this.loadProgress(); return await this.loadProgress();
} }
public async start(jobName: string, config?: any, soloStart: boolean = false): Promise<void> { public async start(jobName: string, config?: any, soloStart: boolean = false, allowParallelRun = false): Promise<void> {
this.jobStartingStopping[jobName] = true; try {
await this._networkService.postJson('/admin/jobs/scheduled/' + jobName + '/' + (soloStart === true ? 'soloStart' : 'start'), this.jobStartingStopping[jobName] = true;
{config: config}); await this._networkService.postJson('/admin/jobs/scheduled/' + jobName + '/start',
// placeholder to force showing running job {
this.addDummyProgress(jobName, config); config: config,
delete this.jobStartingStopping[jobName]; allowParallelRun: allowParallelRun,
this.forceUpdate(); soloStart: soloStart
});
// placeholder to force showing running job
this.addDummyProgress(jobName, config);
} catch (e) {
throw e;
} finally {
delete this.jobStartingStopping[jobName];
this.forceUpdate();
}
} }
public async stop(jobName: string): Promise<void> { public async stop(jobName: string): Promise<void> {
@ -70,7 +79,7 @@ export class ScheduledJobsService {
this.progress.value[prg].state === JobProgressStates.cancelling) this.progress.value[prg].state === JobProgressStates.cancelling)
)) { )) {
this.onJobFinish.emit(prg); this.onJobFinish.emit(prg);
this.notification.info(this.i18n('Job finished') + ': ' + this.backendTextService.getJobName(prevPrg[prg].jobName)); this.notification.success(this.i18n('Job finished') + ': ' + this.backendTextService.getJobName(prevPrg[prg].jobName));
} }
} }
} }

View File

@ -78,6 +78,7 @@
[soloRun]="true" [soloRun]="true"
(error)="error=$event" (error)="error=$event"
[jobName]="jobName" [jobName]="jobName"
[allowParallelRun]="false"
[config]="Config"></app-settings-job-button> [config]="Config"></app-settings-job-button>

View File

@ -130,6 +130,7 @@
<app-settings-job-button class="mt-2 mt-md-0 float-left" <app-settings-job-button class="mt-2 mt-md-0 float-left"
[soloRun]="true" [soloRun]="true"
(error)="error=$event" (error)="error=$event"
[allowParallelRun]="false"
[jobName]="jobName"></app-settings-job-button> [jobName]="jobName"></app-settings-job-button>
<ng-container *ngIf="Progress != null"> <ng-container *ngIf="Progress != null">