티스토리 뷰

Java

ClientSocketUtils 개선 버전

개발자-김씨 2023. 4. 12. 19:50
반응형

클라이언트에서 바이트 송수신 시, 아래처럼 간단하게 구현할 수 있도록 유틸을 만들어 봄

10.10.10.1:10090으로 요청 바이트 송신 후, 12바이트 수신
ClientSocketUtils.request("10.10.10.1", 10090, sendBytes, in -> in.read(12));

10.10.10.1:10090으로 요청 바이트 송신 후, 헤더 5바이트 수신 후 헤더에 지정된 길이만큼 읽기
ClientSocketUtils.request("10.10.10.1", 10090, sendBytes, LENGTH_HEAD_RES_BODY_RECEIVER.apply(5));

 

개선된 ClientSocketUtils

/**
 * Blocking client-socket helper: connect, send the request bytes, optionally read a
 * response through a caller-supplied receiver, then close the socket.
 *
 * <p>All failures are reported as the unchecked {@link SocketRequestException} (or one of
 * its subclasses). {@link SocketRequestException#isCompleteSend()} is {@code false} when
 * the failure happened while connecting or sending, and {@code true} when it happened
 * after the request bytes had been flushed (i.e. while reading the response).
 */
public class ClientSocketUtils {

    private static final int DEFAULT_CONNECT_TIMEOUT = 3000;
    private static final int DEFAULT_READ_TIMEOUT = 3000;

    /**
     * Receiver to pass when no response is expected; {@code request} then returns an
     * empty array without reading from the socket.
     */
    public static final Function<SimpleInputStream, byte[]> NO_RECEIVER = null;

    /**
     * Receiver factory for a fixed-length response: the returned receiver reads the
     * given number of bytes.
     */
    public static final Function<Integer, Function<SimpleInputStream, byte[]>> FIXED_LENGTH_RES_RECEIVER =
            length -> in -> in.read(length);

    /**
     * Receiver factory for a fixed-length header followed by a variable-length body.
     * The header must contain the body length as decimal text. Returns the body bytes only.
     */
    public static final Function<Integer, Function<SimpleInputStream, byte[]>> LENGTH_HEAD_RES_BODY_RECEIVER =
            headLength -> in -> in.read(parseBodyLength(in.read(headLength)));

    /**
     * Receiver factory for a fixed-length header followed by a variable-length body.
     * The header must contain the body length as decimal text. Returns header + body bytes.
     */
    public static final Function<@NonNull Integer, Function<SimpleInputStream, byte[]>> LENGTH_HEAD_RES_HEAD_BODY_RECEIVER =
            headLength -> in -> {
                in.read(parseBodyLength(in.read(headLength)));
                // Everything read so far is exactly header + body.
                return in.getUntilReadBytes();
            };

    /**
     * Same as {@link #request(String, int, byte[], Function, int, int)} using the default
     * connect and read timeouts.
     */
    public static byte[] request(String host, int port, byte[] sendBytes, Function<SimpleInputStream, byte[]> readFunc) {
        return request(host, port, sendBytes, readFunc, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT);
    }

