001package com.pi4j.io.serial.impl;
002
003/*
004 * #%L
005 * **********************************************************************
006 * ORGANIZATION  :  Pi4J
007 * PROJECT       :  Pi4J :: Java Library (Core)
008 * FILENAME      :  SerialByteBuffer.java
009 *
010 * This file is part of the Pi4J project. More information about
011 * this project can be found here:  https://www.pi4j.com/
012 * **********************************************************************
013 * %%
014 * Copyright (C) 2012 - 2021 Pi4J
015 * %%
016 * This program is free software: you can redistribute it and/or modify
017 * it under the terms of the GNU Lesser General Public License as
018 * published by the Free Software Foundation, either version 3 of the
019 * License, or (at your option) any later version.
020 *
021 * This program is distributed in the hope that it will be useful,
022 * but WITHOUT ANY WARRANTY; without even the implied warranty of
023 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
024 * GNU General Lesser Public License for more details.
025 *
026 * You should have received a copy of the GNU General Lesser Public
027 * License along with this program.  If not, see
028 * <http://www.gnu.org/licenses/lgpl-3.0.html>.
029 * #L%
030 */
031
032import java.io.IOException;
033import java.io.InputStream;
034import java.nio.BufferOverflowException;
035
036/**
037 * This class implements a dynamic expanding byte buffer to
038 * accommodate new data received from the serial port
039 *
040 * Adapted from sources at:
041 * http://ostermiller.org/utils/src/CircularByteBuffer.java.html
042 * Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
043 *
044 */
public class SerialByteBuffer {

    // NOTE: these two settings are intentionally left as mutable public statics so that
    // existing callers which assign to them keep compiling.
    public static int DEFAULT_BUFFER_SCALE_FACTOR = 2;
    public static int DEFAULT_INITIAL_BUFFER_SIZE = 4096;

    /** Delay between polls while a blocking read/skip waits for data. */
    private static final int POLL_INTERVAL_MILLIS = 100;

    /** Largest backing array this buffer will try to allocate. */
    private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;

    private final InputStream stream = new SerialByteBufferInputStream();

    // Circular buffer state. One slot is always kept free so that
    // (readIndex == writeIndex) unambiguously means "empty".
    // All access happens while holding this instance's monitor.
    private volatile int writeIndex = 0;
    private volatile int readIndex = 0;
    private byte[] buffer;

    /**
     * Creates a buffer with {@link #DEFAULT_INITIAL_BUFFER_SIZE} bytes of initial capacity.
     */
    public SerialByteBuffer(){
        this(DEFAULT_INITIAL_BUFFER_SIZE);
    }

    /**
     * Creates a buffer with the given initial capacity; the buffer grows on demand.
     *
     * @param initialCapacity size in bytes of the initial backing array
     */
    public SerialByteBuffer(int initialCapacity){
        buffer = new byte[initialCapacity];
    }

    /**
     * Discards all unread data by resetting the read and write positions.
     */
    public synchronized void clear(){
        readIndex = writeIndex = 0;
    }

    /**
     * @return the stream from which the data written to this buffer can be read
     */
    public InputStream getInputStream(){
        return stream;
    }

    /**
     * @return the current length of the backing array
     */
    public synchronized int capacity(){
        return buffer.length;
    }

    /**
     * @return the number of bytes that can still be written before the buffer has to grow
     */
    public synchronized int remaining(){
        if (writeIndex < readIndex)
            return (readIndex - writeIndex - 1);
        return ((buffer.length - 1) - (writeIndex - readIndex));
    }

    /**
     * @return the number of bytes that are ready to be read
     */
    public synchronized int available(){
        if (readIndex <= writeIndex)
            return (writeIndex - readIndex);
        return (buffer.length - (readIndex - writeIndex));
    }

    /**
     * Grows the backing array by at least {@code length} bytes and moves the unread
     * bytes to the start of the new array. Must be called with the monitor held.
     *
     * @param length number of additional bytes that must fit
     * @throws BufferOverflowException if the required capacity exceeds the maximum array size
     */
    private void resize(int length) {
        // capture the unread byte count BEFORE any index is touched
        int pending = available();

        // treat an empty backing array as size 1 so that scaling makes progress
        int base = Math.max(buffer.length, 1);
        long required = (long) base + length;
        if (required > MAX_CAPACITY) {
            throw new BufferOverflowException();
        }

        // guard against a misconfigured scale factor (< 2) that would never grow
        int scale = Math.max(DEFAULT_BUFFER_SCALE_FACTOR, 2);
        long grown = base;
        while (grown < required) {
            grown *= scale;
        }
        byte[] newBuffer = new byte[(int) Math.min(grown, MAX_CAPACITY)];

        // copy the unread region, following the wrap-around if there is one
        if (pending > 0) {
            int firstLen = Math.min(pending, buffer.length - readIndex);
            System.arraycopy(buffer, readIndex, newBuffer, 0, firstLen);
            System.arraycopy(buffer, 0, newBuffer, firstLen, pending - firstLen);
        }

        buffer = newBuffer; // old buffer becomes eligible for garbage collection
        readIndex = 0;
        writeIndex = pending;
    }

