'Why is JobConsumer not being hit/run?
I am trying out the new MassTransit
IJobConsumer
implementation, and although I've tried to follow the documentation, the JobConsumer
I have written is never being run/hit.
I have:
created the
JobConsumer
which has a run method that runs the code I need it topublic class CalculationStartRunJobConsumer : IJobConsumer<ICalculationStartRun> { private readonly ICalculationRunQueue runQueue; public CalculationStartRunJobConsumer(ICalculationRunQueue runQueue) { this.runQueue = runQueue; } public Task Run(JobContext<ICalculationStartRun> context) { return Task.Run( () => { var longRunningJob = new LongRunningJob<ICalculationStartRun> { Job = context.Job, CancellationToken = context.CancellationToken, JobId = context.JobId, }; runQueue.StartSpecial(longRunningJob); }, context.CancellationToken); } }
I have registered that consumer trying both
ConnectReceiveEndpoint
andAddConsumer
Configured the
ServiceInstance
as shown in the documentationservices.AddMassTransit(busRegistrationConfigurator => { // TODO: Get rid of this ugly if statement. if (consumerTypes != null) { foreach (var consumerType in consumerTypes) { busRegistrationConfigurator.AddConsumer(consumerType); } } if(requestClientType != null) { busRegistrationConfigurator.AddRequestClient(requestClientType); } busRegistrationConfigurator.UsingRabbitMq((context, cfg) => { cfg.UseNewtonsoftJsonSerializer(); cfg.UseNewtonsoftJsonDeserializer(); cfg.ConfigureNewtonsoftJsonSerializer(settings => { // The serializer by default omits fields that are set to their default value, but this causes unintended effects settings.NullValueHandling = NullValueHandling.Include; settings.DefaultValueHandling = DefaultValueHandling.Include; return settings; }); cfg.Host( messagingHostInfo.HostAddress, hostConfigurator => { hostConfigurator.Username(messagingHostInfo.UserName); hostConfigurator.Password(messagingHostInfo.Password); }); cfg.ServiceInstance(instance => { instance.ConfigureJobServiceEndpoints(serviceCfg => { serviceCfg.FinalizeCompleted = true; }); instance.ConfigureEndpoints(context); }); }); });
Seen that the queue for the job does appear in the queue for
RabbitMQ
When I call
.Send
to send a message to that queue, it does not activate theRun
method on theJobConsumer
.public async Task Send<T>(string queueName, T message) where T : class { var endpointUri = GetEndpointUri(messagingHostInfo.HostAddress, queueName); var sendEndpoint = await bus.GetSendEndpoint(endpointUri); await sendEndpoint.Send(message); }
Can anyone help?
Software
MassTransit
8.0.2MassTransit.RabbitMq
8.0.2MassTransit.NewtonsoftJson
8.0.2.NET6
- Using in-memory for
JobConsumer
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|