Asynchronous Message Processing
Understanding how mutations work in the Arkipel API.
Overview
When you send a mutation message (e.g., people:upsert, distributions:upsert), the response does not include the created, updated, or destroyed resource. Instead, the message is processed asynchronously: it is either queued for background processing or held pending user validation.
This approach provides:
- Decoupling: Allows for background processing, user validation, or delayed execution
- Reliability: Ensures integrators can always retrieve the result, even if processing takes time
- Scalability: Prevents blocking during high-load operations
Response Structure
The response will always include a message_id, which serves as a reference to track the status or result of the operation.
{
"source_public_key": "community_key",
"source_site": { "protocol": "https", "fqdn": "..." },
"created_at": "2025-11-13T20:52:49Z",
"signature": "...",
"payload": {
"type": "people:upsert",
"message_id": "6916452112f746b2b4cf48c1"
}
}
You can use this message_id to query the outcome later using the arkipel_messages:query message type.
How to Retrieve Results
1. Store the message_id
Save the message_id from the mutation response:
const response = await client.createPerson(data);
const messageId = response.payload.message_id;
// Store messageId for later polling
2. Query the status
Send an arkipel_messages:query message with the message_id:
{
"payload": {
"type": "arkipel_messages:query",
"message_id": "6916452112f746b2b4cf48c1"
}
}
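The query payload is small enough to build with a helper. The sketch below is illustrative only: `buildStatusQuery` is a hypothetical name, and it assumes the envelope fields (`signature`, `source_public_key`, and so on) are added by whatever code signs and sends the message.

```javascript
// Hypothetical helper: builds only the payload part of an
// arkipel_messages:query message. Signing and envelope fields are
// assumed to be handled elsewhere.
function buildStatusQuery(messageId) {
  if (typeof messageId !== 'string' || messageId.length === 0) {
    throw new TypeError('messageId must be a non-empty string');
  }
  return {
    payload: {
      type: 'arkipel_messages:query',
      message_id: messageId,
    },
  };
}

const query = buildStatusQuery('6916452112f746b2b4cf48c1');
console.log(JSON.stringify(query, null, 2));
```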
3. Check the response
The response will include the final state of the operation:
{
"payload": {
"type": "arkipel_messages:query",
"message_id": "6916452112f746b2b4cf48c1",
"status": "persisted",
"message_type": "people:upsert",
"resource": {
"id": 123,
"first_name": "Jane",
"last_name": "Doe",
"import_id": "JD0001"
}
}
}
Status Values
| Status | Meaning | Has Resource | Action |
|---|---|---|---|
| pending | Message received, waiting to be processed | No | Continue polling |
| processing | Currently being processed | No | Continue polling |
| persisted | Successfully completed | Yes | Operation complete |
| failed | Processing failed | No | Check error, may retry |
| rejected | Rejected by validation or policy | No | Fix and retry |
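The table can be encoded as a lookup so that client code treats every status consistently. This is a sketch, not part of the API: `STATUS_INFO` and `describeStatus` are hypothetical names, and the `terminal` flag reflects the statement elsewhere in this document that a result no longer changes once it is persisted, failed, or rejected.

```javascript
// Lookup mirroring the status table. A Map avoids accidental hits on
// inherited object properties such as "constructor".
const STATUS_INFO = new Map([
  ['pending',    { terminal: false, hasResource: false, action: 'continue polling' }],
  ['processing', { terminal: false, hasResource: false, action: 'continue polling' }],
  ['persisted',  { terminal: true,  hasResource: true,  action: 'operation complete' }],
  ['failed',     { terminal: true,  hasResource: false, action: 'check error, may retry' }],
  ['rejected',   { terminal: true,  hasResource: false, action: 'fix and retry' }],
]);

// Returns the table row for a status, or throws for a status this
// document does not list.
function describeStatus(status) {
  const info = STATUS_INFO.get(status);
  if (!info) {
    throw new Error(`Unknown status: ${status}`);
  }
  return info;
}

console.log(describeStatus('processing').action);
```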
Error Response
If the operation fails:
{
"payload": {
"type": "arkipel_messages:query",
"message_id": "abc123",
"status": "failed",
"error": "Validation failed: email has already been taken"
}
}
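One way to surface such a response in client code is to convert it into an exception that keeps the message_id and status. The sketch below uses hypothetical names (`MessageError`, `resourceOrThrow`). It also assumes that a rejected message may or may not carry an `error` field, since this document only shows one for failed.

```javascript
// Hypothetical error type that preserves the fields of a failed query.
class MessageError extends Error {
  constructor(payload) {
    const detail = payload.error ?? 'no error detail provided';
    super(`Message ${payload.message_id} ${payload.status}: ${detail}`);
    this.name = 'MessageError';
    this.messageId = payload.message_id;
    this.status = payload.status;
  }
}

// Returns the resource for a persisted message, null while the message
// is still pending or processing, and throws for failed or rejected.
function resourceOrThrow(response) {
  const { payload } = response;
  if (payload.status === 'failed' || payload.status === 'rejected') {
    throw new MessageError(payload);
  }
  return payload.status === 'persisted' ? payload.resource : null;
}
```

Keeping `messageId` on the error lets a caller log or re-query the exact message that failed.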
Polling Strategy
Basic Polling Loop
// Resolve after `ms` milliseconds
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// `client` is assumed to be an already initialized API client
async function waitForCompletion(messageId, options = {}) {
  const maxAttempts = options.maxAttempts ?? 30;
  const pollInterval = options.pollInterval ?? 2000; // 2 seconds
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const response = await client.queryMessage(messageId);
    const status = response.payload.status;
    switch (status) {
      case 'persisted':
        return response.payload.resource;
      case 'failed':
      case 'rejected':
        throw new Error(`Operation failed: ${response.payload.error}`);
      case 'pending':
      case 'processing':
        await sleep(pollInterval);
        break;
      default:
        // Unknown status: fail loudly instead of re-polling without a delay
        throw new Error(`Unexpected status: ${status}`);
    }
  }
  throw new Error(`Operation timed out after ${maxAttempts} attempts`);
}
Recommended Polling Intervals
| Attempt | Delay | Total Time |
|---|---|---|
| 1 | 2s | 2s |
| 2 | 2s | 4s |
| 3 | 2s | 6s |
| 5 | 2s | 10s |
| 10 | 2s | 20s |
| 15 | 2s | 30s |
| 30 | 2s | 60s |
Best Practice: Start with 2-second intervals. Most operations complete within 5-10 seconds. Adjust based on your use case.
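The arithmetic behind the table can be reproduced with a small helper, which is handy when adjusting the interval. This is a sketch under assumptions: `pollSchedule` is a hypothetical name, and the `factor` and `maxDelayMs` parameters describe an optional exponential backoff that this document does not prescribe. With the default `factor` of 1 the output matches the fixed 2-second rows above.

```javascript
// Computes the delay before each attempt and the cumulative wait.
// factor = 1 gives a fixed interval; factor > 1 gives exponential
// backoff capped at maxDelayMs (an optional variation, not an API rule).
function pollSchedule(attempts, baseDelayMs = 2000, factor = 1, maxDelayMs = 30000) {
  const rows = [];
  let totalMs = 0;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    const delayMs = Math.min(baseDelayMs * factor ** (attempt - 1), maxDelayMs);
    totalMs += delayMs;
    rows.push({ attempt, delayMs, totalMs });
  }
  return rows;
}

// Fixed 2-second interval, as in the table: 30 attempts take 60 seconds.
console.log(pollSchedule(30).at(-1));
```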
Key Points
Asynchronous by Design
Mutations are not blocking; the system acknowledges receipt but processes them later. This is intentional and applies to all write operations:
- people:upsert
- households:upsert
- organizations:upsert
- distributions:upsert
- publications:upsert
- notes:upsert
- All delete operations
Tracking
The message_id is your reference to poll for updates. Store it immediately after receiving the mutation response.
Idempotency
The same message_id can be queried multiple times with consistent results. The operation result doesn’t change once set to persisted, failed, or rejected.
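Because a terminal result does not change, a client can cache it and skip further queries for that message_id. The wrapper below is a minimal sketch: `withTerminalCache` is a hypothetical name, and `queryMessage` stands for any function that sends an arkipel_messages:query and resolves to the response.

```javascript
const TERMINAL_STATUSES = new Set(['persisted', 'failed', 'rejected']);

// Wraps a query function so that terminal responses are remembered.
// Non-terminal responses (pending, processing) are never cached.
function withTerminalCache(queryMessage) {
  const settled = new Map();
  return async function cachedQuery(messageId) {
    if (settled.has(messageId)) {
      return settled.get(messageId);
    }
    const response = await queryMessage(messageId);
    if (TERMINAL_STATUSES.has(response.payload.status)) {
      settled.set(messageId, response);
    }
    return response;
  };
}
```

The cache lives in memory only. A long-running integration would likely want an eviction policy or a persistent store instead.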
Sequence Diagram
sequenceDiagram
participant Integrator
participant ArkipelAPI
participant Queue
participant User
Integrator->>ArkipelAPI: POST people:upsert
ArkipelAPI-->>Integrator: { "type": "people:upsert", "message_id": "abc123", ... }
ArkipelAPI->>Queue: Enqueue message
Queue->>User: (Optional) Await validation
User-->>Queue: (Optional) Approve/Reject
Queue->>ArkipelAPI: Process message
Integrator->>ArkipelAPI: POST arkipel_messages:query { "message_id": "abc123" }
ArkipelAPI-->>Integrator: { "message_id": "abc123", "type": "arkipel_messages:query", "status": "persisted", "resource": { ... } }
Common Patterns
Fire and Forget
If you don’t need the result immediately:
const response = await client.createPerson(data);
const messageId = response.payload.message_id;
// Log messageId for later reconciliation
console.log(`Person creation queued: ${messageId}`);
// Continue without waiting
Wait with Timeout
For operations where you need the result:
try {
const person = await client.createPersonAndWait(data, {
maxAttempts: 15,
pollInterval: 2000
});
console.log(`Created person: ${person.id}`);
} catch (error) {
console.error(`Failed to create person: ${error.message}`);
}
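This document does not define `createPersonAndWait`. One plausible way to build such a helper is to send the mutation and then poll with the returned message_id. The sketch below is written as a standalone function taking the client as an argument. It assumes the client exposes `createPerson` and `queryMessage` as used in the earlier examples, and it is not the actual client implementation.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Sends the mutation, then polls until the message reaches a terminal
// status or maxAttempts is exhausted.
async function createPersonAndWait(client, data, { maxAttempts = 30, pollInterval = 2000 } = {}) {
  const ack = await client.createPerson(data);
  const messageId = ack.payload.message_id;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const { payload } = await client.queryMessage(messageId);
    if (payload.status === 'persisted') {
      return payload.resource;
    }
    if (payload.status === 'failed' || payload.status === 'rejected') {
      throw new Error(`Message ${messageId} ${payload.status}: ${payload.error ?? 'no error detail'}`);
    }
    // Still pending or processing: wait, unless this was the last attempt.
    if (attempt < maxAttempts) {
      await sleep(pollInterval);
    }
  }
  // Include the id so the caller can reconcile later.
  throw new Error(`Message ${messageId} still unresolved after ${maxAttempts} attempts`);
}
```

Including the message_id in the timeout error matters because a timeout does not mean the operation failed; the message may still be persisted later.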
Batch Operations
For multiple operations:
const messageIds = [];
// Send all mutations
for (const personData of peopleData) {
const response = await client.createPerson(personData);
messageIds.push(response.payload.message_id);
}
// Poll all for completion
const results = await Promise.all(
messageIds.map(id => client.waitForMessage(id))
);
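Note that `Promise.all` rejects as soon as any one message fails or is rejected, which discards the results of the others. If partial success is acceptable, `Promise.allSettled` collects every outcome. The sketch below uses a hypothetical `waitForAll` helper, and `waitForMessage` stands for any function that resolves with the resource or throws, such as the polling loop shown earlier.

```javascript
// Waits for every message and separates successes from failures
// instead of stopping at the first rejection.
async function waitForAll(messageIds, waitForMessage) {
  const outcomes = await Promise.allSettled(
    messageIds.map((id) => waitForMessage(id))
  );
  const succeeded = [];
  const failed = [];
  outcomes.forEach((outcome, index) => {
    const messageId = messageIds[index];
    if (outcome.status === 'fulfilled') {
      succeeded.push({ messageId, resource: outcome.value });
    } else {
      failed.push({ messageId, error: outcome.reason });
    }
  });
  return { succeeded, failed };
}
```

The `failed` list keeps each message_id next to its error, so only the affected records need to be corrected and resent.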
Related Documentation
- arkipel_messages:query - Query operation status
- Global Schemas - Response envelope structure
- Error Handling - Handling failed operations