    /**
     * Connects, sends {@code sendBytes}, then lets {@code readFunc} read the response.
     * The socket is always closed before this method returns or throws.
     *
     * @param host           host to connect to
     * @param port           port to connect to
     * @param sendBytes      bytes to send once connected
     * @param readFunc       reads the response after sending; {@code null} ({@link #NO_RECEIVER})
     *                       means "do not read"
     * @param connectTimeout connect timeout in milliseconds
     * @param readTimeout    socket read timeout in milliseconds
     * @return whatever {@code readFunc} returns, or an empty array when {@code readFunc} is {@code null}
     * @throws SocketRequestConnectionTimeoutException if a socket timeout occurs while connecting or sending
     * @throws SocketRequestReadTimeoutException       if {@link SimpleInputStream#read(int)} times out
     * @throws SocketRequestException                  on any other failure; {@code isCompleteSend()} tells
     *                                                 whether the request bytes had already been sent
     */
    public static byte[] request(String host, int port, byte[] sendBytes, Function<SimpleInputStream, byte[]> readFunc, int connectTimeout, int readTimeout) {

        Socket socket = new Socket();

        try {
            // Phase 1: connect + send. Failures here mean the request was NOT (fully) sent.
            try {
                socket.connect(new InetSocketAddress(host, port), connectTimeout);
                socket.setSoTimeout(readTimeout);

                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(socket.getOutputStream());
                bufferedOutputStream.write(sendBytes);
                bufferedOutputStream.flush();
            } catch (SocketTimeoutException ste) {
                throw new SocketRequestConnectionTimeoutException(ste);
            } catch (Exception e) {
                throw new SocketRequestException(e);
            }

            if (readFunc == null) {
                return new byte[0];
            }

            // Phase 2: receive. The request has been flushed, so failures carry completeSend = true.
            SimpleInputStream simpleInputStream = null;
            try {
                simpleInputStream = new SimpleInputStream(new BufferedInputStream(socket.getInputStream()));
                return readFunc.apply(simpleInputStream);
            } catch (SocketRequestException sre) {
                // Already classified (e.g. read timeout); pass through untouched.
                throw sre;
            } catch (Exception e) {
                throw new SocketRequestException(e, true);
            } finally {
                if (simpleInputStream != null) {
                    simpleInputStream.close();
                }
            }
        } finally {
            // Closing the socket also closes both of its streams.
            closeQuietly(socket);
        }
    }

    /**
     * Closes {@code closeable} if it is non-null, ignoring any {@link IOException}.
     */
    public static void closeQuietly(Closeable closeable) {
        try {
            if (closeable != null) {
                closeable.close();
            }
        } catch (IOException ignored) {
            // Best-effort cleanup: nothing useful can be done if close itself fails.
        }
    }

    /**
     * Parses the body length carried in a response header as decimal text.
     * A fixed charset keeps the result independent of the platform default.
     */
    private static int parseBodyLength(byte[] head) {
        return Integer.parseInt(new String(head, java.nio.charset.StandardCharsets.US_ASCII));
    }

    /**
     * Thin wrapper around the socket's input stream that reads exact byte counts and
     * remembers every byte returned by {@link #read(int)}.
     */
    public static class SimpleInputStream {

        private final BufferedInputStream bufferedInputStream;
        private final ByteArrayOutputStream byteArrayOutputStream;

        SimpleInputStream(BufferedInputStream bufferedInputStream) {
            this.bufferedInputStream = bufferedInputStream;
            this.byteArrayOutputStream = new ByteArrayOutputStream();
        }

        /**
         * @return all bytes returned so far by {@link #read(int)}, in order; bytes read
         *         directly from {@link #getBufferedInputStream()} are not included
         */
        public byte[] getUntilReadBytes() {
            return byteArrayOutputStream.toByteArray();
        }

        /**
         * @return the underlying stream, for callers that need to read it directly
         */
        public BufferedInputStream getBufferedInputStream() {
            return bufferedInputStream;
        }

        /**
         * Reads until {@code readBytes} bytes have arrived or the stream ends.
         *
         * @param readBytes number of bytes to read
         * @return the bytes actually read; shorter than {@code readBytes} if the stream
         *         ended first
         * @throws SocketRequestReadTimeoutException if the socket read times out
         * @throws SocketRequestException            on any other read failure
         */
        public byte[] read(int readBytes) {
            try {
                byte[] buffer = new byte[readBytes];
                int sumReadSize = 0;

                // A single read may return fewer bytes than requested, so keep reading.
                while (sumReadSize < readBytes) {
                    int readSize = bufferedInputStream.read(buffer, sumReadSize, readBytes - sumReadSize);
                    if (readSize == -1) {
                        break; // end of stream: return what we have
                    }
                    sumReadSize += readSize;
                }

                byteArrayOutputStream.write(buffer, 0, sumReadSize);
                return sumReadSize == readBytes ? buffer : Arrays.copyOf(buffer, sumReadSize);

            } catch (SocketTimeoutException ste) {
                throw new SocketRequestReadTimeoutException(ste);
            } catch (Exception e) {
                // Reading only happens after the request was sent, hence completeSend = true.
                throw new SocketRequestException(e, true);
            }
        }

