Simple Continuous Query Processing for Real-time Feature Engineering

Mustafa Kıraç
Published in iyzico.engineering · Dec 4, 2017


A data stream is defined as a flow of frequent data updates. Recently emerged data stream applications include retail transactions, stock quotes, IoT monitoring, and clickstream tracking. Because the data elements in a stream arrive rapidly and continuously, queries over data streams are not one-time queries. Instead, they are continuous queries that are defined before the data arrives, and whose results are refreshed periodically as the data arrives.

Many industries are making use of real-time analytics and data stream processing.

In comparison with data streams, traditional relational databases are better suited to queries that run on immutable data sets (i.e., using the same data snapshot throughout the query). Database queries are optimized for higher query complexity, normalization, and high data volume, whereas continuous stream queries are optimized for capturing changes and adapting the query result to such changes with low latency. Data warehouses are built on top of traditional DBMSs, hence business views within the data warehouse are mapped to the original data sources through a complicated set of transformations (i.e., in the form of an external ETL process, or as an ELT process within the data warehouse). Regardless of the data transformation strategy underneath a data warehouse, data views are usually updated in large batches (e.g., daily): they are periodically updated or reconstructed from transactional data that is continuously updated by live data sources.

Most business intelligence tools and applications can tolerate the batch nature of data warehouses. When an application needs access to fresh data, on the other hand, Change Data Capture (CDC) or Materialized Views (MVs) come to the rescue. CDC is a data replication mechanism by which all changes to a table or database are reflected to a replica at another location. A materialized view is a special database table whose contents are periodically refreshed according to a registered query. CDC replication combined with an ELT task, and materialized views, are very popular techniques for keeping data warehouse business views up to date. However, both methodologies only reduce the data refreshing period; they are far from providing near-real-time data views.

Continuous (stream) queries are very similar to materialized views: the user or application defines a view over data sources in the form of a query. In contrast with materialized views, however, a continuous query application may refresh the view contents after every single update taking place at the data source. Because of the high computational cost of repeatedly running a query flow, continuous queries consist of simpler transformations than materialized views or ETL/ELT processes.

In addition to basic analytics applications such as Web clickstream and IoT monitoring, real-time predictive applications also require continuous query processing due to the need for feature engineering (e.g., rolling sums, window aggregations, time-interval dependence).

At iyzico, we have been working on an automated fraud prevention solution that contains a continuous query processing application that we built in-house. We compute several aggregations in real time, for every single payment transaction that needs to be risk-scored by our solution. Here, we would like to provide more details about continuous query processing using an in-memory cache like Redis.

Example: window_function(feature) over (partition by key order by sort_key rows/range between x rows / interval preceding and .. following)

The SQL construct above is an example of an analytical window function. For instance, the query

SELECT transaction_id, sum(stock_price) over (partition by ticker order by time)

computes the cumulative sum of the stock price for each ticker, running over time.

In our fraud prediction solution, we implement various ML model features, such as the average basket size of a merchant with respect to the transactions in the past 3 hours, namely avg(basket_price) over (partition by merchant order by timestamp range between interval '3' hour preceding and current row).

The first capability needed to implement the "partition by" portion is the ability to store and retrieve data structures by key. Once you have a distributed hash table or a key-value store, you can create a hash key based on the query being run (e.g., if you have an internal mechanism to assign IDs such as Q1, Q2, Q3 to the continuous queries you are tracking), augmented with the partition key (i.e., the merchant identifier in the example above). The augmented key thus becomes (Q1, merchant_key).
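As a minimal sketch of this key composition (the function name, separator, and query/merchant IDs below are illustrative assumptions, not our production code):

```python
def partition_key(query_id: str, partition_value: str) -> str:
    """Compose a cache key from a continuous-query ID and the value of
    its PARTITION BY column, e.g. ("Q1", "merchant_42")."""
    return f"{query_id}:{partition_value}"

# All state for continuous query Q1, partition merchant_42,
# lives under this single key in the hash table.
key = partition_key("Q1", "merchant_42")  # "Q1:merchant_42"
```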

The second capability to implement is the "order by" portion of the query. It requires sorting all past basket_price values according to the sort key (i.e., the timestamp). When it is guaranteed that new records always arrive in monotonically increasing time order, a linked list is sufficient to implement a FIFO queue that chains the basket_price values as (price_1, timestamp_1) → (price_2, timestamp_2) → … → (price_n, timestamp_n), where the time order dictates timestamp_1 < timestamp_2 < … < timestamp_n. If there is no interval requirement in the query, the average value can be updated using simple accumulators like

  • sum = sum + basket_price
  • count = count + 1
  • avg = sum / count
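The accumulator updates above fit in a few lines; this sketch (class and method names are illustrative) keeps the running state for one partition key:

```python
class RunningAverage:
    """Accumulator for avg() over an unbounded window: no history is
    stored, only the running sum and count."""

    def __init__(self):
        self.sum = 0.0
        self.count = 0

    def update(self, basket_price: float) -> float:
        # sum = sum + basket_price; count = count + 1; avg = sum / count
        self.sum += basket_price
        self.count += 1
        return self.sum / self.count

avg = RunningAverage()
avg.update(10.0)  # average is 10.0
avg.update(20.0)  # average is 15.0
```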

The interval requirement forces us to store all past basket price values along with their timestamps. In our running example, we take the average of values within the past 3 hours. Thus, we need to maintain the linked list in sorted order (or use a B-tree or another sorted data structure), and remove or skip all timestamps that do not satisfy the requirement timestamp_n - timestamp_k <= 3 hours. After locating the start and end of the list for the given key (namely (Q1, merchant_key)), we can either traverse the whole list to compute the new avg(basket_price) value for the most recent record, or use an accumulator to update the average: reduce the count and sum by the removed/skipped items, and increase them by the newly added items.
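Assuming monotonically increasing timestamps, the evict-and-accumulate strategy just described can be sketched with a double-ended queue (a hypothetical illustration, not our production code; timestamps are in seconds):

```python
from collections import deque

class WindowedAverage:
    """avg(value) over a trailing time window. Expired entries are
    popped from the head of the queue, and the running sum is
    decremented accordingly, so each update is O(evicted + 1)."""

    def __init__(self, window_seconds: float):
        self.window = window_seconds
        self.items = deque()  # (timestamp, value), oldest first
        self.sum = 0.0

    def update(self, timestamp: float, value: float) -> float:
        self.items.append((timestamp, value))
        self.sum += value
        # Evict entries older than the window relative to the newest record.
        while timestamp - self.items[0][0] > self.window:
            _, old_value = self.items.popleft()
            self.sum -= old_value
        return self.sum / len(self.items)

w = WindowedAverage(3 * 3600)  # 3-hour window
w.update(0, 100.0)             # avg = 100.0
w.update(3600, 200.0)          # avg = 150.0
w.update(4 * 3600, 300.0)      # first record expired; avg = 250.0
```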

At iyzico, we have been using Redis as our distributed hash table, and the Redis ZSet data structure (see https://redis.io/commands/zadd) as the sorted list. Once basic continuous querying capability is implemented over Redis, it is possible to add optimizations such as sharing common list objects across multiple queries. Continuous queries are an integral part of our data generation pipeline.
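To illustrate how a ZSet maps onto the windowed average, here is an in-memory stand-in that mirrors the relevant command semantics (the key layout and member encoding are assumptions for illustration; in Redis the same steps would be ZADD with the timestamp as the score, ZREMRANGEBYSCORE to drop expired members, and ZRANGEBYSCORE to read the window):

```python
import bisect

class SortedWindow:
    """In-memory stand-in for a Redis ZSET under one (query, partition)
    key: add() plays the role of ZADD (score = timestamp), and
    window_avg() combines ZREMRANGEBYSCORE with ZRANGEBYSCORE."""

    def __init__(self):
        self.entries = []  # (score=timestamp, value), kept sorted by score

    def add(self, timestamp: float, value: float) -> None:
        # ZADD key <timestamp> <member>
        bisect.insort(self.entries, (timestamp, value))

    def window_avg(self, now: float, window: float) -> float:
        # ZREMRANGEBYSCORE key -inf (now - window): drop expired entries.
        cut = bisect.bisect_left(self.entries, (now - window,))
        del self.entries[:cut]
        # ZRANGEBYSCORE key (now - window) +inf: average what remains.
        values = [v for _, v in self.entries]
        return sum(values) / len(values)

s = SortedWindow()
s.add(0, 100.0)
s.add(3600, 200.0)
s.add(14400, 300.0)
s.window_avg(14400, 3 * 3600)  # only the last two entries remain
```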

Since continuous query computation is part of our real-time predictive pipeline, we repeatedly check the output of live metrics against offline counterparts that are computed using regular batch SQL. We rarely find mismatches (roughly one in every 3–5 million transactions), and we analyze the reason for each divergence. If the divergence is found to be due to a bug, rather than a simple cause such as clock synchronization among server nodes, we immediately fix the problem and perform a data migration if needed.

There are many other ways to implement continuous query computation capabilities. Apache Storm, Apache Flink, and Apache Beam are some of the viable frameworks for implementing scalable partitioning and sorting over real-time streams. Moreover, there are also complex event processing (CEP) and time series database systems available with continuous querying capabilities (e.g., InfluxDB has short-period updates for CQs, PipelineDB provides complex query change notifications for PostgreSQL, RethinkDB is a NoSQL database with rich continuous query semantics, and Oracle offers the Oracle Continuous Query Language (CQL), a query language based on SQL with added constructs that support streaming data).

We chose Redis because of our overall architectural design at iyzico: Redis is also our main mechanism for lock management and DBMS query-result caching, and it is thoroughly monitored and maintained by our ops team.
