
Distributed inserts (_bulk, replace, update) #392

Open
1 of 6 tasks
donhardman opened this issue Nov 5, 2024 · 10 comments
@donhardman
Contributor

Proposal:

For the initial version, we should implement the logic on the Buddy side and support ONLY the JSON protocol, so that we can insert into sharded tables.

Key considerations:

  • Implement id generation on the Buddy side that is easy to maintain and can be moved to the daemon part later
  • Ensure we use a proper algorithm for the sharding logic. If we allow users to pass an ID, we should shard by MD5 or a similar hash of it; otherwise, we should not allow passing IDs at all
  • For replace or update operations:
    • When performed without an ID, send to all nodes
    • When performed with an ID, we can determine the specific shard from the ID
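
The routing rules above can be sketched as follows. This is an illustrative Python sketch, not the actual Buddy implementation; it uses the MD5-based shard selection mentioned above, and the function name is hypothetical:

```python
import hashlib

def target_shards(doc_id, total_shards):
    """Return the shard numbers an operation should be routed to."""
    if doc_id is None:
        # replace/update without an ID: broadcast to all shards
        return list(range(total_shards))
    # with an ID: derive the single target shard from an MD5 hash of the ID
    digest = hashlib.md5(str(doc_id).encode()).hexdigest()
    return [int(digest[:8], 16) % total_shards]
```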

Checklist:

To be completed by the assignee. Check off tasks that have been completed or are not applicable.

  • Implementation completed
  • Tests developed
  • Documentation updated
  • Documentation reviewed
  • Changelog updated
  • OpenAPI YAML updated and issue created to rebuild clients
@donhardman donhardman self-assigned this Nov 5, 2024
@donhardman
Contributor Author

Two Approaches for ID Generation and Sharding

1. Snowflake-like ID Generation

Even distribution is prioritized. This approach generates unique IDs with embedded shard information.

Structure (63-bit integer):

  • 41 bits: timestamp (milliseconds since custom epoch)
  • 10 bits: shard/node ID (0-1023)
  • 12 bits: sequence number (0-4095)

Formula:

$id = ($timestamp << 22) | (($shardId % 1024) << 12) | $sequence;

Example in PHP:

class SnowflakeGenerator {
    private const CUSTOM_EPOCH = 1640995200000; // 2022-01-01

    private $sequence = 0;
    private $lastTimestamp = -1;

    public function generateId($shardId) {
        $timestamp = $this->getCurrentTimestamp();

        if ($timestamp == $this->lastTimestamp) {
            // Same millisecond: bump the 12-bit sequence, wait if it overflows
            $this->sequence = ($this->sequence + 1) & 4095;
            if ($this->sequence == 0) {
                $timestamp = $this->waitNextMillis($this->lastTimestamp);
            }
        } else {
            $this->sequence = 0;
        }

        $this->lastTimestamp = $timestamp;

        return (($timestamp - self::CUSTOM_EPOCH) << 22)
             | (($shardId % 1024) << 12)
             | $this->sequence;
    }

    private function getCurrentTimestamp() {
        return (int)(microtime(true) * 1000);
    }

    private function waitNextMillis($lastTimestamp) {
        // Spin until the clock advances past the last used millisecond
        $timestamp = $this->getCurrentTimestamp();
        while ($timestamp <= $lastTimestamp) {
            $timestamp = $this->getCurrentTimestamp();
        }
        return $timestamp;
    }
}

Characteristics:

  • Number of shards is limited (up to 1024) but not fixed upfront
  • Cannot support custom IDs
  • No extra overhead
  • Guaranteed sequential IDs
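
Since the shard ID occupies bits 12–21 of the generated value, any node can route an operation by decoding the ID, with no lookup required. A minimal Python sketch of the decoding (the epoch constant matches the generator above; the function name is illustrative):

```python
CUSTOM_EPOCH = 1640995200000  # 2022-01-01, same epoch as the generator

def decode_snowflake(snowflake_id):
    """Split a snowflake ID into (timestamp_ms, shard_id, sequence)."""
    sequence = snowflake_id & 0xFFF           # low 12 bits
    shard_id = (snowflake_id >> 12) & 0x3FF   # next 10 bits
    timestamp_ms = (snowflake_id >> 22) + CUSTOM_EPOCH
    return timestamp_ms, shard_id, sequence
```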

2. MD5-based Sharding

For scenarios where custom IDs are needed.

Formula:

$shardNumber = hexdec(substr(md5($id), 0, 8)) % $totalShards;

Example in PHP:

class Md5Sharding {
    private $totalShards;

    public function __construct($totalShards) {
        $this->totalShards = $totalShards;
    }

    public function getShardNumber($id) {
        return hexdec(substr(md5((string)$id), 0, 8)) % $this->totalShards;
    }
}

// Usage example
$sharding = new Md5Sharding(16);
$customId = 12345;
$shardNumber = $sharding->getShardNumber($customId);

Characteristics:

  • Fixed number of shards
  • Supports custom IDs
  • Small performance overhead for MD5 calculation
  • Even distribution across shards

Both approaches have their use cases:

  • Use Snowflake when you need sequential IDs and embedded shard information
  • Use MD5-based sharding when you need to support custom IDs or have a fixed number of shards

@donhardman donhardman assigned sanikolaev and unassigned donhardman Nov 12, 2024
@sanikolaev
Collaborator

As we discussed in Slack, let's avoid using the snowflake ID approach, as we need to keep the option to provide custom IDs. Instead of the modulo function, let's explore other options, like jump consistent hashing.
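
For reference, jump consistent hashing (Lamping & Veach) maps a 64-bit key to one of N buckets such that when N grows by one, only about 1/N of keys move to a new bucket, unlike plain modulo where most keys move. A minimal Python sketch of the standard algorithm (not part of the current implementation):

```python
def jump_consistent_hash(key: int, num_buckets: int) -> int:
    """Jump consistent hash: map a 64-bit key to a bucket in [0, num_buckets)."""
    b, j = -1, 0
    while j < num_buckets:
        b = j
        # 64-bit linear congruential step
        key = (key * 2862933555777941757 + 1) & 0xFFFFFFFFFFFFFFFF
        j = int((b + 1) * ((1 << 31) / ((key >> 33) + 1)))
    return b
```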

@sanikolaev sanikolaev assigned donhardman and unassigned sanikolaev Nov 12, 2024
@sanikolaev
Collaborator

@donhardman also, as we discussed, select uuid_short() may be required in the daemon. I've discussed it with @tomatolog and it's not a big deal to add it. Please create a separate task for it if required.

@donhardman
Contributor Author

I have created a task: manticoresoftware/manticoresearch#2752

@sanikolaev
Collaborator

@donhardman
Contributor Author

Buddy:

Testing with following parameters:
Curl command: curl -H 'Content-type: application/x-ndjson' --data-binary @bulk.json http://localhost:9308/_bulk
Number of requests: 1000
Concurrency: 12
----------------------------------------
Single-thread curl test (sequential):
----------------------------------------
Progress: 1000/1000
Total time: 63.841850320s
Average time per request: 63.841ms

Concurrent test using parallel execution:
----------------------------------------
Total time: 14.040817747s
Average time per request: 14.040ms

Direct bulk insert into table:

Testing with following parameters:
Curl command: curl -H 'Content-type: application/x-ndjson' --data-binary @bulk-direct.json http://localhost:9308/_bulk
Number of requests: 1000
Concurrency: 12
----------------------------------------
Single-thread curl test (sequential):
----------------------------------------
Progress: 1000/1000
Total time: 7.573245040s
Average time per request: 7.573ms

Concurrent test using parallel execution:
----------------------------------------
Total time: 1.792843179s
Average time per request: 1.792ms

The bulk file contains 1024 rows, one document per row.

Script used for testing:
#!/bin/bash

# Check if required parameters are provided
if [ "$#" -lt 3 ]; then
	echo "Usage: $0 \"<curl_command>\" <number_of_requests> <concurrency>"
	echo "Example: $0 \"curl -X GET http://example.com\" 100 2"
	exit 1
fi

# Store parameters
CURL_CMD="$1"
NUM_REQUESTS=$2
CONCURRENCY=$3

echo "Testing with following parameters:"
echo "Curl command: $CURL_CMD"
echo "Number of requests: $NUM_REQUESTS"
echo "Concurrency: $CONCURRENCY"
echo "----------------------------------------"

echo "Single-thread curl test (sequential):"
echo "----------------------------------------"

# Run sequential curl requests and measure time
start_time=$(date +%s.%N)
for ((i=1; i<=$NUM_REQUESTS; i++)); do
	eval "$CURL_CMD" >/dev/null 2>&1
	echo -ne "\rProgress: $i/$NUM_REQUESTS"
done
echo
end_time=$(date +%s.%N)

# Calculate average time for sequential requests
duration=$(echo "$end_time - $start_time" | bc)
avg_time=$(echo "scale=3; $duration * 1000 / $NUM_REQUESTS" | bc)

echo "Total time: ${duration}s"
echo "Average time per request: ${avg_time}ms"
echo ""

echo "Concurrent test using parallel execution:"
echo "----------------------------------------"

# Create a temporary file to store the commands
temp_file=$(mktemp)
for ((i=1; i<=$NUM_REQUESTS; i++)); do
	echo "$CURL_CMD >/dev/null 2>&1" >> "$temp_file"
done

# Run commands concurrently using parallel
start_time=$(date +%s.%N)
parallel -j "$CONCURRENCY" < "$temp_file"
end_time=$(date +%s.%N)

# Calculate average time for concurrent requests
duration=$(echo "$end_time - $start_time" | bc)
avg_time=$(echo "scale=3; $duration * 1000 / $NUM_REQUESTS" | bc)

echo "Total time: ${duration}s"
echo "Average time per request: ${avg_time}ms"

# Cleanup
rm "$temp_file"

@donhardman
Contributor Author

donhardman commented Nov 27, 2024

While we are waiting for things that are blocking our next move, let's cover the functionality with tests that we can already implement and manually verify.

Location: manticoresoftware/manticoresearch#2784

Documentation reference for test cases: https://manual.manticoresearch.com/dev/Data_creation_and_modification/Adding_documents_to_a_table/Adding_documents_to_a_real-time_table?client=Elasticsearch#Bulk-adding-documents

We should test the following Elasticsearch-like endpoints with distributed tables:

  1. insert
  2. replace
  3. update
  4. _bulk
  5. delete

Additionally, we need to verify that these operations work with distributed tables that have remote agents. We can use sharding to create such table configurations for testing.

@sanikolaev
Collaborator

Here's how the new functionality can be tested and one bug:

snikolaev@dev2:~$ docker run --name kit --rm -it ghcr.io/manticoresoftware/manticoresearch:test-kit-buddy-402 bash

root@9356d020d256:/# searchd
Manticore 6.3.9 9183ab762@24112704 dev (columnar 2.3.1 edadc69@24112219) (secondary 2.3.1 edadc69@24112219) (knn 2.3.1 edadc69@24112219)
...

root@9356d020d256:/# mysql -P9306 -h0
mysql> create table t(f text) shards=3;

root@9356d020d256:/# curl 0:9308/insert -d '{"table": "t", "doc": {"f": "abc"}}'
{"error":{"type":"parse_exception","reason":"Document ids should be integer or array of integers","table":"t_s2","index":"t"},"status":400}

Same against a non-distributed table works fine:

root@9356d020d256:/# curl 0:9308/insert -d '{"table": "l", "doc": {"f": "abc"}}'
{"table":"l","_id":6776366998629646347,"created":true,"result":"created","status":201}

@sanikolaev
Collaborator

One more bug: all docs are routed to the same shard:

root@9356d020d256:/# mysql -P9306 -h0 -e "create table t(f text) shards=3 rf=1;"

root@9356d020d256:/# for n in `seq 1 200`; do curl -s 0:9308/insert -d '{"table": "t", "id": 0, "doc": {"f": "abc"}}' > /dev/null; done;
root@9356d020d256:/# mysql -P9306 -h0 -e "select count(*) from t"
+----------+
| count(*) |
+----------+
|      200 |
+----------+
root@9356d020d256:/# mysql -P9306 -h0 -e "select count(*) from t_s0"
+----------+
| count(*) |
+----------+
|      200 |
+----------+
root@9356d020d256:/# mysql -P9306 -h0 -e "select count(*) from t_s1"
+----------+
| count(*) |
+----------+
|        0 |
+----------+

@donhardman
Contributor Author

To make it simpler to write tests, here is what I used for testing:

curl -H 'Content-type: application/x-ndjson' --data-binary @bulk.json http://localhost:9308/_bulk

Same for /insert and other endpoints.

And here are the example documents I used:

insert, update, replace
{
  "index": "a",
  "id": 1,
  "doc":
  {
    "value": "Hello world"
  }
}
delete
{
  "index": "test",
  "id": 1
}
bulk
{"index":{"_index":"test"}}
{"value":"Donald John Trump (born June 14, 1946) is an American politician, media personality, and businessman who served as the 45th president of the United States from 2017 to 2021. He won the 2024 presidential election as the nominee of the Republican Party and is scheduled to be inaugurated as the 47th president on January 20, 2025.\n\n Trump graduated with a bachelor's degree in economics from the University of Pennsylvania in 1968. After becoming president of the family real estate business in 1971, he renamed it the Trump Organization. After a series of bankruptcies in the 1990s, he launched side ventures, mostly licensing the Trump name. From 2004 to 2015, he produced and hosted the reality television series The Apprentice. In 2015 he launched a presidential campaign.\n\n Trump won the 2016 presidential election. His election and policies sparked numerous protests. In his first term, he ordered a travel ban targeting Muslims and refugees, funded the Trump wall expanding the U.S.–Mexico border wall, and implemented a family separation policy at the border. He rolled back more than 100 environmental policies and regulations, signed the Tax Cuts and Jobs Act of 2017,[a] and appointed three justices to the Supreme Court.[b] He initiated a trade war with China, withdrew the U.S. from several international agreements,[c] and met with North Korean leader Kim Jong Un without progress on denuclearization. He responded to the COVID-19 pandemic with the CARES Act,[d] and downplayed its severity. He was impeached in 2019 for abuse of power and obstruction of Congress, and in 2021 for incitement of insurrection; the Senate acquitted him in both cases."}