        void close() {
            closeQuietly(byteArrayOutputStream);
        }
    }
}

request메소드의 주요 동작은 크게 3가지임

socket.connect -> 접속

outputstream.write -> 보내기

inputstream.read -> 받기

따라서 request 메소드에서 예외가 발생하면 어느 단계에서 발생했는지 확인할 수 있도록 진행 단계를 추가하려다가, 조금 더 단순하게 데이터 전송 완료 여부(completeSend)만 가지도록 함.

 

/**
 * Unchecked exception for a failed socket request.
 *
 * <p>Besides the usual message/cause it carries a single flag telling whether the
 * request data had already been sent when the failure occurred.
 */
public class SocketRequestException extends RuntimeException {

    /** Whether the request data was sent before the failure; {@code false} unless stated otherwise. */
    boolean completeSend;

    /**
     * Wraps {@code th} with the send flag left at {@code false}.
     *
     * @param th the underlying cause
     */
    public SocketRequestException(Throwable th) {
        this(th, false);
    }

    /**
     * Wraps {@code th} and records the send flag.
     *
     * @param th           the underlying cause
     * @param completeSend whether the request data had been sent
     */
    public SocketRequestException(Throwable th, boolean completeSend) {
        super(th);
        this.completeSend = completeSend;
    }

    /**
     * Creates an exception with a message and records the send flag.
     *
     * @param message      the detail message
     * @param completeSend whether the request data had been sent
     */
    public SocketRequestException(String message, boolean completeSend) {
        super(message);
        this.completeSend = completeSend;
    }

    /**
     * @return {@code true} if the request data had been sent before the failure
     */
    public boolean isCompleteSend() {
        return this.completeSend;
    }
}

 

추가로 SocketRequestException 하위에 SocketRequestConnectionTimeoutException과 SocketRequestReadTimeoutException을 추가함

/**
 * {@link SocketRequestException} variant for a connection timeout.
 *
 * <p>Always constructed with the send flag set to {@code false}.
 */
public class SocketRequestConnectionTimeoutException extends SocketRequestException {

    /**
     * @param cause the underlying failure that signalled the timeout
     */
    public SocketRequestConnectionTimeoutException(Throwable cause) {
        super(cause, false);
    }
}
/**
 * {@link SocketRequestException} variant for a read timeout.
 *
 * <p>Always constructed with the send flag set to {@code true}.
 */
public class SocketRequestReadTimeoutException extends SocketRequestException {

    /**
     * @param cause the socket timeout that interrupted the read
     */
    public SocketRequestReadTimeoutException(SocketTimeoutException cause) {
        super(cause, true);
    }
}

 

끝 ~ 

 

테스트 코드

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * JUnit tests that call ClientSocketUtils.request against small one-shot servers
 * started on local ports 10090-10092.
 *
 * NOTE(review): every test starts its server with a bare {@code new Thread(...).start()}
 * and then calls request() immediately; nothing here waits for the server to be
 * listening, so the client may connect before the port is bound — verify/fix ordering.
 * NOTE(review): the server helpers close their sockets only on the success path (no
 * finally / try-with-resources), and several tests reuse port 10090 — confirm this
 * cannot leave the port occupied for a following test.
 */
public class SocketTest {


