How to Export Raw Mixpanel Data to Your Database
Mixpanel is great, but it's very helpful to have your raw event data in your database so you can use SQL to extract insights. In this post, I'll show you how you can accomplish that. We'll be using Ruby on Rails and PostgreSQL, but the general approach of pulling data from Mixpanel, inserting it into the database, and scheduling a background job on an interval should work with any language and database.
Overview
Here's what we'll cover:
- A service class that is responsible for consuming the Mixpanel API, parsing the response body into JSON, building a database row for each event, and inserting those events efficiently
- A background job that calls the service class
- A scheduler that runs the background job on an interval (e.g. every minute)
There are some considerations that make this not so simple:
- How to avoid downloading duplicate events from Mixpanel?
- How to convert Mixpanel event timestamps to UTC?
- What to do if the request to Mixpanel fails?
- What to do if your database is slow and the background job takes longer than the interval?
- How can we do this in a memory-efficient way?
Dependencies
Here are some of the technologies we're using:
- Sidekiq for background jobs
- sidekiq-unique-jobs to create a lock while the background job is running
- Zhong for scheduling jobs
- Oj for fast JSON parsing
- activerecord-import for bulk database inserts
- Mixpanel...duh 😉
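If you want to follow along, the corresponding Gemfile entries might look roughly like this (versions omitted; Faraday is included here because we use it for the HTTP call you'll see later):

# Gemfile (sketch) — the gems referenced in this post
gem 'sidekiq'
gem 'sidekiq-unique-jobs'
gem 'zhong'
gem 'oj'
gem 'activerecord-import'
gem 'faraday'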
The background job
class EventWorker
  include Sidekiq::Worker
  sidekiq_options retry: false, unique: :until_executed, log_duplicate_payload: true

  PT = Time.find_zone!('America/Los_Angeles')

  def perform
    from_date = PT.now.hour < 5 ? PT.yesterday : PT.today
    EventService.ingest_mixpanel_events(from_date: from_date, to_date: PT.today)
  end
end
We specify retry: false because if the job fails, that's okay; it will just try again on the next interval. The unique and log_duplicate_payload options are provided by sidekiq-unique-jobs. They create a lock that is released only when the job finishes executing, so only one of these jobs can run at a time. If someone tries to enqueue this job while one is still running, the duplicate is logged and won't be processed.
The math with from_date is so that when 12:01am strikes, we don't just ignore yesterday's events: our systems could still push an event that happened minutes ago, or Mixpanel could do the same if there's a delay in their processing. We're effectively creating a 5-hour buffer here. You can adjust this to whatever you like (e.g. 1 hour, 8 hours, etc.).
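To make the buffer concrete, here's how the hour check plays out at two arbitrary times of day:

PT = Time.find_zone!('America/Los_Angeles')

early = PT.parse('2019-01-02 00:30') # just after midnight Pacific
late  = PT.parse('2019-01-02 06:00') # later that morning

early.hour < 5 # => true,  so from_date = PT.yesterday (yesterday's events get re-ingested too)
late.hour < 5  # => false, so from_date = PT.today (yesterday is considered settled)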
The service class
# frozen_string_literal: true
class EventService
  def self.ingest_mixpanel_events(from_date:, to_date:)
    ApplicationRecord.transaction do
      # Blow away all the events in this range
      Time.use_zone('America/Los_Angeles') do
        query = <<-SQL.squish
          delete from events
          where time between '%s' and '%s'
        SQL
        sanitized_query = ActiveRecord::Base.send(:sanitize_sql_array, [query, from_date.beginning_of_day.to_s(:db), to_date.end_of_day.to_s(:db)])
        ActiveRecord::Base.connection.exec_query(sanitized_query, "EventService#ingest_mixpanel_events")
        log "Deleted events between #{from_date.beginning_of_day} and #{to_date.end_of_day}"
      end

      # Ingest the events in this range
      (from_date..to_date).each do |day|
        events = []
        jsonl_string = fetch_mixpanel_raw_data(from_date: day, to_date: day)
        log "Fetched JSONL events from mixpanel for #{day}"
        Tempfile.open(['mixpanel', '.json']) do |file|
          file.write(jsonl_string.force_encoding('UTF-8'))
          log "Wrote JSONL to #{file.path}"
          file.rewind
          file.each_line do |line|
            events << convert_line_to_event(line)
          end
        end

        # Perform the inserts
        ActiveRecord::Base.logger.silence do
          Event.import(events, batch_size: 500)
        end
        log "Imported events for #{day}"
      end
    end
  end

  def self.convert_line_to_event(line)
    raw_event = Oj.load(line)
    raw_time = Time.zone.at(raw_event.dig('properties', 'time')) # This is a Pacific unix timestamp, but Rails doesn't know that
    pacific_time = Time.find_zone!('America/Los_Angeles').parse(raw_time.iso8601.gsub(/Z$/, '')) # Tell Rails it's a Pacific timestamp
    raw_event['properties']['time'] = pacific_time.utc # Convert the timestamp to UTC
    build_event_from_raw_event(raw_event)
  end

  def self.fetch_mixpanel_raw_data(from_date:, to_date:)
    conn = Faraday.new do |conn|
      conn.basic_auth(Rails.application.secrets.mixpanel_api_secret, '')
      conn.request :url_encoded
      conn.adapter Faraday.default_adapter
      conn.options.timeout = 60 # 60 seconds
    end
    resp = conn.get('https://data.mixpanel.com/api/2.0/export/', {from_date: from_date, to_date: to_date})
    resp.body
  end

  def self.build_event_from_raw_event(raw_event)
    # Choose whatever Mixpanel event properties you want in your table
    Event.new({
      name: raw_event['event'],
      time: raw_event.dig('properties', 'time'),
      distinct_id: raw_event.dig('properties', 'distinct_id')
    })
  end

  # private doesn't apply to class methods, so mark log with private_class_method instead
  private_class_method def self.log(msg)
    Rails.logger.tagged(self.name) do
      Rails.logger.debug(msg)
    end
  end
end
There's a lot going on here, so let's try and break it down.
ingest_mixpanel_events is the main method of the class. It's responsible for deleting events in our database in the from_date..to_date range, asking Mixpanel for the events in that range, and performing the database inserts. Why do we need to delete events in this range first? Because Mixpanel's raw data export API doesn't give you unique identifiers for each event, you can't say "give me all of today's events, but don't insert ones that I already have." You could do this with timestamps, like "give me all of today's events, but ignore ones up to $last_import_time," but then there's a chance you'll miss some events if you or Mixpanel ever insert events before $last_import_time.
We use a Tempfile to store the response body from Mixpanel on disk rather than in an Array, which keeps the memory usage of this method low. Then we iterate over each line and call convert_line_to_event.
Everything is wrapped in a transaction because without one, if someone ran a query for today's events right after the background job deleted today's events, the query would return nothing; if they ran the same query again a few seconds later, once the background job had a chance to complete the inserts, it would have results. Using a transaction prevents this confusing experience.
convert_line_to_event is responsible for taking line (a String), parsing it as JSON, converting the timestamp to UTC, and calling build_event_from_raw_event. The timestamp work is necessary because Mixpanel stores events in your project's timezone, and it's best practice to store timestamps in your database as UTC.
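Here's a simplified sketch of the same conversion the method above performs, assuming a project in Pacific time and a made-up timestamp of 1546430400:

pacific = Time.find_zone!('America/Los_Angeles')

raw   = Time.at(1546430400).utc                    # 2019-01-02 12:00:00 UTC, read naively
local = pacific.parse(raw.iso8601.gsub(/Z$/, ''))  # reinterpret it as 12:00pm Pacific (PST, -08:00)
local.utc                                          # => 2019-01-02 20:00:00 UTC, what we actually store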
fetch_mixpanel_raw_data is responsible for hitting Mixpanel's raw data export API and returning the response body.
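The export endpoint returns JSONL: one JSON object per line rather than one big JSON array. A single line looks something like this (the field values here are made up, but each event carries an event name plus a properties hash):

line = '{"event":"Signed Up","properties":{"time":1546430400,"distinct_id":"user-123"}}'

raw_event = Oj.load(line)
raw_event['event']                   # => "Signed Up"
raw_event.dig('properties', 'time')  # => 1546430400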
build_event_from_raw_event is responsible for taking raw_event (the parsed JSON) and creating an Event instance (assuming you have an Event model and events table).
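If you don't have that table yet, a minimal migration matching the three properties above might look like this (an illustrative sketch; adjust the migration version to your Rails version and add whatever columns you need):

class CreateEvents < ActiveRecord::Migration[5.2]
  def change
    create_table :events do |t|
      t.string :name, null: false
      t.datetime :time, null: false
      t.string :distinct_id
    end

    # The delete in ingest_mixpanel_events and most reporting queries filter on time
    add_index :events, :time
  end
end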
The scheduler
At PopSQL, we use Zhong to schedule jobs. Simply add a few lines to your clock.rb file to run the background job every minute:

every 1.minute, 'Run EventWorker' do
  EventWorker.perform_async
end
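If you're setting up Zhong from scratch, that block needs a little surrounding setup. Something along these lines should work, though the Redis URL and the environment require are assumptions about your app's layout, so check the Zhong README for the details:

# clock.rb (sketch): boot the Rails app so EventWorker is loadable, then schedule it
require_relative 'config/environment'

Zhong.redis = Redis.new(url: ENV['REDIS_URL'])

Zhong.schedule do
  every 1.minute, 'Run EventWorker' do
    EventWorker.perform_async
  end
end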
Ways to make this better
How can we reduce the memory footprint of the background job? When I used memory_profiler to measure the memory usage, a lot of it was coming from initializing an instance of Event for each row, and from the time parsing/math.

@andrewkane pointed out I could pass activerecord-import arrays of columns and values, which is its fastest import mechanism, rather than creating Event instances.
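Roughly, that means building plain arrays inside ingest_mixpanel_events instead of models (a sketch; convert_line_to_event would return an array rather than an Event):

columns = [:name, :time, :distinct_id]
values  = []

file.each_line do |line|
  raw_event = Oj.load(line)
  # ...timezone conversion as before...
  values << [raw_event['event'], raw_event.dig('properties', 'time'), raw_event.dig('properties', 'distinct_id')]
end

# activerecord-import's fastest path: no model instances, no validations
Event.import(columns, values, batch_size: 500, validate: false)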
We could also hit the Mixpanel API before opening the database transaction.
Conclusion
Now that your database has your Mixpanel events up to the minute, you can use PopSQL to extract all kinds of insights. Who are the newest users? Which users activated or need help activating? Who's taking high value actions on the site right now that sales should follow up with? Enjoy, and feel free to chime in in the comments below.
Thanks to Nick Elser and Andrew Kane for guidance and suggestions.