journal-remote: several follow-ups for Compression= option handling

Follow-ups for cfaf78001c.

- use OrderedHashmap to manage configured compression algorithms, then
  drop CompressionArgs,
- rename CompressionOpts -> CompressionConfig,
- refuse 'none' in Compression= setting, but accept boolean false, which
  disables compression,
- when Compression= option is unspecified, enable all supported compression
  algorithms by default,
- do not set 'none' to the Accept-Encoding header.
This commit is contained in:
Yu Watanabe
2025-02-11 16:49:46 +09:00
parent 8c7e4e0d5e
commit c259c9e253
10 changed files with 207 additions and 133 deletions

View File

@@ -58,19 +58,6 @@
[Remote] section:</para>
<variablelist class='config-directives'>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem><para>Acceptable compression algorithms to be used by <command>systemd-journal-upload</command>. Compression algorithms are
used for <literal>Accept-Encoding</literal> header construction with priorities set according to an order in configuration.
This parameter takes space separated list of compression algorithms. Example:
<programlisting>Compression=zstd lz4</programlisting>
This option can be specified multiple times. If an empty string is assigned, then all the previous assignments are cleared.
</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Seal=</varname></term>
@@ -142,6 +129,27 @@
<xi:include href="version-info.xml" xpointer="v253"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem>
<para>Configurs acceptable compression algorithms to be announced through
<literal>Accept-Encoding</literal> HTTP header. The header suggests
<command>systemd-journal-upload</command> to compress data to be sent. Takes a space separated
list of compression algorithms, or <literal>no</literal>. Supported algorithms are
<literal>zstd</literal>, <literal>xz</literal>, or <literal>lz4</literal>. When a list of
algorithms is specified, <literal>Accept-Encoding</literal> header will be constructed with
priorities based on the order of the algorithms in the list. When <literal>no</literal>,
<literal>Accept-Encoding</literal> header will not be sent. This option can be specified multiple
times. If an empty string is assigned, then all the previous assignments are cleared. Defaults to
unset and all supported compression algorithms will be listed in the header.</para>
<para>Example:
<programlisting>Compression=zstd lz4</programlisting></para>
<xi:include href="version-info.xml" xpointer="v258"/>
</listitem>
</varlistentry>
</variablelist>
</refsect1>

View File

@@ -60,37 +60,6 @@
<xi:include href="version-info.xml" xpointer="v232"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem><para>Takes a space separated list of compression algorithms to be applied to logs data before sending.
Supported algorithms are <literal>none</literal>, <literal>zstd</literal>, <literal>xz</literal>,
or <literal>lz4</literal>. Optionally, each algorithm (except for <literal>none</literal>)
followed by a colon (<literal>:</literal>) and its compression level, for example <literal>zstd:4</literal>.
The compression level is expected to be a positive integer. This option can be specified multiple times.
If an empty string is assigned, then all previous assignments are cleared.
Defaults to unset, and data will not be compressed.</para>
<para>Example:
<programlisting>Compression=zstd:4 lz4:2</programlisting></para>
<para>Even when compression is enabled, the initial requests are sent without compression.
It becomes effective either if <literal>ForceCompression=</literal> is enabled,
or the server response contains <literal>Accept-Encoding</literal> headers with a list of
compression algorithms that contains one of the algorithms specified in this option.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>ForceCompression=</varname></term>
<listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
Defaults to <literal>false</literal>.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>ServerKeyFile=</varname></term>
@@ -128,6 +97,40 @@
<xi:include href="version-info.xml" xpointer="v249"/></listitem>
</varlistentry>
<varlistentry>
<term><varname>Compression=</varname></term>
<listitem>
<para>Configures compression algorithm to be applied to logs data before sending. Takes a space
separated list of compression algorithms, or <literal>no</literal>. Supported algorithms are
<literal>zstd</literal>, <literal>xz</literal>, or <literal>lz4</literal>. Optionally, each
algorithm followed by a colon (<literal>:</literal>) and its compression level, for example
<literal>zstd:4</literal>. The compression level is expected to be a positive integer. When
<literal>no</literal> is specified, no compression algorithm will be applied to data to be sent.
This option can be specified multiple times. If an empty string is assigned, then all previous
assignments are cleared. Defaults to unset, and all supported compression algorithms with their
default compression levels are listed.</para>
<para>Example:
<programlisting>Compression=zstd:4 lz4:2</programlisting></para>
<para>Even when compression is enabled, the initial requests are sent without compression.
It becomes effective either if <literal>ForceCompression=</literal> is enabled,
or the server response contains <literal>Accept-Encoding</literal> headers with a list of
compression algorithms that contains one of the algorithms specified in this option.</para>
<xi:include href="version-info.xml" xpointer="v258"/>
</listitem>
</varlistentry>
<varlistentry>
<term><varname>ForceCompression=</varname></term>
<listitem><para>Takes a boolean value, enforces using compression without content encoding negotiation.
Defaults to <literal>false</literal>.</para>
<xi:include href="version-info.xml" xpointer="v258"/></listitem>
</varlistentry>
</variablelist>
</refsect1>

