Asynchronous Message Processing

Understanding how mutations work in the Arkipel API.


Overview

When sending mutation messages (e.g., people:upsert, distributions:upsert), the response will not immediately include the created, updated, or destroyed resource. Instead, the message will be processed asynchronously—either queued for background processing or pending user validation.

This approach provides:

  • Decoupling: Allows for background processing, user validation, or delayed execution
  • Reliability: Ensures integrators can always retrieve the result, even if processing takes time
  • Scalability: Prevents blocking during high-load operations

Response Structure

The response will always include a message_id, which serves as a reference to track the status or result of the operation.

{
  "source_public_key": "community_key",
  "source_site": { "protocol": "https", "fqdn": "..." },
  "created_at": "2025-11-13T20:52:49Z",
  "signature": "...",
  "payload": {
    "type": "people:upsert",
    "message_id": "6916452112f746b2b4cf48c1"
  }
}

You can use this message_id to query the outcome later using the arkipel_messages:query message type.


How to Retrieve Results

1. Store the message_id

Save the message_id from the mutation response:

const response = await client.createPerson(data);
const messageId = response.payload.message_id;
// Store messageId for later polling

2. Query the status

Send an arkipel_messages:query message with the message_id:

{
  "payload": {
    "type": "arkipel_messages:query",
    "message_id": "6916452112f746b2b4cf48c1"
  }
}

3. Check the response

The response will include the final state of the operation:

{
  "payload": {
    "type": "arkipel_messages:query",
    "message_id": "6916452112f746b2b4cf48c1",
    "status": "persisted",
    "message_type": "people:upsert",
    "resource": {
      "id": 123,
      "first_name": "Jane",
      "last_name": "Doe",
      "import_id": "JD0001"
    }
  }
}

Status Values

Status Meaning Has Resource Action
pending Message received, waiting to be processed No Continue polling
processing Currently being processed No Continue polling
persisted Successfully completed Yes Operation complete
failed Processing failed No Check error, may retry
rejected Rejected by validation or policy No Fix and retry

Error Response

If the operation fails:

{
  "payload": {
    "type": "arkipel_messages:query",
    "message_id": "abc123",
    "status": "failed",
    "error": "Validation failed: email has already been taken"
  }
}

Polling Strategy

Basic Polling Loop

async function waitForCompletion(messageId, options = {}) {
  const maxAttempts = options.maxAttempts || 30;
  const pollInterval = options.pollInterval || 2000; // 2 seconds
  
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const response = await client.queryMessage(messageId);
    const status = response.payload.status;
    
    switch (status) {
      case 'persisted':
        return response.payload.resource;
        
      case 'failed':
      case 'rejected':
        throw new Error(`Operation failed: ${response.payload.error}`);
        
      case 'pending':
      case 'processing':
        await sleep(pollInterval);
        break;
    }
  }
  
  throw new Error(`Operation timed out after ${maxAttempts} attempts`);
}
Attempt Delay Total Time
1 2s 2s
2 2s 4s
3 2s 6s
5 2s 10s
10 2s 20s
15 2s 30s
30 2s 60s

Best Practice: Start with 2-second intervals. Most operations complete within 5-10 seconds. Adjust based on your use case.


Key Points

Asynchronous by Design

Mutations are not blocking; the system acknowledges receipt but processes them later. This is intentional and applies to all write operations:

  • people:upsert
  • households:upsert
  • organizations:upsert
  • distributions:upsert
  • publications:upsert
  • notes:upsert
  • All delete operations

Tracking

The message_id is your reference to poll for updates. Store it immediately after receiving the mutation response.

Idempotency

The same message_id can be queried multiple times with consistent results. The operation result doesn’t change once set to persisted, failed, or rejected.


Sequence Diagram

sequenceDiagram
    participant Integrator
    participant ArkipelAPI
    participant Queue
    participant User

    Integrator->>ArkipelAPI: POST people:upsert
    ArkipelAPI-->>Integrator: { "type": "arkipel_messages:query", "message_id": "abc123", ... }
    ArkipelAPI->>Queue: Enqueue message
    Queue->>User: (Optional) Await validation
    User-->>Queue: (Optional) Approve/Reject
    Queue->>ArkipelAPI: Process message
    Integrator->>ArkipelAPI: POST arkipel_messages:query { "message_id": "abc123" }
    ArkipelAPI-->>Integrator: { "message_id": "abc123", "type": "arkipel_messages:query", "resource": { ... } }

Common Patterns

Fire and Forget

If you don’t need the result immediately:

const response = await client.createPerson(data);
const messageId = response.payload.message_id;
// Log messageId for later reconciliation
console.log(`Person creation queued: ${messageId}`);
// Continue without waiting

Wait with Timeout

For operations where you need the result:

try {
  const person = await client.createPersonAndWait(data, {
    maxAttempts: 15,
    pollInterval: 2000
  });
  console.log(`Created person: ${person.id}`);
} catch (error) {
  console.error(`Failed to create person: ${error.message}`);
}

Batch Operations

For multiple operations:

const messageIds = [];

// Send all mutations
for (const personData of peopleData) {
  const response = await client.createPerson(personData);
  messageIds.push(response.payload.message_id);
}

// Poll all for completion
const results = await Promise.all(
  messageIds.map(id => client.waitForMessage(id))
);


Return to API Specifications


Back to top

Welcome to the Arkipel DevKit! This documentation will guide you through everything you need to build clients for Arkipel communities.

Contact: devkit@arkipel.co | Page URLs

Copyright © 2026 Arkipel. Distributed under an MIT license.