diff --git a/plugins/postgresql-db-bulk-loader/impl/src/main/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoader.java b/plugins/postgresql-db-bulk-loader/impl/src/main/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoader.java index b378ee0e26b4..a084e626f2c1 100644 --- a/plugins/postgresql-db-bulk-loader/impl/src/main/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoader.java +++ b/plugins/postgresql-db-bulk-loader/impl/src/main/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoader.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2022 by Hitachi Vantara : http://www.pentaho.com + * Copyright (C) 2002-2023 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -30,22 +30,16 @@ // // -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.charset.Charset; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; import org.pentaho.di.core.Const; -import org.pentaho.di.core.util.Utils; import org.pentaho.di.core.database.Database; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.LoggingObjectInterface; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.core.util.Utils; import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; @@ -54,11 +48,15 @@ import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMetaInterface; +import org.postgresql.PGConnection; import org.postgresql.copy.PGCopyOutputStream; -import com.google.common.annotations.VisibleForTesting; - -import org.postgresql.PGConnection; +import java.math.BigDecimal; +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; /** * Performs a bulk load to a postgres table. @@ -492,11 +490,8 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) { meta = (PGBulkLoaderMeta) smi; data = (PGBulkLoaderData) sdi; - try { - pgCopyOut.close(); - } catch ( IOException e ) { - logError( "Error while closing the Postgres Output Stream", e.getMessage() ); - } + IOUtils.closeQuietly( pgCopyOut, + e -> logError( "Error while closing the Postgres Output Stream", e.getMessage() ) ); if ( data.db != null ) { data.db.close(); diff --git a/plugins/postgresql-db-bulk-loader/impl/src/test/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoaderTest.java b/plugins/postgresql-db-bulk-loader/impl/src/test/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoaderTest.java index 42bab62db79e..37bcc14bc2a9 100644 --- a/plugins/postgresql-db-bulk-loader/impl/src/test/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoaderTest.java +++ b/plugins/postgresql-db-bulk-loader/impl/src/test/java/org/pentaho/di/trans/steps/pgbulkloader/PGBulkLoaderTest.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2017-2022 by Hitachi Vantara : http://www.pentaho.com + * Copyright (C) 2017-2023 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -21,20 +21,6 @@ ******************************************************************************/ package org.pentaho.di.trans.steps.pgbulkloader; -import static org.hamcrest.core.StringContains.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -58,10 +44,20 @@ import java.io.IOException; import java.lang.reflect.Field; +import static org.hamcrest.core.StringContains.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.pentaho.di.core.row.ValueMetaInterface.TYPE_BOOLEAN; public class PGBulkLoaderTest { @@ -207,6 +203,37 @@ public void writeBooleanToPgOutput() throws Exception { assertEquals( "false", "0" + Const.CR, out.toString() ); } + /** + * Regression test for PDI-18989 + */ + @Test + public void emptyInput() throws Exception { + final PGBulkLoaderMeta meta = initMeta( "field" ); + final PGBulkLoaderData data = initData(); + final PGBulkLoader loader = spy( this.pgBulkLoader ); + doReturn( null ).when( loader ).getRow(); + + loader.init( meta, data ); + assertFalse( "Step finished", loader.processRow( meta, data ) ); + loader.dispose( meta, data ); + } + + /** + * Step does not have defined any output fields -> initialize of copy command failed. + */ + @Test + public void do_copyFailed() throws Exception { + final PGBulkLoaderMeta meta = initMeta(); + final PGBulkLoaderData data = initData(); + final PGBulkLoader loader = spy( this.pgBulkLoader ); + doReturn( new Object[] { "test data" } ).when( loader ).getRow(); + + loader.init( meta, data ); + assertFalse( "Process row failed", loader.processRow( meta, data ) ); + assertEquals( "Step has error", 1, loader.getErrors() ); + loader.dispose( meta, data ); + } + private ByteArrayOutputStream initPGCopyOutputStream() throws IOException, NoSuchFieldException, IllegalAccessException { final ByteArrayOutputStream out = new ByteArrayOutputStream(); final PGCopyOutputStream pgCopy = mock( PGCopyOutputStream.class ); @@ -226,10 +253,10 @@ private RowMeta initRowMeta( String valueName, int type ) throws KettlePluginExc return rowMeta; } - private PGBulkLoaderMeta initMeta( String valueName ) throws KettleXMLException { + private PGBulkLoaderMeta initMeta( String... valueNames ) throws KettleXMLException { final PGBulkLoaderMeta meta = getPgBulkLoaderMock( null ); - when( meta.getFieldStream() ).thenReturn( new String[] {valueName} ); - when( meta.getDateMask() ).thenReturn( new String[] {""} ); + when( meta.getFieldStream() ).thenReturn( valueNames ); + when( meta.getDateMask() ).thenReturn( new String[] { "" } ); return meta; }