记一次sftp工具类导致jvm频繁GC事件

背景

项目中需要使用sftp进行远程文件的读取,入库。原有代码中存在一个SFTPUtil类,底层使用的是jsch库调用方法。

经过

  1. 事件前一天正好进行了发版,上线版本中,我修改了原有SFTPUtil中的一个问题:原有代码读取完成了之后,没有关闭连接,导致读取了文件之后,连接一直保持着。我在此次版本中关闭了连接。
  2. 当天中午,当我正好中午出去吃饭时,产线上突然出现了Cat预警。报错:jvm eden区频繁GC,2分钟内ConcurrentMarkSweepCount达到63次,超过预警值10次
  3. 然后赶紧回来排查。这段时间正好是sftp读取文件的任务在执行,所以确定是sftp读取文件有问题。但是以前这段代码一直运行正常,没有出现过问题,此次突然出现问题,所以初步觉得可能是自己修改关闭连接的代码有问题。但是这段代码我是经过了多次测试的,不应该出现问题的!

结果

因为产线一直在报警,所以紧急通过删除远程文件的方式,终止了文件的读取。为了不影响当天的业务进行,在尚不确定问题的情况下,只能先进行版本回退。

原因

后续我们去找业务碰了一下,发现此次读取的文件比较大,有500M,比以前的都要大!事故前正好此任务被添加上去了,所以每间隔几分钟调度执行时,都会出现Cat预警!

那么为什么读取文件时,会出现出现频繁GC异常呢?!难道是直接把文件读取到内存中了?

带着疑问,我们重新审视了一下这个SFTPUtil类,底层读取文件使用的是这个方法。

public void get(String src, OutputStream dst) throws SftpException{
    get(src, dst, null, OVERWRITE, 0);
  }

SFTPUtil工具类new了一个空的ByteArrayOutputStream,作为参数传入此方法。get拿到了outputStream之后,外层层层包装成了BufferedReader,然后利用BufferedReader.readLine()逐行读取。因为拿到outputStream时,sftp连接已经被关闭了,所以推断jsch中的get方法应该是将文件全部读取到outputStream了
顺着get方法,进入源码中真正执行读取的地方查看

private void _get(String src, OutputStream dst,
                      SftpProgressMonitor monitor, int mode, long skip) throws SftpException {
        //System.err.println("_get: "+src+", "+dst);

        byte[] srcb = Util.str2byte(src, fEncoding);
        try {
            sendOPENR(srcb);

            ChannelSftp.Header header = new ChannelSftp.Header();
            header = header(buf, header);
            int length = header.length;
            int type = header.type;

            fill(buf, length);

            if (type != SSH_FXP_STATUS && type != SSH_FXP_HANDLE) {
                throw new SftpException(SSH_FX_FAILURE, "");
            }

            if (type == SSH_FXP_STATUS) {
                int i = buf.getInt();
                throwStatusError(buf, i);
            }

            byte[] handle = buf.getString();         // filename

            long offset = 0;
            if (mode == RESUME) {
                offset += skip;
            }

            int request_max = 1;
            rq.init();
            long request_offset = offset;

            int request_len = buf.buffer.length - 13;
            if (server_version == 0) {
                request_len = 1024;
            }

            loop:
            while (true) {

                while (rq.count() < request_max) {
                    sendREAD(handle, request_offset, request_len, rq);
                    request_offset += request_len;
                }

                header = header(buf, header);
                length = header.length;
                type = header.type;

                ChannelSftp.RequestQueue.Request rr = null;
                try {
                    rr = rq.get(header.rid);
                } catch (ChannelSftp.RequestQueue.OutOfOrderException e) {
                    request_offset = e.offset;
                    skip(header.length);
                    rq.cancel(header, buf);
                    continue;
                }

                if (type == SSH_FXP_STATUS) {
                    fill(buf, length);
                    int i = buf.getInt();
                    if (i == SSH_FX_EOF) {
                        break loop;
                    }
                    throwStatusError(buf, i);
                }

                if (type != SSH_FXP_DATA) {
                    break loop;
                }

                buf.rewind();
                fill(buf.buffer, 0, 4);
                length -= 4;
                int length_of_data = buf.getInt();   // length of data 

                /**
                 Since sftp protocol version 6, "end-of-file" has been defined,

                 byte   SSH_FXP_DATA
                 uint32 request-id
                 string data
                 bool   end-of-file [optional]

                 but some sftpd server will send such a field in the sftp protocol 3 ;-(
                 */
                int optional_data = length - length_of_data;

                int foo = length_of_data;
                while (foo > 0) {
                    int bar = foo;
                    if (bar > buf.buffer.length) {
                        bar = buf.buffer.length;
                    }
                    int data_len = io_in.read(buf.buffer, 0, bar);
                    if (data_len < 0) {
                        break loop;
                    }

                    dst.write(buf.buffer, 0, data_len);

                    offset += data_len;
                    foo -= data_len;

                    if (monitor != null) {
                        if (!monitor.count(data_len)) {
                            skip(foo);
                            if (optional_data > 0) {
                                skip(optional_data);
                            }
                            break loop;
                        }
                    }

                }
                //System.err.println("length: "+length);  // length should be 0

                if (optional_data > 0) {
                    skip(optional_data);
                }

                if (length_of_data < rr.length) {  //
                    rq.cancel(header, buf);
                    sendREAD(handle, rr.offset + length_of_data, (int) (rr.length - length_of_data), rq);
                    request_offset = rr.offset + rr.length;
                }

                if (request_max < rq.size()) {
                    request_max++;
                }
            }
            dst.flush();

            if (monitor != null) monitor.end();

            rq.cancel(header, buf);

            _sendCLOSE(handle, header);
        } catch (Exception e) {
            if (e instanceof SftpException) throw (SftpException) e;
            if (e instanceof Throwable)
                throw new SftpException(SSH_FX_FAILURE, "", (Throwable) e);
            throw new SftpException(SSH_FX_FAILURE, "");
        }
    }

注意到其中这段代码,while循环将buffer写入到了outputStream中了

while (foo > 0) {
    int bar = foo;
    if (bar > buf.buffer.length) {
        bar = buf.buffer.length;
    }
    int data_len = io_in.read(buf.buffer, 0, bar);
    if (data_len < 0) {
        break loop;
    }

    dst.write(buf.buffer, 0, data_len);

    offset += data_len;
    foo -= data_len;

    if (monitor != null) {
        if (!monitor.count(data_len)) {
            skip(foo);
            if (optional_data > 0) {
                skip(optional_data);
            }
            break loop;
        }
    }

}

好吧,原因找到了。确实是直接写入到outputStream中了。
这就不难解释为什么读取文件时出现JVM频繁GC的问题了,文件太大了,全部读取到内存中,加上配置的eden区也不大,只有700多M,所以就GC了。。。

反思

  • 文件中内容太大,原先肯定没有考虑到这一点,导致测试不充分,没有覆盖到这种场景
  • 对于jsch不熟悉,就直接进行使用,以为从outputStream读取出来,进行包装后逐行读取就不会一次性全部读取到内存,太naive了。知其理方能善其器,蒙眼狂奔肯定会出事的
  • 对于远程sftp文件读取,究竟是直接读取到内存,还是先拉取到本地,值得好好考虑。直接读取固然简单,但是存在隐患。最好还是先拉取到本地,然后按照文件读取规范进行读取。jsch中get方法存在直接读取到本地的重载,是逐行读取、写入的,可以直接使用

好了,到这里就结束了。顺便吐槽一句,项目中原有的SFTPUtil问题好多,大爷们别在挖坑了~~~