I’m very happy to show how Omni handles one of the most challenging scenarios in analytics: embedding custom fields. In this guide we’ll cover the problem statement, a brief overview of the conceptual solution and the Omni features created to solve it, and finally a Python script you can adapt to your specific scenario.
The Problem Statement
All businesses generate data for their customers, but it usually follows a fixed schema and varies only within it. Occasionally fully custom data is generated, which tends to happen at software companies especially.
Imagine you offer your customers the ability to track custom attributes on their employee profiles. It’s a great feature and customers love it. But once you launch a reporting module, customers start asking to report on these custom attributes. It makes sense: the custom fields matter most to them, because they took the trouble to create them!
Your engineers stored the custom fields as EAV (entity-attribute-value) or key-value data to account for all the different data types any given custom field could have. But now the problem is that there is no predefined schema, no real self-service option exists, and creating custom reports using your customers’ fields is nearly impossible.
Here is what it looks like:
- company_id tracks the overall customer account / tenant (in this scenario, all users at a specific company have the same profile fields, but there is a row per user and field to track the values)
- user_id tracks the specific employee profile
- field_name tracks the custom field name
- data_type is needed because all the values are stored as text
- value is a varchar column which can store any type of information, but it needs to be cast by the application on read
Your data may look exactly like this, or vary somewhat. Even JSON custom attributes are fundamentally the same problem manifesting a little differently. The core element is the need for “mass customization”: the ability to store a schema per customer and encode how these fields should be retrieved, cast, and presented.
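To make the shape concrete, here is a minimal Python sketch of a few EAV rows and the per-customer schema they imply. The sample values and the helper name schema_per_company are made up for illustration; only the column names come from the description above.

```python
from collections import defaultdict

# Hypothetical sample rows: (company_id, user_id, field_name, data_type, value).
# Every value is stored as text, as in the EAV table described above.
ROWS = [
    (1, 10, "shirt_size", "string", "M"),
    (1, 10, "badge_number", "number", "4412"),
    (1, 11, "shirt_size", "string", "L"),
    (2, 20, "is_remote", "boolean", "True"),
    (2, 20, "start_date", "datetime", "2024-01-15 09:00:00"),
]


def schema_per_company(rows):
    """Collect the distinct field names and data types each company uses."""
    schema = defaultdict(dict)
    for company_id, _user_id, field_name, data_type, _value in rows:
        schema[company_id][field_name] = data_type
    return dict(schema)


SCHEMAS = schema_per_company(ROWS)
for company_id, fields in SCHEMAS.items():
    print(company_id, fields)
```

Each company ends up with its own small schema, which is exactly what a fixed, shared model cannot express on its own.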
Advanced Omni features to the rescue
Extension Models
Omni works through layers: the shared model and the workbook model give you the best of both worlds, with governance in the shared model and freedom in the workbook.
Omni has created a new ‘layer’ which helps solve the problem of EAV / custom fields in a scalable way.
The idea is a base or shared model, and “extensions” which are overlaid for a specific user or group of users possessing a user attribute value. The two models, the hub and the specific extension, are combined, with the extension taking precedence.
NOTE: This feature is in an open beta and will require someone at Omni to turn it on for you.
First, in our “hub” or core project, we can add a new extension model by going to File > New Shared Extension.
In your develop tab you’ll see the extension model tucked under its parent in the list, like so:
You could repeat this as many times as needed for the use case.
And back in the core model file, we need to define what attribute to listen to, and then what values correspond to which extensions. This is done through the dynamic_shared_extensions attribute.
You could overlay any type of customization in the extension models; I suspect they will be very useful for localization, as another example. But in the custom field scenario, they allow us to populate the extension models with customer-specific code that renders each customer’s fields in the topic and workbook. We’ll walk through the exact recipe and provide code samples in further sections.
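As a rough sketch of the mapping logic, the Python below models only the part of the structure that the deployment script later in this guide touches: a list under dynamic_shared_extensions whose first entry holds a mappings dictionary from extension model name to values_for_model. The key that names the user attribute is not shown in this guide, so it is left out here. The helper names add_mapping and resolve_extension are hypothetical, and the lookup illustrates the idea rather than how Omni implements it.

```python
def add_mapping(model_contents, extension_name, attribute_values):
    """Register an extension model for the given user attribute values."""
    extensions = model_contents.setdefault("dynamic_shared_extensions", [])
    if not extensions:
        extensions.append({})
    mappings = extensions[0].setdefault("mappings", {})
    mappings[extension_name] = {
        "values_for_model": [str(value) for value in attribute_values]
    }
    return model_contents


def resolve_extension(model_contents, attribute_value):
    """Return the extension mapped to a user attribute value, or None."""
    for entry in model_contents.get("dynamic_shared_extensions", []):
        for name, mapping in entry.get("mappings", {}).items():
            if str(attribute_value) in mapping.get("values_for_model", []):
                return name
    return None


# Hypothetical hub model contents with two tenants mapped.
hub = {}
add_mapping(hub, "c1", [1])
add_mapping(hub, "c2", [2])
print(resolve_extension(hub, 2))
print(resolve_extension(hub, 99))
```

A user whose attribute value has no mapping simply gets the hub model with no overlay in this sketch.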
YAML APIs
The second feature that’s helpful here is the set of APIs which allow us to write code directly to Omni over HTTP. Raw API docs are here, but the endpoints are also covered by the Python SDK through the methods yamlr() for reading and yamlw() for writing.
The raw HTTP endpoints are:
GET {base_url}/api/{version}/models/{model_id}/yaml for reading
POST {base_url}/api/{version}/models/{model_id}/yaml for writing
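If you would rather call the endpoints without the SDK, the sketch below only builds the two requests from the URL template above and does not send them. Several details are assumptions to verify against the API docs: the bearer-token Authorization header, the "v1" version string, the example.invalid host, and the JSON body keys (borrowed from what the script in this guide passes to yamlw()).

```python
import json
from urllib.request import Request


def yaml_url(base_url, version, model_id):
    """Fill in the endpoint template for a model's YAML."""
    return f"{base_url.rstrip('/')}/api/{version}/models/{model_id}/yaml"


def build_read_request(base_url, version, model_id, token):
    # ASSUMPTION: bearer-token auth; confirm the real scheme in the API docs.
    return Request(
        yaml_url(base_url, version, model_id),
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


def build_write_request(base_url, version, model_id, token, file_name, yaml_text):
    # ASSUMPTION: body keys mirror the dict passed to yamlw() in this guide.
    body = json.dumps({
        "fileName": file_name,
        "yaml": yaml_text,
        "mode": "extension",
        "commitMessage": f"Update {file_name}",
    }).encode("utf-8")
    return Request(
        yaml_url(base_url, version, model_id),
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder host, version, model id and token, for illustration only.
read_req = build_read_request("https://example.invalid", "v1", "my-model-id", "TOKEN")
write_req = build_write_request(
    "https://example.invalid", "v1", "my-model-id", "TOKEN",
    "PUBLIC/custom_profile_fields.view", "measures: {}\n",
)
print(read_req.get_method(), read_req.full_url)
print(write_req.get_method(), write_req.full_url)
```

Passing either object to urllib.request.urlopen would send it; the SDK methods are the simpler route if you already have it installed.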
With custom fields per customer we may ultimately have a large volume of code, and this gives us a straightforward means of automating the synthesis and deployment of it all.
The Solution
Okay, so I’ve got my Omni person to turn on the shared extensions and I’ve installed the Python SDK. What do I do now?
We’ll walk through this step by step, but you can find all the final code in this GitHub repo of the hub / core project for this example: Core Repo
Step 1) Set up the structure
The key-value data is stored in the EAV table we saw above in the problem statement. In the core project, we can add some fields in Omni that everyone will need, which cast the data to the correct type:
# Reference this view as custom_profile_fields
schema: PUBLIC
table_name: CUSTOM_PROFILE_FIELDS

dimensions:
  id:
    sql: '"ID"'
    format: ID
    hidden: true
    primary_key: true

  company_id:
    sql: '"COMPANY_ID"'
    format: ID
    hidden: true

  company_id_str:
    sql: CAST("COMPANY_ID" AS VARCHAR)
    format: ID
    hidden: true

  user_id:
    sql: '"USER_ID"'
    format: ID
    hidden: true

  field_name:
    sql: '"FIELD_NAME"'
    hidden: true

  data_type:
    sql: '"DATA_TYPE"'
    hidden: true

  value:
    sql: '"VALUE"'
    hidden: true

  string_value:
    sql: |
      CASE
        WHEN ${data_type} = 'string' THEN ${value}
        ELSE NULL
      END
    hidden: true

  numeric_value:
    sql: |
      CASE
        WHEN ${data_type} = 'number' THEN CAST(${value} as INTEGER)
        ELSE NULL
      END
    hidden: true

  date_value:
    sql: |
      CASE
        WHEN ${data_type} = 'datetime' THEN CAST(${value} as DATETIME)
        ELSE NULL
      END
    hidden: true

  boolean_value:
    sql: |
      CASE
        WHEN ${data_type} = 'boolean' THEN
          CASE
            WHEN ${value} = 'True' THEN True
            WHEN ${value} = 'False' THEN False
            ELSE NULL END
        ELSE NULL
      END
    hidden: true
Notice the numeric_value, date_value, string_value and boolean_value dimensions: these cast a given row from the overloaded varchar column into appropriately typed fields.
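For readers who think more easily in application code, here is a small Python mirror of those four typed dimensions. It is a sketch of the same branching, not something Omni runs: the function name typed_columns is hypothetical, and the datetime parsing assumes ISO-style text such as "2024-01-15 09:00:00".

```python
from datetime import datetime


def typed_columns(data_type, value):
    """Return the four typed columns for one EAV row; non-matching ones are None."""
    return {
        "string_value": value if data_type == "string" else None,
        # Mirrors CAST(... AS INTEGER): non-integer text would raise here.
        "numeric_value": int(value) if data_type == "number" else None,
        # ASSUMPTION: datetimes are stored as ISO-formatted text.
        "date_value": datetime.fromisoformat(value) if data_type == "datetime" else None,
        # Only the exact strings 'True' and 'False' map to booleans.
        "boolean_value": (
            {"True": True, "False": False}.get(value)
            if data_type == "boolean"
            else None
        ),
    }


number_row = typed_columns("number", "4412")
bool_row = typed_columns("boolean", "True")
date_row = typed_columns("datetime", "2024-01-15 09:00:00")
print(number_row)
print(bool_row)
print(date_row)
```

Exactly one typed column is populated per row, which is what lets a later MAX aggregate pick out the single real value.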
Next, we create a topic based on this view in the core model. The customer extension models will extend it with MAX / CASE WHEN measures that flatten the sparse matrix represented by the EAV data: when a row’s field name matches a specific custom field, the measure takes the max of its typed value, so each custom field ends up looking like a normal column. The topic can be hidden, since it won’t be used directly.
Next we set up a query view, which will leverage our hidden topic. This will select our typed and CASE/WHEN filtered measures inside of a CTE. By doing this, we will be effectively flattening the key/value data into what looks and feels like a custom table for the customer.
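The flattening pattern itself is plain SQL, so it can be tried outside Omni. The sketch below uses Python's built-in sqlite3 module with made-up sample rows; the SQL Omni generates from the measures and query view will look different, and this only demonstrates the MAX / CASE WHEN pivot grouped by user.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE custom_profile_fields ("
    "company_id INTEGER, user_id INTEGER, "
    "field_name TEXT, data_type TEXT, value TEXT)"
)
# Hypothetical sample rows for two tenants.
conn.executemany(
    "INSERT INTO custom_profile_fields VALUES (?, ?, ?, ?, ?)",
    [
        (1, 10, "shirt_size", "string", "M"),
        (1, 10, "badge_number", "number", "4412"),
        (1, 11, "shirt_size", "string", "L"),
        (2, 20, "shirt_size", "string", "XL"),
    ],
)

# One output column per custom field; MAX ignores the NULLs produced by
# rows that belong to other fields, leaving the single matching value.
FLATTEN_SQL = """
SELECT
    user_id,
    MAX(CASE WHEN field_name = 'shirt_size' THEN value END) AS shirt_size,
    MAX(CASE WHEN field_name = 'badge_number'
             THEN CAST(value AS INTEGER) END) AS badge_number
FROM custom_profile_fields
WHERE company_id = ?
GROUP BY user_id
ORDER BY user_id
"""

flat_rows = conn.execute(FLATTEN_SQL, (1,)).fetchall()
print(flat_rows)  # [(10, 'M', 4412), (11, 'L', None)]
```

User 11 has no badge_number row, so that column comes back as NULL, just like a sparse column in an ordinary table.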
This is just the core: it doesn’t contain anything besides a user id. All the action (and the customer-specific fields) will happen in the extension models:
# Reference this view as tenant_flattened_fields
label: User Profile

query:
  fields:
    custom_profile_fields.user_id: user_id
  base_view: custom_profile_fields
  filters:
    custom_profile_fields.company_id_str:
      bind: user_profile.company_id_str
  topic: custom_profile_fields

dimensions:
  user_id:
    hidden: true
    primary_key: true
Now in our custom profile topic we can join our “tenant_flattened_fields”.
Topic file:
# this is the user_profile topic
access_filters:
  - field: user_profile.company_id_str
    user_attribute: company_id

joins:
  company: {}
  tenant_flattened_fields: {}
Relationship file:
- join_from_view: user_profile
  join_to_view: tenant_flattened_fields
  join_type: always_left
  on_sql: ${user_profile.id} = ${tenant_flattened_fields.user_id}
  relationship_type: one_to_one
  join_from_view_as_label: user_profile
Now our core model is set up to receive its extensions. The extensions will have the max / case when measures that look for a specific tenant / company id and field name and data type. Then they will be flattened into a normal table structure in our tenant_flattened_fields query view and joined into a topic which will be custom for each customer.
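To preview what an extension contributes, this sketch builds the measure definitions for two made-up custom fields as plain Python dictionaries. The keys (sql, aggregate_type, filters) match what the deployment script in this guide writes; the helper name build_measure and the sample field names are hypothetical, and the output is printed as JSON only to avoid a YAML dependency in the example.

```python
import json

# Typed dimensions defined in the core custom_profile_fields view.
TYPE_TO_DIMENSION = {
    "string": "${string_value}",
    "number": "${numeric_value}",
    "datetime": "${date_value}",
    "boolean": "${boolean_value}",
}


def build_measure(field_name, data_type):
    """Measure that pulls one custom field's typed value out of the EAV rows."""
    return {
        "sql": TYPE_TO_DIMENSION[data_type],
        "aggregate_type": "max",
        # Only rows for this custom field feed the MAX aggregate.
        "filters": {"field_name": {"is": field_name}},
    }


measures = {
    "shirt_size": build_measure("shirt_size", "string"),
    "badge_number": build_measure("badge_number", "number"),
}
print(json.dumps({"measures": measures}, indent=2))
```

Each measure is the Omni-side equivalent of one MAX(CASE WHEN field_name = ... ) column, filtered to a single field and reading from the matching typed dimension.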
Now for the fun part! The Python code which automates and deploys our extension models per customer.
It does the following:
- It calls the Omni API to obtain the metadata from our EAV / KV table: a list of the tenants, their custom fields and data types
- It creates an extension model for the customer if it doesn’t yet exist
- It writes the max / case when measures to the custom_profile_fields view
- It writes the custom fields into the tenant_flattened_fields query view
- It adds the extension model to the dynamic_shared_extensions mappings in the hub model
You can see the full code in GitHub here.
from omni_python_sdk import OmniAPI
from yaml import safe_load as yaml_load, dump as yaml_dump
import pyarrow
from dotenv import load_dotenv

# Load environment variables from the local .env file
load_dotenv()

# Define your query which pulls the unique keys and data types
query = {'query': {
    "sorts": [
        {
            "column_name": "custom_profile_fields.company_id",
            "sort_descending": False
        }
    ],
    "table": "custom_profile_fields",
    "fields": [
        "custom_profile_fields.company_id",
        "custom_profile_fields.field_name",
        "custom_profile_fields.data_type"
    ],
    "pivots": [],
    "dbtMode": False,
    "filters": {},
    "modelId": "4cdba071-e61c-4881-99b0-d9d225fdc770",
    "version": 7,
    "rewriteSql": True,
    "column_limit": 50,
    "dimensionIndex": 3,
    "default_group_by": True,
    "custom_summary_types": {},
    "join_paths_from_topic_name": "custom_profile_fields"
}}
def transform_query_data(query_data: pyarrow.lib.ChunkedArray) -> dict:
    # Transform the query data into a dictionary with company_id as keys
    transformed = {}
    for row in query_data:
        company_id = int(str(row['custom_profile_fields.company_id']))
        field_name = str(row['custom_profile_fields.field_name'])
        data_type = str(row['custom_profile_fields.data_type'])
        if company_id not in transformed:
            transformed[company_id] = {}
        transformed[company_id].update({
            field_name: {
                'data_type': data_type
            }
        })
    return transformed
# Initialize the API with your credentials
api = OmniAPI(env_file='.env')

# Run the driving metadata query, with the company_id, field_name, and data_type
table = api.run_query_blocking(query)

# Transform the metadata query results into a dictionary
customers = transform_query_data(table[0].to_struct_array())

# Get the core / hub model
model = api.list_models(name='eav', modelKind='SHARED')['records'][0]
modelID, connectionID = model['id'], model['connectionId']
modelFile = api.yamlr(modelID, body={'fileName': 'model'})
userProfileTopicFile = api.yamlr(modelID, body={'fileName': 'user_profile.topic'})['files']['user_profile.topic']

# Obtain the currently existing extension models (if any), keyed by name
extensionModels = {
    model['name']: model
    for model in api.list_models(
        baseModelId=modelID,
        modelKind='SHARED_EXTENSION'
    )['records']
}

# Define the mapping of data types to SQL expressions for easier field creation
type_map = {
    'boolean': '${boolean_value}',
    'string': '${string_value}',
    'number': '${numeric_value}',
    'datetime': '${date_value}',
}
for customer_id, fields in customers.items():
    print(f"Processing company ID: {customer_id}")
    cid = f'c{customer_id}'

    # Step 1) Create Extension Model if it doesn't exist
    if cid in extensionModels:
        print(f"  Extension model for company {customer_id} already exists, skipping creation.")
        tenantModel = {'model': extensionModels[cid]}
    else:
        print(f"  Creating extension model for company {customer_id}.")
        tenantModel = api.create_model(
            modelName=cid,
            connection_id=connectionID,
            baseModelId=modelID,
            modelKind='SHARED_EXTENSION',
        )

    # Step 2) Set up the custom fields and flattened custom fields dicts
    customProfileFields = {'measures': {}}
    flatteningQueryView = {
        'query': {
            'fields': {
                'custom_profile_fields.user_id': 'user_id'
            },
            'base_view': 'custom_profile_fields',
            'filters': {'custom_profile_fields.company_id_str': {'bind': 'user_profile.company_id_str'}},
            'topic': 'custom_profile_fields'
        },
        'dimensions': {
            'user_id': {
                'primary_key': True,
                'hidden': True
            },
        }
    }

    # Loop over the metadata query, adding each field to the customProfileFields and flatteningQueryView dicts
    for field in fields:
        dataType = fields[field]['data_type']
        print(f"    Adding field {field} with data type {dataType}")
        # MAX over the typed value, filtered to rows for this field name
        customProfileFields['measures'][field] = {
            'sql': type_map[dataType],
            'aggregate_type': 'max',
            'filters': {
                'field_name': {
                    'is': field,
                }
            }
        }
        # Add the field to the flattening query view
        flatteningQueryView['query']['fields'][f'custom_profile_fields.{field}'] = field
        flatteningQueryView['dimensions'][field] = {}

    # Step 3) write the two YAML files to the extension model
    commitMessage = f'Add custom fields for company {customer_id}'
    api.yamlw(tenantModel['model']['id'],
        {
            'fileName': 'PUBLIC/custom_profile_fields.view',
            'yaml': yaml_dump(customProfileFields),
            'mode': 'extension',
            'commitMessage': commitMessage,
        }
    )
    api.yamlw(tenantModel['model']['id'],
        {
            'fileName': 'tenant_flattened_fields.query.view',
            'yaml': yaml_dump(flatteningQueryView),
            'mode': 'extension',
            'commitMessage': commitMessage,
        }
    )
    print("  Wrote custom extension files.")

    # Step 4) Add extension model to the hub mappings
    # (assumes the hub model file already has a dynamic_shared_extensions entry)
    modelContents = yaml_load(api.yamlr(modelID, body={'fileName': 'model'})['files']['model'])
    modelContents['dynamic_shared_extensions'][0]['mappings'].update({cid: {'values_for_model': [f'{customer_id}']}})
    api.yamlw(modelID, {
        "fileName": "model",
        "yaml": yaml_dump(modelContents),
        "mode": "combined",
        "commitMessage": f"Add mapping for {customer_id}",
    })
    print(f"  Written to hub model mappings. Finished processing company ID: {customer_id}\n")
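One thing the script takes for granted is that every data_type appears in type_map (an unexpected type would raise a KeyError) and that every field_name can be used directly as a YAML key and field reference. The sketch below shows one way to guard against both before writing anything. The helper names to_identifier and validate_fields are hypothetical, and whether Omni actually requires identifier-style field names is an assumption worth checking against your own data.

```python
import re

# Data types the core view knows how to cast (same keys as type_map).
SUPPORTED_TYPES = {"boolean", "string", "number", "datetime"}


def to_identifier(field_name):
    """Lowercase a field name and replace runs of other characters with '_'."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", field_name.strip()).strip("_").lower()
    if not cleaned:
        raise ValueError(f"Cannot derive an identifier from {field_name!r}")
    if cleaned[0].isdigit():
        # ASSUMPTION: identifiers should not start with a digit.
        cleaned = f"f_{cleaned}"
    return cleaned


def validate_fields(fields):
    """Split one tenant's fields into usable and rejected entries."""
    accepted, rejected = {}, {}
    for name, meta in fields.items():
        if meta.get("data_type") not in SUPPORTED_TYPES:
            rejected[name] = meta
            continue
        # Keep the original name: the measure filter must match stored data.
        accepted[to_identifier(name)] = {**meta, "field_name": name}
    return accepted, rejected


accepted, rejected = validate_fields({
    "Shirt Size": {"data_type": "string"},
    "2nd phone!": {"data_type": "number"},
    "Mood": {"data_type": "emoji"},
})
print(accepted)
print(rejected)
```

If you adopt something like this, the measure key would use the cleaned identifier while the filter on field_name keeps the original stored name.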