	/**
	 * One-shot echo server on port 10090: accepts a single connection, performs one
	 * read into a 10-byte buffer, prints it, writes the buffer back and closes
	 * everything. Any exception is only printed.
	 */
	public void echoServerOpen()  {
		try {
			ServerSocket serverSocket = new ServerSocket(10090);
			Socket socket = serverSocket.accept();
			InputStream inputStream = socket.getInputStream();
			byte[] buffer = new byte[10];
			// Single read call; its return value (bytes actually read) is ignored.
			inputStream.read(buffer);
			System.out.println("클라이언트로부터 받은 데이터: " + new String(buffer));

			OutputStream outputStream = socket.getOutputStream();
			outputStream.write(buffer);
			inputStream.close();
			outputStream.close();
			socket.close();
			serverSocket.close();
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Same as the echo server but on port 10091, and it sleeps 3000 ms BEFORE opening
	 * the server socket (used by the connection-timeout test).
	 */
	public void serverOpen2()  {
		try {
			Thread.sleep(3000);
			ServerSocket serverSocket = new ServerSocket(10091);
			Socket socket = serverSocket.accept();
			InputStream inputStream = socket.getInputStream();
			byte[] buffer = new byte[10];
			inputStream.read(buffer);
			System.out.println("클라이언트로부터 받은 데이터: " + new String(buffer));

			OutputStream outputStream = socket.getOutputStream();
			outputStream.write(buffer);
			inputStream.close();
			outputStream.close();
			socket.close();
			serverSocket.close();
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Same as the echo server but on port 10092, and it sleeps 3000 ms between reading
	 * the request and writing the reply (used by the read-timeout test).
	 */
	public void serverOpen3()  {
		try {

			ServerSocket serverSocket = new ServerSocket(10092);
			Socket socket = serverSocket.accept();
			InputStream inputStream = socket.getInputStream();
			byte[] buffer = new byte[10];
			inputStream.read(buffer);
			System.out.println("클라이언트로부터 받은 데이터: " + new String(buffer));
			// Delay the reply so the client's read timeout can expire first.
			Thread.sleep(3000);
			OutputStream outputStream = socket.getOutputStream();
			outputStream.write(buffer);
			inputStream.close();
			outputStream.close();
			socket.close();
			serverSocket.close();
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void test1() {	// happy path: 10 bytes sent, the same 10 bytes echoed back
		new Thread(this::echoServerOpen).start();
		byte[] sendBytes = "0123456789".getBytes();
		byte[] receiveBytes = ClientSocketUtils.request("localhost", 10090, sendBytes, in -> in.read(10));
		Assert.assertArrayEquals(sendBytes, receiveBytes);
	}

	// NOTE(review): this expects an exception when 12 bytes are requested but only 10
	// arrive. SimpleInputStream.read(int) as shown in ClientSocketUtils stops at end of
	// stream and returns the shorter array instead of throwing — confirm which behaviour
	// is intended; as written the expectation looks reachable only via some other failure.
	@Test(expected = SocketRequestException.class)
	public void test2() { // case where fewer bytes are received than requested
                new Thread(this::echoServerOpen).start();
                byte[] sendBytes = "0123456789".getBytes();
                byte[] receiveBytes = ClientSocketUtils.request("localhost", 10090, sendBytes, in -> in.read(12));
                Assert.assertArrayEquals(sendBytes, receiveBytes);
	}

	@Test
	public void test3() { // read() called in two parts; getUntilReadBytes() returns both parts
		new Thread(this::echoServerOpen).start();
		byte[] sendBytes = "0123456789".getBytes();
		byte[] receiveBytes = ClientSocketUtils.request("localhost", 10090, sendBytes, in -> {
			in.read(4);
			in.read(6);
			return in.getUntilReadBytes();
		});
		Assert.assertArrayEquals(sendBytes, receiveBytes);
	}
    
	@Test
	public void test4() { // only the trailing 6 bytes are returned
		new Thread(this::echoServerOpen).start();
		byte[] sendBytes = "0123456789".getBytes();
		byte[] bodyBytes = ClientSocketUtils.request("localhost", 10090, sendBytes, in -> {
			in.read(4);
			return in.read(6);
		});
		Assert.assertTrue(bodyBytes.length == 6);
	}
    
        @Test
        public void test5() {	// read 4-byte header -> read the length given in the header -> body is AAAAAA
                new Thread(this::echoServerOpen).start();
                byte[] receiveBytes = ClientSocketUtils.request("localhost", 10090, "0006AAAAAA".getBytes(), ClientSocketUtils.LENGTH_HEAD_RES_BODY_RECEIVER.apply(4));
                Assert.assertArrayEquals("AAAAAA".getBytes(), receiveBytes);
        }

	// NOTE(review): the echo server started here is never connected to (the client uses
	// a different host), so its thread stays in accept() on port 10090 — verify this
	// does not affect tests that run afterwards.
	@Test(expected = SocketRequestException.class)
	public void test6() { // calling an invalid host must raise SocketRequestException
		try {
			new Thread(this::echoServerOpen).start();
			ClientSocketUtils.request("wrong host", 10090, "0123456789".getBytes(), ClientSocketUtils.NO_RECEIVER);
		} catch (Exception e) {
			e.printStackTrace();
			throw e;
		}
	}

	// NOTE(review): relies on the 1 ms connect attempt timing out while nothing listens
	// on 10091 yet (the server sleeps 3000 ms first). If the connection is refused
	// rather than timed out, request() would throw a plain SocketRequestException —
	// verify on the target platform.
	@Test(expected = SocketRequestConnectionTimeoutException.class)
	public void test7() {	// connection timeout must be reported
		try {
			new Thread(this::serverOpen2).start();
			byte[] res = ClientSocketUtils.request("localhost", 10091, "0123456789".getBytes(), ClientSocketUtils.NO_RECEIVER, 1, 60000);
			System.out.println("res=" + new String(res));
		} catch (Exception e) {
			e.printStackTrace();
			throw e;
		}
	}

	@Test(expected = SocketRequestReadTimeoutException.class)
	public void test8() {	// read timeout must be reported (1000 ms read timeout vs. 3000 ms server delay)
		new Thread(this::serverOpen3).start();
		ClientSocketUtils.request("localhost", 10092, "0123456789".getBytes(), in -> in.read(10), 3000, 1000);
	}

	// NOTE(review): if request() does not throw, nothing is asserted and the test
	// passes anyway — consider failing explicitly after the call.
	@Test
	public void test9() {	// checks isCompleteSend on a SocketRequestException raised while reading
		try {	
			new Thread(this::echoServerOpen).start();
			ClientSocketUtils.request("localhost", 10090, "0123456789".getBytes(), in -> in.read(12));
		} catch (SocketRequestException e) {
			Assert.assertTrue(e.isCompleteSend());
		}
	}
    
	// NOTE(review): same pattern as test9 — no assertion runs when no exception is thrown.
	@Test
	public void test10() {	// send failure (null payload): isCompleteSend must be false
		try {
			new Thread(this::echoServerOpen).start();
			ClientSocketUtils.request("localhost", 10090, null, ClientSocketUtils.NO_RECEIVER);
		} catch (SocketRequestException e) {
			e.printStackTrace();
			Assert.assertFalse(e.isCompleteSend());
		}
	}

	@Test
	public void test11() {	// no read step: a null receiver yields an empty result
		new Thread(this::echoServerOpen).start();
		byte[] res = ClientSocketUtils.request("localhost", 10090, "0123456789".getBytes(), null); // ClientSocketUtils.NO_RECEIVER
		Assert.assertTrue(res.length == 0);
	}
    
    @Test
	public void test12() { 	// reading from the exposed BufferedInputStream directly
		new Thread(this::echoServerOpen).start();
		byte[] res = ClientSocketUtils.request("localhost", 10090, "0123456789".getBytes(), (in) -> {
			byte[] buf = new byte[10];
			try {
				// Single raw read; bytes read this way bypass SimpleInputStream.read(int).
				in.getBufferedInputStream().read(buf);
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
			return buf;
		});
		Assert.assertArrayEquals("0123456789".getBytes(), res);
	}
}
반응형

'Java' 카테고리의 다른 글

Socket Util 만들어 보기  (0) 2023.04.11
자바 동기화 처리 - volatile 와 synchronized  (1) 2020.11.25
synchronized 와 Double-checked locking  (0) 2020.11.20
Optional에 대해....  (0) 2020.10.22
ThreadPoolExecutor  (0) 2017.01.09
댓글