[GH-ISSUE #1867] Add support for Elasticsearch with Bulk API and data stream #800

Closed
opened 2026-03-04 02:18:08 +03:00 by kerem · 1 comment

Originally created by @muratpurc on GitHub (Jan 17, 2024).
Original GitHub issue: https://github.com/Seldaek/monolog/issues/1867

Summary of problem or feature request

We use Monolog's ElasticsearchHandler to write log entries to Elasticsearch via the Bulk API and a data stream.

It is not possible for us to use the Monolog ElasticsearchHandler together with the Elasticsearch PHP client to write entries into Elasticsearch using the Bulk API and a data stream.

The versions used are:

  • monolog/monolog 2.9.2
  • elasticsearch/elasticsearch 8.11.0

Code snippet of problem

Our Monolog setup looks like this:

$index = 'my-elasticsearch-index';
$formatter = new Monolog\Formatter\ElasticsearchFormatter($index, '_doc');

$host = 'https://<username>:<password>@hostname';
$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts([$host])
    ->build();

$handler = new Monolog\Handler\ElasticsearchHandler($client, ['op_type' => 'create']);
$handler->setFormatter($formatter);

$logger = new Monolog\Logger('Logger name', [$handler]);
$logger->info('Some message');

The Elasticsearch documentation (https://www.elastic.co/guide/en/elasticsearch/reference/current/set-up-a-data-stream.html#create-data-stream) says the following about the Bulk API and data streams:

To automatically create a data stream, Elasticsearch expects the following request:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

In the current configuration, the request from above is sent as POST, which leads to an Elasticsearch error.
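To make the required wire format concrete, here is a minimal Python sketch (a hypothetical helper, not part of Monolog or the Elasticsearch client) that assembles the NDJSON body a data-stream bulk request expects:

```python
import json

def datastream_bulk_body(records):
    """Build the NDJSON payload for a data-stream bulk request:
    an empty {"create":{}} action line precedes every document."""
    lines = []
    for record in records:
        lines.append(json.dumps({"create": {}}))
        lines.append(json.dumps(record))
    # The Bulk API requires a trailing newline after the last line.
    return "\n".join(lines) + "\n"

body = datastream_bulk_body([
    {"@timestamp": "2099-05-06T16:21:15.000Z", "message": "Some message"},
])
print(body)
```

Note the action line carries no _index or _type: for a data stream, the target is part of the URL and every operation must be a create.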

We can get around this by creating the data stream manually with an HTTP client that calls the API with the following request:

PUT _data_stream/my-data-stream

After that, it is possible to add entries with the following request:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

It doesn't matter whether we let Elasticsearch create the data stream automatically or create it manually beforehand.
When we add a log entry ($logger->info()), Monolog\Handler\ElasticsearchHandler->bulkSend() builds a parameter array that is not compatible with the Bulk API when writing to a data stream.
For Elastic\Elasticsearch\Traits\ClientEndpointsTrait->bulk() to create a request that Elasticsearch accepts, bulkSend() needs some changes. We managed this by adjusting bulkSend() as follows:

     protected function bulkSend(array $records): void
     {
         try {
             $params = [
                 'body' => [],
             ];

             foreach ($records as $record) {
                 if ($this->options['op_type'] === 'create') {
                     if (!isset($params['index'])) {
                         $params['index'] = $record['_index'];
                     }
                     $params['body'][] = ['create' => new \stdClass()];
                 } else {
                     $params['body'][] = [
                         'index' => $this->needsType ? [
                             '_index' => $record['_index'],
                             '_type' => $record['_type'],
                         ] : [
                             '_index' => $record['_index'],
                         ],
                     ];
                 }

                 unset($record['_index'], $record['_type']);

                 $params['body'][] = $record;
             }

             /** @var Elasticsearch */
             $responses = $this->client->bulk($params);

             if ($responses['errors'] === true) {
                 throw $this->createExceptionFromResponses($responses);
             }
         } catch (Throwable $e) {
             if (! $this->options['ignore_error']) {
                 throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
             }
         }
     }
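For comparison, the branching in the adjusted bulkSend() above can be sketched as a stand-alone Python helper (hypothetical; names mirror the PHP code, not an official API):

```python
def build_bulk_params(records, op_type="create", needs_type=False):
    """Mirror the adjusted bulkSend(): for op_type 'create' (data stream),
    set a single top-level index and emit empty create actions; otherwise
    emit per-document index actions carrying _index (and _type if needed)."""
    params = {"body": []}
    for record in records:
        record = dict(record)  # don't mutate the caller's dict
        if op_type == "create":
            # Data stream: target named once at the top level,
            # action line stays an empty object.
            params.setdefault("index", record["_index"])
            params["body"].append({"create": {}})
        else:
            action = {"_index": record["_index"]}
            if needs_type:
                action["_type"] = record["_type"]
            params["body"].append({"index": action})
        # The document itself must not carry the routing metadata.
        record.pop("_index", None)
        record.pop("_type", None)
        params["body"].append(record)
    return params
```

The key design point is the same as in the PHP patch: in the data-stream case the index name moves out of the per-document action lines and into a single top-level parameter.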

I'm not sure whether that's the correct place and whether it has side effects on other Monolog/Elasticsearch setups. It also seems necessary to make certain adjustments to the Elasticsearch PHP client (https://github.com/elastic/elasticsearch-php); more on this below. Therefore I didn't create a pull request.

Elastic\Elasticsearch\Traits\ClientEndpointsTrait->bulk() creates the following POST request which, according to the documentation, should be PUT, but it still works:

POST my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

The Elasticsearch PHP client should also be adapted to create a PUT request for Bulk API with data stream.
I've created ticket 1389 (https://github.com/elastic/elasticsearch-php/issues/1389) to address the issue on the Elasticsearch PHP client side.

It would be very desirable if both packages supported the Elasticsearch Bulk API with data streams.

System details

  • Operating System: Ubuntu 20.04.6 LTS or debian:11-slim in docker
  • PHP Version: 8.1
  • ES-PHP client version: 8.11.0
  • Elasticsearch version: 8.11.3
kerem 2026-03-04 02:18:08 +03:00
  • closed this issue
  • added the Feature label

@muratpurc commented on GitHub (Jan 18, 2024):

The issue was with the Monolog version used (2.9.2); it works with Monolog >= 3.3.
The ticket can be closed.
