We have an issue with the Continuous Aggregation feature with TimescaleDB 2.x upgrade.
We storing data with 1 seconds granularity in hypertable and doing rollups to 10m and 1h intervals with continuous aggregation views.
Before 2.0 we have no issues with refreshes, but with 2.0 rollups get stuck after inserting a bunch of historical data.
We've looked through a code and was able to tell that the issue raises in invalidation logic.

Our system doing separate inserts of data points rounded to 1-second grid (shown in red on the diagram) and this action leads to creating invalidation records in the _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log table. Because all of these records are 0 length intervals - the aggregation process cannot merge them into bigger intervals.
This leads to adding dozens of records into _timescaledb_catalog.continuous_aggs_materialization_invalidation_log and the continuous aggregation job invalidates the same 10 minutes interval again and again for each changed record.
Maybe the invalidation process can be more clever and join intervals that fall into the same bucket to avoid recalculating the same intervals over and over again? Another thought is to recalculate the bucket once taking into account all intervals currently invalidating it and not once per invalidation log record.
I hope this problem is not ours only. Please help us to understand and investigate the problem properly.
postgres=# SELECT version();
version
---------------------------------------------------------------------------------------------------------
PostgreSQL 12.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39), 64-bit
(1 row)
postgres=# \dx
List of installed extensions
Name | Version | Schema | Description
-------------+---------+------------+-------------------------------------------------------------------
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
timescaledb | 2.0.0 | public | Enables scalable inserts and complex queries for time-series data
It would be great to solve this problem somehow. I've tried workaround with insert triggers on continuous_aggs_hypertable_invalidation_log to extend zero length interval to 1-second interval and hoped merge function to merge them. But the trigger seems to not being executed when data inserted via low-level functions (which used by TimescaleDB extension).
Thanks for reporting this issue. Looks indeed to be an issue with the invalidation logic.
I am seeing similar issues with refresh_continuous_aggregate ie. it’s taking forever. It seems to not affect newly inserted data, the CAGG policies are keeping up. I am not backfilling data, but the initial data dump was inserting historic data. And to get the CAGGs up to date I did run refresh_continuous_aggregate but it is extremely slow. I am seeing millions of rows in _timescaledb_catalog.continuous_aggs_materialization_invalidation_log so my guess is that it is connected to this issue.
The aggregations are making per minute, per hour and per day buckets from 2 months of data (100s of millions of rows). Some lighter (in number of rows per bucket) CAGGs have been able to complete (very slowly), but the heavier ones are not completing (have been running for 24h) even when restricting the refresh_continuous_aggregate interval to a single day.
I also have some issues I think may be related to this. We upgraded to 2.0 a couple of days ago, but after that continuous aggregate refresh jobs have "stopped working". What happens now is that they just run forever. I've tried all I could think of and even removed all jobs and just created one single continuous_aggregate_policy. That job has now been running for more than 24 hours constantly using 100% of one cpu core. The job is materializing 1 minute data into hourly with start_offset => INTERVAL '30 days', end_offset => INTERVAL '1 hour'. Like @aelg, running refresh_continuous_aggregate manually on smaller intervals also takes a very long time.
Some background info: we are running 400 nodes caching data and each node writing to the timescaledb every 30 minutes. These nodes may have unstable connection, so the write into timescaledb may also be hours and even days late. In other words, data are being backfilled constantly.
Another thing maybe worth mentioned (not sure if this is relevant or not?); the hypertable was created using default chunk size, so I see that chunks are apparently created weekly with a total size of up to 74GB per chunk. I lowered to 1 day chunk size now, maybe I should even go lower? (server has 90 GB RAM).
We are using a workaround to mitigate this issue in production now.
Materialization process (automatically and manual) doing its job by processing all records in _timescaledb_catalog.continuous_aggs_materialization_invalidation_log which falls into invalidation interval. And because this table being polluted with dozens of small intervals it takes forever. So the workaround is following:
1) Check what's going on by running:
select materialization_id,
_timescaledb_internal.to_timestamp(lowest_modified_value) as lowest_modified_value,
_timescaledb_internal.to_timestamp(greatest_modified_value) as greatest_modified_value
from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;
2) There are "special" records in this table that guarantee the invalidation of recent data. lowest_modified_value value for this records is somewhere in the near past and greatest_modified_value is +infinity.
select materialization_id,
_timescaledb_internal.to_timestamp(lowest_modified_value),
_timescaledb_internal.to_timestamp(greatest_modified_value)
from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
where greatest_modified_value = _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
This query will return a record for each cont. agg. view.
3) Next thing we are cleaning all other records:
delete from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
where greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
This query will probably hang on lock because background jobs are running so in other console execute:
select _timescaledb_internal.restart_background_workers();
4) After this operation all recent data will be processed as normal.
5) If you need to re-process some data in past (let's say last hour, day, or week) - just update lowest_modified_value value with desired start data and a chunk of data will be processed.
Thanks, I will try to see if we can do something similar!
However, the first query resulted in error: "ERROR: timestamp out of range" so I looked closer to this table and found the result a bit strange.
Is this making any sense or can this be part of the reason to the issues we experience?

This 2 values are +inf and -inf.
# select _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
to_unix_microseconds
----------------------
9223372036854775807
(1 row)
# select _timescaledb_internal.to_unix_microseconds('-infinity'::timestamp);
to_unix_microseconds
----------------------
-9223372036854775808
(1 row)
Seems its a bit messed up (i'm not sure about all values, but some of them are makes sense).
In TSDB source code I saw that they are using -inf to +inf records in this table to indicate that data not being processed at all and all range should be reprocessed.
I would suggest to just clear all records in this table and re-add "special" record for each view you have.
Save all uniq cont. agg. view ids:
select distinct(materialization_id) from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;
For us it returns 505 and 506.
Then delete all this mess with:
delete from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log;
-- maybe restart workers in separate console with
-- select _timescaledb_internal.restart_background_workers();
And re-add just few records (change with your ids!):
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
SELECT 505, _timescaledb_internal.to_unix_microseconds(now()), _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
SELECT 506, _timescaledb_internal.to_unix_microseconds(now()), _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
Thanks! So far this seems to solve several issues. Will continue testing tomorrow.
Currently looking into this issue, but would need some help to understand the underlying cause. Do people generally experience these issues after upgrading their continuous aggregates from a previous version? Specifically, I am wondering if the invalidation logs already had a lot of entries prior to updating or whether lots of entries appeared after the update?
If anyone has a script to reproduce these issues on a fresh installation, that would be tremendously helpful. I am trying to reproduce it myself in the meantime.
Sure.
CREATE TABLE datapoints
(
timestamp TIMESTAMPTZ NOT NULL,
device_id bigint,
data float not null
);
SELECT create_hypertable('datapoints', 'timestamp');
CREATE MATERIALIZED VIEW datapoints_10m
WITH (timescaledb.continuous, timescaledb.materialized_only = true)
AS
SELECT time_bucket('10m', timestamp) as timestamp,
avg(data) as data
FROM datapoints
GROUP BY time_bucket('10m', timestamp)
WITH NO DATA;
-- inserting data for device_id == 1
INSERT INTO datapoints (timestamp, device_id, data)
SELECT g.id, 1, random()
FROM generate_series(now() - '1mon'::interval, now(), '1 second') AS g (id);
CALL refresh_continuous_aggregate('datapoints_10m', now() - '1mon'::interval, now());
SELECT hypertable_id, _timescaledb_internal.to_timestamp(watermark)
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold;
hypertable_id | to_timestamp
---------------+------------------------
1 | 2021-02-04 09:10:00+00
Everything fine till this moment and all data in datapoints table are materialized.
Now backfilling data to period below invalidation threshold (10_000 individual insert statements)
In bash shell:
for i in {3600..13600}; do
psql -p 5433 -h localhost -U postgres tdb_test -c "INSERT INTO datapoints (timestamp, device_id, data) VALUES(now() - '${i}s'::interval, 2, random());"
done;
Now there are 10_000 record in hypertable invalidation log
For production cases this number can be huge (hundreds of thousands)
# select count(*)
from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
count
-------
10000
And when calling refresh_continuous_aggregate all records from continuous_aggs_hypertable_invalidation_log will be copied to continuous_aggs_materialization_invalidation_log, and materialization process will run again and again.
Expected behavior will be to invalidate and materialize all this little changes in one pass.
@dimonzozo Thank you. Can you clarify the last bit: "And when calling refresh_continuous_aggregate all records from continuous_aggs_hypertable_invalidation_log will be copied to continuous_aggs_materialization_invalidation_log, and materialization process will run again and again."
How do you run refresh_continuous_aggregate at this point (can you give exact statement?). When you say the materialization process will run "again and again", do you mean that refresh_continuous_aggregate does not complete or are you referring to a background job/policy? If the latter, how is the policy configured?
For tests i run the same command CALL refresh_continuous_aggregate('datapoints_10m', now() - '1mon'::interval, now());, but the same issue happens with background refresh.
With test data, this call takes lots more time than first call. On production, it never completes because in our data invalidation of each 10m interval takes up to 40 seconds.
And when calling refresh_continuous_aggregate all records from continuous_aggs_hypertable_invalidation_log will be copied to continuous_aggs_materialization_invalidation_log
My thought process was the following. I did look through the source code and found out that the first step of executing refresh_continuous_aggregate is copying data from continuous_aggs_hypertable_invalidation_log to continuous_aggs_materialization_invalidation_log for each materialized view exists. Then I checked processing logic and it runs materialization for each record in materialization_invalidation_log. Next, I assume that because the table contains lots of records for the same 10m interval - they all being processed separately which causes slow processing.
@dimonzozo Thanks for the additional information. I tested your reproduction case and, while the second refresh was indeed slower, it did complete without too much delay. Obviously, the refresh time is somewhat proportional to the amount of invalidations it needs to process and range to materialize and maybe there is something we can do to handle a huge amount of invalidations better.
A couple of observations, though. If you have many, many invalidation records due to single row (non batched) inserts, then the processing of those invalidations will also take a longer time (as evident by the example). We do merge invalidations, but only if ranges are adjacent or overlap, and only for the cagg being processed by the current command.
One workaround, until we can optimize for single row inserts, might be to manually materialize smaller ranges of backfill in a single refresh, and then do several of them instead.
Another option might be to provide a "hard refresh" option where we clear all the invalidations in the refreshed range without further processing, and then proceed with refreshing the whole range instead of smaller bits within the refresh window.
We do merge invalidations, but only if ranges are adjacent or overlap, and only for the cagg being processed by the current command.
Yeah. I saw this logic in source code. Great work, BTW! Source code is very clear, has lots of comments and easy to read.
"hard refresh" is what we doing in our case and that what I suggested to @slasktrat in comments before. As workaround this is totally fine.
Also I thought I can do a script which will replace lots of small invalidation ranges with single record, but running it by TSDB action will fail due locks on the table. Other workaround ideas would be very helpful.
Can confirm that the workaround suggested by @dimonzozo seems to fix things for me as well. That is clearing out the continuous_aggs_materialization_invalidation_log-table and replacing it with a single row per CAGG that covers the whole interval that needs to be refreshed.
@erimatnor FYI I created the database in 2.0.0 and then filled it with data, the instance was upgraded from 1.7.4 though, but the databases where dropped and recreated, then filled with 2 months of data. This still seems to have created a lot of rows in the invalidation log.
Also I'm using BIGINT with nanoseconds since epoch as the time column if that should matter.
I've been testing some more and when using the built-in add_continuous_aggregate_policy ,continuous_aggs_materialization_invalidation_log is filled with an extreme amount of entries and the result is that the initial job runs forever (I killed the job after 28 hours). As long as the schedule interval is not too big it so far seems sufficient for me to do some cleanup in the continuous_aggs_materialization_invalidation_log immediately after creating the aggregate policy. The same job that was running for 28 hours without completing did now have an initial run duration of 12 minutes, and subsequent runs are completed in seconds, when creating the aggregate policy using my custom function below.
CREATE OR REPLACE FUNCTION add_continuous_aggregate_policy_custom(
continuous_aggregate regclass,
start_offset interval,
end_offset interval,
schedule_interval interval,
if_not_exists boolean DEFAULT false)
RETURNS integer
AS $$
DECLARE
job_id integer;
mat_id integer;
lowest_modified_v bigint;
BEGIN
mat_id := (SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = continuous_aggregate::name);
lowest_modified_v := (SELECT MIN(lowest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = mat_id
GROUP BY materialization_id);
IF lowest_modified_v IS NULL THEN
lowest_modified_v := _timescaledb_internal.to_unix_microseconds(now() - INTERVAL '1 hour');
END IF;
SELECT add_continuous_aggregate_policy(continuous_aggregate, start_offset => start_offset, end_offset => end_offset, schedule_interval => schedule_interval, if_not_exists => if_not_exists) INTO job_id;
DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log where materialization_id = mat_id;
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT mat_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
RETURN job_id;
END;
$$ LANGUAGE plpgsql;
The joy did only last a few hours, already the continuous_aggs_materialization_invalidation_log is polluted with more entries than the refresh job are able to manage.. Seems we have to go all manual like @dimonzozo after all. :/
I combined some ideas and have another possible workaround (not properly tested!).
This procedure will properly handle invalidations intervals and can be used instead of add_continuous_aggregate_policy, but unfortunately, it won't work because of #2876:
CREATE OR REPLACE PROCEDURE refresh_continuous_aggregates(job_id int, config jsonb)
LANGUAGE PLPGSQL
AS
$$
DECLARE
hyp name;
cont_aggs jsonb;
start_offset interval;
end_offset interval;
mat_ids integer[];
hyp_id integer;
lowest_modified_v bigint;
cont_agg name;
BEGIN
SELECT jsonb_object_field_text(config, 'hypertable') INTO STRICT hyp;
SELECT jsonb_extract_path(config, 'cont_aggs')::jsonb INTO STRICT cont_aggs;
SELECT jsonb_object_field_text(config, 'start_offset')::interval INTO STRICT start_offset;
SELECT jsonb_object_field_text(config, 'end_offset')::interval INTO STRICT end_offset;
mat_ids := (SELECT array_agg(mat_hypertable_id) FROM _timescaledb_catalog.continuous_agg WHERE cont_aggs ? user_view_name::text);
RAISE NOTICE 'Materialized tables ids: "%"', mat_ids;
hyp_id := (SELECT id FROM _timescaledb_catalog.hypertable WHERE table_name = hyp);
RAISE NOTICE 'Hypertable id: "%"', hyp_id;
lowest_modified_v := (SELECT MIN(lowest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
WHERE hypertable_id = hyp_id
GROUP BY hypertable_id);
RAISE NOTICE 'Lowest modified value: "%"', lowest_modified_v;
IF lowest_modified_v IS NOT NULL THEN
DELETE
FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
WHERE hypertable_id = hyp_id;
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
(
SELECT id,
lowest_modified_v,
COALESCE(SELECT cagg_watermark
FROM _timescaledb_internal.cagg_watermark(id)),
_timescaledb_internal.to_unix_microseconds(now())
FROM unnest(mat_ids) as id
);
END IF;
FOR cont_agg IN
SELECT jsonb_array_elements(cont_aggs)
LOOP
RAISE NOTICE 'Refreshing %', cont_agg::text;
CALL refresh_continuous_aggregate(cont_agg::regclass, now() - start_offset,
now() - end_offset);
END LOOP;
END
$$;
Possible usage is:
SELECT add_job('refresh_continuous_aggregates', '1h',
config => '{"hypertable":"datapoints","cont_aggs":["datapoints_10m", "datapoints_1h"],"start_offset":"12h","end_offset":"2h"}');
Funny! I'm testing almost exact the same thing
CREATE OR REPLACE PROCEDURE run_all_continuous_aggregates(job_id int, config jsonb)
LANGUAGE PLPGSQL
AS $$
DECLARE
JOB RECORD;
mat_id integer;
lowest_modified_v bigint;
BEGIN
FOR JOB IN SELECT * FROM timescaledb_information.jobs j inner join timescaledb_information.job_stats s on s.job_id = j.job_id WHERE application_name LIKE 'Refresh Continuous%'
LOOP
mat_id := (SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg a1 inner join timescaledb_information.continuous_aggregates a2 on a2.view_name = a1.user_view_name WHERE materialization_hypertable_name = JOB.hypertable_name);
lowest_modified_v := (SELECT MIN(lowest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = mat_id
GROUP BY materialization_id);
IF lowest_modified_v IS NULL THEN
lowest_modified_v := _timescaledb_internal.to_unix_microseconds(now() - INTERVAL '1 hour');
END IF;
RAISE NOTICE 'Refreshing % (%) from % ', JOB.hypertable_name, JOB.job_id, _timescaledb_internal.to_timestamp(lowest_modified_v);
DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log where materialization_id = mat_id;
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT mat_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds('infinity'::timestamp);
PERFORM alter_job(JOB.job_id, next_start => now());
END LOOP;
END
$$;
select add_job('run_all_continuous_aggregates', '1h');
Wow! Great way to avoid calling refresh_continuous_aggregate.
I see the problem here. This procedure join all records which already in continuous_aggs_materialization_invalidation_log, but new records will be moved from continuous_aggs_hypertable_invalidation_log to continuous_aggs_materialization_invalidation_log on start of materialization job and we'll have the same issue.
And another difficulty is that all records in hypertable_invalidation_log should be processed (squashed) and then copied to materialization_invalidation_log in multiple copies (for each cont. agg on target hypertable).
I tried to solve it my procedure, but stuck with transactions issue.
Yeah, I also tried to chunk the job up in smaller pieces and running the refresh synchronous, but also got stuck with transaction issue.. I'll let this method run over night and see how it performs. So far I got much higher success rate than with the built-in logic, but not 100%. This workaround can be improved in many ways, but hopefully the timescaledb guys will fix the root issue. Should not be necessary to use these workarounds..
It's far from perfect and some values are set to fit our case, but I modified my own custom job to the following and now we have an automated refresh with all jobs having at least some success rate - unlike with the built-in functionality. But I hope this issue will be prioritized as upgrading from 1.7 to 2.0 in practice broke our service and caused several days of "downtime" until this workaround was up and running. 😢
CREATE OR REPLACE PROCEDURE run_all_continuous_aggregates(my_job_id int, config jsonb) LANGUAGE PLPGSQL AS
$$
DECLARE
JOB RECORD;
CAGG RECORD;
lowest_modified_v bigint;
refresh_to timestamp;
running_job_count integer;
max_concurrent_jobs integer := 3;
max_job_runtime interval := INTERVAL '20 minutes';
BEGIN
SELECT count(*) FROM timescaledb_information.job_stats WHERE job_id <> my_job_id AND next_start = '-infinity'::timestamp AND now() - last_run_started_at < max_job_runtime INTO running_job_count;
FOR JOB IN SELECT * FROM timescaledb_information.jobs j
LEFT OUTER JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
WHERE application_name LIKE 'Refresh Continuous%' AND (s.next_start <> '-infinity'::timestamp OR now() - s.last_run_started_at > max_job_runtime)
ORDER BY s.last_successful_finish ASC NULLS FIRST
LOOP
IF (running_job_count >= max_concurrent_jobs) THEN
EXIT;
END IF;
SELECT * FROM _timescaledb_catalog.continuous_agg a1
INNER JOIN timescaledb_information.continuous_aggregates a2 on a2.view_name = a1.user_view_name
WHERE materialization_hypertable_name = JOB.hypertable_name
INTO CAGG;
lowest_modified_v := (SELECT MIN(lowest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id
GROUP BY materialization_id);
RAISE NOTICE 'lowest_modified_v %', _timescaledb_internal.to_timestamp(lowest_modified_v);
IF lowest_modified_v IS NOT NULL THEN
running_job_count := running_job_count + 1;
lowest_modified_v := LEAST(lowest_modified_v, (SELECT last_refresh_to FROM custom_invalidation_log c WHERE c.job_id = JOB.job_id));
IF JOB.config->'start_offset' IS NOT NULL THEN
lowest_modified_v := GREATEST(lowest_modified_v, _timescaledb_internal.to_unix_microseconds(now() + INTERVAL '1 hour' - CAST(JOB.config->>'start_offset' AS INTERVAL)));
END IF;
IF (now() - _timescaledb_internal.to_timestamp(lowest_modified_v) > INTERVAL '2 days') THEN
refresh_to := _timescaledb_internal.to_timestamp(lowest_modified_v) + INTERVAL '1 day'; --* (random() * 10);
ELSE
refresh_to := now();
END IF;
INSERT INTO custom_invalidation_log (job_id, last_refresh_to) VALUES (JOB.job_id, _timescaledb_internal.to_unix_microseconds(refresh_to))
ON CONFLICT (job_id) DO UPDATE SET last_refresh_to = _timescaledb_internal.to_unix_microseconds(refresh_to);
RAISE LOG 'Refreshing % (job % - mat_hypertable_id %) from % to % ', CAGG.view_name, JOB.job_id, CAGG.mat_hypertable_id, _timescaledb_internal.to_timestamp(lowest_modified_v), refresh_to;
DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id;
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT CAGG.mat_hypertable_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds(refresh_to);
PERFORM alter_job(JOB.job_id, next_start => now(), max_runtime => max_job_runtime);
END IF;
END LOOP;
PERFORM alter_job(my_job_id, next_start => now() + INTERVAL '10 seconds');
END
$$;
--SELECT add_job('run_all_continuous_aggregates', '1m', initial_start => now() + INTERVAL '1 minute');
Update: The latest workaround has been running for a few days now and it actually works very well. An additional benefit is that all caggs are now continuously updated with increased control of concurrent jobs without the log being spammed with out of background workers.
@slasktrat Great to hear that you were able to work around the issue.
A quick update on our end. I believe we have a solution to optimize our invalidation handling for lots of small invalidations. Essentially, what we are testing is a way to expand each invalidation to the closest bucket boundaries. This should be safe since we always materialize full buckets (except for some corner-cases when you drop chunks, but in that case we might at worst invalidate more data than necessary, which shouldn't be an issue either). Thus, if you insert a value every minute and you have a 10 minute continuous aggregate bucket, you will expand each minute invalidation to the full 10 minute bucket, which in turn will merge with the next bucket if that one was invalidated too, and so on.
Still, I think there are some corner cases where this might still not be optimal. For instance, let's say you have 1 minute buckets and you insert a value every 2 minutes. Then you will only invalidate every other bucket, which still leads to lots of invalidations if you e.g., refresh 1 week's worth of data. Internally, we will actually materialize each invalidated range separately, which is why materialization is slow for these corner cases where we cannot merge ranges into bigger ones.
Obviously, the situation we want to avoid on the other end of the spectrum is that you have to re-materialize too much when you've, e.g., only invalidated a couple of buckets across a refresh window o, e.g., a year. Then it is better to do a number of smaller materializations instead of re-materializing the whole year's worth of data.
There might be some additional heuristics we can implement to optimize further for these worst-case scenarios where you have backfill across, e.g., every other bucket. For instance, we could try to set a limit on how many materializations we do in a refresh window and try to expand invalidations across the N adjacent buckets or simply fall back to a brute-force refresh of the whole refresh window.
I think we might take an incremental approach here to see what works best and tweak this further if necessary across multiple releases. Sometimes, the approach that works well for one use case does not work well for other use case, so we want to be cautious about making too many assumptions.
Thanks for the update @erimatnor. In my case the bucket sizes are 1 hour and 1 day, and if the current workaround would not have worked my next play was to do exact what you are already are into; merging the invalidation log into bucket size chunks. So I think that would have helped a lot for my scenario. But I also think it would be great if it was possible to set a custom max window of materializing for each run, like one could in version 1.7.
But I also think it would be great if it was possible to set a custom max window of materializing for each run, like one could in version 1.7.
The equivalent thing can be achieved in version 2.0 by doing several calls to continuous_aggregate_refresh across multiple smaller time ranges. In fact, what we tried to do is to also break the user-given window up across multiple materialization based on the invalidation information, so that if one gives a big window, we don't have to refresh everything unless all of the window is out-of-date. However, our design wasn't optimized for lots of small backfills, which is what is experienced here. This is what we are trying to improve right now, with better heuristics.
If you are referring to policies specifically, that's a different beast. These work a bit different in 2.0 vs the previous releases in order to address other issues where people struggled to get recent data to be materialized.
I recommend creating a policy that only refreshes recent data (i.e., using a window going back in time only a limited amount). If every policy runs trie to refresh everything from the beginning of time, and you also have lots of backfill, then it is bound to be a cost involved and you need to make a trade-off between focusing on recent data or backfill.
The main problem about continuous_aggregate_refresh is that just like @dimonzozo I got an error when trying to call that from within another function or stored procedure. It might very well be some way around that, but it was not straight forward at least.
@slasktrat I hear you, and the limitation of not being able to refresh in a stored procedure is a known current limitation: https://github.com/timescale/timescaledb/issues/2876. Baby steps :).
Just for the benefit of anyone else who runs into this, here’s an example of how to implement the solution @dimonzozo and @slasktrat discussed above. Thank you both for your work on this.
Prerequisites :
Continous Aggregates Job(s) must already be defined (generally done by add_continous_aggregate_policy )
Following this process requires a login from the user who has write permission to _timescaledb_catalog schema (generally ‘postgres’ )
Steps :
_Step 1:_ Create supporting tables for the User Defined Action to be created
CREATE TABLE custom_invalidation_log(
job_id integer not null unique,
last_refresh_to bigint
);
_Step 2:_ Create Procedure for User Defined Action using TimescaleDB Automation Framework
CREATE OR REPLACE PROCEDURE run_all_continuous_aggregates(my_job_id int, config jsonb) LANGUAGE PLPGSQL AS $$
DECLARE
JOB RECORD;
CAGG RECORD;
lowest_modified_v bigint;
refresh_to timestamp;
running_job_count integer;
max_concurrent_jobs integer := 3;
max_job_runtime interval := INTERVAL '20 minutes';
BEGIN
SELECT count(*) FROM timescaledb_information.job_stats WHERE job_id <> my_job_id AND next_start = '-infinity'::timestamp AND now() - last_run_started_at < max_job_runtime INTO running_job_count;
FOR JOB IN SELECT * FROM timescaledb_information.jobs j
LEFT OUTER JOIN timescaledb_information.job_stats s ON s.job_id = j.job_id
WHERE application_name LIKE 'Refresh Continuous%' AND (s.next_start <> '-infinity'::timestamp OR now() - s.last_run_started_at > max_job_runtime)
ORDER BY s.last_successful_finish ASC NULLS FIRST
LOOP
IF (running_job_count >= max_concurrent_jobs) THEN
EXIT;
END IF;
SELECT * FROM _timescaledb_catalog.continuous_agg a1
INNER JOIN timescaledb_information.continuous_aggregates a2 on a2.view_name = a1.user_view_name
WHERE materialization_hypertable_name = JOB.hypertable_name
INTO CAGG;
lowest_modified_v := (SELECT MIN(lowest_modified_value)
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id
GROUP BY materialization_id);
RAISE NOTICE 'lowest_modified_v %', _timescaledb_internal.to_timestamp(lowest_modified_v);
IF lowest_modified_v IS NOT NULL THEN
running_job_count := running_job_count + 1;
lowest_modified_v := LEAST(lowest_modified_v, (SELECT last_refresh_to FROM custom_invalidation_log c WHERE c.job_id = JOB.job_id));
IF JOB.config->'start_offset' IS NOT NULL THEN
lowest_modified_v := GREATEST(lowest_modified_v, _timescaledb_internal.to_unix_microseconds(now() + INTERVAL '1 hour' - CAST(JOB.config->>'start_offset' AS INTERVAL)));
END IF;
IF (now() - _timescaledb_internal.to_timestamp(lowest_modified_v) > INTERVAL '2 days') THEN
refresh_to := _timescaledb_internal.to_timestamp(lowest_modified_v) + INTERVAL '1 day'; --* (random() * 10);
ELSE
refresh_to := now();
END IF;
INSERT INTO custom_invalidation_log (job_id, last_refresh_to) VALUES (JOB.job_id, _timescaledb_internal.to_unix_microseconds(refresh_to))
ON CONFLICT (job_id) DO UPDATE SET last_refresh_to = _timescaledb_internal.to_unix_microseconds(refresh_to);
RAISE LOG 'Refreshing % (job % - mat_hypertable_id %) from % to % ', CAGG.view_name, JOB.job_id, CAGG.mat_hypertable_id, _timescaledb_internal.to_timestamp(lowest_modified_v), refresh_to;
DELETE FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log WHERE greatest_modified_value <> _timescaledb_internal.to_unix_microseconds('infinity'::timestamp) AND materialization_id = CAGG.mat_hypertable_id;
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log SELECT CAGG.mat_hypertable_id, lowest_modified_v, _timescaledb_internal.to_unix_microseconds(refresh_to);
PERFORM alter_job(JOB.job_id, next_start => now(), max_runtime => max_job_runtime);
END IF;
END LOOP;
PERFORM alter_job(my_job_id, next_start => now() + INTERVAL '10 seconds');
END $$;
```
Note that there are two default variables :
max_concurrent_jobs (integer)
max_job_runtime (interval)
You might want to change these variables as per your need.
Also, note that this script/action will run for all continuous aggregates which might or might not be required. You might want to run this script for specific jobs at specific schedules, for which you will need to alter the script.
_Step 3:_ [Register the procedure](https://docs.timescale.com/latest/api#add_job) run_all_continuous_aggregates to be run every hour (or whenever as per the need).
SELECT add_job('run_all_continuous_aggregates','1h');
```
Heads up: The fix for this will appear in TimescaleDB 2.0.2, which has just been tagged and will be released shortly.
Most helpful comment
Heads up: The fix for this will appear in TimescaleDB 2.0.2, which has just been tagged and will be released shortly.