MockServer's AsyncAPI broker mocking lets you drive Kafka and MQTT brokers with realistic example messages, all derived directly from an AsyncAPI 2.x or 3.x specification. MockServer can also subscribe to broker channels to record incoming messages for verification — mirroring how HTTP requests are recorded.

 

How it works

  1. Parse: AsyncApiParser reads the AsyncAPI document (JSON or YAML, auto-detected) and builds an in-memory model of channels and their message schemas.
  2. Generate: MessageExampleGenerator produces a JSON example payload for each channel. It uses explicit examples from the spec when present; otherwise it synthesises a schema-aware example (respecting enum, default, format, minimum/maximum, minLength, and other JSON Schema constraints).
  3. Publish: AsyncApiMockOrchestrator sends each payload to the broker via a MessagePublisher adapter. Kafka publishing supports record keys and headers; MQTT supports configurable QoS (0/1/2) and binary payloads.
  4. Subscribe (optional): MessageSubscriber implementations subscribe to broker channels and record incoming messages, including keys, headers, and schema validation results.
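
The Generate step can be pictured with a small, self-contained sketch. This is not MockServer's MessageExampleGenerator: the class ExampleSketch and its exampleFor method are hypothetical names, the schema is modelled as nested maps, and the precedence shown (explicit example, then default, then first enum value, then a type-based placeholder) is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a hypothetical helper, not MockServer's MessageExampleGenerator.
final class ExampleSketch {

    /** Derives an example value from a JSON Schema fragment held as nested maps. */
    @SuppressWarnings("unchecked")
    static Object exampleFor(Map<String, Object> schema) {
        // Assumed precedence: explicit example, then default, then first enum value.
        if (schema.containsKey("example")) {
            return schema.get("example");
        }
        if (schema.containsKey("default")) {
            return schema.get("default");
        }
        Object enumValues = schema.get("enum");
        if (enumValues instanceof List && !((List<?>) enumValues).isEmpty()) {
            return ((List<?>) enumValues).get(0);
        }
        Object type = schema.get("type");
        if ("integer".equals(type)) {
            // Start from "minimum" when present so the value stays in range.
            Object minimum = schema.get("minimum");
            return minimum instanceof Number
                ? (long) Math.ceil(((Number) minimum).doubleValue())
                : 0L;
        }
        if ("number".equals(type)) {
            return 0.0;
        }
        if ("boolean".equals(type)) {
            return false;
        }
        if ("string".equals(type)) {
            return "string";
        }
        if ("object".equals(type)) {
            // Recurse into each declared property.
            Map<String, Object> result = new LinkedHashMap<>();
            Object properties = schema.get("properties");
            if (properties instanceof Map) {
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) properties).entrySet()) {
                    result.put(String.valueOf(entry.getKey()),
                        exampleFor((Map<String, Object>) entry.getValue()));
                }
            }
            return result;
        }
        return null; // arrays and other types are left out of this sketch
    }

    public static void main(String[] args) {
        Map<String, Object> orderId = Map.of("type", "integer");
        Map<String, Object> status = Map.of("type", "string", "enum", List.of("pending", "shipped"));
        Map<String, Object> payload = Map.of(
            "type", "object",
            "properties", Map.of("orderId", orderId, "status", status));
        System.out.println(exampleFor(payload));
    }
}
```

Real generators also have to handle format, minLength, arrays, and $ref resolution, which this sketch deliberately omits.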
 

REST control-plane

Load an AsyncAPI spec and start mocking via the REST API:

Load a spec: PUT /mockserver/asyncapi

The request body can be either a plain AsyncAPI spec (JSON or YAML) or a JSON wrapper with broker configuration:

{
  "spec": {
    "asyncapi": "2.6.0",
    "info": { "title": "Orders API", "version": "1.0.0" },
    "channels": {
      "orders": {
        "publish": {
          "message": {
            "payload": {
              "type": "object",
              "properties": {
                "orderId": { "type": "integer" },
                "status": { "type": "string", "enum": ["pending", "shipped"] }
              },
              "required": ["orderId"]
            }
          }
        }
      }
    }
  },
  "brokerConfig": {
    "kafkaBootstrapServers": "localhost:9092",
    "publishOnLoad": true,
    "consume": true
  }
}
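
The same request can be issued from Java with the JDK's built-in HTTP client. The sketch below only builds the request; the class name LoadAsyncApiSketch, the method buildLoadRequest, and the base URL passed to it are assumptions for illustration, and the Content-Type header is assumed to be application/json for the wrapper form.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative only: a hypothetical helper built on the JDK HTTP client.
final class LoadAsyncApiSketch {

    /** Builds PUT {baseUrl}/mockserver/asyncapi carrying the JSON wrapper as the body. */
    static HttpRequest buildLoadRequest(String baseUrl, String wrapperJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/mockserver/asyncapi"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(wrapperJson))
            .build();
    }
}
```

To send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) with a base URL such as http://localhost:1080 (assuming MockServer listens there).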

Broker configuration options

Field                  Type     Default                    Description
kafkaBootstrapServers  string   null                       Kafka bootstrap servers (e.g. localhost:9092)
kafkaGroupId           string   mockserver-async-consumer  Consumer group ID for Kafka subscribers
mqttBrokerUrl          string   null                       MQTT broker URL (e.g. tcp://localhost:1883)
mqttClientId           string   auto-generated             MQTT client ID prefix
mqttQos                int      1                          MQTT QoS level (0, 1, or 2)
publishOnLoad          boolean  true                       Publish example messages immediately when the spec is loaded
publishIntervalMillis  long     0 (disabled)               Publish examples periodically at this interval, in milliseconds
consume                boolean  false                      Subscribe to channels and record incoming messages
kafkaSecurity          object   null                       Security settings applied to Kafka broker connections: securityProtocol, saslMechanism, saslJaasConfig, and SSL truststore/keystore location and password
mqttSecurity           object   null                       Security settings applied to MQTT broker connections: username, password, and an sslProperties map for TLS

Check status: GET /mockserver/asyncapi

Returns the loaded spec info, active channels, publisher/subscriber counts, and recorded messages (including per-message schema validation).

Verify recorded messages: PUT /mockserver/asyncapi/verify

Verify that messages recorded by subscribers match given criteria. This mirrors the semantics of PUT /mockserver/verify for HTTP requests.

The request body is a JSON object with the following fields:

Field             Type    Required  Description
channel           string  yes       The channel or topic to check for messages
payloadSubstring  string  no        The message payload must contain this substring
payloadJsonPath   string  no        Dot-notation JSON path to extract from the payload (e.g. user.name)
expectedValue     string  no        Expected value at the JSON path (used together with payloadJsonPath)
count             object  no        Count constraints: {atLeast, atMost, exactly}. Default: {atLeast: 1}
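
To make the matching fields concrete, here is a minimal sketch of how a count constraint and a dot-notation path could be evaluated. It is not MockServer's implementation: VerifySemanticsSketch and its methods are hypothetical, bounds are assumed to be inclusive, and exactly is assumed to take precedence when it is set.

```java
import java.util.Map;

// Illustrative only: hypothetical helpers mirroring the documented verify fields.
final class VerifySemanticsSketch {

    /** Checks an observed message count against {atLeast, atMost, exactly}; null means "not set". */
    static boolean satisfies(int observed, Integer atLeast, Integer atMost, Integer exactly) {
        if (exactly != null) {
            return observed == exactly; // assumed to override the other bounds
        }
        if (atLeast == null && atMost == null) {
            return observed >= 1; // documented default: {atLeast: 1}
        }
        boolean ok = true;
        if (atLeast != null) {
            ok = observed >= atLeast;
        }
        if (atMost != null) {
            ok = ok && observed <= atMost;
        }
        return ok;
    }

    /** Walks a dot-notation path such as "user.name" through a payload held as nested maps. */
    static Object extract(Map<String, Object> payload, String dotPath) {
        Object current = payload;
        for (String segment : dotPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // path runs past a leaf value
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }
}
```

With these assumptions, a payload of {"user": {"name": "Alice"}} yields Alice for the path user.name, and a count of {exactly: 0} passes only when no matching message was recorded.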

Responses

Status               Meaning
202 Accepted         Verification passed
406 Not Acceptable   Verification failed (body contains a human-readable failure reason)
400 Bad Request      Malformed request (missing channel, invalid JSON)
501 Not Implemented  The mockserver-async module is not on the classpath

Example: verify a message with a specific field value

PUT /mockserver/asyncapi/verify HTTP/1.1
Content-Type: application/json

{
  "channel": "orders",
  "payloadJsonPath": "user.name",
  "expectedValue": "Alice",
  "count": { "atLeast": 1 }
}

Example: verify no error messages were published (negative assertion)

PUT /mockserver/asyncapi/verify HTTP/1.1
Content-Type: application/json

{
  "channel": "errors",
  "count": { "exactly": 0 }
}
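
From a Java test, the verify call and its status codes could be handled as sketched below using only the JDK HTTP client. The class VerifySketch, its Outcome enum, and both method names are hypothetical; only the endpoint path and the four status codes come from this page.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative only: a hypothetical wrapper around PUT /mockserver/asyncapi/verify.
final class VerifySketch {

    enum Outcome { PASSED, FAILED, MALFORMED_REQUEST, MODULE_MISSING, UNEXPECTED }

    /** Builds the verify request; criteriaJson is the JSON body described above. */
    static HttpRequest buildVerifyRequest(String baseUrl, String criteriaJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/mockserver/asyncapi/verify"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(criteriaJson))
            .build();
    }

    /** Maps the documented status codes onto a test-friendly outcome. */
    static Outcome interpret(int statusCode) {
        switch (statusCode) {
            case 202: return Outcome.PASSED;
            case 406: return Outcome.FAILED;            // response body holds the failure reason
            case 400: return Outcome.MALFORMED_REQUEST; // e.g. missing channel or invalid JSON
            case 501: return Outcome.MODULE_MISSING;    // mockserver-async not on the classpath
            default:  return Outcome.UNEXPECTED;
        }
    }
}
```

A test would send the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), pass response.statusCode() to interpret, and include response.body() in the failure message when the outcome is FAILED.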

Reset

All async mocking state (publishers, subscribers, and recorded messages) is cleared when you call PUT /mockserver/reset.

 

Supported AsyncAPI versions

Version       Channel structure                                  Example resolution
AsyncAPI 2.x  channels.<name>.publish|subscribe.message.payload  Inline payload.example or message.examples[].payload
AsyncAPI 3.x  channels.<name>.messages.<msgName>.payload         examples[].payload; basic $ref to #/components/messages/<name>

Both JSON and YAML spec formats are accepted. Missing or incomplete structures are tolerated gracefully.

 

Schema validation

When a channel's message definition includes a JSON Schema (payload), MockServer validates:

  • Generated examples before publishing — warnings are logged and reported if a generated example does not conform to the schema
  • Recorded messages from subscriptions — each recorded message in the status response includes a schemaValid field and any schemaErrors

Schema validation supports JSON Schema Draft 4 through Draft 2019-09, including constraints like required, enum, minimum/maximum, pattern, and format.
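
As a rough picture of what schemaValid and schemaErrors convey, the toy check below tests only the required and enum keywords against a payload held as a map. It is not MockServer's validator, which covers far more of JSON Schema; SchemaCheckSketch, its check method, and the error message wording are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: a toy check for two keywords, not a JSON Schema validator.
final class SchemaCheckSketch {

    /** Returns one error string per violation; an empty list corresponds to schemaValid = true. */
    static List<String> check(Map<String, Object> schema, Map<String, Object> payload) {
        List<String> errors = new ArrayList<>();

        // "required": every listed property must be present in the payload.
        Object required = schema.get("required");
        if (required instanceof List) {
            for (Object name : (List<?>) required) {
                if (!payload.containsKey(String.valueOf(name))) {
                    errors.add("missing required property: " + name);
                }
            }
        }

        // "enum": a present value must be one of the allowed values.
        Object properties = schema.get("properties");
        if (properties instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) properties).entrySet()) {
                String name = String.valueOf(entry.getKey());
                Object value = payload.get(name);
                Object allowed = entry.getValue() instanceof Map
                    ? ((Map<?, ?>) entry.getValue()).get("enum")
                    : null;
                if (value != null && allowed instanceof List && !((List<?>) allowed).contains(value)) {
                    errors.add("value of " + name + " is not one of " + allowed);
                }
            }
        }
        return errors;
    }
}
```

Against the Orders schema shown earlier, a payload of {"status": "cancelled"} would produce two errors under this sketch: orderId is missing and cancelled is not an allowed status.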

 

Java usage

Add the mockserver-async dependency to your project, then wire the components together:

import java.nio.file.Files;
import java.nio.file.Path;

import org.mockserver.async.AsyncApiMockOrchestrator;
import org.mockserver.async.asyncapi.AsyncApiParser;
import org.mockserver.async.asyncapi.AsyncApiSpec;
import org.mockserver.async.publish.KafkaMessagePublisher;

// 1. Parse your AsyncAPI spec (from a string, file, or resource)
String specYaml = Files.readString(Path.of("asyncapi.yaml"));
AsyncApiSpec spec = new AsyncApiParser().parse(specYaml);

// 2. Create a publisher pointed at your test broker
KafkaMessagePublisher publisher = new KafkaMessagePublisher("localhost:9092");

// 3. Create the orchestrator and publish once
AsyncApiMockOrchestrator orchestrator = new AsyncApiMockOrchestrator(spec, publisher);
orchestrator.publishAll();

// Or publish repeatedly on a schedule (e.g. every 500 ms)
orchestrator.startPublishing(500);
// ... run your consumer tests ...
orchestrator.stop();

// Always close the publisher to release broker connections
publisher.close();

For subscribing to record messages:

import java.util.List;

import org.mockserver.async.subscribe.KafkaMessageSubscriber;
import org.mockserver.async.subscribe.RecordedMessage;

KafkaMessageSubscriber subscriber = new KafkaMessageSubscriber("localhost:9092", "test-group");
subscriber.subscribe("orders");

// ... wait for messages ...

List<RecordedMessage> messages = subscriber.getRecordedMessages("orders");
for (RecordedMessage msg : messages) {
    System.out.println("Key: " + msg.getKey() + ", Payload: " + msg.getPayload());
}

subscriber.close();
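
The "wait for messages" step in the subscriber example is usually a bounded poll rather than a fixed sleep. The helper below is one way to write that with the JDK alone; AwaitMessagesSketch and awaitAtLeast are hypothetical names, and the 50 ms poll interval is an arbitrary choice.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

// Illustrative only: a hypothetical polling helper, independent of any broker client.
final class AwaitMessagesSketch {

    /**
     * Polls the supplier until it returns at least `expected` items or the timeout elapses,
     * then returns whatever was last observed (possibly fewer items than expected).
     */
    static <T> List<T> awaitAtLeast(Supplier<List<T>> source, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> current = source.get();
        while (current.size() < expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                return current;
            }
            current = source.get();
        }
        return current;
    }
}
```

With the subscriber shown above, the call would look like awaitAtLeast(() -> subscriber.getRecordedMessages("orders"), 1, Duration.ofSeconds(5)), after which the test asserts on the size of the returned list.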
 

Supported brokers

Broker  Publisher              Subscriber              Features
Kafka   KafkaMessagePublisher  KafkaMessageSubscriber  Record keys, headers, consumer group
MQTT    MqttMessagePublisher   MqttMessageSubscriber   QoS 0/1/2, binary payloads
 

Configuration properties

These properties provide server-wide defaults for async messaging. Per-request brokerConfig values override them.

Property                                   Env Variable                                   Type    Default     Description
mockserver.asyncKafkaBootstrapServers      MOCKSERVER_ASYNC_KAFKA_BOOTSTRAP_SERVERS       string  "" (unset)  Default Kafka bootstrap servers used when the per-request brokerConfig does not include kafkaBootstrapServers.
mockserver.asyncMqttBrokerUrl              MOCKSERVER_ASYNC_MQTT_BROKER_URL               string  "" (unset)  Default MQTT broker URL used when the per-request brokerConfig does not include mqttBrokerUrl.
mockserver.asyncRecordedMessageMaxEntries  MOCKSERVER_ASYNC_RECORDED_MESSAGE_MAX_ENTRIES  int     1000        Maximum number of recorded messages retained per channel. When the cap is reached, the oldest messages are evicted (FIFO).

See the Configuration Properties page for the full four-form reference (Java code, system property, environment variable, property file).
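
Two behaviours described in this section, per-request values overriding server-wide defaults and FIFO eviction at the per-channel cap, can be sketched in a few lines. Both classes below are hypothetical illustrations rather than MockServer code, and treating an empty string as "unset" is an assumption based on the defaults listed above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: hypothetical precedence helper.
final class AsyncDefaultsSketch {

    /** The per-request brokerConfig value wins; otherwise the server-wide default applies. */
    static String effectiveValue(String perRequestValue, String serverDefault) {
        if (perRequestValue != null && !perRequestValue.isEmpty()) {
            return perRequestValue;
        }
        // An empty default is treated as "unset" (assumption).
        return (serverDefault == null || serverDefault.isEmpty()) ? null : serverDefault;
    }
}

// Illustrative only: a bounded per-channel log that evicts the oldest entry first.
final class BoundedChannelLog<T> {

    private final int maxEntries;
    private final Deque<T> entries = new ArrayDeque<>();

    BoundedChannelLog(int maxEntries) {
        if (maxEntries < 1) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        this.maxEntries = maxEntries;
    }

    /** Records a message, dropping the oldest one once the cap has been reached. */
    synchronized void record(T message) {
        if (entries.size() == maxEntries) {
            entries.removeFirst();
        }
        entries.addLast(message);
    }

    /** Returns the retained messages, oldest first. */
    synchronized List<T> snapshot() {
        return new ArrayList<>(entries);
    }
}
```

In the system-property form, the server default passed to effectiveValue would be read with System.getProperty("mockserver.asyncKafkaBootstrapServers"). A cap of 1000, the documented default, means the 1001st message on a channel displaces the first.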

 

Java client helpers

The MockServerClient class provides three convenience methods for async messaging:

MockServerClient client = new MockServerClient("localhost", 1080);

// 1. Load an AsyncAPI spec and start mocking
String status = client.loadAsyncApi("{\"spec\":{\"asyncapi\":\"2.6.0\", ...}, \"brokerConfig\":{\"kafkaBootstrapServers\":\"localhost:9092\", \"consume\":true}}");

// 2. Check current async mocking status
String currentStatus = client.asyncApiStatus();

// 3. Verify recorded messages
client.verifyAsyncMessage("{\"channel\":\"orders\", \"payloadJsonPath\":\"user.name\", \"expectedValue\":\"Alice\", \"count\":{\"atLeast\":1}}");
// throws AssertionError if verification fails
 

The AsyncAPI broker state is visible in the MockServer dashboard's AsyncAPI (Async) view, reachable from the dashboard's top toolbar. It shows the loaded spec's channels, a publisher/subscriber summary, and messages recorded from broker subscriptions.

Not yet supported

  • Advanced AsyncAPI bindings — channel-specific binding configurations (e.g. Kafka partition assignment, MQTT retain flag) are not parsed or applied
  • Security schemes — security scheme definitions declared in the AsyncAPI spec document are not auto-applied to broker connections. Broker security itself is supported: configure it explicitly via the kafkaSecurity and mqttSecurity broker-configuration options above (SASL, TLS truststore/keystore, username/password)
 

Also supported

  • Multi-message channels: all message definitions in a channel are published, not just the first. AsyncAPI 3.x channels with multiple messages and AsyncAPI 2.x oneOf message variants each result in one publish call per message.
  • Correlation IDs: AsyncAPI correlationId definitions (inline or via $ref to #/components/correlationIds/<name>) are parsed. At publish time MockServer generates a unique correlation ID and injects it at the message's location, either as a message header ($message.header#/…) or into the payload at the given JSON Pointer ($message.payload#/…). Note: header-location correlation IDs are delivered over Kafka headers but not over MQTT (which has no per-message headers); payload-location correlation IDs are injected for both brokers.
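
Payload-location injection can be illustrated with a short sketch that writes a generated ID at the JSON Pointer following $message.payload#. It is not MockServer's code: CorrelationIdSketch and injectIntoPayload are hypothetical, the payload is modelled as nested mutable maps, array indices are not handled, and creating missing intermediate objects is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative only: a hypothetical helper for payload-location correlation IDs.
final class CorrelationIdSketch {

    private static final String PAYLOAD_PREFIX = "$message.payload#";

    /** Writes correlationId into the payload at the JSON Pointer that follows the prefix. */
    @SuppressWarnings("unchecked")
    static void injectIntoPayload(Map<String, Object> payload, String location, String correlationId) {
        if (!location.startsWith(PAYLOAD_PREFIX + "/")) {
            throw new IllegalArgumentException("not a payload location: " + location);
        }
        // Drop the prefix and the pointer's leading slash, then split into reference tokens.
        String[] tokens = location.substring(PAYLOAD_PREFIX.length() + 1).split("/");
        Map<String, Object> current = payload;
        for (int i = 0; i < tokens.length - 1; i++) {
            String key = unescape(tokens[i]);
            Object child = current.get(key);
            if (!(child instanceof Map)) {
                // Assumption: missing (or non-object) intermediate nodes are replaced by objects.
                child = new LinkedHashMap<String, Object>();
                current.put(key, child);
            }
            current = (Map<String, Object>) child;
        }
        current.put(unescape(tokens[tokens.length - 1]), correlationId);
    }

    /** Applies JSON Pointer unescaping: "~1" becomes "/" and "~0" becomes "~". */
    private static String unescape(String token) {
        return token.replace("~1", "/").replace("~0", "~");
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("orderId", 42);
        injectIntoPayload(payload, "$message.payload#/meta/correlationId", UUID.randomUUID().toString());
        System.out.println(payload);
    }
}
```

A header-location expression would instead set a Kafka record header, which is why it has no MQTT equivalent in the note above.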