Realtime Analytics in Chango Private
Realtime analytics is not easy to accomplish in a Data Lakehouse. A typical approach is to send streaming events to Kafka, then consume them and write them to the Data Lakehouse with Spark Streaming, which requires you to operate both a Kafka cluster and a Spark cluster.
In Chango Private, you don’t have to run such Kafka and Spark clusters to insert streaming events into the Chango Data Lakehouse. Just send streaming events to the Chango Data API Server, and Chango Private will do all the rest of the work.
Chango Private provides the following solutions to analyze data in realtime:
- Send streaming events from external applications using Chango Client.
- Aggregate distributed logs using Chango Log.
- Change Data Capture using Chango CDC.
In this article, I will show you how to send streaming events to Chango Private for realtime analytics.
Create Iceberg Table
Before sending JSON streaming events to the Chango Data API Server, the target Iceberg table needs to be created beforehand with a Trino client such as the Trino CLI or Apache Superset.
It is good practice to partition an Iceberg table for streaming events by date, for example with year, month, and day columns. In addition, a timestamp column like ts also needs to be added to the table. Using the ts column, Chango Private will compact small data, manifest, and position delete files and expire snapshots to improve query performance.
In particular, in order for Chango to compact small files and apply partitioning to an Iceberg table, you need to follow these rules. The name of the timestamp column must be ts, whose type is bigint (equivalent to long in Java). The column names year, month, and day will be used as partitioning columns.
- ts: the number of milliseconds since 1970-01-01 00:00:00.
- year: year in the format yyyy, which is necessary for partitioning.
- month: month of the year in the format MM, which is necessary for partitioning.
- day: day of the month in the format dd, which is necessary for partitioning.
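As a sketch of these rules, the required field values can be derived with the JDK's java.time API (the full example later in this article uses Joda-Time instead; the class and method names below are illustrative, not part of Chango):

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class EventFields {

    // Derive the ts, year, month, and day values required by Chango's
    // compaction and partitioning rules for a given instant.
    public static Object[] fields(ZonedDateTime dt) {
        long ts = dt.toInstant().toEpochMilli();                  // milliseconds since 1970-01-01 00:00:00
        String year = String.format("%04d", dt.getYear());        // yyyy
        String month = String.format("%02d", dt.getMonthValue()); // MM
        String day = String.format("%02d", dt.getDayOfMonth());   // dd
        return new Object[] { ts, year, month, day };
    }

    public static void main(String[] args) {
        Object[] f = fields(ZonedDateTime.of(2023, 11, 7, 12, 0, 0, 0, ZoneOffset.UTC));
        System.out.println(f[0] + " " + f[1] + "-" + f[2] + "-" + f[3]);
        // prints: 1699358400000 2023-11-07
    }
}
```

Zero-padding month and day to two digits keeps partition values consistent, so a query filter like month = '07' always matches.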
For example, create a logs table with partitioning and a timestamp column.
-- create iceberg schema.
CREATE SCHEMA IF NOT EXISTS iceberg.iceberg_db;

-- create iceberg table.
CREATE TABLE iceberg.iceberg_db.logs (
    day varchar,
    level varchar,
    message varchar,
    month varchar,
    ts bigint,
    year varchar
)
WITH (
    partitioning = ARRAY['year', 'month', 'day'],
    format = 'PARQUET'
);
Note that the table columns must be declared in ascending alphabetical order of their lower-case names, as in the statement above (day, level, message, month, ts, year).
You can also create the Iceberg table with Superset, which is provided by Chango Private.
Send JSON Events with Chango Client
You need to add Chango Client dependency to your project.
<dependency>
    <groupId>co.cloudcheflabs.chango</groupId>
    <artifactId>chango-client</artifactId>
    <version>2.0.1</version>
</dependency>
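If your project uses Gradle instead of Maven, the equivalent dependency declaration (same coordinates as the Maven snippet above) would be:

```
implementation 'co.cloudcheflabs.chango:chango-client:2.0.1'
```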
It is very simple to use the Chango Client API in your code.
You can construct a ChangoClient instance like this.
ChangoClient changoClient = new ChangoClient(
        token,
        dataApiServer,
        schema,
        table,
        batchSize,
        interval
);
- token: Data access credential issued by Chango Authorizer.
- dataApiServer: Endpoint URL of Chango Data API.
- schema: Target Iceberg schema, which needs to be created before sending JSON data to Chango.
- table: Target Iceberg table, which also needs to be created beforehand.
- batchSize: The size of the JSON list which will be sent to Chango in batch mode, in gzip format.
- interval: JSON data is queued internally in Chango Client; the queued JSON list is sent every interval, whose unit is milliseconds.
To get the endpoint of Chango Data API, go to Components -> Chango Data API and get the URL in the Endpoint section.
To access the Chango Data API Server, you need a Chango Credential as the token, with data access privileges. Please see Chango Security for more details. In this example, you want to insert streaming events into the table logs of schema iceberg_db in catalog iceberg. First, create a role.
Then, create a credential for the role logs.
Finally, add privileges to the role logs.
As seen in the picture, you have granted just the WRITE privilege on iceberg.iceberg_db.*, which means you have WRITE privilege on all the tables of schema iceberg_db in catalog iceberg.
Now, you have Chango Credential to access Chango Data API Server.
To send JSON Events, just call the following method.
// send json.
changoClient.add(json);
Let’s see the full code for this example.
import co.cloudcheflabs.chango.client.component.ChangoClient;
import co.cloudcheflabs.chango.client.util.JsonUtils;
import org.joda.time.DateTime;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class SendLogsToDataAPI {

    private static Logger LOG = LoggerFactory.getLogger(SendLogsToDataAPI.class);

    @Test
    public void sendLogs() throws Exception {
        String token = System.getProperty("token");
        String dataApiServer = System.getProperty("dataApiServer");
        String table = System.getProperty("table");

        int batchSize = 10000;
        long interval = 1000;
        String schema = "iceberg_db";

        ChangoClient changoClient = new ChangoClient(
                token,
                dataApiServer,
                schema,
                table,
                batchSize,
                interval
        );

        long count = 0;
        while (true) {
            int MAX = 50 * 1000;
            for (int i = 0; i < MAX; i++) {
                Map<String, Object> map = new HashMap<>();

                DateTime dt = DateTime.now();
                String year = String.valueOf(dt.getYear());
                String month = padZero(dt.getMonthOfYear());
                String day = padZero(dt.getDayOfMonth());
                long ts = dt.getMillis(); // in milliseconds.

                map.put("level", "INFO");
                map.put("message", "any log message ... [" + count + "]");
                map.put("ts", ts);
                map.put("year", year);
                map.put("month", month);
                map.put("day", day);

                String json = JsonUtils.toJson(map);
                try {
                    // send json.
                    changoClient.add(json);
                    count++;
                } catch (Exception e) {
                    LOG.error(e.getMessage());

                    // reconstruct chango client.
                    changoClient = new ChangoClient(
                            token,
                            dataApiServer,
                            schema,
                            table,
                            batchSize,
                            interval
                    );
                    LOG.info("Chango client reconstructed.");
                    Thread.sleep(1000);
                }
            }
            Thread.sleep(10 * 1000);
            LOG.info("log [{}] sent...", count);
        }
    }

    private String padZero(int value) {
        String strValue = String.valueOf(value);
        if (strValue.length() == 1) {
            strValue = "0" + strValue;
        }
        return strValue;
    }
}
Note that the value of ts must be the number of milliseconds since 1970-01-01 00:00:00.
Run Queries on the Iceberg Table for Streaming Events
You can run queries on the Iceberg table logs into which streaming events are inserted.
-- select with partitioning columns.
select *, from_unixtime(ts/1000) from iceberg.iceberg_db.logs where year = '2023' and month = '11' and day = '07' limit 1000;
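Beyond ad-hoc queries in the Trino CLI or Superset, you can also query the table programmatically, for example over Trino's JDBC driver. The sketch below only builds a partition-pruned daily aggregation query as a string; the helper class and method names are illustrative, and executing the query requires a JDBC connection to your own Trino endpoint:

```java
public class LogQueries {

    // Build a daily aggregation query that prunes partitions by
    // filtering on the year, month, and day partition columns.
    public static String dailyLevelCounts(String year, String month, String day) {
        return "select level, count(*) as cnt "
                + "from iceberg.iceberg_db.logs "
                + "where year = '" + year + "' and month = '" + month + "' and day = '" + day + "' "
                + "group by level";
    }

    public static void main(String[] args) {
        System.out.println(dailyLevelCounts("2023", "11", "07"));
    }
}
```

Because the filter uses the table's partitioning columns, Trino only scans the data files of a single day instead of the whole table.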
You have seen how easy it is to insert streaming events into Chango Private for realtime analytics.
Visit and use Chango Private!