Why is JobConsumer not being hit/run?

I am trying out the new MassTransit IJobConsumer implementation, and although I've tried to follow the documentation, the JobConsumer I have written is never being run/hit.

I have:

  • Created the JobConsumer, which has a Run method that runs the code I need it to:

      public class CalculationStartRunJobConsumer : IJobConsumer<ICalculationStartRun> 
      {
          private readonly ICalculationRunQueue runQueue;
    
          public CalculationStartRunJobConsumer(ICalculationRunQueue runQueue)
          {
              this.runQueue = runQueue;
          }
    
          public Task Run(JobContext<ICalculationStartRun> context)
          {
              return Task.Run(
                  () =>
                  {
                      var longRunningJob = new LongRunningJob<ICalculationStartRun>
                      {
                          Job = context.Job,
                          CancellationToken = context.CancellationToken,
                          JobId = context.JobId,
                      };
    
                      runQueue.StartSpecial(longRunningJob);
                  },
                  context.CancellationToken);
          } 
      }
    
  • Registered that consumer, trying both ConnectReceiveEndpoint and AddConsumer

  • Configured the ServiceInstance as shown in the documentation

      services.AddMassTransit(busRegistrationConfigurator =>
          {
              // TODO: Get rid of this ugly if statement.
              if (consumerTypes != null)
              {
                  foreach (var consumerType in consumerTypes)
                  {
                      busRegistrationConfigurator.AddConsumer(consumerType);
                  }
              }
    
              if (requestClientType != null)
              {
                  busRegistrationConfigurator.AddRequestClient(requestClientType);
              }
    
              busRegistrationConfigurator.UsingRabbitMq((context, cfg) =>
              {
                  cfg.UseNewtonsoftJsonSerializer();
                  cfg.UseNewtonsoftJsonDeserializer();
                  cfg.ConfigureNewtonsoftJsonSerializer(settings =>
                  {
                      // The serializer by default omits fields that are set to their default value, but this causes unintended effects
                      settings.NullValueHandling = NullValueHandling.Include;
                      settings.DefaultValueHandling = DefaultValueHandling.Include;
                      return settings;
                  });
    
                  cfg.Host(
                      messagingHostInfo.HostAddress,
                      hostConfigurator =>
                      {
                          hostConfigurator.Username(messagingHostInfo.UserName);
                          hostConfigurator.Password(messagingHostInfo.Password);
                      });
    
                  cfg.ServiceInstance(instance =>
                  {
                      instance.ConfigureJobServiceEndpoints(serviceCfg =>
                      {
                          serviceCfg.FinalizeCompleted = true;
                      });
    
                      instance.ConfigureEndpoints(context);
                  });
              });
          });
    
  • Seen that the queue for the job does appear in RabbitMQ's list of queues

  • Found that when I call .Send to send a message to that queue, the Run method on the JobConsumer is not invoked:

      public async Task Send<T>(string queueName, T message) where T : class
      {
          var endpointUri = GetEndpointUri(messagingHostInfo.HostAddress, queueName);
          var sendEndpoint = await bus.GetSendEndpoint(endpointUri);
    
          await sendEndpoint.Send(message);
      }
    

Can anyone help?

Software

  • MassTransit 8.0.2
  • MassTransit.RabbitMq 8.0.2
  • MassTransit.NewtonsoftJson 8.0.2
  • .NET 6
  • Using in-memory for JobConsumer
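
For comparison, here is a minimal sketch of a job-service registration that also enables a message scheduler, which the MassTransit job consumer documentation includes in its sample setup. Whether the missing scheduler is what keeps Run from being called here is an assumption, not a confirmed diagnosis. The host name, virtual host, and guest credentials are hypothetical placeholders, the empty ICalculationStartRun interface and the trimmed consumer stand in for the real types above, and the delayed scheduler on RabbitMQ assumes the broker has the delayed message exchange plugin enabled.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddMassTransit(x =>
        {
            // Assumption: the job service needs a scheduler; register the delayed one.
            x.AddDelayedMessageScheduler();

            // Generic registration of the job consumer.
            x.AddConsumer<CalculationStartRunJobConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // Hypothetical placeholders; substitute the values from messagingHostInfo.
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // Pairs with AddDelayedMessageScheduler above.
                cfg.UseDelayedMessageScheduler();

                cfg.ServiceInstance(instance =>
                {
                    instance.ConfigureJobServiceEndpoints(js =>
                    {
                        js.FinalizeCompleted = true;
                    });

                    instance.ConfigureEndpoints(context);
                });
            });
        });
    })
    .Build();

await host.RunAsync();

// Stand-in for the real message contract from the question.
public interface ICalculationStartRun
{
}

// Trimmed stand-in for the real consumer; it only logs the job id.
public class CalculationStartRunJobConsumer : IJobConsumer<ICalculationStartRun>
{
    public Task Run(JobContext<ICalculationStartRun> context)
    {
        Console.WriteLine($"Job {context.JobId} started");
        return Task.CompletedTask;
    }
}
```

If the consumer still does not run with this in place, the next things to compare are the address passed to GetSendEndpoint and the queue name that ConfigureEndpoints generated for the consumer.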


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow