diff --git a/tzrec/benchmark/benchmark.py b/tzrec/benchmark/benchmark.py index 1f173f2..38b153f 100644 --- a/tzrec/benchmark/benchmark.py +++ b/tzrec/benchmark/benchmark.py @@ -88,11 +88,9 @@ def _benchmark_train_eval( log_path: str, ) -> bool: """Run train_eval for benchmark.""" - port = misc_util.get_free_port() cmd_str = ( - f"PYTHONPATH=. torchrun --master_addr=localhost " - f"--master_port={port} --nnodes=1 " - "--nproc-per-node=2 --node_rank=0 " + "PYTHONPATH=. torchrun --standalone " + "--nnodes=1 --nproc-per-node=2 " f"--log_dir {log_path} -r 3 -t 3 tzrec/train_eval.py " f"--pipeline_config_path {run_config_path}" ) diff --git a/tzrec/modules/embedding.py b/tzrec/modules/embedding.py index fc2bc12..c576458 100644 --- a/tzrec/modules/embedding.py +++ b/tzrec/modules/embedding.py @@ -927,22 +927,23 @@ def forward( # sequence_sparse_features # when input_tile_emb need to tile(batch_size,1): # sequence_dense_features always need to tile - need_tile = is_user and ( - (need_input_tile and not is_sparse) or need_input_tile_emb - ) + need_tile = False + if is_user: + if is_sparse: + need_tile = need_input_tile_emb + else: + need_tile = need_input_tile jt = ( sparse_jt_dict[name] if is_sparse else sequence_dense_features[name] ) if i == 0: sequence_length = jt.lengths() + group_sequence_length = _int_item(torch.max(sequence_length)) if need_tile: - group_sequence_length = _int_item(sequence_length) results[f"{group_name}.sequence_length"] = sequence_length.tile( batch_size ) - else: - group_sequence_length = _int_item(torch.max(sequence_length)) results[f"{group_name}.sequence_length"] = sequence_length jt = jt.to_padded_dense(group_sequence_length) diff --git a/tzrec/tests/train_eval_export_test.py b/tzrec/tests/train_eval_export_test.py index 7a363cf..2c97d11 100644 --- a/tzrec/tests/train_eval_export_test.py +++ b/tzrec/tests/train_eval_export_test.py @@ -311,8 +311,9 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self): input_tile_dir_emb = os.path.join(self.test_dir, "input_tile_emb") pred_output = os.path.join(self.test_dir, "predict_result") tile_pred_output = os.path.join(self.test_dir, "predict_result_tile") + tile_pred_output_emb = os.path.join(self.test_dir, "predict_result_tile_emb") - # quant and no-input-tile + # export quant and no-input-tile if self.success: self.success = utils.test_export( os.path.join(self.test_dir, "pipeline.config"), self.test_dir @@ -327,20 +328,22 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self): test_dir=self.test_dir, ) - # no-quant and no-input-tile + # export no-quant and no-input-tile if self.success: os.environ["QUANT_EMB"] = "0" self.success = utils.test_export( os.path.join(self.test_dir, "pipeline.config"), no_quant_dir ) - # quant and input-tile + # export quant and input-tile if self.success: os.environ["QUANT_EMB"] = "1" os.environ["INPUT_TILE"] = "2" self.success = utils.test_export( os.path.join(self.test_dir, "pipeline.config"), input_tile_dir ) + + # predict quant and input-tile if self.success: # we should not set INPUT_TILE env when predict os.environ.pop("QUANT_EMB", None) @@ -360,7 +363,7 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self): df_t = df_t.sort_values(by=list(df_t.columns)).reset_index(drop=True) self.assertTrue(df.equals(df_t)) - # quant and input-tile emb + # export quant and input-tile emb if self.success: os.environ["QUANT_EMB"] = "1" os.environ["INPUT_TILE"] = "3" @@ -368,6 +371,30 @@ def test_multi_tower_din_with_fg_train_eval_export_input_tile(self): os.path.join(self.test_dir, "pipeline.config"), input_tile_dir_emb ) + # predict quant and input-tile emb + if self.success: + # we should not set INPUT_TILE env when predict + os.environ.pop("QUANT_EMB", None) + os.environ.pop("INPUT_TILE", None) + self.success = utils.test_predict( + scripted_model_path=os.path.join(input_tile_dir_emb, "export"), + predict_input_path=os.path.join(self.test_dir, r"eval_data/\*.parquet"), + predict_output_path=tile_pred_output_emb, + reserved_columns="user_id,item_id,clk", + output_columns="probs", + test_dir=input_tile_dir_emb, + ) + # compare INPUT_TILE and no INPUT_TILE result consistency + df = ds.dataset(pred_output, format="parquet").to_table().to_pandas() + df_t = ( + ds.dataset(tile_pred_output_emb, format="parquet") + .to_table() + .to_pandas() + ) + df = df.sort_values(by=list(df.columns)).reset_index(drop=True) + df_t = df_t.sort_values(by=list(df_t.columns)).reset_index(drop=True) + self.assertTrue(df.equals(df_t)) + self.assertTrue(self.success) self.assertTrue( diff --git a/tzrec/tests/utils.py b/tzrec/tests/utils.py index ad83793..c7e205c 100644 --- a/tzrec/tests/utils.py +++ b/tzrec/tests/utils.py @@ -841,12 +841,10 @@ def test_train_eval( test_config_path = os.path.join(test_dir, "pipeline.config") config_util.save_message(pipeline_config, test_config_path) - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_train_eval") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/train_eval.py " f"--pipeline_config_path {test_config_path} {args_str}" ) @@ -860,12 +858,10 @@ def test_train_eval( def test_eval(pipeline_config_path: str, test_dir: str) -> bool: """Run evaluate integration test.""" - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_eval") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/eval.py " f"--pipeline_config_path {pipeline_config_path}" ) @@ -881,12 +877,10 @@ def test_export( pipeline_config_path: str, test_dir: str, asset_files: str = "" ) -> bool: """Run export integration test.""" - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_export") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/export.py " f"--pipeline_config_path {pipeline_config_path} " f"--export_dir {test_dir}/export " @@ -903,12 +897,10 @@ def test_export( def test_feature_selection(pipeline_config_path: str, test_dir: str) -> bool: """Run export integration test.""" - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_feature_selection") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=1 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=1 --log_dir {log_dir} " "-m tzrec.tools.feature_selection " f"--pipeline_config_path {pipeline_config_path} " "--topk 5 " @@ -932,12 +924,10 @@ def test_predict( test_dir: str, ) -> bool: """Run predict integration test.""" - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_predict") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/predict.py " f"--scripted_model_path {scripted_model_path} " f"--predict_input_path {predict_input_path} " @@ -986,12 +976,10 @@ def test_hitrate( test_dir: str, ) -> bool: """Run hitrate integration test.""" - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_hitrate") cmd_str = ( - "OMP_NUM_THREADS=16 PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "OMP_NUM_THREADS=16 PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/tools/hitrate.py " f"--user_gt_input {user_gt_input} " f"--item_embedding_input {item_embedding_input} " @@ -1110,12 +1098,10 @@ def test_tdm_retrieval( test_dir: str, ) -> bool: """Run tdm retrieval test.""" - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_tdm_retrieval") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/tools/tdm/retrieval.py " f"--scripted_model_path {scripted_model_path} " f"--predict_input_path {eval_data_path} " @@ -1196,12 +1182,10 @@ def test_tdm_cluster_train_eval( test_config_path = os.path.join(test_dir, "learnt_tree/pipeline.config") config_util.save_message(pipeline_config, test_config_path) - port = misc_util.get_free_port() log_dir = os.path.join(test_dir, "log_learnt_train_eval") cmd_str = ( - "PYTHONPATH=. torchrun " - f"--master_addr=localhost --master_port={port} --nnodes=1 " - f"--nproc-per-node=2 --node_rank=0 --log_dir {log_dir} " + "PYTHONPATH=. torchrun --standalone " + f"--nnodes=1 --nproc-per-node=2 --log_dir {log_dir} " "-r 3 -t 3 tzrec/train_eval.py " f"--pipeline_config_path {test_config_path}" )