OpenMRS ELT/ETL Pipeline Demo

A major focus within the OpenMRS community is how to efficiently extract data from the production database for reporting, analysis, and clinical decision support. Using modern data engineering tools to extract and transform OpenMRS data can greatly reduce the load on the production database and improve reporting and analysis workflows. Moreover, the manual process of designing and developing ETL queries has proven to be time-consuming, tedious, complex, and hard to scale, so a modern approach that encourages collaboration within the community is sorely needed. AMPATH and Antara have been collaborating over the past few months on designing and developing a modern ELT pipeline (proof of concept) using widely used data engineering tools and programming languages. Here is what we have accomplished so far:

  1. We have designed an analytics-friendly DataFrame/table schema for OpenMRS data that is relevant to many different groups and not specific to any single OpenMRS implementation. As a strawman starting point, the derived DataFrame consists of data elements extracted from the Obs, Encounter, Orders, and Person tables, with the flexibility to include other desired data elements (see the schema sketch after this list).

  2. Instead of the ETL paradigm, which discourages collaborative effort, we leveraged the ELT pattern to extract and load data out of OpenMRS. ELT also lets us store the extracted data in an intermediate datastore, so that the data can be efficiently loaded into the analytics environment/platform of choice. Currently, the intermediate datastore supports Delta Lake and Cassandra, with the ability to plug in a NoSQL database or RDBMS of choice. All logic for sourcing, sinking, and transformation is implemented with open-source stacks such as Spark, Debezium, Kafka, Cassandra, and the recently open-sourced Delta Lake (see the load sketch after this list).

  3. The pipeline supports both batch and streaming modes. After the batch process completes, all updates made through OpenMRS are incrementally applied to the intermediate datastore. To avoid code duplication, the batch and streaming pipelines share common algorithms and can be extended very easily. We have tested this pipeline at production scale during peak data-entry hours (~80 events per second) with minimal latency (~4 seconds); see the streaming sketch after this list.
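
To make point 1 concrete, here is a minimal sketch of what the flattened, analytics-friendly schema could look like as a PySpark StructType. The column names are illustrative assumptions drawn from the OpenMRS obs, encounter, orders, and person tables, not the exact schema used in the prototype.

```python
# A minimal sketch of the flattened, analytics-friendly schema (column names are
# illustrative assumptions, not the prototype's exact schema).
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType,
    DoubleType, TimestampType, DateType,
)

flat_obs_schema = StructType([
    # Person table
    StructField("person_id", IntegerType(), False),
    StructField("gender", StringType(), True),
    StructField("birthdate", DateType(), True),
    # Encounter table
    StructField("encounter_id", IntegerType(), True),
    StructField("encounter_type", StringType(), True),
    StructField("encounter_datetime", TimestampType(), True),
    # Obs table
    StructField("obs_id", IntegerType(), True),
    StructField("concept_id", IntegerType(), True),
    StructField("value_numeric", DoubleType(), True),
    StructField("value_coded", IntegerType(), True),
    StructField("obs_datetime", TimestampType(), True),
    # Orders table
    StructField("order_id", IntegerType(), True),
    StructField("order_concept_id", IntegerType(), True),
])
```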
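
For point 2, the "EL" half of ELT can be as simple as reading an OpenMRS table over JDBC and landing it unchanged in the intermediate store. The sketch below is not the prototype's actual code; hostnames, credentials, paths, and the Delta package version are assumptions, and the MySQL JDBC driver would also need to be on the classpath.

```python
# A minimal sketch of the "EL" step: read an OpenMRS table over JDBC and land it
# unchanged in Delta Lake. Hostnames, credentials, paths, and package versions are
# assumptions; the MySQL JDBC driver must also be available to Spark.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("openmrs-elt-load")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

obs = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://openmrs-db:3306/openmrs")  # hypothetical host
    .option("dbtable", "obs")
    .option("user", "openmrs")
    .option("password", "********")
    .load()
)

# Load the raw table as-is; transformation happens later, inside the analytics store.
obs.write.format("delta").mode("overwrite").save("/data/delta/openmrs/obs")
```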
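
And for point 3, one way the streaming leg can work is to consume Debezium change events from Kafka and upsert them into the Delta table once per micro-batch. This is a sketch under assumptions: the topic name, paths, Debezium envelope fields, and the two-column target layout are all illustrative, and the spark-sql-kafka package is needed alongside Delta.

```python
# A minimal sketch of the streaming leg: consume Debezium change events from Kafka
# and upsert them into a Delta table once per micro-batch. The topic name, paths,
# Debezium envelope fields, and the two-column target layout are all assumptions.
from pyspark.sql import SparkSession, functions as F
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()  # Delta-enabled session, as configured above

changes = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")   # hypothetical broker
    .option("subscribe", "dbserver1.openmrs.obs")      # hypothetical Debezium topic
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING) AS json")
)

def upsert_to_delta(micro_batch, batch_id):
    """MERGE each micro-batch of change events into the target Delta table."""
    parsed = micro_batch.select(
        F.get_json_object("json", "$.payload.after.obs_id").cast("int").alias("obs_id"),
        F.col("json"),
    )
    # Assumes the target table was created with the same two columns (obs_id, json).
    target = DeltaTable.forPath(spark, "/data/delta/openmrs/obs_changes")
    (target.alias("t")
        .merge(parsed.alias("s"), "t.obs_id = s.obs_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(changes.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/data/checkpoints/obs_changes")
    .start())
```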

For more details, we will give a brief demo during Burke’s ETL vision statement meeting (yet to be scheduled). Feel free to join us, and we can discuss possible directions we could take as a community. Hopefully, the call and this proof of concept will take the conversation to the next level. Please feel free to post follow-up questions or anything you need or would like to see/discuss during the demo. Should you need immediate clarification, feel free to post in this thread.


Hi @akimaina,

Thanks for sharing!

Which call is/was that?

We were just talking about ETL during today’s Technical Action Committee meeting and it seems like there was another discussion about ETL happening during some recent FHIR squad meetings. One idea that came out of the TAC meeting was to schedule a virtual Lightning Talk dedicated to ETL.

@akimaina, it would be great to have you demo this as a Lightning Talk - and we can also schedule a Design Forum for a deeper dive.

Thank you, Jennifer, that will be great!

Thank you @mksd. Burke will be doing a post about the ETL vision statement, and this demo will happen during that call. The time and date are yet to be determined, so @jennifer will circulate a Doodle poll.

Thanks @akimaina for this great post. A few questions:

  1. Is the code available on GitHub for everyone to take a look?

  2. How sensitive is this implementation to changes in the input DB schema? For example, is the same pipeline applicable to different versions of OpenMRS where the schema of some of the input tables is different?

  3. I see that you have tried to prevent code duplication by sharing common code. Did you consider implementing the pipeline with Apache Beam, such that the same pipeline can be used in both streaming and batch modes?

  4. Re. the output Spark dataframes, did you consider the alternative of storing the intermediate output in a columnar file format (like Parquet)? In general, if someone wants to avoid PySpark and run SQL-like queries on the intermediate storage, is that possible? Sorry, I don’t know much about Delta Lake; maybe it already does what I am asking about.

  5. Re. the stats that you reported for the streaming case, how many nodes/cores were being used for the streaming pipeline? And how big was the input DB in terms of the approximate number of patients, observations, etc.?

Thanks again.


Also, for the batch process, is this assumed to be one-way out of OpenMRS? I understand that there is a desire to populate OpenMRS from another database when starting up a COVID-19 system. Is there an existing workflow that can leverage a transform layer, or do we need one going in the other direction (patient, visit, obs, orders, etc.)?


Thank you @bashir for taking the time to go through this post and raise important design considerations :grinning: I’m glad you are interested in this conversation about a community-driven ETL pipeline.

Yes, there are two components to it: the OpenMRS CDC docker containers and the Batch/Streaming codebase. This is prototyped in Python; however, once the community is on board with the idea of designing and developing a new community-driven ETL/ELT pipeline, we can use whichever programming language favors the majority. Also, the design of the ultimate pipeline should be driven by the community, and it doesn’t need to use this architecture; this is just a proof of concept. Please let me know if you need help running the CDC and bringing up the pipeline.

The first component, i.e., the CDC component, supports all versions of OpenMRS; however, we haven’t tested the second component on versions <2.x. Ultimately, we need a pipeline that is versionable and can support any version of OpenMRS, so moving forward we should come up with a design that evolves with OpenMRS versions. The good thing is that the OpenMRS base table schema rarely changes.

This is an excellent suggestion; during the design session for the ultimate pipeline, we should definitely consider Beam. We did not explore other options because Spark provides a unified API for both streaming and batch modes; essentially, you can write one pipeline for both modes using Spark. However, a purely unified streaming/batch pipeline puts a very strong assumption on the pipeline’s ingestion mechanism, i.e., it assumes your streaming and batch inputs share an identical source, schema, and structure. This is almost impossible to pull off without modifying the OpenMRS API to support streaming or, alternatively, pre-structuring the input data stream/batch. Lambda architecture comes to the rescue when you do not have the luxury of modifying a pre-existing system. Lastly, another important attribute to consider while selecting the ultimate tech stack is a guarantee of “exactly-once” processing semantics (with respect to availability and consistency); in essence, we don’t want to miss updates in case of a disconnection between the data source (OpenMRS) and the ETL/ELT pipeline.
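
To illustrate the code-sharing point, here is a minimal sketch (with assumed table paths) of how a single transformation function can drive both a batch job and a Spark Structured Streaming job. This is roughly the idea behind avoiding duplicated logic between the two modes, not the prototype's actual code.

```python
# A minimal sketch of sharing one transformation between batch and streaming in Spark;
# table and checkpoint paths are illustrative assumptions.
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()  # Delta-enabled session assumed

def flatten_obs(obs: DataFrame, person: DataFrame) -> DataFrame:
    """Shared logic: join obs rows with person demographics."""
    return (
        obs.join(person, "person_id", "left")
           .select("person_id", "obs_id", "concept_id",
                   "value_numeric", "gender", "birthdate")
    )

person = spark.read.format("delta").load("/data/delta/openmrs/person")

# Batch mode: static input, one-off overwrite.
batch_obs = spark.read.format("delta").load("/data/delta/openmrs/obs")
(flatten_obs(batch_obs, person)
    .write.format("delta").mode("overwrite")
    .save("/data/delta/openmrs/flat_obs"))

# Streaming mode: the very same function applied to a streaming source.
stream_obs = spark.readStream.format("delta").load("/data/delta/openmrs/obs")
(flatten_obs(stream_obs, person)
    .writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/data/checkpoints/flat_obs")
    .start("/data/delta/openmrs/flat_obs"))
```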

Delta Lake uses a columnar file format; it is actually built on top of Parquet files, with support for both batch and streaming, including ACID transactions, schema evolution, etc. When we finally design the ultimate pipeline, I think it will be important to allow flexibility in selecting the intermediate storage of choice, whether on-premise or cloud storage. Ultimately, we should make it configurable.
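
One possible way to keep the intermediate store configurable is to treat the output format and path as configuration rather than code; the environment variable names and paths below are purely illustrative.

```python
# A minimal sketch of a configurable sink: the output format and path come from
# configuration, so the same job can write Delta, plain Parquet, or another supported
# format. Variable names and paths are illustrative assumptions.
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Delta-enabled session assumed

sink_format = os.environ.get("ELT_SINK_FORMAT", "delta")    # e.g. "delta" or "parquet"
sink_path = os.environ.get("ELT_SINK_PATH", "/data/lake/openmrs/obs")

obs = spark.read.format("delta").load("/data/delta/openmrs/obs")  # assumed source table
obs.write.format(sink_format).mode("overwrite").save(sink_path)
```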

Delta, through the Spark API, supports SQL-like queries; both the PySpark batch and streaming contexts provide out-of-the-box support for them. I can’t stress enough that we need the flexibility to select the intermediate storage of choice; the community won’t be receptive if the ultimate pipeline doesn’t support this flexibility. As a strawman starting point, we should ship a default intermediate storage while allowing support for other popular storage systems.
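
For example, a minimal sketch (table path assumed) of registering a Delta table as a temporary view and querying it with plain SQL, so analysts do not have to touch the DataFrame API:

```python
# A minimal sketch of querying the intermediate store with plain SQL instead of the
# DataFrame API; the table path is an illustrative assumption.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Delta-enabled session assumed

flat = spark.read.format("delta").load("/data/delta/openmrs/flat_obs")
flat.createOrReplaceTempView("flat_obs")

spark.sql("""
    SELECT concept_id, COUNT(*) AS n_obs
    FROM flat_obs
    GROUP BY concept_id
    ORDER BY n_obs DESC
    LIMIT 10
""").show()
```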

The entire pipeline was tested on a single server with 8 cores and 30 GB of RAM; however, only 21 GB was available, since other containers were running on the same server.

Again, thank you for following up with these design questions. Please let me know if you need further elaboration.


Thank you @akanter for this question. It depends largely on the kind of data you want to transfer between the two systems. Transactional and analytical data are not synonymous; for a two-way transfer between two transactional (OLTP) systems, you would want to leverage the FHIR/REST modules (I’m not sure if they support batch). However, for a pure analytics (OLAP) system, you have the flexibility of a pipeline that goes in both directions. This prototype is only intended for analytics purposes such as reporting and only supports one-way data movement. That said, we can come up with a design that supports two-way modes for specific use cases such as clinical decision support. Thank you for this suggestion!

@mksrom @wyclif it’s a long read, but please go through all of the above when you have half an hour.

Thanks again @akimaina. Please keep us in the loop about the demo; we at Mekom will certainly want to be involved moving forward.