View File

@@ -4,10 +4,76 @@
#include "journal-compression-util.h"
#include "parse-util.h"
void compression_args_clear(CompressionArgs *args) {
assert(args);
args->size = 0;
args->opts = mfree(args->opts);
static int compression_config_put(OrderedHashmap **configs, Compression c, int level) {
assert(configs);
if (!compression_supported(c))
return 0;
/* If the compression algorithm is already specified, update the compression level. */
CompressionConfig *cc = ordered_hashmap_get(*configs, INT_TO_PTR(c));
if (cc)
cc->level = level;
else {
_cleanup_free_ CompressionConfig *new_config = new(CompressionConfig, 1);
if (!new_config)
return log_oom();
*new_config = (CompressionConfig) {
.algorithm = c,
.level = level,
};
if (ordered_hashmap_ensure_put(configs, &trivial_hash_ops_value_free, INT_TO_PTR(c), new_config) < 0)
return log_oom();
TAKE_PTR(new_config);
}
if (c == COMPRESSION_NONE) {
/* disables all configs except for 'none' */
ORDERED_HASHMAP_FOREACH(cc, *configs)
if (cc->algorithm != COMPRESSION_NONE)
free(ordered_hashmap_remove(*configs, INT_TO_PTR(cc->algorithm)));
} else
/* otherwise, drop 'none' if stored. */
free(ordered_hashmap_get(*configs, INT_TO_PTR(COMPRESSION_NONE)));
return 1;
}
int compression_configs_mangle(OrderedHashmap **configs) {
int r;
/* When compression is explicitly disabled, then free the list. */
if (ordered_hashmap_contains(*configs, INT_TO_PTR(COMPRESSION_NONE))) {
*configs = ordered_hashmap_free(*configs);
return 0;
}
/* When compression algorithms are exlicitly specifed, then honor the list. */
if (!ordered_hashmap_isempty(*configs))
return 0;
/* If nothing specified, then list all supported algorithms with the default compression level. */
_cleanup_(ordered_hashmap_freep) OrderedHashmap *h = NULL;
/* First, put the default algorithm. */
if (DEFAULT_COMPRESSION != COMPRESSION_NONE) {
r = compression_config_put(&h, DEFAULT_COMPRESSION, -1);
if (r < 0)
return r;
}
/* Then, list all other algorithms. */
for (Compression c = 1; c < _COMPRESSION_MAX; c++) {
r = compression_config_put(&h, c, -1);
if (r < 0)
return r;
}
return free_and_replace_full(*configs, h, ordered_hashmap_free);
}
int config_parse_compression(
@@ -22,19 +88,20 @@ int config_parse_compression(
void *data,
void *userdata) {
CompressionArgs *args = ASSERT_PTR(data);
OrderedHashmap **configs = ASSERT_PTR(data);
bool parse_level = ltype;
int r;
assert(filename);
assert(lvalue);
assert(rvalue);
if (isempty(rvalue)) {
compression_args_clear(args);
/* an empty string clears the previous assignments. */
*configs = ordered_hashmap_free(*configs);
return 1;
}
if (parse_boolean(rvalue) == 0)
/* 'no' disables compression. To indicate that, store 'none'. */
return compression_config_put(configs, COMPRESSION_NONE, -1);
for (const char *p = rvalue;;) {
_cleanup_free_ char *word = NULL;
int level = -1;
@@ -60,30 +127,14 @@ int config_parse_compression(
}
Compression c = compression_lowercase_from_string(word);
if (c < 0 || !compression_supported(c)) {
if (c <= 0 || !compression_supported(c)) {
log_syntax(unit, LOG_WARNING, filename, line, c,
"Compression algorithm '%s' is not supported on the system, ignoring.", word);
continue;
}
bool found = false;
FOREACH_ARRAY(opt, args->opts, args->size)
if (opt->algorithm == c) {
found = true;
if (parse_level)
opt->level = level;
break;
}
if (found)
continue;
if (!GREEDY_REALLOC(args->opts, args->size + 1))
return log_oom();
args->opts[args->size++] = (CompressionOpts) {
.algorithm = c,
.level = level,
};
r = compression_config_put(configs, c, level);
if (r < 0)
return r;
}
}

View File

@@ -3,17 +3,13 @@
#include "compress.h"
#include "conf-parser.h"
#include "hashmap.h"
typedef struct CompressionOpts {
typedef struct CompressionConfig {
Compression algorithm;
int level;
} CompressionOpts;
} CompressionConfig;
typedef struct CompressionArgs {
CompressionOpts *opts;
size_t size;
} CompressionArgs;
int compression_configs_mangle(OrderedHashmap **configs);
CONFIG_PARSER_PROTOTYPE(config_parse_compression);
void compression_args_clear(CompressionArgs *args);

View File

@@ -38,7 +38,6 @@ static char *arg_getter = NULL;
static char *arg_listen_raw = NULL;
static char *arg_listen_http = NULL;
static char *arg_listen_https = NULL;
static CompressionArgs arg_compression = {};
static char **arg_files = NULL;
static bool arg_compress = true;
static bool arg_seal = false;
@@ -62,6 +61,8 @@ static uint64_t arg_max_size = UINT64_MAX;
static uint64_t arg_n_max_files = UINT64_MAX;
static uint64_t arg_keep_free = UINT64_MAX;
static OrderedHashmap *arg_compression = NULL;
STATIC_DESTRUCTOR_REGISTER(arg_url, freep);
STATIC_DESTRUCTOR_REGISTER(arg_getter, freep);
STATIC_DESTRUCTOR_REGISTER(arg_listen_raw, freep);
@@ -73,7 +74,7 @@ STATIC_DESTRUCTOR_REGISTER(arg_key, freep);
STATIC_DESTRUCTOR_REGISTER(arg_cert, freep);
STATIC_DESTRUCTOR_REGISTER(arg_trust, freep);
STATIC_DESTRUCTOR_REGISTER(arg_output, freep);
STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
STATIC_DESTRUCTOR_REGISTER(arg_compression, ordered_hashmap_freep);
static const char* const journal_write_split_mode_table[_JOURNAL_WRITE_SPLIT_MAX] = {
[JOURNAL_WRITE_SPLIT_NONE] = "none",
@@ -164,10 +165,17 @@ static int dispatch_http_event(sd_event_source *event,
static int build_accept_encoding(char **ret) {
assert(ret);
float q = 1.0, step = 1.0 / arg_compression.size;
if (ordered_hashmap_isempty(arg_compression)) {
*ret = NULL;
return 0;
}
_cleanup_free_ char *buf = NULL;
FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
const char *c = compression_lowercase_to_string(opt->algorithm);
float q = 1.0, step = 1.0 / ordered_hashmap_size(arg_compression);
const CompressionConfig *cc;
ORDERED_HASHMAP_FOREACH(cc, arg_compression) {
const char *c = compression_lowercase_to_string(cc->algorithm);
if (strextendf_with_separator(&buf, ",", "%s;q=%.1f", c, q) < 0)
return -ENOMEM;
q -= step;
@@ -1107,6 +1115,10 @@ static int run(int argc, char **argv) {
if (r <= 0)
return r;
r = compression_configs_mangle(&arg_compression);
if (r < 0)
return r;
journal_browse_prepare();
if (arg_listen_http || arg_listen_https) {

View File

@@ -26,3 +26,4 @@
# KeepFree=
# MaxFileSize=
# MaxFiles=
# Compression=zstd lz4 xz

View File

@@ -263,7 +263,7 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
j = u->journal;
if (u->compression.algorithm != COMPRESSION_NONE) {
if (u->compression) {
compression_buffer = malloc_multiply(nmemb, size);
if (!compression_buffer) {
log_oom();
@@ -309,12 +309,12 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
u->entries_sent, u->current_cursor);
}
if (filled > 0 && u->compression.algorithm != COMPRESSION_NONE) {
if (filled > 0 && u->compression) {
size_t compressed_size;
r = compress_blob(u->compression.algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression.level);
r = compress_blob(u->compression->algorithm, compression_buffer, filled, buf, size * nmemb, &compressed_size, u->compression->level);
if (r < 0) {
log_error_errno(r, "Failed to compress %zu bytes (Compression=%s, Level=%d): %m",
filled, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
log_error_errno(r, "Failed to compress %zu bytes by %s with level %i: %m",
filled, compression_lowercase_to_string(u->compression->algorithm), u->compression->level);
return CURL_READFUNC_ABORT;
}

View File

@@ -58,7 +58,7 @@ static bool arg_merge = false;
static int arg_follow = -1;
static char *arg_save_state = NULL;
static usec_t arg_network_timeout_usec = USEC_INFINITY;
static CompressionArgs arg_compression = {};
static OrderedHashmap *arg_compression = NULL;
static bool arg_force_compression = false;
STATIC_DESTRUCTOR_REGISTER(arg_url, freep);
@@ -71,7 +71,7 @@ STATIC_DESTRUCTOR_REGISTER(arg_cursor, freep);
STATIC_DESTRUCTOR_REGISTER(arg_machine, freep);
STATIC_DESTRUCTOR_REGISTER(arg_namespace, freep);
STATIC_DESTRUCTOR_REGISTER(arg_save_state, freep);
STATIC_DESTRUCTOR_REGISTER(arg_compression, compression_args_clear);
STATIC_DESTRUCTOR_REGISTER(arg_compression, ordered_hashmap_freep);
static void close_fd_input(Uploader *u);
@@ -215,8 +215,8 @@ int start_upload(Uploader *u,
return log_oom();
h = l;
if (u->compression.algorithm != COMPRESSION_NONE) {
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
if (u->compression) {
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression->algorithm));
if (!header)
return log_oom();
@@ -327,7 +327,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
assert(!size_multiply_overflow(size, nmemb));
if (u->compression.algorithm != COMPRESSION_NONE) {
if (u->compression) {
compression_buffer = malloc_multiply(nmemb, size);
if (!compression_buffer) {
log_oom();
@@ -338,14 +338,14 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user
n = read(u->input, compression_buffer ?: buf, size * nmemb);
if (n > 0) {
log_debug("%s: allowed %zu, read %zd", __func__, size * nmemb, n);
if (u->compression.algorithm == COMPRESSION_NONE)
if (!u->compression)
return n;
size_t compressed_size;
r = compress_blob(u->compression.algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression.level);
r = compress_blob(u->compression->algorithm, compression_buffer, n, buf, size * nmemb, &compressed_size, u->compression->level);
if (r < 0) {
log_error_errno(r, "Failed to compress %zd bytes using (Compression=%s, Level=%d): %m",
n, compression_lowercase_to_string(u->compression.algorithm), u->compression.level);
log_error_errno(r, "Failed to compress %zd bytes by %s with level %i: %m",
n, compression_lowercase_to_string(u->compression->algorithm), u->compression->level);
return CURL_READFUNC_ABORT;
}
assert(compressed_size <= size * nmemb);
@@ -432,12 +432,10 @@ static int setup_uploader(Uploader *u, const char *url, const char *state_file)
*u = (Uploader) {
.input = -1,
.compression.algorithm = COMPRESSION_NONE,
.compression.level = -1,
};
if (arg_force_compression && arg_compression.size > 0)
u->compression = arg_compression.opts[0];
if (arg_force_compression)
u->compression = ordered_hashmap_first(arg_compression);
host = STARTSWITH_SET(url, "http://", "https://");
if (!host) {
@@ -520,36 +518,35 @@ static int update_content_encoding(Uploader *u, const char *accept_encoding) {
if (c <= 0 || !compression_supported(c))
continue; /* unsupported or invalid algorithm. */
FOREACH_ARRAY(opt, arg_compression.opts, arg_compression.size) {
if (opt->algorithm != c)
continue;
const CompressionConfig *cc = ordered_hashmap_get(arg_compression, INT_TO_PTR(c));
if (!cc)
continue; /* The specified algorithm is not enabled. */
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(u->compression.algorithm));
if (!header)
return log_oom();
_cleanup_free_ char *header = strjoin("Content-Encoding: ", compression_lowercase_to_string(cc->algorithm));
if (!header)
return log_oom();
/* First, update existing Content-Encoding header. */
bool found = false;
for (struct curl_slist *l = u->header; l; l = l->next)
if (startswith(l->data, "Content-Encoding:")) {
free_and_replace(l->data, header);
found = true;
break;
}
/* If Content-Encoding header is not found, append new one. */
if (!found) {
struct curl_slist *l = curl_slist_append(u->header, header);
if (!l)
return log_oom();
u->header = l;
/* First, update existing Content-Encoding header. */
bool found = false;
for (struct curl_slist *l = u->header; l; l = l->next)
if (startswith(l->data, "Content-Encoding:")) {
free_and_replace(l->data, header);
found = true;
break;
}
CURLcode code;
easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL);
u->compression = *opt;
return 0;
/* If Content-Encoding header is not found, append new one. */
if (!found) {
struct curl_slist *l = curl_slist_append(u->header, header);
if (!l)
return log_oom();
u->header = l;
}
CURLcode code;
easy_setopt(u->easy, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL);
u->compression = cc;
return 0;
}
}
#endif
@@ -589,7 +586,7 @@ static int perform_upload(Uploader *u) {
else {
#if LIBCURL_VERSION_NUM >= 0x075300
int r;
if (u->compression.algorithm == COMPRESSION_NONE) {
if (!u->compression) {
struct curl_header *encoding_header;
CURLHcode hcode;
@@ -618,7 +615,7 @@ static int parse_config(void) {
{ "Upload", "ServerCertificateFile", config_parse_path_or_ignore, 0, &arg_cert },
{ "Upload", "TrustedCertificateFile", config_parse_path_or_ignore, 0, &arg_trust },
{ "Upload", "NetworkTimeoutSec", config_parse_sec, 0, &arg_network_timeout_usec },
{ "Upload", "Compression", config_parse_compression, true, &arg_compression },
{ "Upload", "Compression", config_parse_compression, /* with_level */ true, &arg_compression },
{ "Upload", "ForceCompression", config_parse_bool, 0, &arg_force_compression },
{}
};
@@ -882,6 +879,10 @@ static int run(int argc, char **argv) {
if (r <= 0)
return r;
r = compression_configs_mangle(&arg_compression);
if (r < 0)
return r;
journal_browse_prepare();
r = setup_uploader(&u, arg_url, arg_save_state);

View File

@@ -21,3 +21,5 @@
# ServerKeyFile={{CERTIFICATE_ROOT}}/private/journal-upload.pem
# ServerCertificateFile={{CERTIFICATE_ROOT}}/certs/journal-upload.pem
# TrustedCertificateFile={{CERTIFICATE_ROOT}}/ca/trusted.pem
# Compression=zstd lz4 xz
# ForceCompression=no

View File

@@ -54,7 +54,7 @@ typedef struct Uploader {
char *last_cursor, *current_cursor;
usec_t watchdog_timestamp;
usec_t watchdog_usec;
CompressionOpts compression;
const CompressionConfig *compression;
} Uploader;
#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC)