Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wangdi/google 26 dfuse 1 #15653

Open
wants to merge 7 commits into
base: google/2.6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/user/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ Additionally, there are several optional command-line options:
| --container=<label\|uuid\> | container label or uuid to open |
| --sys-name=<name\> | DAOS system name |
| --foreground | run in foreground |
| --singlethreaded | run single threaded |
| --thread-count=<count> | Number of threads to use |
| --multi-user | Run in multi user mode |
| --read-only | Mount in read-only mode |
Expand Down
1 change: 1 addition & 0 deletions src/client/dfuse/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ DFUSE_SRC = ['dfuse_core.c',
'dfuse_main.c',
'dfuse_fuseops.c',
'inval.c',
'file.c',
'dfuse_cont.c',
'dfuse_thread.c',
'dfuse_pool.c']
Expand Down
63 changes: 53 additions & 10 deletions src/client/dfuse/dfuse.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct dfuse_info {
char *di_mountpoint;
int32_t di_thread_count;
uint32_t di_eq_count;
bool di_threaded;
bool di_foreground;
bool di_caching;
bool di_multi_user;
Expand Down Expand Up @@ -137,9 +136,10 @@ struct dfuse_inode_entry;
* when EOF is returned to the kernel. If it's still present on release then it's freed then.
*/
struct dfuse_pre_read {
pthread_mutex_t dra_lock;
d_list_t req_list;
struct dfuse_event *dra_ev;
int dra_rc;
bool complete;
};

/** what is returned as the handle for fuse fuse_file_info on create/open/opendir */
Expand All @@ -149,8 +149,6 @@ struct dfuse_obj_hdl {
/** the DFS object handle. Not created for directories. */
dfs_obj_t *doh_obj;

struct dfuse_pre_read *doh_readahead;

/** the inode entry for the file */
struct dfuse_inode_entry *doh_ie;

Expand All @@ -169,17 +167,24 @@ struct dfuse_obj_hdl {
/* Pointer to the last returned drc entry */
struct dfuse_readdir_c *doh_rd_nextc;

/* Linear read function, if a file is read from start to end then this normally requires
* a final read request at the end of the file that returns zero bytes. Detect this case
* and when the final read is detected then just return without a round trip.
* Store a flag for this being enabled (starts as true, but many I/O patterns will set it
* to false), the expected position of the next read and a boolean for if EOF has been
* detected.
/* Linear read tracking. If a file is opened and read from start to finish then this is
* called a linear read, linear reads however may or may not read EOF at the end of a file,
* as the reader may be checking the file size.
*
* Detect this case and track it at the file handle level, this is then used in two places:
* For read of EOF it means the round-trip can be avoided.
* On release we can use this flag to apply a setting to the directory inode.
*
* This flag starts enabled and many I/O patterns will disable it. We also store the next
* expected read position and if EOF has been reached.
*/

off_t doh_linear_read_pos;
bool doh_linear_read;
bool doh_linear_read_eof;

bool doh_set_linear_read;

/** True if caching is enabled for this file. */
bool doh_caching;

Expand Down Expand Up @@ -401,11 +406,20 @@ struct dfuse_event {
d_iov_t de_iov;
d_sg_list_t de_sgl;
d_list_t de_list;

/* Position in a list of events, this will either be off active->open_reads or
* de->de_read_slaves.
*/
d_list_t de_read_list;
/* List of slave events */
d_list_t de_read_slaves;
struct dfuse_eq *de_eqt;
union {
struct dfuse_obj_hdl *de_oh;
struct dfuse_inode_entry *de_ie;
struct read_chunk_data *de_cd;
};
struct dfuse_info *de_di;
off_t de_req_position; /**< The file position requested by fuse */
union {
size_t de_req_len;
Expand Down Expand Up @@ -1009,10 +1023,32 @@ struct dfuse_inode_entry {
*/
ATOMIC bool ie_linear_read;

struct active_inode *ie_active;

/* Entry on the evict list */
d_list_t ie_evict_entry;
};

/* Per-inode data that only exists while the inode is open.
 *
 * Allocated by active_ie_init() when ie_open_count goes from zero to one, and released again
 * when the last open reference is dropped via active_oh_decref() or active_ie_decref().
 */
struct active_inode {
/* List head, initialised empty on allocation.
 * NOTE(review): presumably the read chunk data released by read_chunk_close() - confirm.
 */
d_list_t chunks;
/* List of read events, dfuse_event.de_read_list may be linked on here */
d_list_t open_reads;
/* Spinlock, initialised on allocation.
 * NOTE(review): assumed to protect the lists above - confirm against the users.
 */
pthread_spinlock_t lock;
/* Starts at zero.  Checked on final close of a handle when deciding doh_set_linear_read */
ATOMIC uint64_t read_count;
/* Pre-read state.  Only allocated if pre-read was requested on the first open */
struct dfuse_pre_read *readahead;
};

/* Increase active count on inode. This takes a reference and allocates ie->ie_active as required.
 *
 * preread may be NULL.  If it is set and the inode was already open then *preread is cleared.
 *
 * Returns -DER_SUCCESS on success, -DER_NOMEM if the active data cannot be allocated or the
 * error returned when initialising its lock.
 */
int
active_ie_init(struct dfuse_inode_entry *ie, bool *preread);

/* Mark an oh as closing and drop the ref on inode active.
 *
 * If this is the last open reference then the active data for the inode is freed and
 * oh->doh_set_linear_read may be set, otherwise doh_set_linear_read is left unchanged.
 */
void
active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh);

/* Decrease active count on inode, called on error where there is no oh.
 *
 * If this is the last open reference then the active data for the inode is freed.
 */
void
active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Flush write-back cache writes to a inode. It does this by waiting for and then releasing an
* exclusive lock on the inode. Writes take a shared lock so this will block until all pending
* writes are complete.
Expand Down Expand Up @@ -1108,6 +1144,13 @@ dfuse_compute_inode(struct dfuse_cont *dfs,
void
dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie);

/* Free any read chunk data for an inode.
*
* Returns true if feature was used.
*/
bool
read_chunk_close(struct dfuse_inode_entry *ie);

/* Metadata caching functions. */

/* Mark the cache as up-to-date from now */
Expand Down
3 changes: 3 additions & 0 deletions src/client/dfuse/dfuse_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,7 @@ dfuse_ie_close(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
atomic_load_relaxed(&ie->ie_il_count));
D_ASSERTF(atomic_load_relaxed(&ie->ie_open_count) == 0, "open_count is %d",
atomic_load_relaxed(&ie->ie_open_count));
D_ASSERT(!ie->ie_active);

if (ie->ie_obj) {
rc = dfs_release(ie->ie_obj);
Expand Down Expand Up @@ -1317,6 +1318,8 @@ dfuse_read_event_size(void *arg, size_t size)
ev->de_sgl.sg_nr = 1;
}

D_INIT_LIST_HEAD(&ev->de_read_slaves);

rc = daos_event_init(&ev->de_ev, ev->de_eqt->de_eq, NULL);
if (rc != -DER_SUCCESS) {
return false;
Expand Down
26 changes: 8 additions & 18 deletions src/client/dfuse/dfuse_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ dfuse_bg(struct dfuse_info *dfuse_info)
*
* Should be called from the post_start plugin callback and creates
* a filesystem.
* Returns a DAOS error code.
* Returns true on success, false on failure.
*/
int
Expand Down Expand Up @@ -204,18 +205,17 @@ dfuse_launch_fuse(struct dfuse_info *dfuse_info, struct fuse_args *args)
DFUSE_TRA_ERROR(dfuse_info, "Error sending signal to fg: "DF_RC, DP_RC(rc));

/* Blocking */
if (dfuse_info->di_threaded)
rc = dfuse_loop(dfuse_info);
else
rc = fuse_session_loop(dfuse_info->di_session);
if (rc != 0)
rc = dfuse_loop(dfuse_info);
if (rc != 0) {
DHS_ERROR(dfuse_info, rc, "Fuse loop exited");
rc = daos_errno2der(rc);
}

umount:

fuse_session_unmount(dfuse_info->di_session);

return daos_errno2der(rc);
return rc;
}

#define DF_POOL_PREFIX "pool="
Expand Down Expand Up @@ -279,7 +279,6 @@ show_help(char *name)
" --path=<path> Path to load UNS pool/container data\n"
" --sys-name=STR DAOS system name context for servers\n"
"\n"
" -S --singlethread Single threaded (deprecated)\n"
" -t --thread-count=count Total number of threads to use\n"
" -e --eq-count=count Number of event queues to use\n"
" -f --foreground Run in foreground\n"
Expand Down Expand Up @@ -423,7 +422,6 @@ main(int argc, char **argv)
{"pool", required_argument, 0, 'p'},
{"container", required_argument, 0, 'c'},
{"sys-name", required_argument, 0, 'G'},
{"singlethread", no_argument, 0, 'S'},
{"thread-count", required_argument, 0, 't'},
{"eq-count", required_argument, 0, 'e'},
{"foreground", no_argument, 0, 'f'},
Expand All @@ -447,13 +445,12 @@ main(int argc, char **argv)
if (dfuse_info == NULL)
D_GOTO(out_debug, rc = -DER_NOMEM);

dfuse_info->di_threaded = true;
dfuse_info->di_caching = true;
dfuse_info->di_wb_cache = true;
dfuse_info->di_eq_count = 1;

while (1) {
c = getopt_long(argc, argv, "Mm:St:o:fhe:v", long_options, NULL);
c = getopt_long(argc, argv, "Mm:t:o:fhe:v", long_options, NULL);

if (c == -1)
break;
Expand Down Expand Up @@ -491,13 +488,6 @@ main(int argc, char **argv)
case 'P':
path = optarg;
break;
case 'S':
/* Set it to be single threaded, but allow an extra one
* for the event queue processing
*/
dfuse_info->di_threaded = false;
dfuse_info->di_thread_count = 2;
break;
case 'e':
dfuse_info->di_eq_count = atoi(optarg);
break;
Expand Down Expand Up @@ -564,7 +554,7 @@ main(int argc, char **argv)
* check CPU binding. If bound to a number of cores then launch that number of threads,
* if not bound them limit to 16.
*/
if (dfuse_info->di_threaded && !have_thread_count) {
if (!have_thread_count) {
struct hwloc_topology *hwt;
hwloc_const_cpuset_t hw;
int total;
Expand Down
139 changes: 139 additions & 0 deletions src/client/dfuse/file.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* (C) Copyright 2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/

#include "dfuse_common.h"
#include "dfuse.h"

/* A lock is needed here, not for ie_open_count which is updated atomically here and elsewhere
 * but to ensure that ie_active is also atomically updated with the reference count.
 */
static pthread_mutex_t alock = PTHREAD_MUTEX_INITIALIZER;

/* Take an open reference on an inode, creating its active data on the first open.
 *
 * ie_open_count is incremented while holding alock.  If the inode was already open then nothing
 * else is needed, other than clearing *preread so the caller does not start a pre-read.  On the
 * first open ie->ie_active is allocated and initialised and a reference is taken on the inode
 * to prevent it being released.  If a pre-read was requested then the readahead state is
 * allocated as well and a second open reference is taken for it.
 * NOTE(review): that second reference is presumably dropped when the pre-read completes,
 * confirm against the pre-read code.
 *
 * If the readahead state cannot be allocated the open still succeeds but *preread is cleared,
 * there is nothing for a pre-read to use in this case.
 *
 * On failure the open count is restored and no active data is left allocated, so the inode is
 * as it was before the call and the caller has no reference to drop.  Without this a failed
 * first open would leave ie_open_count at one with no ie_active, a state that neither
 * active_ie_decref() nor dfuse_ie_close() can handle.
 *
 * ie       Inode being opened.
 * preread  May be NULL.  On input true if the caller wants a pre-read, on output true only if
 *          the readahead state was set up by this call.
 *
 * Returns -DER_SUCCESS on success, -DER_NOMEM if the active data cannot be allocated or the
 * error from D_SPIN_INIT.
 *
 * Perhaps combine with dfuse_open_handle_init?
 */
int
active_ie_init(struct dfuse_inode_entry *ie, bool *preread)
{
	uint32_t oc;
	int      rc = -DER_SUCCESS;

	D_MUTEX_LOCK(&alock);

	oc = atomic_fetch_add_relaxed(&ie->ie_open_count, 1);

	DFUSE_TRA_DEBUG(ie, "Addref to %d", oc + 1);

	if (oc != 0) {
		/* Not the first open, the active data already exists */
		if (preread && *preread)
			*preread = false;
		goto out;
	}

	D_ALLOC_PTR(ie->ie_active);
	if (!ie->ie_active)
		D_GOTO(undo, rc = -DER_NOMEM);

	rc = D_SPIN_INIT(&ie->ie_active->lock, 0);
	if (rc != -DER_SUCCESS) {
		D_FREE(ie->ie_active);
		goto undo;
	}
	D_INIT_LIST_HEAD(&ie->ie_active->chunks);
	D_INIT_LIST_HEAD(&ie->ie_active->open_reads);
	atomic_init(&ie->ie_active->read_count, 0);
	if (preread && *preread) {
		D_ALLOC_PTR(ie->ie_active->readahead);
		if (ie->ie_active->readahead) {
			D_INIT_LIST_HEAD(&ie->ie_active->readahead->req_list);
			/* Extra open reference held on behalf of the pre-read */
			atomic_fetch_add_relaxed(&ie->ie_open_count, 1);
		} else {
			/* Pre-read is best effort, carry on without it */
			*preread = false;
		}
	}
	/* Take a reference on the inode to prevent it being released */
	atomic_fetch_add_relaxed(&ie->ie_ref, 1);
	goto out;
undo:
	/* Nothing was set up so give back the open reference taken above */
	atomic_fetch_sub_relaxed(&ie->ie_open_count, 1);
out:
	D_MUTEX_UNLOCK(&alock);
	return rc;
}

static void
ah_free(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
{
struct active_inode *active = ie->ie_active;

if (active->readahead) {
struct dfuse_event *ev;

D_ASSERT(active->readahead->complete);
D_ASSERT(d_list_empty(&active->readahead->req_list));

ev = active->readahead->dra_ev;

if (ev) {
daos_event_fini(&ev->de_ev);
d_slab_release(ev->de_eqt->de_pre_read_slab, ev);
}
D_FREE(active->readahead);
}

D_SPIN_DESTROY(&active->lock);
D_FREE(ie->ie_active);
dfuse_inode_decref(dfuse_info, ie);
}

void
active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
{
uint32_t oc;

D_MUTEX_LOCK(&alock);

oc = atomic_fetch_sub_relaxed(&oh->doh_ie->ie_open_count, 1);
D_ASSERTF(oc >= 1, "Invalid decref from %d on %p %p", oc, oh, oh->doh_ie);

DFUSE_TRA_DEBUG(oh->doh_ie, "Decref to %d", oc - 1);

/* Leave set_linear_read as false in this case */
if (oc != 1)
goto out;

if (read_chunk_close(oh->doh_ie))
oh->doh_linear_read = true;

/* Do not set linear read in the case where there's no reads or writes, this could be
* simple open/close calls but it could also be cache use so leave the setting unchanged
* in this case.
*/
if (oh->doh_linear_read) {
if (oh->doh_ie->ie_active->read_count != 0)
oh->doh_set_linear_read = true;
} else {
oh->doh_set_linear_read = true;
}

ah_free(dfuse_info, oh->doh_ie);
out:
D_MUTEX_UNLOCK(&alock);
}

void
active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie)
{
uint32_t oc;
D_MUTEX_LOCK(&alock);

oc = atomic_fetch_sub_relaxed(&ie->ie_open_count, 1);
D_ASSERTF(oc >= 1, "Invalid decref from %d on %p", oc, ie);

DFUSE_TRA_DEBUG(ie, "Decref to %d", oc - 1);

if (oc != 1)
goto out;

read_chunk_close(ie);

ah_free(dfuse_info, ie);
out:
D_MUTEX_UNLOCK(&alock);
}
Loading
Loading