Asynchronous message queuing, retries, and error handling

Continuing from this Slack conversation.

We have several emerging projects that require integration / communication between OpenMRS and external systems. These range from PACS systems, lab systems, client registries, and shared health records in both facility-level networks and national HIEs.

I am aware of many different initiatives and approaches that have been used to cover similar use cases, from Ozone’s approach that utilizes a number of separate “EIP” applications that run independently of OpenMRS, to the Bahmni approach that leverages Atom Feeds, to a number of modules that have been authored to achieve specific integrations.

In all of these cases, there is a common workflow that needs to be solved:

  • Listening for a particular event or events to occur in OpenMRS
  • Adding entries to a persistent queue in response to these events
  • Asynchronously performing some logic (usually constructing a message and sending to a remote endpoint) for each queue entry
  • Tracking whether the logic was successful or not, and recording this status and any relevant error messages
  • Retrying any failed queue entries on some sort of schedule, depending on the application and the nature of the failure
  • Providing a mechanism for administrators to be alerted to failed queue entries and a mechanism to view and manage the queue and resolve any persistent issues.
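To make the workflow concrete, here is a minimal in-memory sketch of steps 2–6 (all class and method names here are hypothetical, not an existing OpenMRS API; a real module would persist entries in a database table and run delivery on a scheduler):

```java
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of the generic queue-and-retry workflow described above.
public class OutboundQueue {

    enum Status { PENDING, SUCCESS, FAILED }

    static class QueueEntry {
        final String payload;           // e.g. a serialized FHIR message
        Status status = Status.PENDING;
        int attempts = 0;
        String lastError;
        Instant lastAttempt;
        QueueEntry(String payload) { this.payload = payload; }
    }

    private final Queue<QueueEntry> pending = new ArrayDeque<>();
    private final List<QueueEntry> failed = new ArrayList<>();
    private final int maxRetries;

    OutboundQueue(int maxRetries) { this.maxRetries = maxRetries; }

    // Step 2: add entries to the queue in response to OpenMRS events
    void enqueue(String payload) { pending.add(new QueueEntry(payload)); }

    // Steps 3-5: process entries, record status/errors, and requeue failures
    void drain(Consumer<String> sender) {
        QueueEntry entry;
        while ((entry = pending.poll()) != null) {
            entry.attempts++;
            entry.lastAttempt = Instant.now();
            try {
                sender.accept(entry.payload);   // e.g. POST to a remote endpoint
                entry.status = Status.SUCCESS;
            } catch (Exception e) {
                entry.lastError = e.getMessage();
                if (entry.attempts < maxRetries) {
                    pending.add(entry);         // retry later
                } else {
                    entry.status = Status.FAILED;
                    failed.add(entry);          // surface to administrators (step 6)
                }
            }
        }
    }

    List<QueueEntry> failedEntries() { return failed; }
}
```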

It seems that some (though generally not all) of this workflow has been implemented many different times in various places.

  • OpenMRS EIP has its management database, which includes debezium_event_queue, containing all messages, and sender_retry_queue, containing failed messages in need of reprocessing.

  • The Atom Feed module creates the event_records, event_records_queue, failed_events, event_records_offset_marker, markers, and chunking_history tables

  • The labonfhir module appears to create a task_request and a failed_task table

  • The PIH pacsintegration module creates a pacsintegration_outbound_queue table.

  • The Client Registry module appears to acknowledge the need for this in its optional clientregistry_item table

Other modules that do similar things include:

It seems that we might be well served by tackling this problem generically in a module that can provide support for the workflow described above, that can be shared across various use cases. Many of the above modules may still create their own specific data model tables where needed, though the hope would be that they could leverage this common module for tracking success or failure and retries of communication with external systems.

I’m interested in gathering perspectives as to pros and cons of working towards such a general-purpose module.


We came across a use case last year of a laboratory that required billing features. From Ozone’s perspective, such a setup means a LIMS app + an ERP app—but no EMR app, meaning OpenMRS wasn’t involved in this scenario.

This illustrates one of our architectural challenges and highlights why Ozone aims to avoid embedding cross-app communication logic, including queueing, directly within individual apps (in the example above all the good work done within OpenMRS wouldn’t be accessible to this implementation); and why we prefer handling app-to-app messaging externally in the middleware layer. Especially given the fact that there are proven middleware open-source message queueing techs out there.

(Another reason, which has become front and center recently, is sustainability. Adding more code—a new module—means more maintenance to be handled within our community.)

However, I fully understand your perspective and the convenience of managing features from within OpenMRS itself, including in terms of DevOps; and there is no doubt that we would leverage, as we always did, any good work done by PIH and made available within OpenMRS.

Both modules and standalone services have their own pros and cons, which is why I have always preferred having a framework/API that centralizes the logic to avoid duplicating common code; modules and standalone services can then both consume it. This is something we strived to achieve with OpenMRS-EIP: you can create custom apps as documented here, or even build a module on top of it. I have just never tried it with a module, purely because of Java and dependency version differences; the last time I considered it, OpenMRS-EIP required newer versions of Java, Spring, Hibernate, Liquibase, etc. If we can align the required versions, then in theory you should be able to use it in a module.


Indeed. My intention is not to advocate for one over the other. They are not mutually exclusive. But the fact remains that many modules are being written that have integration and queue-like components and workflows embedded in them, as illustrated in my examples above. I am advocating for us to have a well-designed, well-tested, supported module that such components can depend upon, rather than seeing a proliferation of “event” and “retry_queue” tables and APIs that all have their own independent bugs and issues.

As much as I like the microservice approach that has been done so well with OpenMRS EIP, this is - as far as I can tell - only made possible due to Debezium, as it can consume events from OpenMRS without any involvement from OpenMRS at all. But if there is to be any higher level of message sending and queuing out of OpenMRS beyond low-level database row changes (eg. OpenMRS APIs that intentionally send out messages to be consumed asynchronously), then this may call for alternative solutions that can initiate and manage events, messages, and delivery within OpenMRS itself. These solutions also have the advantage of full access to the OpenMRS API and services.

It may be that the event module and its use of an embedded (or external) ActiveMQ service is already well positioned to address these needs, but I’m not convinced that this is so. I do think that it is worth discussing where technologies like this may help or hinder this effort.


What I suggested in my previous post would address this exactly, i.e. the new module or a refactoring of the event module would mean it leverages the OpenMRS EIP API as a custom application would, that way we don’t duplicate that logic. In theory, the module’s processing logic is what would ‘OpenMRS-ize’ a raw database event it receives from OpenMRS EIP since it would have access to the OpenMRS API.

With the approach I’m proposing, those who wish to use a module can use the new or refactored event module built on top of OpenMRS EIP, and those who opt for a standalone service can create custom apps built on top of the same framework. This has the advantage that development of the core shareable logic stays centralized in OpenMRS-EIP.

I think there’s a need for a module that shows best practices (including the use of e.g. AMQP 1.0), because most of the modules listed above do things their way, which is not always architected the right way.

For example, handling queues in a DB (basically acting as message brokers), as most of the modules mentioned above do, is not necessarily the best choice. It’s the message broker that should be responsible for handling messages (delivery, failures, retries), not custom code in a module…

Let’s come to an agreement of what the module could do.

The architecture that I think we could recommend is:

OpenMRS ↔ module firing/listening for events ↔ local message broker (embedded or distributed/standalone speaking AMQP 1.0) ↔ EIP middleware (Camel/Kafka Connect) ↔ external system

Or typical CDC:

OpenMRS → DB → Debezium (embedded or standalone) → local message broker (embedded or distributed/standalone speaking AMQP 1.0) → EIP middleware (Camel/Kafka Connect) → external system

By local message broker I mean message broker that is embedded in the OpenMRS instance or on the same network so connectivity is rarely an issue.

I agree that for integration needs you would almost always want to deploy a middleware as a standalone service, but if you really need to be lean then you could embed Camel routes with transformations to an external system in an OpenMRS module as well…

From the above I think you can read that our improved Event module could embed Debezium (borrowing from what openmrs-eip do) and a message broker of our choice that is preconfigured and comes with some examples using messaging API.

It would be also important to showcase pointing this module to a distributed message broker and standalone Debezium server instead of using embedded one to be able to run in a clustered/distributed environment.

If you want to fire some custom events you would deploy our improved Event module that provides Debezium and a message broker out of the box and in your custom module you would just use messaging API to fire events or configure embedded debezium to include your changes.

With this kind of design, you can start small with embedded solutions and switch to distributed services with a configuration change.

I think the current Event module or openmrs-watcher from openmrs-eip are not far from that.

I wouldn’t write any tooling around monitoring queues in a module, but rather use what the message broker has to offer or tools like https://hawt.io/

Does this sound like something you hoped for, @mseaton?

At PIH, we have a module that we’ve been experimenting with here: GitHub - PIH/openmrs-module-dbevent , which embeds Debezium and provides an interface to register 1-N consumers of Debezium events.

The initial motivation behind this was to provide a mechanism that could be aware of all changes to records related to patient data in the database - not just those changes that were the result of certain API calls (eg. from AOP), and not just those changes that were the result of using the Hibernate persistence layer (eg. from a Hibernate Interceptor).

We constantly see use cases come up in the community that would benefit from such a feature, so our intention was to see how this works out and get things in a somewhat stable state before getting broader feedback and/or moving to the OpenMRS github, but happy to do that if others are interested sooner rather than later ( @ibacher / @dkayiwa FYI ).

(One area in the short-term that might benefit from this module is in the GCoC-proposed Improved Auditing feature potentially. It’s worth a discussion.)

Anyway, I think the step in your architecture that is OpenMRS ↔ module firing/listening for events ↔ or OpenMRS → DB → Debezium (embedded or standalone) → is handled by the above or other existing technologies, and so the main thing I’m interested in next is the second part of your equation: ↔ local message broker (embedded or distributed/standalone speaking AMQP 1.0) ↔ EIP middleware (Camel/Kafka Connect) ↔ external system

This is really where I’d like to have the discussion. I would like to better understand and explore how we could do a better job (eg. in the event module) of using a technology like ActiveMQ and ensuring it can and does do all of the things that you suggest that it excels at. Despite ActiveMQ being used in the event module in OpenMRS for a decade or more, it is pretty opaque, not something that is easily managed, and not considered 100% reliable as a result. I’d be very interested in exploring a solution that:

  • Is pluggable (like the current event module) - modules can both subscribe to events and publish their own custom events

  • Different event listeners can have different retry semantics. For example, an event Listener that is expected to run in the same VM, same machine, or same local network, may have one type of retry logic, whereas something that is expected to communicate to an external service over the internet may have a different type of retry logic (eg. one shouldn’t have to have a separate EIP application like OpenHIM running locally that handles this).

  • No messages should ever be lost; any dead-letter queue or queues with messages being retried should be fully visible, and a web-based UI should be available to OpenMRS administrators (ideally within the same application, with the same credentials) that allows administrators to manually fix and retry entries, view errors, and generally have full visibility and confidence in message delivery
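On the second point, per-listener retry semantics could look something like this sketch (these interfaces are hypothetical and do not exist in the event module today; the thresholds and delays are illustrative):

```java
import java.time.Duration;

// Hypothetical sketch: each event listener declares its own retry policy,
// so local and internet-facing listeners can fail and recover differently.
public class RetryPolicies {

    interface RetryPolicy {
        boolean shouldRetry(int attempt);
        Duration delayBefore(int attempt);
    }

    // For listeners in the same VM, machine, or local network: a few quick retries
    static RetryPolicy local() {
        return new RetryPolicy() {
            public boolean shouldRetry(int attempt) { return attempt < 3; }
            public Duration delayBefore(int attempt) { return Duration.ofSeconds(1); }
        };
    }

    // For listeners calling external services over the internet:
    // exponential backoff over a much longer horizon
    static RetryPolicy remote() {
        return new RetryPolicy() {
            public boolean shouldRetry(int attempt) { return attempt < 10; }
            public Duration delayBefore(int attempt) {
                return Duration.ofSeconds((long) Math.pow(2, attempt)); // 1s, 2s, 4s, ...
            }
        };
    }
}
```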

If this can be accomplished with an embedded AMQP 1.0 instance and an embedded hawt.io application, great!


Anecdotally, this conversation from back in 2007 / 2008 is still very relevant and pretty much everything we hoped to do here never happened, as far as I know:

We’re not suggesting that we want a tool to take the place of a message broker. I think you need to distinguish between internal and external queues: before an event is sent to an external queue for delivery to an external consumer, there is usually some smart pre-processing business logic and rules that need to be applied, e.g. to preserve ordering where necessary, apply organizational semantic rules, determine what can be sent, skipped, or ignored, decide what can be processed in parallel vs. serially for better performance, add windowing, etc. I actually touched on some of these things in my Slack message here. Speaking for OpenMRS EIP, these internal DB-based queues are intended for handling pre-processing failures and retries that are specific to a source system, and you usually need to query these queues to achieve this. Message brokers are not designed for querying, which is one of the reasons we avoided an embedded message broker for internal queues; besides, one ‘bad’ message can block an entire queue, and how messages are sent to a dead letter queue is agnostic to your business rules.

I think all of us do agree we might need some sort of module, but regardless as to whether it is a module and/or middleware service, I would love to have a common independent API/framework that can then be used in a module or standalone application as a building block.

@mseaton I like that the dbevent module relies on the Event module. I was thinking of adding Debezium functionality to the Event module itself, but it can live in a separate module… The Event module could be then just for providing API for application level events, queues (embedded or connecting to distributed services), Camel dependencies and embedding hawt.io for monitoring. The EIP middleware could be deployed then in custom modules or as standalone.

I think the issue is that the module was built with “internal event bus” in mind i.e. between modules and core. It may not even provide an endpoint for external connections (at least I don’t see anything in docs) and it has no queue monitoring instructions.

You typically configure retry policy per queue/topic.
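For example, in ActiveMQ Classic the broker-side redelivery plugin lets you attach a different redelivery policy to each queue; a hedged sketch (the queue names here are placeholders, not anything the event module defines today):

```xml
<broker xmlns="http://activemq.apache.org/schema/core">
  <plugins>
    <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
      <redeliveryPolicyMap>
        <redeliveryPolicyMap>
          <redeliveryPolicyEntries>
            <!-- quick retries for a local, in-network queue -->
            <redeliveryPolicy queue="local.events" maximumRedeliveries="3"
                              initialRedeliveryDelay="1000"/>
            <!-- exponential backoff for a queue feeding an external system -->
            <redeliveryPolicy queue="mpi.outbound" maximumRedeliveries="10"
                              initialRedeliveryDelay="5000"
                              useExponentialBackOff="true" backOffMultiplier="2"/>
          </redeliveryPolicyEntries>
        </redeliveryPolicyMap>
      </redeliveryPolicyMap>
    </redeliveryPlugin>
  </plugins>
</broker>
```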

One thing I wanted us to get clarity on is that you do not want to expose your queue to an external system directly. By external I mean one that you do not have full control over. It’s the EIP middleware that should communicate with an external system. You either provide an endpoint for pulling or push to an external system via e.g. Camel route that connects to your queue. In other cases it is preferred to directly connect your services to the queue (even if they run in remote locations). You may have EIP pull messages from queue, transform and put to a new queue for other services to consume directly from the new queue so you get the most benefits of a message broker and asynchronous communication.

We should be able to embed hawt.io, Camel 4.x and ActiveMQ Classic on versions of core (2.7.0+) supporting Java 17.

Hawt.io covers monitoring of queues and Camel routes. It doesn’t send alerts, so we would have to add some code to send alerts when a queue exceeds a specific size.

@wyclif I’m just trying to say that building any logic around handling and monitoring queues/event processing is better avoided, since there are tools that probably do it better already and can run in a distributed environment and be scaled up if needed. I don’t know all the details of the current OpenMRS EIP design and what it would take to make it use a message broker, but there’s probably a different design possible for the problem it tries to solve with its internal DB queue.

Thanks @wyclif and @raff . I find myself both relating to what Wyclif is describing and also interested in seeing if we can improve the event module in the way that Rafal describes. Let’s take the following use case:

Detect any new patients or updates to existing patients and update national MPI by posting a FHIR message to an external endpoint.

To accomplish this, let’s look at how we might address the following needs:

  • Listen on a stream of database change events to all relevant tables. eg. a single patient registration would see an event stream like: INSERT person, INSERT patient, INSERT person_name, INSERT person_address, INSERT person_attribute, INSERT person_attribute, INSERT patient_identifier, ....

    • Presumably we could solve this via registering a new DbEventListener instance using the dbevent module, and configuring it to monitor the given tables
  • Using either the dbevent module directly, or by publishing from dbevent → event and then through a series of events published using the event module, do the following:

    • For each row, identify the patient (id or uuid) associated with the row
    • Ensure that all of the events for a single patient are processed in series, even if multiple workers/threads are used to process events (eg. by keying on patient uuid)
    • Maintain some state that is able to aggregate all changes to the same patient over some period of time and/or delay following the latest update; after this window has passed, aggregate that state into a new message to process and clear the state (eg. for the 7 database table events listed above, create a single MPI update message, not 7 messages)
    • Submit the new aggregate message to an external endpoint
    • Handle failures in event processing at any of the above stages by routing messages appropriately (eg. re-publishing messages to the same queue +/- with some delay or move to error queue, etc)
    • Maintain some (maybe the same) state to identify patients that have previously failed records, and route any subsequent messages appropriately to ensure all patient updates remain in series and are not processed out of order.
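To visualize the keying-and-windowing steps above, here is a minimal in-memory sketch (the Event record and class names are hypothetical; a real implementation would persist this state and use wall-clock timers rather than explicit timestamps):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: key events on patient uuid, hold them in a window,
// and emit one aggregate message per patient after a quiet period elapses.
public class PatientChangeAggregator {

    record Event(String patientUuid, String table) {}

    private final Map<String, List<Event>> window = new LinkedHashMap<>();
    private final Map<String, Long> lastUpdate = new LinkedHashMap<>();
    private final long quietPeriodMs;

    PatientChangeAggregator(long quietPeriodMs) { this.quietPeriodMs = quietPeriodMs; }

    // Key each incoming db event on patient uuid so one patient's events stay together
    void onEvent(Event e, long nowMs) {
        window.computeIfAbsent(e.patientUuid(), k -> new ArrayList<>()).add(e);
        lastUpdate.put(e.patientUuid(), nowMs);
    }

    // Emit one aggregate message per patient whose quiet period has elapsed,
    // then clear that patient's state (eg. 7 table events -> 1 MPI message)
    List<String> flush(long nowMs) {
        List<String> messages = new ArrayList<>();
        for (var it = lastUpdate.entrySet().iterator(); it.hasNext(); ) {
            var entry = it.next();
            if (nowMs - entry.getValue() >= quietPeriodMs) {
                List<Event> events = window.remove(entry.getKey());
                messages.add(entry.getKey() + ":" + events.size() + " changes");
                it.remove();
            }
        }
        return messages;
    }
}
```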

Can we lay out what aspects of the above could be handled within the context of a shared framework and approach, and what aspects need to be developed custom within the context of a particular listener?

Thanks! Mike

I am also interested in thinking more critically about where it is appropriate to use a publish/subscribe message broker system (eg. the event module) vs. a system built more around an append-only log (eg. the atomfeed module or something like Kafka). Specifically, I’m interested in whether any of the above workflow might be better done (even internally) with module-based functionality that would:

  • Support writing event producers that write events to a topic in an append-only log
  • Support writing event consumers that read events from topics and track current offset
  • Support similar needs around failures, error handling, retries, and guaranteed processing of messages in order for a given key/partition, as described above.
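The append-only log alternative above could look something like this sketch (all names are hypothetical; Kafka and the atomfeed module implement the same idea with durable storage and partitions):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: producers append to a topic, each consumer tracks its
// own offset, and a failed consumer simply re-reads from its last commit.
public class EventLog {

    private final Map<String, List<String>> topics = new HashMap<>();
    private final Map<String, Integer> offsets = new HashMap<>(); // consumer -> offset

    // Producer side: events are only ever appended, never mutated
    void append(String topic, String event) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(event);
    }

    // Consumer side: read everything past the committed offset, in order
    List<String> poll(String consumer, String topic) {
        int offset = offsets.getOrDefault(consumer, 0);
        List<String> log = topics.getOrDefault(topic, List.of());
        return log.subList(offset, log.size());
    }

    // Commit only after processing succeeds; on failure the same events are
    // redelivered on the next poll (at-least-once, in-order semantics)
    void commit(String consumer, String topic, int processed) {
        offsets.merge(consumer, processed, Integer::sum);
    }
}
```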

Thoughts on this?

My approach to the MPI updates would be to have the dbevent module push events on tables person, patient, person_name, etc. to the event module, which puts everything as-is in queues.

Then I would write a module or standalone middleware that has the following Camel routes (Camel comes preconfigured in the Event module to use its activemq instance):

  1. from the patient/person table queues, route messages to a single queue, putting patient_id in a header for all messages; then route to the aggregate component with correlationExpression set to patient_id and completionTimeoutExpression set to e.g. 1s, and save the list of aggregated messages to the patient_changes queue.
  2. optional: If you want to introduce some parallel processing you could route from patient_changes queue to parallel routes based on patient id and each to the FHIR route below.
  3. from the patient_changes queue, route to the external FHIR server with the FHIR component, or to patient_changes_dead_letter if patient_id is already in patient_changes_dead_letter or a FHIR error occurs.

I would configure redelivery on server or connection exceptions from the FHIR component and have a dedicated dead letter queue for undelivered messages due to connections/FHIR issues and a different one on any other exceptions.

The reason to put aggregated messages in the patient_changes queue instead of sending to the external FHIR server directly is to be able to easily route messages back from the dead letter queue for sending to the FHIR server without having to aggregate again once the issue is fixed. I would also store the patient IDs that ended up in the dead letter queue and put any following messages for those patients in the dead letter queue too, so they are sent in sequence when the issue is fixed.
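Routes 1 and 3 might be sketched in Camel's XML DSL roughly as follows (queue names, the aggregation strategy, the errorHandlerRef bean, and the FHIR endpoint are all placeholders, and exact attribute names vary between Camel versions):

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
  <!-- Route 1: correlate change events by patient_id and emit one
       aggregated message per patient after a 1s quiet period -->
  <route id="aggregate-patient-changes">
    <from uri="activemq:queue:db.changes"/>
    <aggregate completionTimeout="1000" aggregationStrategy="groupedBodyStrategy">
      <correlationExpression>
        <header>patient_id</header>
      </correlationExpression>
      <to uri="activemq:queue:patient_changes"/>
    </aggregate>
  </route>
  <!-- Route 3: deliver aggregates to the external FHIR server; failures are
       redelivered and eventually parked on a dead letter queue -->
  <route id="send-to-mpi" errorHandlerRef="patientChangesDeadLetter">
    <from uri="activemq:queue:patient_changes"/>
    <to uri="fhir:update/resource?serverUrl=..."/>
  </route>
</routes>
```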

Basically the shared framework provides dependencies, configuration and monitoring via Hawt.io for Debezium, ActiveMQ, Camel. Maybe also some notification mechanism for route issues to the OpenMRS administrator and simple UI for re-routing from dead letter queues to a proper queue (which can be achieved to some extent with additional routes enabled/disabled via Hawt.io). All the processing is done with Camel routes and tailored to your use case in custom module or standalone middleware.

Thanks @raff ! The aggregate functionality of Camel seems interesting.

One clarification: when you say module, @raff, are you suggesting this could possibly be an OpenMRS module? One issue I see is that there needs to be some sort of OpenMRS API access at this point in the chain (which an OpenMRS module could provide, of course).

As just one example, one data point that we’d likely want to send to the external FHIR server or MPI is patient telephone number. In OpenMRS telephone number is just a person attribute of a certain configured type in the person_attribute table. I wouldn’t want to have to build a custom piece of middleware (that needs to be separately configured from OpenMRS) to translate the raw OpenMRS data model changes into something the MPI can understand.

I think that’s what I’m struggling with a bit: I understand the idea of getting the data “out” of OpenMRS early in the pipeline so that we don’t need to build any kind of queuing and dead letter support into OpenMRS, but I think we need to rely on the OpenMRS API for message translation and aggregation (and support not losing any messages that trigger API errors). Not that these two needs are mutually exclusive, just trying to visualize how it would work.

Hope that makes some sense!

Take care, Mark

Yes, Camel routes could be deployed as an OpenMRS module or as standalone middleware. When they are in a module you have full access to the OpenMRS API, whereas in standalone middleware you usually work with JSON. I haven’t tried it, but in theory we could make it work so that you could also include openmrs-api.jar in a standalone middleware project and use the OpenMRS data model there, though some adjustments in core would probably be needed.

Message translation, aggregation, and error support for routes are part of the Camel framework. You need to add conversion for the data you want to push forward. The OpenMRS API can sometimes help here, but not if you want to work with e.g. Debezium events, as they come in as raw db changes.

It’s possible to use Debezium as a trigger of an update event and then call the OpenMRS API to get data in the OpenMRS data model, if you are okay with eventual consistency and don’t mind intermediate states being lost (e.g. a phone number was updated twice in a short period, but only the latest update makes it to the MPI because the processing queue was a bit long). For performance and strong consistency, though, it’s usually best to work with Debezium events and the data they contain.

If you need strong consistency and you want to work with the OpenMRS API, then it’s best to use API-triggered events and not mix them with Debezium. But then there are issues when some code does an update without correctly triggering an API event… which is a bug that is sometimes hard to discover and address. A solution to that is adding periodic polling for changes you are interested in that may have been omitted.

My general preference is Debezium and working with the data contained in events. It usually doesn’t involve a lot of coding to get the data you are interested in out of those events, and you don’t compromise performance or consistency. I would consider our OpenMRS DB schema quite stable between versions, and changes are usually easy to work around if you work with plain JSON.