    /**
     * Sleeps for one poll interval while a blocking operation waits for data.
     *
     * @param interruptedMessage message of the exception raised on interruption
     * @throws IOException if the thread is interrupted; the interrupt flag is restored
     */
    private static void pause(String interruptedMessage) throws IOException {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(interruptedMessage, e);
        }
    }

    /**
     * Appends the whole array to the buffer, growing the buffer if necessary.
     *
     * @param data bytes to append
     * @throws IOException declared for API compatibility
     * @throws BufferOverflowException if the buffer cannot grow enough to hold the data
     */
    public  void write(byte[] data) throws IOException, BufferOverflowException {
        write(data, 0, data.length);
    }

    /**
     * Appends {@code length} bytes of {@code data}, starting at {@code offset}, to the
     * buffer. The buffer grows when there is not enough free space, so this call never
     * blocks waiting for a reader. A non-positive {@code length} is a no-op.
     *
     * @param data   source array
     * @param offset index of the first byte to append
     * @param length number of bytes to append
     * @throws IOException declared for API compatibility
     * @throws ArrayIndexOutOfBoundsException if the range is outside {@code data}
     * @throws BufferOverflowException if the buffer cannot grow enough to hold the data
     */
    public synchronized void write(byte[] data, int offset, int length) throws IOException {
        if (length <= 0) {
            return;
        }
        // validate before mutating anything so a bad range cannot trigger a resize
        if (offset < 0 || length > data.length - offset) {
            throw new ArrayIndexOutOfBoundsException(
                    "offset=" + offset + ", length=" + length + ", data.length=" + data.length);
        }

        if (remaining() < length) {
            resize(length); // afterwards remaining() >= length
        }

        // first chunk goes up to the end of the array, the rest wraps to index 0
        int firstLen = Math.min(length, buffer.length - writeIndex);
        int secondLen = length - firstLen;
        System.arraycopy(data, offset, buffer, writeIndex, firstLen);
        if (secondLen > 0) {
            System.arraycopy(data, offset + firstLen, buffer, 0, secondLen);
            writeIndex = secondLen;
        } else {
            int next = writeIndex + firstLen;
            writeIndex = (next == buffer.length) ? 0 : next;
        }
    }

    /**
     * Input stream view over the enclosing buffer. Reads consume data from the buffer
     * and block (by polling) until at least one byte is available.
     */
    protected class SerialByteBufferInputStream extends InputStream {

        /**
         * @return the number of bytes that can be read without blocking
         */
        @Override
        public int available() throws IOException {
            return SerialByteBuffer.this.available();
        }

        /**
         * Reads a single byte, blocking until one is available.
         *
         * @return the byte as an unsigned value in the range 0-255
         * @throws IOException if the waiting thread is interrupted
         */
        @Override
        public int read() throws IOException {
            while (true){
                synchronized (SerialByteBuffer.this){
                    if (SerialByteBuffer.this.available() > 0){
                        int result = buffer[readIndex] & 0xff; // keep only the low 8 bits
                        int next = readIndex + 1;
                        // roll over to index zero at the end of the backing array
                        readIndex = (next == buffer.length) ? 0 : next;
                        return result;
                    }
                }
                pause("Blocking read operation interrupted.");
            }
        }

        /**
         * Reads up to {@code data.length} bytes; see {@link #read(byte[], int, int)}.
         */
        @Override
        public int read(byte[] data) throws IOException {
            return read(data, 0, data.length);
        }

        /**
         * Reads up to {@code len} bytes into {@code data}, blocking until at least one
         * byte is available. Returns 0 immediately when {@code len} is 0.
         *
         * @param data destination array
         * @param off  index in {@code data} of the first byte to store
         * @param len  maximum number of bytes to read
         * @return the number of bytes actually read
         * @throws IndexOutOfBoundsException if the range is outside {@code data}
         * @throws IOException if the waiting thread is interrupted
         */
        @Override
        public int read(byte[] data, int off, int len) throws IOException {
            if (off < 0 || len < 0 || len > data.length - off) {
                throw new IndexOutOfBoundsException(
                        "off=" + off + ", len=" + len + ", data.length=" + data.length);
            }
            if (len == 0) {
                return 0;
            }
            while (true){
                synchronized (SerialByteBuffer.this){
                    int available = SerialByteBuffer.this.available();
                    if (available > 0){
                        int length = Math.min(len, available);
                        int firstLen = Math.min(length, buffer.length - readIndex);
                        int secondLen = length - firstLen;
                        System.arraycopy(buffer, readIndex, data, off, firstLen);
                        if (secondLen > 0){
                            System.arraycopy(buffer, 0, data, off + firstLen, secondLen);
                        }
                        advanceReadIndex(firstLen, secondLen);
                        return length;
                    }
                }
                pause("Blocking read operation interrupted.");
            }
        }

        /**
         * Discards up to {@code n} bytes, blocking until at least one byte is available.
         * Returns 0 immediately when {@code n} is not positive.
         *
         * @param n maximum number of bytes to discard
         * @return the number of bytes actually discarded
         * @throws IOException if the waiting thread is interrupted
         */
        @Override
        public long skip(long n) throws IOException, IllegalArgumentException {
            if (n <= 0) {
                return 0;
            }
            while (true){
                synchronized (SerialByteBuffer.this){
                    int available = SerialByteBuffer.this.available();
                    if (available > 0){
                        // clamp in long arithmetic first; a plain (int) cast of n can go negative
                        int length = (int) Math.min(n, (long) available);
                        int firstLen = Math.min(length, buffer.length - readIndex);
                        advanceReadIndex(firstLen, length - firstLen);
                        return length;
                    }
                }
                pause("Blocking read operation interrupted.");
            }
        }

        /**
         * Moves the read position past a consumed region made of {@code firstLen} bytes
         * up to the end of the array plus {@code secondLen} wrapped bytes from index 0.
         * Must be called with the enclosing buffer's monitor held.
         */
        private void advanceReadIndex(int firstLen, int secondLen) {
            if (secondLen > 0) {
                readIndex = secondLen;
            } else {
                int next = readIndex + firstLen;
                readIndex = (next == buffer.length) ? 0 : next;
            }
        }
    }
}