Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] fix offline predict input tile model with sequence #14

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions tzrec/benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ def _benchmark_train_eval(
log_path: str,
) -> bool:
"""Run train_eval for benchmark."""
port = misc_util.get_free_port()
cmd_str = (
f"PYTHONPATH=. torchrun --master_addr=localhost "
f"--master_port={port} --nnodes=1 "
"--nproc-per-node=2 --node_rank=0 "
"PYTHONPATH=. torchrun --standalone "
"--nnodes=1 --nproc-per-node=2 "
f"--log_dir {log_path} -r 3 -t 3 tzrec/train_eval.py "
f"--pipeline_config_path {run_config_path}"
)
Expand Down
13 changes: 7 additions & 6 deletions tzrec/modules/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,22 +927,23 @@ def forward(
# sequence_sparse_features
# when input_tile_emb need to tile(batch_size,1):
# sequence_dense_features always need to tile
need_tile = is_user and (
(need_input_tile and not is_sparse) or need_input_tile_emb
)
need_tile = False
if is_user:
if is_sparse:
need_tile = need_input_tile_emb
else:
need_tile = need_input_tile
jt = (
sparse_jt_dict[name] if is_sparse else sequence_dense_features[name]
)
if i == 0:
sequence_length = jt.lengths()
group_sequence_length = _int_item(torch.max(sequence_length))
if need_tile:
group_sequence_length = _int_item(sequence_length)
results[f"{group_name}.sequence_length"] = sequence_length.tile(
batch_size
)

else:
group_sequence_length = _int_item(torch.max(sequence_length))
results[f"{group_name}.sequence_length"] = sequence_length
jt = jt.to_padded_dense(group_sequence_length)

Expand Down
35 changes: 31 additions & 4 deletions tzrec/tests/train_eval_export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self):
input_tile_dir_emb = os.path.join(self.test_dir, "input_tile_emb")
pred_output = os.path.join(self.test_dir, "predict_result")
tile_pred_output = os.path.join(self.test_dir, "predict_result_tile")
tile_pred_output_emb = os.path.join(self.test_dir, "predict_result_tile_emb")

# quant and no-input-tile
# export quant and no-input-tile
if self.success:
self.success = utils.test_export(
os.path.join(self.test_dir, "pipeline.config"), self.test_dir
Expand All @@ -327,20 +328,22 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self):
test_dir=self.test_dir,
)

# no-quant and no-input-tile
# export no-quant and no-input-tile
if self.success:
os.environ["QUANT_EMB"] = "0"
self.success = utils.test_export(
os.path.join(self.test_dir, "pipeline.config"), no_quant_dir
)

# quant and input-tile
# export quant and input-tile
if self.success:
os.environ["QUANT_EMB"] = "1"
os.environ["INPUT_TILE"] = "2"
self.success = utils.test_export(
os.path.join(self.test_dir, "pipeline.config"), input_tile_dir
)

# predict quant and input-tile
if self.success:
# we should not set INPUT_TILE env when predict
os.environ.pop("QUANT_EMB", None)
Expand All @@ -360,14 +363,38 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self):
df_t = df_t.sort_values(by=list(df_t.columns)).reset_index(drop=True)
self.assertTrue(df.equals(df_t))

# quant and input-tile emb
# export quant and input-tile emb
if self.success:
os.environ["QUANT_EMB"] = "1"
os.environ["INPUT_TILE"] = "3"
self.success = utils.test_export(
os.path.join(self.test_dir, "pipeline.config"), input_tile_dir_emb
)

# predict quant and input-tile emb
if self.success:
# we should not set INPUT_TILE env when predict
os.environ.pop("QUANT_EMB", None)
os.environ.pop("INPUT_TILE", None)
self.success = utils.test_predict(
scripted_model_path=os.path.join(input_tile_dir_emb, "export"),
predict_input_path=os.path.join(self.test_dir, r"eval_data/\*.parquet"),
predict_output_path=tile_pred_output_emb,
reserved_columns="user_id,item_id,clk",
output_columns="probs",
test_dir=input_tile_dir_emb,
)
# compare INPUT_TILE and no INPUT_TILE result consistency
df = ds.dataset(pred_output, format="parquet").to_table().to_pandas()
df_t = (
ds.dataset(tile_pred_output_emb, format="parquet")
.to_table()
.to_pandas()
)
df = df.sort_values(by=list(df.columns)).reset_index(drop=True)
df_t = df_t.sort_values(by=list(df_t.columns)).reset_index(drop=True)
self.assertTrue(df.equals(df_t))

self.assertTrue(self.success)

self.assertTrue(
Expand Down
48 changes: 16 additions & 32 deletions tzrec/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,12 +841,10 @@ def test_train_eval(

test_config_path = os.path.join(test_dir, "pipeline.config")
config_util.save_message(pipeline_config, test_config_path)
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_train_eval")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/train_eval.py "
f"--pipeline_config_path {test_config_path} {args_str}"
)
Expand All @@ -860,12 +858,10 @@ def test_train_eval(

def test_eval(pipeline_config_path: str, test_dir: str) -> bool:
"""Run evaluate integration test."""
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_eval")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/eval.py "
f"--pipeline_config_path {pipeline_config_path}"
)
Expand All @@ -881,12 +877,10 @@ def test_export(
pipeline_config_path: str, test_dir: str, asset_files: str = ""
) -> bool:
"""Run export integration test."""
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_export")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/export.py "
f"--pipeline_config_path {pipeline_config_path} "
f"--export_dir {test_dir}/export "
Expand All @@ -903,12 +897,10 @@ def test_export(

def test_feature_selection(pipeline_config_path: str, test_dir: str) -> bool:
"""Run export integration test."""
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_feature_selection")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=1 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=1 --log_dir {log_dir} "
"-m tzrec.tools.feature_selection "
f"--pipeline_config_path {pipeline_config_path} "
"--topk 5 "
Expand All @@ -932,12 +924,10 @@ def test_predict(
test_dir: str,
) -> bool:
"""Run predict integration test."""
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_predict")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/predict.py "
f"--scripted_model_path {scripted_model_path} "
f"--predict_input_path {predict_input_path} "
Expand Down Expand Up @@ -986,12 +976,10 @@ def test_hitrate(
test_dir: str,
) -> bool:
"""Run hitrate integration test."""
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_hitrate")
cmd_str = (
"OMP_NUM_THREADS=16 PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"OMP_NUM_THREADS=16 PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/tools/hitrate.py "
f"--user_gt_input {user_gt_input} "
f"--item_embedding_input {item_embedding_input} "
Expand Down Expand Up @@ -1110,12 +1098,10 @@ def test_tdm_retrieval(
test_dir: str,
) -> bool:
"""Run tdm retrieval test."""
port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_tdm_retrieval")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/tools/tdm/retrieval.py "
f"--scripted_model_path {scripted_model_path} "
f"--predict_input_path {eval_data_path} "
Expand Down Expand Up @@ -1196,12 +1182,10 @@ def test_tdm_cluster_train_eval(
test_config_path = os.path.join(test_dir, "learnt_tree/pipeline.config")
config_util.save_message(pipeline_config, test_config_path)

port = misc_util.get_free_port()
log_dir = os.path.join(test_dir, "log_learnt_train_eval")
cmd_str = (
"PYTHONPATH=. torchrun "
f"--master_addr=localhost --master_port={port} --nnodes=1 "
f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} "
"PYTHONPATH=. torchrun --standalone "
f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} "
"-r 3 -t 3 tzrec/train_eval.py "
f"--pipeline_config_path {test_config_path}"
)
Expand Down
Loading